先决条件

本指南假定您对 NeuralForecast 有基本了解。有关最小示例,请访问快速入门

按照本文提供的分步指南,构建用于多个时间序列的生产级预测管道。

在本指南中,您将熟悉核心NeuralForecast类以及一些相关方法,例如NeuralForecast.fitNeuralForecast.predictStatsForecast.cross_validation.

我们将使用 M4 竞赛中的经典基准数据集。该数据集包含来自不同领域的时间序列,如金融、经济和销售。在此示例中,我们将使用 Hourly 数据集的子集。

我们将对每个时间序列进行全局建模。因此,您将为整个数据集训练一组模型,然后为每个单独的时间序列选择最佳模型。NeuralForecast 注重速度、简单性和可扩展性,这使其非常适合此任务。

大纲

  1. 安装包。
  2. 读取数据。
  3. 探索数据。
  4. 为整个数据集全局训练多个模型。
  5. 使用交叉验证评估模型的性能。
  6. 为每个独特的时间序列选择最佳模型。

本指南未涵盖的内容

  • 使用外部回归因子或外生变量
    • 按照此教程包含外生变量,例如天气或假期,或分类或系列等静态变量。
  • 概率预测
  • 迁移学习
    • 训练一个模型并使用此教程在不同数据上进行预测

提示

您可以使用 Colab 以交互方式运行此 Notebook

警告

为了减少计算时间,建议使用 GPU。使用 Colab 时,请务必启用它。只需转到运行时 > 更改运行时类型并选择 GPU 作为硬件加速器即可。

1. 安装库

我们假设您已经安装了NeuralForecast。有关如何安装 NeuralForecast的说明,请查阅本指南。

! pip install neuralforecast

2. 读取数据

我们将使用 pandas 读取存储在 parquet 文件中的 M4 每小时数据集以提高效率。您可以使用普通的 pandas 操作读取.csv等其他格式的数据。

NeuralForecast的输入始终是采用长格式的数据帧,包含三列:unique_iddsy

  • unique_id(字符串、整数或类别)表示系列标识符。

  • ds(日期戳或整数)列应为表示时间的整数索引或日期戳,理想格式为日期的 YYYY-MM-DD 或时间戳的 YYYY-MM-DD HH:MM:SS。

  • y(数值)表示我们希望预测的测量值。我们将重命名

该数据集已经满足要求。

根据您的网络连接情况,此步骤大约需要 10 秒。

import pandas as pd
Y_df = pd.read_parquet('https://datasets-nixtla.s3.amazonaws.com/m4-hourly.parquet')
Y_df.head()
unique_iddsy
0H11605.0
1H12586.0
2H13586.0
3H14559.0
4H15511.0

此数据集包含 414 个独特系列,平均每个系列有 900 个观测值。为了此示例的可复现性,我们将仅选择 10 个独特 ID。您可以根据您的处理基础设施随意选择更多或更少的系列。

注意

处理时间取决于可用的计算资源。在 AWS 的 c5d.24xlarge (96 核) 实例上使用完整数据集运行此示例大约需要 10 分钟。

uids = Y_df['unique_id'].unique()[:10] # Select 10 ids to make the example faster
Y_df = Y_df.query('unique_id in @uids').reset_index(drop=True)

3. 使用 plot_series 函数探索数据

使用 utilsforecast 库中的 plot_series 函数绘制一些系列。此方法会打印数据集中的 8 个随机系列,对于基本 EDA 非常有用。

注意

plot_series 函数默认使用 matplotlib 作为引擎。您可以通过设置 engine="plotly" 更改为 plotly。

from utilsforecast.plotting import plot_series
plot_series(Y_df)

4. 为多个系列训练多个模型

NeuralForecast 可以在全局范围高效地为多个时间序列训练多个模型。

import logging

import optuna
import ray.tune as tune
import torch

from neuralforecast import NeuralForecast
from neuralforecast.auto import AutoNHITS, AutoLSTM
from neuralforecast.losses.pytorch import MQLoss
optuna.logging.set_verbosity(optuna.logging.WARNING)
logging.getLogger('pytorch_lightning').setLevel(logging.ERROR)
torch.set_float32_matmul_precision('high')

每个 Auto 模型都包含一个默认搜索空间,该空间已在多个大规模数据集上进行了广泛测试。此外,用户可以定义针对特定数据集和任务定制的特定搜索空间。

首先,我们为AutoNHITSAutoLSTM模型创建自定义搜索空间。搜索空间使用字典指定,其中键对应于模型的超参数,值是指定超参数采样方式的Tune函数。例如,使用randint均匀采样整数,使用choice采样列表中的值。

def config_nhits(trial):
    return {
        "input_size": trial.suggest_categorical(          # Length of input window
            "input_size", (48, 48*2, 48*3)                
        ),                                                
        "start_padding_enabled": True,                                          
        "n_blocks": 5 * [1],                              # Length of input window
        "mlp_units": 5 * [[64, 64]],                      # Length of input window
        "n_pool_kernel_size": trial.suggest_categorical(  # MaxPooling Kernel size
            "n_pool_kernel_size",
            (5*[1], 5*[2], 5*[4], [8, 4, 2, 1, 1])
        ),     
        "n_freq_downsample": trial.suggest_categorical(   # Interpolation expressivity ratios
            "n_freq_downsample",
            ([8, 4, 2, 1, 1],  [1, 1, 1, 1, 1])
        ),     
        "learning_rate": trial.suggest_float(             # Initial Learning rate
            "learning_rate",
            low=1e-4,
            high=1e-2,
            log=True,
        ),            
        "scaler_type": None,                              # Scaler type
        "max_steps": 1000,                                # Max number of training iterations
        "batch_size": trial.suggest_categorical(          # Number of series in batch
            "batch_size",
            (1, 4, 10),
        ),                   
        "windows_batch_size": trial.suggest_categorical(  # Number of windows in batch
            "windows_batch_size",
            (128, 256, 512),
        ),      
        "random_seed": trial.suggest_int(                 # Random seed   
            "random_seed",
            low=1,
            high=20,
        ),                      
    }

def config_lstm(trial):
    return {
        "input_size": trial.suggest_categorical(           # Length of input window
            "input_size",
            (48, 48*2, 48*3)
        ),   
        "encoder_hidden_size": trial.suggest_categorical(  # Hidden size of LSTM cells
            "encoder_hidden_size",
            (64, 128),
        ),  
        "encoder_n_layers": trial.suggest_categorical(     # Number of layers in LSTM
            "encoder_n_layers",
            (2,4),
        ),        
        "learning_rate": trial.suggest_float(              # Initial Learning rate
            "learning_rate",
            low=1e-4,
            high=1e-2,
            log=True,
        ),   
        "scaler_type": 'robust',                           # Scaler type
        "max_steps": trial.suggest_categorical(           # Max number of training iterations
            "max_steps",
            (500, 1000)
        ),          
        "batch_size": trial.suggest_categorical(           # Number of series in batch
            "batch_size",
            (1, 4)
        ),              
        "random_seed": trial.suggest_int(                  # Random seed
            "random_seed",
            low=1,
            high=20
        ),             
    }

要实例化 Auto 模型,您需要定义

  • h:预测周期。
  • loss:来自 neuralforecast.losses.pytorch 的训练和验证损失。
  • config:超参数搜索空间。如果为None,则Auto类将使用预定义的建议超参数空间。
  • search_alg:搜索算法
  • num_samples:探索的配置数量。

在此示例中,我们将预测周期h设置为 48,使用MQLoss 分布损失进行训练和验证,并使用默认搜索算法。

nf = NeuralForecast(
    models=[
        AutoNHITS(h=48, config=config_nhits, loss=MQLoss(), backend='optuna', num_samples=5),
        AutoLSTM(h=48, config=config_lstm, loss=MQLoss(), backend='optuna', num_samples=2),
    ],
    freq=1,
)

提示

样本数量num_samples是一个关键参数!较大的值通常会产生更好的结果,因为我们在搜索空间中探索了更多配置,但这会增加训练时间。较大的搜索空间通常需要更多样本。作为一般规则,我们建议将num_samples设置为大于 20。

接下来,我们使用Neuralforecast类来训练Auto模型。在此步骤中,Auto模型将自动执行超参数调优,训练具有不同超参数的多个模型,在验证集上生成预测并进行评估。根据验证集上的误差选择最佳配置。只有最佳模型会被存储并在推理期间使用。

nf.fit(df=Y_df)

接下来,我们使用predict方法,使用最佳超参数预测未来 48 天。

fcst_df = nf.predict()
fcst_df.columns = fcst_df.columns.str.replace('-median', '')
fcst_df.head()
plot_series(Y_df, fcst_df, plot_random=False, max_insample_length=48 * 3, level=[80, 90])

plot_series 函数允许进一步定制。例如,绘制不同模型和独特 ID 的结果。

# Plot to unique_ids and some selected models
plot_series(Y_df, fcst_df, models=["AutoLSTM"], ids=["H107", "H104"], level=[80, 90])

# Explore other models 
plot_series(Y_df, fcst_df, models=["AutoNHITS"], ids=["H10", "H105"], level=[80, 90])

5. 评估模型的性能

在前面的步骤中,我们使用历史数据预测了未来。然而,为了评估其准确性,我们还想知道模型在过去的表现如何。为了评估模型在数据上的准确性和鲁棒性,请执行交叉验证。

对于时间序列数据,交叉验证是通过在历史数据上定义一个滑动窗口并预测其后续周期来完成的。这种形式的交叉验证使我们能够在更广泛的时间实例范围内更好地估计模型的预测能力,同时保持训练集中的数据连续,这是我们的模型所必需的。

下图描述了这种交叉验证策略

提示

设置 n_windows=1 类似于传统的训练-测试集划分,其中历史数据作为训练集,最后 48 小时作为测试集。

NeuralForecast 类的 cross_validation 方法接受以下参数。

  • df:训练数据帧

  • step_size (int):每个窗口之间的步长。换句话说:您希望多久运行一次预测过程。

  • n_windows (int):用于交叉验证的窗口数量。换句话说:您想评估过去多少个预测过程。

from neuralforecast.auto import AutoNHITS, AutoLSTM
nf = NeuralForecast(
    models=[
        AutoNHITS(h=48, config=config_nhits, loss=MQLoss(), num_samples=5, backend="optuna"),
        AutoLSTM(h=48, config=config_lstm, loss=MQLoss(), num_samples=2, backend="optuna"), 
    ],
    freq=1,
)
cv_df = nf.cross_validation(Y_df, n_windows=2)

cv_df 对象是一个新的数据帧,包含以下列

  • unique_id:标识每个时间序列
  • ds:日期戳或时间索引
  • cutoff:n_windows 的最后一个日期戳或时间索引。如果 n_windows=1,则只有一个唯一的截止值;如果 n_windows=2,则有两个唯一的截止值。
  • y:真实值
  • "model":包含模型名称和拟合值的列。
cv_df.columns = cv_df.columns.str.replace('-median', '')
cv_df.head()
unique_iddscutoffAutoNHITSAutoNHITS-lo-90AutoNHITS-lo-80AutoNHITS-hi-80AutoNHITS-hi-90AutoLSTMAutoLSTM-lo-90AutoLSTM-lo-80AutoLSTM-hi-80AutoLSTM-hi-90y
0H1700699654.506348615.993774616.021851693.879272712.376587777.396362511.052124585.006470992.8802491084.980957684.0
1H1701699619.320068573.836060577.762695663.133301683.214478691.002991417.614349488.192810905.1011351002.091919619.0
2H1702699546.807922486.383362498.541748599.284302623.889038569.914795314.173462389.398865763.250244852.974121565.0
3H1703699483.149811420.416351435.613708536.380005561.349487548.401917305.305054379.597839732.263123817.543152532.0
4H1704699434.347931381.605713394.665619481.329041501.715546511.798950269.810272346.146484692.443542776.531921495.0
from IPython.display import display
for cutoff in cv_df['cutoff'].unique():
    display(
        plot_series(
            Y_df,
            cv_df.query('cutoff == @cutoff').drop(columns=['y', 'cutoff']),
            max_insample_length=48 * 4, 
            ids=['H102'],
        )
    )

现在,让我们评估模型的性能。

from utilsforecast.evaluation import evaluate
from utilsforecast.losses import mse, mae, rmse

警告

您也可以使用平均绝对百分比误差 (MAPE),但对于精细预测,MAPE 值非常难以判断,对于评估预测质量作用不大。

使用均方误差指标评估您的交叉验证数据帧的结果,创建结果数据帧。

evaluation_df = evaluate(cv_df.drop(columns='cutoff'), metrics=[mse, mae, rmse])
evaluation_df['best_model'] = evaluation_df.drop(columns=['metric', 'unique_id']).idxmin(axis=1)
evaluation_df.head()
unique_idmetricAutoNHITSAutoLSTMbest_model
0H1mse2295.6300681889.340182AutoLSTM
1H10mse724.468906362.463659AutoLSTM
2H100mse62943.03125017063.347107AutoLSTM
3H101mse48771.97354012213.554997AutoLSTM
4H102mse30671.34205084569.434859AutoNHITS

创建包含模型列和该模型表现最佳的系列数量的汇总表。

summary_df = evaluation_df.groupby(['metric', 'best_model']).size().sort_values().to_frame()
summary_df = summary_df.reset_index()
summary_df.columns = ['metric', 'model', 'nr. of unique_ids']
summary_df
metricmodelunique_id 数量
0maeAutoNHITS3
1mseAutoNHITS4
2rmseAutoNHITS4
3mseAutoLSTM6
4rmseAutoLSTM6
5maeAutoLSTM7
summary_df.query('metric == "mse"')
metricmodelunique_id 数量
1mseAutoNHITS4
3mseAutoLSTM6

您可以通过绘制特定模型表现最佳的 unique_ids 来进一步探索您的结果。

nhits_ids = evaluation_df.query('best_model == "AutoNHITS" and metric == "mse"')['unique_id'].unique()

plot_series(Y_df, fcst_df, ids=nhits_ids)

6. 为每个独特系列选择最佳模型

定义一个实用函数,该函数接受包含预测结果的预测数据帧和评估数据帧,并返回包含每个 unique_id 的最佳可能预测结果的数据帧。

def get_best_model_forecast(forecasts_df, evaluation_df, metric):
    metric_eval = evaluation_df.loc[evaluation_df['metric'] == metric, ['unique_id', 'best_model']]
    with_best = forecasts_df.merge(metric_eval)
    res = with_best[['unique_id', 'ds']].copy()
    for suffix in ('', '-lo-90', '-hi-90'):
        res[f'best_model{suffix}'] = with_best.apply(lambda row: row[row['best_model'] + suffix], axis=1)
    return res

创建包含每个 unique_id 最佳预测结果的生产级数据帧。

prod_forecasts_df = get_best_model_forecast(fcst_df, evaluation_df, metric='mse')
prod_forecasts_df
unique_iddsbest_model最佳模型-lo-90最佳模型-hi-90
0H1749603.923767437.270447786.502686
1H1750533.691284383.289154702.944397
2H1751490.400085349.417816648.831299
3H1752463.768066327.452026616.572144
4H1753454.710266320.023468605.468018
475H1077924720.2563484142.4599615235.727051
476H1077934394.6054693952.0590824992.124023
477H1077944161.2211913664.0915534632.160645
478H1077953945.4326173453.0119634437.968750
479H1077963666.4460453177.9377444059.684570

绘制结果。

plot_series(Y_df, prod_forecasts_df, level=[90])