DistributedMLForecast 类是一个高层抽象,它封装了管道中的所有步骤(预处理、拟合模型和计算预测)并以分布式方式应用它们。

使用 DistributedMLForecast(相对于 MLForecast)所需的不同之处在于:

  1. 你需要搭建一个集群。我们目前支持 dask、ray 和 spark。
  2. 你的数据需要是分布式集合(dask, ray 或 spark dataframe)。
  3. 你需要使用在所选框架中实现了分布式训练的模型,例如 spark 中 LightGBM 的 SynapseML。
import platform
import sys
import tempfile

import matplotlib.pyplot as plt
import git
import numpy as np
import pandas as pd
import s3fs
from sklearn.base import BaseEstimator
from utilsforecast.feature_engineering import fourier

from mlforecast.distributed import DistributedMLForecast
from mlforecast.lag_transforms import ExpandingMean, ExponentiallyWeightedMean, RollingMean
from mlforecast.target_transforms import Differences
from mlforecast.utils import generate_daily_series, generate_prices_for_series

Dask

import dask.dataframe as dd
from dask.distributed import Client

客户端设置

client = Client(n_workers=2, threads_per_worker=1)

这里我们定义一个连接到 dask.distributed.LocalCluster 的客户端,但它也可以是任何其他类型的集群。

数据设置

对于 dask,数据必须是 dask.dataframe.DataFrame。你需要确保每个时间序列只在一个分区中,并且建议你的分区数与 worker 数相同。如果分区数多于 worker 数,请确保设置 num_threads=1 以避免嵌套并行。

所需的输入格式与 MLForecast 相同,只是它是一个 dask.dataframe.DataFrame 而不是 pandas.Dataframe

series = generate_daily_series(100, n_static_features=2, equal_ends=True, static_as_categorical=False, min_length=500, max_length=1_000)
train, future = fourier(series, freq='d', season_length=7, k=2, h=7)
npartitions = 10
partitioned_series = dd.from_pandas(train.set_index('unique_id'), npartitions=npartitions)  # make sure we split by the id_col
partitioned_series = partitioned_series.map_partitions(lambda df: df.reset_index())
partitioned_series['unique_id'] = partitioned_series['unique_id'].astype(str)  # can't handle categoricals atm
partitioned_series
unique_iddsystatic_0static_1sin1_7sin2_7cos1_7cos2_7
npartitions=10
id_00objectdatetime64[ns]float64int64int64float32float32float32float32
id_10
id_90
id_99

模型

为了执行分布式预测,我们需要使用能够利用 dask 进行分布式训练的模型。目前的实现位于 DaskLGBMForecastDaskXGBForecast 中,它们只是原生实现的包装器。

from mlforecast.distributed.models.dask.lgb import DaskLGBMForecast
from mlforecast.distributed.models.dask.xgb import DaskXGBForecast
models = [
    DaskXGBForecast(random_state=0),
    DaskLGBMForecast(random_state=0, verbosity=-1),
]

训练

有了模型后,我们通过定义特征来实例化一个 DistributedMLForecast 对象。然后可以在此对象上调用 fit 方法,并传入 dask dataframe。

fcst = DistributedMLForecast(
    models=models,
    freq='D',
    target_transforms=[Differences([7])],
    lags=[7],
    lag_transforms={
        1: [ExpandingMean(), ExponentiallyWeightedMean(alpha=0.9)],
        7: [RollingMean(window_size=14)],
    },
    date_features=['dayofweek', 'month'],
    num_threads=1,
    engine=client,
)
fcst.fit(partitioned_series, static_features=['static_0', 'static_1'])

有了拟合好的模型后,我们可以计算未来 7 个时间步的预测。

预测

preds = fcst.predict(7, X_df=future).compute()
preds.head()
unique_iddsDaskXGBForecastDaskLGBMForecast
0id_002002-09-27 00:00:0021.72284121.725511
1id_002002-09-28 00:00:0084.91819484.606362
2id_002002-09-29 00:00:00162.067624163.36802
3id_002002-09-30 00:00:00249.001477246.422894
4id_002002-10-01 00:00:00317.149512315.538403

保存和加载

训练好模型后,可以使用 DistributedMLForecast.save 方法保存用于推理的artifact。请记住,如果在远程集群上,应将远程存储(如 S3)设置为目标路径。

mlforecast 使用 fsspec 处理不同的文件系统,因此如果使用 s3,例如,还需要安装 s3fs。如果使用 pip,只需包含 aws extra,例如 pip install 'mlforecast[aws,dask]',这将安装使用 dask 进行分布式训练和保存到 S3 所需的依赖项。如果使用 conda,则需要手动安装它们 (conda install dask fsspec fugue s3fs)。

# define unique name for CI
def build_unique_name(engine):
    pyver = f'{sys.version_info.major}_{sys.version_info.minor}'
    repo = git.Repo(search_parent_directories=True)
    sha = repo.head.object.hexsha
    return f'{sys.platform}-{pyver}-{engine}-{sha}'
save_dir = build_unique_name('dask')
save_path = f's3://nixtla-tmp/mlf/{save_dir}'
tmpdir = tempfile.TemporaryDirectory()
try:
    s3fs.S3FileSystem().ls('s3://nixtla-tmp/')
    fcst.save(save_path)
except Exception as e:
    print(e)
    save_path = f'{tmpdir.name}/{save_dir}'
    fcst.save(save_path)

保存好预测对象后,可以通过指定保存路径以及用于执行分布式计算的引擎(在本例中为 dask 客户端)来重新加载。

fcst2 = DistributedMLForecast.load(save_path, engine=client)

我们可以验证此对象是否产生相同的结果。

preds = fa.as_pandas(fcst.predict(7, X_df=future)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds2 = fa.as_pandas(fcst2.predict(7, X_df=future)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
pd.testing.assert_frame_equal(preds, preds2)

转换为本地

另一种存储分布式预测对象的方法是先将其转换为本地对象,然后保存。请记住,为了这样做,存储在远程的系列数据都必须拉取到一台机器(dask 中的调度器,spark 中的驱动器等)上,因此必须确保它能容纳在内存中,它应该消耗大约目标列大小的两倍(通过在 fit 方法中使用 keep_last_n 参数可以进一步减少)。

local_fcst = fcst.to_local()
local_preds = local_fcst.predict(7, X_df=future)
# we don't check the dtype because sometimes these are arrow dtypes
# or different precisions of float
pd.testing.assert_frame_equal(preds, local_preds, check_dtype=False)

交叉验证

cv_res = fcst.cross_validation(
    partitioned_series,
    n_windows=3,
    h=14,
    static_features=['static_0', 'static_1'],
)
cv_res.compute().head()
unique_iddsDaskXGBForecastDaskLGBMForecastcutoffy
61id_042002-08-21 00:00:0068.341868.9445392002-08-15 00:00:0069.699857
83id_152002-08-29 00:00:00199.315403199.6635552002-08-15 00:00:00206.082864
103id_172002-08-21 00:00:00156.822598158.0182462002-08-15 00:00:00152.227984
61id_242002-08-21 00:00:00136.598356136.5768652002-08-15 00:00:00138.559945
36id_332002-08-24 00:00:0095.607296.2493542002-08-15 00:00:00102.068997
client.close()

Spark

会话设置

from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:0.10.2")
    .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
    .getOrCreate()
)

数据设置

对于 spark,数据必须是 pyspark DataFrame。你需要确保每个时间序列只在一个分区中(例如可以使用 repartitionByRange 来实现),并且建议你的分区数与 worker 数相同。如果分区数多于 worker 数,请确保设置 num_threads=1 以避免嵌套并行。

所需的输入格式与 MLForecast 相同,即它应至少包含一个 id 列、一个时间列和一个目标列。

series = generate_daily_series(100, n_static_features=2, equal_ends=True, static_as_categorical=False, min_length=500, max_length=1_000)
series['unique_id'] = series['unique_id'].astype(str)  # can't handle categoricals atm
train, future = fourier(series, freq='d', season_length=7, k=2, h=7)
numPartitions = 4
spark_series = spark.createDataFrame(train).repartitionByRange(numPartitions, 'unique_id')

模型

为了执行分布式预测,我们需要使用能够利用 spark 进行分布式训练的模型。目前的实现位于 SparkLGBMForecastSparkXGBForecast 中,它们只是原生实现的包装器。

from mlforecast.distributed.models.spark.lgb import SparkLGBMForecast
from mlforecast.distributed.models.spark.xgb import SparkXGBForecast
models = [
    SparkLGBMForecast(seed=0, verbosity=-1),
    SparkXGBForecast(random_state=0),
]

训练

fcst = DistributedMLForecast(
    models,
    freq='D',
    target_transforms=[Differences([7])],    
    lags=[1],
    lag_transforms={
        1: [ExpandingMean(), ExponentiallyWeightedMean(alpha=0.9)],
    },
    date_features=['dayofweek'],
)
fcst.fit(
    spark_series,
    static_features=['static_0', 'static_1'],
)

预测

preds = fcst.predict(7, X_df=future).toPandas()
                                                                                
preds.head()
unique_iddsSparkLGBMForecastSparkXGBForecast
0id_002002-09-2715.05357718.631477
1id_002002-09-2893.01003793.796269
2id_002002-09-29160.120148159.582315
3id_002002-09-30250.445885250.861651
4id_002002-10-01323.335956321.564089

保存和加载

训练好模型后,可以使用 DistributedMLForecast.save 方法保存用于推理的artifact。请记住,如果在远程集群上,应将远程存储(如 S3)设置为目标路径。

mlforecast 使用 fsspec 处理不同的文件系统,因此如果使用 s3,例如,还需要安装 s3fs。如果使用 pip,只需包含 aws extra,例如 pip install 'mlforecast[aws,spark]',这将安装使用 spark 进行分布式训练和保存到 S3 所需的依赖项。如果使用 conda,则需要手动安装它们 (conda install fsspec fugue pyspark s3fs)。

save_dir = build_unique_name('spark')
save_path = f's3://nixtla-tmp/mlf/{save_dir}'
try:
    s3fs.S3FileSystem().ls('s3://nixtla-tmp/')
    fcst.save(save_path)
except Exception as e:
    print(e)
    save_path = f'{tmpdir.name}/{save_dir}'
    fcst.save(save_path)
                                                                                

保存好预测对象后,可以通过指定保存路径以及用于执行分布式计算的引擎(在本例中为 spark 会话)来重新加载。

fcst2 = DistributedMLForecast.load(save_path, engine=spark)
                                                                                

我们可以验证此对象是否产生相同的结果。

preds = fa.as_pandas(fcst.predict(7, X_df=future)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds2 = fa.as_pandas(fcst2.predict(7, X_df=future)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
pd.testing.assert_frame_equal(preds, preds2)
                                                                                

转换为本地

另一种存储分布式预测对象的方法是先将其转换为本地对象,然后保存。请记住,为了这样做,存储在远程的系列数据都必须拉取到一台机器(dask 中的调度器,spark 中的驱动器等)上,因此必须确保它能容纳在内存中,它应该消耗大约目标列大小的两倍(通过在 fit 方法中使用 keep_last_n 参数可以进一步减少)。

local_fcst = fcst.to_local()
local_preds = local_fcst.predict(7, X_df=future)
# we don't check the dtype because sometimes these are arrow dtypes
# or different precisions of float
pd.testing.assert_frame_equal(preds, local_preds, check_dtype=False)

交叉验证

cv_res = fcst.cross_validation(
    spark_series,
    n_windows=3,
    h=14,
    static_features=['static_0', 'static_1'],
).toPandas()
cv_res.head()
unique_iddsSparkLGBMForecastSparkXGBForecastcutoffy
0id_032002-08-183.2729223.3488742002-08-153.060194
1id_092002-08-20402.718091402.6225012002-08-15398.784459
2id_252002-08-2287.18981186.8916322002-08-1582.731377
3id_062002-08-2120.41679020.4785022002-08-1519.196394
4id_222002-08-23357.718513360.5020242002-08-15394.770699
spark.stop()

Ray

会话设置

import ray
from ray.cluster_utils import Cluster
ray_cluster = Cluster(
    initialize_head=True,
    head_node_args={"num_cpus": 2}
)
ray.init(address=ray_cluster.address, ignore_reinit_error=True)
# add mock node to simulate a cluster
mock_node = ray_cluster.add_node(num_cpus=2)

数据设置

对于 ray,数据必须是 ray DataFrame。建议你的分区数与 worker 数相同。如果分区数多于 worker 数,请确保设置 num_threads=1 以避免嵌套并行。

所需的输入格式与 MLForecast 相同,即它应至少包含一个 id 列、一个时间列和一个目标列。

series = generate_daily_series(100, n_static_features=2, equal_ends=True, static_as_categorical=False, min_length=500, max_length=1_000)
series['unique_id'] = series['unique_id'].astype(str)  # can't handle categoricals atm
train, future = fourier(series, freq='d', season_length=7, k=2, h=7)
ray_series = ray.data.from_pandas(train)

模型

ray 集成允许包含 lightgbm (RayLGBMRegressor) 和 xgboost (RayXGBRegressor)。

from mlforecast.distributed.models.ray.lgb import RayLGBMForecast
from mlforecast.distributed.models.ray.xgb import RayXGBForecast
models = [
    RayLGBMForecast(random_state=0, verbosity=-1),
    RayXGBForecast(random_state=0),
]

训练

要控制使用 Ray 的分区数量,我们需要在 DistributedMLForecast 中包含 num_partitions

num_partitions = 4
fcst = DistributedMLForecast(
    models,
    freq='D',
    target_transforms=[Differences([7])],
    lags=[1],
    lag_transforms={
        1: [ExpandingMean(), ExponentiallyWeightedMean(alpha=0.9)],
    },
    date_features=['dayofweek'],
    num_partitions=num_partitions, # Use num_partitions to reduce overhead
)
fcst.fit(
    ray_series,
    static_features=['static_0', 'static_1'],
)

预测

preds = fcst.predict(7, X_df=future).to_pandas()
preds.head()
unique_iddsRayLGBMForecastRayXGBForecast
0id_002002-09-2715.23245510.38301
1id_002002-09-2892.28899492.531502
2id_002002-09-29160.043472160.722885
3id_002002-09-30250.03212252.821899
4id_002002-10-01322.905182324.387695

保存和加载

训练好模型后,可以使用 DistributedMLForecast.save 方法保存用于推理的artifact。请记住,如果在远程集群上,应将远程存储(如 S3)设置为目标路径。

mlforecast 使用 fsspec 处理不同的文件系统,因此如果使用 s3,例如,还需要安装 s3fs。如果使用 pip,只需包含 aws extra,例如 pip install 'mlforecast[aws,ray]',这将安装使用 ray 进行分布式训练和保存到 S3 所需的依赖项。如果使用 conda,则需要手动安装它们 (conda install fsspec fugue ray s3fs)。

save_dir = build_unique_name('ray')
save_path = f's3://nixtla-tmp/mlf/{save_dir}'
try:
    s3fs.S3FileSystem().ls('s3://nixtla-tmp/')
    fcst.save(save_path)
except Exception as e:
    print(e)
    save_path = f'{tmpdir.name}/{save_dir}'
    fcst.save(save_path)

保存好预测对象后,可以通过指定保存路径以及用于执行分布式计算的引擎(在本例中为字符串 'ray')来重新加载。

fcst2 = DistributedMLForecast.load(save_path, engine='ray')

我们可以验证此对象是否产生相同的结果。

preds = fa.as_pandas(fcst.predict(7, X_df=future)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds2 = fa.as_pandas(fcst2.predict(7, X_df=future)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
pd.testing.assert_frame_equal(preds, preds2)

转换为本地

另一种存储分布式预测对象的方法是先将其转换为本地对象,然后保存。请记住,为了这样做,存储在远程的系列数据都必须拉取到一台机器(dask 中的调度器,spark 中的驱动器等)上,因此必须确保它能容纳在内存中,它应该消耗大约目标列大小的两倍(通过在 fit 方法中使用 keep_last_n 参数可以进一步减少)。

local_fcst = fcst.to_local()
local_preds = local_fcst.predict(7, X_df=future)
# we don't check the dtype because sometimes these are arrow dtypes
# or different precisions of float
pd.testing.assert_frame_equal(preds, local_preds, check_dtype=False)

交叉验证

cv_res = fcst.cross_validation(
    ray_series,
    n_windows=3,
    h=14,
    static_features=['static_0', 'static_1'],
).to_pandas()
cv_res.head()
unique_iddsRayLGBMForecastRayXGBForecastcutoffy
0id_052002-09-21108.285187108.6196982002-09-12108.726387
1id_082002-09-1626.28795626.5896032002-09-1227.980670
2id_082002-09-2583.21094584.1949622002-09-1286.344885
3id_112002-09-22416.994843417.1065062002-09-12425.434661
4id_162002-09-14377.916382375.4216002002-09-12400.361977
ray.shutdown()