open-synergy/opnsynid-hr

View on GitHub
hr_attendance_computation/models/hr_attendance.py

Summary

Maintainability
F
4 days
Test Coverage
# -*- coding: utf-8 -*-
# Copyright 2011 Domsense srl (<http://www.domsense.com>)
# Copyright 2011-15 Agile Business Group sagl (<http://www.agilebg.com>)
# Copyright 2017 OpenSynergy Indonesia (<https://opensynergy-indonesia.com>)
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
from __future__ import division

import math
import time
from datetime import datetime, timedelta

# from openerp.tools import float_compare
import pytz
from openerp import api, fields, models
from openerp.exceptions import Warning as UserError
from openerp.tools import DEFAULT_SERVER_DATETIME_FORMAT
from openerp.tools.translate import _


class HrAttendance(models.Model):
    # ref: https://bugs.launchpad.net/openobject-client/+bug/887612
    # test: 0.9853 - 0.0085
    _inherit = "hr.attendance"

    def float_time_convert(self, float_val):
        """Format a float number of hours as an ``HH:MM`` string.

        The sign of ``float_val`` is discarded: its absolute value is used.
        The fractional part is converted to minutes and rounded to the
        nearest whole minute.

        :param float_val: time or duration expressed in hours
        :return: string built with ``"%02d:%02d"`` (hours may exceed 23)
        """
        hours = int(math.floor(abs(float_val)))
        mins = int(round((abs(float_val) - hours) * 60))
        # Rounding can push the minutes up to exactly 60 (e.g. 1.9999 hours
        # gives 59.994 minutes); carry the extra hour so the result is
        # "02:00" rather than the invalid "01:60".
        if mins >= 60:
            hours += 1
            mins = 0
        return "%02d:%02d" % (hours, mins)

    def float_to_datetime(self, float_val):
        str_float = self.float_time_convert(float_val)
        hours = int(str_float.split(":")[0])
        minutes = int(str_float.split(":")[1])
        days = 1
        if hours / 24 > 0:
            days += hours / 24
            hours = hours % 24
        return datetime(1900, 1, int(days), hours, minutes)

    # Original code, kept for reference.
    # Commented out because it is not used.
    # ==================================================
    # def float_to_timedelta(self, float_val):
    #    str_time = self.float_time_convert(float_val)
    #    int_hour = int(str_time.split(":")[0])
    #    int_minute = int(str_time.split(":")[1])
    #    return timedelta(
    #        0,
    #        (int_hour * 3600.0) + (int_minute * 6.0)),

    def total_seconds(self, td):
        """Return the length of a timedelta in seconds.

        :param td: ``datetime.timedelta`` to measure
        :return: days, seconds and microseconds of ``td`` combined into a
            single number of seconds
        """
        whole_seconds = td.seconds + td.days * 24 * 3600
        as_microseconds = td.microseconds + whole_seconds * 10**6
        return as_microseconds / 10**6

    def time_difference(self, float_start_time, float_end_time, help_message=False):
        # Original Code
        # Condition:
        #    1. End Time = Duration within working schedule
        #    2. Start Time = Duration
        # Comment by Reason:
        #    1. Start Time can't be greater than end time
        # ================================================================
        # if float_compare(
        #    float_end_time, float_start_time, precision_rounding=0.0000001
        # ) == -1:
        #    that means a difference smaller than 0.36 milliseconds
        #    message = _('End time %s < start time %s %s') % (
        #        unicode(float_end_time),
        #        unicode(float_start_time),
        #        help_message and '(' + help_message + ')' or ''
        #    )
        #    raise UserError(message)

        delta = self.float_to_datetime(float_end_time) - self.float_to_datetime(
            float_start_time
        )
        return self.total_seconds(delta) / 3600.0

    def time_sum(self, float_first_time, float_second_time):
        str_first_time = self.float_time_convert(float_first_time)
        first_timedelta = timedelta(
            0,
            int(str_first_time.split(":")[0]) * 3600.0
            + int(str_first_time.split(":")[1]) * 60.0,
        )
        str_second_time = self.float_time_convert(float_second_time)
        second_timedelta = timedelta(
            0,
            int(str_second_time.split(":")[0]) * 3600.0
            + int(str_second_time.split(":")[1]) * 60.0,
        )
        return self.total_seconds(first_timedelta + second_timedelta) / 60.0 / 60.0

    def split_interval_time_by_precision(
        self, start_datetime, duration, precision=0.25
    ):
        """Split an interval into consecutive slices of ``precision`` hours.

        Full slices are emitted while more than one ``precision`` remains.
        The remainder is rounded: it produces one more full-size slice when
        it is longer than half a ``precision`` and is dropped otherwise.

        :param start_datetime: ``datetime`` at which the interval starts
        :param duration: length of the interval, in hours
        :param precision: length of each slice, in hours
        :return: list of ``(slice_start_datetime, precision)`` tuples
        :raises ValueError: if ``precision`` is not positive while
            ``duration`` is greater than it (the split would never end)
        """
        if precision <= 0 and duration > precision:
            # Subtracting a non-positive precision never brings the duration
            # down to the precision, so the loop below would not terminate.
            raise ValueError(
                "precision must be positive to split an interval, got %r"
                % (precision,)
            )
        res = []
        while duration > precision:
            res.append((start_datetime, precision))
            start_datetime += timedelta(hours=precision)
            duration -= precision
        if duration > precision / 2.0:
            res.append((start_datetime, precision))
        return res

    def datetime_to_hour(self, datetime_):
        """Return the time of day of ``datetime_`` as a float number of hours.

        :param datetime_: object exposing ``hour``, ``minute`` and ``second``
        :return: ``hour + minute / 60 + second / 3600``
        """
        minute_part = datetime_.minute / 60.0
        second_part = datetime_.second / 3600.0
        return datetime_.hour + minute_part + second_part

    def mid_time_interval(self, datetime_start, delta):
        """Return the midpoint of an interval.

        :param datetime_start: ``datetime`` at which the interval starts
        :param delta: length of the interval, in hours
        :return: ``datetime_start`` shifted forward by half of ``delta``
        """
        half_span = timedelta(hours=delta / 2.0)
        return datetime_start + half_span

    @api.model
    def matched_schedule(self, datetime_, weekday_char, calendar_id, context=None):
        """Find the working-schedule lines of a calendar that cover a moment.

        A ``resource.calendar.attendance`` line matches when it belongs to
        ``calendar_id``, its ``hour_from``..``hour_to`` range contains the
        time of day of ``datetime_``, its ``date_from`` is empty or not after
        the date of ``datetime_``, and its ``dayofweek`` is empty or equal to
        ``weekday_char``.

        :param datetime_: moment to look up
        :param weekday_char: day of week, in the form stored in ``dayofweek``
        :param calendar_id: id of the calendar to search in
        :param context: not used by the current implementation
        :return: recordset of matching ``resource.calendar.attendance`` lines
        """
        hour_of_day = self.datetime_to_hour(datetime_)
        domain = [
            "&",
            "|",
            ("date_from", "=", False),
            ("date_from", "<=", datetime_.date()),
            "|",
            ("dayofweek", "=", False),
            ("dayofweek", "=", weekday_char),
            ("calendar_id", "=", calendar_id),
            ("hour_to", ">=", hour_of_day),
            ("hour_from", "<=", hour_of_day),
        ]
        return self.env["resource.calendar.attendance"].search(domain)

    # Original code, kept for reference.
    # Commented out because it is not used.
    # ====================================
    # @api.model
    # def get_reference_calendar(
    #        self, employee_id, date=None):
    #
    #    if date is None:
    #        date = fields.date.context_today()
    #
    #    contract_pool = self.env['hr.contract']
    #    employee_pool = self.env['hr.employee']
    #
    #    active_contracts = contract_pool.search([
    #        '&',
    #        ('employee_id', '=', employee_id),
    #        '|',
    #        '&',
    #        ('date_start', '<=', date),
    #        '|',
    #        ('date_end', '>=', date),
    #        ('date_end', '=', False),
    #        '&',
    #        '&',
    #        ('trial_date_start', '!=', False),
    #        ('trial_date_start', '<=', date),
    #        '&',
    #        ('trial_date_end', '!=', False),
    #        ('trial_date_end', '>=', date),
    #    ])
    #
    #    if len(active_contracts) > 1:
    #        employee = employee_pool.browse(employee_id)
    #        msg = _('Too many active contracts for employee %s at date %s')
    #        raise UserError(msg % (employee.name, date))
    #    elif active_contracts:
    #        contract = active_contracts[0]
    #        return contract.working_hours
    #    else:
    #        return None

    def _ceil_rounding(self, rounding, datetime_):
        minutes = datetime_.minute / 60.0 + datetime_.second / 3600.0
        return math.ceil(minutes * rounding) / rounding

    def _floor_rounding(self, rounding, datetime_):
        minutes = datetime_.minute / 60.0 + datetime_.second / 3600.0
        return math.floor(minutes * rounding) / rounding

    # TODO: this is for functional field
    @api.depends(
        "triggering_attendance_id",
        "triggering_attendance_id.name",
        "triggering_attendance_id.action",
        "triggering_attendance_id.employee_id",
        "employee_id.contract_ids",
        "employee_id.contract_ids.date_start",
        "employee_id.contract_ids.date_end",
        "employee_id.contract_ids.trial_date_start",
        "employee_id.contract_ids.trial_date_end",
        "employee_id.contract_ids.working_hours",
        "employee_id.contract_ids.working_hours.attendance_ids",
        "employee_id.contract_ids.working_hours.attendance_ids.dayofweek",
        "employee_id.contract_ids.working_hours.attendance_ids.date_from",
        "employee_id.contract_ids.working_hours.attendance_ids.hour_from",
        "employee_id.contract_ids.working_hours.attendance_ids.hour_to",
        "employee_id.contract_ids.working_hours.attendance_ids.calendar_id",
    )
    @api.multi
    def _compute_attendance_duration(self):  # noqa C901
        """Compute duration, end time and inside/outside-schedule hours.

        Sets ``duration``, ``end_datetime``, ``inside_calendar_duration`` and
        ``outside_calendar_duration`` on every record of ``self``.

        For a sign-in, the duration runs from the record's ``name`` to the
        ``name`` of the employee's next attendance (or to the current time
        when there is none), rounded to the company's
        ``working_time_precision``. Other actions get a duration of 0.0.

        Without a working schedule on the employee's contract, or without a
        following attendance, the whole duration counts as inside the
        calendar. Otherwise the interval is optionally rounded with the
        calendar's ``attendance_rounding``, cut into slices of ``precision``
        hours, and each slice is counted as inside the calendar when its
        midpoint falls within a schedule line. Sign-in and sign-out
        tolerances of the matched schedule line extend the counted time, and
        the overtime is finally rounded down with ``overtime_rounding``.

        :raises UserError: when more than one schedule line matches the same
            moment
        """
        precision = (
            self.env["res.users"]
            .browse(self.env.user.id)
            .company_id.working_time_precision
        )

        # 2012.10.16 LF FIX : Get timezone from context
        active_tz = pytz.timezone(self.env.context.get("tz") or "UTC")
        str_now = datetime.strftime(datetime.now(), DEFAULT_SERVER_DATETIME_FORMAT)
        for attendance in self:
            duration = 0.0
            # 2012.10.16 LF FIX : Attendance in context timezone
            attendance_start = (
                datetime.strptime(attendance.name, DEFAULT_SERVER_DATETIME_FORMAT)
                .replace(tzinfo=pytz.utc)
                .astimezone(active_tz)
            )
            next_attendance_date = str_now
            next_attendance = False
            # should we compute for sign out too?
            if attendance.action == "sign_in":
                # Only the first following attendance is needed.
                next_attendances = self.search(
                    [
                        ("employee_id", "=", attendance.employee_id.id),
                        ("name", ">", attendance.name),
                    ],
                    order="name",
                    limit=1,
                )
                if next_attendances:
                    next_attendance = next_attendances[0]
                    # A sign-in followed by another sign-in is not checked
                    # here: hr.attendance already has a constraint against it.
                    next_attendance_date = next_attendance.name
                # 2012.10.16 LF FIX : Attendance in context timezone
                attendance_stop = (
                    datetime.strptime(
                        next_attendance_date, DEFAULT_SERVER_DATETIME_FORMAT
                    )
                    .replace(tzinfo=pytz.utc)
                    .astimezone(active_tz)
                )
                duration_delta = attendance_stop - attendance_start
                duration = self.total_seconds(duration_delta) / 3600.0
                duration = round(duration / precision) * precision
            attendance.duration = duration
            attendance.end_datetime = next_attendance_date
            # If calendar is not specified: working days = 24/7
            attendance.inside_calendar_duration = duration
            attendance.outside_calendar_duration = 0.0
            reference_calendar = (
                attendance.employee_id.contract_id
                and attendance.employee_id.contract_id.working_hours
                or False
            )
            # next_attendance is only set for a sign-in, so attendance_stop
            # is always defined past this point.
            if not (reference_calendar and next_attendance):
                continue

            # TODO: apply rounding or tolerance first?
            if reference_calendar.attendance_rounding:
                float_attendance_rounding = float(
                    reference_calendar.attendance_rounding
                )
                rounded_start_hour = self._ceil_rounding(
                    float_attendance_rounding, attendance_start
                )
                rounded_stop_hour = self._floor_rounding(
                    float_attendance_rounding, attendance_stop
                )
                # From here on start and stop are naive datetimes holding
                # the wall-clock time in the context timezone.
                start_hour = datetime(
                    attendance_start.year,
                    attendance_start.month,
                    attendance_start.day,
                    attendance_start.hour,
                )
                if abs(1 - rounded_start_hour) < 0.01:
                    # The start rounds up to the next full hour. Adding a
                    # timedelta (instead of building a datetime with
                    # hour + 1) also works at 23:xx, where it rolls over to
                    # the next day.
                    attendance_start = start_hour + timedelta(hours=1)
                else:
                    attendance_start = start_hour + timedelta(
                        minutes=int(round(rounded_start_hour * 60.0))
                    )
                attendance_stop = datetime(
                    attendance_stop.year,
                    attendance_stop.month,
                    attendance_stop.day,
                    attendance_stop.hour,
                    int(round(rounded_stop_hour * 60.0)),
                )
                # recompute the duration on the rounded boundaries
                duration_delta = attendance_stop - attendance_start
                duration = self.total_seconds(duration_delta) / 3600.0
                duration = round(duration / precision) * precision
                attendance.duration = duration

            attendance.inside_calendar_duration = 0.0
            attendance.outside_calendar_duration = 0.0
            calendar_id = reference_calendar.id
            intervals_within = 0
            # split attendance in intervals = precision
            # 2012.10.16 LF FIX : no recursion in split attendance
            splitted_attendances = self.split_interval_time_by_precision(
                attendance_start, duration, precision
            )
            for counter, atomic_attendance in enumerate(splitted_attendances, 1):
                centered_attendance = self.mid_time_interval(
                    atomic_attendance[0],
                    delta=atomic_attendance[1],
                )
                # check if centered_attendance is within a working schedule
                # 2012.10.16 LF FIX : weekday must be single character
                # not int
                weekday_char = unicode(  # noqa: F821
                    unichr(centered_attendance.weekday() + 48)  # noqa: F821
                )
                matched_schedules = self.matched_schedule(
                    centered_attendance,
                    weekday_char,
                    calendar_id,
                )
                if len(matched_schedules) > 1:
                    raise UserError(
                        _("Wrongly configured working schedule with id %s")
                        % unicode(calendar_id)  # noqa: F821
                    )
                if not matched_schedules:
                    continue
                intervals_within += 1
                # sign in tolerance
                if intervals_within == 1:
                    att = matched_schedules[0]
                    att_start = self.datetime_to_hour(attendance_start)
                    # Mirror of the sign-out test below: the sign-in is at or
                    # after the scheduled start and within the tolerance.
                    # The 0.01 margin handles float roundings (<=).
                    if (
                        att_start >= att.hour_from
                        and (att_start - att.hour_from - att.tolerance_to) < 0.01
                    ):
                        additional_intervals = round(
                            (att_start - att.hour_from) / precision
                        )
                        intervals_within += additional_intervals
                        attendance.duration = self.time_sum(
                            attendance.duration,
                            additional_intervals * precision,
                        )
                # sign out tolerance, checked on the last slice only
                if len(splitted_attendances) == counter:
                    att = matched_schedules[0]
                    att_stop = self.datetime_to_hour(attendance_stop)
                    if att_stop <= att.hour_to and (
                        att_stop - att.hour_to + att.tolerance_from
                    ) > (-0.01):
                        # handling float roundings (>=)
                        additional_intervals = round(
                            (att.hour_to - att_stop) / precision
                        )
                        intervals_within += additional_intervals
                        attendance.duration = self.time_sum(
                            attendance.duration,
                            additional_intervals * precision,
                        )
            attendance.inside_calendar_duration = intervals_within * precision
            # make difference using time in order to avoid
            # rounding errors
            # inside_calendar_duration can't be > duration
            attendance.outside_calendar_duration = self.time_difference(
                attendance.inside_calendar_duration,
                attendance.duration,
                help_message="Attendance ID %s" % attendance.id,
            )
            if (
                reference_calendar.overtime_rounding
                and attendance.outside_calendar_duration
            ):
                overtime = attendance.outside_calendar_duration
                if reference_calendar.overtime_rounding_tolerance:
                    overtime = self.time_sum(
                        overtime, reference_calendar.overtime_rounding_tolerance
                    )
                float_overtime_rounding = float(reference_calendar.overtime_rounding)
                # round the overtime down to a multiple of 1 / rounding
                attendance.outside_calendar_duration = (
                    math.floor(overtime * float_overtime_rounding)
                    / float_overtime_rounding
                )

    @api.depends("name", "action", "employee_id")
    @api.multi
    def _compute_triggering_attendance_id(self):
        """Link each attendance to a sign-in record.

        A sign-in is linked to itself. A sign-out is linked to the latest
        sign-in of the same employee whose ``name`` is earlier than its own,
        if any. In every other case the field is left empty.
        """
        for attendance in self:
            triggering_id = False
            if attendance.action == "sign_in":
                triggering_id = attendance.id
            elif attendance.action == "sign_out":
                # Let the database return only the most recent earlier
                # sign-in instead of loading all of them to read the last.
                previous_sign_in = self.search(
                    [
                        ("employee_id", "=", attendance.employee_id.id),
                        ("name", "<", attendance.name),
                        ("action", "=", "sign_in"),
                    ],
                    order="name desc",
                    limit=1,
                )
                if previous_sign_in:
                    triggering_id = previous_sign_in.id
            attendance.triggering_attendance_id = triggering_id

    @api.depends("name")
    @api.multi
    def _compute_day(self):
        """Set ``day`` to the date part of the attendance ``name``.

        ``name`` is parsed as ``%Y-%m-%d %H:%M:%S`` and written back as
        ``%Y-%m-%d``, without any timezone conversion.
        """
        for attendance in self:
            parsed_name = time.strptime(attendance.name, "%Y-%m-%d %H:%M:%S")
            attendance.day = time.strftime("%Y-%m-%d", parsed_name)

    # Stored computed link to a sign-in record: the record itself for a
    # sign-in, the latest earlier sign-in of the same employee for a sign-out
    # (see _compute_triggering_attendance_id).
    triggering_attendance_id = fields.Many2one(
        string="Triggering Attendance",
        comodel_name="hr.attendance",
        compute="_compute_triggering_attendance_id",
        store=True,
    )
    # The four fields below are stored and all set together by
    # _compute_attendance_duration.
    # Hours from this attendance to the next one; 0.0 unless the action is
    # a sign-in.
    duration = fields.Float(
        compute="_compute_attendance_duration",
        multi="duration",
        string="Attendance duration",
        store=True,
    )
    # Datetime of the next attendance of the employee, or the time of the
    # computation when there is none.
    end_datetime = fields.Datetime(
        compute="_compute_attendance_duration",
        multi="duration",
        string="End date time",
        store=True,
    )
    # Hours of the duration not covered by the working schedule.
    outside_calendar_duration = fields.Float(
        compute="_compute_attendance_duration",
        multi="duration",
        string="Overtime",
        store=True,
    )
    # Hours of the duration covered by the working schedule; equal to the
    # whole duration when no schedule applies.
    inside_calendar_duration = fields.Float(
        compute="_compute_attendance_duration",
        multi="duration",
        string="Duration within working schedule",
        store=True,
    )
    # Date part of the attendance name (see _compute_day).
    day = fields.Date(
        compute="_compute_day",
        string="Day",
        store=True,
        select=1,
    )

    @api.multi
    def button_dummy(self):
        """Force the attendance durations to be recomputed.

        Each record's ``action`` is written back with its current value:
        nothing changes, but the write makes
        ``_compute_attendance_duration`` run again.
        """
        for record in self:
            unchanged_values = {"action": record.action}
            record.write(unchanged_values)