matejak/estimagus

View on GitHub
estimage/persons.py

Summary

Maintainability
A
3 hrs
Test Coverage
A
90%
import dataclasses
import typing
import collections

import numpy as np
import scipy as sp

from . import data, PluginResolver


def get_all_collaborators(cards):
    ret = set()
    for t in cards:
        ret.update(set(t.collaborators))
        ret.add(t.assignee)
    ret.discard("")
    return ret


class WorkloadSummary(typing.NamedTuple):
    expected_effort_of_full_potential: float


@dataclasses.dataclass
class Workload:
    name: str = ""
    points: float = 0
    cards: typing.List[data.BaseCard] = dataclasses.field(default_factory=list)
    point_parts: typing.Dict[str, float] = dataclasses.field(default_factory=dict)
    proportions: typing.Dict[str, float] = dataclasses.field(default_factory=dict)


def get_people_associaged_with(card: data.BaseCard) -> typing.Set[str]:
    associated_people = get_all_collaborators((card,))
    return associated_people


@PluginResolver.class_is_extendable("Workloads")
@dataclasses.dataclass
class Workloads:
    points: float = 0
    cards: typing.List[data.BaseCard] = dataclasses.field(default_factory=list)
    persons_potential: typing.Dict[str, float] = dataclasses.field(
        default_factory=lambda: collections.defaultdict(lambda: 0))
    persons_indices: typing.Dict[str, int] = dataclasses.field(default_factory=dict)
    cards_indices: typing.Dict[str, int] = dataclasses.field(default_factory=dict)
    work_matrix = np.ndarray
    task_sizes = np.ndarray

    def __init__(self,
                 cards: typing.Iterable[data.BaseCard],
                 model: data.EstiModel,
                 * args, ** kwargs):
        super().__init__(* args, ** kwargs)

        self.model = model
        self.cards_by_name = collections.OrderedDict()
        self.cards = cards
        self.persons_potential = dict()
        self.persons_indices = dict()
        self.cards_indices = dict()
        for t in cards:
            self.cards_by_name[t.name] = t
        self._card_persons_map = dict()
        self._fill_in_collaborators()
        self.task_sizes = np.array([
            self.model.remaining_point_estimate_of(t.name).expected
            for t in self.cards_by_name.values()])
        self.work_matrix = np.zeros((len(self.persons_potential), len(cards)))
        self._create_indices()

    def _create_indices(self):
        for index, person_name in enumerate(self.persons_potential):
            self.persons_indices[person_name] = index
        for index, card in enumerate(self.cards):
            self.cards_indices[card.name] = index

    def _fill_in_collaborators(self):
        all_collaborators = set()
        for name, t in self.cards_by_name.items():
            self.points += self.model.remaining_point_estimate_of(name).expected

            associated_people = get_people_associaged_with(t)
            self._card_persons_map[name] = associated_people
            all_collaborators.update(associated_people)

        for c in all_collaborators:
            self.persons_potential[c] = 1.0

    def get_who_works_on(self, card_name: str) -> typing.Set[str]:
        return self._card_persons_map.get(card_name, set())

    def of_person(self, person_name: str) -> Workload:
        raise NotImplementedError()

    def summary(self) -> WorkloadSummary:
        effort_per_potential = self.task_sizes.sum() / sum(self.persons_potential.values())
        ret = WorkloadSummary(
            expected_effort_of_full_potential=effort_per_potential,
        )
        return ret


class SimpleWorkloads(Workloads):
    def solve_problem(self):
        for tidx, card in enumerate(self.cards):
            if self.task_sizes[tidx] == 0:
                continue
            collaborating_group = self.get_who_works_on(card.name)
            person_potentials = [self.persons_potential.get(name) for name in collaborating_group]
            card_potential = sum(person_potentials)
            self._solve_problem_for_card(tidx, card_potential, collaborating_group)

    def _solve_problem_for_card(self, tidx, card_potential, collaborating_group):
        for pidx, person_name in enumerate(self.persons_potential):
            if person_name not in collaborating_group:
                continue

            own_potential = self.persons_potential[person_name]

            proportion = own_potential / card_potential
            points_contribution = self.task_sizes[tidx]
            points_contribution *= proportion

            self.work_matrix[pidx, tidx] = points_contribution

    def of_person(self, person_name):
        ret = Workload(name=person_name)
        if person_name not in self.persons_indices:
            return ret
        person_index = self.persons_indices[person_name]
        ret.points = sum(self.work_matrix[person_index])
        task_totals = np.sum(self.work_matrix, axis=0)
        for task_index, task_name in enumerate(self.cards_by_name.keys()):
            projection = self.work_matrix[person_index, task_index]
            if projection == 0:
                continue
            ret.cards.append(self.cards_by_name[task_name])
            ret.point_parts[task_name] = projection
            ret.proportions[task_name] = projection / task_totals[task_index]
        return ret


class OptimizedWorkloads(Workloads):
    def __init__(self,
                 cards: typing.Iterable[data.BaseCard],
                 model: data.EstiModel,
                 * args, ** kwargs):
        super().__init__(cards, model, * args, ** kwargs)
        self.task_totals = np.zeros(len(cards))
        self.cards_indices = dict()
        self._create_indices()

    def cost_matrix(self):
        ret = np.ones((len(self.persons_potential), len(self.cards_by_name)))
        ret *= np.inf
        for collab_idx, collab_name in enumerate(self.persons_potential):
            for task_idx, task_name in enumerate(self.cards_by_name):
                task_collaborators = self._card_persons_map[task_name]
                if collab_name in task_collaborators:
                    ret[collab_idx, task_idx] = 1
        return ret

    def solve_problem(self):
        if len(self.task_sizes) == 0 or len(self.persons_potential) == 0:
            return
        costs = self.cost_matrix()
        indices = np.where(np.logical_and(np.min(costs, axis=0) == np.inf, self.task_sizes > 0))[0]
        if len(indices):
            task_names = [self.cards[i].name for i in indices]
            msg = f"Nobody wants to work on some tasks: {task_names}"
            raise ValueError(msg)
        self.work_matrix = solve(self.task_sizes, self.persons_potential.values(), costs)
        self.task_totals = np.sum(self.work_matrix, axis=0)

    def of_person(self, person_name):
        person_index = self.persons_indices[person_name]
        ret = Workload()
        ret.points = sum(self.work_matrix[person_index])
        ret.points = round(ret.points, 1)
        for task_index, task_name in enumerate(self.cards_by_name.keys()):
            projection = self.work_matrix[person_index, task_index]
            projection = round(projection, 1)
            if projection == 0:
                continue
            ret.cards.append(self.cards_by_name[task_name])
            ret.point_parts[task_name] = projection
            ret.proportions[task_name] = projection / self.task_totals[task_index]
        return ret


# For a naming reference, see https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.linprog.html
# bub:
# 0..num_persons * 2:
#   Either person works less than their potential, or they work more
#   odd(leading) rows represent + work - difference <= (positive) potential
#   even rows represent - work - difference <= - (positive) potential
# +0..num_persons: Nothing to see here
def gen_bub(task_sizes, persons_potential):
    work_per_potential_unit = sum(task_sizes) / sum(persons_potential)
    num_persons = len(persons_potential)
    ret = np.ones(3 * num_persons) * work_per_potential_unit
    coeff_when_person_working_less = -1
    for i, pot in enumerate(persons_potential):
        person_working_more_index = i * 2
        person_working_less_index = person_working_more_index + 1

        ret[person_working_more_index] *= pot
        ret[person_working_less_index] *= pot * coeff_when_person_working_less
    ret[2 * num_persons:] = 0
    return ret


# 0..num_tasks * num_persons: The same meaning as cost matrix that is flattened
# +0..num_persons: Absolute values of diff between work done by a person and their work potential
# +0..num_persons: Effort of particular person (work done over potential)
# +1: Maximum of the person's effort
def gen_c(task_sizes, persons_potential):
    num_tasks = len(task_sizes)
    num_persons = len(persons_potential)
    ret = np.zeros(num_tasks * num_persons + num_persons * 2 + 1)
    index_of_first_diff = num_persons * num_tasks
    for i in range(1, num_persons + 1):
        ret[i + index_of_first_diff] = 0.5 / num_persons
    ret[-1] = 1
    return ret


def _define_persons_overwork_or_underwork(Aub, starting_row_idx, num_persons, num_tasks):
    for perso_idx in range(num_persons):
        persons_work_start_index = perso_idx * num_tasks
        persons_work_end_index = persons_work_start_index + num_tasks
        persons_work_slice = slice(persons_work_start_index, persons_work_end_index)

        work_difference_index = num_persons * num_tasks + perso_idx
        coeff_when_person_working_more = 1
        coeff_when_person_working_less = -1
        person_working_more_index = starting_row_idx + perso_idx * 2
        person_working_less_index = starting_row_idx + person_working_more_index + 1

        Aub[person_working_more_index, persons_work_slice] = coeff_when_person_working_more
        Aub[person_working_more_index, work_difference_index] = -1
        Aub[person_working_less_index, persons_work_slice] = coeff_when_person_working_less
        Aub[person_working_less_index, work_difference_index] = -1


def _define_persons_greatest_effort(Aub, starting_row_idx, num_persons, num_tasks):
    greatest_effort_index = num_tasks * num_persons + 2 * num_persons
    for perso_idx in range(num_persons):
        current_effort_index = num_tasks * num_persons + num_persons + perso_idx
        Aub[starting_row_idx + perso_idx, current_effort_index] = 1
        Aub[starting_row_idx + perso_idx, greatest_effort_index] = -1


# Aub rows:
# 0..num_persons * 2:
#   Either person works less than their potential, or they work more
#   The sum of their work minus their potential, their work difference, is defined here
# +0..num_persons: Definition of max of person's efforts
def gen_Aub(task_sizes, persons_potential):
    num_tasks = len(task_sizes)
    num_persons = len(persons_potential)
    ret = np.zeros((
        num_persons * 3,
        num_tasks * num_persons + num_persons * 2 + 1))
    starting_idx = 0
    _define_persons_overwork_or_underwork(ret, starting_idx, num_persons, num_tasks)
    starting_idx += 2 * num_persons
    _define_persons_greatest_effort(ret, starting_idx, num_persons, num_tasks)
    return ret


def _record_lhs_contributions_make_whole_task(Aeq, starting_row_idx, num_tasks, num_persons):
    for task_idx in range(num_tasks):
        sl = slice(task_idx, num_tasks * num_persons, num_tasks)
        Aeq[starting_row_idx + task_idx, sl] = 1


def _record_lhs_differences_are_the_same(Aeq, starting_row_idx):
    pass


def _record_lhs_no_work_on_unwanted_items(Aeq, starting_row_idx, indices_of_zeros):
    for idx, zero_idx in enumerate(indices_of_zeros):
        Aeq[starting_row_idx + idx, zero_idx] = 1


def _record_lhs_effort_applied(Aeq, starting_row_idx, persons_potential, num_tasks):
    num_persons = len(persons_potential)
    effort_index_base = num_persons * num_tasks + num_persons
    for person_idx, potential in enumerate(persons_potential):
        relative_effort = 1.0 / potential
        task_slice = slice(person_idx * num_tasks, (person_idx + 1) * num_tasks)
        effort_index = effort_index_base + person_idx

        Aeq[starting_row_idx + person_idx, task_slice] = 1.0 * relative_effort
        Aeq[starting_row_idx + person_idx, effort_index] = -1


# Aeq rows:
# 0..num_tasks: Task composition
# +0..num_zeros: Work done by person on a task is zero (if the cost is infinite)
# +0..num_persons: Definition of work effort (work done divided by potential)
def gen_Aeq(task_sizes, persons_potential, labor_cost=None):
    num_tasks = len(task_sizes)
    num_persons = len(persons_potential)
    if labor_cost is None:
        labor_cost = np.ones((num_persons, num_tasks))
    indices_of_zeros = np.where(labor_cost.flatten() == np.inf)[0]
    ret = np.zeros((
        num_tasks + len(indices_of_zeros) + num_persons,
        num_tasks * num_persons + num_persons * 2 + 1))
    starting_index = 0
    _record_lhs_contributions_make_whole_task(ret, starting_index, num_tasks, num_persons)
    starting_index += num_tasks
    _record_lhs_no_work_on_unwanted_items(ret, starting_index, indices_of_zeros)
    starting_index += len(indices_of_zeros)
    _record_lhs_effort_applied(ret, starting_index, persons_potential, num_tasks)
    return ret


def _record_rhs_contributions_make_whole_task(beq, starting_idx, task_sizes):
    num_tasks = len(task_sizes)
    for task_idx in range(num_tasks):
        beq[starting_idx + task_idx] = task_sizes[task_idx]


def _record_rhs_differences_are_the_same(beq, starting_row_idx):
    pass


def _record_rhs_no_work_on_unwanted_items(beq, starting_row_idx):
    pass


def gen_beq(task_sizes, persons_potential, labor_cost=None):
    num_tasks = len(task_sizes)
    num_persons = len(persons_potential)
    if labor_cost is None:
        labor_cost = np.ones((num_persons, num_tasks))
    number_of_zeros = np.sum(labor_cost == np.inf)
    ret = np.zeros(num_tasks + number_of_zeros + num_persons)
    _record_rhs_contributions_make_whole_task(ret, 0, task_sizes)
    return ret


def solve(task_sizes, persons_potential, labor_cost=None):
    num_tasks = len(task_sizes)
    num_persons = len(persons_potential)
    if num_tasks == 0:
        return []
    if num_persons == 0:
        msg = "No persons to assign tasks to."
        raise ValueError(msg)
    interesting_solution_len = num_tasks * num_persons
    c = gen_c(task_sizes, persons_potential)
    Aub = gen_Aub(task_sizes, persons_potential)
    bub = gen_bub(task_sizes, persons_potential)
    Aeq = gen_Aeq(task_sizes, persons_potential, labor_cost)
    beq = gen_beq(task_sizes, persons_potential, labor_cost)
    solution = sp.optimize.linprog(c, Aub, bub, Aeq, beq)
    if not solution.success:
        msg = solution.message
        raise ValueError(msg)

    ret = solution.x[:interesting_solution_len].reshape(num_persons, num_tasks)
    return ret