autobean_format/internal/sorting.py
File `sorting.py` has 276 lines of code (exceeds 250 allowed). Consider refactoring."""Sorts entities in a less disruptive way. ## Problems Sorting beancount entities may sound as simple as `sorted(entities)` but doing it properly isn't trivial. * Establishing total order may not be possible. * There may be undated entities (e.g. `include`) * Ideally we also sort by time if specified in meta. But they may not be consistently specified.* Swapping entries inside and outside a pushtag / pushmeta block is not allowed.* Ideally we would like to reorder as few entities as possible.* Ideally we would like to keep the original block structure. ## Solution In order to not disrupt existing formatting, we make the assumption that the majority of the file is already sortedand only a few entities need to be reordered. This assumption may not be true for all files, but when it isn't, wereally don't know how to best preserve the existing formatting. The sorting process is: 1. Partition the file into blocks based on surrounding blank lines and entities types (pushpop / undated / dated). 1. For each dated block, sort entities with prudent sort.1. Partition the blocks into compartments of blocks, split by pushpop blocks.1. For each compartment: 1. Sort blocks with prudent sort. The prudent sort process is: 1. Identify the longest non-decreasing subsequence of entities and split out the rest.1. Sort the rest with simple sort by dicating a total order.1. Merge the sorted rest into the non-decreasing subsequence. * When merging blocks, this may involve split some blocks when necessary. ## Alternatives Instead of sorting each compartment separately, we could allow swapping entities before `pushtag #foo` and after`poptag #foo`, but we don't do it as it may be over-complexing due to possibly interleaving pushes. Instead of sorting out-of-order entities with simple sort, we could recurse with the same method, but we don't do itas it may be over-complexing and degraded performance.""" import abcimport bisectimport decimalimport functoolsimport heapqimport itertoolsfrom typing import TYPE_CHECKING, Any, Generic, Iterable, Iterator, Optional, Sequence, TypeAlias, TypeVar, cast, get_argsfrom typing_extensions import Self from autobean_refactor import modelsfrom . import chronoif TYPE_CHECKING: from _typeshed import SupportsDunderLT, SupportsDunderGT _T = TypeVar('_T')_O = TypeVar('_O', bound='_Ordered')_Entry: TypeAlias = models.Balance | models.Open | models.Close | models.Commodity | models.Pad | models.Event | models.Query | models.Price | models.Note | models.Document | models.Custom | models.Transaction_CompartmentSplitter: TypeAlias = models.Pushtag | models.Poptag | models.Pushmeta | models.Popmeta | models.BlockComment_TopLevelEntitiy = models.Directive | models.BlockComment class _Ordered(Generic[_T], abc.ABC): def __init__(self, value: _T) -> None: self.value = value # Note: a.more_successor_permissive_than(b) may not imply !(b.can_go_before(a)) @abc.abstractmethod def more_successor_permissive_than(self, other: Self) -> bool: """Tests if successors permitted by this entity forms a proper superset of those permitted by the other entity.""" # Note: may not be transitive @abc.abstractmethod def can_go_before(self, other: Self) -> bool: """Tests if this entity can go before the other entity.""" @classmethod @abc.abstractmethod def min(cls, a: Self, b: Self) -> Self: """An associative function to summarize the lower bound for can_go_before. Formally: forall x: x.can_go_before(a) && x.can_go_before(b) <=> x.can_go_before(min(a, b)) """ @classmethod @abc.abstractmethod def max(cls, a: Self, b: Self) -> Self: """An associative function to summarize the upper bound for can_go_before. Formally: forall x: a.can_go_before(x) && b.can_go_before(x) <=> max(a, b).can_go_before(x) """ @classmethod @abc.abstractmethod def merge(cls, sorted: list[Self], unsorted: list[Self]) -> Iterable[Self]: """Merges a sorted list with an unsorted list into a sorted list. The original order of `sorted` must be preserved. """ @classmethod def sort(cls, values: list[Self]) -> list[Self]: if _is_ordered(values): return values sorted, unsorted = _split_sorted_unsorted(values) return list(cls.merge(sorted, unsorted)) def _is_ordered(entities: Sequence[_Ordered[_T]]) -> bool: if not entities: return True it = iter(entities) running_max = next(it) for entity in it: if not running_max.can_go_before(entity): return False running_max = running_max.max(running_max, entity) return True Function `_split_sorted_unsorted` has a Cognitive Complexity of 19 (exceeds 5 allowed). Consider refactoring.def _split_sorted_unsorted( entities: Sequence[_O],) -> tuple[list[_O], list[_O]]: # (running max index, entity index) -> (prev running max index, prev entity index) p = dict[tuple[int, int], tuple[int, int]]() m = list[tuple[int, int]]() # length -> (running max index, last entity index) for i, entity in enumerate(entities): # first element we cannot go after j = bisect.bisect_left(m, True, key=lambda item: not entities[item[0]].can_go_before(entity)) while j >= 0 and (j == len(m) or entity.more_successor_permissive_than(entities[m[j][0]])): running_max = i if j: if entity.max(entity, entities[m[j - 1][0]]) is not entity: running_max = m[j - 1][0] p[(running_max, i)] = m[j - 1] m[j:j+1] = [(running_max, i)] j -= 1 last = m[-1] if m else None sorted_i, sorted, unsorted = [], [], [] while last: sorted_i.append(last[1]) last = p.get(last) sorted_i.reverse() sorted = [entities[i] for i in sorted_i] j = 0 for i in range(len(entities)): if j < len(sorted_i) and i == sorted_i[j]: j += 1 else: unsorted.append(entities[i]) return sorted, unsorted def _get_entry_time(entry: _Entry) -> int | None: time = entry.meta.get('time') if isinstance(time, str): return chrono.try_normalize_timestring(entry.date, time) elif isinstance(time, decimal.Decimal): return chrono.try_normalize_timestamp(time) else: return None def _build_reversed_running_min(entities: Sequence[_O]) -> list[_O]: reversed_running_min = list(entities) for i in reversed(range(len(reversed_running_min) - 1)): reversed_running_min[i] = reversed_running_min[i].min( reversed_running_min[i], reversed_running_min[i + 1]) return reversed_running_min Function `_merge_entries` has a Cognitive Complexity of 11 (exceeds 5 allowed). Consider refactoring.def _merge_entries(sorted: Sequence['_OrderedEntry'], unsorted: Sequence['_OrderedEntry']) -> Iterator[Sequence['_OrderedEntry']]: cursor_sorted, cursor_unsorted = 0, 0 reversed_running_min_unsorted = _build_reversed_running_min(unsorted) while cursor_sorted < len(sorted) and cursor_unsorted < len(unsorted): prev_cursor_sorted = cursor_sorted while cursor_sorted < len(sorted) and sorted[cursor_sorted].can_go_before(reversed_running_min_unsorted[cursor_unsorted]): cursor_sorted += 1 if cursor_sorted > prev_cursor_sorted: yield sorted[prev_cursor_sorted:cursor_sorted] if cursor_sorted == len(sorted): break prev_cursor_unsorted = cursor_unsorted while cursor_unsorted < len(unsorted) and not sorted[cursor_sorted].can_go_before(reversed_running_min_unsorted[cursor_unsorted]): cursor_unsorted += 1 yield unsorted[prev_cursor_unsorted:cursor_unsorted] if cursor_sorted < len(sorted): yield sorted[cursor_sorted:] if cursor_unsorted < len(unsorted): yield unsorted[cursor_unsorted:] class _OrderedEntry(_Ordered[_Entry]): def __init__(self, entry: _Entry, initial_index: int) -> None: super().__init__(entry) self.date = entry.date self.time = _get_entry_time(entry) self.initial_index = initial_index def more_successor_permissive_than(self, other: Self) -> bool: return self.date < other.date or (self.date == other.date and ( self.time is None and other.time is not None or self.time is not None and other.time is not None and self.time < other.time)) def can_go_before(self, other: Self) -> bool: return self.date < other.date or (self.date == other.date and ( self.time is None or other.time is None or self.time <= other.time)) @classmethod def min(cls, a: Self, b: Self) -> Self: if a.date < b.date or (a.date == b.date and ( b.time is None or (a.time is not None and a.time < b.time))): return a return b @classmethod def max(cls, a: Self, b: Self) -> Self: if a.date < b.date or (a.date == b.date and ( a.time is None or (b.time is not None and a.time < b.time))): return b return a def simple_sort_key(self) -> Any: return (self.date, self.time or 0) @classmethod def merge(cls, sorted: list['_OrderedEntry'], unsorted: list['_OrderedEntry']) -> Iterator['_OrderedEntry']: unsorted.sort(key=lambda x: x.simple_sort_key()) return itertools.chain.from_iterable(_merge_entries(sorted, unsorted)) _Block = Sequence[_OrderedEntry] | Sequence[_TopLevelEntitiy] class _OrderedBlock(_Ordered[_Block]): def __init__(self, block: _Block) -> None: super().__init__(block) self.entries = cast(Sequence[_OrderedEntry], block) if isinstance(block[0], _OrderedEntry) else None @functools.cached_property def _max(self) -> Optional['_OrderedEntry']: if not self.entries: return None return functools.reduce(_OrderedEntry.max, self.entries) @functools.cached_property def _min(self) -> Optional['_OrderedEntry']: if not self.entries: return None return functools.reduce(_OrderedEntry.min, self.entries) @classmethod def min(cls, a: Self, b: Self) -> Self: if (not b._min or (a._min and _OrderedEntry.min(a._min, b._min) is a._min)): return a return b @classmethod def max(cls, a: Self, b: Self) -> Self: if (not a._max or (b._max and _OrderedEntry.max(a._max, b._max) is b._max)): return b return a def more_successor_permissive_than(self, other: Self) -> bool: return bool( not self._max and other._max or # undated is more permissive than dated self._max and other._max and self._max.more_successor_permissive_than(other._max) ) def can_go_before(self, other: Self) -> bool: return bool(not self._max or not other._min or self._max.can_go_before(other._min)) def simple_sort_key(self) -> Any: assert self._min and self._max # undated blocks must be part of sorted first_entry_index = -1 if self.entries: first_entry_index = self.entries[0].initial_index return (self._min.simple_sort_key(), self._max.simple_sort_key(), first_entry_index) @classmethodFunction `merge` has a Cognitive Complexity of 10 (exceeds 5 allowed). Consider refactoring. def merge(cls, sorted: list['_OrderedBlock'], unsorted: list['_OrderedBlock']) -> Iterator['_OrderedBlock']: keyed_unsorted = [(block.simple_sort_key(), block) for block in unsorted] heapq.heapify(keyed_unsorted) cursor_sorted = 0 while cursor_sorted < len(sorted) and keyed_unsorted: if sorted[cursor_sorted].can_go_before(keyed_unsorted[0][-1]): yield sorted[cursor_sorted] cursor_sorted += 1 elif keyed_unsorted[0][-1].can_go_before(sorted[cursor_sorted]): yield heapq.heappop(keyed_unsorted)[-1] else: sorted_head_entries = sorted[cursor_sorted].entries cursor_sorted += 1 unsorted_block = heapq.heappop(keyed_unsorted) unsorted_head_entries = unsorted_block[-1].entries assert sorted_head_entries is not None assert unsorted_head_entries is not None split_blocks = _merge_entries(sorted_head_entries, unsorted_head_entries) for block in split_blocks: ordered_block = _OrderedBlock(block) heapq.heappush(keyed_unsorted, (ordered_block.simple_sort_key(), ordered_block)) if cursor_sorted < len(sorted): yield from sorted[cursor_sorted:] while keyed_unsorted: yield heapq.heappop(keyed_unsorted)[-1] def _sort_compartment(blocks: list[_OrderedBlock]) -> list[Sequence[_TopLevelEntitiy]]: blocks = _OrderedBlock.sort(blocks) sorted_blocks = list[Sequence[_TopLevelEntitiy]]() for ordered_block in blocks: if ordered_block.entries is not None: sorted_blocks.append([entry.value for entry in ordered_block.entries]) else: sorted_blocks.append(cast(Sequence[_TopLevelEntitiy], ordered_block.value)) return sorted_blocks Function `sort_blocks` has a Cognitive Complexity of 6 (exceeds 5 allowed). Consider refactoring.def sort_blocks(blocks: Iterable[Sequence[_TopLevelEntitiy]]) -> list[Sequence[_TopLevelEntitiy]]: results = [] compartment = list[_OrderedBlock]() first_entry_index = 0 for block in blocks: if isinstance(block[0], get_args(_CompartmentSplitter)): results.extend(_sort_compartment(compartment)) compartment.clear() results.append(block) elif isinstance(block[0], get_args(_Entry)): # sort within block sorted_entries = _OrderedEntry.sort([ _OrderedEntry(entry, first_entry_index + offset) for offset, entry in enumerate(cast(list[_Entry], block)) ]) compartment.append(_OrderedBlock(sorted_entries)) else: compartment.append(_OrderedBlock(block)) first_entry_index += len(block) if compartment: results.extend(_sort_compartment(compartment)) return results