# reports.py
import traceback
import tools
from google.appengine.ext import ndb
from google.appengine.api import logservice, memcache
from models import Project, HabitDay, Task, Goal, MiniJournal, Event, TrackingDay
from constants import REPORT, GCS_REPORT_BUCKET, GOAL
import cloudstorage as gcs
from datetime import datetime
import gc
import csv
from common.decorators import deferred_task_decorator
import logging
# Debug switch: when True and running on the dev server, writeData raises
# TooLongError after every batch so the continuation path can be exercised.
TEST_TOO_LONG_ON_EVERY_BATCH = False
# Memcache key template for export progress; formatted with the report key.
MC_EXPORT_STATUS = "MC_EXPORT_STATUS_%s"
# Elapsed-seconds budget checked after each batch before work is handed to a new task.
MAX_REQUEST_SECONDS = 40*3
# Format string passed as `fmt` to tools.sdatetime for datetime columns.
DATE_FMT = "%Y-%m-%d %H:%M:%S %Z"
class TooLongError(Exception):
    """Signal that the current run has used up its time budget.

    Raised with no arguments by the report worker once a batch finishes past
    the allowed elapsed time. No custom ``__init__`` is defined, so the
    standard ``Exception`` constructor applies and an optional message may be
    supplied.
    """
class GCSReportWorker(object):
    """Base worker that pages through a datastore query and writes rows to a GCS file.

    Subclasses set ``KIND`` to the model class to query, fill ``self.headers``
    and override ``entityData`` to turn one entity into a row (list of values).
    Progress is published through memcache under ``self.report_prog_mckey``.
    """

    KIND = None

    def __init__(self, rkey, start_att="__key__", start_att_desc=False, title="Report"):
        """Load the report, mark it as generating and open the output file.

        Args:
            rkey: key of the report entity; ``rkey.get()`` must return it.
            start_att: property name used for the date filters and ORDER BY.
            start_att_desc: when true, the query is ordered descending.
            title: base title handed to ``report.generate_title``.

        If the report cannot be loaded an error is logged and the instance is
        left only partially initialised (``self.report`` is falsy).
        """
        self.report = rkey.get()
        if not self.report:
            logging.error("Error retrieving report [ %s ] from db" % rkey)
            return
        self.start_att = start_att
        self.start_att_desc = start_att_desc
        self.FILTERS = []
        self.report.status = REPORT.GENERATING
        self.specs = self.report.get_specs()
        self.start_ts = self.specs.get('start', 0)
        self.end_ts = self.specs.get('end', 0)
        self.report.generate_title(title, ts_start=self.start_ts, ts_end=self.end_ts)
        self.report.put()
        self.add_date_filters(start=self.start_ts, end=self.end_ts)
        # The report's parent entity is treated as the owning user and is
        # also used as the query ancestor.
        self.user = self.report.key.parent().get()
        self.ancestor = self.user
        self.counters = {
            'run': 0,
            'skipped': 0
        }
        self.worker_start = tools.unixtime()
        self.cursor = None
        self.worker_cancelled = False
        self.prefetch_props = []
        self.date_columns = []
        self.headers = []
        self.projection = None
        self.query = None
        self.batch_size = 1000
        self.report_prog_mckey = MC_EXPORT_STATUS % self.report.key
        self.setProgress({'val': 0, "status": REPORT.GENERATING})
        self.gcs_file = gcs.open(self.get_gcs_filename(), 'w')
        # From: https://code.google.com/p/googleappengine/issues/detail?id=8809
        logservice.AUTOFLUSH_ENABLED = True
        logservice.AUTOFLUSH_EVERY_SECONDS = 1
        logservice.AUTOFLUSH_EVERY_BYTES = 1024
        logservice.AUTOFLUSH_EVERY_LINES = 1

    def add_date_filters(self, start=None, end=None):
        """Append GQL date-range conditions on ``self.start_att`` to ``self.FILTERS``.

        Args:
            start: timestamp of the first day to include; falsy means no lower bound.
            end: timestamp of the last day to include; falsy means no upper bound.
        """
        if start:
            self.FILTERS.append("%s >= DATETIME('%s 00:00:00')" % (self.start_att, tools.iso_date(tools.dt_from_ts(start))))
        if end:
            self.FILTERS.append("%s < DATETIME('%s 23:59:59')" % (self.start_att, tools.iso_date(tools.dt_from_ts(end))))

    def get_gcs_filename(self):
        """Build the GCS object name for this report, record it on the report and return it."""
        r = self.report
        filename = GCS_REPORT_BUCKET + "/uid:%d/%s.%s" % (self.user.key.id(), r.key.id(), r.extension)
        r.gcs_files.append(filename)
        return r.gcs_files[-1]

    @deferred_task_decorator
    def run(self, start_cursor=None):
        """Write report data, re-enqueueing itself if the time budget runs out.

        Args:
            start_cursor: datastore cursor to resume from; when falsy this is
                treated as the first run and the header row is written.

        Any exception other than TooLongError is logged and published as an
        error in the progress record; it is not re-raised.
        """
        if not self.report:
            # __init__ could not load the report, so there is nothing to write.
            logging.error("Report worker has no report, aborting run")
            return
        self.worker_start = tools.unixtime()
        self.cursor = start_cursor
        if not start_cursor:
            self.writeHeaders()
        try:
            # This is heavy
            self.writeData()
        except TooLongError:
            logging.debug("TooLongError: Going to the next batch")
            if self.report:
                self.finish(reportDone=False)
                tools.safe_add_task(self.run, start_cursor=self._get_cursor(), _queue="report-queue")
        except Exception as e:  # including DeadlineExceededError
            traceback.print_exc()
            logging.error("Error: %s" % e)
            self.setProgress({'error': "Error occurred: %s" % e, 'status': REPORT.ERROR})
            return
        else:
            if not self.worker_cancelled:
                # A cancelled report has already been deleted; finishing would
                # put() it back and publish it as done.
                tools.safe_add_task(self.finish)

    def writeHeaders(self):
        """Write ``self.headers`` as the first CSV row (CSV reports only)."""
        if self.report.ftype == REPORT.CSV:
            csv.writer(self.gcs_file).writerow(tools.normalize_list_to_ascii(self.headers))

    def writeData(self):
        """Page through the query from ``self.cursor`` and write one row per entity.

        Returns when a page comes back empty, when no query can be built, or
        when the user cancels (in which case the report is deleted and
        ``self.worker_cancelled`` is set).

        Raises:
            TooLongError: after a batch, once the elapsed time reaches
                MAX_REQUEST_SECONDS (or on every batch under the dev-server
                test switch).
        """
        # The query string does not change between pages, so build it once.
        self.query = self._get_gql_query()
        if not self.query:
            # Without a query the paging loop would never fetch and never exit.
            return
        is_csv = self.report.ftype == REPORT.CSV
        writer = csv.writer(self.gcs_file) if is_csv else None
        total_i = self.counters['run']
        while True:
            entities, self.cursor, _more = self.KIND.gql(self.query).fetch_page(self.batch_size, start_cursor=self.cursor)
            if not entities:
                logging.debug("No rows returned by query -- done")
                return
            logging.debug("Got %d rows" % len(entities))
            for entity in entities:
                if not entity:
                    continue
                ed = self.entityData(entity)
                if is_csv:
                    writer.writerow(tools.normalize_list_to_ascii(ed))
                self.gcs_file.flush()
                total_i += 1
                self.counters['run'] += 1
                # Only poll memcache for cancellation every 100 rows.
                if total_i % 100 == 0 and self.updateProgressAndCheckIfCancelled():
                    self.worker_cancelled = True
                    self.report.CleanDelete()
                    logging.debug("Worker cancelled by user, report deleted.")
                    return
            logging.debug("Batch of %d done" % len(entities))
            elapsed_ms = tools.unixtime() - self.worker_start
            elapsed = elapsed_ms / 1000
            if elapsed >= MAX_REQUEST_SECONDS or (tools.on_dev_server() and TEST_TOO_LONG_ON_EVERY_BATCH):
                logging.debug("Elapsed %ss" % elapsed)
                raise TooLongError()

    def updateProgressAndCheckIfCancelled(self):
        """Return a truthy value when the stored progress status is REPORT.CANCELLED.

        NOTE(review): despite the name, this only reads progress; it does not
        write an update.
        """
        progress = self.getProgress()
        return progress and progress.get('status') == REPORT.CANCELLED

    def getProgress(self):
        """Return the progress record stored in memcache for this report (may be None)."""
        return memcache.get(self.report_prog_mckey)

    def setProgress(self, updatedProgress):
        """Merge ``updatedProgress`` into the stored progress dict and save it to memcache."""
        progress = self.getProgress()
        if progress:
            progress.update(updatedProgress)
        else:
            progress = updatedProgress
        memcache.set(self.report_prog_mckey, progress)

    def entityData(self, entity):
        """
        Override with format specific to report type
        """
        self.setProgress({'val': 0})
        return []

    @deferred_task_decorator
    def finish(self, reportDone=True):
        """Called when the worker has finished, to allow for any final work to be done.

        Args:
            reportDone: when true, close the GCS file, mark the report DONE,
                save it and publish the final progress record. When false,
                only publish the running row count and title.
        """
        progress = None
        if reportDone:
            self.gcs_file.close()
            self.report.status = REPORT.DONE
            self.report.dt_generated = datetime.now()
            self.report.put()
            duration = self.report.get_duration()
            logging.debug("GCSReportWorker finished. Counters: %s. Report ran for %d seconds." % (self.counters, duration))
            progress = {
                "status": REPORT.DONE,
                "resource": self.report.get_gcs_file(),
                "generated": tools.unixtime(dt=self.report.dt_generated),
                "report": self.report.json(),
                "duration": duration
            }
        else:
            logging.debug("Batch finished. Counters: %s" % (self.counters))
        p = {
            'val': self.counters['run'],
            "filename": self.report.title
        }
        if progress:
            p.update(progress)
        self.setProgress(p)
        gc.collect()  # Garbage collector

    def _get_cursor(self):
        """Return the cursor from the most recent page fetch, or None before any fetch.

        ``self.query`` holds the GQL string and has no cursor of its own; the
        resumable position is the one stored by ``writeData`` in ``self.cursor``.
        """
        return self.cursor

    def _get_gql_query(self):
        """Returns a query over the specified kind, with any appropriate filters applied.

        The result is the GQL text starting at ``WHERE`` (ancestor condition
        first, then each entry of ``self.FILTERS``, then ORDER BY), or None
        when there is neither an ancestor nor any filter.
        """
        clauses = []
        if self.ancestor:
            clauses.append("ANCESTOR IS KEY('%s')" % (self.ancestor.key.urlsafe()))
        clauses.extend(self.FILTERS)
        if not clauses:
            logging.debug("No FILTERS or ancestor, not querying")
            return None
        # Joining a single list keeps the text valid when only filters are present.
        query_string = "WHERE " + ' AND '.join(clauses)
        query_string += " ORDER BY %s" % self.start_att
        if self.start_att_desc:
            query_string += " DESC"
        return query_string
class HabitReportWorker(GCSReportWorker):
    """Report worker that writes one row per HabitDay, filtered and ordered by ``dt_created``."""

    KIND = HabitDay

    def __init__(self, rkey):
        """Set up the worker for the report identified by ``rkey``."""
        super(HabitReportWorker, self).__init__(rkey, start_att="dt_created", title="Habit Report")
        self.prefetch_props = ['habit']
        self.headers = ["Created", "Updated", "Date", "Habit", "Done", "Committed"]

    def entityData(self, hd):
        """Return the row for one HabitDay.

        The habit name column is left empty when the habit key is unset or
        the habit entity can no longer be fetched.
        """
        # Guard the key itself: calling .get() on an unset key would raise.
        habit = hd.habit.get() if hd.habit else None
        row = [
            tools.sdatetime(hd.dt_created, fmt=DATE_FMT),
            tools.sdatetime(hd.dt_updated, fmt=DATE_FMT),
            tools.iso_date(hd.date),
            habit.name if habit else "",
            "1" if hd.done else "0",
            "1" if hd.committed else "0"
        ]
        return row
class TaskReportWorker(GCSReportWorker):
    """Report worker that writes one row per Task, filtered and ordered by ``dt_created``."""

    KIND = Task

    def __init__(self, rkey):
        """Set up the worker for the report identified by ``rkey``."""
        super(TaskReportWorker, self).__init__(rkey, start_att="dt_created", title="Task Report")
        # NOTE(review): 'habit' matches the habit worker's value; confirm it is intended for tasks.
        self.prefetch_props = ['habit']
        self.headers = [
            "Date Created",
            "Date Due",
            "Date Done",
            "Title",
            "Done",
            "Archived",
            "Seconds Logged",
            "Complete Sessions Logged",
        ]

    def entityData(self, task):
        """Return the row for one task; missing timer values are reported as 0."""
        logged_ms = task.timer_total_ms or 0
        sessions = task.timer_complete_sess or 0
        created = tools.sdatetime(task.dt_created, fmt=DATE_FMT)
        due = tools.sdatetime(task.dt_due, fmt=DATE_FMT)
        done_at = tools.sdatetime(task.dt_done, fmt=DATE_FMT)
        if task.is_done():
            done_flag = "1"
        else:
            done_flag = "0"
        if task.archived:
            archived_flag = "1"
        else:
            archived_flag = "0"
        return [
            created,
            due,
            done_at,
            task.title,
            done_flag,
            archived_flag,
            str(logged_ms / 1000),
            str(sessions),
        ]
class ProjectReportWorker(GCSReportWorker):
    """Report worker that writes one row per Project, ordered by ``dt_created`` descending."""

    KIND = Project

    def __init__(self, rkey):
        """Set up the worker for the report identified by ``rkey``."""
        super(ProjectReportWorker, self).__init__(rkey, start_att="dt_created", start_att_desc=True, title="Project Report")
        fixed_columns = [
            "Date Created", "Date Due", "Date Completed", "Date Archived", "Title", "Subhead",
            "Links", "Starred", "Archived", "Progress"]
        # One extra column per 10% progress step: "Progress 10%" .. "Progress 100%".
        step_columns = ["Progress %d%%" % (step * 10) for step in range(1, 11)]
        self.headers = fixed_columns + step_columns

    def entityData(self, prj):
        """Return the row for one project, including up to ten progress timestamps."""
        row = [
            tools.sdatetime(prj.dt_created, fmt=DATE_FMT),
            tools.sdatetime(prj.dt_due, fmt=DATE_FMT),
            tools.sdatetime(prj.dt_completed, fmt=DATE_FMT),
            tools.sdatetime(prj.dt_archived, fmt=DATE_FMT),
            prj.title,
            prj.subhead,
            ', '.join(prj.urls),
        ]
        row.append("1" if prj.starred else "0")
        row.append("1" if prj.archived else "0")
        row.append("%d%%" % (prj.progress * 10))
        stamps = prj.progress_ts or []
        n_stamps = len(stamps)
        for idx in range(10):
            if idx < n_stamps:
                # Slots without a recorded timestamp stay blank.
                row.append(tools.sdatetime(tools.dt_from_ts(stamps[idx]), fmt=DATE_FMT))
            else:
                row.append("")
        return row
class GoalReportWorker(GCSReportWorker):
    """Report worker that writes one row per Goal, filtered and ordered by ``dt_created``."""

    KIND = Goal

    def __init__(self, rkey):
        """Set up the worker; the number of text columns comes from the user's goal slot setting."""
        super(GoalReportWorker, self).__init__(rkey, start_att="dt_created", title="Goal Report")
        # NOTE(review): 'habit' matches the habit worker's value; confirm it is intended for goals.
        self.prefetch_props = ['habit']
        slot_setting = self.user.get_setting_prop(['goals', 'preferences', 'slots'], default=GOAL.DEFAULT_GOAL_SLOTS)
        self.n_slots = int(slot_setting)
        headers = ["Goal Period", "Date Created"]
        headers.extend("Text %s" % slot for slot in range(1, self.n_slots + 1))
        headers.append("Goal Assessments")
        headers.append("Overall Assessment")
        self.headers = headers

    def entityData(self, goal):
        """Return the row for one goal, padding the text columns out to ``self.n_slots``."""
        if goal.text:
            n_texts = len(goal.text)
        else:
            n_texts = 0
        row = [goal.key.id(), tools.sdatetime(goal.dt_created, fmt=DATE_FMT)]
        for idx in range(self.n_slots):
            if idx < n_texts:
                row.append(goal.text[idx])
            else:
                row.append("")
        if goal.assessments:
            row.append(','.join([str(a) for a in goal.assessments]))
        else:
            row.append("")
        if goal.assessment:
            row.append(str(goal.assessment))
        else:
            row.append("")
        return row
class JournalReportWorker(GCSReportWorker):
    """Report worker that writes one row per MiniJournal, filtered and ordered by ``dt_created``."""

    KIND = MiniJournal

    def __init__(self, rkey):
        """Set up the worker for the report identified by ``rkey``."""
        super(JournalReportWorker, self).__init__(rkey, start_att="dt_created", title="Journal Report")
        # NOTE(review): 'habit' matches the habit worker's value; confirm it is intended for journals.
        self.prefetch_props = ['habit']
        self.headers = ["Date", "Tags", "Location", "Data"]

    def entityData(self, jrnl):
        """Return the row for one journal; empty location and data become empty strings."""
        tag_ids = [tag_key.id() for tag_key in jrnl.tags]
        if jrnl.location:
            location = str(jrnl.location)
        else:
            location = ""
        return [
            tools.iso_date(jrnl.date),
            ', '.join(tag_ids),
            location,
            jrnl.data or "",
        ]
class EventReportWorker(GCSReportWorker):
    """Report worker that writes one row per Event, filtered and ordered by ``date_start``."""

    KIND = Event

    def __init__(self, rkey):
        """Set up the worker for the report identified by ``rkey``."""
        super(EventReportWorker, self).__init__(rkey, start_att="date_start", title="Event Report")
        self.headers = ["Date Start", "Date End", "Title", "Details", "Color"]

    def entityData(self, event):
        """Return the row for one event: start date, end date, title, details, color."""
        starts = tools.iso_date(event.date_start)
        ends = tools.iso_date(event.date_end)
        return [starts, ends, event.title, event.details, event.color]
class TrackingReportWorker(GCSReportWorker):
    """Report worker that writes one row per TrackingDay, ordered by ``date`` descending."""

    KIND = TrackingDay

    def __init__(self, rkey):
        """Set up the worker for the report identified by ``rkey``."""
        super(TrackingReportWorker, self).__init__(rkey, start_att_desc=True, start_att="date", title="Tracking Report")
        self.headers = ["Date", "Data"]

    def entityData(self, td):
        """Return the row for one tracking day, with its data flattened to ``key:value`` pairs."""
        # assumes tools.getJson returns a dict here -- TODO confirm its result for empty data
        parsed = tools.getJson(td.data)
        pairs = ["%s:%s" % (name, value) for name, value in parsed.items()]
        return [tools.iso_date(td.date), ', '.join(pairs)]