操作指南
MLflow
记录您的指标和模型
库
import copy
import subprocess
import time
import lightgbm as lgb
import mlflow
import pandas as pd
import requests
from sklearn.linear_model import LinearRegression
from utilsforecast.data import generate_series
from utilsforecast.losses import rmse, smape
from utilsforecast.evaluation import evaluate
from utilsforecast.feature_engineering import fourier
import mlforecast.flavor
from mlforecast import MLForecast
from mlforecast.lag_transforms import ExponentiallyWeightedMean
from mlforecast.utils import PredictionIntervals
数据设置
freq = 'h'
h = 10
series = generate_series(5, freq=freq)
valid = series.groupby('unique_id', observed=True).tail(h)
train = series.drop(valid.index)
train, X_df = fourier(train, freq=freq, season_length=24, k=2, h=h)
参数
params = {
'init': {
'models': {
'lgb': lgb.LGBMRegressor(
n_estimators=50, num_leaves=16, verbosity=-1
),
'lr': LinearRegression(),
},
'freq': freq,
'lags': [24],
'lag_transforms': {
1: [ExponentiallyWeightedMean(0.9)],
},
'num_threads': 2,
},
'fit': {
'static_features': ['unique_id'],
'prediction_intervals': PredictionIntervals(n_windows=2, h=h),
}
}
日志记录
如果您有跟踪服务器,可以运行 mlflow.set_tracking_uri(your_server_uri)
连接到它。
mlflow.set_experiment("mlforecast")
with mlflow.start_run() as run:
train_ds = mlflow.data.from_pandas(train)
valid_ds = mlflow.data.from_pandas(valid)
mlflow.log_input(train_ds, context="training")
mlflow.log_input(valid_ds, context="validation")
logged_params = copy.deepcopy(params)
logged_params['init']['models'] = {
k: (v.__class__.__name__, v.get_params())
for k, v in params['init']['models'].items()
}
mlflow.log_params(logged_params)
mlf = MLForecast(**params['init'])
mlf.fit(train, **params['fit'])
preds = mlf.predict(h, X_df=X_df)
eval_result = evaluate(
valid.merge(preds, on=['unique_id', 'ds']),
metrics=[rmse, smape],
agg_fn='mean',
)
models = mlf.models_.keys()
logged_metrics = {}
for _, row in eval_result.iterrows():
metric = row['metric']
for model in models:
logged_metrics[f'{metric}_{model}'] = row[model]
mlflow.log_metrics(logged_metrics)
mlforecast.flavor.log_model(model=mlf, artifact_path="model")
model_uri = mlflow.get_artifact_uri("model")
run_id = run.info.run_id
/home/ubuntu/repos/mlforecast/.venv/lib/python3.10/site-packages/mlflow/types/utils.py:406: UserWarning: Hint: Inferred schema contains integer column(s). Integer columns in Python cannot represent missing values. If your input data contains missing values at inference time, it will be encoded as floats and will cause a schema enforcement error. The best way to avoid this problem is to infer the model schema based on a realistic data sample (training dataset) that includes missing values. Alternatively, you can declare integer columns as doubles (float64) whenever these columns may have missing values. See `Handling Integers With Missing Values <https://www.mlflow.org/docs/latest/models.html#handling-integers-with-missing-values>`_ for more details.
warnings.warn(
2024/08/23 02:57:14 WARNING mlflow.models.model: Input example should be provided to infer model signature if the model signature is not provided when logging the model.
加载模型
loaded_model = mlforecast.flavor.load_model(model_uri=model_uri)
results = loaded_model.predict(h=h, X_df=X_df, ids=[3])
results.head(2)
unique_id | ds | lgb | lr | |
---|---|---|---|---|
0 | 3 | 2000-01-10 16:00:00 | 0.333308 | 0.243017 |
1 | 3 | 2000-01-10 17:00:00 | 0.127424 | 0.249742 |
PyFunc
loaded_pyfunc = mlforecast.flavor.pyfunc.load_model(model_uri=model_uri)
# single row dataframe
predict_conf = pd.DataFrame(
[
{
"h": h,
"ids": [0, 2],
"X_df": X_df,
"level": [80]
}
]
)
pyfunc_result = loaded_pyfunc.predict(predict_conf)
pyfunc_result.head(2)
unique_id | ds | lgb | lr | lgb-lo-80 | lgb-hi-80 | lr-lo-80 | lr-hi-80 | |
---|---|---|---|---|---|---|---|---|
0 | 0 | 2000-01-09 20:00:00 | 0.260544 | 0.244128 | 0.140168 | 0.380921 | 0.114001 | 0.374254 |
1 | 0 | 2000-01-09 21:00:00 | 0.250096 | 0.247742 | 0.072820 | 0.427372 | 0.047584 | 0.447900 |
模型服务
host = 'localhost'
port = '5000'
cmd = f'mlflow models serve -m runs:/{run_id}/model -h {host} -p {port} --env-manager local'
# initialize server
process = subprocess.Popen(cmd.split())
time.sleep(5)
# single row dataframe. must be JSON serializable
predict_conf = pd.DataFrame(
[
{
"h": h,
"ids": [3, 4],
"X_df": X_df.astype({'ds': 'str'}).to_dict(orient='list'),
"level": [95]
}
]
)
payload = {'dataframe_split': predict_conf.to_dict(orient='split', index=False)}
resp = requests.post(f'http://{host}:{port}/invocations', json=payload)
print(pd.DataFrame(resp.json()['predictions']).head(2))
process.terminate()
process.wait(timeout=10)
Downloading artifacts: 100%|██████████| 7/7 [00:00<00:00, 18430.71it/s]
2024/08/23 02:57:16 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'
2024/08/23 02:57:16 INFO mlflow.pyfunc.backend: === Running command 'exec gunicorn --timeout=60 -b localhost:5000 -w 1 ${GUNICORN_CMD_ARGS} -- mlflow.pyfunc.scoring_server.wsgi:app'
[2024-08-23 02:57:16 +0000] [23054] [INFO] Starting gunicorn 22.0.0
[2024-08-23 02:57:16 +0000] [23054] [INFO] Listening at: http://127.0.0.1:5000 (23054)
[2024-08-23 02:57:16 +0000] [23054] [INFO] Using worker: sync
[2024-08-23 02:57:16 +0000] [23055] [INFO] Booting worker with pid: 23055
unique_id ds lgb lr lgb-lo-95 lgb-hi-95 \
0 3 2000-01-10T16:00:00 0.333308 0.243017 0.174073 0.492544
1 3 2000-01-10T17:00:00 0.127424 0.249742 -0.009993 0.264842
lr-lo-95 lr-hi-95
0 0.032451 0.453583
1 0.045525 0.453959
[2024-08-23 02:57:20 +0000] [23054] [INFO] Handling signal: term
[2024-08-23 02:57:20 +0000] [23055] [INFO] Worker exiting (pid: 23055)
[2024-08-23 02:57:21 +0000] [23054] [INFO] Shutting down: Master
助手
回复由AI生成,可能包含错误。