Source code for etna.transforms.decomposition.change_points_based.base

from abc import ABC
from abc import abstractmethod
from copy import deepcopy
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple

import numpy as np
import pandas as pd

from etna.transforms.base import FutureMixin
from etna.transforms.base import IrreversiblePerSegmentWrapper
from etna.transforms.base import OneSegmentTransform
from etna.transforms.base import ReversiblePerSegmentWrapper
from etna.transforms.decomposition.change_points_based.change_points_models import BaseChangePointsModelAdapter
from etna.transforms.decomposition.change_points_based.per_interval_models import PerIntervalModel


class _OneSegmentChangePointsTransform(OneSegmentTransform, ABC):
    """Base class for one-segment transforms built on change points.

    On ``fit`` the intervals are obtained from ``change_points_model`` and an independent copy of
    ``per_interval_model`` is fitted on each of them. Subclasses define how the per-interval predictions
    are applied to the data by implementing ``_apply_transformation`` and ``_apply_inverse_transformation``.
    """

    def __init__(
        self, in_column: str, change_points_model: BaseChangePointsModelAdapter, per_interval_model: PerIntervalModel
    ):
        """Init _OneSegmentChangePointsTransform.

        Parameters
        ----------
        in_column:
            name of column to apply transform to
        change_points_model:
            model to get change points from data
        per_interval_model:
            model to process intervals between change points
        """
        self.in_column = in_column
        self.change_points_model = change_points_model
        self.per_interval_model = per_interval_model
        # Both attributes are filled in by ``fit``; ``None`` marks a transform that is not fitted yet.
        self.per_interval_models: Optional[Dict[Any, PerIntervalModel]] = None
        self.intervals: Optional[List[Tuple[Any, Any]]] = None

    def _init_per_interval_models(self, intervals: List[Tuple[Any, Any]]) -> Dict[Tuple[Any, Any], PerIntervalModel]:
        """Create an independent copy of the per interval model for each of the given intervals.

        Parameters
        ----------
        intervals:
            intervals to create models for

        Returns
        -------
        per_interval_models:
            mapping from interval to its own deep copy of ``per_interval_model``
        """
        # deepcopy keeps the models independent: fitting one interval must not affect another.
        per_interval_models = {interval: deepcopy(self.per_interval_model) for interval in intervals}
        return per_interval_models

    @staticmethod
    def _get_features(series: pd.Series) -> np.ndarray:
        """Prepare features to train per interval model.

        Parameters
        ----------
        series:
            series to get features from

        Returns
        -------
        features:
            index values of the series reshaped to a single-column 2d array
        """
        features = series.index.values.reshape((-1, 1))
        return features

    @staticmethod
    def _get_targets(series: pd.Series) -> np.ndarray:
        """Get targets from given series to train per interval model.

        Parameters
        ----------
        series:
            series to get targets from

        Returns
        -------
        targets:
            array with targets
        """
        return series.values

    def _fit_per_interval_models(self, series: pd.Series):
        """Fit per-interval models with corresponding data from series.

        Parameters
        ----------
        series:
            series to take the training data of each interval from

        Raises
        ------
        ValueError:
            if intervals or per-interval models were not initialized before the call
        """
        if self.intervals is None or self.per_interval_models is None:
            raise ValueError("Something went wrong on fit! Check the parameters of the transform.")
        for interval in self.intervals:
            tmp_series = series[interval[0] : interval[1]]
            features = self._get_features(series=tmp_series)
            targets = self._get_targets(series=tmp_series)
            self.per_interval_models[interval].fit(features=features, target=targets)

    def fit(self, df: pd.DataFrame) -> "_OneSegmentChangePointsTransform":
        """Fit transform.

        Get no-changepoints intervals with change_points_model and fit per_interval_model on the intervals.

        Parameters
        ----------
        df:
            dataframe to process

        Returns
        -------
        self:
            fitted _OneSegmentChangePointsTransform
        """
        self.intervals = self.change_points_model.get_change_points_intervals(df=df, in_column=self.in_column)
        self.per_interval_models = self._init_per_interval_models(intervals=self.intervals)

        # Train only on the span between the first and the last valid value of the column.
        column = df[self.in_column]
        series = df.loc[column.first_valid_index() : column.last_valid_index(), self.in_column]
        self._fit_per_interval_models(series=series)
        return self

    def _predict_per_interval_model(self, series: pd.Series) -> pd.Series:
        """Collect predictions of the fitted per-interval models for the index of the series.

        Parameters
        ----------
        series:
            series whose index defines the timestamps to predict for

        Returns
        -------
        prediction_series:
            float series with the index of ``series``; timestamps that do not fall into any
            fitted interval stay NaN

        Raises
        ------
        ValueError:
            if the transform is not fitted
        """
        if self.intervals is None or self.per_interval_models is None:
            raise ValueError("Transform is not fitted! Fit the Transform before calling transform method.")
        # Explicit dtype: without data the default dtype is deprecated in pandas 1.x and is ``object``
        # in pandas 2.x, which would leave the numeric predictions in an object series.
        prediction_series = pd.Series(index=series.index, dtype=float)
        for interval in self.intervals:
            tmp_series = series[interval[0] : interval[1]]
            # An interval without timestamps in ``series`` has nothing to predict.
            if tmp_series.empty:
                continue
            features = self._get_features(series=tmp_series)
            per_interval_prediction = self.per_interval_models[interval].predict(features=features)
            prediction_series[tmp_series.index] = per_interval_prediction
        return prediction_series

    @abstractmethod
    def _apply_transformation(self, df: pd.DataFrame, transformed_series: pd.Series) -> pd.DataFrame:
        """Apply transformation given by per_interval_model.

        Parameters
        ----------
        df:
            original dataframe to apply transformation
        transformed_series:
            transformed series to be applied to df

        Returns
        -------
        transformed_df:
            dataframe with applied transformation
        """
        pass

    @abstractmethod
    def _apply_inverse_transformation(self, df: pd.DataFrame, transformed_series: pd.Series) -> pd.DataFrame:
        """Apply inverse transformation given by per_interval_model.

        Parameters
        ----------
        df:
            transformed dataframe to apply inverse transformation
        transformed_series:
            transformed series to be applied to df

        Returns
        -------
        transformed_df:
            dataframe with inverse transformation
        """
        pass

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Transform data from df.

        Parameters
        ----------
        df:
            dataframe to apply transformation to

        Returns
        -------
        transformed_df:
            dataframe with applied transformation
        """
        # NOTE(review): presumably resets the pandas copy marker so that writes made by
        # ``_apply_transformation`` do not raise SettingWithCopyWarning - confirm.
        df._is_copy = False
        series = df[self.in_column]
        transformed_series = self._predict_per_interval_model(series=series)
        transformed_df = self._apply_transformation(df=df, transformed_series=transformed_series)
        return transformed_df

    def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply inverse transformation to data from df.

        Per-interval predictions are computed for the index of df and passed to
        ``_apply_inverse_transformation``.

        Parameters
        ----------
        df:
            one segment dataframe to apply inverse transformation to

        Returns
        -------
        df:
            dataframe returned by ``_apply_inverse_transformation``
        """
        # NOTE(review): same copy-marker reset as in ``transform`` - confirm.
        df._is_copy = False
        series = df[self.in_column]
        trend_series = self._predict_per_interval_model(series=series)
        # Return the result of the hook, exactly as ``transform`` does, so that an implementation
        # that builds a new dataframe instead of mutating ``df`` in place is not silently ignored.
        restored_df = self._apply_inverse_transformation(df=df, transformed_series=trend_series)
        return restored_df
class BaseChangePointsTransform:
    """Common ancestor of every change points based transform.

    The class defines no attributes or methods of its own.
    """
class ReversibleChangePointsTransform(BaseChangePointsTransform, ReversiblePerSegmentWrapper):
    """Base class for all reversible transforms that work with change points."""

    def get_regressors_info(self) -> List[str]:
        """Return the names of regressors created by the transform.

        Returns
        -------
        :
            always an empty list
        """
        no_regressors: List[str] = []
        return no_regressors
class IrreversibleChangePointsTransform(BaseChangePointsTransform, IrreversiblePerSegmentWrapper, FutureMixin):
    """Base class for all irreversible transforms that work with change points."""

    # Name of the column reported as a regressor; the class-level default is ``None``.
    out_column: Optional[str] = None

    def get_regressors_info(self) -> List[str]:
        """Return the names of regressors created by the transform.

        Returns
        -------
        :
            one-element list holding ``out_column``
        """
        created_regressors = [self.out_column]
        return created_regressors  # type: ignore