Motivation

AutoARIMA models are widely used in production for time series forecasting and as benchmarks. However, the Python implementation (`pmdarima`) is very slow, which makes it hard for data science practitioners to iterate quickly and to deploy AutoARIMA in production for a large number of time series. In this notebook we present Nixtla's AutoARIMA, which is based on the R implementation (developed by Rob Hyndman) and optimized with `numba`.

Example

# !pip install statsforecast prophet statsmodels scikit-learn matplotlib pmdarima
import logging
import os
import random
import time
import warnings
warnings.filterwarnings("ignore")
from itertools import product
from multiprocessing import cpu_count, Pool # for prophet

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pmdarima import auto_arima as auto_arima_p
from prophet import Prophet
from statsforecast import StatsForecast
from statsforecast.models import AutoARIMA, _TS
from statsmodels.graphics.tsaplots import plot_acf
from sklearn.model_selection import ParameterGrid
from utilsforecast.plotting import plot_series

Utility functions

def plot_autocorrelation_grid(df_train):
    """Plot the ACF of 8 randomly sampled series from `df_train` in a 4x2 grid."""
    fig, axes = plt.subplots(4, 2, figsize=(24, 14))

    unique_ids = df_train['unique_id'].unique()

    assert len(unique_ids) >= 8, "Must provide at least 8 ts"

    unique_ids = random.sample(list(unique_ids), k=8)

    for uid, (idx, idy) in zip(unique_ids, product(range(4), range(2))):
        train_uid = df_train.query('unique_id == @uid')
        plot_acf(train_uid['y'].values, ax=axes[idx, idy], 
                 title=f'ACF M4 Hourly {uid}')
        axes[idx, idy].set_xlabel('Timestamp [t]')
        axes[idx, idy].set_ylabel('Autocorrelation')
    fig.subplots_adjust(hspace=0.5)
    plt.show()

Data

For testing purposes, we will use the Hourly dataset from the M4 competition.

train = pd.read_csv('https://auto-arima-results.s3.amazonaws.com/M4-Hourly.csv')
test = pd.read_csv('https://auto-arima-results.s3.amazonaws.com/M4-Hourly-test.csv').rename(columns={'y': 'y_test'})

In this example we will use a subset of the data to avoid waiting too long. You can modify the number of series as needed.

n_series = 16
uids = train['unique_id'].unique()[:n_series]
train = train.query('unique_id in @uids')
test = test.query('unique_id in @uids')
plot_series(train, test, max_ids=n_series)

Is an autoregressive model suitable for our data? Without a doubt, we observe seasonal periods. The autocorrelation function (`acf`) can help us answer this question: intuitively, we need to observe decaying correlations in order to choose an AR model.

plot_autocorrelation_grid(train)

Indeed, we observe high autocorrelation for the previous lags as well as for the seasonal lags. Therefore, we will let `auto_arima` handle our data.

Train and forecast

StatsForecast receives a list of models to fit each time series. Since we are dealing with hourly data, it makes sense to use 24 as the season length.

?AutoARIMA
Init signature:
AutoARIMA(
    d: Optional[int] = None,
    D: Optional[int] = None,
    max_p: int = 5,
    max_q: int = 5,
    max_P: int = 2,
    max_Q: int = 2,
    max_order: int = 5,
    max_d: int = 2,
    max_D: int = 1,
    start_p: int = 2,
    start_q: int = 2,
    start_P: int = 1,
    start_Q: int = 1,
    stationary: bool = False,
    seasonal: bool = True,
    ic: str = 'aicc',
    stepwise: bool = True,
    nmodels: int = 94,
    trace: bool = False,
    approximation: Optional[bool] = False,
    method: Optional[str] = None,
    truncate: Optional[bool] = None,
    test: str = 'kpss',
    test_kwargs: Optional[str] = None,
    seasonal_test: str = 'seas',
    seasonal_test_kwargs: Optional[Dict] = None,
    allowdrift: bool = False,
    allowmean: bool = False,
    blambda: Optional[float] = None,
    biasadj: bool = False,
    season_length: int = 1,
    alias: str = 'AutoARIMA',
    prediction_intervals: Optional[statsforecast.utils.ConformalIntervals] = None,
)
Docstring:     
AutoARIMA model.

Automatically selects the best ARIMA (AutoRegressive Integrated Moving Average)
model using an information criterion. Default is Akaike Information Criterion (AICc).

**Note:**<br/>
This implementation is a mirror of Hyndman's [forecast::auto.arima](https://github.com/robjhyndman/forecast).

**References:**<br/>
[Rob J. Hyndman, Yeasmin Khandakar (2008). "Automatic Time Series Forecasting: The forecast package for R"](https://www.jstatsoft.org/article/view/v027i03).

Parameters
----------
d : Optional[int]
    Order of first-differencing.
D : Optional[int]
    Order of seasonal-differencing.
max_p : int
    Max autorregresives p.
max_q : int
    Max moving averages q.
max_P : int
    Max seasonal autorregresives P.
max_Q : int
    Max seasonal moving averages Q.
max_order : int
    Max p+q+P+Q value if not stepwise selection.
max_d : int
    Max non-seasonal differences.
max_D : int
    Max seasonal differences.
start_p : int
    Starting value of p in stepwise procedure.
start_q : int
    Starting value of q in stepwise procedure.
start_P : int
    Starting value of P in stepwise procedure.
start_Q : int
    Starting value of Q in stepwise procedure.
stationary : bool
    If True, restricts search to stationary models.
seasonal : bool
    If False, restricts search to non-seasonal models.
ic : str
    Information criterion to be used in model selection.
stepwise : bool
    If True, will do stepwise selection (faster).
nmodels : int
    Number of models considered in stepwise search.
trace : bool
    If True, the searched ARIMA models is reported.
approximation : Optional[bool]
    If True, conditional sums-of-squares estimation, final MLE.
method : Optional[str]
    Fitting method between maximum likelihood or sums-of-squares.
truncate : Optional[int]
    Observations truncated series used in model selection.
test : str
    Unit root test to use. See `ndiffs` for details.
test_kwargs : Optional[str]
    Unit root test additional arguments.
seasonal_test : str
    Selection method for seasonal differences.
seasonal_test_kwargs : Optional[dict]
    Seasonal unit root test arguments.
allowdrift : bool (default True)
    If True, drift models terms considered.
allowmean : bool (default True)
    If True, non-zero mean models considered.
blambda : Optional[float]
    Box-Cox transformation parameter.
biasadj : bool
    Use adjusted back-transformed mean Box-Cox.
season_length : int
    Number of observations per unit of time. Ex: 24 Hourly data.
alias : str
    Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
    Information to compute conformal prediction intervals.
    By default, the model will compute the native prediction
    intervals.
File:           /hdd/github/statsforecast/statsforecast/models.py
Type:           type
Subclasses:     

As we can see, we can pass `season_length` to AutoARIMA, so our models are defined as follows:

models = [AutoARIMA(season_length=24, approximation=True)]
fcst = StatsForecast(df=train, 
                     models=models, 
                     freq='H', 
                     n_jobs=-1)
init = time.time()
forecasts = fcst.forecast(48)
end = time.time()

time_nixtla = end - init
time_nixtla
40.38660216331482
forecasts.head()
            ds   AutoARIMA
unique_id
H1         701  616.084167
H1         702  544.432129
H1         703  510.414490
H1         704  481.046539
H1         705  460.893066
forecasts = forecasts.reset_index()
test = test.merge(forecasts, how='left', on=['unique_id', 'ds'])
plot_series(train, test)
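
The docstring above also notes that AutoARIMA computes native prediction intervals. As a side note, they can be requested through the `level` argument of `forecast`; this is a minimal sketch, assuming the installed statsforecast version exposes that argument:

# request a 90% prediction interval along with the point forecasts
# (assumes `level` is supported by this version of StatsForecast.forecast)
forecasts_int = fcst.forecast(48, level=[90])
forecasts_int.head()  # adds lower/upper interval columns (named like 'AutoARIMA-lo-90')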

Alternatives

pmdarima

You can parallelize your own models using the StatsForecast class. In this section we will use it to run the `auto_arima` model from `pmdarima`.

class PMDAutoARIMA(_TS):
    
    def __init__(self, season_length: int):
        self.season_length = season_length
        
    def forecast(self, y, h, X=None, X_future=None, fitted=False):
        # fit pmdarima's auto_arima on a single series and return the h-step-ahead mean
        mod = auto_arima_p(
            y, m=self.season_length,
            with_intercept=False  # ensure comparability with Nixtla's implementation
        )
        return {'mean': mod.predict(h)}
    
    def __repr__(self):
        return 'pmdarima'
n_series_pmdarima = 2
fcst = StatsForecast(
    df = train.query('unique_id in ["H1", "H10"]'), 
    models=[PMDAutoARIMA(season_length=24)],
    freq='H',
    n_jobs=-1
)
init = time.time()
forecast_pmdarima = fcst.forecast(48)
end = time.time()

time_pmdarima = end - init
time_pmdarima
886.2768685817719
forecast_pmdarima.head()
            ds    pmdarima
unique_id
H1         701  628.310547
H1         702  571.659851
H1         703  543.504700
H1         704  517.539062
H1         705  502.829559
forecast_pmdarima = forecast_pmdarima.reset_index()
test = test.merge(forecast_pmdarima, how='left', on=['unique_id', 'ds'])
plot_series(train, test, plot_random=False)
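
Before moving on, a quick back-of-the-envelope check (not part of the original benchmark) puts the two runs on a comparable scale by normalizing each elapsed time by the number of series it processed:

# rough per-series comparison: pmdarima ran on 2 series, Nixtla's AutoARIMA on 16
per_series_pmdarima = time_pmdarima / n_series_pmdarima
per_series_nixtla = time_nixtla / n_series
print(f'pmdarima: {per_series_pmdarima:.1f}s per series, '
      f'AutoARIMA: {per_series_nixtla:.1f}s per series, '
      f'ratio ~{per_series_pmdarima / per_series_nixtla:.0f}x')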

Prophet

Prophet is designed to receive pandas DataFrames, so we cannot use StatsForecast. Therefore, we need to parallelize it from scratch.

params_grid = {'seasonality_mode': ['multiplicative','additive'],
               'growth': ['linear', 'flat'], 
               'changepoint_prior_scale': [0.1, 0.2, 0.3, 0.4, 0.5], 
               'n_changepoints': [5, 10, 15, 20]} 
grid = ParameterGrid(params_grid)
def fit_and_predict(index, ts):
    # hold out the last 48 observations of each series to select Prophet's hyperparameters
    df = ts.drop(columns='unique_id')
    max_ds = df['ds'].max()
    # Prophet needs datetime stamps, so map the integer `ds` to an hourly date range
    df['ds'] = pd.date_range(start='1970-01-01', periods=df.shape[0], freq='H')
    df_val = df.tail(48)
    df_train = df.drop(df_val.index)
    y_val = df_val['y'].values
    
    if len(df_train) >= 48:
        val_results = {'losses': [], 'params': []}

        for params in grid:
            model = Prophet(seasonality_mode=params['seasonality_mode'],
                            growth=params['growth'],
                            weekly_seasonality=True,
                            daily_seasonality=True,
                            yearly_seasonality=True,
                            n_changepoints=params['n_changepoints'],
                            changepoint_prior_scale=params['changepoint_prior_scale'])
            model = model.fit(df_train)
            
            forecast = model.make_future_dataframe(periods=48, 
                                                   include_history=False, 
                                                   freq='H')
            forecast = model.predict(forecast)
            forecast['unique_id'] = index
            forecast = forecast.filter(items=['unique_id', 'ds', 'yhat'])
            
            loss = np.mean(abs(y_val - forecast['yhat'].values))
            
            val_results['losses'].append(loss)
            val_results['params'].append(params)

        idx_params = np.argmin(val_results['losses']) 
        params = val_results['params'][idx_params]
    else:
        params = {'seasonality_mode': 'multiplicative',
                  'growth': 'flat',
                  'n_changepoints': 150,
                  'changepoint_prior_scale': 0.5}
    model = Prophet(seasonality_mode=params['seasonality_mode'],
                    growth=params['growth'],
                    weekly_seasonality=True,
                    daily_seasonality=True,
                    yearly_seasonality=True,
                    n_changepoints=params['n_changepoints'],
                    changepoint_prior_scale=params['changepoint_prior_scale'])
    model = model.fit(df)
    
    forecast = model.make_future_dataframe(periods=48, 
                                           include_history=False, 
                                           freq='H')
    forecast = model.predict(forecast)
    forecast.insert(0, 'unique_id', index)
    # map the datetime stamps back to the original integer `ds` values
    forecast['ds'] = np.arange(max_ds + 1, max_ds + 48 + 1)
    forecast = forecast.filter(items=['unique_id', 'ds', 'yhat'])
    
    return forecast
init = time.time()
with Pool(cpu_count()) as pool:
    # groupby yields (unique_id, group) pairs, which starmap unpacks into (index, ts)
    forecast_prophet = pool.starmap(fit_and_predict, train.groupby('unique_id'))
end = time.time()
forecast_prophet = pd.concat(forecast_prophet).rename(columns={'yhat': 'prophet'})
time_prophet = end - init
time_prophet
120.7272641658783
forecast_prophet
   unique_id   ds      prophet
0         H1  701   635.914254
1         H1  702   565.976464
2         H1  703   505.095507
3         H1  704   462.559539
4         H1  705   438.766801
..       ...  ...          ...
43      H112  744  6184.686240
44      H112  745  6188.851888
45      H112  746  6129.306256
46      H112  747  6058.040672
47      H112  748  5991.982370
test = test.merge(forecast_prophet, how='left', on=['unique_id', 'ds'])
plot_series(train, test)

Evaluation

Time

Since AutoARIMA relies on numba (which compiles the routines on the first call), it is useful to compute the time it takes for a single time series.

fcst = StatsForecast(df=train.query('unique_id == "H1"'), 
                     models=models, freq='H', 
                     n_jobs=1)
init = time.time()
forecasts = fcst.forecast(48)
end = time.time()

time_nixtla_1 = end - init
time_nixtla_1
18.752424716949463
# extrapolate the measured runtimes to all 414 hourly series in M4,
# assuming each method scales linearly with the number of series
times = pd.DataFrame({'n_series': np.arange(1, 414 + 1)})
times['pmdarima'] = time_pmdarima * times['n_series'] / n_series_pmdarima
times['prophet'] = time_prophet * times['n_series'] / n_series
# for AutoARIMA, treat the single-series run as a fixed cost (numba compilation)
# plus a per-series marginal cost estimated from the 16-series run
times['AutoARIMA_nixtla'] = time_nixtla_1 + times['n_series'] * (time_nixtla - time_nixtla_1) / n_series
times = times.set_index('n_series')
times.tail(5)
               pmdarima      prophet  AutoARIMA_nixtla
n_series
410       181686.758059  3093.636144        573.128222
411       182129.896494  3101.181598        574.480358
412       182573.034928  3108.727052        575.832494
413       183016.173362  3116.272506        577.184630
414       183459.311796  3123.817960        578.536766
fig, axes = plt.subplots(1, 2, figsize = (24, 7))
(times/3600).plot(ax=axes[0], linewidth=4)
np.log10(times).plot(ax=axes[1], linewidth=4)
axes[0].set_title('Time across models [Hours]', fontsize=22)
axes[1].set_title('Time across models [Log10 Scale]', fontsize=22)
axes[0].set_ylabel('Time [Hours]', fontsize=20)
axes[1].set_ylabel('Time Seconds [Log10 Scale]', fontsize=20)
fig.suptitle('Time comparison using M4-Hourly data', fontsize=27)
for ax in axes:
    ax.set_xlabel('Number of Time Series [N]', fontsize=20)
    ax.legend(prop={'size': 20})
    ax.grid()
    for label in (ax.get_xticklabels() + ax.get_yticklabels()):
        label.set_fontsize(20)

fig.savefig('computational-efficiency.png', dpi=300)

Performance

pmdarima (only two time series)

name_models = test.drop(columns=['unique_id', 'ds', 'y_test']).columns.tolist()
test_pmdarima = test.query('unique_id in ["H1", "H10"]')
eval_pmdarima = []
for model in name_models:
    mae = np.mean(abs(test_pmdarima[model] - test_pmdarima['y_test']))
    eval_pmdarima.append({'model': model, 'mae': mae})
pd.DataFrame(eval_pmdarima).sort_values('mae')
       model        mae
0  AutoARIMA  20.289669
1   pmdarima  24.676279
2    prophet  39.201933

Prophet

eval_prophet = []
for model in name_models:
    if 'pmdarima' in model:
        continue
    mae = np.mean(abs(test[model] - test['y_test']))
    eval_prophet.append({'model': model, 'mae': mae})
pd.DataFrame(eval_prophet).sort_values('mae')
       model          mae
0  AutoARIMA   680.202965
1    prophet  1058.578963
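
The same evaluation can also be written with the loss helpers that ship with `utilsforecast` (the package already used for plotting above). This is a minimal sketch, assuming `utilsforecast.losses.mae` is available in your installed version; it returns the MAE per series, which can then be averaged:

from utilsforecast.losses import mae

# utilsforecast expects the target column to be named 'y'
eval_df = test.rename(columns={'y_test': 'y'})
mae_per_series = mae(eval_df, models=['AutoARIMA', 'prophet'])
mae_per_series.drop(columns='unique_id').mean()  # aggregate MAE per model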

For the complete comparison, check out the full experiment.