Spark 是一个为大规模数据处理设计的开源分布式计算框架。在本指南中,我们将解释如何在 Spark 上使用 TimeGPT

大纲

  1. 安装

  2. 加载数据

  3. 初始化 Spark

  4. 在 Spark 上使用 TimeGPT

  5. 停止 Spark

1. 安装

通过 Fugue 安装 Spark。 Fugue 提供了一个易于使用的分布式计算接口,允许用户在包括 Spark 在内的多个分布式计算框架上执行 Python 代码。

注意

您可以使用 pip 安装 fugue

pip install fugue[spark]

如果在分布式 Spark 集群上执行,请确保 nixtla 库已安装在所有工作节点上。

2. 加载数据

您可以将数据加载为 pandas DataFrame。在本教程中,我们将使用一个包含来自不同市场的每小时电价的数据集。

import pandas as pd
df = pd.read_csv(
    'https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/electricity-short.csv',
    parse_dates=['ds'],
) 
df.head()
unique_iddsy
0BE2016-10-22 00:00:0070.00
1BE2016-10-22 01:00:0037.10
2BE2016-10-22 02:00:0037.10
3BE2016-10-22 03:00:0044.75
4BE2016-10-22 04:00:0037.10

3. 初始化 Spark

初始化 Spark 并将 pandas DataFrame 转换为 Spark DataFrame。

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark_df = spark.createDataFrame(df)
spark_df.show(5)

4. 在 Spark 上使用 TimeGPT

Spark 上使用 TimeGPT 与非分布式情况几乎相同。唯一的区别是您需要使用 Spark DataFrame。

首先,实例化 NixtlaClient 类。

from nixtla import NixtlaClient
nixtla_client = NixtlaClient(
    # defaults to os.environ.get("NIXTLA_API_KEY")
    api_key = 'my_api_key_provided_by_nixtla'
)

👍 使用 Azure AI 端点

要使用 Azure AI 端点,设置 base_url 参数

nixtla_client = NixtlaClient(base_url="you azure ai endpoint", api_key="your api_key")

然后使用 NixtlaClient 类中的任何方法,例如 forecastcross_validation

fcst_df = nixtla_client.forecast(spark_df, h=12)
fcst_df.show(5)

📘 Azure AI 中可用的模型

如果您使用的是 Azure AI 端点,请务必设置 model="azureai"

nixtla_client.forecast(..., model="azureai")

对于公共 API,我们支持两种模型:timegpt-1timegpt-1-long-horizon

默认使用 timegpt-1。请参阅本教程了解如何以及何时使用 timegpt-1-long-horizon

cv_df = nixtla_client.cross_validation(spark_df, h=12, n_windows=5, step_size=2)
cv_df.show(5)

您还可以在 Spark 上将外生变量与 TimeGPT 一起使用。为此,请参阅外生变量教程。请记住,您需要使用 Spark DataFrame,而不是 pandas DataFrame。

5. 停止 Spark

完成后,停止 Spark 会话。

spark.stop()