import pandas as pd
from fastcore.test import test_fail
from sklearn.ensemble import HistGradientBoostingRegressor
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PowerTransformer
from utilsforecast.processing import counts_by_id

from mlforecast import MLForecast
from mlforecast.utils import generate_daily_series

来源

BaseTargetTransform

 BaseTargetTransform ()

用于目标变换的基类。


来源

Differences

 Differences (differences:Iterable[int])

减去序列的先前值。可用于去除趋势或季节性。

# Shared fixture for the checks below: 10 synthetic daily series with
# between 50 and 100 observations each, and a transform with lags 1, 2 and 5.
series = generate_daily_series(10, min_length=50, max_length=100)
diffs = Differences([1, 2, 5])
# indptr is the cumulative per-id count with a leading 0; consecutive entries
# delimit one series inside the flat `y` array.
# NOTE(review): assumes `series` rows are ordered by unique_id - confirm.
id_counts = counts_by_id(series, 'unique_id')
indptr = np.append(0, id_counts['counts'].cumsum())
ga = GroupedArray(series['y'].values, indptr)

# differences are applied correctly
transformed = diffs.fit_transform(ga)
# store_fitted is only switched on further down, so nothing should have been
# recorded in fitted_ yet (presumably it is off by default).
assert diffs.fitted_ == []
# Reference result built with pandas: apply each lag in sequence, per series,
# by subtracting the value shifted by that lag.
expected = series.copy()
for d in diffs.differences:
    expected['y'] -= expected.groupby('unique_id', observed=True)['y'].shift(d)
# assert_allclose treats NaNs as equal by default, so the NaNs that shift()
# introduces at the start of each series do not break the comparison.
np.testing.assert_allclose(transformed.data, expected['y'].values)

# fitted differences are restored correctly
# Refit with store_fitted enabled so inverse_transform_fitted can be used.
diffs.store_fitted = True
transformed = diffs.fit_transform(ga)
# Only positions where the transformed value is not NaN are compared.
keep_mask = ~np.isnan(transformed.data)
restored = diffs.inverse_transform_fitted(transformed)
np.testing.assert_allclose(ga.data[keep_mask], restored.data[keep_mask])

# test transform
# One new random observation for each of the 10 series (indptr 0..10 gives
# ten groups of length one).
new_ga = GroupedArray(np.random.rand(10), np.arange(11))
# Snapshot, for each lag d, every d-th stored tail value. Copies are taken
# because the assertions below show that update() changes tails_.
prev_orig = [diffs.scalers_[i].tails_[::d].copy() for i, d in enumerate(diffs.differences)]
# Expected output of update(): the new values minus the sum of the snapshots.
expected = new_ga.data - np.add.reduce(prev_orig)
updates = diffs.update(new_ga)
np.testing.assert_allclose(expected, updates.data)
# After the update the lag-1 tails are the new raw values.
np.testing.assert_allclose(diffs.scalers_[0].tails_, new_ga.data)
# For lag 2, every second tail starting at index 1 is the new value after
# removing the lag-1 snapshot (presumably the most recent of the 2 tails kept
# per series).
np.testing.assert_allclose(diffs.scalers_[1].tails_[1::2], new_ga.data - prev_orig[0])
# For lag 5, every fifth tail starting at index 4 is the new value after
# removing both the lag-1 and lag-2 snapshots.
np.testing.assert_allclose(diffs.scalers_[2].tails_[4::5], new_ga.data - np.add.reduce(prev_orig[:2]))
# variable sizes
diff1 = Differences([1])
# Two series of different length: values 0..2 and values 3..9.
ga = GroupedArray(np.arange(10), np.array([0, 3, 10]))
diff1.fit_transform(ga)
# New data with a different number of rows per series: one value for the
# first series, three for the second.
new_ga = GroupedArray(np.arange(4), np.array([0, 1, 4]))
updates = diff1.update(new_ga)
# Each new value minus its predecessor; the first new value of each series
# is differenced against the last fitted value (2 and 9 respectively).
np.testing.assert_allclose(updates.data, np.array([0 - 2, 1 - 9, 2 - 1, 3 - 2]))
# The stored tails are now the last new value of each series.
np.testing.assert_allclose(diff1.scalers_[0].tails_, np.array([0, 3]))

# short series
# The first series has only 2 observations, the second has 18.
ga = GroupedArray(np.arange(20), np.array([0, 2, 20]))
# Fitting the lag 1/2/5 transform must raise, and the error message must
# contain "[0]" (presumably the index of the series that is too short).
test_fail(lambda: diffs.fit_transform(ga), contains="[0]")

# stack
# Start from a fresh instance fitted on the original fixture; the previous
# `diffs` was modified by the update() calls above.
diffs = Differences([1, 2, 5])
ga = GroupedArray(series['y'].values, indptr)
diffs.fit_transform(ga)
# Stacking the same fitted transform twice should give, for every lag, the
# original tails repeated twice.
stacked = Differences.stack([diffs, diffs])
for i in range(len(diffs.differences)):
    np.testing.assert_allclose(
        stacked.scalers_[i].tails_,
        np.tile(diffs.scalers_[i].tails_, 2)
    )

来源

AutoDifferences

 AutoDifferences (max_diffs:int)

找到并对每个序列应用最佳差分数。


来源

AutoSeasonalDifferences

 AutoSeasonalDifferences (season_length:int, max_diffs:int,
                          n_seasons:Optional[int]=10)

找到并对每个组应用最佳季节性差分数。

| 参数 | 类型 | 默认值 | 详情 |
|---|---|---|---|
| season_length | int | | 季节周期的长度。 |
| max_diffs | int | | 要应用的最大差分数。 |
| n_seasons | Optional[int] | 10 | 用于确定差分数的季节数。默认为 10。如果为 None,将使用所有样本;否则,将使用 season_length * n_seasons 个样本进行测试。值越小速度越快,但可能精度较低。 |

来源

AutoSeasonalityAndDifferences

 AutoSeasonalityAndDifferences (max_season_length:int, max_diffs:int,
                                n_seasons:Optional[int]=10)

找到季节周期的长度并对每个组应用最佳差分数。

| 参数 | 类型 | 默认值 | 详情 |
|---|---|---|---|
| max_season_length | int | | 季节周期的最大长度。 |
| max_diffs | int | | 要应用的最大差分数。 |
| n_seasons | Optional[int] | 10 | 用于确定差分数的季节数。默认为 10。如果为 None,将使用所有样本;否则,将使用 max_season_length * n_seasons 个样本进行测试。值越小速度越快,但可能精度较低。 |
def test_scaler(sc, series):
    """Run the shared consistency checks on a local scaler.

    Verifies, in order, that:

    * ``inverse_transform`` undoes ``fit_transform``;
    * ``update`` on the training data reproduces the fitted transform;
    * a scaler restricted with ``take`` inverts the matching subset;
    * ``stack`` of the scaler with itself repeats its statistics twice.

    Parameters
    ----------
    sc
        Scaler under test. It is fitted here, so it is modified by the call.
    series
        Frame with ``unique_id`` and ``y`` columns. The subset check picks
        groups 0 and 7, so it needs at least eight series.

    Raises
    ------
    AssertionError
        If any of the comparisons fails.
    """
    # Pack the target column into a GroupedArray, one group per id.
    counts = counts_by_id(series, 'unique_id')['counts']
    boundaries = np.append(0, counts.cumsum())
    grouped = GroupedArray(series['y'].values, boundaries)

    # Round trip: transforming and inverting must give back the input.
    scaled = sc.fit_transform(grouped)
    round_trip = sc.inverse_transform(scaled)
    np.testing.assert_allclose(round_trip.data, grouped.data)

    # Updating with the same data must match the fitted transform.
    rescaled = sc.update(grouped)
    np.testing.assert_allclose(scaled.data, rescaled.data)

    # A scaler restricted to some groups must invert those groups only.
    picked = [0, 7]
    raw_subset = grouped.take(picked)
    scaled_subset = scaled.take(picked)
    sub_scaler = sc.take(picked)
    recovered_subset = sub_scaler.inverse_transform(scaled_subset)
    np.testing.assert_allclose(recovered_subset.data, raw_subset.data)

    # Stacking the scaler with itself must repeat its statistics row-wise.
    doubled = sc.stack([sc, sc])
    doubled_stats = doubled.scaler_.stats_
    repeated_stats = np.tile(sc.scaler_.stats_, (2, 1))
    np.testing.assert_allclose(doubled_stats, repeated_stats)

来源

LocalStandardScaler

 LocalStandardScaler ()

通过减去其均值并除以其标准差来标准化每个序列。

# Run the shared scaler checks against LocalStandardScaler on the fixture series.
test_scaler(LocalStandardScaler(), series)

来源

LocalMinMaxScaler

 LocalMinMaxScaler ()

将每个序列缩放到 [0, 1] 区间。

# Run the shared scaler checks against LocalMinMaxScaler on the fixture series.
test_scaler(LocalMinMaxScaler(), series)

来源

LocalRobustScaler

 LocalRobustScaler (scale:str)

对异常值具有鲁棒性的缩放器。

| 参数 | 类型 | 详情 |
|---|---|---|
| scale | str | 用于缩放的统计量。可以是 'iqr'(四分位距)或 'mad'(中位数绝对偏差)。 |
# Run the shared scaler checks against LocalRobustScaler, once for each
# scale statistic used here ('iqr' and 'mad').
test_scaler(LocalRobustScaler(scale='iqr'), series)
test_scaler(LocalRobustScaler(scale='mad'), series)

来源

LocalBoxCox

 LocalBoxCox ()

找到每个序列的最佳 lambda 并应用 Box-Cox 变换

# Run the shared scaler checks against LocalBoxCox on the fixture series.
test_scaler(LocalBoxCox(), series)

来源

GlobalSklearnTransformer

 GlobalSklearnTransformer (transformer:sklearn.base.TransformerMixin)

对所有序列应用相同的 scikit-learn 变换器。

# need this import in order for isinstance to work
from mlforecast.target_transforms import Differences as ExportedDifferences

# Fresh data plus the two target transforms under test: a global Box-Cox
# (scikit-learn PowerTransformer without standardization) followed by a
# first difference.
series = generate_daily_series(10)
sk_boxcox = PowerTransformer(method='box-cox', standardize=False)
boxcox_global = GlobalSklearnTransformer(sk_boxcox)
single_difference = ExportedDifferences([1])

fcst = MLForecast(
    models=[LinearRegression(), HistGradientBoostingRegressor()],
    freq='D',
    lags=[1, 2],
    target_transforms=[boxcox_global, single_difference],
)
prep = fcst.preprocess(series, dropna=False)

# Reference target: Box-Cox fitted on all series at once, then differenced
# within each series; the first row of every series becomes NaN and is dropped.
boxcox_y = pd.Series(
    sk_boxcox.fit_transform(series[['y']])[:, 0],
    index=series['unique_id'],
)
per_series_diff = boxcox_y.groupby('unique_id', observed=True).diff()
expected = per_series_diff.dropna().values
np.testing.assert_allclose(prep['y'].values, expected)

# Smoke test: fitting and predicting 5 steps ahead must run with the transforms.
preds = fcst.fit(series).predict(5)