StatsForecast 通过 Fugue 在 Spark、Dask 和 Ray 之上运行。StatsForecast 会读取输入 DataFrame 并使用相应的引擎。例如,如果输入是 Spark DataFrame,StatsForecast 将使用现有的 Spark session 运行预测。

可以在此处找到一个基准测试(使用旧语法),我们在不到 15 分钟内预测了一百万个时间序列。

安装

只要安装并配置了 Spark,StatsForecast 就可以使用它。如果在分布式 Spark 集群上执行,请确保 statsforecast 库已安装在所有 worker 上。

在 Pandas 上使用 StatsForecast

在 Spark 上运行之前,建议在一个较小的 Pandas 数据集上进行测试,以确保一切正常。此示例还有助于展示使用 Spark 时的微小差异。

from statsforecast.core import StatsForecast
from statsforecast.models import AutoARIMA, AutoETS
from statsforecast.utils import generate_series
n_series = 4
horizon = 7

series = generate_series(n_series)

sf = StatsForecast(
    models=[AutoETS(season_length=7)],
    freq='D',
)
sf.forecast(df=series, h=horizon).head()
unique_iddsAutoETS
002000-08-105.261609
102000-08-116.196357
202000-08-120.282309
302000-08-131.264195
402000-08-142.262453

在 Spark 上执行

要在 Spark 上分布式运行预测,只需传入一个 Spark DataFrame 即可。

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

series['unique_id'] = series['unique_id'].astype(str)

# Convert to Spark
sdf = spark.createDataFrame(series)

# Returns a Spark DataFrame
sf.forecast(df=sdf, h=horizon, level=[90]).show(5)
+---------+-------------------+----------+-------------+-------------+
|unique_id|                 ds|   AutoETS|AutoETS-lo-90|AutoETS-hi-90|
+---------+-------------------+----------+-------------+-------------+
|        0|2000-08-10 00:00:00|  5.261609|    5.0255513|    5.4976664|
|        0|2000-08-11 00:00:00| 6.1963573|       5.9603|     6.432415|
|        0|2000-08-12 00:00:00|0.28230855|   0.04625102|    0.5183661|
|        0|2000-08-13 00:00:00| 1.2641948|    1.0281373|    1.5002524|
|        0|2000-08-14 00:00:00| 2.2624528|    2.0263953|    2.4985104|
+---------+-------------------+----------+-------------+-------------+
only showing top 5 rows