Source code for etna.transforms.math.lags
from typing import Dict
from typing import List
from typing import Literal
from typing import Optional
from typing import Union
import pandas as pd
from etna.datasets import TSDataset
from etna.models.utils import determine_num_steps
from etna.transforms.base import FutureMixin
from etna.transforms.base import IrreversibleTransform
[docs]class LagTransform(IrreversibleTransform, FutureMixin):
"""Generates series of lags from given dataframe."""
def __init__(self, in_column: str, lags: Union[List[int], int], out_column: Optional[str] = None):
"""Create instance of LagTransform.
Parameters
----------
in_column:
name of processed column
lags:
int value or list of values for lags computation; if int, generate range of lags from 1 to given value
out_column:
base for the name of created columns;
* if set the final name is '{out_column}_{lag_number}';
* if don't set, name will be ``transform.__repr__()``,
repr will be made for transform that creates exactly this column
Raises
------
ValueError:
if lags value contains non-positive values
"""
super().__init__(required_features=[in_column])
if isinstance(lags, int):
if lags < 1:
raise ValueError(f"{type(self).__name__} works only with positive lags values, {lags} given")
self.lags = list(range(1, lags + 1))
else:
if any(lag_value < 1 for lag_value in lags):
raise ValueError(f"{type(self).__name__} works only with positive lags values")
self.lags = lags
self.in_column = in_column
self.out_column = out_column
def _get_column_name(self, lag: int) -> str:
if self.out_column is None:
temp_transform = LagTransform(in_column=self.in_column, out_column=self.out_column, lags=[lag])
return repr(temp_transform)
else:
return f"{self.out_column}_{lag}"
def _fit(self, df: pd.DataFrame) -> "LagTransform":
"""Fit method does nothing and is kept for compatibility.
Parameters
----------
df:
dataframe with data.
Returns
-------
result: LagTransform
"""
return self
def _transform(self, df: pd.DataFrame) -> pd.DataFrame:
"""Add lags to the dataset.
Parameters
----------
df:
dataframe with data to transform.
Returns
-------
result: pd.Dataframe
transformed dataframe
"""
result = df
segments = sorted(set(df.columns.get_level_values("segment")))
all_transformed_features = []
features = df.loc[:, pd.IndexSlice[:, self.in_column]]
for lag in self.lags:
column_name = self._get_column_name(lag)
transformed_features = features.shift(lag)
transformed_features.columns = pd.MultiIndex.from_product([segments, [column_name]])
all_transformed_features.append(transformed_features)
result = pd.concat([result] + all_transformed_features, axis=1)
result = result.sort_index(axis=1)
return result
[docs] def get_regressors_info(self) -> List[str]:
"""Return the list with regressors created by the transform."""
return [self._get_column_name(lag) for lag in self.lags]
[docs]class ExogShiftTransform(IrreversibleTransform, FutureMixin):
"""Shifts exogenous variables from a given dataframe."""
def __init__(self, lag: Union[int, Literal["auto"]], horizon: Optional[int] = None):
"""Create instance of ExogShiftTransform.
Parameters
----------
lag:
value for shift estimation
* if set to `int` all exogenous variables will be shifted `lag` steps forward;
* if set to `auto` minimal shift will be estimated for each variable based on
the prediction horizon and available timeline
horizon:
prediction horizon. Mandatory when set to `lag="auto"`, ignored otherwise
"""
super().__init__(required_features="all")
self.lag: Optional[int] = None
self.horizon: Optional[int] = None
self._auto = False
self._freq: Optional[str] = None
self._created_regressors: Optional[List[str]] = None
self._exog_shifts: Optional[Dict[str, int]] = None
self._exog_last_date: Optional[Dict[str, pd.Timestamp]] = None
self._filter_out_columns = {"target"}
if isinstance(lag, int):
if lag <= 0:
raise ValueError(f"{self.__class__.__name__} works only with positive lags values, {lag} given")
self.lag = lag
else:
if horizon is None:
raise ValueError("Value of `horizon` should be specified when using `auto`!")
if horizon < 1:
raise ValueError(f"{self.__class__.__name__} works only with positive horizon values, {horizon} given")
self.horizon = horizon
self._auto = True
def _save_exog_last_date(self, df_exog: Optional[pd.DataFrame] = None):
"""Save last available date of each exogenous variable."""
self._exog_last_date = {}
if df_exog is not None:
exog_names = set(df_exog.columns.get_level_values("feature"))
for name in exog_names:
feature = df_exog.loc[:, pd.IndexSlice[:, name]]
na_mask = pd.isna(feature).any(axis=1)
last_date = feature.index[~na_mask].max()
self._exog_last_date[name] = last_date
[docs] def fit(self, ts: TSDataset) -> "ExogShiftTransform":
"""Fit the transform.
Parameters
----------
ts:
Dataset to fit the transform on.
Returns
-------
:
The fitted transform instance.
"""
self._freq = ts.freq
self._save_exog_last_date(df_exog=ts.df_exog)
super().fit(ts=ts)
return self
def _fit(self, df: pd.DataFrame) -> "ExogShiftTransform":
"""Estimate shifts for exogenous variables.
Parameters
----------
df:
dataframe with data.
Returns
-------
:
Fitted `ExogShiftTransform` instance.
"""
feature_names = self._get_feature_names(df=df)
self._exog_shifts = dict()
self._created_regressors = []
for feature_name in feature_names:
shift = self._estimate_shift(df=df, feature_name=feature_name)
self._exog_shifts[feature_name] = shift
if shift > 0:
self._created_regressors.append(f"{feature_name}_shift_{shift}")
return self
def _get_feature_names(self, df: pd.DataFrame) -> List[str]:
"""Return the names of exogenous variables."""
feature_names = []
if self._exog_last_date is not None:
feature_names = list(self._exog_last_date.keys())
df_columns = df.columns.get_level_values("feature")
for name in feature_names:
if name not in df_columns:
raise ValueError(f"Feature `{name}` is expected to be in the dataframe!")
return feature_names
def _estimate_shift(self, df: pd.DataFrame, feature_name: str) -> int:
"""Estimate shift value for exogenous variable."""
if not self._auto:
return self.lag # type: ignore
if self._exog_last_date is None or self._freq is None:
raise ValueError("Call `fit()` method before estimating exog shifts!")
last_date = df.index.max()
last_feature_date = self._exog_last_date[feature_name]
if last_feature_date > last_date:
delta = -determine_num_steps(start_timestamp=last_date, end_timestamp=last_feature_date, freq=self._freq)
elif last_feature_date < last_date:
delta = determine_num_steps(start_timestamp=last_feature_date, end_timestamp=last_date, freq=self._freq)
else:
delta = 0
shift = max(0, delta + self.horizon) # type: ignore
return shift
def _transform(self, df: pd.DataFrame) -> pd.DataFrame:
"""Shift exogenous variables.
Parameters
----------
df:
dataframe with data to transform.
Returns
-------
:
Transformed dataframe.
"""
if self._exog_shifts is None:
raise ValueError("Transform is not fitted!")
result = df
segments = sorted(set(df.columns.get_level_values("segment")))
feature_names = self._get_feature_names(df=df)
shifted_features = []
features_to_remove = []
for feature_name in feature_names:
shift = self._exog_shifts[feature_name]
feature = df.loc[:, pd.IndexSlice[:, feature_name]]
if shift > 0:
shifted_feature = feature.shift(shift, freq=self._freq)
column_name = f"{feature_name}_shift_{shift}"
shifted_feature.columns = pd.MultiIndex.from_product([segments, [column_name]])
shifted_features.append(shifted_feature)
features_to_remove.append(feature_name)
if len(features_to_remove) > 0:
result = result.drop(columns=pd.MultiIndex.from_product([segments, features_to_remove]))
result = pd.concat([result] + shifted_features, axis=1)
result.sort_index(axis=1, inplace=True)
return result
[docs] def get_regressors_info(self) -> List[str]:
"""Return the list with regressors created by the transform."""
if self._created_regressors is None:
raise ValueError("Fit the transform to get the regressors info!")
return self._created_regressors