本地目标变换
import numpy as np
import pandas as pd
from fastcore.test import test_fail
from sklearn.ensemble import HistGradientBoostingRegressor
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PowerTransformer
from utilsforecast.processing import counts_by_id

from mlforecast import MLForecast
from mlforecast.utils import generate_daily_series
来源
BaseTargetTransform
BaseTargetTransform ()
用于目标变换的基类。
来源
差分
Differences (differences:Iterable[int])
减去序列的先前值。可用于去除趋势或季节性。
# --- Tests for Differences (GroupedArray / Differences come from this module's hidden cells) ---
series = generate_daily_series(10, min_length=50, max_length=100)
diffs = Differences([1, 2, 5])
id_counts = counts_by_id(series, 'unique_id')
# indptr marks the [start, end) offsets of each serie inside the flat values array
indptr = np.append(0, id_counts['counts'].cumsum())
ga = GroupedArray(series['y'].values, indptr)
# differences are applied correctly: fit_transform must match a groupwise shift-subtract
transformed = diffs.fit_transform(ga)
assert diffs.fitted_ == []
expected = series.copy()
for d in diffs.differences:
    expected['y'] -= expected.groupby('unique_id', observed=True)['y'].shift(d)
np.testing.assert_allclose(transformed.data, expected['y'].values)
# fitted differences are restored correctly (only where the transform produced non-NaN values)
diffs.store_fitted = True
transformed = diffs.fit_transform(ga)
keep_mask = ~np.isnan(transformed.data)
restored = diffs.inverse_transform_fitted(transformed)
np.testing.assert_allclose(ga.data[keep_mask], restored.data[keep_mask])
# test transform: update() should difference new values against the stored tails
new_ga = GroupedArray(np.random.rand(10), np.arange(11))
# NOTE(review): tails_[::d] is assumed to pick the per-group tail entries — verify against GroupedArray internals
prev_orig = [diffs.scalers_[i].tails_[::d].copy() for i, d in enumerate(diffs.differences)]
expected = new_ga.data - np.add.reduce(prev_orig)
updates = diffs.update(new_ga)
np.testing.assert_allclose(expected, updates.data)
# after update, each scaler's tails hold the values at that stage of the differencing chain
np.testing.assert_allclose(diffs.scalers_[0].tails_, new_ga.data)
np.testing.assert_allclose(diffs.scalers_[1].tails_[1::2], new_ga.data - prev_orig[0])
np.testing.assert_allclose(diffs.scalers_[2].tails_[4::5], new_ga.data - np.add.reduce(prev_orig[:2]))
# variable sizes: groups of different lengths must difference against their own tails
diff1 = Differences([1])
ga = GroupedArray(np.arange(10), np.array([0, 3, 10]))
diff1.fit_transform(ga)
new_ga = GroupedArray(np.arange(4), np.array([0, 1, 4]))
updates = diff1.update(new_ga)
np.testing.assert_allclose(updates.data, np.array([0 - 2, 1 - 9, 2 - 1, 3 - 2]))
np.testing.assert_allclose(diff1.scalers_[0].tails_, np.array([0, 3]))
# short series: a serie shorter than the required differences must raise, naming the offender
ga = GroupedArray(np.arange(20), np.array([0, 2, 20]))
test_fail(lambda: diffs.fit_transform(ga), contains="[0]")
# stack: stacking two fitted transforms concatenates their per-group tails
diffs = Differences([1, 2, 5])
ga = GroupedArray(series['y'].values, indptr)
diffs.fit_transform(ga)
stacked = Differences.stack([diffs, diffs])
for i in range(len(diffs.differences)):
    np.testing.assert_allclose(
        stacked.scalers_[i].tails_,
        np.tile(diffs.scalers_[i].tails_, 2)
    )
来源
自动差分
AutoDifferences (max_diffs:int)
找到并对每个序列应用最佳差分数。
来源
自动季节性差分
AutoSeasonalDifferences (season_length:int, max_diffs:int, n_seasons:Optional[int]=10)
找到并对每个组应用最佳季节性差分数。
类型 | 默认值 | 详情 | |
---|---|---|---|
season_length | int | 季节周期的长度。 | |
max_diffs | int | 要应用的最大差分数。 | |
n_seasons | 可选 | 10 | 用于确定差分数的季节数。默认为 10。如果为 None，将使用所有样本；否则，将使用 season_length * n_seasons 个样本进行检验。值越小速度越快，但可能精度较低。 |
来源
自动季节性和差分
AutoSeasonalityAndDifferences (max_season_length:int, max_diffs:int, n_seasons:Optional[int]=10)
找到季节周期的长度并对每个组应用最佳差分数。
类型 | 默认值 | 详情 | |
---|---|---|---|
max_season_length | int | 季节周期的最大长度。 | |
max_diffs | int | 要应用的最大差分数。 | |
n_seasons | 可选 | 10 | 用于确定差分数的季节数。默认为 10。如果为 None，将使用所有样本；否则，将使用 max_season_length * n_seasons 个样本进行检验。值越小速度越快，但可能精度较低。 |
def test_scaler(sc, series):
    """Round-trip checks for a local scaler `sc` on `series`.

    Verifies: inverse_transform undoes fit_transform, update matches
    fit_transform on the same data, take() keeps subsets consistent,
    and stack() tiles the fitted statistics.
    """
    sizes = counts_by_id(series, 'unique_id')
    offsets = np.append(0, sizes['counts'].cumsum())
    grouped = GroupedArray(series['y'].values, offsets)

    # fit + inverse must recover the original values
    scaled = sc.fit_transform(grouped)
    roundtrip = sc.inverse_transform(scaled)
    np.testing.assert_allclose(roundtrip.data, grouped.data)

    # updating with the training data must reproduce the fitted transform
    rescaled = sc.update(grouped)
    np.testing.assert_allclose(scaled.data, rescaled.data)

    # taking a subset of groups keeps transform and scaler aligned
    idxs = [0, 7]
    group_subset = grouped.take(idxs)
    scaled_subset = scaled.take(idxs)
    scaler_subset = sc.take(idxs)
    restored_subset = scaler_subset.inverse_transform(scaled_subset)
    np.testing.assert_allclose(restored_subset.data, group_subset.data)

    # stacking two copies tiles the per-group statistics
    combined = sc.stack([sc, sc])
    np.testing.assert_allclose(
        combined.scaler_.stats_,
        np.tile(sc.scaler_.stats_, (2, 1)),
    )
来源
LocalStandardScaler
LocalStandardScaler ()
通过减去其均值并除以其标准差来标准化每个序列。
test_scaler(LocalStandardScaler(), series)
来源
LocalMinMaxScaler
LocalMinMaxScaler ()
将每个序列缩放到 [0, 1] 区间。
test_scaler(LocalMinMaxScaler(), series)
来源
LocalRobustScaler
LocalRobustScaler (scale:str)
对异常值具有鲁棒性的缩放器。
类型 | 详情 | |
---|---|---|
scale | str | 用于缩放的统计量。可以是 ‘iqr’ (四分位距) 或 ‘mad’ (中位数绝对偏差) |
# round-trip checks for the robust scaler with both supported statistics
test_scaler(LocalRobustScaler(scale='iqr'), series)
test_scaler(LocalRobustScaler(scale='mad'), series)
来源
LocalBoxCox
LocalBoxCox ()
找到每个序列的最佳 lambda 并应用 Box-Cox 变换
test_scaler(LocalBoxCox(), series)
来源
GlobalSklearnTransformer
GlobalSklearnTransformer (transformer:sklearn.base.TransformerMixin)
对所有序列应用相同的 scikit-learn 变换器。
# need this import in order for isinstance to work
from mlforecast.target_transforms import Differences as ExportedDifferences

# Pipeline under test: global Box-Cox (one fit over all series) followed by a
# per-serie first difference, applied through MLForecast.preprocess.
sk_boxcox = PowerTransformer(method='box-cox', standardize=False)
boxcox_global = GlobalSklearnTransformer(sk_boxcox)
single_difference = ExportedDifferences([1])
series = generate_daily_series(10)
fcst = MLForecast(
    models=[LinearRegression(), HistGradientBoostingRegressor()],
    freq='D',
    lags=[1, 2],
    target_transforms=[boxcox_global, single_difference]
)
prep = fcst.preprocess(series, dropna=False)
# Expected target: Box-Cox over the full 'y' column, then a groupwise diff
# (the first value of each serie becomes NaN and is dropped here).
expected = (
    pd.Series(
        sk_boxcox.fit_transform(series[['y']])[:, 0], index=series['unique_id']
    ).groupby('unique_id', observed=True)
    .diff()
    .dropna()
    .values
)
np.testing.assert_allclose(prep['y'].values, expected)
# smoke test: the full fit + predict path runs with both transforms in place
preds = fcst.fit(series).predict(5)
助手
回复由 AI 生成,可能包含错误。