ActivityWatch/aw-client

View on GitHub
aw_client/cli.py

Summary

Maintainability
A
3 hrs
Test Coverage
#!/usr/bin/env python3
import json
import argparse
import logging
import textwrap
from typing import Optional, List
from datetime import timedelta, datetime, timezone

import click
from tabulate import tabulate

from aw_core import Event

import aw_client
from . import queries
from .classes import default_classes


now = datetime.now(timezone.utc)
td1day = timedelta(days=1)
td1yr = timedelta(days=365)

logger = logging.getLogger(__name__)


def _valid_date(s):
    # https://stackoverflow.com/questions/25470844/specify-format-for-input-arguments-argparse-python
    try:
        return datetime.strptime(s, "%Y-%m-%d")
    except ValueError:
        msg = f"Not a valid date: '{s}'."
        raise argparse.ArgumentTypeError(msg)


class _Context:
    client: aw_client.ActivityWatchClient


@click.group(
    help="CLI utility for aw-client to aid in interacting with the ActivityWatch server"
)
@click.option(
    "--host",
    default="127.0.0.1",
    help="Address of host",
)
@click.option(
    "--port",
    default=5600,
    help="Port to use",
)
@click.option(
    "-v",
    "--verbose",
    is_flag=True,
    help="Verbosity",
)
@click.option("--testing", is_flag=True, help="Set to use testing ports by default")
@click.pass_context
def main(ctx, testing: bool, verbose: bool, host: str, port: int):
    ctx.obj = _Context()
    ctx.obj.client = aw_client.ActivityWatchClient(
        host=host,
        port=port if port != 5600 else (5666 if testing else 5600),
        testing=testing,
    )
    logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO)


@main.command(help="Send a heartbeat to bucket with ID `bucket_id` with JSON `data`")
@click.argument("bucket_id")
@click.argument("data")
@click.option("--pulsetime", default=60, help="pulsetime to use for merging heartbeats")
@click.pass_obj
def heartbeat(obj: _Context, bucket_id: str, data: str, pulsetime: int):
    now = datetime.now(timezone.utc)
    e = Event(duration=0, data=json.loads(data), timestamp=now)
    print(e)
    obj.client.heartbeat(bucket_id, e, pulsetime)


@main.command(help="List all buckets")
@click.pass_obj
def buckets(obj: _Context):
    buckets = obj.client.get_buckets()
    print("Buckets:")
    for bucket in buckets:
        print(f" - {bucket}")


@main.command(help="Query events from bucket with ID `bucket_id`")
@click.argument("bucket_id")
@click.pass_obj
def events(obj: _Context, bucket_id: str):
    events = obj.client.get_events(bucket_id)
    print("events:")
    for e in events:
        print(
            " - {} ({}) {}".format(
                e.timestamp.replace(tzinfo=None, microsecond=0),
                str(e.duration).split(".")[0],
                e.data,
            )
        )


@main.command(help="Run a query in file at `path` on the server")
@click.argument("path")
@click.option("--name")
@click.option("--cache", is_flag=True)
@click.option("--json", is_flag=True)
@click.option("--start", default=now - td1day, type=click.DateTime())
@click.option("--stop", default=now + td1yr, type=click.DateTime())
@click.pass_obj
def query(
    obj: _Context,
    path: str,
    cache: bool,
    _json: bool,
    start: datetime,
    stop: datetime,
    name: Optional[str] = None,
):
    with open(path) as f:
        query = f.read()
    result = obj.client.query(query, [(start, stop)], cache=cache, name=name)
    if _json:
        print(json.dumps(result))
    else:
        for period in result:
            print(f"Showing 10 out of {len(period)} events:")
            for event in period[:10]:
                event.pop("id")
                event.pop("timestamp")
                print(
                    " - Duration: {} \tData: {}".format(
                        str(timedelta(seconds=event["duration"])).split(".")[0],
                        event["data"],
                    )
                )
            print(
                "Total duration:\t",
                timedelta(seconds=sum(e["duration"] for e in period)),
            )


@main.command(help="Generate an activity report")
@click.argument("hostname")
@click.option("--cache", is_flag=True)
@click.option("--start", default=now - td1day, type=click.DateTime())
@click.option("--stop", default=now + td1yr, type=click.DateTime())
@click.pass_obj
def report(
    obj: _Context,
    hostname: str,
    cache: bool,
    start: datetime,
    stop: datetime,
    name: Optional[str] = None,
):
    logger.info(f"Querying between {start} and {stop}")
    bid_window = f"aw-watcher-window_{hostname}"
    bid_afk = f"aw-watcher-afk_{hostname}"

    if not start.tzinfo:
        start = start.astimezone()
    if not stop.tzinfo:
        stop = stop.astimezone()

    bid_browsers: List[str] = []

    # TODO: Allow loading from toml
    logger.info("Using default classes")
    classes = default_classes

    params = queries.DesktopQueryParams(
        bid_browsers=bid_browsers,
        classes=classes,
        filter_classes=[],
        filter_afk=True,
        include_audible=True,
        bid_window=bid_window,
        bid_afk=bid_afk,
    )
    query = queries.fullDesktopQuery(params)
    logger.debug("Query: \n" + queries.pretty_query(query))

    result = obj.client.query(query, [(start, stop)], cache=cache, name=name)

    # TODO: Print titles, apps, categories, with most time
    for period in result:
        print()
        # print(period["window"]["cat_events"])

        cat_events = _parse_events(period["window"]["cat_events"])
        print_top(
            cat_events,
            lambda e: " > ".join(e.data["$category"]),
            title="Top categories",
        )

        title_events = _parse_events(period["window"]["title_events"])
        print_top(title_events, lambda e: e.data["title"], title="Top titles")

        active_events = _parse_events(period["window"]["title_events"])
        print(
            "Total duration:\t",
            sum((e.duration for e in active_events), timedelta()),
        )


def _parse_events(events: List[dict]) -> List[Event]:
    return [Event(**event) for event in events]


def print_top(events: List[Event], key=lambda e: e.data, title="Events"):
    print(
        title
        + (f" (showing 10 out of {len(events)} events)" if len(events) > 10 else "")
    )
    print(
        tabulate(
            [
                (event.duration, key(event))
                for event in sorted(events, key=lambda e: e.duration, reverse=True)[:10]
            ],
            headers=["Duration", "Key"],
        )
    )
    print()


@main.command(help="Query 'canonical events' for a single host (filtered, classified)")
@click.argument("hostname")
@click.option("--cache", is_flag=True)
@click.option("--start", default=now - td1day, type=click.DateTime())
@click.option("--stop", default=now + td1yr, type=click.DateTime())
@click.pass_obj
def canonical(
    obj: _Context,
    hostname: str,
    cache: bool,
    start: datetime,
    stop: datetime,
    name: Optional[str] = None,
):
    logger.info(f"Querying between {start} and {stop}")
    bid_window = f"aw-watcher-window_{hostname}"
    bid_afk = f"aw-watcher-afk_{hostname}"

    if not start.tzinfo:
        start = start.astimezone()
    if not stop.tzinfo:
        stop = stop.astimezone()

    classes = default_classes

    query = queries.canonicalEvents(
        queries.DesktopQueryParams(
            bid_window=bid_window,
            bid_afk=bid_afk,
            classes=classes,
        )
    )
    query = f"""{query}\n RETURN = events;"""
    logger.debug("Query: \n" + queries.pretty_query(query))

    result = obj.client.query(query, [(start, stop)], cache=cache, name=name)

    # TODO: Print titles, apps, categories, with most time
    for period in result:
        print()
        events = _parse_events(period)
        print(f"Showing last 10 out of {len(events)} events:")

        print(
            tabulate(
                [
                    (
                        str(e.timestamp).split(".")[0],
                        str(e.duration).split(".")[0],
                        f'[{e.data["app"]}] {textwrap.shorten(e.data["title"], 60, placeholder="...")}',
                    )
                    for e in events[-10:]
                ],
                headers=["Timestamp", "Duration", "Data"],
            )
        )

        print()
        print(
            "Total duration:\t",
            timedelta(seconds=sum(e["duration"] for e in period)),
        )


if __name__ == "__main__":
    main()