from typing import Any
from typing import Dict
from typing import Iterator
from typing import Optional
import numpy as np
import pandas as pd
from typing_extensions import TypedDict
from etna import SETTINGS
from etna.distributions import BaseDistribution
from etna.distributions import FloatDistribution
from etna.distributions import IntDistribution
from etna.models.base import DeepBaseModel
from etna.models.base import DeepBaseNet
if SETTINGS.torch_required:
    import torch
    import torch.nn as nn


class RNNBatch(TypedDict):
    """Batch specification for RNN."""

    encoder_real: "torch.Tensor"
    decoder_real: "torch.Tensor"
    encoder_target: "torch.Tensor"
    decoder_target: "torch.Tensor"
    segment: "torch.Tensor"
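
# Illustrative note (not part of the original module): tensor shapes expected in an RNNBatch,
# inferred from the comments in ``RNNNet.forward`` and ``RNNNet.step``. Sizes are made up:
#
#     batch: RNNBatch = {
#         "encoder_real": torch.rand(4, 13, 3),    # (batch_size, encoder_length - 1, input_size)
#         "decoder_real": torch.rand(4, 7, 3),     # (batch_size, decoder_length, input_size)
#         "encoder_target": torch.rand(4, 13, 1),  # (batch_size, encoder_length - 1, 1)
#         "decoder_target": torch.rand(4, 7, 1),   # (batch_size, decoder_length, 1)
#         "segment": "segment_0",                  # segment identifier as produced by make_samples
#     }
#
# The first feature of ``encoder_real``/``decoder_real`` is the target shifted by one step
# (see ``RNNNet.make_samples``); the remaining features are extra regressors.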


class RNNNet(DeepBaseNet):
    """RNN based Lightning module with LSTM cell."""

    def __init__(
        self,
        input_size: int,
        num_layers: int,
        hidden_size: int,
        lr: float,
        loss: "torch.nn.Module",
        optimizer_params: Optional[dict],
    ) -> None:
        """Init RNN based on LSTM cell.

        Parameters
        ----------
        input_size:
            size of the input feature space: target plus extra features
        num_layers:
            number of layers
        hidden_size:
            size of the hidden state
        lr:
            learning rate
        loss:
            loss function; if ``None``, ``torch.nn.MSELoss`` is used
        optimizer_params:
            parameters for Adam optimizer (api reference :py:class:`torch.optim.Adam`)
        """
        super().__init__()
        self.num_layers = num_layers
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.loss = torch.nn.MSELoss() if loss is None else loss
        self.rnn = nn.LSTM(
            num_layers=self.num_layers, hidden_size=self.hidden_size, input_size=self.input_size, batch_first=True
        )
        self.projection = nn.Linear(in_features=self.hidden_size, out_features=1)
        self.lr = lr
        self.optimizer_params = {} if optimizer_params is None else optimizer_params

    def forward(self, x: RNNBatch, *args, **kwargs):  # type: ignore
        """Forward pass.

        Parameters
        ----------
        x:
            batch of data

        Returns
        -------
        :
            forecast with shape (batch_size, decoder_length, 1)
        """
        encoder_real = x["encoder_real"].float()  # (batch_size, encoder_length-1, input_size)
        decoder_real = x["decoder_real"].float()  # (batch_size, decoder_length, input_size)
        decoder_target = x["decoder_target"].float()  # (batch_size, decoder_length, 1)
        decoder_length = decoder_real.shape[1]

        # Encode the history and keep the hidden state for autoregressive decoding
        output, (h_n, c_n) = self.rnn(encoder_real)
        forecast = torch.zeros_like(decoder_target)  # (batch_size, decoder_length, 1)

        # Decode one step at a time, feeding each prediction back as the shifted target of the next step
        for i in range(decoder_length - 1):
            output, (h_n, c_n) = self.rnn(decoder_real[:, i, None], (h_n, c_n))
            forecast_point = self.projection(output[:, -1]).flatten()
            forecast[:, i, 0] = forecast_point
            decoder_real[:, i + 1, 0] = forecast_point

        # Last point is computed out of the loop because `decoder_real[:, i + 1, 0]` would cause index error
        output, (h_n, c_n) = self.rnn(decoder_real[:, decoder_length - 1, None], (h_n, c_n))
        forecast_point = self.projection(output[:, -1]).flatten()
        forecast[:, decoder_length - 1, 0] = forecast_point
        return forecast
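
    # Illustrative sketch (not part of the original module): autoregressive inference on a toy batch.
    # All names and sizes below are assumptions chosen only for the example:
    #
    #     net = RNNNet(input_size=3, num_layers=2, hidden_size=16, lr=1e-3, loss=None, optimizer_params=None)
    #     batch = {
    #         "encoder_real": torch.rand(4, 13, 3),
    #         "decoder_real": torch.rand(4, 7, 3),
    #         "encoder_target": torch.rand(4, 13, 1),
    #         "decoder_target": torch.zeros(4, 7, 1),  # only its shape is used to allocate the forecast
    #         "segment": ["segment_0"] * 4,
    #     }
    #     forecast = net(batch)  # shape: (4, 7, 1)
    #
    # ``decoder_target`` values are ignored at inference: forward only takes their shape via
    # ``torch.zeros_like`` and fills the forecast step by step.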

    def step(self, batch: RNNBatch, *args, **kwargs):  # type: ignore
        """Step for loss computation for training or validation.

        Parameters
        ----------
        batch:
            batch of data

        Returns
        -------
        :
            loss, true_target, prediction_target
        """
        encoder_real = batch["encoder_real"].float()  # (batch_size, encoder_length-1, input_size)
        decoder_real = batch["decoder_real"].float()  # (batch_size, decoder_length, input_size)
        encoder_target = batch["encoder_target"].float()  # (batch_size, encoder_length-1, 1)
        decoder_target = batch["decoder_target"].float()  # (batch_size, decoder_length, 1)
        decoder_length = decoder_real.shape[1]

        # Teacher forcing: the true shifted target is already in `decoder_real`,
        # so the whole sequence can be passed through the network in one call
        output, (_, _) = self.rnn(torch.cat((encoder_real, decoder_real), dim=1))
        target_prediction = output[:, -decoder_length:]
        target_prediction = self.projection(target_prediction)  # (batch_size, decoder_length, 1)
        loss = self.loss(target_prediction, decoder_target)
        return loss, decoder_target, target_prediction
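
    # Illustrative note (not part of the original module): with the same toy batch as above,
    # ``step`` scores the net with teacher forcing instead of feeding predictions back:
    #
    #     loss, true_target, prediction = net.step(batch)
    #     loss.backward()  # roughly what the Lightning training loop does with the returned loss
    #
    # Unlike ``forward``, ``step`` uses the true shifted target inside ``decoder_real`` as decoder
    # input and ``decoder_target`` as the loss target.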

    def make_samples(self, df: pd.DataFrame, encoder_length: int, decoder_length: int) -> Iterator[dict]:
        """Make samples from segment DataFrame."""
        # Real-valued features with the shifted target as the first column, target itself excluded
        values_real = (
            df.select_dtypes(include=[np.number])
            .assign(target_shifted=df["target"].shift(1))
            .drop(["target"], axis=1)
            .pipe(lambda x: x[["target_shifted"] + [i for i in x.columns if i != "target_shifted"]])
            .values
        )
        values_target = df["target"].values
        segment = df["segment"].values[0]

        def _make(
            values_real: np.ndarray,
            values_target: np.ndarray,
            segment: str,
            start_idx: int,
            encoder_length: int,
            decoder_length: int,
        ) -> Optional[dict]:
            sample: Dict[str, Any] = {
                "encoder_real": list(),
                "decoder_real": list(),
                "encoder_target": list(),
                "decoder_target": list(),
                "segment": None,
            }
            total_length = len(values_target)
            total_sample_length = encoder_length + decoder_length

            if total_sample_length + start_idx > total_length:
                return None

            # Decoder part: shifted target concatenated with real features
            sample["decoder_real"] = values_real[start_idx + encoder_length : start_idx + total_sample_length]

            # Encoder part: drop the first point of the window, leaving encoder_length - 1 points
            # (at the start of the segment this point holds the NaN produced by the shift)
            sample["encoder_real"] = values_real[start_idx : start_idx + encoder_length]
            sample["encoder_real"] = sample["encoder_real"][1:]

            target = values_target[start_idx : start_idx + encoder_length + decoder_length].reshape(-1, 1)
            sample["encoder_target"] = target[1:encoder_length]
            sample["decoder_target"] = target[encoder_length:]

            sample["segment"] = segment
            return sample

        # Slide the window one point at a time until a full sample no longer fits into the segment
        start_idx = 0
        while True:
            batch = _make(
                values_target=values_target,
                values_real=values_real,
                segment=segment,
                start_idx=start_idx,
                encoder_length=encoder_length,
                decoder_length=decoder_length,
            )
            if batch is None:
                break
            yield batch
            start_idx += 1
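
    # Illustrative sketch (not part of the original module): what make_samples yields for a toy
    # segment DataFrame. The column values and sizes below are assumptions made up for the example:
    #
    #     df = pd.DataFrame(
    #         {
    #             "timestamp": pd.date_range("2021-01-01", periods=10),
    #             "segment": "segment_0",
    #             "target": np.arange(10, dtype=float),
    #         }
    #     )
    #     net = RNNNet(input_size=1, num_layers=1, hidden_size=8, lr=1e-3, loss=None, optimizer_params=None)
    #     sample = next(net.make_samples(df, encoder_length=5, decoder_length=3))
    #     sample["encoder_real"].shape    # (4, 1): shifted target for points 1..4
    #     sample["decoder_real"].shape    # (3, 1): shifted target for points 5..7
    #     sample["encoder_target"].shape  # (4, 1)
    #     sample["decoder_target"].shape  # (3, 1)
    #     sample["segment"]               # "segment_0"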


class RNNModel(DeepBaseModel):
    """RNN based model with LSTM cell."""

    def __init__(
        self,
        input_size: int,
        decoder_length: int,
        encoder_length: int,
        num_layers: int = 2,
        hidden_size: int = 16,
        lr: float = 1e-3,
        loss: Optional["torch.nn.Module"] = None,
        train_batch_size: int = 16,
        test_batch_size: int = 16,
        optimizer_params: Optional[dict] = None,
        trainer_params: Optional[dict] = None,
        train_dataloader_params: Optional[dict] = None,
        test_dataloader_params: Optional[dict] = None,
        val_dataloader_params: Optional[dict] = None,
        split_params: Optional[dict] = None,
    ):
        """Init RNN model based on LSTM cell.

        Parameters
        ----------
        input_size:
            size of the input feature space: target plus extra features
        encoder_length:
            encoder length
        decoder_length:
            decoder length
        num_layers:
            number of layers
        hidden_size:
            size of the hidden state
        lr:
            learning rate
        loss:
            loss function, MSELoss by default
        train_batch_size:
            batch size for training
        test_batch_size:
            batch size for testing
        optimizer_params:
            parameters for Adam optimizer (api reference :py:class:`torch.optim.Adam`)
        trainer_params:
            PyTorch Lightning trainer parameters (api reference :py:class:`pytorch_lightning.trainer.trainer.Trainer`)
        train_dataloader_params:
            parameters for train dataloader, e.g. sampler (api reference :py:class:`torch.utils.data.DataLoader`)
        test_dataloader_params:
            parameters for test dataloader
        val_dataloader_params:
            parameters for validation dataloader
        split_params:
            dictionary with parameters for :py:func:`torch.utils.data.random_split` for train-test splitting

            * **train_size**: (*float*) value from 0 to 1 - fraction of samples to use for training

            * **generator**: (*Optional[torch.Generator]*) - generator for reproducible train-test splitting

            * **torch_dataset_size**: (*Optional[int]*) - number of samples in dataset, in case of dataset not implementing ``__len__``
        """
        self.input_size = input_size
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.lr = lr
        self.loss = loss
        self.optimizer_params = optimizer_params
        super().__init__(
            net=RNNNet(
                input_size=input_size,
                num_layers=num_layers,
                hidden_size=hidden_size,
                lr=lr,
                loss=nn.MSELoss() if loss is None else loss,
                optimizer_params=optimizer_params,
            ),
            decoder_length=decoder_length,
            encoder_length=encoder_length,
            train_batch_size=train_batch_size,
            test_batch_size=test_batch_size,
            train_dataloader_params=train_dataloader_params,
            test_dataloader_params=test_dataloader_params,
            val_dataloader_params=val_dataloader_params,
            trainer_params=trainer_params,
            split_params=split_params,
        )
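
    # Illustrative usage sketch (not part of the original module). The horizon, lag feature and
    # dataset name are assumptions; the point is that ``input_size`` must count the target plus
    # every extra feature fed to the network:
    #
    #     from etna.pipeline import Pipeline
    #     from etna.transforms import LagTransform
    #
    #     horizon = 7
    #     transform = LagTransform(in_column="target", lags=[horizon], out_column="lag")
    #     model = RNNModel(input_size=2, encoder_length=14, decoder_length=horizon)
    #     pipeline = Pipeline(model=model, transforms=[transform], horizon=horizon)
    #     pipeline.fit(ts)  # `ts` is an etna TSDataset prepared by the user
    #     forecast = pipeline.forecast()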

    def params_to_tune(self) -> Dict[str, BaseDistribution]:
        """Get default grid for tuning hyperparameters.

        This grid tunes parameters: ``num_layers``, ``hidden_size``, ``lr``, ``encoder_length``.
        Other parameters are expected to be set by the user.

        Returns
        -------
        :
            Grid to tune.
        """
        return {
            "num_layers": IntDistribution(low=1, high=3),
            "hidden_size": IntDistribution(low=4, high=64, step=4),
            "lr": FloatDistribution(low=1e-5, high=1e-2, log=True),
            "encoder_length": IntDistribution(low=1, high=20),
        }
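
    # Illustrative note (not part of the original module): how the default grid can be inspected.
    # The constructor arguments are assumptions for the sketch:
    #
    #     model = RNNModel(input_size=1, encoder_length=14, decoder_length=7)
    #     grid = model.params_to_tune()
    #     grid["lr"]          # FloatDistribution(low=1e-05, high=0.01, log=True)
    #     grid["num_layers"]  # IntDistribution(low=1, high=3)
    #
    # Parameters not listed in the grid (e.g. ``decoder_length``) are left for the user to set.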