# codeprep/util/misc.py
# SPDX-FileCopyrightText: 2020 Hlib Babii <hlibbabii@gmail.com>
#
# SPDX-License-Identifier: Apache-2.0
import multiprocessing
from heapq import heappush, heappop, heapify
import itertools
from typing import Dict, Tuple, List, Optional, Generator, TypeVar, Iterable
def merge_dicts_(dict1, dict2) -> Tuple[Dict, List]:
    """
    Merge `dict2` into `dict1` in place, summing the values of keys present
    in both. Note: `dict1` is mutated! Returns the mutated `dict1` together
    with the list of keys that were newly added to it.

    >>> dict1 = {"a": 3, "b": 4}
    >>> dict2 = {"b": 5, "c": 6}
    >>> merge_dicts_(dict1, dict2)
    ({'a': 3, 'b': 9, 'c': 6}, ['c'])
    >>> dict1
    {'a': 3, 'b': 9, 'c': 6}
    """
    added_keys = []
    for key, count in dict2.items():
        if key in dict1:
            dict1[key] += count
        else:
            dict1[key] = count
            added_keys.append(key)
    return dict1, added_keys
class NonAtomicCounter(object):
    """
    A plain integer counter with no synchronization; not safe to share
    between threads or processes.

    >>> counter = NonAtomicCounter(10)
    >>> counter.inc()
    11
    >>> counter.dec()
    10
    >>> counter.compare_and_dec(10)
    True
    >>> counter.value
    9
    >>> counter.value = 20
    >>> counter.get_and_dec()
    20
    >>> counter.value
    19
    """
    def __init__(self, v: int = 0):
        self.val = v

    def inc(self) -> int:
        """Increment and return the new value."""
        self.val = self.val + 1
        return self.val

    def dec(self) -> int:
        """Decrement and return the new value."""
        self.val = self.val - 1
        return self.val

    def compare_and_dec(self, v: int) -> bool:
        """Decrement; return whether the value BEFORE decrementing equalled `v`."""
        matched = (self.val == v)
        self.val = self.val - 1
        return matched

    def get_and_dec(self) -> int:
        """Decrement; return the value as it was BEFORE decrementing."""
        before = self.val
        self.val = self.val - 1
        return before

    @property
    def value(self) -> int:
        return self.val

    @value.setter
    def value(self, v: int) -> None:
        self.val = v
class AtomicInteger(object):
    """
    A process-safe integer counter.

    The previous implementation modelled the count as the number of tokens in
    a multiprocessing.Queue and read it with qsize(); that broke on macOS
    (Queue.qsize() raises NotImplementedError there) and any decrement past
    zero blocked forever on Queue.get(). A shared multiprocessing.Value
    guarded by a single lock keeps the exact same interface without either
    problem (and allows the counter to go negative instead of deadlocking).
    """
    def __init__(self, v=0):
        self._lock = multiprocessing.Lock()
        # lock=False: all access goes through self._lock, no inner lock needed
        self._val = multiprocessing.Value('i', v, lock=False)

    def inc(self):
        """Increment and return the new value."""
        with self._lock:
            self._val.value += 1
            return self._val.value

    def dec(self):
        """Decrement and return the new value."""
        with self._lock:
            self._val.value -= 1
            return self._val.value

    def compare_and_dec(self, val):
        """Decrement; return whether the value BEFORE decrementing equalled `val`."""
        with self._lock:
            result = self._val.value == val
            self._val.value -= 1
            return result

    def get_and_dec(self):
        """Decrement; return the value as it was BEFORE decrementing."""
        with self._lock:
            result = self._val.value
            self._val.value -= 1
            return result

    @property
    def value(self):
        with self._lock:
            return self._val.value

    @value.setter
    def value(self, v):
        with self._lock:
            self._val.value = v
class PriorityCounter(object):
    """A max-priority queue of (key, count) pairs built on heapq.

    Standard heapq "priority queue with removable entries" recipe: counts are
    stored NEGATED so that heappop yields the key with the LARGEST count;
    ties are broken by an insertion counter; updating a key marks its old
    heap entry as REMOVED (lazy deletion) and pushes a fresh entry.
    Entries are mutable lists shared between the heap and `entry_finder`,
    which is what makes the in-place REMOVED marking visible to pop_pair.
    """
    REMOVED = '<removed-task>' # placeholder for a removed task

    def __init__(self, d: Dict, automatic_count: bool=True):
        # If automatic_count, a monotonically increasing tie-breaker is drawn
        # from this counter; otherwise callers must pass counts explicitly
        # (and the values of `d` are (count, tie_breaker) pairs).
        self.counter = itertools.count() if automatic_count else None
        self.pq = [[(-value, next(self.counter)) if self.counter else (-value[0], value[1]), key] for key, value in d.items()] # list of entries arranged in a heap
        heapify(self.pq)
        self.entry_finder = {entry[1]: entry for entry in self.pq} # mapping of tasks to entries

    def add(self, pair, to_add: int, c: Optional[int]=None):
        'Add a new task or update the priority of an existing task'
        # Exactly one source of tie-breakers must be in use: either the
        # internal counter or the explicit `c` argument.
        if (self.counter is None) == (c is None):
            raise ValueError("Either counter should be set, or count argument should be passed!")
        count = next(self.counter) if self.counter else c
        to_add = -to_add  # negate: heapq is a min-heap, we want max-count first
        if pair in self.entry_finder:
            entry = self.entry_finder[pair]
            # entry[0][0] is the existing (negated) count; fold in the delta.
            to_add = entry[0][0] + to_add
            self.remove_task(pair)
        # A total of exactly 0 means the pair disappears from the queue.
        if to_add != 0:
            entry = [(to_add, count), pair]
            self.entry_finder[pair] = entry
            heappush(self.pq, entry)

    def remove_task(self, task):
        'Mark an existing task as REMOVED (lazy deletion). Raise KeyError if not found.'
        entry = self.entry_finder.pop(task)
        entry[-1] = PriorityCounter.REMOVED

    def pop_pair(self):
        'Remove and return the pair with the highest count. Raise KeyError if empty.'
        # Skip over entries that were lazily marked REMOVED by remove_task.
        while self.pq:
            (priority, count), pair = heappop(self.pq)
            if pair is not PriorityCounter.REMOVED:
                del self.entry_finder[pair]
                return pair, -priority  # un-negate the stored count
        raise KeyError('pop from an empty priority queue')
import sys
from collections import deque
from collections.abc import Mapping, Set
from numbers import Number

# From https://stackoverflow.com/a/30316760:
def getsize(obj):
    """Recursively estimate the total in-memory size of `obj` in bytes.

    Follows items of tuples/lists/sets/deques, keys and values of mappings,
    instance `__dict__` and `__slots__`; each object is counted at most once
    (cycle-safe). Note that `sys.getsizeof` alone is shallow.

    Fix: `Set` and `Mapping` now come from `collections.abc` — the aliases in
    `collections` were deprecated since 3.3 and removed in Python 3.10, so
    the original `from collections import Set, Mapping` crashes on modern
    interpreters.
    """
    zero_depth_bases = (str, bytes, Number, range, bytearray)  # treated as atomic
    iteritems = 'items'

    def _getsize(obj_0):
        """Recursively iterate to sum size of object & members."""
        _seen_ids = set()  # ids already counted — guards against cycles and shared refs

        def inner(obj):
            obj_id = id(obj)
            if obj_id in _seen_ids:
                return 0
            _seen_ids.add(obj_id)
            size = sys.getsizeof(obj)
            if isinstance(obj, zero_depth_bases):
                pass  # bypass remaining control flow and return
            elif isinstance(obj, (tuple, list, Set, deque)):
                size += sum(inner(i) for i in obj)
            elif isinstance(obj, Mapping) or hasattr(obj, iteritems):
                size += sum(inner(k) + inner(v) for k, v in getattr(obj, iteritems)())
            # Check for custom object instances - may subclass above too
            if hasattr(obj, '__dict__'):
                size += inner(vars(obj))
            if hasattr(obj, '__slots__'):  # can have __slots__ with __dict__
                size += sum(inner(getattr(obj, s)) for s in obj.__slots__ if hasattr(obj, s))
            return size

        return inner(obj_0)

    return _getsize(obj)
def is_python_3_6_and_higher():
    """Return True iff the running interpreter is Python 3.6 or newer.

    Fix: the original `major >= 3 and minor >= 6` wrongly returned False for
    versions whose minor component is below 6 (e.g. a hypothetical 4.0);
    tuple comparison against (3, 6) handles all versions correctly.
    """
    return sys.version_info >= (3, 6)
def create_chunk_generator(total: int, n_chunks: int) -> Generator[int, None, None]:
    """Yield a chunk label in [0, n_chunks) for each of `total` elements.

    Labels come out as full cycles of 0..n_chunks-1 followed by a partial
    cycle 0..remainder-1, i.e. round-robin dealing of `total` items into
    `n_chunks` buckets.
    """
    full_cycles, remainder = divmod(total, n_chunks)
    for _ in range(full_cycles):
        yield from range(n_chunks)
    yield from range(remainder)
def groupify(all: List, n_chunks: int) -> List[List]:
    """
    Deal the elements of `all` round-robin into at most `n_chunks` groups,
    so that group sizes differ by at most one.

    >>> groupify([0, 1, 2, 3, 4, 5, 6], 3)
    [[0, 3, 6], [1, 4], [2, 5]]
    >>> groupify([0, 1, 2, 3, 4, 5, 6], 300)
    [[0], [1], [2], [3], [4], [5], [6]]
    >>> groupify([], 300)
    []
    >>> result = groupify(list(range(100 * 1000 + 17)), 100)
    >>> len(result[0])
    1001
    >>> len(result[17])
    1000
    """
    n_groups = min(n_chunks, len(all))
    groups = [[] for _ in range(n_groups)]
    # The label sequence produced by create_chunk_generator (full cycles of
    # 0..n_chunks-1, then 0..remainder-1) is exactly `index % n_chunks`, so
    # the helper is inlined here as plain round-robin dealing.
    for index, element in enumerate(all):
        groups[index % n_chunks].append(element)
    return groups
def to_non_literal_str(word: str) -> str:
    """Interpret backslash escape sequences in `word` (e.g. "\\\\n" -> "\\n").

    Inverse of `to_literal_str`. Fix: the original encoded via UTF-8 before
    decoding with "unicode-escape", which mangled raw (unescaped) non-ASCII
    characters — "é" came back as "Ã©" because unicode-escape interprets the
    input bytes as latin-1. Encoding with latin-1 + backslashreplace keeps
    raw non-ASCII characters intact (chars above U+00FF are themselves turned
    into escapes first) while still decoding the escape sequences.
    """
    return word.encode("latin-1", "backslashreplace").decode("unicode-escape")
def to_literal_str(word: str) -> str:
    """Escape special characters in `word`, e.g. a newline becomes "\\\\n"."""
    escaped = word.encode("unicode-escape")
    return escaped.decode()
# ANSI terminal escape sequences for printing error text in red:
# '\033[31m' selects a red foreground (SGR code 31),
# '\033[0m' resets all display attributes (SGR code 0).
START_ERROR_COLOR = '\033[31m'
END_ERROR_COLOR = '\033[0m'
IntOrFloat = TypeVar('IntOrFloat', int, float)


def cum_sum(sequence: Iterable[IntOrFloat]) -> List[IntOrFloat]:
    """
    Return the list of running (cumulative) sums of `sequence`.

    >>> cum_sum([])
    []
    >>> cum_sum([1, 4, 5])
    [1, 5, 10]
    """
    # itertools.accumulate implements exactly this at C speed; the original
    # hand-rolled loop also shadowed the builtin `sum`.
    return list(itertools.accumulate(sequence))