paddlets.utils.backtest 源代码

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

from typing import List, Dict, Any, Callable, Optional, Tuple, Union
import math

import pandas as pd
import numpy as np

from paddlets.metrics import Metric, MSE
from paddlets.models.base import Trainable
from paddlets.datasets import TSDataset, TimeSeries
from paddlets.pipeline import Pipeline
from paddlets.logger import Logger, raise_if
from paddlets.utils.utils import check_model_fitted

logger = Logger(__name__)


[文档]def backtest( data: TSDataset, model: Trainable, start: Union[pd.Timestamp, int, str ,float] = None, predict_window: int = 1, stride: int = 1, metric: Optional[Metric] = None, return_score: bool = True, reduction: Union[Callable[[np.ndarray], float], None] = np.mean ) -> Union[TSDataset, List[TSDataset], float]: """ Backtest A repeated forecasting and validating process. It first use data with the length of predict_window, and then moves the end of the training set forward by `stride` time steps. By default, Backtest will generate a TSdataset with length (data_length - model.skip_chunk_len) as output. If set predict_window != stride Backtest will generate a List of TSdataset as output If set return_score=True, A metric (given by the `metric` function) is then evaluated on the forecast and the actual values. It will returns a mean-of all these metric scores by default. Args: data(TSDataset): The TSdataset to use to successively evaluate the historical forecasts model(Trainable): The fitted model to use to successively evaluate the historical forecasts start(pd.Timestamp|int|None): The first prediction time, at which a prediction is computed for a future time. predict_window(int): The predict window for the prediction. stride(int): The number of time steps between two consecutive predict window. metric(Metric): A function that takes two ``TSdataset`` instances as inputs and returns an error value. return_score(bool): If set return_score=True, A metric (given by the `metric` function) is then evaluated on the forecast and the actual values. It will returns a mean-of all these metric scores by default. reduction(Callable[[np.ndarray]|None):A function used to combine the individual error scores obtained when predict_window != stride. If explicitely set to `None`, the method will return a list of the individual error scores instead. Set to ``np.mean`` by default. Returns: Union[TSDataset,List[TSdataset],float] Raise: ValueError """ if isinstance(model, Pipeline): #如果是None, 后续的check会报错,这里就不做重复处理了 if model._model_exist: model_in_chunk_len = model._model._in_chunk_len model_skip_chunk_len = model._model._skip_chunk_len model_out_chunk_len = model._model._out_chunk_len else: model_in_chunk_len = model._in_chunk_len model_skip_chunk_len = model._skip_chunk_len model_out_chunk_len = model._out_chunk_len def _check(): # Check whether model fitted or not. check_model_fitted(model) raise_if(start < model_in_chunk_len, f"Parameter 'start' value should >= in_chunk_len {model_in_chunk_len}") raise_if(start > target_length, f"Parameter 'start' value should not exceed data target_len {target_length}") raise_if(predict_window <= 0, "Parameter 'predict_window' should be positive integer") raise_if(stride <= 0, "Parameter 'stride' should be positive integer") # When the prediction window is larger than output_chunk_len, recursive prediction is required. # Recursive prediction does not support skip_chunk_len !=0. raise_if(model_skip_chunk_len != 0 and predict_window > model._out_chunk_len, "Backtest can not work when _skip_chunk_len!=0 and \ window > _out_chunk_len at same time.") # If skip_chunk_len !=0, prediction will start from start + skip_chunk_len. if model_skip_chunk_len != 0 and predict_window > model_out_chunk_len: logger.info(f"model.skip_chunk_len is {model_skip_chunk_len}, backtest will start at \ index {start + model_skip_chunk_len} (start + skip_chunk_len)") data = data.copy() all_target = data.get_target() target_length = len(all_target) # If start is not set, set to model._in_chunk_len by default if start is None: start = model_in_chunk_len logger.info(f"Parameter 'start' not set, default set to model.in_chunk_len {model_in_chunk_len}") start = all_target.get_index_at_point(start) # Parameter check _check() # When predict_window == stride, the prediction will form a complete continuous time series, which will be automatically merged by default and return the complete TSdataset # If predict_window! = stride, the forecast will form a discontinuous time series, do not processed by default and returns List[TSdataset] return_tsdataset = True if predict_window == stride else False length = target_length - start predict_rounds = math.ceil(length / stride) results = [] scores = [] index = start for _ in range(predict_rounds): data._target, rest = all_target.split(index) rest_len = len(rest) if rest_len <= predict_window: if data.known_cov is not None: target_end_time = data._target.end_time known_index = data.known_cov.get_index_at_point(target_end_time) if len(data.known_cov) - known_index - 1 < predict_window: break predict_window = rest_len predict_window = predict_window - model_skip_chunk_len # Use recursive prediction if the prediction window is larger than the model output_chunk_len if predict_window > model_out_chunk_len: output = model.recursive_predict(data, predict_length=predict_window) else: output = model.predict(data) output.set_target( TimeSeries(output.get_target().data[0: predict_window], output.freq) ) results.append(output) real_values = TimeSeries(all_target.data[index: index + predict_window], output.freq).to_numpy() predict_values = output._target.to_numpy() if return_score: if metric is None: metric = MSE() score = metric.metric_fn(real_values, predict_values) scores.append(score) index = index + stride if return_tsdataset: results = TSDataset.concat(results) if reduction: return reduction(scores) if return_score else results else: return scores if return_score else results