Source code for etna.analysis.outliers.hist_outliers

import typing
from copy import deepcopy
from typing import TYPE_CHECKING
from typing import List

import numba
import numpy as np
import pandas as pd

if TYPE_CHECKING:
    from etna.datasets import TSDataset


[docs]@numba.jit(nopython=True) def optimal_sse(left: int, right: int, p: np.ndarray, pp: np.ndarray) -> float: """ Count the approximation error by 1 bin from left to right elements. Parameters ---------- left: left border right: right border p: array of sums of elements, ``p[i]`` - sum from first to i elements pp: array of sums of squares of elements, ``pp[i]`` - sum of squares from first to i elements Returns ------- result: float approximation error """ if left == 0: avg = p[right] return pp[right] - avg**2 / (right - left + 1) avg = p[right] - p[left - 1] return pp[right] - pp[left - 1] - avg**2 / (right - left + 1)
[docs]@numba.jit(nopython=True) def adjust_estimation(i: int, k: int, sse: np.ndarray, sse_one_bin: np.ndarray) -> float: """ Count sse_one_bin[i][k] using binary search. Parameters ---------- i: left border of series k: number of bins sse: array of approximation errors sse_one_bin: array of approximation errors with one bin Returns ------- result: float calculated sse_one_bin[i][k] """ now_evaluated = sse[i - 1][k - 1] first_evaluated = sse[i - 1][k - 1] idx_prev = np.inf idx_now = 0 left = 0 while idx_now != idx_prev: right = i idx_prev = idx_now while right - left > 1: if sse_one_bin[(left + right) // 2][i] > now_evaluated: left = (left + right) // 2 else: right = (left + right) // 2 idx_now = left now_evaluated = first_evaluated - sse[idx_now][k - 1] now_min = np.inf for j in range(idx_now, i): now = sse[j][k - 1] + sse_one_bin[j + 1][i] now_min = min(now_min, now) return now_min
[docs]@numba.jit(nopython=True) def v_optimal_hist(series: np.ndarray, bins_number: int, p: np.ndarray, pp: np.ndarray) -> np.ndarray: """ Count an approximation error of a series with [1, bins_number] bins. `Reference <http://www.vldb.org/conf/1998/p275.pdf>`_. Parameters ---------- series: array to count an approximation error with bins_number bins bins_number: number of bins p: array of sums of elements, p[i] - sum from 0th to i elements pp: array of sums of squares of elements, p[i] - sum of squares from 0th to i elements Returns ------- error: np.ndarray approximation error of a series with [1, bins_number] bins """ sse = np.zeros((len(series), bins_number)) for i in range(len(series)): sse[i][0] = optimal_sse(0, i, p, pp) sse_one_bin = np.zeros((len(series), len(series))) for i in range(len(series)): for j in range(i, len(series)): sse_one_bin[i][j] = optimal_sse(i, j, p, pp) for tmp_bins_number in range(1, bins_number): for i in range(tmp_bins_number, len(series)): sse[i][tmp_bins_number] = adjust_estimation(i, tmp_bins_number, sse, sse_one_bin) return sse
[docs]def compute_f(series: np.ndarray, k: int, p: np.ndarray, pp: np.ndarray) -> typing.Tuple[np.ndarray, list]: """ Compute F. F[a][b][k] - minimum approximation error on series[a:b+1] with k outliers. `Reference <http://www.vldb.org/conf/1999/P9.pdf>`_. Parameters ---------- series: array to count F k: number of outliers p: array of sums of elements, ``p[i]`` - sum from 0th to i elements pp: array of sums of squares of elements, ``pp[i]`` - sum of squares from 0th to i elements Returns ------- result: np.ndarray array F, outliers_indices """ f = np.zeros((len(series), len(series), k + 1)) s: list = [[[[] for i in range(k + 1)] for j in range(len(series))] for s in range(len(series))] ss: list = [[[[] for i in range(k + 1)] for j in range(len(series))] for s in range(len(series))] outliers_indices: list = [[[[] for i in range(k + 1)] for j in range(len(series))] for s in range(len(series))] for right_border in range(0, len(series)): f[0][right_border][0] = optimal_sse(0, right_border, p, pp) s[0][right_border][0] = [p[right_border]] ss[0][right_border][0] = [pp[right_border]] for left_border in range(1, len(series)): for right_border in range(left_border, len(series)): f[left_border][right_border][0] = optimal_sse(left_border, right_border, p, pp) s[left_border][right_border][0] = [p[right_border] - p[left_border - 1]] ss[left_border][right_border][0] = [pp[right_border] - pp[left_border - 1]] for left_border in range(0, len(series)): for right_border in range(left_border, min(len(series), left_border + k)): s[left_border][right_border][right_border - left_border + 1] = [0] ss[left_border][right_border][right_border - left_border + 1] = [0] outliers_indices[left_border][right_border][right_border - left_border + 1] = [ list(np.arange(left_border, right_border + 1)) ] for left_border in range(len(series)): for right_border in range(left_border + 1, len(series)): for outlier_number in range(1, min(right_border - left_border + 1, k + 1)): f1 = f[left_border][right_border - 1][outlier_number - 1] tmp_ss = [] tmp_s = [] f2 = [] now_min = np.inf now_outliers_indices = [] where = 0 for i in range(len(ss[left_border][right_border - 1][outlier_number])): tmp_ss.append(ss[left_border][right_border - 1][outlier_number][i] + series[right_border] ** 2) tmp_s.append(s[left_border][right_border - 1][outlier_number][i] + series[right_border]) now_outliers_indices.append( deepcopy(outliers_indices[left_border][right_border - 1][outlier_number][i]) ) f2.append(tmp_ss[-1] - tmp_s[-1] ** 2 / (right_border - left_border + 1 - outlier_number)) if f2[-1] < now_min: now_min = f2[-1] where = i if f1 < now_min: f[left_border][right_border][outlier_number] = f1 s[left_border][right_border][outlier_number] = deepcopy( s[left_border][right_border - 1][outlier_number - 1] ) ss[left_border][right_border][outlier_number] = deepcopy( ss[left_border][right_border - 1][outlier_number - 1] ) outliers_indices[left_border][right_border][outlier_number] = deepcopy( outliers_indices[left_border][right_border - 1][outlier_number - 1] ) if len(outliers_indices[left_border][right_border][outlier_number]): for i in range(len(outliers_indices[left_border][right_border][outlier_number])): outliers_indices[left_border][right_border][outlier_number][i].append(right_border) else: outliers_indices[left_border][right_border][outlier_number].append([right_border]) elif f1 > now_min: f[left_border][right_border][outlier_number] = f2[where] s[left_border][right_border][outlier_number] = tmp_s ss[left_border][right_border][outlier_number] = tmp_ss outliers_indices[left_border][right_border][outlier_number] = now_outliers_indices else: f[left_border][right_border][outlier_number] = f1 tmp_s.extend(s[left_border][right_border - 1][outlier_number - 1]) tmp_ss.extend(ss[left_border][right_border - 1][outlier_number - 1]) s[left_border][right_border][outlier_number] = tmp_s ss[left_border][right_border][outlier_number] = tmp_ss tmp = deepcopy(outliers_indices[left_border][right_border - 1][outlier_number - 1]) if len(tmp): for i in range(len(tmp)): tmp[i].append(right_border) else: tmp = [[right_border]] outliers_indices[left_border][right_border][outlier_number].extend(now_outliers_indices) outliers_indices[left_border][right_border][outlier_number].extend(deepcopy(tmp)) return f, outliers_indices
[docs]def hist(series: np.ndarray, bins_number: int) -> np.ndarray: """ Compute outliers indices according to hist rule. `Reference <http://www.vldb.org/conf/1999/P9.pdf>`_. Parameters ---------- series: array to count F bins_number: number of bins Returns ------- indices: np.ndarray outliers indices """ approximation_error = np.zeros((len(series), bins_number + 1, bins_number)) anomalies: list = [[[[] for i in range(bins_number)] for j in range(bins_number + 1)] for s in range(len(series))] p, pp = np.empty_like(series), np.empty_like(series) p[0] = series[0] pp[0] = series[0] ** 2 for i in range(1, len(series)): p[i] = p[i - 1] + series[i] pp[i] = pp[i - 1] + series[i] ** 2 f, outliers_indices = compute_f(series, bins_number - 1, p, pp) approximation_error[:, 1:, 0] = v_optimal_hist(series, bins_number, p, pp) approximation_error[:, 1, :] = f[0] for right_border in range(len(series)): for outlier_number in range(1, bins_number): if len(outliers_indices[0][right_border][outlier_number]): anomalies[right_border][1][outlier_number] = deepcopy( outliers_indices[0][right_border][outlier_number][0] ) for right_border in range(1, len(series)): for tmp_bins_number in range(2, min(bins_number + 1, right_border + 2)): for outlier_number in range(1, min(bins_number, right_border + 2 - tmp_bins_number)): # см формулу выше tmp_approximation_error = approximation_error[:right_border, tmp_bins_number - 1, : outlier_number + 1] tmp_f = f[1 : right_border + 1, right_border, : outlier_number + 1][:, ::-1] approximation_error[right_border][tmp_bins_number][outlier_number] = np.min( tmp_approximation_error + tmp_f ) where = np.where( tmp_approximation_error + tmp_f == approximation_error[right_border][tmp_bins_number][outlier_number] ) if where[1][0] != outlier_number: anomalies[right_border][tmp_bins_number][outlier_number].extend( deepcopy(outliers_indices[1 + where[0][0]][right_border][outlier_number - where[1][0]][0]) ) anomalies[right_border][tmp_bins_number][outlier_number].extend( deepcopy(anomalies[where[0][0]][tmp_bins_number - 1][where[1][0]]) ) count = 0 now_min = approximation_error[-1][-1][0] for outlier_number in range(1, min(approximation_error.shape[1], approximation_error.shape[2])): if approximation_error[-1][approximation_error.shape[1] - 1 - outlier_number][outlier_number] <= now_min: count = outlier_number now_min = approximation_error[-1][approximation_error.shape[1] - 1 - outlier_number][outlier_number] return np.array(sorted(anomalies[-1][approximation_error.shape[1] - 1 - count][count]))
[docs]def get_anomalies_hist( ts: "TSDataset", in_column: str = "target", bins_number: int = 10 ) -> typing.Dict[str, List[pd.Timestamp]]: """ Get point outliers in time series using histogram model. Outliers are all points that, when removed, result in a histogram with a lower approximation error, even with the number of bins less than the number of outliers. Parameters ---------- ts: TSDataset with timeseries data in_column: name of the column in which the anomaly is searching bins_number: number of bins Returns ------- : dict of outliers in format {segment: [outliers_timestamps]} """ outliers_per_segment = {} segments = ts.segments for seg in segments: segment_df = ts.df[seg].reset_index() values = segment_df[in_column].values timestamp = segment_df["timestamp"].values anomalies = hist(values, bins_number) outliers_per_segment[seg] = [timestamp[i] for i in anomalies] return outliers_per_segment