driver27/streak.py
from operator import le, ge, eq, lt, gt
import re
from django.db.models import F
def build_dict(seq, key):
return dict((d[key], dict(d, index=index)) for (index, d) in enumerate(seq))
class Streak(object):
results = None
max_streak = False
def __init__(self, results, unique_by_race=False, max_streak=False):
self.results = results
self.max_streak = max_streak
self.unique_by_race = unique_by_race
@staticmethod
def builtin_function_dict():
return {'lte': le,
'gte': ge,
'eq': eq,
'exact': eq,
'lt': lt,
'gt': gt
}
@staticmethod
def get_builtin_function(builtin_key):
return Streak.builtin_function_dict().get(builtin_key, None)
@staticmethod
def validate_syntax_filter(current_filter):
if not re.match(r'^(\w)+$', current_filter):
raise Exception('Invalid streak filter syntax')
@staticmethod
def split_filter(curr_filter):
split_char = '__'
return curr_filter.split(split_char)
@staticmethod
def get_comparison_position(filter_list):
return len(filter_list) - 1
@classmethod
def get_comparison_item(cls, filter_list):
comparison_position = cls.get_comparison_position(filter_list)
return filter_list[comparison_position]
@classmethod
def exists_comparison(cls, filter_list):
comparison_position = cls.get_comparison_position(filter_list)
return filter_list[comparison_position] in Streak.builtin_function_dict()
@classmethod
def convert_filter(cls, filter_key, filter_value):
if filter_key == 'race.fastest_car' and isinstance(filter_value, F) \
and filter_value.name == 'seat':
filter_key = 'fastest_lap'
filter_value = True
return filter_key, filter_value
@classmethod
def exec_comparison(cls, result, filter_list, filter_value):
comparison_key = 'eq'
if cls.exists_comparison(filter_list):
comparison_key = cls.get_comparison_item(filter_list)
filter_list.pop(cls.get_comparison_position(filter_list))
filter_key = '.'.join(filter_list)
filter_key, filter_value = cls.convert_filter(filter_key, filter_value)
result_attr = getattr(result, filter_key)
if isinstance(filter_value, F):
filter_value = filter_value.name
filter_value = getattr(result, filter_value)
builtin_function = cls.get_builtin_function(comparison_key)
return builtin_function(result_attr, filter_value) if builtin_function else False
@classmethod
def checked_filters(cls, result, filters):
passed_filter = True # until is a false condition, filter is passed
for curr_filter in filters:
filter_value = filters[curr_filter]
filter_list = cls.split_filter(curr_filter)
comparison_result = cls.exec_comparison(result, filter_list, filter_value)
if not comparison_result:
passed_filter = False
break
return passed_filter
def get_results_by_race(self):
results_by_race = []
race_order = 0
for result in self.results:
race_str = 'race_{id}'.format(id=result.race_id)
race_indexes = build_dict(results_by_race, key="race_id")
if race_str not in race_indexes:
results_by_race.append(
{
'race_id': race_str,
'order': race_order,
'results': []
}
)
race_order += 1
results_by_race[-1]['results'].append(result)
else:
race_index = race_indexes[race_str]['index']
results_by_race[race_index]['results'].append(result)
return sorted(results_by_race, key=lambda x: x['order'])
def run_unique_by_race(self, filters):
results_by_race = self.get_results_by_race()
count = 0
max_count = 0
for race_results in results_by_race:
is_ok = any([self.checked_filters(result, filters) for result in race_results['results']])
if not is_ok:
if self.max_streak:
count = 0
continue
else:
break
count += 1
if self.max_streak and count > max_count:
max_count = count
return max_count if self.max_streak else count
def run(self, filters):
if self.unique_by_race:
return self.run_unique_by_race(filters)
count = 0
max_count = 0
for result in self.results:
is_ok = self.checked_filters(result, filters)
if not is_ok:
if self.max_streak:
count = 0
continue
else:
break
count += 1
if self.max_streak and count > max_count:
max_count = count
return max_count if self.max_streak else count