先决条件

本指南假设您对 StatsForecast 有基本的了解。如需最小示例,请访问快速入门

遵循本文提供的逐步指南,了解如何构建用于多个时间序列的生产级预测流程。

在本指南中,您将熟悉核心的 StatsForecast 类以及一些相关方法,例如 StatsForecast.plotStatsForecast.forecastStatsForecast.cross_validation.

我们将使用 M4 竞赛中的经典基准数据集。该数据集包含来自不同领域的时间序列,如金融、经济和销售。在此示例中,我们将使用 Hourly 数据集的一个子集。

我们将对每个时间序列单独建模。此级别的预测也称为局部预测。因此,您将为每个独特序列训练一系列模型,然后选择最佳模型。StatsForecast 注重速度、简洁性和可伸缩性,这使其成为此任务的理想选择。

大纲

  1. 安装软件包。
  2. 读取数据。
  3. 探索数据。
  4. 为每个独特的时间序列组合训练多个模型。
  5. 使用交叉验证评估模型性能。
  6. 为每个独特的时间序列选择最佳模型。

本指南未涵盖的内容

安装库

我们假设您已安装 StatsForecast。请查看此指南了解如何安装 StatsForecast

读取数据

我们将使用 pandas 读取存储在 parquet 文件中的 M4 Hourly 数据集,以提高效率。您可以使用普通的 pandas 操作读取其他格式的数据,例如 .csv

StatsForecast 的输入始终是具有三列的长格式数据框:unique_iddsy

  • unique_id(字符串、整数或类别)表示序列的标识符。

  • ds(日期戳或整数)列应为索引时间的整数或日期戳,理想情况下日期格式为 YYYY-MM-DD,时间戳格式为 YYYY-MM-DD HH:MM:SS。

  • y(数值)表示我们希望预测的测量值。如果目标列名不同,需要将其重命名为 y

此数据集已满足要求。

取决于您的互联网连接速度,此步骤大约需要 10 秒。

import pandas as pd
Y_df = pd.read_parquet('https://datasets-nixtla.s3.amazonaws.com/m4-hourly.parquet')
Y_df.head()
unique_iddsy
0H11605.0
1H12586.0
2H13586.0
3H14559.0
4H15511.0

此数据集包含 414 个独特序列,平均有 900 个观测值。出于示例和可重复性的考虑,我们将仅选择 10 个独特 ID 并仅保留最后一周的数据。根据您的处理基础设施,您可以随意选择更多或更少的序列。

注意

处理时间取决于可用的计算资源。在 AWS 的 c5d.24xlarge (96 核) 实例上运行完整数据集的此示例大约需要 10 分钟。

uids = Y_df['unique_id'].unique()[:10] # Select 10 ids to make the example faster
Y_df = Y_df.query('unique_id in @uids')
Y_df = Y_df.groupby('unique_id').tail(7 * 24) #Select last 7 days of data to make example faster

使用 plot 方法探索数据

使用 StatsForecast 类的 plot 方法绘制一些序列。此方法会打印数据集中的 8 个随机序列,对基本 EDA 非常有用。

注意

StatsForecast.plot 方法默认使用 Plotly 作为引擎。您可以通过设置 engine="matplotlib" 切换到 MatPlotLib。

from statsforecast import StatsForecast
StatsForecast.plot(Y_df)

为多个序列训练多个模型

StatsForecast 可以高效地在多个时间序列上训练多个模型。

首先导入并实例化所需的模型。StatsForecast 提供了多种模型,分为以下几类

  • 自动预测: 自动预测工具会搜索最佳参数,并为一系列时间序列选择最佳模型。这些工具对于大量单变量时间序列非常有用。包括 Arima、ETS、Theta、CES 的自动版本。

  • 指数平滑: 使用过去所有观测值的加权平均值,权重随时间呈指数衰减。适用于没有明显趋势或季节性的数据。示例:SES、Holt’s Winters、SSO。

  • 基准模型: 用于建立基准的经典模型。示例:Mean、Naive、Random Walk

  • 间歇性或稀疏模型: 适用于非零观测值很少的序列。示例:CROSTON、ADIDA、IMAPA

  • 多重季节性: 适用于具有多个明显季节性的信号。对电力和日志等低频数据非常有用。示例:MSTL。

  • Theta 模型: 使用不同的技术将两条 theta 线拟合到去季节化的时间序列,并组合这两条 theta 线以生成最终预测。示例:Theta、DynamicTheta

您可以在此处查看完整的模型列表。

在此示例中,我们将使用

导入并实例化模型。设置 season_length 参数有时很棘手。Hyndmann 大师的这篇关于季节周期的文章可能会有所帮助。

from statsforecast.models import (
    HoltWinters,
    CrostonClassic as Croston, 
    HistoricAverage,
    DynamicOptimizedTheta as DOT,
    SeasonalNaive
)
# Create a list of models and instantiation parameters
models = [
    HoltWinters(),
    Croston(),
    SeasonalNaive(season_length=24),
    HistoricAverage(),
    DOT(season_length=24)
]

我们通过实例化一个新的 StatsForecast 对象并使用以下参数来拟合模型

  • models:模型列表。从模型中选择您想要的模型并导入它们。

  • freq:表示数据频率的字符串。(参见pandas 可用频率。)

  • n_jobs:n_jobs: int,并行处理中使用的作业数,使用 -1 表示所有核心。

  • fallback_model:如果某个模型失败时使用的备用模型。

任何设置都传递到构造函数中。然后调用其 fit 方法并传入历史数据框。

# Instantiate StatsForecast class as sf
sf = StatsForecast( 
    models=models,
    freq=1, 
    fallback_model = SeasonalNaive(season_length=7),
    n_jobs=-1,
)

注意

StatsForecast 通过 Numba 的 JIT 编译实现惊人的速度。第一次调用 statsforecast 类时,fit 方法大约需要 5 秒。第二次(一旦 Numba 编译了您的设置)应该少于 0.2 秒。

forecast 方法接受两个参数:预测未来的 h(预测 horizons)和 level

  • h (int):表示未来 h 步的预测。在本例中,提前 12 个月。

  • level (float 列表):此可选参数用于概率预测。设置预测区间的 level(或置信百分位数)。例如,level=[90] 表示模型预期真实值有 90% 的时间会落在此区间内。

此处的 forecast 对象是一个新的数据框,包含一列模型名称和 y hat 值,以及不确定性区间的列。根据您的计算机性能,此步骤大约需要 1 分钟。(如果您想将速度提高到几秒钟,请删除 ARIMA 和 Theta 等 AutoModels)

注意

forecast 方法与分布式集群兼容,因此不存储任何模型参数。如果您想存储每个模型的参数,可以使用 fitpredict 方法。然而,这些方法未针对 Spark、Ray 或 Dask 等分布式引擎定义。

forecasts_df = sf.forecast(df=Y_df, h=48, level=[90])
forecasts_df.head()
unique_iddsHoltWintersHoltWinters-lo-90HoltWinters-hi-90CrostonClassicCrostonClassic-lo-90CrostonClassic-hi-90SeasonalNaiveSeasonalNaive-lo-90SeasonalNaive-hi-90HistoricAverageHistoricAverage-lo-90HistoricAverage-hi-90DynamicOptimizedThetaDynamicOptimizedTheta-lo-90DynamicOptimizedTheta-hi-90
0H1749829.0422.5492681235.450732829.0422.5492681235.450732635.0566.036734703.963266660.982143398.037761923.926524592.701851577.677280611.652639
1H1750807.0400.5492681213.450732807.0400.5492681213.450732572.0503.036734640.963266660.982143398.037761923.926524525.589116505.449755546.621805
2H1751785.0378.5492681191.450732785.0378.5492681191.450732532.0463.036734600.963266660.982143398.037761923.926524489.251814462.072871512.424116
3H1752756.0349.5492681162.450732756.0349.5492681162.450732493.0424.036734561.963266660.982143398.037761923.926524456.195032430.554302478.260963
4H1753719.0312.5492681125.450732719.0312.5492681125.450732477.0408.036734545.963266660.982143398.037761923.926524436.290514411.051232461.815932

使用 StatsForecast.plot 方法绘制 8 个随机序列的结果。

sf.plot(Y_df,forecasts_df)

StatsForecast.plot 允许进一步自定义。例如,绘制不同模型和唯一 ID 的结果。

# Plot to unique_ids and some selected models
sf.plot(Y_df, forecasts_df, models=["HoltWinters","DynamicOptimizedTheta"], unique_ids=["H10", "H105"], level=[90])

# Explore other models 
sf.plot(Y_df, forecasts_df, models=["SeasonalNaive"], unique_ids=["H10", "H105"], level=[90])

评估模型性能

在前面的步骤中,我们使用历史数据来预测未来。然而,为了评估其准确性,我们还想知道模型在过去会有怎样的表现。为了评估模型在您的数据上的准确性和稳健性,请执行交叉验证。

对于时间序列数据,交叉验证是通过在历史数据上定义一个滑动窗口并预测其后续周期来完成的。这种形式的交叉验证使我们能够更准确地估计模型在更广泛时间范围内的预测能力,同时保持训练集中的数据连续,这符合我们模型的要求。

下图描绘了这种交叉验证策略

时间序列模型的交叉验证被认为是最佳实践,但大多数实现速度非常慢。StatsForecast 库将交叉验证实现为分布式操作,从而减少了执行时间。如果您有大数据集,还可以使用 Ray、Dask 或 Spark 等分布式集群进行交叉验证。

在本例中,我们希望评估每个模型在过去 2 天(n_windows=2)的性能,每隔一天(step_size=48)进行预测。根据您的计算机性能,此步骤大约需要 1 分钟。

提示

n_windows 设置为 1 类似于传统的训练集-测试集划分,历史数据用作训练集,最后 48 小时用作测试集。

StatsForecast 类的 cross_validation 方法接受以下参数。

  • df:训练数据框

  • h (int):表示未来 h 步的预测。在本例中,提前 24 小时。

  • step_size (int):每个窗口之间的步长。换句话说:您希望多久运行一次预测过程。

  • n_windows (int):用于交叉验证的窗口数量。换句话说:您希望评估过去多少个预测过程。

cv_df = sf.cross_validation(
    df=Y_df,
    h=24,
    step_size=24,
    n_windows=2
)

cv_df 对象是一个新的数据框,包含以下列

  • unique_id:序列标识符

  • ds:日期戳或时间索引

  • cutoffn_windows 的最后一个日期戳或时间索引。如果 n_windows=1,则有一个唯一的截止值;如果 n_windows=2,则有两个唯一的截止值。

  • y:真实值

  • "model":包含模型名称和拟合值的列。

cv_df.head()
unique_iddscutoffyHoltWintersCrostonClassicSeasonalNaiveHistoricAverageDynamicOptimizedTheta
0H1701700619.0847.0742.668748691.0661.675612.767504
1H1702700565.0820.0742.668748618.0661.675536.846278
2H1703700532.0790.0742.668748563.0661.675497.824286
3H1704700495.0784.0742.668748529.0661.675464.723219
4H1705700481.0752.0742.668748504.0661.675440.972336

接下来,我们将使用常见的误差指标(如平均绝对误差 (MAE) 或均方误差 (MSE))评估每个模型在每个序列上的性能。定义一个实用函数来评估交叉验证数据框的不同误差指标。

首先从 utilsforecast.losses 导入所需的误差指标。然后定义一个实用函数,该函数将交叉验证数据框作为指标,并返回一个评估数据框,其中包含每个唯一 ID、拟合模型以及所有截止点的误差指标平均值。

from utilsforecast.losses import mse
def evaluate_cv(df, metric):
    models = df.columns.drop(['unique_id', 'ds', 'y', 'cutoff']).tolist()
    evals = metric(df, models=models)
    evals['best_model'] = evals[models].idxmin(axis=1)
    return evals

警告

您也可以使用平均绝对百分比误差 (MAPE),但对于细粒度预测,MAPE 值非常难以判断,并且不足以评估预测质量。

创建使用均方误差指标评估交叉验证数据框结果的数据框。

evaluation_df = evaluate_cv(cv_df, mse)
evaluation_df.head()
unique_idHoltWintersCrostonClassicSeasonalNaiveHistoricAverageDynamicOptimizedThetabest_model
0H144888.02083328038.7339851422.66666720927.6644881296.333977DynamicOptimizedTheta
1H102812.9166671483.48383996.8958331980.367543379.621134SeasonalNaive
2H100121625.37500091945.13923712019.00000078491.19143921699.649325SeasonalNaive
3H10128453.39583316183.63434010944.45833318208.40980063698.077266SeasonalNaive
4H102232924.854167132655.30913612699.895833309110.47521231393.535274SeasonalNaive

创建一个摘要表,其中包含模型列以及该模型表现最佳的序列数量。在本例中,Arima 和 Seasonal Naive 是 10 个序列的最佳模型,而 Theta 模型应用于两个序列。

evaluation_df['best_model'].value_counts().to_frame().reset_index()
best_modelcount
0SeasonalNaive6
1DynamicOptimizedTheta4

您可以通过绘制特定模型获胜的 unique_ids 来进一步探索您的结果。

seasonal_ids = evaluation_df.query('best_model == "SeasonalNaive"')['unique_id']
sf.plot(Y_df,forecasts_df, unique_ids=seasonal_ids, models=["SeasonalNaive","DynamicOptimizedTheta"])

为每个独特序列选择最佳模型

定义一个实用函数,该函数接受包含预测的预测数据框和评估数据框,并返回一个包含每个 unique_id 最佳预测的数据框。

def get_best_model_forecast(forecasts_df, evaluation_df):
    with_best = forecasts_df.merge(evaluation_df[['unique_id', 'best_model']])
    res = with_best[['unique_id', 'ds']].copy()
    for suffix in ('', '-lo-90', '-hi-90'):
        res[f'best_model{suffix}'] = with_best.apply(lambda row: row[row['best_model'] + suffix], axis=1)
    return res

创建包含每个 unique_id 最佳预测的生产级数据框。

prod_forecasts_df = get_best_model_forecast(forecasts_df, evaluation_df)
prod_forecasts_df.head()
unique_iddsbest_modelbest_model-lo-90best_model-hi-90
0H1749592.701851577.677280611.652639
1H1750525.589116505.449755546.621805
2H1751489.251814462.072871512.424116
3H1752456.195032430.554302478.260963
4H1753436.290514411.051232461.815932

绘制结果。

sf.plot(Y_df, prod_forecasts_df, level=[90])