# !/usr/bin/env python3
# -*- coding:utf-8 -*-
import os
import math
import numpy as np
import pandas as pd
import pickle
from typing import List, Optional, Tuple
from paddlets.models.base import Trainable
from paddlets.datasets.tsdataset import TSDataset, TimeSeries
from paddlets.logger import Logger, raise_if_not, raise_if, raise_log
from paddlets.logger.logger import log_decorator
from paddlets.models.model_loader import load as paddlets_model_load
# Module-level logger for this file. NOTE(review): nothing else in this file references it; the
# raise_if / raise_if_not / raise_log helpers below are called without passing a logger.
logger = Logger(__name__)
[文档]class Pipeline(Trainable):
"""
Build a time series modeling workflow from an ordered list of steps: a set of transformers,
optionally followed by a final model.
Every step except the last is instantiated as a transformer. The last step is kept as a transformer
if the instantiated object has a `fit_transform` method; otherwise it becomes the final model.
**Note**: The model is optional.
Args:
steps(List[Tuple[object, dict]]): Ordered (class, init_params) pairs; each class is instantiated
as `cls(**init_params)`.
Examples:
>>> ...
>>> ksigma_params = {"cols":['example_columns'], "k": 0.5}
>>> mlp_params = {'in_chunk_len': 7, 'out_chunk_len': 3, 'skip_chunk_len': 0, 'eval_metrics': ["mse", "mae"]}
>>> pipeline = Pipeline([(KSigma, ksigma_params), (TimeFeatureGenerator, {}), (MLPRegressor, mlp_params)])
"""
def __init__(self, steps: List[Tuple[object, dict]]):
    """
    Instantiate every step of the pipeline.

    Args:
        steps(List[Tuple[object, dict]]): Ordered (class, init_params) pairs. Each class is
            instantiated as `cls(**init_params)`. All steps but the last become transformers; the
            last step becomes a transformer if the instance has a `fit_transform` method, otherwise
            it is stored as the final model.

    Raises:
        ValueError: If `steps` is None or empty, if any element does not have length 2, or if
            instantiating any step raises.
    """
    # raise_if takes a message string (as at every other call site in this class), not an exception.
    raise_if(steps is None, "steps must not be None")
    # Without this guard an empty list surfaces as the opaque "init error: list index out of range".
    raise_if(len(steps) == 0, "steps must not be empty")
    for step in steps:
        if 2 != len(step):
            raise_log(ValueError("The expected length of the tuple is 2, but actual element len: %s" % len(step)))
    self._steps = steps
    self._fitted = False
    self._model = None
    self._model_exist = False
    self._transform_list = []
    # Init transformers: every step except the last one.
    for step_cls, step_params in self._steps[:-1]:
        try:
            transform = step_cls(**step_params)
        except Exception as err:
            raise_log(ValueError("init error: %s" % (str(err))))
        self._transform_list.append(transform)
    # Init the last step, which is either a transformer or the final model.
    last_cls, last_params = self._steps[-1]
    try:
        last_object = last_cls(**last_params)
    except Exception as err:
        raise_log(ValueError("init error: %s" % (str(err))))
    if hasattr(last_object, "fit_transform"):
        self._transform_list.append(last_object)
    else:
        self._model_exist = True
        self._model = last_object
@log_decorator
def fit(
    self,
    train_tsdataset: TSDataset,
    valid_tsdataset: Optional[TSDataset] = None):
    """
    Fit the transformers on the train data, transform the data, then fit the final model.

    Transformers are fitted on `train_tsdataset` only. `valid_tsdataset` (if given) is passed through
    the train-fitted transformers with `transform`, so validation data never re-fits them.

    Args:
        train_tsdataset(TSDataset): Train dataset.
        valid_tsdataset(TSDataset, optional): Valid dataset.

    Returns:
        Pipeline: Pipeline with fitted transformers and fitted model.
    """
    train_tsdataset_copy = train_tsdataset.copy()
    valid_tsdataset_copy = valid_tsdataset.copy() if valid_tsdataset is not None else None
    # Transform
    for transform in self._transform_list:
        train_tsdataset_copy = transform.fit_transform(train_tsdataset_copy)
        if valid_tsdataset_copy is not None:
            # Apply only: fit_transform here would fit the transformer again on the validation
            # data and replace the state just learned from the train data.
            valid_tsdataset_copy = transform.transform(valid_tsdataset_copy)
    # Final model (absent when the last step was instantiated as a transformer)
    if self._model is not None:
        if valid_tsdataset_copy is not None:
            self._model.fit(train_tsdataset_copy, valid_tsdataset_copy)
        else:
            self._model.fit(train_tsdataset_copy)
    self._fitted = True
    return self
def predict(self, tsdataset: TSDataset) -> TSDataset:
    """
    Predict with the final model on transformed data and map the output back.

    The input is run through the fitted transformers (`self.transform`), fed to the final model's
    `predict`, and the predictions are passed through `self.inverse_transform`. Requires a final
    model in the pipeline and a prior call to `fit`.

    Args:
        tsdataset(TSDataset): Data to be predicted.

    Returns:
        TSDataset: Inverse-transformed predictions of the final model.
    """
    self._check_model_exist()
    self._check_fitted()
    model_input = self.transform(tsdataset)
    raw_forecast = self._model.predict(model_input)
    return self.inverse_transform(raw_forecast)
def predict_proba(self, tsdataset: TSDataset) -> TSDataset:
    """
    Transform the `TSDataset` using the fitted transformers and perform probability prediction with the fitted
    model in the pipeline, only effective when the model exists in the pipeline.

    Unlike `predict`, the model output is returned as-is (no inverse transform is applied).

    Args:
        tsdataset(TSDataset): Data to be predicted.

    Returns:
        TSDataset: Predicted results of calling `predict_proba` on the final model.

    Raises:
        An error from `raise_if_not` if the final model does not implement `predict_proba`.
    """
    self._check_model_exist()
    self._check_fitted()
    # Fail fast: check the model's capability before spending time transforming the data.
    raise_if_not(hasattr(self._model, "predict_proba"),
                 "predict_proba is only valid if the final model implements predict_proba")
    tsdataset_transformed = self.transform(tsdataset)
    return self._model.predict_proba(tsdataset_transformed)
def recursive_predict(
    self,
    tsdataset: TSDataset,
    predict_length: int
) -> TSDataset:
    """
    Multi-step forecasting by calling `self.predict` repeatedly.

    Each round's predictions are appended to the working data, so they fall inside the lookback
    window of the following round. Every call yields `out_chunk_len` steps, so
    ceiling(`predict_length` / `out_chunk_len`) rounds are run.

    Args:
        tsdataset(TSDataset): Data to be predicted.
        predict_length(int): Length of predicted results.

    Returns:
        TSDataset: Predicted results.
    """
    return self._recursive_predict(
        tsdataset=tsdataset,
        predict_length=predict_length,
        need_proba=False,
    )
def recursive_predict_proba(
    self,
    tsdataset: TSDataset,
    predict_length: int,
) -> TSDataset:
    """
    Multi-step probability forecasting by calling `self.predict_proba` repeatedly.

    Each round's output is appended to the working data, so it falls inside the lookback window of
    the following round. Every call yields `out_chunk_len` steps, so
    ceiling(`predict_length` / `out_chunk_len`) rounds are run.

    Args:
        tsdataset(TSDataset): Data to be predicted.
        predict_length(int): Length of predicted results.

    Returns:
        TSDataset: Predicted results.
    """
    return self._recursive_predict(
        tsdataset=tsdataset,
        predict_length=predict_length,
        need_proba=True,
    )
def _recursive_predict(
    self,
    tsdataset: TSDataset,
    predict_length: int,
    need_proba: bool = False
) -> TSDataset:
    """
    Shared implementation of `recursive_predict` and `recursive_predict_proba`.

    The known/observed covariates are first reindexed (new steps filled with NaN) far enough to cover
    every round. Each round then predicts from the current data and appends the output to it, so the
    next round sees the previous predictions.

    Args:
        tsdataset(TSDataset): Data to be predicted.
        predict_length(int): Length of predicted results.
        need_proba(bool): Whether to use predict_proba to infer the class probabilities.

    Returns:
        TSDataset: Predicted results, truncated to the first `predict_length` target rows.
    """
    self._check_model_exist()
    self._check_fitted()
    self._check_recursive_predict_valid(predict_length, need_proba=need_proba)
    out_chunk_len = self._model._out_chunk_len
    recursive_rounds = math.ceil(predict_length / out_chunk_len)
    # Use recursive transform: the value predicted for the current time step is used to
    # determine its transformed features in the next time step.
    tsdataset_copy = tsdataset.copy()
    target = tsdataset_copy.get_target()
    known_cov = tsdataset_copy.get_known_cov()
    observed_cov = tsdataset_copy.get_observed_cov()
    # Spacing between target timestamps: integer step for RangeIndex, frequency for DatetimeIndex.
    target_index = tsdataset.get_target().data.index
    if isinstance(target_index, pd.RangeIndex):
        time_step = target.time_index.step
    elif isinstance(target_index, pd.DatetimeIndex):
        time_step = target.time_index.freq
    else:
        raise_log(ValueError(f"time col type not support, index type:{type(target_index)}"))
    # End time the covariates must reach: far enough for every round, and never earlier than a
    # covariate's own end. target.start_time is a neutral fallback when a covariate is absent.
    dataset_end_time = max(
        target.end_time + recursive_rounds * out_chunk_len * time_step,
        known_cov.end_time if known_cov is not None else target.start_time,
        observed_cov.end_time if observed_cov is not None else target.start_time,
    )
    # Reindex covariates up to dataset_end_time; the default fill value is np.nan.
    fill_value = np.nan
    for cov in (known_cov, observed_cov):
        if cov is None:
            continue
        if isinstance(cov.data.index, pd.RangeIndex):
            # RangeIndex stop is exclusive, hence + 1.
            new_index = pd.RangeIndex(start=cov.start_time,
                                      stop=dataset_end_time + 1,
                                      step=cov.time_index.step)
        else:
            new_index = pd.date_range(start=cov.start_time,
                                      end=dataset_end_time,
                                      freq=cov.time_index.freq)
        cov.reindex(new_index, fill_value=fill_value)
    results = []
    for _ in range(recursive_rounds):
        # Predict on a copy; tsdataset_copy itself is only updated through the concat below.
        tsdataset_tmp = tsdataset_copy.copy()
        if need_proba:
            output = self.predict_proba(tsdataset_tmp)
        else:
            output = self.predict(tsdataset_tmp)
        # Update data using the predicted value so the next round can look back on it.
        # NOTE(review): in proba mode the probability output is fed back as input too; confirm intended.
        tsdataset_copy = TSDataset.concat([tsdataset_copy, output])
        results.append(output)
    # Concat results, then keep only the first predict_length rows (the last round may overshoot).
    result = TSDataset.concat(results)
    result.set_target(
        TimeSeries(result.get_target().data[0: predict_length], result.freq)
    )
    return result
def save(self, path: str, pipeline_file_name: str = "pipeline-partial.pkl", model_file_name: str = "paddlets_model"):
    """
    Save the pipeline to a directory.

    The final model (if any) is written with its own `save` method under `model_file_name`. The rest
    of the pipeline (transformers and meta information) is pickled to `pipeline_file_name` with the
    model temporarily detached; the model is reattached afterwards, even if pickling fails.

    Args:
        path(str): Output directory path; created if it does not exist.
        pipeline_file_name(str): Name of pipeline object. This file contains transformers and
            meta information of pipeline.
        model_file_name(str): Name of model object. See `BaseModel.save` for more information.

    Raises:
        ValueError: If `path` exists but is not a directory, or if pickling the pipeline fails.
        FileExistsError: If the pipeline file already exists.
    """
    # Check path: create it when missing, reject anything that is not a directory.
    if not os.path.exists(path):
        os.makedirs(path)
    elif not os.path.isdir(path):
        raise_log(ValueError(f"path is not a directory, path : {path}"))
    # Check file not exist: never overwrite an existing pipeline file.
    pipeline_file_path = os.path.join(path, pipeline_file_name)
    if os.path.exists(pipeline_file_path):
        raise_log(FileExistsError(f"pipeline-partial file already exist, path : {pipeline_file_path}"))
    # 1.Save model
    if self._model is not None:
        self._model.save(os.path.join(path, model_file_name))
    # 2.Save pipeline (without final model): detach the model so it is not pickled with the pipeline.
    model_tmp = self._model
    self._model = None
    try:
        with open(pipeline_file_path, "wb") as f:
            pickle.dump(self, f)
    except Exception as e:
        raise_log(ValueError("error occurred while saving pipeline, file path: %s, err: %s" \
                             % (pipeline_file_path, str(e))))
    finally:
        # Reattach the model on every path; previously a failed dump left self._model as None.
        self._model = model_tmp
@classmethod
def load(cls, path: str, pipeline_file_name: str = "pipeline-partial.pkl", model_file_name: str = "paddlets_model"):
    """
    Load the pipeline from a directory.

    The pickled pipeline is loaded first. If it was built with a final model, the model is then
    loaded separately from `model_file_name` and reattached.

    Args:
        path(str): Input directory path.
        pipeline_file_name(str): Name of pipeline object. This file contains transformers and
            meta information of pipeline.
        model_file_name(str): Name of model object. See `BaseModel.save` for more information.

    Returns:
        Pipeline: The loaded pipeline.

    Raises:
        FileNotFoundError: If `path` does not exist.
        ValueError: If `path` is not a directory.
        FileExistsError: If the pipeline file does not exist (type kept for backward compatibility).
        RuntimeError: If unpickling the pipeline file fails.
    """
    if not os.path.exists(path):
        raise_log(FileNotFoundError(f"path not exist, path : {path}"))
    if not os.path.isdir(path):
        raise_log(ValueError(f"path is not a directory, path : {path}"))
    # 1.Load pipeline
    # Check file exist
    pipeline_file_path = os.path.join(path, pipeline_file_name)
    if not os.path.exists(pipeline_file_path):
        # NOTE(review): FileNotFoundError would be the accurate type; FileExistsError is kept
        # because existing callers may catch it.
        raise_log(FileExistsError(f"pipeline-partial file not exist, path : {pipeline_file_path}"))
    try:
        # SECURITY: unpickling can execute arbitrary code; only load pipelines from trusted sources.
        with open(pipeline_file_path, "rb") as f:
            pipeline = pickle.load(f)
    except Exception as e:
        raise_log(RuntimeError(
            "error occurred while loading pipeline, path: %s, error: %s" % (pipeline_file_path, str(e))))
    # 2.Load model: the pickled pipeline has no model attached, so load it and add it back.
    if pipeline._model_exist:
        pipeline._model = paddlets_model_load(os.path.join(path, model_file_name))
    return pipeline
def _check_fitted(self):
    """
    Guard for prediction paths: return silently once `fit` has set the fitted flag,
    otherwise raise a RuntimeError through raise_log.
    """
    if self._fitted:
        return
    raise_log(RuntimeError("please do fit first!"))
def _check_model_exist(self):
    """
    Guard for model-dependent operations: raise a RuntimeError through raise_log when no final
    model is attached (e.g. the last step was instantiated as a transformer).
    """
    has_model = self._model is not None
    if not has_model:
        raise_log(RuntimeError("model not exist"))
def _check_recursive_predict_valid(self, predict_length: int, need_proba: bool = False):
    """
    Check that recursive prediction can run with the current model and arguments.

    Args:
        predict_length(int): Requested prediction length; must be > 0.
        need_proba(bool): Whether the recursion will call `predict_proba` instead of `predict`.

    Raises:
        An error from `raise_if` / `raise_if_not` if the model lacks `predict_proba` (when
        `need_proba` is set), if the model's `_skip_chunk_len` is non-zero, or if
        `predict_length` is not positive.
    """
    if need_proba:
        # The recursion calls self.predict_proba, which delegates to self._model.predict_proba,
        # so that is the method the model must implement (as the message states).
        raise_if_not(hasattr(self._model, "predict_proba"),
                     "predict_proba is only valid if the final model implements predict_proba")
    # Not supported when _skip_chunk_len != 0
    raise_if(self._model._skip_chunk_len != 0,
             f"recursive_predict not supported when _skip_chunk_len!=0, got {self._model._skip_chunk_len}.")
    raise_if(predict_length <= 0, f"predict_length must be > 0, got {predict_length}.")