ActivityWatch/aw-client

View on GitHub
examples/redact_sensitive.py

Summary

Maintainability
B
6 hrs
Test Coverage
"""
Interactive script to redact sensitive data.
Be careful to not delete stuff you want to keep!

Issues/improvements:
 - If an event matches the sensitive string, only the sensitive field will be redacted (so if the title matches but not the URL, the URL will remain unredacted)
 - One might not want to redact to the non-informative 'REDACTED', but instead to a string with secret meaning.
 - No preview of the events/strings to be redacted.
"""

import re
import sys
from typing import List, Set, Pattern, Union, cast
from copy import deepcopy

from aw_core import Event
from aw_client import ActivityWatchClient

aw: ActivityWatchClient

REDACTED = "REDACTED"
DRYRUN = True


def main():
    global DRYRUN
    if "--wet" in sys.argv:
        DRYRUN = False

    global aw
    aw = ActivityWatchClient(testing=True)

    buckets = aw.get_buckets()
    print("Buckets: ")
    print("\n".join([" - " + bid for bid in buckets.keys()]) + "\n")

    bid_to_redact = input(
        "In which bucket are the events you want to redact? (* for all): "
    )
    assert bid_to_redact == "*" or bid_to_redact in buckets, "Not a valid option"

    regex_or_string = input(
        "Do you want to search by regex or string? (regex/string): "
    )
    assert regex_or_string in ["regex", "string"], "Not a valid option"

    print("\nNOTE: Matching is not case sensitive!")
    pattern: Union[str, Pattern]
    if regex_or_string == "string":
        pattern = input("Enter a string indicating sensitive content: ").lower()
    else:
        pattern = re.compile(
            input("Enter a regex indicating sensitive content: ").lower()
        )

    print("")
    if DRYRUN:
        print(
            "NOTE: Performing a dry run, no events will be modified. Run with --wet to modify events."
        )
    else:
        print(
            "WARNING: Note that this performs an operation that cannot be undone. We strongly recommend that you backup/export your data before proceeding."
        )
    input("Press ENTER to continue, or Ctrl-C to abort")

    if bid_to_redact == "*":
        for bucket_id in buckets.keys():
            if bucket_id.startswith("aw-watcher-afk"):
                continue
            _redact_bucket(bucket_id, pattern)
    else:
        _redact_bucket(bid_to_redact, pattern)


def _redact_bucket(bucket_id: str, pattern: Union[str, Pattern]):
    print(f"\nChecking bucket: {bucket_id}")

    global aw
    events = aw.get_events(bucket_id, limit=-1)
    sensitive_ids = _find_sensitive(events, pattern)
    print(f"Found {len(sensitive_ids)} sensitive events")

    if not sensitive_ids:
        return

    yes_redact = input(
        f"Do you want to replace all the matching strings with '{REDACTED}'? (y/N): "
    )
    if yes_redact == "y":
        for e in events:
            if e.id in sensitive_ids:
                e_before = e
                e = _redact_event(e, pattern)
                print(f"\nData before: {e_before.data}")
                print(f"Data after:  {e.data}")

                if DRYRUN:
                    print("DRYRUN, would do: aw.insert_event(bucket_id, e)")
                else:
                    aw.delete_event(bucket_id, cast(int, e_before.id))
                    aw.insert_event(bucket_id, e)
                    print("Redacted event")


def _check_event(e: Event, pattern: Union[str, Pattern]) -> bool:
    for k, v in e.data.items():
        if isinstance(v, str):
            if isinstance(pattern, str):
                if pattern in v.lower():
                    return True
            else:
                if pattern.findall(v.lower()):
                    return True
    return False


def _redact_event(e: Event, pattern: Union[str, Pattern]) -> Event:
    e = deepcopy(e)
    for k, v in e.data.items():
        if isinstance(v, str):
            if isinstance(pattern, str):
                if pattern in v.lower():
                    e.data[k] = REDACTED
            else:
                if pattern.findall(v.lower()):
                    e.data[k] = REDACTED
    return e


def _find_sensitive(el: List[Event], pattern: Union[str, Pattern]) -> Set:
    sensitive_ids = set()
    for e in el:
        if _check_event(e, pattern):
            sensitive_ids.add(e.id)

    return sensitive_ids


if __name__ == "__main__":
    main()