The purpose of this notebook is to create a scalability benchmark (time and performance). To that end, we train Nixtla's StatsForecast (using an ETS model) on the M5 dataset with distributed training on Spark. As a comparison, we also run Facebook's Prophet model.

An AWS cluster (hosted on Databricks) of 11 m5.2xlarge instances (8 cores, 32 GB of RAM each), running Databricks Runtime 10.4 LTS, was used. This notebook served as the benchmark case.

This example uses the M5 dataset, which contains 30,490 bottom-level time series (a quick local peek at the data is sketched after the install step below).

Main results

Method           Time (min)    Performance (wRMSSE)
StatsForecast    7.5           0.68
Prophet          18.23         0.77

Install the libraries

pip install prophet "neuralforecast<1.0.0" "statsforecast[fugue]"
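
The M5 loader bundled with neuralforecast<1.0.0 offers a quick way to inspect the data locally. A minimal sketch, assuming M5.load returns the target, exogenous, and static dataframes in that order (the evaluation cells below unpack the static dataframe S_df the same way):

# Sketch: download and inspect the dataset locally. Assumes M5.load
# returns (Y_df, X_df, S_df); the evaluation cells below rely on the
# static dataframe S_df being the last element.
from neuralforecast.data.datasets.m5 import M5

Y_df, X_df, S_df = M5.load('./data')
print(Y_df['unique_id'].nunique())  # expect the 30,490 bottom-level series
Y_df.head()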

StatsForecast pipeline

from time import time

from neuralforecast.data.datasets.m5 import M5, M5Evaluation
from statsforecast.distributed.utils import forecast
from statsforecast.distributed.fugue import FugueBackend
from statsforecast.models import ETS, SeasonalNaive
from statsforecast.core import StatsForecast

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
backend = FugueBackend(spark, {"fugue.spark.use_pandas_udf":True})

Forecast

With StatsForecast you don't need to download the data: the distributed backend can work with your data files directly.

init = time()
ets_forecasts = backend.forecast(
    "s3://m5-benchmarks/data/train/m5-target.parquet", 
    [ETS(season_length=7, model='ZAA')], 
    freq="D", 
    h=28, 
).toPandas()
end = time()
print(f'Minutes taken by StatsForecast on a Spark cluster: {(end - init) / 60}')
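
Before launching the full cluster job, it can be useful to sanity-check the models locally on a handful of series. A minimal sketch using the StatsForecast core class and the SeasonalNaive model imported above, assuming a statsforecast version whose constructor accepts df/models/freq keywords and that s3fs is available so pandas can read the parquet file from S3:

# Sketch: run the same models locally on a few series before scaling out.
# Version of the StatsForecast API and availability of s3fs are assumptions.
import pandas as pd

sample = pd.read_parquet('s3://m5-benchmarks/data/train/m5-target.parquet')
some_ids = sample['unique_id'].unique()[:10]
sample = sample[sample['unique_id'].isin(some_ids)]

sf = StatsForecast(
    df=sample,
    models=[ETS(season_length=7, model='ZAA'), SeasonalNaive(season_length=7)],
    freq='D',
)
sf.forecast(h=28).head()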

Evaluate performance

The M5 competition used the weighted root mean squared scaled error (wRMSSE) to measure accuracy.
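
For reference, a sketch of the metric, following the M5 competitors' guide: each series is scored with the root mean squared scaled error, which scales the squared forecast error over the horizon by the in-sample squared error of the one-step naive forecast, and the per-series scores are combined with weights proportional to each series' cumulative dollar sales over the final 28 days of the training data:

$$
\mathrm{RMSSE}_i = \sqrt{\frac{\frac{1}{h}\sum_{t=n+1}^{n+h}\left(y_{i,t}-\hat{y}_{i,t}\right)^{2}}{\frac{1}{n-1}\sum_{t=2}^{n}\left(y_{i,t}-y_{i,t-1}\right)^{2}}},
\qquad
\mathrm{wRMSSE} = \sum_{i} w_{i}\,\mathrm{RMSSE}_{i}
$$

where $h = 28$ is the forecast horizon, $n$ is the length of the training series, and the weights $w_i$ sum to one.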

# pivot the long forecasts to wide format: one row per series, one column per date
Y_hat = ets_forecasts.set_index(['unique_id', 'ds']).unstack()
Y_hat = Y_hat.droplevel(0, 1).reset_index()
# merge with the static dataframe S_df to get the layout M5Evaluation expects
*_, S_df = M5.load('./data')
Y_hat = S_df.merge(Y_hat, how='left', on=['unique_id'])
wrmsse_ets = M5Evaluation.evaluate(y_hat=Y_hat, directory='./data')
wrmsse_ets
          wrmsse
Total     0.682358
Level 1   0.449115
Level 2   0.533754
Level 3   0.592317
Level 4   0.497086
Level 5   0.572189
Level 6   0.593880
Level 7   0.665358
Level 8   0.652183
Level 9   0.734492
Level 10  1.012633
Level 11  0.969902
Level 12  0.915380

Prophet pipeline

import logging
from time import time

import pandas as pd
from neuralforecast.data.datasets.m5 import M5, M5Evaluation
from prophet import Prophet
from pyspark.sql.types import *

# silence py4j's informational messages
logging.getLogger('py4j').setLevel(logging.ERROR)

Download the data

# structure of the training data set
train_schema = StructType([
  StructField('unique_id', StringType()),
  StructField('ds', DateType()),
  StructField('y', DoubleType())
  ])

# read the training file into a dataframe, attaching the schema explicitly
train = (
  spark.read
    .schema(train_schema)
    .parquet('s3://m5-benchmarks/data/train/m5-target.parquet')
  )
 
# make the dataframe queryable as a temporary view
train.createOrReplaceTempView('train')

sql_statement = '''
  SELECT
    unique_id AS unique_id,
    CAST(ds as date) as ds,
    y as y
  FROM train
  '''

# repartition by series so each worker receives whole series, then cache
m5_history = (
  spark
    .sql(sql_statement)
    .repartition(spark.sparkContext.defaultParallelism, ['unique_id'])
  ).cache()
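
Note that .cache() is lazy, so the first action pays the cost of reading and repartitioning. If you want that cost excluded from the Prophet timing below, you can materialize the cache up front (a small addition, not part of the original benchmark):

m5_history.count()  # force the read/repartition so it is not charged to Prophet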

Forecast function using Prophet

def forecast(history_pd: pd.DataFrame) -> pd.DataFrame:

    # TRAIN MODEL AS BEFORE
    # --------------------------------------
    # remove missing values (more likely at day-store-item level)
    history_pd = history_pd.dropna()

    # configure the model
    model = Prophet(
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=True,
        yearly_seasonality=True,
        seasonality_mode='multiplicative'
    )

    # train the model
    model.fit(history_pd)
    # --------------------------------------

    # BUILD FORECAST AS BEFORE
    # --------------------------------------
    # make predictions for the 28-day horizon
    future_pd = model.make_future_dataframe(
        periods=28,
        freq='d',
        include_history=False
    )
    forecast_pd = model.predict(future_pd)
    # --------------------------------------

    # ASSEMBLE EXPECTED RESULT SET
    # --------------------------------------
    # tag the forecast with its series id and keep the relevant fields
    forecast_pd['unique_id'] = history_pd['unique_id'].unique()[0]
    f_pd = forecast_pd[['unique_id', 'ds', 'yhat']]
    # --------------------------------------

    # return expected dataset
    return f_pd

# schema of the dataframe returned by the forecast function
result_schema = StructType([
  StructField('unique_id', StringType()),
  StructField('ds', DateType()),
  StructField('yhat', FloatType()),
])
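
Since debugging inside a pandas UDF is awkward, it can help to smoke-test forecast on a single synthetic series before handing it to applyInPandas. A sketch with made-up data (the id and values below are hypothetical, not taken from M5):

# Sketch: run the UDF on one synthetic series locally. The identifier and
# the Poisson-sampled demand values are made up for illustration.
import numpy as np

toy_history = pd.DataFrame({
    'unique_id': 'FOODS_1_001_CA_1',  # hypothetical M5-style identifier
    'ds': pd.date_range('2015-01-01', periods=365, freq='D'),
    'y': np.random.default_rng(0).poisson(3.0, 365).astype(float),
})

print(forecast(toy_history).head())  # expect columns unique_id, ds, yhat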

Train Prophet on the M5 dataset

init = time()
results = (
  m5_history
    .groupBy('unique_id')
      .applyInPandas(forecast, schema=result_schema)
    ).toPandas()
end = time()
print(f'Minutes taken by Prophet on a Spark cluster: {(end - init) / 60}')

Evaluate performance

The M5 competition used the weighted root mean squared scaled error (wRMSSE), defined in the evaluation section above.

# pivot the long forecasts to wide format: one row per series, one column per date
Y_hat = results.set_index(['unique_id', 'ds']).unstack()
Y_hat = Y_hat.droplevel(0, 1).reset_index()
# merge with the static dataframe S_df to get the layout M5Evaluation expects
*_, S_df = M5.load('./data')
Y_hat = S_df.merge(Y_hat, how='left', on=['unique_id'])
wrmsse = M5Evaluation.evaluate(y_hat=Y_hat, directory='./data')
wrmsse
          wrmsse
Total     0.771800
Level 1   0.507905
Level 2   0.586328
Level 3   0.666686
Level 4   0.549358
Level 5   0.655003
Level 6   0.647176
Level 7   0.747047
Level 8   0.743422
Level 9   0.824667
Level 10  1.207069
Level 11  1.108780
Level 12  1.018163