import datetime

import tempfile
from nbdev import show_doc
from fastcore.test import test_eq, test_fail, test_warns

from mlforecast.callbacks import SaveFeatures
from mlforecast.lag_transforms import ExpandingMean, RollingMean
from mlforecast.target_transforms import Differences, LocalStandardScaler
from mlforecast.utils import generate_daily_series, generate_prices_for_series

数据格式

所需的输入格式是一个数据框,至少包含以下列:

* unique_id:每个时间序列的唯一标识符
* ds:日期戳列
* y:序列的值

除非在 TimeSeries.fit_transform 中通过 static_features 参数另行指定,否则其他所有列都被视为静态特征。

# Twenty synthetic daily series, each carrying two static feature columns.
series = generate_daily_series(
    20,
    n_static_features=2,
)
series
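|      | unique_id | ds         | y          | static_0 | static_1 |
|------|-----------|------------|------------|----------|----------|
| 0    | id_00     | 2000-01-01 | 7.404529   | 27       | 53       |
| 1    | id_00     | 2000-01-02 | 35.952624  | 27       | 53       |
| 2    | id_00     | 2000-01-03 | 68.958353  | 27       | 53       |
| 3    | id_00     | 2000-01-04 | 84.994505  | 27       | 53       |
| 4    | id_00     | 2000-01-05 | 113.219810 | 27       | 53       |
| ...  | ...       | ...        | ...        | ...      | ...      |
| 4869 | id_19     | 2000-03-25 | 400.606807 | 97       | 45       |
| 4870 | id_19     | 2000-03-26 | 538.794824 | 97       | 45       |
| 4871 | id_19     | 2000-03-27 | 620.202104 | 97       | 45       |
| 4872 | id_19     | 2000-03-28 | 20.625426  | 97       | 45       |
| 4873 | id_19     | 2000-03-29 | 141.513169 | 97       | 45       |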

为简单起见,我们这里只取一个时间序列。

# Distinct series identifiers, in order of first appearance.
uids = series['unique_id'].unique()
# Keep only the rows that belong to the first identifier.
serie = series.loc[series['unique_id'] == uids[0]]
serie
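|     | unique_id | ds         | y          | static_0 | static_1 |
|-----|-----------|------------|------------|----------|----------|
| 0   | id_00     | 2000-01-01 | 7.404529   | 27       | 53       |
| 1   | id_00     | 2000-01-02 | 35.952624  | 27       | 53       |
| 2   | id_00     | 2000-01-03 | 68.958353  | 27       | 53       |
| 3   | id_00     | 2000-01-04 | 84.994505  | 27       | 53       |
| 4   | id_00     | 2000-01-05 | 113.219810 | 27       | 53       |
| ... | ...       | ...        | ...        | ...      | ...      |
| 217 | id_00     | 2000-08-05 | 13.263188  | 27       | 53       |
| 218 | id_00     | 2000-08-06 | 38.231981  | 27       | 53       |
| 219 | id_00     | 2000-08-07 | 59.555183  | 27       | 53       |
| 220 | id_00     | 2000-08-08 | 86.986368  | 27       | 53       |
| 221 | id_00     | 2000-08-09 | 119.254810 | 27       | 53       |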

源代码

TimeSeries

 TimeSeries (freq:Union[int,str], lags:Optional[Iterable[int]]=None,
             lag_transforms:Optional[Dict[int,List[Union[Callable,Tuple[Callable,Any]]]]]=None,
             date_features:Optional[Iterable[Union[str,Callable]]]=None,
             num_threads:int=1,
             target_transforms:Optional[List[Union[mlforecast.target_transforms.BaseTargetTransform,mlforecast.target_transforms._BaseGroupedArrayTargetTransform]]]=None,
             lag_transforms_namer:Optional[Callable]=None)

用于存储和转换时间序列数据的实用类。

TimeSeries 类负责定义要执行的变换(lags、lag_transforms 和 date_features)。如果 num_threads > 1,这些变换可以使用多线程计算。

def month_start_or_end(dates):
    """Flag dates that fall on the first or the last day of their month.

    Used as a custom date feature; the resulting column is named after this
    function.
    """
    at_start = dates.is_month_start
    at_end = dates.is_month_end
    return at_start | at_end

# Weekly (Thursday-anchored) configuration mixing a plain lag, lag transforms
# on lag 1, and both built-in and custom date features.
flow_config = {
    'freq': 'W-THU',
    'lags': [7],
    'lag_transforms': {1: [ExpandingMean(), RollingMean(7)]},
    'date_features': ['dayofweek', 'week', month_start_or_end],
}

ts = TimeSeries(**flow_config)
ts
TimeSeries(freq=W-THU, transforms=['lag7', 'expanding_mean_lag1', 'rolling_mean_lag1_window_size7'], date_features=['dayofweek', 'week', 'month_start_or_end'], num_threads=1)

频率会被转换为 pandas 偏移量(offset)对象。

# `freq` is not kept as the raw string: it must equal the pandas offset built from it.
test_eq(ts.freq, pd.tseries.frequencies.to_offset(flow_config['freq']))

日期特征按传递给构造函数的方式存储。

# date_features (strings and callables alike) must be stored exactly as passed in.
test_eq(ts.date_features, flow_config['date_features'])

变换存储在一个字典中:键是变换的名称(即数据框中存放计算所得特征的列名),由 build_transform_name 生成;值是一个元组,其第一个元素是应用该变换的滞后量,其后依次是函数及其参数。

# `transforms` is keyed by the generated feature (column) names: the lag feature
# followed by the two lag-transform features declared in flow_config.
test_eq(
    ts.transforms.keys(),
    ['lag7', 'expanding_mean_lag1', 'rolling_mean_lag1_window_size7'],
)

注意,对于 lags,我们将变换定义为作用于相应滞后量的恒等函数。这是因为 _transform_series 以滞后量作为参数,并在计算变换之前先平移(shift)数组。


源代码

TimeSeries.fit_transform

 TimeSeries.fit_transform (data:~DFType, id_col:str, time_col:str,
                           target_col:str,
                           static_features:Optional[List[str]]=None,
                           dropna:bool=True,
                           keep_last_n:Optional[int]=None,
                           max_horizon:Optional[int]=None,
                           return_X_y:bool=False, as_numpy:bool=False,
                           weight_col:Optional[str]=None)

*将特征添加到 data 中,并保存预测步骤所需的信息。*

如果并非所有特征都是静态的,请在 static_features 中指定哪些是静态的。如果不想在变换后丢弃包含空值的行,请设置 dropna=False。如果 keep_last_n 不为 None,则每个序列只保留最后 keep_last_n 个观测值用于更新。

# Daily configuration: two plain lags, two rolling means over lag 2, calendar
# date features, and two threads for computing the transformations.
flow_config = {
    'freq': 'D',
    'lags': [7, 14],
    'lag_transforms': {2: [RollingMean(7), RollingMean(14)]},
    'date_features': ['dayofweek', 'month', 'year'],
    'num_threads': 2,
}

ts = TimeSeries(**flow_config)
_ = ts.fit_transform(
    series,
    id_col='unique_id',
    time_col='ds',
    target_col='y',
)

序列值作为 GroupedArray 存储在属性 ga 中。如果序列值的数据类型是整型,则将其转换为 np.float32,这是因为滞后会生成 np.nan,因此我们需要浮点数据类型来存储它们。

# ga.data should hold, for every series, its last `ts.keep_last_n` target values.
# NOTE(review): no keep_last_n was passed to fit_transform above, so this assumes
# ts.keep_last_n is an int at this point — TODO confirm, and check how
# GroupBy.tail handles None if it is not.
np.testing.assert_equal(
    ts.ga.data,
    series.groupby('unique_id', observed=True).tail(ts.keep_last_n)['y'],
)

序列 ID 存储在 uids 属性中。

# uids must match the distinct ids present in the input frame.
test_eq(ts.uids, series['unique_id'].unique())

对于每个时间序列,都会存储其最后一个观测日期,以便预测从"最后日期 + 一个频率周期"开始。

# last_dates must equal each series' latest timestamp, in groupby key order.
test_eq(ts.last_dates, series.groupby('unique_id', observed=True)['ds'].max().values)

每个序列的最后一行(去掉 y 和 ds 列)被存储为静态特征。

# With no static_features given, the stored frame is each series' last row with
# the time (`ds`) and target (`y`) columns dropped and a fresh 0..n-1 index.
pd.testing.assert_frame_equal(
    ts.static_features_,
    series.groupby('unique_id', observed=True).tail(1).drop(columns=['ds', 'y']).reset_index(drop=True),
)

如果您将 static_features 传递给 TimeSeries.fit_transform,则只保留这些特征。

# Refit restricting the static features to `static_0`; the stored frame should
# then contain only the id column and `static_0`, taken from each series' last row.
ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y', static_features=['static_0'])

pd.testing.assert_frame_equal(
    ts.static_features_,
    series.groupby('unique_id', observed=True).tail(1)[['unique_id', 'static_0']].reset_index(drop=True),
)

您还可以在 TimeSeries.fit_transform 中指定 keep_last_n,这意味着在计算用于训练的特征后,我们只想保留每个时间序列的最后 n 个样本用于计算更新。这既节省内存又节省时间,因为更新是通过再次对所有时间序列运行变换函数并只保留最后一个值(即更新值)来执行的。

如果您的时间序列非常长,并且更新只需要少量样本,建议您将 keep_last_n 设置为计算更新所需的最小样本数,在这种情况下是 15,因为我们在滞后 2 上有一个大小为 14 的滚动平均,并且在第一次更新中,滞后 2 变成了滞后 1。这是因为在第一次更新中,滞后 1 是序列的最后一个值(或滞后 0),滞后 2 是滞后 1,依此类推。

# Minimum history for updates: the size-14 rolling mean on lag 2 becomes a
# window on lag 1 at the first update, so 14 + 1 = 15 values per series are needed.
keep_last_n = 15

ts = TimeSeries(**flow_config)
df = ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y', keep_last_n=keep_last_n)
# Private API; presumably prepares the internal state used by predict/update — TODO confirm.
ts._predict_setup()

expected_lags = ['lag7', 'lag14']
expected_transforms = ['rolling_mean_lag2_window_size7', 
                       'rolling_mean_lag2_window_size14']
expected_date_features = ['dayofweek', 'month', 'year']

# Feature order: lags, then lag transforms, then date features.
test_eq(ts.features, expected_lags + expected_transforms + expected_date_features)
# The returned frame's columns (minus ds and y) are the static features followed by the features.
test_eq(ts.static_features_.columns.tolist() + ts.features, df.columns.drop(['ds', 'y']).tolist())
# per series: we dropped 2 rows because of the lag 2 and 13 more to have the window of size 14
test_eq(df.shape[0], series.shape[0] - (2 + 13) * ts.ga.n_groups)
# Only the last keep_last_n values of each series remain stored.
test_eq(ts.ga.data.size, ts.ga.n_groups * keep_last_n)

TimeSeries.fit_transform 要求 y 列不包含任何空值。这是因为变换可能会将空值向前传播,因此如果 y 列中存在空值,您将会收到错误。

# A single missing target value must make fit_transform fail with an explicit message.
series_with_nulls = series.copy()
# Index label 1 is the second row of the frame (id_00, 2000-01-02).
series_with_nulls.loc[1, 'y'] = np.nan
test_fail(
    lambda: ts.fit_transform(series_with_nulls, id_col='unique_id', time_col='ds', target_col='y'),
    contains='y column contains null values'
)

源代码

TimeSeries.predict

 TimeSeries.predict (models:Dict[str,Union[sklearn.base.BaseEstimator,List[sklearn.base.BaseEstimator]]],
                     horizon:int,
                     before_predict_callback:Optional[Callable]=None,
                     after_predict_callback:Optional[Callable]=None,
                     X_df:Optional[~DFType]=None,
                     ids:Optional[List[str]]=None)

训练好模型后,我们可以使用 TimeSeries.predict,传入模型和预测范围,以获得预测结果。

class DummyModel:
    """Naive model that uses the ``lag7`` feature as its forecast."""

    def predict(self, X: pd.DataFrame) -> np.ndarray:
        """Return the ``lag7`` column of ``X`` as a NumPy array."""
        lag_feature = X['lag7']
        return lag_feature.values

# A model that echoes the lag-7 feature lets us derive the expected forecasts by hand.
horizon = 7
model = DummyModel()
ts = TimeSeries(**flow_config)
ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y')
predictions = ts.predict({'DummyModel': model}, horizon)

grouped_series = series.groupby('unique_id', observed=True)
# With horizon 7 and a lag-7 model, every forecast is an already observed value:
# the last 7 targets of each series.
expected_preds = grouped_series['y'].tail(7)  # the model predicts the lag-7
# flow_config uses freq='D': forecasts start one day after each series' last
# date and end `horizon` days after it.
last_dates = grouped_series['ds'].max()
expected_dsmin = last_dates + pd.offsets.Day()
expected_dsmax = last_dates + horizon * pd.offsets.Day()
grouped_preds = predictions.groupby('unique_id', observed=True)

# The forecast column is named after the key used in the models dict.
np.testing.assert_allclose(predictions['DummyModel'], expected_preds)
pd.testing.assert_series_equal(grouped_preds['ds'].min(), expected_dsmin)
pd.testing.assert_series_equal(grouped_preds['ds'].max(), expected_dsmax)

如果存在动态特征,可以通过 X_df 参数将它们传入。

class PredictPrice:
    """Dummy model whose forecast is simply the ``price`` feature."""

    def predict(self, X):
        """Return the ``price`` column of ``X`` unchanged."""
        price_column = 'price'
        return X[price_column]

# Regenerate the series (equal_ends=True; presumably all series end on the same
# date — TODO confirm) and rename `static_1` to `product_id` so that per-product
# prices can be attached as a dynamic feature.
series = generate_daily_series(20, n_static_features=2, equal_ends=True)
dynamic_series = series.rename(columns={'static_1': 'product_id'})
prices_catalog = generate_prices_for_series(dynamic_series)
# Left merge on the shared columns adds the `price` column to the training data.
series_with_prices = dynamic_series.merge(prices_catalog, how='left')

model = PredictPrice()
ts = TimeSeries(**flow_config)
# `price` is not listed in static_features, so it is treated as a dynamic feature.
ts.fit_transform(
    series_with_prices,
    id_col='unique_id',
    time_col='ds',
    target_col='y',
    static_features=['static_0', 'product_id'],
)
# Future values of the dynamic feature are supplied through X_df.
predictions = ts.predict({'PredictPrice': model}, horizon=1, X_df=prices_catalog)
# The model echoes `price`, so its forecasts must equal the catalog prices at
# the forecast (unique_id, ds) pairs.
pd.testing.assert_frame_equal(
    predictions.rename(columns={'PredictPrice': 'price'}),
    prices_catalog.merge(predictions[['unique_id', 'ds']])[['unique_id', 'ds', 'price']]
)

源代码

TimeSeries.update

 TimeSeries.update (df:Union[pandas.core.frame.DataFrame,polars.dataframe.frame.DataFrame])

更新存储序列的值。