Source code for etna.transforms.math.statistics
from abc import ABC
from abc import abstractmethod
from typing import Dict
from typing import List
from typing import Optional
import bottleneck as bn
import numpy as np
import pandas as pd
from etna.datasets import TSDataset
from etna.distributions import BaseDistribution
from etna.distributions import FloatDistribution
from etna.distributions import IntDistribution
from etna.transforms.base import IrreversibleTransform
class WindowStatisticsTransform(IrreversibleTransform, ABC):
    """Base class for transforms that compute a statistic over a rolling window of one column.

    Window extraction, the ``min_periods`` check and NaN filling are implemented here;
    subclasses only provide the reduction itself in ``_aggregate``.
    """

    def __init__(
        self,
        in_column: str,
        out_column: str,
        window: int,
        seasonality: int = 1,
        min_periods: int = 1,
        fillna: float = 0,
        **kwargs,
    ):
        """Create the transform.

        Parameters
        ----------
        in_column: str
            name of the column the statistic is computed from
        out_column: str
            name of the column the result is written to
        window: int
            number of values aggregated per point; ``-1`` means the whole available history
        seasonality: int
            step between the values taken into one window
        min_periods: int
            minimal number of non-NaN values a window must contain;
            windows with fewer values produce NaN, which is then replaced by ``fillna``
        fillna: float
            value that replaces NaNs in the result
        **kwargs:
            extra keyword arguments, stored as ``self.kwargs``
        """
        super().__init__(required_features=[in_column])
        self.in_column = in_column
        self.out_column_name = out_column
        self.window = window
        self.seasonality = seasonality
        self.min_periods = min_periods
        self.fillna = fillna
        self.kwargs = kwargs
        # Stays ``None`` until ``fit`` is called.
        self.in_column_regressor: Optional[bool] = None

    def fit(self, ts: TSDataset) -> "WindowStatisticsTransform":
        """Fit the transform and remember whether ``in_column`` is one of the dataset regressors."""
        self.in_column_regressor = self.in_column in ts.regressors
        super().fit(ts)
        return self

    def _fit(self, df: pd.DataFrame) -> "WindowStatisticsTransform":
        """Do nothing: there is nothing to learn from the data."""
        return self

    @abstractmethod
    def _aggregate(self, series: np.ndarray) -> np.ndarray:
        """Reduce windows of shape (len(df), n_segments, window) to an array of shape (len(df), n_segments)."""

    def _transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add the window statistic as a new column for every segment.

        Parameters
        ----------
        df: pd.DataFrame
            dataframe to generate features for

        Returns
        -------
        result: pd.DataFrame
            input dataframe with the ``out_column_name`` feature joined, columns sorted
        """
        if self.window == -1:
            history = len(df)
        else:
            history = self.seasonality * self.window

        segments = sorted(df.columns.get_level_values("segment").unique())

        # Newest row first, so that a forward sliding window looks back in time.
        values = df.loc[:, pd.IndexSlice[:, self.in_column]].sort_index(axis=1).values[::-1]
        # Append NaNs so that every point, even the oldest one, gets a window of length ``history``.
        padding = np.full((history - 1, values.shape[1]), np.nan)
        values = np.append(values, padding, axis=0)

        window_shape = (history, 1)

        # Count the non-NaN values of each window on the 2d mask to check ``min_periods`` later.
        nan_mask = np.lib.stride_tricks.sliding_window_view(np.isnan(values), window_shape=window_shape)
        nan_mask = np.squeeze(nan_mask[:, :, :: self.seasonality], axis=-1)  # (len(df), n_segments, window)
        valid_counts = bn.nansum(~nan_mask, axis=2)  # (len(df), n_segments)

        windows = np.lib.stride_tricks.sliding_window_view(values, window_shape=window_shape)
        windows = np.squeeze(windows[:, :, :: self.seasonality], axis=-1)  # (len(df), n_segments, window)

        aggregated = self._aggregate(series=windows)  # (len(df), n_segments)
        aggregated[valid_counts < self.min_periods] = np.nan
        # NOTE: with default arguments ``np.nan_to_num`` also maps +/-inf to very large finite numbers.
        # The trailing ``[::-1]`` restores the chronological order of rows.
        aggregated = np.nan_to_num(aggregated, copy=False, nan=self.fillna)[::-1]

        out_columns = pd.MultiIndex.from_product([segments, [self.out_column_name]])
        features = pd.DataFrame(aggregated, columns=out_columns, index=df.index)
        return df.join(features).sort_index(axis=1)

    def get_regressors_info(self) -> List[str]:
        """Return the list with regressors created by the transform.

        Raises
        ------
        ValueError:
            if the transform has not been fitted yet
        """
        if self.in_column_regressor is None:
            raise ValueError("Fit the transform to get the correct regressors info!")
        if self.in_column_regressor:
            return [self.out_column_name]
        return []

    def params_to_tune(self) -> Dict[str, BaseDistribution]:
        """Get default grid for tuning hyperparameters.

        Only ``window`` is tuned here. Other parameters are expected to be set by the user.

        Returns
        -------
        :
            Grid to tune.
        """
        return {"window": IntDistribution(low=1, high=20)}
class MeanTransform(WindowStatisticsTransform):
    """MeanTransform computes (optionally exponentially weighted) average value for given window.

    .. math::
       MeanTransform(x_t) = \\frac{1}{N_t}\\sum_{i=0}^{window - 1}{x_{t - i \\cdot seasonality}\\cdot\\alpha^{i}}

    where :math:`N_t` is the number of non-NaN values in the window that ends at :math:`t`.
    """

    def __init__(
        self,
        in_column: str,
        window: int,
        seasonality: int = 1,
        alpha: float = 1,
        min_periods: int = 1,
        fillna: float = 0,
        out_column: Optional[str] = None,
    ):
        """Init MeanTransform.

        Parameters
        ----------
        in_column: str
            name of processed column
        window: int
            size of window to aggregate, if -1 is set all history is used
        seasonality: int
            seasonality of lags to compute window's aggregation with
        alpha: float
            autoregressive coefficient; the i-th most recent value of a window is multiplied by ``alpha ** i``
        min_periods: int
            min number of targets in window to compute aggregation;
            windows with fewer non-NaN targets produce NaN, which is then replaced by ``fillna``
        fillna: float
            value to fill results NaNs with
        out_column: str, optional
            result column name. If not given use ``self.__repr__()``
        """
        # Attributes are set before the base initialiser runs because the default column name comes
        # from ``self.__repr__()`` -- presumably built from these attributes; confirm in the base mixin.
        self.window = window
        self.in_column = in_column
        self.seasonality = seasonality
        self.alpha = alpha
        self.min_periods = min_periods
        self.fillna = fillna
        self.out_column = out_column
        # Weights of the window positions, rebuilt on every ``_transform`` call.
        self._alpha_range: Optional[np.ndarray] = None
        super().__init__(
            in_column=in_column,
            window=window,
            seasonality=seasonality,
            min_periods=min_periods,
            out_column=self.out_column if self.out_column is not None else self.__repr__(),
            fillna=fillna,
        )

    def _transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Compute feature's value.

        Parameters
        ----------
        df: pd.DataFrame
            dataframe to generate features for

        Returns
        -------
        result: pd.DataFrame
            dataframe with results
        """
        if self.window != -1:
            n_lags = self.window
        else:
            # With the whole history the base class keeps every ``seasonality``-th of ``len(df)`` points,
            # i.e. ceil(len(df) / seasonality) values per window; the weights must have the same length,
            # otherwise the multiplication in ``_aggregate`` fails to broadcast when ``seasonality > 1``.
            n_lags = -(-len(df) // self.seasonality)
        self._alpha_range = np.array([self.alpha**i for i in range(n_lags)])
        self._alpha_range = np.expand_dims(self._alpha_range, axis=0)  # (1, window)
        return super()._transform(df)

    def _aggregate(self, series: np.ndarray) -> np.ndarray:
        """Compute weighted average for window series."""
        mean = np.zeros((series.shape[0], series.shape[1]))
        for segment in range(mean.shape[1]):
            # Loop prevents from memory overflow, 3d tensor is materialized after multiplication
            mean[:, segment] = bn.nanmean(series[:, segment] * self._alpha_range, axis=1)
        return mean

    def params_to_tune(self) -> Dict[str, BaseDistribution]:
        """Get default grid for tuning hyperparameters.

        This grid tunes parameters: ``window``, ``alpha``. Other parameters are expected to be set by the user.

        Returns
        -------
        :
            Grid to tune.
        """
        grid = super().params_to_tune()
        grid.update(
            {
                "alpha": FloatDistribution(low=0.2, high=1),
            }
        )
        return grid
class StdTransform(WindowStatisticsTransform):
    """Rolling standard deviation of a column over the given window.

    Notes
    -----
    Note that ``pd.Series([1]).std()`` is ``np.nan``.
    """

    def __init__(
        self,
        in_column: str,
        window: int,
        seasonality: int = 1,
        min_periods: int = 1,
        fillna: float = 0,
        out_column: Optional[str] = None,
        ddof: int = 1,
    ):
        """Create the transform.

        Parameters
        ----------
        in_column: str
            name of the column the statistic is computed from
        window: int
            number of values aggregated per point; ``-1`` means the whole available history
        seasonality: int
            step between the values taken into one window
        min_periods: int
            minimal number of non-NaN values a window must contain;
            windows with fewer values produce NaN, which is then replaced by ``fillna``
        fillna: float
            value that replaces NaNs in the result
        out_column: str, optional
            name of the result column; ``self.__repr__()`` is used when omitted
        ddof:
            delta degrees of freedom; the divisor used in calculations is N - ddof, where N is the number of elements
        """
        # Stored before the base initialiser runs because the default column name comes from
        # ``self.__repr__()`` -- presumably built from these attributes; confirm in the base mixin.
        self.in_column = in_column
        self.window = window
        self.seasonality = seasonality
        self.min_periods = min_periods
        self.fillna = fillna
        self.out_column = out_column
        self.ddof = ddof
        feature_name = self.__repr__() if self.out_column is None else self.out_column
        super().__init__(
            in_column=in_column,
            out_column=feature_name,
            window=window,
            seasonality=seasonality,
            min_periods=min_periods,
            fillna=fillna,
        )

    def _aggregate(self, series: np.ndarray) -> np.ndarray:
        """Return the NaN-aware standard deviation along the window axis."""
        return bn.nanstd(series, axis=2, ddof=self.ddof)
class QuantileTransform(WindowStatisticsTransform):
    """Rolling quantile of a column over the given window."""

    def __init__(
        self,
        in_column: str,
        quantile: float,
        window: int,
        seasonality: int = 1,
        min_periods: int = 1,
        fillna: float = 0,
        out_column: Optional[str] = None,
    ):
        """Create the transform.

        Parameters
        ----------
        in_column: str
            name of the column the statistic is computed from
        quantile: float
            quantile to compute, passed to ``np.nanquantile`` as ``q``
        window: int
            number of values aggregated per point; ``-1`` means the whole available history
        seasonality: int
            step between the values taken into one window
        min_periods: int
            minimal number of non-NaN values a window must contain;
            windows with fewer values produce NaN, which is then replaced by ``fillna``
        fillna: float
            value that replaces NaNs in the result
        out_column: str, optional
            name of the result column; ``self.__repr__()`` is used when omitted
        """
        # Stored before the base initialiser runs because the default column name comes from
        # ``self.__repr__()`` -- presumably built from these attributes; confirm in the base mixin.
        self.in_column = in_column
        self.quantile = quantile
        self.window = window
        self.seasonality = seasonality
        self.min_periods = min_periods
        self.fillna = fillna
        self.out_column = out_column
        feature_name = self.__repr__() if self.out_column is None else self.out_column
        super().__init__(
            in_column=in_column,
            out_column=feature_name,
            window=window,
            seasonality=seasonality,
            min_periods=min_periods,
            fillna=fillna,
        )

    def _aggregate(self, series: np.ndarray) -> np.ndarray:
        """Return the NaN-aware quantile of every window."""
        # bottleneck has no "nanquantile"; per the original author "apply_along_axis" can't be replaced with "axis=2".
        return np.apply_along_axis(np.nanquantile, axis=2, arr=series, q=self.quantile)

    def params_to_tune(self) -> Dict[str, BaseDistribution]:
        """Get default grid for tuning hyperparameters.

        The grid covers ``window`` and ``quantile``. Other parameters are expected to be set by the user.

        Returns
        -------
        :
            Grid to tune.
        """
        grid = super().params_to_tune()
        grid["quantile"] = FloatDistribution(low=0, high=1)
        return grid
class MinTransform(WindowStatisticsTransform):
    """Rolling minimum of a column over the given window."""

    def __init__(
        self,
        in_column: str,
        window: int,
        seasonality: int = 1,
        min_periods: int = 1,
        fillna: float = 0,
        out_column: Optional[str] = None,
    ):
        """Create the transform.

        Parameters
        ----------
        in_column: str
            name of the column the statistic is computed from
        window: int
            number of values aggregated per point; ``-1`` means the whole available history
        seasonality: int
            step between the values taken into one window
        min_periods: int
            minimal number of non-NaN values a window must contain;
            windows with fewer values produce NaN, which is then replaced by ``fillna``
        fillna: float
            value that replaces NaNs in the result
        out_column: str, optional
            name of the result column; ``self.__repr__()`` is used when omitted
        """
        # Stored before the base initialiser runs because the default column name comes from
        # ``self.__repr__()`` -- presumably built from these attributes; confirm in the base mixin.
        self.in_column = in_column
        self.window = window
        self.seasonality = seasonality
        self.min_periods = min_periods
        self.fillna = fillna
        self.out_column = out_column
        feature_name = self.__repr__() if self.out_column is None else self.out_column
        super().__init__(
            in_column=in_column,
            out_column=feature_name,
            window=window,
            seasonality=seasonality,
            min_periods=min_periods,
            fillna=fillna,
        )

    def _aggregate(self, series: np.ndarray) -> np.ndarray:
        """Return the NaN-aware minimum along the window axis."""
        return bn.nanmin(series, axis=2)
class MaxTransform(WindowStatisticsTransform):
    """Rolling maximum of a column over the given window."""

    def __init__(
        self,
        in_column: str,
        window: int,
        seasonality: int = 1,
        min_periods: int = 1,
        fillna: float = 0,
        out_column: Optional[str] = None,
    ):
        """Create the transform.

        Parameters
        ----------
        in_column: str
            name of the column the statistic is computed from
        window: int
            number of values aggregated per point; ``-1`` means the whole available history
        seasonality: int
            step between the values taken into one window
        min_periods: int
            minimal number of non-NaN values a window must contain;
            windows with fewer values produce NaN, which is then replaced by ``fillna``
        fillna: float
            value that replaces NaNs in the result
        out_column: str, optional
            name of the result column; ``self.__repr__()`` is used when omitted
        """
        # Stored before the base initialiser runs because the default column name comes from
        # ``self.__repr__()`` -- presumably built from these attributes; confirm in the base mixin.
        self.in_column = in_column
        self.window = window
        self.seasonality = seasonality
        self.min_periods = min_periods
        self.fillna = fillna
        self.out_column = out_column
        feature_name = self.__repr__() if self.out_column is None else self.out_column
        super().__init__(
            in_column=in_column,
            out_column=feature_name,
            window=window,
            seasonality=seasonality,
            min_periods=min_periods,
            fillna=fillna,
        )

    def _aggregate(self, series: np.ndarray) -> np.ndarray:
        """Return the NaN-aware maximum along the window axis."""
        return bn.nanmax(series, axis=2)
class MedianTransform(WindowStatisticsTransform):
    """Rolling median of a column over the given window."""

    def __init__(
        self,
        in_column: str,
        window: int,
        seasonality: int = 1,
        min_periods: int = 1,
        fillna: float = 0,
        out_column: Optional[str] = None,
    ):
        """Create the transform.

        Parameters
        ----------
        in_column: str
            name of the column the statistic is computed from
        window: int
            number of values aggregated per point; ``-1`` means the whole available history
        seasonality: int
            step between the values taken into one window
        min_periods: int
            minimal number of non-NaN values a window must contain;
            windows with fewer values produce NaN, which is then replaced by ``fillna``
        fillna: float
            value that replaces NaNs in the result
        out_column: str, optional
            name of the result column; ``self.__repr__()`` is used when omitted
        """
        # Stored before the base initialiser runs because the default column name comes from
        # ``self.__repr__()`` -- presumably built from these attributes; confirm in the base mixin.
        self.in_column = in_column
        self.window = window
        self.seasonality = seasonality
        self.min_periods = min_periods
        self.fillna = fillna
        self.out_column = out_column
        feature_name = self.__repr__() if self.out_column is None else self.out_column
        super().__init__(
            in_column=in_column,
            out_column=feature_name,
            window=window,
            seasonality=seasonality,
            min_periods=min_periods,
            fillna=fillna,
        )

    def _aggregate(self, series: np.ndarray) -> np.ndarray:
        """Return the NaN-aware median along the window axis."""
        return bn.nanmedian(series, axis=2)
class MADTransform(WindowStatisticsTransform):
    """Rolling Mean Absolute Deviation of a column over the given window."""

    def __init__(
        self,
        in_column: str,
        window: int,
        seasonality: int = 1,
        min_periods: int = 1,
        fillna: float = 0,
        out_column: Optional[str] = None,
    ):
        """Create the transform.

        Parameters
        ----------
        in_column: str
            name of the column the statistic is computed from
        window: int
            number of values aggregated per point; ``-1`` means the whole available history
        seasonality: int
            step between the values taken into one window
        min_periods: int
            minimal number of non-NaN values a window must contain;
            windows with fewer values produce NaN, which is then replaced by ``fillna``
        fillna: float
            value that replaces NaNs in the result
        out_column: str, optional
            name of the result column; ``self.__repr__()`` is used when omitted
        """
        # Stored before the base initialiser runs because the default column name comes from
        # ``self.__repr__()`` -- presumably built from these attributes; confirm in the base mixin.
        self.in_column = in_column
        self.window = window
        self.seasonality = seasonality
        self.min_periods = min_periods
        self.fillna = fillna
        self.out_column = out_column
        feature_name = self.__repr__() if self.out_column is None else self.out_column
        super().__init__(
            in_column=in_column,
            out_column=feature_name,
            window=window,
            seasonality=seasonality,
            min_periods=min_periods,
            fillna=fillna,
        )

    def _aggregate(self, series: np.ndarray) -> np.ndarray:
        """Return the mean absolute deviation from the window mean, ignoring NaNs."""
        n_points, n_segments = series.shape[0], series.shape[1]
        centers = np.expand_dims(bn.nanmean(series, axis=2), axis=-1)  # (len(df), n_segments, 1)
        result = np.zeros((n_points, n_segments))
        for idx in range(n_segments):
            # One segment at a time: subtracting over all segments at once would materialize the full 3d tensor.
            deviations = np.abs(series[:, idx] - centers[:, idx])
            result[:, idx] = bn.nanmean(deviations, axis=1)
        return result
class MinMaxDifferenceTransform(WindowStatisticsTransform):
    """Rolling range (max minus min) of a column over the given window."""

    def __init__(
        self,
        in_column: str,
        window: int,
        seasonality: int = 1,
        min_periods: int = 1,
        fillna: float = 0,
        out_column: Optional[str] = None,
    ):
        """Create the transform.

        Parameters
        ----------
        in_column: str
            name of the column the statistic is computed from
        window: int
            number of values aggregated per point; ``-1`` means the whole available history
        seasonality: int
            step between the values taken into one window
        min_periods: int
            minimal number of non-NaN values a window must contain;
            windows with fewer values produce NaN, which is then replaced by ``fillna``
        fillna: float
            value that replaces NaNs in the result
        out_column: str, optional
            name of the result column; ``self.__repr__()`` is used when omitted
        """
        # Stored before the base initialiser runs because the default column name comes from
        # ``self.__repr__()`` -- presumably built from these attributes; confirm in the base mixin.
        self.in_column = in_column
        self.window = window
        self.seasonality = seasonality
        self.min_periods = min_periods
        self.fillna = fillna
        self.out_column = out_column
        feature_name = self.__repr__() if self.out_column is None else self.out_column
        super().__init__(
            in_column=in_column,
            out_column=feature_name,
            window=window,
            seasonality=seasonality,
            min_periods=min_periods,
            fillna=fillna,
        )

    def _aggregate(self, series: np.ndarray) -> np.ndarray:
        """Return the difference between the NaN-aware maximum and minimum of every window."""
        upper = bn.nanmax(series, axis=2)
        lower = bn.nanmin(series, axis=2)
        return upper - lower
class SumTransform(WindowStatisticsTransform):
    """Rolling sum of a column over the given window."""

    def __init__(
        self,
        in_column: str,
        window: int,
        seasonality: int = 1,
        min_periods: int = 1,
        fillna: float = 0,
        out_column: Optional[str] = None,
    ):
        """Create the transform.

        Parameters
        ----------
        in_column:
            name of the column the statistic is computed from
        window:
            number of values aggregated per point; with ``window == -1`` the sum runs over the whole given series
        seasonality:
            step between the values taken into one window
        min_periods:
            minimal number of non-NaN values a window must contain;
            windows with fewer values produce NaN, which is then replaced by ``fillna``
        fillna:
            value that replaces NaNs in the result
        out_column:
            name of the result column; ``self.__repr__()`` is used when omitted
        """
        # Stored before the base initialiser runs because the default column name comes from
        # ``self.__repr__()`` -- presumably built from these attributes; confirm in the base mixin.
        self.in_column = in_column
        self.window = window
        self.seasonality = seasonality
        self.min_periods = min_periods
        self.fillna = fillna
        self.out_column = out_column
        feature_name = self.__repr__() if self.out_column is None else self.out_column
        super().__init__(
            in_column=in_column,
            out_column=feature_name,
            window=window,
            seasonality=seasonality,
            min_periods=min_periods,
            fillna=fillna,
        )

    def _aggregate(self, series: np.ndarray) -> np.ndarray:
        """Return the NaN-aware sum along the window axis."""
        return bn.nansum(series, axis=2)
# Public API of the module: the abstract window base class and its concrete statistics transforms.
__all__ = [
"MedianTransform",
"MaxTransform",
"MinTransform",
"QuantileTransform",
"StdTransform",
"MeanTransform",
"WindowStatisticsTransform",
"MADTransform",
"MinMaxDifferenceTransform",
"SumTransform",
]