import itertools
import math
import warnings
from copy import deepcopy
from itertools import combinations
from typing import TYPE_CHECKING
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import statsmodels.api as sm
from matplotlib.lines import Line2D
from matplotlib.ticker import MaxNLocator
from scipy.signal import periodogram
from typing_extensions import Literal
from etna.analysis.eda.utils import _create_holidays_df
from etna.analysis.eda.utils import get_correlation_matrix
from etna.analysis.feature_selection import AGGREGATION_FN
from etna.analysis.feature_selection import AggregationMode
from etna.analysis.utils import _get_borders_ts
from etna.analysis.utils import _prepare_axes
if TYPE_CHECKING:
from etna.datasets import TSDataset
from etna.transforms import TimeSeriesImputerTransform
plot_acf = sm.graphics.tsa.plot_acf
plot_pacf = sm.graphics.tsa.plot_pacf
[docs]def plot_correlation_matrix(
ts: "TSDataset",
columns: Optional[List[str]] = None,
segments: Optional[List[str]] = None,
method: str = "pearson",
mode: str = "macro",
columns_num: int = 2,
figsize: Tuple[int, int] = (10, 10),
**heatmap_kwargs,
):
"""Plot pairwise correlation heatmap for selected segments.
Parameters
----------
ts:
TSDataset with timeseries data
columns:
Columns to use, if None use all columns
segments:
Segments to use
method:
Method of correlation:
* pearson: standard correlation coefficient
* kendall: Kendall Tau correlation coefficient
* spearman: Spearman rank correlation
mode: 'macro' or 'per-segment'
Aggregation mode
columns_num:
Number of subplots columns
figsize:
size of the figure in inches
"""
if segments is None:
segments = sorted(ts.segments)
if columns is None:
columns = list(set(ts.df.columns.get_level_values("feature")))
if "vmin" not in heatmap_kwargs:
heatmap_kwargs["vmin"] = -1
if "vmax" not in heatmap_kwargs:
heatmap_kwargs["vmax"] = 1
if mode not in ["macro", "per-segment"]:
raise ValueError(f"'{mode}' is not a valid method of mode.")
if mode == "macro":
fig, ax = plt.subplots(figsize=figsize)
correlation_matrix = get_correlation_matrix(ts, columns, segments, method)
labels = list(ts[:, segments, columns].columns.values)
ax = sns.heatmap(correlation_matrix, annot=True, fmt=".1g", square=True, ax=ax, **heatmap_kwargs)
ax.set_xticks(np.arange(len(labels)) + 0.5, labels=labels)
ax.set_yticks(np.arange(len(labels)) + 0.5, labels=labels)
plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")
plt.setp(ax.get_yticklabels(), rotation=0, ha="right", rotation_mode="anchor")
ax.set_title("Correlation Heatmap")
if mode == "per-segment":
fig, ax = _prepare_axes(len(segments), columns_num=columns_num, figsize=figsize)
for i, segment in enumerate(segments):
correlation_matrix = get_correlation_matrix(ts, columns, [segment], method)
labels = list(ts[:, segment, columns].columns.values)
ax[i] = sns.heatmap(correlation_matrix, annot=True, fmt=".1g", square=True, ax=ax[i], **heatmap_kwargs)
ax[i].set_xticks(np.arange(len(labels)) + 0.5, labels=labels)
ax[i].set_yticks(np.arange(len(labels)) + 0.5, labels=labels)
plt.setp(ax[i].get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")
plt.setp(ax[i].get_yticklabels(), rotation=0, ha="right", rotation_mode="anchor")
ax[i].set_title("Correlation Heatmap" + " " + segment)
[docs]def plot_clusters(
ts: "TSDataset",
segment2cluster: Dict[str, int],
centroids_df: Optional[pd.DataFrame] = None,
columns_num: int = 2,
figsize: Tuple[int, int] = (10, 5),
):
"""Plot clusters [with centroids].
Parameters
----------
ts:
TSDataset with timeseries
segment2cluster:
mapping from segment to cluster in format {segment: cluster}
centroids_df:
dataframe with centroids
columns_num:
number of columns in subplots
figsize:
size of the figure per subplot with one segment in inches
"""
unique_clusters = sorted(set(segment2cluster.values()))
_, ax = _prepare_axes(num_plots=len(unique_clusters), columns_num=columns_num, figsize=figsize)
default_colors = plt.rcParams["axes.prop_cycle"].by_key()["color"]
segment_color = default_colors[0]
for i, cluster in enumerate(unique_clusters):
segments = [segment for segment in segment2cluster if segment2cluster[segment] == cluster]
for segment in segments:
segment_slice = ts[:, segment, "target"]
ax[i].plot(
segment_slice.index.values,
segment_slice.values,
alpha=1 / math.sqrt(len(segments)),
c=segment_color,
)
ax[i].set_title(f"cluster={cluster}\n{len(segments)} segments in cluster")
if centroids_df is not None:
centroid = centroids_df[cluster, "target"]
ax[i].plot(centroid.index.values, centroid.values, c="red", label="centroid")
ax[i].legend()
[docs]def plot_periodogram(
ts: "TSDataset",
period: float,
amplitude_aggregation_mode: Union[str, Literal["per-segment"]] = AggregationMode.mean,
periodogram_params: Optional[Dict[str, Any]] = None,
segments: Optional[List[str]] = None,
xticks: Optional[List[Any]] = None,
columns_num: int = 2,
figsize: Tuple[int, int] = (10, 5),
):
"""Plot the periodogram using :py:func:`scipy.signal.periodogram`.
It is useful to determine the optimal ``order`` parameter
for :py:class:`~etna.transforms.timestamp.fourier.FourierTransform`.
Parameters
----------
ts:
TSDataset with timeseries data
period:
the period of the seasonality to capture in frequency units of time series, it should be >= 2;
it is translated to the ``fs`` parameter of :py:func:`scipy.signal.periodogram`
amplitude_aggregation_mode:
aggregation strategy for obtained per segment periodograms;
all the strategies can be examined
at :py:class:`~etna.analysis.feature_selection.mrmr_selection.AggregationMode`
periodogram_params:
additional keyword arguments for periodogram, :py:func:`scipy.signal.periodogram` is used
segments:
segments to use
xticks:
list of tick locations of the x-axis, useful to highlight specific reference periodicities
columns_num:
if ``amplitude_aggregation_mode="per-segment"`` number of columns in subplots, otherwise the value is ignored
figsize:
size of the figure per subplot with one segment in inches
Raises
------
ValueError:
if period < 2
ValueError:
if periodogram can't be calculated on segment because of the NaNs inside it
Notes
-----
In non per-segment mode all segments are cut to be the same length, the last values are taken.
"""
if period < 2:
raise ValueError("Period should be at least 2")
if periodogram_params is None:
periodogram_params = {}
if not segments:
segments = sorted(ts.segments)
df = ts.to_pandas()
# plot periodograms
if amplitude_aggregation_mode == "per-segment":
_, ax = _prepare_axes(num_plots=len(segments), columns_num=columns_num, figsize=figsize)
for i, segment in enumerate(segments):
segment_df = df.loc[:, pd.IndexSlice[segment, "target"]]
segment_df = segment_df[segment_df.first_valid_index() : segment_df.last_valid_index()]
if segment_df.isna().any():
raise ValueError(f"Periodogram can't be calculated on segment with NaNs inside: {segment}")
frequencies, spectrum = periodogram(x=segment_df, fs=period, **periodogram_params)
spectrum = spectrum[frequencies >= 1]
frequencies = frequencies[frequencies >= 1]
ax[i].step(frequencies, spectrum)
ax[i].set_xscale("log")
ax[i].set_xlabel("Frequency")
ax[i].set_ylabel("Power spectral density")
if xticks is not None:
ax[i].set_xticks(ticks=xticks, labels=xticks)
ax[i].set_title(f"Periodogram: {segment}")
else:
# find length of each segment
lengths_segments = []
for segment in segments:
segment_df = df.loc[:, pd.IndexSlice[segment, "target"]]
segment_df = segment_df[segment_df.first_valid_index() : segment_df.last_valid_index()]
if segment_df.isna().any():
raise ValueError(f"Periodogram can't be calculated on segment with NaNs inside: {segment}")
lengths_segments.append(len(segment_df))
cut_length = min(lengths_segments)
# cut each segment to `cut_length` last elements and find periodogram for each segment
frequencies_segments = []
spectrums_segments = []
for segment in segments:
segment_df = df.loc[:, pd.IndexSlice[segment, "target"]]
segment_df = segment_df[segment_df.first_valid_index() : segment_df.last_valid_index()][-cut_length:]
frequencies, spectrum = periodogram(x=segment_df, fs=period, **periodogram_params)
frequencies_segments.append(frequencies)
spectrums_segments.append(spectrum)
frequencies = frequencies_segments[0]
amplitude_aggregation_fn = AGGREGATION_FN[AggregationMode(amplitude_aggregation_mode)]
spectrum = amplitude_aggregation_fn(spectrums_segments, axis=0) # type: ignore
spectrum = spectrum[frequencies >= 1]
frequencies = frequencies[frequencies >= 1]
_, ax = plt.subplots(figsize=figsize, constrained_layout=True)
ax.step(frequencies, spectrum) # type: ignore
ax.set_xscale("log") # type: ignore
ax.set_xlabel("Frequency") # type: ignore
ax.set_ylabel("Power spectral density") # type: ignore
if xticks is not None:
ax.set_xticks(ticks=xticks, labels=xticks) # type: ignore
ax.set_title("Periodogram") # type: ignore
ax.grid() # type: ignore
[docs]def plot_holidays(
ts: "TSDataset",
holidays: Union[str, pd.DataFrame],
segments: Optional[List[str]] = None,
columns_num: int = 2,
figsize: Tuple[int, int] = (10, 5),
start: Optional[str] = None,
end: Optional[str] = None,
as_is: bool = False,
):
"""Plot holidays for segments.
Sequence of timestamps with one holiday is drawn as a colored region.
Individual holiday is drawn like a colored point.
It is not possible to distinguish points plotted at one timestamp, but this case is considered rare.
This the problem isn't relevant for region drawing because they are partially transparent.
Parameters
----------
ts:
TSDataset with timeseries data
holidays:
there are several options:
* if str, then this is code of the country in `holidays <https://pypi.org/project/holidays/>`_ library;
* if DataFrame, then dataframe is expected to be in prophet`s holiday format;
segments:
segments to use
columns_num:
number of columns in subplots
figsize:
size of the figure per subplot with one segment in inches
as_is:
* | Use this option if DataFrame is represented as a dataframe with a timestamp index and holiday names columns.
| In a holiday column values 0 represent absence of holiday in that timestamp, 1 represent the presence.
start:
start timestamp for plot
end:
end timestamp for plot
Raises
------
ValueError:
* Holiday nor pd.DataFrame or String.
* Holiday is an empty pd.DataFrame.
* `as_is=True` while holiday is String.
* If upper_window is negative.
* If lower_window is positive.
"""
start, end = _get_borders_ts(ts, start, end)
if segments is None:
segments = sorted(ts.segments)
holidays_df = _create_holidays_df(holidays, index=ts.index, as_is=as_is)
_, ax = _prepare_axes(num_plots=len(segments), columns_num=columns_num, figsize=figsize)
df = ts.to_pandas()
for i, segment in enumerate(segments):
segment_df = df.loc[start:end, pd.IndexSlice[segment, "target"]] # type: ignore
segment_df = segment_df[segment_df.first_valid_index() : segment_df.last_valid_index()]
# plot target on segment
target_plot = ax[i].plot(segment_df.index, segment_df)
target_color = target_plot[0].get_color()
# plot holidays on segment
# remember color of each holiday to reuse it
default_colors = plt.rcParams["axes.prop_cycle"].by_key()["color"]
default_colors.remove(target_color)
color_cycle = itertools.cycle(default_colors)
holidays_colors = {holiday_name: next(color_cycle) for holiday_name in holidays_df.columns}
for holiday_name in holidays_df.columns:
holiday_df = holidays_df.loc[segment_df.index, holiday_name]
for _, holiday_group in itertools.groupby(enumerate(holiday_df.tolist()), key=lambda x: x[1]):
holiday_group_cached = list(holiday_group)
indices = [x[0] for x in holiday_group_cached]
values = [x[1] for x in holiday_group_cached]
# if we have group with zero value, then it is not a holidays, skip it
if values[0] == 0:
continue
color = holidays_colors[holiday_name]
if len(indices) == 1:
# plot individual holiday point
ax[i].scatter(segment_df.index[indices[0]], segment_df.iloc[indices[0]], color=color, zorder=2)
else:
# plot span with holiday borders
x_min = segment_df.index[indices[0]]
x_max = segment_df.index[indices[-1]]
ax[i].axvline(x_min, color=color, linestyle="dashed")
ax[i].axvline(x_max, color=color, linestyle="dashed")
ax[i].axvspan(xmin=x_min, xmax=x_max, alpha=1 / 4, color=color)
ax[i].set_title(segment)
ax[i].tick_params("x", rotation=45)
legend_handles = [
Line2D([0], [0], marker="o", color=color, label=label) for label, color in holidays_colors.items()
]
ax[i].legend(handles=legend_handles)
[docs]def _cross_correlation(
a: np.ndarray, b: np.ndarray, maxlags: Optional[int] = None, normed: bool = True
) -> Tuple[np.ndarray, np.ndarray]:
"""Calculate cross correlation between arrays.
This implementation is slow: O(n^2), but can properly ignore NaNs.
Parameters
----------
a:
first array, should be equal length with b
b:
second array, should be equal length with a
maxlags:
number of lags to compare, should be >=1 and < len(a)
normed:
should correlations be normed or not
Returns
-------
lags, result:
* lags: array of size ``maxlags * 2 + 1`` represents for which lags correlations are calculated in ``result``
* result: array of size ``maxlags * 2 + 1`` represents found correlations
Raises
------
ValueError:
lengths of ``a`` and ``b`` are not the same
ValueError:
parameter ``maxlags`` doesn't satisfy constraints
"""
if len(a) != len(b):
raise ValueError("Lengths of arrays should be equal")
length = len(a)
if maxlags is None:
maxlags = length - 1
if maxlags < 1 or maxlags >= length:
raise ValueError("Parameter maxlags should be >= 1 and < len(a)")
result = []
lags = np.arange(-maxlags, maxlags + 1)
for lag in lags:
if lag < 0:
cur_a = a[:lag]
cur_b = b[-lag:]
elif lag == 0:
cur_a = a
cur_b = b
else:
cur_a = a[lag:]
cur_b = b[:-lag]
dot_product = np.nansum(cur_a * cur_b)
if normed:
nan_mask_a = np.isnan(cur_a)
nan_mask_b = np.isnan(cur_b)
nan_mask = nan_mask_a | nan_mask_b
normed_dot_product = dot_product / np.sqrt(
np.sum(cur_a[~nan_mask] * cur_a[~nan_mask]) * np.sum(cur_b[~nan_mask] * cur_b[~nan_mask])
)
normed_dot_product = np.nan_to_num(normed_dot_product)
result.append(normed_dot_product)
else:
result.append(dot_product)
return lags, np.array(result)
[docs]def cross_corr_plot(
ts: "TSDataset",
n_segments: int = 10,
maxlags: int = 21,
segments: Optional[List[str]] = None,
columns_num: int = 2,
figsize: Tuple[int, int] = (10, 5),
):
"""
Cross-correlation plot between multiple timeseries.
Parameters
----------
ts:
TSDataset with timeseries data
n_segments:
number of random segments to plot, ignored if parameter ``segments`` is set
maxlags:
number of timeseries shifts for cross-correlation, should be >=1 and <= len(timeseries)
segments:
segments to plot
columns_num:
number of columns in subplots
figsize:
size of the figure per subplot with one segment in inches
Raises
------
ValueError:
parameter ``maxlags`` doesn't satisfy constraints
"""
if segments is None:
exist_segments = list(ts.segments)
chosen_segments = np.random.choice(exist_segments, size=min(len(exist_segments), n_segments), replace=False)
segments = list(chosen_segments)
segment_pairs = list(combinations(segments, r=2))
if len(segment_pairs) == 0:
raise ValueError("There are no pairs to plot! Try set n_segments > 1.")
fig, ax = _prepare_axes(num_plots=len(segment_pairs), columns_num=columns_num, figsize=figsize)
fig.suptitle("Cross-correlation", fontsize=16)
df = ts.to_pandas()
for i, (segment_1, segment_2) in enumerate(segment_pairs):
target_1 = df.loc[:, pd.IndexSlice[segment_1, "target"]]
target_2 = df.loc[:, pd.IndexSlice[segment_2, "target"]]
if target_1.dtype == int or target_2.dtype == int:
warnings.warn(
"At least one target column has integer dtype, "
"it is converted to float in order to calculate correlation."
)
target_1 = target_1.astype(float)
target_2 = target_2.astype(float)
lags, correlations = _cross_correlation(a=target_1.values, b=target_2.values, maxlags=maxlags, normed=True)
ax[i].plot(lags, correlations, "-o", markersize=5)
ax[i].set_title(f"{segment_1} vs {segment_2}")
ax[i].xaxis.set_major_locator(MaxNLocator(integer=True))
[docs]def acf_plot(
ts: "TSDataset",
n_segments: int = 10,
lags: int = 21,
partial: bool = False,
columns_num: int = 2,
segments: Optional[List[str]] = None,
figsize: Tuple[int, int] = (10, 5),
):
"""
Autocorrelation and partial autocorrelation plot for multiple timeseries.
Notes
-----
`Definition of autocorrelation <https://en.wikipedia.org/wiki/Autocorrelation>`_.
`Definition of partial autocorrelation <https://en.wikipedia.org/wiki/Partial_autocorrelation_function>`_.
* If ``partial=False`` function works with NaNs at any place of the time-series.
* if ``partial=True`` function works only with NaNs at the edges of the time-series and fails if there are NaNs inside it.
Parameters
----------
ts:
TSDataset with timeseries data
n_segments:
number of random segments to plot
lags:
number of timeseries shifts for cross-correlation
partial:
plot autocorrelation or partial autocorrelation
columns_num:
number of columns in subplots
segments:
segments to plot
figsize:
size of the figure per subplot with one segment in inches
Raises
------
ValueError:
If partial=True and there is a NaN in the middle of the time series
"""
if segments is None:
exist_segments = sorted(ts.segments)
chosen_segments = np.random.choice(exist_segments, size=min(len(exist_segments), n_segments), replace=False)
segments = list(chosen_segments)
title = "Partial Autocorrelation" if partial else "Autocorrelation"
fig, ax = _prepare_axes(num_plots=len(segments), columns_num=columns_num, figsize=figsize)
fig.suptitle(title, fontsize=16)
df = ts.to_pandas()
for i, name in enumerate(segments):
df_slice = df[name].reset_index()["target"]
if partial:
# for partial autocorrelation remove NaN from the beginning and end of the series
begin = df_slice.first_valid_index()
end = df_slice.last_valid_index()
x = df_slice.values[begin:end]
if np.isnan(x).any():
raise ValueError("There is a NaN in the middle of the time series!")
plot_pacf(x=x, ax=ax[i], lags=lags)
if not partial:
plot_acf(x=df_slice.values, ax=ax[i], lags=lags, missing="conservative")
ax[i].set_title(name)
plt.show()
[docs]def distribution_plot(
ts: "TSDataset",
n_segments: int = 10,
segments: Optional[List[str]] = None,
shift: int = 30,
window: int = 30,
freq: str = "1M",
n_rows: int = 10,
figsize: Tuple[int, int] = (10, 5),
):
"""Distribution of z-values grouped by segments and time frequency.
Mean is calculated by the windows:
.. math::
mean_{i} = \\sum_{j=i-\\text{shift}}^{i-\\text{shift}+\\text{window}} \\frac{x_{j}}{\\text{window}}
The same is applied to standard deviation.
Parameters
----------
ts:
dataset with timeseries data
n_segments:
number of random segments to plot
segments:
segments to plot
shift:
number of timeseries shifts for statistics calc
window:
number of points for statistics calc
freq:
group for z-values
n_rows:
maximum number of rows to plot
figsize:
size of the figure per subplot with one segment in inches
"""
df_pd = ts.to_pandas(flatten=True)
if segments is None:
exist_segments = df_pd.segment.unique()
chosen_segments = np.random.choice(exist_segments, size=min(len(exist_segments), n_segments), replace=False)
segments = list(chosen_segments)
df_full = df_pd[df_pd.segment.isin(segments)]
df_full.loc[:, "mean"] = (
df_full.groupby("segment").target.shift(shift).transform(lambda s: s.rolling(window).mean())
)
df_full.loc[:, "std"] = df_full.groupby("segment").target.shift(shift).transform(lambda s: s.rolling(window).std())
df_full = df_full.dropna()
df_full.loc[:, "z"] = (df_full["target"] - df_full["mean"]) / df_full["std"]
grouped_data = df_full.groupby([df_full.timestamp.dt.to_period(freq)])
columns_num = min(2, len(grouped_data))
rows_num = min(n_rows, math.ceil(len(grouped_data) / columns_num))
groups = set(list(grouped_data.groups.keys())[-rows_num * columns_num :])
figsize = (figsize[0] * columns_num, figsize[1] * rows_num)
fig, ax = plt.subplots(rows_num, columns_num, figsize=figsize, constrained_layout=True, squeeze=False)
fig.suptitle(f"Z statistic shift: {shift} window: {window}", fontsize=16)
ax = ax.ravel()
i = 0
for period, df_slice in grouped_data:
if period not in groups:
continue
sns.boxplot(data=df_slice.sort_values(by="segment"), y="z", x="segment", ax=ax[i], fliersize=False)
ax[i].set_title(f"{period}")
ax[i].grid()
i += 1
[docs]def plot_imputation(
ts: "TSDataset",
imputer: "TimeSeriesImputerTransform",
segments: Optional[List[str]] = None,
columns_num: int = 2,
figsize: Tuple[int, int] = (10, 5),
start: Optional[str] = None,
end: Optional[str] = None,
):
"""Plot the result of imputation by a given imputer.
Parameters
----------
ts:
TSDataset with timeseries data
imputer:
transform to make imputation of NaNs
segments:
segments to use
columns_num:
number of columns in subplots
figsize:
size of the figure per subplot with one segment in inches
start:
start timestamp for plot
end:
end timestamp for plot
"""
start, end = _get_borders_ts(ts, start, end)
if segments is None:
segments = sorted(ts.segments)
_, ax = _prepare_axes(num_plots=len(segments), columns_num=columns_num, figsize=figsize)
ts_after = deepcopy(ts)
imputer.fit_transform(ts_after)
feature_name = imputer.in_column
for i, segment in enumerate(segments):
# we want to capture nans at the beginning, so don't use `ts[:, segment, :]`
segment_before_df = ts.to_pandas().loc[start:end, pd.IndexSlice[segment, feature_name]] # type: ignore
segment_after_df = ts_after.to_pandas().loc[start:end, pd.IndexSlice[segment, feature_name]] # type: ignore
# plot result after imputation
ax[i].plot(segment_after_df.index, segment_after_df)
# highlight imputed points
imputed_index = ~segment_after_df.isna() & segment_before_df.isna()
ax[i].scatter(
segment_after_df.loc[imputed_index].index,
segment_after_df.loc[imputed_index],
c="red",
zorder=2,
)
ax[i].set_title(segment)
ax[i].tick_params("x", rotation=45)