from statsforecast.core import StatsForecast
from statsforecast.models import ( 
    AutoARIMA,
    AutoETS,
)
from statsforecast.utils import generate_series
n_series = 4
horizon = 7

series = generate_series(n_series)

sf = StatsForecast(
    models=[AutoETS(season_length=7)],
    freq='D',
)

sf.cross_validation(df=series, h=horizon, step_size = 24,
    n_windows = 2, level=[90]).head()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# Make unique_id a column
series['unique_id'] = series['unique_id'].astype(str)

# Convert to Spark
sdf = spark.createDataFrame(series)
# Returns a Spark DataFrame
sf = StatsForecast(
    models=[AutoETS(season_length=7)],
    freq='D',
)
sf.cross_validation(df=sdf, h=horizon, step_size = 24,
    n_windows = 2, level=[90]).show()

源代码

FugueBackend

 FugueBackend (engine:Any=None, conf:Any=None, **transform_kwargs:Any)

*用于分布式计算的 FugueBackend。源代码

此类使用 Fugue 后端,能够在 Spark、Dask 和 Ray 上分发计算,无需任何重写。*

类型默认值详细信息
engine任意类型选择 Spark, Dask 或 Ray。
conf任意类型引擎配置。
transform_kwargs任意类型

源代码

FugueBackend.forecast

 FugueBackend.forecast (df:~AnyDataFrame, freq:Union[str,int],
                        models:List[Any], fallback_model:Optional[Any],
                        X_df:Optional[~AnyDataFrame], h:int,
                        level:Optional[List[int]], fitted:bool, prediction
                        _intervals:Optional[statsforecast.utils.ConformalI
                        ntervals], id_col:str, time_col:str,
                        target_col:str)

*使用 FugueBackend 进行内存高效的 core.StatsForecast 预测。

此方法使用 Fugue 的 transform 函数,结合 core.StatsForecast 的 forecast 方法,高效地拟合 StatsForecast 模型列表。*

类型详细信息
df任意DataFrame包含 ids、times、targets 和 exogenous 的 DataFrame。
freq联合数据的频率。必须是有效的 pandas 或 polars 偏移量别名,或一个整数。
models列表models.StatsForecast 实例化对象列表。
fallback_model可选任意类型,可选 (默认=None)
如果模型失败时使用的模型。
仅适用于 forecastcross_validation 方法。
X_df可选包含 ids、times 和未来 exogenous 的 DataFrame。
h整型预测范围。
level可选预测区间在 0 到 100 之间的置信水平。
fitted布尔型存储样本内预测结果。
prediction_intervals可选用于校准预测区间的配置 (一致性预测)。
id_col字符串标识每个系列(时间序列)的列。
time_col字符串标识每个时间步长的列,其值可以是时间戳或整数。
target_col字符串包含目标值的列。
返回值任意类型一个 DataFrame,其中包含 models 列用于点预测,以及所有拟合模型的概率预测结果。

源代码

FugueBackend.cross_validation

 FugueBackend.cross_validation (df:~AnyDataFrame, freq:Union[str,int],
                                models:List[Any],
                                fallback_model:Optional[Any], h:int,
                                n_windows:int, step_size:int,
                                test_size:int, input_size:int,
                                level:Optional[List[int]], refit:bool,
                                fitted:bool, prediction_intervals:Optional
                                [statsforecast.utils.ConformalIntervals],
                                id_col:str, time_col:str, target_col:str)

*使用 core.StatsForecast 和 FugueBackend 进行时间交叉验证。

此方法使用 Fugue 的 transform 函数,结合 core.StatsForecast 的 cross-validation 方法,通过多个训练窗口(链式或滚动方式)高效地拟合 StatsForecast 模型列表。

StatsForecast.models 的速度以及 Fugue 的分布式计算能力,使得克服这种评估技术的高计算成本成为可能。时间交叉验证通过增加测试长度和多样性,提供了更好的模型泛化能力度量。*

类型详细信息
df任意DataFrame包含 ids、times、targets 和 exogenous 的 DataFrame。
freq联合数据的频率。必须是有效的 pandas 或 polars 偏移量别名,或一个整数。
models列表models.StatsForecast 实例化对象列表。
fallback_model可选任意类型,可选 (默认=None)
如果模型失败时使用的模型。
仅适用于 forecastcross_validation 方法。
h整型预测范围。
n_windows整型用于交叉验证的窗口数量。
step_size整型每个窗口之间的步长。
test_size整型测试集大小的长度。如果传入此值,则设置 n_windows=None
input_size整型每个窗口的输入大小,如果非 None 则为滚动窗口。
level可选预测区间在 0 到 100 之间的置信水平。
refit布尔型是否为每个窗口重新拟合模型。
如果是整数,则每隔 refit 个窗口训练模型。
fitted布尔型存储样本内预测结果。
prediction_intervals可选用于校准预测区间的配置 (一致性预测)。
id_col字符串标识每个系列(时间序列)的列。
time_col字符串标识每个时间步长的列,其值可以是时间戳或整数。
target_col字符串包含目标值的列。
返回值任意类型一个 DataFrame,其中包含 models 列用于点预测,以及所有拟合模型的概率预测结果。

Dask 分布式预测

此处我们提供一个示例,演示如何使用 Fugue 在 Dask 集群中执行代码,实现 StatsForecast 预测的分布式计算。

为此,我们使用 DaskExecutionEngine 实例化 FugueBackend 类。

import dask.dataframe as dd
from dask.distributed import Client
from fugue_dask import DaskExecutionEngine
from statsforecast import StatsForecast
from statsforecast.models import Naive
from statsforecast.utils import generate_series
# Generate Synthetic Panel Data
df = generate_series(10)
df['unique_id'] = df['unique_id'].astype(str)
df = dd.from_pandas(df, npartitions=10)

# Instantiate FugueBackend with DaskExecutionEngine
dask_client = Client()
engine = DaskExecutionEngine(dask_client=dask_client)

我们只需像实例化通常的 StatsForecast 类一样创建该类。

sf = StatsForecast(models=[Naive()], freq='D')

分布式预测

为了实现极快的分布式预测,我们使用 FugueBackend 作为后端,其操作方式与原始的 StatsForecast.forecast 方法类似。

它接收一个 pandas.DataFrame 作为输入,包含 [unique_id,ds,y] 列以及外生变量;其中 ds(日期戳)列应为 Pandas 期望的格式。y 列必须是数字类型,表示我们希望预测的度量值。unique_id 在面板数据中唯一标识各个系列。

# Distributed predictions with FugueBackend.
sf.forecast(df=df, h=12).compute()
sf = StatsForecast(models=[Naive()], freq='D')
xx = sf.forecast(df=df, h=12, fitted=True).compute()
yy = sf.forecast_fitted_values().compute()

分布式交叉验证

为了实现极快的分布式时间交叉验证,我们使用 cross_validation 方法,其操作方式与原始的 StatsForecast.cross_validation 方法类似。

# Distributed cross-validation with FugueBackend.
sf.cross_validation(df=df, h=12, n_windows=2).compute()