NeuralForecast 使用的标准 DataLoader 类期望数据集由一个单一的 DataFrame 表示,该 DataFrame 在拟合模型时会完全加载到内存中。然而,当数据集过大无法满足此要求时,我们可以转而使用自定义的大规模 DataLoader。这种自定义加载器假定每个时间序列都分散存储在一系列 Parquet 文件中,并确保在给定时间点只有一批数据被加载到内存中。

在本笔记本中,我们将演示这些文件的预期格式、如何训练模型以及如何使用这种大规模 DataLoader 执行推理。

加载库

import logging
import os
import tempfile

import pandas as pd

from neuralforecast import NeuralForecast
from neuralforecast.models import NHITS
from utilsforecast.evaluation import evaluate
from utilsforecast.losses import mae, rmse, smape
from neuralforecast.utils import AirPassengersPanel, AirPassengersStatic
logging.getLogger('pytorch_lightning').setLevel(logging.ERROR)

数据

每个时间序列应存储在一个名为 unique_id=timeseries_id 的目录中。在此目录内,时间序列可以完全包含在一个 Parquet 文件中,或者分散在多个 Parquet 文件中。无论格式如何,时间序列都必须按时间排序。

例如,以下代码将 AirPassengers DataFrame(其中每个时间序列已按时间排序)拆分成以下格式



>  data
    >  unique_id=Airline1
         -  a59945617fdb40d1bc6caa4aadad881c-0.parquet
    >  unique_id=Airline2
         -  a59945617fdb40d1bc6caa4aadad881c-0.parquet


然后,我们只需输入这些目录路径的列表。

Y_df = AirPassengersPanel.copy()
Y_df
unique_iddsy趋势y_[lag12]
0Airline11949-01-31112.00112.0
1Airline11949-02-28118.01118.0
2Airline11949-03-31132.02132.0
3Airline11949-04-30129.03129.0
4Airline11949-05-31121.04121.0
283Airline21960-08-31906.0283859.0
284Airline21960-09-30808.0284763.0
285Airline21960-10-31761.0285707.0
286Airline21960-11-30690.0286662.0
287Airline21960-12-31732.0287705.0
valid = Y_df.groupby('unique_id').tail(72)
# from now on we will use the id_col as the unique identifier for the timeseries (this is because we are using the unique_id column to partition the data into parquet files)
valid = valid.rename(columns={'unique_id': 'id_col'})

train = Y_df.drop(valid.index)
train['id_col'] = train['unique_id'].copy()

# we generate the files using a temporary directory here to demonstrate the expected file structure
tmpdir = tempfile.TemporaryDirectory()
train.to_parquet(tmpdir.name, partition_cols=['unique_id'], index=False)
files_list = [f"{tmpdir.name}/{dir}" for dir in os.listdir(tmpdir.name)]
files_list
['C:\\Users\\ospra\\AppData\\Local\\Temp\\tmpxe__gjoo/unique_id=Airline1',
 'C:\\Users\\ospra\\AppData\\Local\\Temp\\tmpxe__gjoo/unique_id=Airline2']

您也可以使用 spark dataframe 创建此目录结构,如下所示:

spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
(
  spark_df
  .repartition(id_col)
  .sortWithinPartitions(id_col, time_col)
  .write
  .partitionBy(id_col)
  .parquet(out_dir)
)

DataLoader 类仍然期望静态数据作为单个 DataFrame 传入,其中每行对应一个时间序列。

static = AirPassengersStatic.rename(columns={'unique_id': 'id_col'})
static
id_colairline1airline2
0Airline101
1Airline210

模型训练

现在我们在上述数据集上训练一个 NHITS 模型。值得注意的是,NeuralForecast 目前在使用此 DataLoader 时不支持缩放。如果您想缩放时间序列,应在将其传递给 fit 方法之前完成。

horizon = 12
stacks = 3
models = [NHITS(input_size=5 * horizon,
                h=horizon,
                futr_exog_list=['trend', 'y_[lag12]'],
                stat_exog_list=['airline1', 'airline2'],
                max_steps=100,
                stack_types = stacks*['identity'],
                n_blocks = stacks*[1],
                mlp_units = [[256,256] for _ in range(stacks)],
                n_pool_kernel_size = stacks*[1],
                interpolation_mode="nearest")]
nf = NeuralForecast(models=models, freq='ME')
nf.fit(df=files_list, static_df=static, id_col='id_col')
Seed set to 1
Sanity Checking: |          | 0/? [00:00<?, ?it/s]
Training: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]

预测

处理大型数据集时,我们需要提供一个单一的 DataFrame,其中包含所有我们希望为其生成预测的时间序列的输入时间步。如果存在未来的外生特征,我们也应将这些特征的未来值包含在单独的 futr_df DataFrame 中。

对于下面的预测,我们假设只想预测 Airline2 的接下来的 12 个时间步。

valid_df = valid[valid['id_col'] == 'Airline2']
# we set input_size=60 and horizon=12 when fitting the model
pred_df = valid_df[:60]
futr_df = valid_df[60:72]
futr_df = futr_df.drop(["y"], axis=1)

predictions = nf.predict(df=pred_df, futr_df=futr_df, static_df=static)
Predicting: |          | 0/? [00:00<?, ?it/s]
predictions
id_coldsNHITS
0Airline21960-01-31713.441406
1Airline21960-02-29688.176880
2Airline21960-03-31763.382935
3Airline21960-04-30745.478027
4Airline21960-05-31758.036438
5Airline21960-06-30806.288574
6Airline21960-07-31869.563782
7Airline21960-08-31858.105896
8Airline21960-09-30803.531555
9Airline21960-10-31751.093079
10Airline21960-11-30700.435852
11Airline21960-12-31746.640259

评估

target = valid_df[60:72]
evaluate(
    predictions.merge(target.drop(["trend", "y_[lag12]"], axis=1), on=['id_col', 'ds']),
    metrics=[mae, rmse, smape],
    id_col='id_col',
    agg_fn='mean',
)
指标NHITS
0mae20.728617
1rmse26.980698
2smape0.012879