masschallenge/impact-api

View on GitHub
web/impact/impact/v1/views/office_hours_calendar_view.py

Summary

Maintainability
B
4 hrs
Test Coverage
from datetime import (
    datetime,
    timedelta,
)

import calendar

from pytz import utc

from rest_framework.response import Response

from django.contrib.auth import get_user_model
from django.db.models import (
    BooleanField,
    Case,
    Count,
    F,
    Q,
    When,
    Value,
)
from . import ImpactView
from ...utils import compose_filter
from ...permissions.v1_api_permissions import (
    IsAuthenticated,
)
from accelerator.models import (
    Clearance,
    MentorProgramOfficeHour,
    ProgramFamily,
    ProgramRoleGrant,
    UserRole,
    Location,
)
from accelerator_abstract.models.base_user_utils import is_employee
User = get_user_model()


ISO_8601_DATE_FORMAT = "%Y-%m-%d"
ONE_DAY = timedelta(1)
ONE_WEEK = timedelta(8)



STAFF = "staff"
MENTOR = "mentor"
FINALIST = "finalist"
NOT_ALLOWED = "not_allowed"
LOCATION_FIELDS = (
    'id',
    'street_address',
    'timezone',
    'country',
    'state',
    'name',
    'city',
)

OFFICE_HOUR_HOLDER_ROLES = [UserRole.MENTOR, UserRole.AIR]
OFFICE_HOUR_RESERVER_ROLES = [UserRole.FINALIST, UserRole.ALUM]
OFFICE_HOURS_HOLDER = Q(
    program_role__user_role__name__in=OFFICE_HOUR_HOLDER_ROLES)
OFFICE_HOURS_RESERVER = Q(
    program_role__user_role__name__in=OFFICE_HOUR_RESERVER_ROLES)
ACTIVE_PROGRAM = Q(program_role__program__program_status='active')


class OfficeHoursCalendarView(ImpactView):
    permission_classes = [IsAuthenticated]
    view_name = "office_hours_calendar_view"
    SUCCESS_HEADER = "Office hours fetched successfully"
    FAIL_HEADER = "Office hours could not be fetched"
    BAD_FOCAL_DATE = "We were unable to parse the date specifier"
    NO_SUCH_USER = "We were not able to locate that user"
    NOT_OFFICE_HOURS_VIEWER = ("You are not able to view office hours at this "
                               "time. Please see MassChallenge staff.")

    def get(self, request):
        self.response_elements = {}
        (self._get_target_user(request) and
         self._check_request_user_type() and
         self._get_date_range(request) and
         self._set_user_query() and
         self._get_office_hours_data(request))
        return Response(self.response_elements)

    def _get_target_user(self, request):
        user_id = request.query_params.get("user_id", None)
        if user_id is None:
            self.target_user = request.user
        else:
            try:
                self.target_user = User.objects.get(pk=user_id)
            except User.DoesNotExist:
                self.fail(self.NO_SUCH_USER)
                return False
        return True

    def _check_request_user_type(self):
        """
        determine whether we are viewing as "staff", "mentor", or "finalist"
        if we pursue AC-7778, this will be modified to read from request data
        """
        if is_employee(self.target_user):
            self.request_user_type = STAFF
        elif _is_mentor(self.target_user):
            self.request_user_type = MENTOR
        elif _is_finalist(self.target_user):
            self.request_user_type = FINALIST
        else:
            self.request_user_type = NOT_ALLOWED
        return True

    def _get_date_range(self, request):
        focal_date = request.query_params.get("focal_date", None)
        calendar_span = request.query_params.get("calendar_span", "week")

        try:
            self.start_date, self.end_date = _date_range(calendar_span, focal_date)

        except ValueError:
            self.fail(self.BAD_FOCAL_DATE)
            return False
        return True

    def _office_hours_queryset(self):
        if self.request_user_type == STAFF:
            return self._staff_office_hours_queryset()
        elif self.request_user_type == MENTOR:

            return self._mentor_office_hours_queryset()
        elif self.request_user_type == FINALIST:
            return self._finalist_office_hours_queryset()
        else:
            self.response_elements['detail'] = self.NOT_OFFICE_HOURS_VIEWER
            return self._null_office_hours_queryset()

    def _staff_office_hours_queryset(self):
        staff_programs = Clearance.objects.clearances_for_user(
            self.target_user).values_list(
                "program_family", flat=True)
        in_visible_program_family = Q(
            program_role__program__program_family__in=staff_programs)
        program_mentors = ProgramRoleGrant.objects.filter(
            ACTIVE_PROGRAM &
            OFFICE_HOURS_HOLDER &
            in_visible_program_family).values_list(
                "person__id", flat=True)
        relevant_staff = _relevant_staff(staff_programs)
        mentors = list(program_mentors) + [self.target_user.id]
        active_mentors = Q(mentor__in=mentors)
        return MentorProgramOfficeHour.objects.filter(
            active_mentors | relevant_staff,
            start_date_time__range=[self.start_date, self.end_date]).order_by(
                'start_date_time').annotate(
                    finalist_count=Count("finalist")).annotate(
                        own_office_hour=Case(
                            When(mentor_id=self.target_user.id,
                                 then=Value(True)),
                            default=Value(False),
                            output_field=BooleanField()))

    def _mentor_office_hours_queryset(self):
        return MentorProgramOfficeHour.objects.filter(
            mentor=self.target_user,
            start_date_time__range=[self.start_date, self.end_date]).order_by(
            'start_date_time').annotate(
            finalist_count=Count("finalist")).annotate(
            own_office_hour=Case(
                default=Value(False),
                output_field=BooleanField()))

    def _finalist_office_hours_queryset(self):
        reserved_by_user = Q(finalist=self.target_user)
        unreserved = Q(finalist__isnull=True)
        user_programs = self.target_user.programrolegrant_set.filter(
            ACTIVE_PROGRAM & OFFICE_HOURS_RESERVER).values_list(
                "program_role__program", flat=True)
        relevant_staff = _relevant_staff(user_programs)
        relevant_mentors = Q(**compose_filter(("mentor",
                                               "programrolegrant",
                                               "program_role",
                                               "program",
                                               "in"),
                                              user_programs))
        return MentorProgramOfficeHour.objects.filter(
            (reserved_by_user |
             unreserved & (relevant_mentors | relevant_staff)),
            start_date_time__range=[self.start_date, self.end_date]).annotate(
                own_office_hour=Case(
                    default=Value(False),
                    output_field=BooleanField()))

    def _null_office_hours_queryset(self):
        return MentorProgramOfficeHour.objects.none().annotate(
            own_office_hour=Case(
                default=Value(False),
                output_field=BooleanField()))

    def _set_user_query(self):
        if self.request_user_type == STAFF:
            self.user_query = Clearance.objects.clearances_for_user(
                self.target_user).filter(
                program_family__programs__program_status='active')
            self.location_path = "__".join(["program_family",
                                            "programfamilylocation",
                                            "location"])
            self.program_family = "__".join(["program_family", "name"])
        else:
            self.user_query = self.target_user.programrolegrant_set.filter(
                OFFICE_HOURS_HOLDER & ACTIVE_PROGRAM)
            self.location_path = "__".join(["program_role",
                                            "program",
                                            "program_family",
                                            "programfamilylocation",
                                            "location"])
            self.program_family = "__".join(["program_role",
                                             "program",
                                             "program_family",
                                             "name"])
        return True

    def _get_office_hours_data(self, request):
        primary_industry_key = "mentor__expertprofile__primary_industry__name"
        office_hours = self._office_hours_queryset().filter(
            program__isnull=True).order_by(
            'start_date_time').annotate(
            finalist_count=Count("finalist")).annotate(
            reserved=Case(
                When(finalist_count__gt=0, then=Value(True)),
                default=Value(False),
                output_field=BooleanField()))

        self.response_elements['calendar_data'] = office_hours.values(
            "id",
            "mentor_id",
            "finalist_id",
            "start_date_time",
            "end_date_time",
            "description",
            "topics",
            "startup_id",
            "reserved",
            "meeting_info",
            "own_office_hour",
            location_name=F("location__name"),
            location_timezone=F("location__timezone"),
            finalist_first_name=F("finalist__first_name"),
            finalist_last_name=F("finalist__last_name"),
            mentor_title=F("mentor__expertprofile__title"),
            mentor_company=F("mentor__expertprofile__company"),
            mentor_first_name=F("mentor__first_name"),
            mentor_last_name=F("mentor__last_name"),
            mentor_primary_industry=F(primary_industry_key),
            startup_name=F("startup__organization__name"),
            finalist_email=F("finalist__email"),
            mentor_email=F("mentor__email"),
            startup_primary_industry=F("startup__primary_industry__name"),
            startup_short_pitch=F("startup__short_pitch")
        )
        self.response_elements['timezones'] = office_hours.filter(
            location__isnull=False).order_by(
                "location__timezone").values_list("location__timezone",
                                                  flat=True).distinct()
        self.response_elements['location_choices'] = self.location_choices()
        program_families = self.mentor_program_families()
        self.response_elements['mentor_program_families'] = program_families
        self.response_elements['user_startups'] = self._user_startups()
        self.response_elements['user_type'] = self.request_user_type
        self.succeed()

    def _user_startups(self):
        return self.target_user.startupteammember_set.order_by(
            '-id').values(
                "startup_id",
                name=F("startup__organization__name")).distinct()

    def mentor_program_families(self):
        return self.user_query.values_list(
            self.program_family, flat=True).distinct()

    def location_choices(self):
        locations = self.user_query.filter(
            Q(**{self.location_path + '__isnull': False})
        ).values(**_location_lookups(self.location_path)).distinct()
        locations_list = list(locations)
        has_remote_location = _check_remote_location(locations_list)

        if not has_remote_location:
            remote_location_fields = dict([("location_" + field, F(field))
                                           for field in LOCATION_FIELDS])
            remote_loc = Location.objects.filter(
                name="Remote").values(**remote_location_fields).first()
            locations_list.append(remote_loc) if remote_loc else locations_list
        return locations_list

    def fail(self, detail):
        self.response_elements['success'] = False
        self.response_elements['header'] = self.FAIL_HEADER
        self.response_elements['detail'] = detail
        self.response_elements['calendar_data'] = None

    def succeed(self):
        self.response_elements['success'] = True
        self.response_elements['header'] = self.SUCCESS_HEADER


def _check_remote_location(locations_list):
    return any([loc['location_name'] == 'Remote' for loc in locations_list])


def _location_lookups(location_path):
    return dict([_location_dict_pair(field, location_path)
                 for field in LOCATION_FIELDS])


def _location_dict_pair(field, location_path):
    return (_format_key(field), _make_f_expression(field, location_path))


def _format_key(field):
    return "location_" + field


def _make_f_expression(field, location_path):
    return F("{}__{}".format(location_path, field))


def _date_range(calendar_span, focal_date=None):
    # returns (start_date, end_date)
    # When calendar_span == week
    # start_date is the latest monday that is less than or equal to today,
    # end_date is start_date + seven days
    # When calendar_span == month 
    # start_date is the first day of the new month
    # end_date is start_date + length of the current month
    # both values are then padded by 24 hours to allow for TZ differences
    # throws ValueError if focal_date is not in ISO-8601 format

    if focal_date:
        initial_date = datetime.strptime(focal_date, ISO_8601_DATE_FORMAT)
    else:
        initial_date = datetime.now()

    initial_date = utc.localize(initial_date)
    if calendar_span == "month":
        reducer = timedelta(int(initial_date.strftime("%d")))
        span = timedelta(31)
    else:
        # This calculation depends on the fact that monday == 0 in python
        reducer = timedelta(initial_date.weekday())
        span = ONE_WEEK

    start_date = initial_date - reducer
    end_date = start_date + span + ONE_DAY
    adjusted_start_date = start_date - ONE_DAY
    return adjusted_start_date, end_date


def _is_mentor(user):
    return user.programrolegrant_set.filter(
        ACTIVE_PROGRAM & OFFICE_HOURS_HOLDER).exists()


def _is_finalist(user):
    return user.programrolegrant_set.filter(
        ACTIVE_PROGRAM & OFFICE_HOURS_RESERVER).exists()


def _relevant_staff(user_programs):
    program_families = ProgramFamily.objects.filter(programs__in=user_programs)
    staff_ids = Clearance.objects.filter(
        program_family_id__in=program_families).values_list(
            'user_id', flat=True)
    return Q(**compose_filter(('mentor',
                               'id',
                               'in'),
                              staff_ids))