Source code for etna.datasets.utils

import re
from enum import Enum
from typing import List
from typing import Optional
from typing import Sequence
from typing import Set

import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix

from etna import SETTINGS

if SETTINGS.torch_required:
    from torch.utils.data import Dataset
else:
    from unittest.mock import Mock

    Dataset = Mock  # type: ignore


class DataFrameFormat(str, Enum):
    """Enum for different types of result."""

    wide = "wide"
    long = "long"


def duplicate_data(df: pd.DataFrame, segments: Sequence[str], format: str = DataFrameFormat.wide) -> pd.DataFrame:
    """Duplicate dataframe for all the segments.

    Parameters
    ----------
    df:
        dataframe to duplicate, there should be column "timestamp"
    segments:
        list of segments for making duplication
    format:
        represent the result in TSDataset inner format (wide) or in flatten format (long)

    Returns
    -------
    result: pd.DataFrame
        result of duplication for all the segments

    Raises
    ------
    ValueError:
        if segments list is empty
    ValueError:
        if incorrect format is given
    ValueError:
        if dataframe doesn't contain "timestamp" column

    Examples
    --------
    >>> import pandas as pd
    >>> from etna.datasets import generate_const_df
    >>> from etna.datasets import duplicate_data
    >>> from etna.datasets import TSDataset
    >>> df = generate_const_df(
    ...     periods=50, start_time="2020-03-10",
    ...     n_segments=2, scale=1
    ... )
    >>> timestamp = pd.date_range("2020-03-10", periods=100, freq="D")
    >>> is_friday_13 = (timestamp.weekday == 4) & (timestamp.day == 13)
    >>> df_exog_raw = pd.DataFrame({"timestamp": timestamp, "is_friday_13": is_friday_13})
    >>> df_exog = duplicate_data(df_exog_raw, segments=["segment_0", "segment_1"], format="wide")
    >>> df_ts_format = TSDataset.to_dataset(df)
    >>> ts = TSDataset(df=df_ts_format, df_exog=df_exog, freq="D", known_future="all")
    >>> ts.head()
    segment       segment_0           segment_1
    feature    is_friday_13 target is_friday_13 target
    timestamp
    2020-03-10        False   1.00        False   1.00
    2020-03-11        False   1.00        False   1.00
    2020-03-12        False   1.00        False   1.00
    2020-03-13         True   1.00         True   1.00
    2020-03-14        False   1.00        False   1.00
    """
    from etna.datasets.tsdataset import TSDataset

    # check segments length
    if len(segments) == 0:
        raise ValueError("Parameter segments shouldn't be empty")

    # check format
    format_enum = DataFrameFormat(format)

    # check the columns
    if "timestamp" not in df.columns:
        raise ValueError("There should be 'timestamp' column")

    # construct long version
    segments_results = []
    for segment in segments:
        df_segment = df.copy()
        df_segment["segment"] = segment
        segments_results.append(df_segment)

    df_long = pd.concat(segments_results, ignore_index=True)

    # construct wide version if necessary
    if format_enum == DataFrameFormat.wide:
        df_wide = TSDataset.to_dataset(df_long)
        return df_wide

    return df_long
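

# A complementary sketch (illustrative, not part of the module): with
# ``format="long"`` the input rows are replicated once per segment with a
# ``segment`` column appended, instead of being pivoted into the wide
# TSDataset layout. The tiny frame below is hypothetical.
_df_raw = pd.DataFrame({"timestamp": pd.date_range("2020-03-10", periods=2, freq="D"), "flag": [0, 1]})
_df_long = duplicate_data(_df_raw, segments=["segment_0", "segment_1"], format="long")
# _df_long has 4 rows: the 2 input rows for "segment_0" followed by the same 2 rows for "segment_1"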


class _TorchDataset(Dataset):
    """In memory dataset for torch dataloader."""

    def __init__(self, ts_samples: List[dict]):
        """Init torch dataset.

        Parameters
        ----------
        ts_samples:
            time series samples for training or inference
        """
        self.ts_samples = ts_samples

    def __getitem__(self, index):
        return self.ts_samples[index]

    def __len__(self):
        return len(self.ts_samples)
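

# A minimal usage sketch (illustrative, not part of the module): since
# ``_TorchDataset`` stores samples in memory and implements the mapping-style
# dataset protocol (``__getitem__``/``__len__``), it plugs straight into a
# torch ``DataLoader``. The sample dicts here are hypothetical; in etna they
# are produced by the deep models. Runs only if torch is installed.
if SETTINGS.torch_required:
    from torch.utils.data import DataLoader

    _samples = [
        {"encoder_real": [1.0, 2.0], "segment": "segment_0"},
        {"encoder_real": [3.0, 4.0], "segment": "segment_1"},
    ]
    _loader = DataLoader(_TorchDataset(ts_samples=_samples), batch_size=2)
    _batch = next(iter(_loader))  # default collate_fn groups values per key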


def set_columns_wide(
    df_left: pd.DataFrame,
    df_right: pd.DataFrame,
    timestamps_left: Optional[Sequence[pd.Timestamp]] = None,
    timestamps_right: Optional[Sequence[pd.Timestamp]] = None,
    segments_left: Optional[Sequence[str]] = None,
    features_right: Optional[Sequence[str]] = None,
    features_left: Optional[Sequence[str]] = None,
    segments_right: Optional[Sequence[str]] = None,
) -> pd.DataFrame:
    """Set columns in a left dataframe with values from the right dataframe.

    Parameters
    ----------
    df_left:
        dataframe to set columns in
    df_right:
        dataframe to set columns from
    timestamps_left:
        timestamps to select in ``df_left``
    timestamps_right:
        timestamps to select in ``df_right``
    segments_left:
        segments to select in ``df_left``
    segments_right:
        segments to select in ``df_right``
    features_left:
        features to select in ``df_left``
    features_right:
        features to select in ``df_right``

    Returns
    -------
    :
        a new dataframe with changed columns
    """
    # sort columns
    df_left = df_left.sort_index(axis=1)
    df_right = df_right.sort_index(axis=1)

    # prepare indexing
    timestamps_left_index = slice(None) if timestamps_left is None else timestamps_left
    timestamps_right_index = slice(None) if timestamps_right is None else timestamps_right
    segments_left_index = slice(None) if segments_left is None else segments_left
    segments_right_index = slice(None) if segments_right is None else segments_right
    features_left_index = slice(None) if features_left is None else features_left
    features_right_index = slice(None) if features_right is None else features_right

    right_value = df_right.loc[timestamps_right_index, (segments_right_index, features_right_index)]
    df_left.loc[timestamps_left_index, (segments_left_index, features_left_index)] = right_value.values

    return df_left
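

# A minimal sketch of ``set_columns_wide`` (illustrative, not part of the
# module): copy "segment_1" values from a right frame into "segment_0" of a
# left frame. Both frames are hypothetical wide frames with the usual
# (segment, feature) column MultiIndex.
_index = pd.date_range("2020-01-01", periods=3, freq="D")
_columns = pd.MultiIndex.from_product([["segment_0", "segment_1"], ["target"]], names=["segment", "feature"])
_df_left = pd.DataFrame(0.0, index=_index, columns=_columns)
_df_right = pd.DataFrame(1.0, index=_index, columns=_columns)
_df_new = set_columns_wide(_df_left, _df_right, segments_left=["segment_0"], segments_right=["segment_1"])
# in _df_new the "segment_0" target is now 1.0 everywhere; "segment_1" is untouched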


def match_target_quantiles(features: Set[str]) -> Set[str]:
    """Find quantiles in dataframe columns."""
    pattern = re.compile(r"target_\d+\.\d+$")
    return {i for i in features if pattern.match(i) is not None}


def match_target_components(features: Set[str]) -> Set[str]:
    """Find target components in a set of features."""
    return set(filter(lambda f: f.startswith("target_component_"), features))
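

# Quick illustrations for the two matchers above (hypothetical feature names):
# the quantile regex keeps only "target_<float>" columns, e.g. prediction
# interval borders, while the component matcher keeps only features with the
# "target_component_" prefix.
_features = {"target", "target_0.025", "target_0.975", "target_component_trend", "target_lag_1"}
assert match_target_quantiles(_features) == {"target_0.025", "target_0.975"}
assert match_target_components(_features) == {"target_component_trend"}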


def get_target_with_quantiles(columns: pd.Index) -> Set[str]:
    """Find "target" column and target quantiles among dataframe columns."""
    column_names = set(columns.get_level_values(level="feature"))
    target_columns = match_target_quantiles(column_names)
    if "target" in column_names:
        target_columns.add("target")
    return target_columns
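

# A sketch with hypothetical wide columns: given a (segment, feature)
# MultiIndex, the function collects "target" plus any quantile columns and
# ignores other features.
_cols = pd.MultiIndex.from_tuples(
    [("segment_0", "target"), ("segment_0", "target_0.975"), ("segment_0", "exog")],
    names=["segment", "feature"],
)
assert get_target_with_quantiles(_cols) == {"target", "target_0.975"}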


def get_level_dataframe(
    df: pd.DataFrame,
    mapping_matrix: csr_matrix,
    source_level_segments: List[str],
    target_level_segments: List[str],
) -> pd.DataFrame:
    """Perform mapping to dataframe at the target level.

    Parameters
    ----------
    df:
        dataframe at the source level
    mapping_matrix:
        mapping matrix between levels
    source_level_segments:
        list of segments at the source level, set the order of segments matching the mapping matrix
    target_level_segments:
        list of segments at the target level

    Returns
    -------
    :
        dataframe at the target level
    """
    column_names = sorted(set(df.columns.get_level_values("feature")))
    num_columns = len(column_names)
    num_source_level_segments = len(source_level_segments)
    num_target_level_segments = len(target_level_segments)

    if set(df.columns.get_level_values(level="segment")) != set(source_level_segments):
        raise ValueError("Segments mismatch for provided dataframe and `source_level_segments`!")

    if num_source_level_segments != mapping_matrix.shape[1]:
        raise ValueError("Number of source level segments does not match the number of columns in the mapping matrix!")

    if num_target_level_segments != mapping_matrix.shape[0]:
        raise ValueError("Number of target level segments does not match the number of rows in the mapping matrix!")

    # Slice should be done by source_level_segments -- to fix the order of segments for the mapping matrix,
    # and by column_names -- to fix the order of columns to create the correct index in the end
    source_level_data = df.loc[
        pd.IndexSlice[:], pd.IndexSlice[source_level_segments, column_names]
    ].values  # shape: (t, num_source_level_segments * num_columns)

    source_level_data = source_level_data.reshape(
        (-1, num_source_level_segments, num_columns)
    )  # shape: (t, num_source_level_segments, num_columns)
    source_level_data = np.swapaxes(source_level_data, 1, 2)  # shape: (t, num_columns, num_source_level_segments)
    source_level_data = source_level_data.reshape(
        (-1, num_source_level_segments)
    )  # shape: (t * num_columns, num_source_level_segments)

    target_level_data = source_level_data @ mapping_matrix.T

    target_level_data = target_level_data.reshape(
        (-1, num_columns, num_target_level_segments)
    )  # shape: (t, num_columns, num_target_level_segments)
    target_level_data = np.swapaxes(target_level_data, 1, 2)  # shape: (t, num_target_level_segments, num_columns)
    target_level_data = target_level_data.reshape(
        (-1, num_columns * num_target_level_segments)
    )  # shape: (t, num_target_level_segments * num_columns)

    target_level_columns = pd.MultiIndex.from_product(
        [target_level_segments, column_names], names=["segment", "feature"]
    )
    target_level_df = pd.DataFrame(data=target_level_data, index=df.index, columns=target_level_columns)

    return target_level_df
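

# A minimal aggregation sketch (illustrative, not part of the module): two
# lower-level segments are summed into one upper-level segment. The mapping
# matrix has shape (num_target_segments, num_source_segments); all names
# below are hypothetical.
_wide = pd.DataFrame(
    [[1.0, 2.0], [3.0, 4.0]],
    index=pd.date_range("2020-01-01", periods=2, freq="D"),
    columns=pd.MultiIndex.from_product([["a", "b"], ["target"]], names=["segment", "feature"]),
)
_mapping = csr_matrix(np.array([[1.0, 1.0]]))  # total = a + b
_total = get_level_dataframe(_wide, _mapping, source_level_segments=["a", "b"], target_level_segments=["total"])
# _total["total"]["target"] holds [3.0, 7.0], the per-timestamp sums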


def inverse_transform_target_components(
    target_components_df: pd.DataFrame, target_df: pd.DataFrame, inverse_transformed_target_df: pd.DataFrame
) -> pd.DataFrame:
    """Inverse transform target components.

    Parameters
    ----------
    target_components_df:
        Dataframe with target components
    target_df:
        Dataframe with transformed target
    inverse_transformed_target_df:
        Dataframe with inverse transformed target

    Returns
    -------
    :
        Dataframe with inverse transformed target components
    """
    components_number = len(set(target_components_df.columns.get_level_values("feature")))
    # rescale each component by the per-timestamp ratio between the inverse-transformed and transformed target
    scale_coef = np.repeat((inverse_transformed_target_df / target_df).values, repeats=components_number, axis=1)
    inverse_transformed_target_components_df = target_components_df * scale_coef
    return inverse_transformed_target_components_df
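

# A minimal sketch (illustrative, not part of the module): components are
# rescaled by the ratio between the inverse-transformed and transformed
# target, so per-row they keep summing to the new target. The one-segment
# frames below are hypothetical.
_idx = pd.date_range("2020-01-01", periods=2, freq="D")
_target = pd.DataFrame(
    [[2.0], [4.0]],
    index=_idx,
    columns=pd.MultiIndex.from_product([["segment_0"], ["target"]], names=["segment", "feature"]),
)
_inv_target = _target * 10  # e.g. the effect of inverting a scaling transform
_components = pd.DataFrame(
    [[1.0, 1.0], [3.0, 1.0]],
    index=_idx,
    columns=pd.MultiIndex.from_product(
        [["segment_0"], ["target_component_a", "target_component_b"]], names=["segment", "feature"]
    ),
)
_inv_components = inverse_transform_target_components(_components, _target, _inv_target)
# every component is multiplied by 10 here, so rows still sum to the inverse-transformed target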