onejgordon/flow-dashboard

View on GitHub
services/flow_bigquery.py

Summary

Maintainability
C
1 day
Test Coverage
#!/usr/bin/python
# -*- coding: utf-8 -*-

# API calls to interact with Google Big Query

# TODO:
# - Build in push date overlap to handle late-reported data

from __future__ import absolute_import
from datetime import datetime, timedelta, time
from services.gservice import GoogleServiceFetcher
from constants import JOURNAL
from models import Habit, HabitDay, Task, Readable, MiniJournal
from apiclient.errors import HttpError
import logging
import tools


class BigQueryClient(GoogleServiceFetcher):

    def __init__(self, user, days_ago=8, days_ago_end=1):
        super(BigQueryClient, self).__init__(user,
                                             api='bigquery',
                                             version='v2',
                                             scopes=["https://www.googleapis.com/auth/bigquery"])
        self.user = user
        # self.credentials = GoogleCredentials.get_application_default()
        self.dataset_name = self.user.get_integration_prop('bigquery_dataset_name')
        self.table_name = self.user.get_integration_prop('bigquery_table_name')
        self.days_ago = days_ago
        self.days_ago_end = days_ago_end
        # self.dataset = self.client.dataset(self.dataset_name)
        self.table = None
        self.habits = None  # Dict habit_id -> Habit()
        self.journal_questions = None
        # here = os.path.dirname(os.path.abspath(__file__))
        # logging.debug('google: {} google.cloud.bigquery: {}'.format(
        #     os.path.relpath(google.__file__, here),
        #     os.path.relpath(bigquery.__file__, here)
        # ))

    def _maybe_get_habits(self):
        if self.habits is None:
            self.habits = tools.lookupDict(Habit.Active(self.user), "key_id")

    def _maybe_get_journal_questions(self):
        if self.journal_questions is None:
            qs = tools.getJson(self.user.settings, {}).get('journals', {}).get('questions', [])
            self.journal_questions = qs

    def _field(self, name, type, mode="REQUIRED", description=None):
        f = {
            "type": type,
            "name": name,
            "mode": mode
        }
        if description:
            f["description"] = description
        return f

    def _habit_col(self, h):
        return "habit_%s" % h.slug_name()

    def _journal_col(self, q):
        return "journal_%s" % q.get('name', '')

    def _bq_schema(self):
        # Genreate bigquery schema from user
        common_fields = [
            self._field("date", "DATE"),
            self._field("tasks_done", "INT64"),
            self._field("tasks_undone", "INT64"),
            self._field("habits_done", "INT64"),
            self._field("habits_cmt", "INT64"),
            self._field("habits_cmt_undone", "INT64", description="Habits committed but not completed"),
            self._field("items_read", "INT64"),
            self._field("fav_items_read", "INT64")
        ]
        self._maybe_get_habits()
        self._maybe_get_journal_questions()
        user_fields = []
        for h in self.habits.values():
            user_fields.append(self._field(self._habit_col(h),
                                           "BOOL",
                                           description="Habit done - %s" % h.name))
        for q in self.journal_questions:
            name = self._journal_col(q)
            label = q.get('label')
            data_type = "FLOAT64" if q.get('response_type') in JOURNAL.NUMERIC_RESPONSES else "STRING"
            user_fields.append(
                self._field(name, data_type,
                            mode="NULLABLE",
                            description="Journal response: %s" % label))
        schema = common_fields + user_fields
        # TODO: How do schema changes work?
        return schema

    # def get_table(self):
    #     self.table = self.dataset.table(self.table_name, self._bq_schema())
    #     if not self.table.exists(client=self.client):
    #         logging.debug("Creating table...")
    #         self.table.create(client=self.client)

    def fetch_daily_panel_data(self, since=None, until=None):
        self._maybe_get_habits()
        self._maybe_get_journal_questions()
        if not since:
            since = datetime.combine((datetime.now() - timedelta(days=self.days_ago)).date(), time(0, 0))
        if not until:
            until = datetime.combine((datetime.now() - timedelta(days=self.days_ago_end)).date(), time(0, 0))
        rows = []
        habitdays_by_day = tools.partition(
            HabitDay.Range(self.user, self.habits.values(), since, until_date=until),
            lambda hd: tools.iso_date(hd.date)
        )
        tasks_by_day = tools.partition(
            Task.DueInRange(self.user, since, until, limit=500),
            lambda t: tools.iso_date(t.dt_due)
        )
        readables_by_day = tools.partition(
            Readable.Fetch(self.user, read=True,
                           since=tools.iso_date(since),
                           until=tools.iso_date(until)),
            lambda r: tools.iso_date(r.dt_read)
        )
        journals, iso_dates = MiniJournal.Fetch(self.user, start=since, end=until)
        journals_by_day = tools.partition(journals, lambda jrnl: tools.iso_date(jrnl.date))
        cursor = since
        while cursor <= until:
            iso_date = tools.iso_date(cursor)
            tasks = tasks_by_day.get(iso_date, [])
            habitdays = habitdays_by_day.get(iso_date, [])
            readables = readables_by_day.get(iso_date, [])
            journals = journals_by_day.get(iso_date, [])
            journal = journals[0] if journals else None
            tasks_done = tasks_undone = habits_done = habits_cmt = habits_cmt_undone = items_read = 0
            row = {}
            for t in tasks:
                if t.is_done():
                    tasks_done += 1
                else:
                    tasks_undone += 1
            habits_checklist = self.habits.keys()  # list of habit IDs
            for hd in habitdays:
                hid = hd.habit.id()
                h = self.habits.get(hid)
                if h:
                    habits_checklist.remove(hid)
                    row[self._habit_col(h)] = 'true' if hd.done else 'false'
                if hd.done:
                    habits_done += 1
                if hd.committed:
                    habits_cmt += 1
                    if not hd.done:
                        habits_cmt_undone += 1
            if habits_checklist:
                # Missing habit-days, need to create columns anyway
                for hid in habits_checklist:
                    h = self.habits.get(hid)
                    if h:
                        row[self._habit_col(h)] = 'false'
            items_read = len(readables)
            fav_items_read = len([r for r in readables if r.favorite])
            row.update({
                "id": iso_date,
                "date": iso_date,
                "tasks_done": tasks_done,
                "tasks_undone": tasks_undone,
                "habits_done": habits_done,
                "habits_cmt": habits_cmt,
                "habits_cmt_undone": habits_cmt_undone,
                "items_read": items_read,
                "fav_items_read": fav_items_read
            })
            for q in self.journal_questions:
                name = q.get('name')
                value = None
                if journal:
                    value = journal.get_data_value(name)
                    numeric = q.get('response_type') in JOURNAL.NUMERIC_RESPONSES
                    if numeric:
                        value = tools.safe_number(value, default=0)
                    elif isinstance(value, basestring):
                        value = tools.removeNonAscii(value)
                    else:
                        value = str(value) if value else ""
                row[self._journal_col(q)] = value
            rows.append(row)
            cursor += timedelta(days=1)
        return rows

    def get_table(self):
        from settings.secrets import GOOGLE_PROJECT_NO
        try:
            response = self.service.tables().get(projectId=GOOGLE_PROJECT_NO,
                                                   datasetId=self.dataset_name,
                                                   tableId=self.table_name,
                                                ).execute()
            return True
        except HttpError, e:
            return False

    def create_table(self):
        # TODO
        from settings.secrets import GOOGLE_PROJECT_NO
        body = {
            "tableReference": {
                "projectId": GOOGLE_PROJECT_NO,
                "tableId": self.table_name,
                "datasetId": self.dataset_name
            },
            "schema": {
                "fields": self._bq_schema()
            }
        }
        response = self.service.tables().insert(projectId=GOOGLE_PROJECT_NO,
                                                  datasetId=self.dataset_name,
                                                  body=body).execute()
        logging.debug(response)

    def push_data(self, rows):
        from settings.secrets import GOOGLE_PROJECT_NO
        logging.debug("Inserting %d rows into table '%s'" % (len(rows), self.table_name))
        # errors = self.table.insert_data(rows, row_ids=row_ids,
        #                        ignore_unknown_values=True,
        #                        client=self.client)
        body = {
            "kind": "bigquery#tableDataInsertAllRequest",
            "skipInvalidRows": True,
            "ignoreUnknownValues": True,
            "rows": [{"insertId": r.pop("id"), "json": r} for r in rows]
        }
        response = self.service.tabledata().insertAll(projectId=GOOGLE_PROJECT_NO,
                                                      datasetId=self.dataset_name,
                                                      tableId=self.table_name,
                                                      body=body).execute()
        logging.debug(response)
        attempted_inserts = len(rows)
        errors = 0
        if response:
            if 'insertErrors' in response:
                errors = len(response.get('insertErrors', []))
                logging.warning(response.get('insertErrors'))
        return (attempted_inserts, errors)

    def run(self):
        self.build_service()
        have_table = self.get_table()
        if not have_table:
            self.create_table()
        rows = self.fetch_daily_panel_data()
        attempted_inserts, errors = self.push_data(rows)
        logging.debug("Finished pushing data: Attempted to insert %d, errors %d" % (attempted_inserts, errors))