paddlets.models.dl.paddlepaddle.rnn 源代码

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-


from typing import List, Dict, Any, Callable, Optional, Tuple

import numpy as np
import paddle
from paddle import nn
from paddle.optimizer import Optimizer
import paddle.nn.functional as F

from paddlets.datasets import TSDataset
from paddlets.models.dl.paddlepaddle.paddle_base_impl import PaddleBaseModelImpl
from paddlets.models.dl.paddlepaddle.callbacks import Callback
from paddlets.logger import raise_if_not, raise_log, Logger

logger = Logger(__name__)

class _RNNBlock(nn.Layer):
    """
    RNN model implemented by Paddle
    
    Args:
        in_chunk_len(int): The size of the loopback window, i.e., the number of time steps feed to the model.
        out_chunk_len(int): The size of the forecasting horizon, i.e., the number of time steps output by the model.
        target_dim(int): The numer of targets.
        known_cov_dim(int): The number of known covariate.
        observed_cov_dim(int): The number of observed covariate.
        rnn_type(str): The type of the specific paddle RNN module ("SimpleRNN", "GRU" or "LSTM").
        hidden_dim(int): The number of features in the hidden state `h` of the RNN module.
        num_layers_recurrent(int): The number of recurrent layers.
        out_fcn_config(Optional[List]): A list containing the dimensions of the hidden layers of the fully connected NN.
        dropout(float): The fraction of neurons that are dropped in all-but-last RNN layers.
    """
    def __init__(
        self,
        in_chunk_len: int,
        out_chunk_len: int,
        target_dim: int,
        known_cov_dim: int,
        observed_cov_dim: int,
        rnn_type: str,
        hidden_dim: int,
        num_layers_recurrent: int,
        out_fcn_config: Optional[List] = None,
        dropout: float = 0.0,
    ):
        super().__init__()
        self.in_chunk_len = in_chunk_len
        self.out_chunk_len = out_chunk_len

        #TODO: probability forecasting
        #self.likelihood = kwargs.get('likelihood', None)
        #self.nr_params = 1 if not self.likelihood else self.likelihood.num_parameters

        out_fcn_config = [] if out_fcn_config is None else out_fcn_config
        self._rnn_type = rnn_type
        self._target_dim = target_dim
        self._known_cov_dim = known_cov_dim
        self._observed_cov_dim = observed_cov_dim
        self._input_size = self._target_dim + self._known_cov_dim + self._observed_cov_dim
        self._rnn = getattr(nn, self._rnn_type)(self._input_size, hidden_dim, num_layers_recurrent, dropout=dropout)

        # Defining projection layer
        # The RNN module is followed by a fully connected network(FCN),
        # which maps the last hidden layer to the output of desired length
        last = hidden_dim
        feats = []
        for feature in out_fcn_config + [out_chunk_len * target_dim]:
            feats.append(nn.Linear(last, feature))
            last = feature
        self.fc = nn.Sequential(*feats)

    def forward(
            self,
            data: Dict[str, paddle.Tensor]
            ) -> paddle.Tensor:
        """
        Forward network.

        Args:
            data(Dict[str, paddle.Tensor]): a dict specifies all kinds of input data

        Returns:
            predictions: output of RNN model, with shape [batch_size, out_chunk_len, target_dim]
        """
        feature = [data["past_target"]]
        if self._known_cov_dim > 0:
            past_known = data["known_cov"][:, :self.in_chunk_len]
            feature.append(past_known)
        if self._observed_cov_dim > 0:
            observed = data["observed_cov"]
            feature.append(observed)

        x = paddle.concat(x=feature, axis=-1)

        # input x is of size: [batch_size, in_chunk_len, target_dim + observed_dim + known_dim]
        # `out` is the output of the model, with shape: [batch_size, time_steps, hidden_dim]
        # `hidden` is the final state of the model, with shape: [num_layers, batch_size, hidden_dim]
        out, hidden = self._rnn(x)

        #apply the FC network only on the last output point (at the final time step)
        if self._rnn_type == "LSTM":
            hidden = hidden[0] # for LSTM, hidden[0] shape: [num_layers, batch_size, hidden_dim]
        predictions = hidden[-1, :, :]
        predictions = self.fc(predictions)
        predictions = predictions.reshape([predictions.shape[0], self.out_chunk_len, self._target_dim])
        return predictions


[文档]class RNNBlockRegressor(PaddleBaseModelImpl): """ Implementation of RNN Block model. Args: in_chunk_len(int): The size of the loopback window, i.e., the number of time steps feed to the model. out_chunk_len(int): The size of the forecasting horizon, i.e., the number of time steps output by the model. rnn_type_or_module(str, Optional): The type of the specific paddle RNN module ("SimpleRNN", "GRU" or "LSTM"). fcn_out_config(List[int], Optional): A list containing the dimensions of the hidden layers of the fully connected NN. hidden_size(int, Optional): The number of features in the hidden state `h` of the RNN module. num_layers_recurrent(int, Optional): The number of recurrent layers. dropout(float, Optional): The fraction of neurons that are dropped in all-but-last RNN layers. skip_chunk_len(int, Optional): Optional, the number of time steps between in_chunk and out_chunk for a single sample. The skip chunk is neither used as a feature (i.e. X) nor a label (i.e. Y) for a single sample. By default it will NOT skip any time steps. sampling_stride(int, optional): sampling intervals between two adjacent samples. loss_fn(Callable, Optional): loss function. optimizer_fn(Callable, Optional): optimizer algorithm. optimizer_params(Dict, Optional): optimizer parameters. eval_metrics(List[str], Optional): evaluation metrics of model. callbacks(List[Callback], Optional): customized callback functions. batch_size(int, Optional): number of samples per batch. max_epochs(int, Optional): max epochs during training. verbose(int, Optional): verbosity mode. patience(int, Optional): number of epochs with no improvement after which learning rate wil be reduced. seed(int, Optional): global random seed. """ def __init__( self, in_chunk_len: int, out_chunk_len: int, rnn_type_or_module: str = "SimpleRNN", fcn_out_config: List[int] = None, hidden_size: int = 128, num_layers_recurrent: int = 1, dropout: float = 0.0, skip_chunk_len: int = 0, sampling_stride: int = 1, loss_fn: Callable[..., paddle.Tensor] = F.mse_loss, optimizer_fn: Callable[..., Optimizer] = paddle.optimizer.Adam, optimizer_params: Dict[str, Any] = dict(learning_rate=1e-4), eval_metrics: List[str] = [], callbacks: List[Callback] = [], batch_size: int = 128, max_epochs: int = 10, verbose: int = 1, patience: int = 4, seed: int = 0 ): self._rnn_type_or_module = rnn_type_or_module self._fcn_out_config = fcn_out_config self._hidden_size = hidden_size self._num_layers_recurrent = num_layers_recurrent self._dropout = dropout #check parameters validation raise_if_not( self._rnn_type_or_module in {"SimpleRNN", "LSTM", "GRU"}, "A valid RNN type should be specified, currently SimpleRNN, LSTM, and GRU are supported." ) super(RNNBlockRegressor, self).__init__( in_chunk_len=in_chunk_len, out_chunk_len=out_chunk_len, skip_chunk_len=skip_chunk_len, sampling_stride=sampling_stride, loss_fn=loss_fn, optimizer_fn=optimizer_fn, optimizer_params=optimizer_params, eval_metrics=eval_metrics, callbacks=callbacks, batch_size=batch_size, max_epochs=max_epochs, verbose=verbose, patience=patience, seed=seed, ) def _check_tsdataset( self, tsdataset: TSDataset ): """ Rewrite _check_tsdataset to fit the specific model. For RNN, all data variables are expected to be float32. """ for column, dtype in tsdataset.dtypes.items(): raise_if_not( np.issubdtype(dtype, np.floating), f"rnn variables' dtype only supports [float16, float32, float64], " \ f"but received {column}: {dtype}." ) super(RNNBlockRegressor, self)._check_tsdataset(tsdataset) def _update_fit_params( self, train_tsdataset: TSDataset, valid_tsdataset: Optional[TSDataset] = None ) -> Dict[str, Any]: """ Infer parameters by TSdataset automatically. Args: train_tsdataset(TSDataset): train dataset valid_tsdataset(TSDataset, optional): validation dataset Returns: Dict[str, Any]: model parameters """ fit_params = { "target_dim": train_tsdataset.get_target().data.shape[1], "known_cov_dim": 0, "observed_cov_dim": 0 } if train_tsdataset.get_known_cov() is not None: fit_params["known_cov_dim"] = train_tsdataset.get_known_cov().data.shape[1] if train_tsdataset.get_observed_cov() is not None: fit_params["observed_cov_dim"] = train_tsdataset.get_observed_cov().data.shape[1] return fit_params def _init_network(self) -> paddle.nn.Layer: """ Init network. Returns: paddle.nn.Layer """ return _RNNBlock( self._in_chunk_len, self._out_chunk_len, self._fit_params["target_dim"], self._fit_params["known_cov_dim"], self._fit_params["observed_cov_dim"], self._rnn_type_or_module, self._hidden_size, self._num_layers_recurrent, self._fcn_out_config, self._dropout )