izikeros/trend_classifier

View on GitHub
docs/segmentation.html

Summary

Maintainability
Test Coverage
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, minimum-scale=1">
<meta name="generator" content="pdoc 0.10.0">
<title>trend_classifier.segmentation API documentation</title>
<meta name="description" content="API documentation for the trend_classifier.segmentation module: the Segmenter class for segmenting a time series into segments with similar trend.">
<!-- Third-party stylesheets are preloaded and applied in one tag; the integrity hashes pin the exact CDN payload. -->
<link rel="preload stylesheet" as="style" href="https://cdnjs.cloudflare.com/ajax/libs/10up-sanitize.css/11.0.1/sanitize.min.css" integrity="sha256-PK9q560IAAa6WVRRh76LtCaI8pjTJ2z11v0miyNNjrs=" crossorigin>
<link rel="preload stylesheet" as="style" href="https://cdnjs.cloudflare.com/ajax/libs/10up-sanitize.css/11.0.1/typography.min.css" integrity="sha256-7l/o7C8jubJiy74VsKTidCy1yBkRtiUGbVkYBylBqUg=" crossorigin>
<!-- NOTE(review): unlike the two stylesheets above, this one carries no integrity hash. Add one after computing the hash of the pinned 10.1.1 file. -->
<link rel="preload stylesheet" as="style" href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/10.1.1/styles/github.min.css" crossorigin>
<style>:root{--highlight-color:#fe9}.flex{display:flex !important}body{line-height:1.5em}#content{padding:20px}#sidebar{padding:30px;overflow:hidden}#sidebar > *:last-child{margin-bottom:2cm}.http-server-breadcrumbs{font-size:130%;margin:0 0 15px 0}#footer{font-size:.75em;padding:5px 30px;border-top:1px solid #ddd;text-align:right}#footer p{margin:0 0 0 1em;display:inline-block}#footer p:last-child{margin-right:30px}h1,h2,h3,h4,h5{font-weight:300}h1{font-size:2.5em;line-height:1.1em}h2{font-size:1.75em;margin:1em 0 .50em 0}h3{font-size:1.4em;margin:25px 0 10px 0}h4{margin:0;font-size:105%}h1:target,h2:target,h3:target,h4:target,h5:target,h6:target{background:var(--highlight-color);padding:.2em 0}a{color:#058;text-decoration:none;transition:color .3s ease-in-out}a:hover{color:#e82}.title code{font-weight:bold}h2[id^="header-"]{margin-top:2em}.ident{color:#900}pre code{background:#f8f8f8;font-size:.8em;line-height:1.4em}code{background:#f2f2f1;padding:1px 4px;overflow-wrap:break-word}h1 code{background:transparent}pre{background:#f8f8f8;border:0;border-top:1px solid #ccc;border-bottom:1px solid #ccc;margin:1em 0;padding:1ex}#http-server-module-list{display:flex;flex-flow:column}#http-server-module-list div{display:flex}#http-server-module-list dt{min-width:10%}#http-server-module-list p{margin-top:0}.toc ul,#index{list-style-type:none;margin:0;padding:0}#index code{background:transparent}#index h3{border-bottom:1px solid #ddd}#index ul{padding:0}#index h4{margin-top:.6em;font-weight:bold}@media (min-width:200ex){#index .two-column{column-count:2}}@media (min-width:300ex){#index .two-column{column-count:3}}dl{margin-bottom:2em}dl dl:last-child{margin-bottom:4em}dd{margin:0 0 1em 3em}#header-classes + dl > dd{margin-bottom:3em}dd dd{margin-left:2em}dd p{margin:10px 0}.name{background:#eee;font-weight:bold;font-size:.85em;padding:5px 10px;display:inline-block;min-width:40%}.name:hover{background:#e0e0e0}dt:target .name{background:var(--highlight-color)}.name > 
span:first-child{white-space:nowrap}.name.class > span:nth-child(2){margin-left:.4em}.inherited{color:#999;border-left:5px solid #eee;padding-left:1em}.inheritance em{font-style:normal;font-weight:bold}.desc h2{font-weight:400;font-size:1.25em}.desc h3{font-size:1em}.desc dt code{background:inherit}.source summary,.git-link-div{color:#666;text-align:right;font-weight:400;font-size:.8em;text-transform:uppercase}.source summary > *{white-space:nowrap;cursor:pointer}.git-link{color:inherit;margin-left:1em}.source pre{max-height:500px;overflow:auto;margin:0}.source pre code{font-size:12px;overflow:visible}.hlist{list-style:none}.hlist li{display:inline}.hlist li:after{content:',\2002'}.hlist li:last-child:after{content:none}.hlist .hlist{display:inline;padding-left:1em}img{max-width:100%}td{padding:0 .5em}.admonition{padding:.1em .5em;margin-bottom:1em}.admonition-title{font-weight:bold}.admonition.note,.admonition.info,.admonition.important{background:#aef}.admonition.todo,.admonition.versionadded,.admonition.tip,.admonition.hint{background:#dfd}.admonition.warning,.admonition.versionchanged,.admonition.deprecated{background:#fd4}.admonition.error,.admonition.danger,.admonition.caution{background:lightpink}</style>
<style media="screen and (min-width: 700px)">@media screen and (min-width:700px){#sidebar{width:30%;height:100vh;overflow:auto;position:sticky;top:0}#content{width:70%;max-width:100ch;padding:3em 4em;border-left:1px solid #ddd}pre code{font-size:1em}.item .name{font-size:1em}main{display:flex;flex-direction:row-reverse;justify-content:flex-end}.toc ul ul,#index ul{padding-left:1.5em}.toc > ul > li{margin-top:.5em}}</style>
<style media="print">@media print{#sidebar h1{page-break-before:always}.source{display:none}}@media print{*{background:transparent !important;color:#000 !important;box-shadow:none !important;text-shadow:none !important}a[href]:after{content:" (" attr(href) ")";font-size:90%}a[href][title]:after{content:none}abbr[title]:after{content:" (" attr(title) ")"}.ir a:after,a[href^="javascript:"]:after,a[href^="#"]:after{content:""}pre,blockquote{border:1px solid #999;page-break-inside:avoid}thead{display:table-header-group}tr,img{page-break-inside:avoid}img{max-width:100% !important}@page{margin:0.5cm}p,h2,h3{orphans:3;widows:3}h1,h2,h3,h4,h5,h6{page-break-after:avoid}}</style>
<script defer src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/10.1.1/highlight.min.js" integrity="sha256-Uv3H6lx7dJmRfRvH8TH6kJD1TSK1aFcwgx+mdg3epi8=" crossorigin></script>
<script>
// Highlight the code listings once the DOM has been parsed.
// The guard avoids a ReferenceError when the CDN script above is blocked
// or rejected by its integrity check; the page then stays readable, just unhighlighted.
window.addEventListener('DOMContentLoaded', () => {
  if (window.hljs) {
    hljs.initHighlighting();
  }
});
</script>
</head>
<body>
<main>
<article id="content">
<header>
<h1 class="title">Module <code>trend_classifier.segmentation</code></h1>
</header>
<section id="section-intro">
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">import warnings

import numpy as np
from trend_classifier.configuration import Config
from trend_classifier.models import Metrics
from trend_classifier.segment import Segment
from trend_classifier.segment import SegmentList
from trend_classifier.types import FigSize
from trend_classifier.visuals import _plot_detrended_signal
from trend_classifier.visuals import _plot_segment
from trend_classifier.visuals import _plot_segment_with_trendlines_no_context
from trend_classifier.visuals import _plot_segments


def _error(a: float, b: float, metrics: Metrics = Metrics.ABSOLUTE_ERROR) -&gt; float:
    &#34;&#34;&#34;Calculate how much two parameters differ.

    Used e.g. to calculate how much the slopes of linear trends in two windows differ.

    Args:
        a: First parameter.
        b: Second parameter.
        metrics: Metrics to use for the calculation.

    Returns:
        Measure of difference between the two parameters.

    See Also:
        class `Metrics`

    &#34;&#34;&#34;
    if metrics == Metrics.RELATIVE_ABSOLUTE_ERROR:
        return abs(a - b) / abs(a)
    if metrics == Metrics.ABSOLUTE_ERROR:
        return abs(a - b)


class Segmenter:
    &#34;&#34;&#34;Class for segmenting a time series into segments with similar trend.&#34;&#34;&#34;

    def __init__(
        self,
        x: list[int] | None = None,
        y: list[int] | None = None,
        df=None,
        column: str | None = &#34;Adj Close&#34;,
        config: Config | None = None,
        n: int | None = None,
    ):
        &#34;&#34;&#34;Initialize the segmenter.

        Args:
            x: List of x values.
            y: List of y values.
            df: Pandas DataFrame with time series.
            column: Name of the column with the time series.
            config: Configuration of the segmenter.
            n: Number of samples in a window.
        &#34;&#34;&#34;
        self._handle_configuration(config, n)
        self._handle_input_data(column=column, df=df, x=x, y=y)
        self.y_de_trended: list | None = None
        self.segments: SegmentList[Segment] | None = None
        self.slope: float | None = None
        self.offset: float | None = None
        self.slopes_std: float | None = None
        self.offsets_std: float | None = None

    def _handle_configuration(self, config, n):
        # Handle configuration
        if config is None:
            # use default configuration if no configuration is provided
            self.config = Config()
            if n is not None:
                # override default N in configuration if N is provided
                self.config.N = n
        if config is not None:
            if n is not None:
                # raise error
                raise ValueError(&#34;Provide either config or N, not both.&#34;)
            self.config = config

    def _handle_input_data(self, column, df, x, y):
        # --- Handle input data
        # error - most likely pandas dataframe as argument instead of kwarg
        if x is not None and not isinstance(x, list):
            # TODO: KS: 2022-09-07: accept also numpy array, ndarray,np.matrix, pd.Series
            raise TypeError(
                &#34;x must be a list, got {}. For pandas dataframe use &#39;df&#39; keyword argument&#34;.format(
                    type(x)
                )
            )
        # error - no input data provided
        if x is None and y is None and df is None:
            raise ValueError(&#34;Provide timeseries data: either x and y or df.&#34;)
        # error - ambiguous input data provided - both x,y and df provided
        if x is not None and y is not None and df is not None:
            raise ValueError(
                &#34;Provide timeseries data: either (x and y) or (df), not all.&#34;
            )
        # input data provided as x and y
        if x is not None and y is not None:
            self.x = x
            self.y = y
        # make warning if column provided but not dataframe
        if df is None and column is not None:
            warnings.warn(&#34;No dataframe provided, column argument will be ignored.&#34;)
        # input data provided as dataframe
        if df is not None:
            self.x = list(range(0, len(df.index.tolist()), 1))  # noqa: FKA01
            self.y = df[column].tolist()

    def calculate_segments(self) -&gt; list[Segment]:
        &#34;&#34;&#34;Calculate segments with similar trend for the given time series.

        Calculates:
        - boundaries of segments
        - slopes and offsets of windows

        &#34;&#34;&#34;
        # check if initialized x and y
        if self.x is None or self.y is None:
            raise ValueError(&#34;Segmenter x and y must be initialized!&#34;)
        # Read data from config to short variables
        N = self.config.N
        overlap_ratio = self.config.overlap_ratio
        alpha = self.config.alpha
        beta = self.config.beta
        metrics_alpha = self.config.metrics_alpha
        metrics_beta = self.config.metrics_beta

        prev_fit = None

        segments = SegmentList()

        new_segment = {&#34;s_start&#34;: 0, &#34;slopes&#34;: [], &#34;offsets&#34;: [], &#34;starts&#34;: []}

        off = self._set_offset(N, overlap_ratio)

        for start in range(0, len(self.x) - N, off):  # noqa: FKA01
            end = start + N
            fit = np.polyfit(x=self.x[start:end], y=self.y[start:end], deg=1)
            new_segment[&#34;slopes&#34;].append(fit[0])
            new_segment[&#34;offsets&#34;].append(fit[1])
            new_segment[&#34;starts&#34;].append(start)

            if prev_fit is not None:
                # assess if the slope is similar to the previous one
                prev_slope = float(prev_fit[0])
                this_slope = float(fit[0])
                r0 = _error(prev_slope, this_slope, metrics=metrics_alpha)

                # assess if the offset is similar to the previous one
                prev_offset = float(prev_fit[1])
                this_offset = float(fit[1])
                r1 = _error(prev_offset, this_offset, metrics=metrics_beta)

                is_slope_different = r0 &gt;= alpha if alpha is not None else False
                is_offset_different = r1 &gt;= beta if beta is not None else False
                new_segment[&#34;is_slope_different&#34;] = is_slope_different
                new_segment[&#34;is_offset_different&#34;] = is_offset_different

                new_segment = self._finish_segment_if_needed(
                    offset=off, new_segment=new_segment, segments=segments, start=start
                )
            prev_fit = fit

        # add last segment
        last_segment = Segment(
            start=int(new_segment[&#34;s_start&#34;]),
            stop=int(len(self.x)),
            slopes=new_segment[&#34;slopes&#34;],
            offsets=new_segment[&#34;offsets&#34;],
            starts=new_segment[&#34;starts&#34;],
        )

        segments.append(last_segment)
        self.segments = segments

        # remove outstanding windows
        last_segment.remove_outstanding_windows(self.config.N)

        # add extra information to the segments
        self._describe_segments()

        return segments

    def _finish_segment_if_needed(self, offset, new_segment, segments, start):
        need_to_finish_segment = (
            new_segment[&#34;is_slope_different&#34;] or new_segment[&#34;is_offset_different&#34;]
        )
        if need_to_finish_segment:
            s_stop = _determine_trend_end_point(offset, start)
            reason = self.describe_reason_for_new_segment(
                new_segment[&#34;is_offset_different&#34;], new_segment[&#34;is_slope_different&#34;]
            )

            segment = Segment(
                start=int(new_segment[&#34;s_start&#34;]),
                stop=int(s_stop),
                slopes=new_segment[&#34;slopes&#34;],
                offsets=new_segment[&#34;offsets&#34;],
                starts=new_segment[&#34;starts&#34;],
                reason_for_new_segment=reason,
            )

            # remove outstanding windows
            segment.remove_outstanding_windows(self.config.N)

            segments.append(segment)
            new_segment[&#34;s_start&#34;] = s_stop + 1
            new_segment[&#34;slopes&#34;] = []
            new_segment[&#34;offsets&#34;] = []
            new_segment[&#34;starts&#34;] = []
        return new_segment

    @staticmethod
    def _set_offset(n, overlap_ratio):
        offset = int(n * overlap_ratio)
        if offset == 0:
            print(&#34;Overlap ratio is too small, setting it to 1&#34;)
            print(&#34;N = &#34;, n)
            print(&#34;overlap_ratio = &#34;, overlap_ratio)
            offset = 1
        return offset

    @staticmethod
    def describe_reason_for_new_segment(
        is_offset_different: bool, is_slope_different: bool
    ) -&gt; str:
        reason = &#34;slope&#34; if is_slope_different else &#34;offset&#34;
        if is_slope_different and is_offset_different:
            reason = &#34;slope and offset&#34;
        return reason

    def _describe_segments(self) -&gt; None:
        &#34;&#34;&#34;Add extra information about the segments.&#34;&#34;&#34;
        y_norm = []
        for idx, segment in enumerate(self.segments):
            start = segment.start
            stop = segment.stop

            # x and y for the segment
            xx = self.x[start : stop + 1]
            yy = self.y[start : stop + 1]

            # trend calculation
            fit = np.polyfit(x=xx, y=yy, deg=1)
            fit_fn = np.poly1d(fit)

            # calculate point for the trend line
            yt = np.array(fit_fn(xx))

            # FIXME: KS: 2022-09-02: Normalize (as below?)
            # normalize each point in yy by value of corresponding point in yt,
            # store results in ydtn
            # ydtn = yy / yt

            ydt = np.array(yy) - yt

            # calculate standard deviation of the values with removed trend
            s = np.std(ydt)

            # calculate span of the values in the segment normalized by
            # the mean value of the segment
            span = 1000 * (np.max(ydt) - np.min(ydt)) // np.mean(yy)

            # store de-trended values
            y_norm.extend(ydt)

            # store volatility measures for the segment
            self.segments[idx].std = s
            self.segments[idx].span = span
            self.y_de_trended = y_norm
            self.segments[idx].slope = fit[0]
            self.segments[idx].offset = fit[1]
            self.segments[idx].slopes_std = np.std(self.segments[idx].slopes)
            self.segments[idx].offsets_std = np.std(self.segments[idx].offsets)

    def plot_segment(
        self,
        idx: list[int] | int,
        col: str = &#34;red&#34;,
        fig_size: FigSize = (10, 5),
    ) -&gt; None:
        &#34;&#34;&#34;Plot segment with given index or multiple segments with given indices.

        Args:
            idx: index of the segment or list of indices of segments
            col: color of the segment
            fig_size: size of the figure
        &#34;&#34;&#34;
        _plot_segment(obj=self, idx=idx, col=col, fig_size=fig_size)

    def plot_segment_with_trendlines_no_context(
        self,
        idx: int,
        fig_size: FigSize = (10, 5),
    ) -&gt; None:
        &#34;&#34;&#34;Plot segment with given index.

        Args:
            idx: index of the segment or list of indices of segments
            fig_size: size of the figure
        &#34;&#34;&#34;
        _plot_segment_with_trendlines_no_context(obj=self, idx=idx, fig_size=fig_size)

    def plot_segments(self, fig_size: FigSize = (8, 4)) -&gt; None:
        &#34;&#34;&#34;Plot all segments and linear trend lines.

        Args:
            fig_size: size of the figure e.g. (8, 4)
        &#34;&#34;&#34;
        _plot_segments(self, fig_size)

    def plot_detrended_signal(self, fig_size: FigSize = (10, 5)) -&gt; None:
        &#34;&#34;&#34;Plot de-trended signal.

        Args:
            fig_size: size of the figure
        &#34;&#34;&#34;
        _plot_detrended_signal(
            x=self.x, y_de_trended=self.y_de_trended, fig_size=fig_size
        )

    def calc_area_outside_trend(self) -&gt; float:
        &#34;&#34;&#34;Calculate area outside trend.

        Sum of absolute values of the points below/above the trend line.
        Normalized by the mean value of the signal.
        Normalized by the length of the signal.

        Returns:
            area outside trend

        &#34;&#34;&#34;
        return np.sum(np.abs(self.y_de_trended)) / np.mean(self.y) / len(self.y)


def _determine_trend_end_point(off: int, start: int) -&gt; int:
    &#34;&#34;&#34;Determine end point of the trend.

    Args:
        off: offset of the window
        start: start point of the trend
    &#34;&#34;&#34;
    # TODO: KS: 2022-09-06: proper calculation of the end of the segment
    s_stop = start + off / 2
    return int(s_stop)


# TODO: KS: 2022-09-06: Automatically determine parameters based on history:
#  see the difference e.g. between AAPL and BTC

# TODO: KS: 2022-09-06: Check how it works with different timeframes
#  (e.g. 1h, 4h, 1d)

# TODO: KS: 2022-09-06: improve quality on basic data V-shape or lambda-shape

# TODO: KS: 2022-09-07: add docstrings</code></pre>
</details>
</section>
<section>
</section>
<section>
</section>
<section>
</section>
<section>
<h2 class="section-title" id="header-classes">Classes</h2>
<dl>
<dt id="trend_classifier.segmentation.Segmenter"><code class="flex name class">
<span>class <span class="ident">Segmenter</span></span>
<span>(</span><span>x: list[int] | None = None, y: list[int] | None = None, df=None, column: str | None = 'Adj Close', config: <a title="trend_classifier.configuration.Config" href="configuration.html#trend_classifier.configuration.Config">Config</a> | None = None, n: int | None = None)</span>
</code></dt>
<dd>
<div class="desc"><p>Class for segmenting a time series into segments with similar trend.</p>
<p>Initialize the segmenter.</p>
<h2 id="args">Args</h2>
<dl>
<dt><strong><code>x</code></strong></dt>
<dd>List of x values.</dd>
<dt><strong><code>y</code></strong></dt>
<dd>List of y values.</dd>
<dt><strong><code>df</code></strong></dt>
<dd>Pandas DataFrame with time series.</dd>
<dt><strong><code>column</code></strong></dt>
<dd>Name of the column with the time series.</dd>
<dt><strong><code>config</code></strong></dt>
<dd>Configuration of the segmenter.</dd>
<dt><strong><code>n</code></strong></dt>
<dd>Number of samples in a window.</dd>
</dl></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">class Segmenter:
    &#34;&#34;&#34;Class for segmenting a time series into segments with similar trend.&#34;&#34;&#34;

    def __init__(
        self,
        x: list[int] | None = None,
        y: list[int] | None = None,
        df=None,
        column: str | None = &#34;Adj Close&#34;,
        config: Config | None = None,
        n: int | None = None,
    ):
        &#34;&#34;&#34;Initialize the segmenter.

        Args:
            x: List of x values.
            y: List of y values.
            df: Pandas DataFrame with time series.
            column: Name of the column with the time series.
            config: Configuration of the segmenter.
            n: Number of samples in a window.
        &#34;&#34;&#34;
        self._handle_configuration(config, n)
        self._handle_input_data(column=column, df=df, x=x, y=y)
        self.y_de_trended: list | None = None
        self.segments: SegmentList[Segment] | None = None
        self.slope: float | None = None
        self.offset: float | None = None
        self.slopes_std: float | None = None
        self.offsets_std: float | None = None

    def _handle_configuration(self, config, n):
        # Handle configuration
        if config is None:
            # use default configuration if no configuration is provided
            self.config = Config()
            if n is not None:
                # override default N in configuration if N is provided
                self.config.N = n
        if config is not None:
            if n is not None:
                # raise error
                raise ValueError(&#34;Provide either config or N, not both.&#34;)
            self.config = config

    def _handle_input_data(self, column, df, x, y):
        # --- Handle input data
        # error - most likely pandas dataframe as argument instead of kwarg
        if x is not None and not isinstance(x, list):
            # TODO: KS: 2022-09-07: accept also numpy array, ndarray,np.matrix, pd.Series
            raise TypeError(
                &#34;x must be a list, got {}. For pandas dataframe use &#39;df&#39; keyword argument&#34;.format(
                    type(x)
                )
            )
        # error - no input data provided
        if x is None and y is None and df is None:
            raise ValueError(&#34;Provide timeseries data: either x and y or df.&#34;)
        # error - ambiguous input data provided - both x,y and df provided
        if x is not None and y is not None and df is not None:
            raise ValueError(
                &#34;Provide timeseries data: either (x and y) or (df), not all.&#34;
            )
        # input data provided as x and y
        if x is not None and y is not None:
            self.x = x
            self.y = y
        # make warning if column provided but not dataframe
        if df is None and column is not None:
            warnings.warn(&#34;No dataframe provided, column argument will be ignored.&#34;)
        # input data provided as dataframe
        if df is not None:
            self.x = list(range(0, len(df.index.tolist()), 1))  # noqa: FKA01
            self.y = df[column].tolist()

    def calculate_segments(self) -&gt; list[Segment]:
        &#34;&#34;&#34;Calculate segments with similar trend for the given time series.

        Calculates:
        - boundaries of segments
        - slopes and offsets of windows

        &#34;&#34;&#34;
        # check if initialized x and y
        if self.x is None or self.y is None:
            raise ValueError(&#34;Segmenter x and y must be initialized!&#34;)
        # Read data from config to short variables
        N = self.config.N
        overlap_ratio = self.config.overlap_ratio
        alpha = self.config.alpha
        beta = self.config.beta
        metrics_alpha = self.config.metrics_alpha
        metrics_beta = self.config.metrics_beta

        prev_fit = None

        segments = SegmentList()

        new_segment = {&#34;s_start&#34;: 0, &#34;slopes&#34;: [], &#34;offsets&#34;: [], &#34;starts&#34;: []}

        off = self._set_offset(N, overlap_ratio)

        for start in range(0, len(self.x) - N, off):  # noqa: FKA01
            end = start + N
            fit = np.polyfit(x=self.x[start:end], y=self.y[start:end], deg=1)
            new_segment[&#34;slopes&#34;].append(fit[0])
            new_segment[&#34;offsets&#34;].append(fit[1])
            new_segment[&#34;starts&#34;].append(start)

            if prev_fit is not None:
                # assess if the slope is similar to the previous one
                prev_slope = float(prev_fit[0])
                this_slope = float(fit[0])
                r0 = _error(prev_slope, this_slope, metrics=metrics_alpha)

                # assess if the offset is similar to the previous one
                prev_offset = float(prev_fit[1])
                this_offset = float(fit[1])
                r1 = _error(prev_offset, this_offset, metrics=metrics_beta)

                is_slope_different = r0 &gt;= alpha if alpha is not None else False
                is_offset_different = r1 &gt;= beta if beta is not None else False
                new_segment[&#34;is_slope_different&#34;] = is_slope_different
                new_segment[&#34;is_offset_different&#34;] = is_offset_different

                new_segment = self._finish_segment_if_needed(
                    offset=off, new_segment=new_segment, segments=segments, start=start
                )
            prev_fit = fit

        # add last segment
        last_segment = Segment(
            start=int(new_segment[&#34;s_start&#34;]),
            stop=int(len(self.x)),
            slopes=new_segment[&#34;slopes&#34;],
            offsets=new_segment[&#34;offsets&#34;],
            starts=new_segment[&#34;starts&#34;],
        )

        segments.append(last_segment)
        self.segments = segments

        # remove outstanding windows
        last_segment.remove_outstanding_windows(self.config.N)

        # add extra information to the segments
        self._describe_segments()

        return segments

    def _finish_segment_if_needed(self, offset, new_segment, segments, start):
        need_to_finish_segment = (
            new_segment[&#34;is_slope_different&#34;] or new_segment[&#34;is_offset_different&#34;]
        )
        if need_to_finish_segment:
            s_stop = _determine_trend_end_point(offset, start)
            reason = self.describe_reason_for_new_segment(
                new_segment[&#34;is_offset_different&#34;], new_segment[&#34;is_slope_different&#34;]
            )

            segment = Segment(
                start=int(new_segment[&#34;s_start&#34;]),
                stop=int(s_stop),
                slopes=new_segment[&#34;slopes&#34;],
                offsets=new_segment[&#34;offsets&#34;],
                starts=new_segment[&#34;starts&#34;],
                reason_for_new_segment=reason,
            )

            # remove outstanding windows
            segment.remove_outstanding_windows(self.config.N)

            segments.append(segment)
            new_segment[&#34;s_start&#34;] = s_stop + 1
            new_segment[&#34;slopes&#34;] = []
            new_segment[&#34;offsets&#34;] = []
            new_segment[&#34;starts&#34;] = []
        return new_segment

    @staticmethod
    def _set_offset(n, overlap_ratio):
        offset = int(n * overlap_ratio)
        if offset == 0:
            print(&#34;Overlap ratio is too small, setting it to 1&#34;)
            print(&#34;N = &#34;, n)
            print(&#34;overlap_ratio = &#34;, overlap_ratio)
            offset = 1
        return offset

    @staticmethod
    def describe_reason_for_new_segment(
        is_offset_different: bool, is_slope_different: bool
    ) -&gt; str:
        reason = &#34;slope&#34; if is_slope_different else &#34;offset&#34;
        if is_slope_different and is_offset_different:
            reason = &#34;slope and offset&#34;
        return reason

    def _describe_segments(self) -&gt; None:
        &#34;&#34;&#34;Add extra information about the segments.&#34;&#34;&#34;
        y_norm = []
        for idx, segment in enumerate(self.segments):
            start = segment.start
            stop = segment.stop

            # x and y for the segment
            xx = self.x[start : stop + 1]
            yy = self.y[start : stop + 1]

            # trend calculation
            fit = np.polyfit(x=xx, y=yy, deg=1)
            fit_fn = np.poly1d(fit)

            # calculate point for the trend line
            yt = np.array(fit_fn(xx))

            # FIXME: KS: 2022-09-02: Normalize (as below?)
            # normalize each point in yy by value of corresponding point in yt,
            # store results in ydtn
            # ydtn = yy / yt

            ydt = np.array(yy) - yt

            # calculate standard deviation of the values with removed trend
            s = np.std(ydt)

            # calculate span of the values in the segment normalized by
            # the mean value of the segment
            span = 1000 * (np.max(ydt) - np.min(ydt)) // np.mean(yy)

            # store de-trended values
            y_norm.extend(ydt)

            # store volatility measures for the segment
            self.segments[idx].std = s
            self.segments[idx].span = span
            self.y_de_trended = y_norm
            self.segments[idx].slope = fit[0]
            self.segments[idx].offset = fit[1]
            self.segments[idx].slopes_std = np.std(self.segments[idx].slopes)
            self.segments[idx].offsets_std = np.std(self.segments[idx].offsets)

    def plot_segment(
        self,
        idx: list[int] | int,
        col: str = &#34;red&#34;,
        fig_size: FigSize = (10, 5),
    ) -&gt; None:
        &#34;&#34;&#34;Plot segment with given index or multiple segments with given indices.

        Args:
            idx: index of the segment or list of indices of segments
            col: color of the segment
            fig_size: size of the figure
        &#34;&#34;&#34;
        _plot_segment(obj=self, idx=idx, col=col, fig_size=fig_size)

    def plot_segment_with_trendlines_no_context(
        self,
        idx: int,
        fig_size: FigSize = (10, 5),
    ) -&gt; None:
        &#34;&#34;&#34;Plot segment with given index.

        Args:
            idx: index of the segment or list of indices of segments
            fig_size: size of the figure
        &#34;&#34;&#34;
        _plot_segment_with_trendlines_no_context(obj=self, idx=idx, fig_size=fig_size)

    def plot_segments(self, fig_size: FigSize = (8, 4)) -&gt; None:
        &#34;&#34;&#34;Plot all segments and linear trend lines.

        Args:
            fig_size: size of the figure e.g. (8, 4)
        &#34;&#34;&#34;
        _plot_segments(self, fig_size)

    def plot_detrended_signal(self, fig_size: FigSize = (10, 5)) -&gt; None:
        &#34;&#34;&#34;Plot de-trended signal.

        Args:
            fig_size: size of the figure
        &#34;&#34;&#34;
        _plot_detrended_signal(
            x=self.x, y_de_trended=self.y_de_trended, fig_size=fig_size
        )

    def calc_area_outside_trend(self) -&gt; float:
        &#34;&#34;&#34;Calculate area outside trend.

        Sum of absolute values of the points below/above the trend line.
        Normalized by the mean value of the signal.
        Normalized by the length of the signal.

        Returns:
            area outside trend

        &#34;&#34;&#34;
        return np.sum(np.abs(self.y_de_trended)) / np.mean(self.y) / len(self.y)</code></pre>
</details>
<h3>Static methods</h3>
<dl>
<dt id="trend_classifier.segmentation.Segmenter.describe_reason_for_new_segment"><code class="name flex">
<span>def <span class="ident">describe_reason_for_new_segment</span></span>(<span>is_offset_different: bool, is_slope_different: bool) ‑> str</span>
</code></dt>
<dd>
<div class="desc"></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">@staticmethod
def describe_reason_for_new_segment(
    is_offset_different: bool, is_slope_different: bool
) -&gt; str:
    reason = &#34;slope&#34; if is_slope_different else &#34;offset&#34;
    if is_slope_different and is_offset_different:
        reason = &#34;slope and offset&#34;
    return reason</code></pre>
</details>
</dd>
</dl>
<h3>Methods</h3>
<dl>
<dt id="trend_classifier.segmentation.Segmenter.calc_area_outside_trend"><code class="name flex">
<span>def <span class="ident">calc_area_outside_trend</span></span>(<span>self) ‑> float</span>
</code></dt>
<dd>
<div class="desc"><p>Calculate area outside trend.</p>
<p>Sum of absolute values of the points below/above the trend line.
Normalized by the mean value of the signal.
Normalized by the length of the signal.</p>
<h2 id="returns">Returns</h2>
<p>area outside trend</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def calc_area_outside_trend(self) -&gt; float:
    &#34;&#34;&#34;Calculate area outside trend.

    Sum of absolute values of the points below/above the trend line.
    Normalized by the mean value of the signal.
    Normalized by the length of the signal.

    Returns:
        area outside trend

    &#34;&#34;&#34;
    return np.sum(np.abs(self.y_de_trended)) / np.mean(self.y) / len(self.y)</code></pre>
</details>
</dd>
<dt id="trend_classifier.segmentation.Segmenter.calculate_segments"><code class="name flex">
<span>def <span class="ident">calculate_segments</span></span>(<span>self) ‑> list[<a title="trend_classifier.segment.Segment" href="segment.html#trend_classifier.segment.Segment">Segment</a>]</span>
</code></dt>
<dd>
<div class="desc"><p>Calculate segments with similar trend for the given time series.</p>
<p>Calculates:</p>
<ul>
<li>boundaries of segments</li>
<li>slopes and offsets of windows</li>
</ul></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def calculate_segments(self) -&gt; list[Segment]:
    &#34;&#34;&#34;Calculate segments with similar trend for the given timeserie.

    Calculates:
     - boundaries of segments
    - slopes and offsets of windows

    &#34;&#34;&#34;
    # check if initialized x and y
    if self.x is None or self.y is None:
        raise ValueError(&#34;Segmenter x and y must be initialized!&#34;)
    # Read data from config to short variables
    N = self.config.N
    overlap_ratio = self.config.overlap_ratio
    alpha = self.config.alpha
    beta = self.config.beta
    metrics_alpha = self.config.metrics_alpha
    metrics_beta = self.config.metrics_beta

    prev_fit = None

    segments = SegmentList()

    new_segment = {&#34;s_start&#34;: 0, &#34;slopes&#34;: [], &#34;offsets&#34;: [], &#34;starts&#34;: []}

    off = self._set_offset(N, overlap_ratio)

    for start in range(0, len(self.x) - N, off):  # noqa: FKA01
        end = start + N
        fit = np.polyfit(x=self.x[start:end], y=self.y[start:end], deg=1)
        new_segment[&#34;slopes&#34;].append(fit[0])
        new_segment[&#34;offsets&#34;].append(fit[1])
        new_segment[&#34;starts&#34;].append(start)

        if prev_fit is not None:
            # asses if the slope is similar to the previous one
            prev_slope = float(prev_fit[0])
            this_slope = float(fit[0])
            r0 = _error(prev_slope, this_slope, metrics=metrics_alpha)

            # asses if the offset is similar to the previous one
            prev_offset = float(prev_fit[1])
            this_offset = float(fit[1])
            r1 = _error(prev_offset, this_offset, metrics=metrics_beta)

            is_slope_different = r0 &gt;= alpha if alpha is not None else False
            is_offset_different = r1 &gt;= beta if beta is not None else False
            new_segment[&#34;is_slope_different&#34;] = is_slope_different
            new_segment[&#34;is_offset_different&#34;] = is_offset_different

            new_segment = self._finish_segment_if_needed(
                offset=off, new_segment=new_segment, segments=segments, start=start
            )
        prev_fit = fit

    # add last segment
    last_segment = Segment(
        start=int(new_segment[&#34;s_start&#34;]),
        stop=int(len(self.x)),
        slopes=new_segment[&#34;slopes&#34;],
        offsets=new_segment[&#34;offsets&#34;],
        starts=new_segment[&#34;starts&#34;],
    )

    segments.append(last_segment)
    self.segments = segments

    # remove outstanding windows
    last_segment.remove_outstanding_windows(self.config.N)

    # add extra information to the segments
    self._describe_segments()

    return segments</code></pre>
</details>
</dd>
<dt id="trend_classifier.segmentation.Segmenter.plot_detrended_signal"><code class="name flex">
<span>def <span class="ident">plot_detrended_signal</span></span>(<span>self, fig_size: tuple[typing.Union[float, int], typing.Union[float, int]] = (10, 5)) ‑> None</span>
</code></dt>
<dd>
<div class="desc"><p>Plot de-trended signal.</p>
<h2 id="plot_detrended_signal-args">Args</h2>
<dl>
<dt><strong><code>fig_size</code></strong></dt>
<dd>size of the figure</dd>
</dl></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def plot_detrended_signal(self, fig_size: FigSize = (10, 5)) -&gt; None:
    &#34;&#34;&#34;Plot de-trended signal.

    Args:
        fig_size: size of the figure
    &#34;&#34;&#34;
    _plot_detrended_signal(
        x=self.x, y_de_trended=self.y_de_trended, fig_size=fig_size
    )</code></pre>
</details>
</dd>
<dt id="trend_classifier.segmentation.Segmenter.plot_segment"><code class="name flex">
<span>def <span class="ident">plot_segment</span></span>(<span>self, idx: list[int] | int, col: str = 'red', fig_size: tuple[typing.Union[float, int], typing.Union[float, int]] = (10, 5)) ‑> None</span>
</code></dt>
<dd>
<div class="desc"><p>Plot segment with given index or multiple segments with given indices.</p>
<h2 id="plot_segment-args">Args</h2>
<dl>
<dt><strong><code>idx</code></strong></dt>
<dd>index of the segment or list of indices of segments</dd>
<dt><strong><code>col</code></strong></dt>
<dd>color of the segment</dd>
<dt><strong><code>fig_size</code></strong></dt>
<dd>size of the figure</dd>
</dl></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def plot_segment(
    self,
    idx: list[int] | int,
    col: str = &#34;red&#34;,
    fig_size: FigSize = (10, 5),
) -&gt; None:
    &#34;&#34;&#34;Plot segment with given index or multiple segments with given indices.

    Args:
        idx: index of the segment or list of indices of segments
        col: color of the segment
        fig_size: size of the figure
    &#34;&#34;&#34;
    _plot_segment(obj=self, idx=idx, col=col, fig_size=fig_size)</code></pre>
</details>
</dd>
<dt id="trend_classifier.segmentation.Segmenter.plot_segment_with_trendlines_no_context"><code class="name flex">
<span>def <span class="ident">plot_segment_with_trendlines_no_context</span></span>(<span>self, idx: int, fig_size: tuple[typing.Union[float, int], typing.Union[float, int]] = (10, 5)) ‑> None</span>
</code></dt>
<dd>
<div class="desc"><p>Plot segment with given index.</p>
<h2 id="plot_segment_with_trendlines_no_context-args">Args</h2>
<dl>
<dt><strong><code>idx</code></strong></dt>
<dd>index of the segment</dd>
<dt><strong><code>fig_size</code></strong></dt>
<dd>size of the figure</dd>
</dl></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def plot_segment_with_trendlines_no_context(
    self,
    idx: int,
    fig_size: FigSize = (10, 5),
) -&gt; None:
    &#34;&#34;&#34;Plot segment with given index.

    Args:
        idx: index of the segment or list of indices of segments
        fig_size: size of the figure
    &#34;&#34;&#34;
    _plot_segment_with_trendlines_no_context(obj=self, idx=idx, fig_size=fig_size)</code></pre>
</details>
</dd>
<dt id="trend_classifier.segmentation.Segmenter.plot_segments"><code class="name flex">
<span>def <span class="ident">plot_segments</span></span>(<span>self, fig_size: tuple[typing.Union[float, int], typing.Union[float, int]] = (8, 4)) ‑> None</span>
</code></dt>
<dd>
<div class="desc"><p>Plot all segments and linear trend lines.</p>
<h2 id="plot_segments-args">Args</h2>
<dl>
<dt><strong><code>fig_size</code></strong></dt>
<dd>size of the figure e.g. (8, 4)</dd>
</dl></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def plot_segments(self, fig_size: FigSize = (8, 4)) -&gt; None:
    &#34;&#34;&#34;Plot all segments and linear trend lines.

    Args:
        fig_size: size of the figure e.g. (8, 4)
    &#34;&#34;&#34;
    _plot_segments(self, fig_size)</code></pre>
</details>
</dd>
</dl>
</dd>
</dl>
</section>
</article>
<nav id="sidebar">
<h1>Index</h1>
<div class="toc">
<ul></ul>
</div>
<ul id="index">
<li><h3>Super-module</h3>
<ul>
<li><code><a title="trend_classifier" href="index.html">trend_classifier</a></code></li>
</ul>
</li>
<li><h3><a href="#header-classes">Classes</a></h3>
<ul>
<li>
<h4><code><a title="trend_classifier.segmentation.Segmenter" href="#trend_classifier.segmentation.Segmenter">Segmenter</a></code></h4>
<ul class="">
<li><code><a title="trend_classifier.segmentation.Segmenter.calc_area_outside_trend" href="#trend_classifier.segmentation.Segmenter.calc_area_outside_trend">calc_area_outside_trend</a></code></li>
<li><code><a title="trend_classifier.segmentation.Segmenter.calculate_segments" href="#trend_classifier.segmentation.Segmenter.calculate_segments">calculate_segments</a></code></li>
<li><code><a title="trend_classifier.segmentation.Segmenter.describe_reason_for_new_segment" href="#trend_classifier.segmentation.Segmenter.describe_reason_for_new_segment">describe_reason_for_new_segment</a></code></li>
<li><code><a title="trend_classifier.segmentation.Segmenter.plot_detrended_signal" href="#trend_classifier.segmentation.Segmenter.plot_detrended_signal">plot_detrended_signal</a></code></li>
<li><code><a title="trend_classifier.segmentation.Segmenter.plot_segment" href="#trend_classifier.segmentation.Segmenter.plot_segment">plot_segment</a></code></li>
<li><code><a title="trend_classifier.segmentation.Segmenter.plot_segment_with_trendlines_no_context" href="#trend_classifier.segmentation.Segmenter.plot_segment_with_trendlines_no_context">plot_segment_with_trendlines_no_context</a></code></li>
<li><code><a title="trend_classifier.segmentation.Segmenter.plot_segments" href="#trend_classifier.segmentation.Segmenter.plot_segments">plot_segments</a></code></li>
</ul>
</li>
</ul>
</li>
</ul>
</nav>
</main>
<footer id="footer">
<p>Generated by <a href="https://pdoc3.github.io/pdoc" title="pdoc: Python API documentation generator"><cite>pdoc</cite> 0.10.0</a>.</p>
</footer>
</body>
</html>