oeis/sequence.py
# -*- coding: utf-8 -*- #
#
# oeis/sequence.py
#
#
# MIT License
#
# Copyright (c) 2019 Brandon Gomes
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
"""
OEIS Sequences.
"""
# -------------- Standard Library -------------- #
import re
import inspect
from collections.abc import MutableMapping
from copy import deepcopy
from datetime import datetime
from itertools import chain, groupby, islice
from functools import partial
from typing import Union
# -------------- External Library -------------- #
from wrapt import ObjectProxy
# ---------------- oeis Library ---------------- #
from .base import name as oeis_name
from .base import number as oeis_number
from .base import find_references, MissingID
from .client import entry as oeis_entry
from .client import bfile as oeis_bfile
from .util import is_int, value_or, empty_generator, Box, BoxList
__all__ = ("Sequence", "SequenceFactory", "Registry")
def _slice_details(f, *args, **kwargs):
"""
Get Slice Details of a Function.
:param f:
:param args:
:param kwargs:
:return:
"""
if inspect.isgeneratorfunction(f):
gen = f(*args, **kwargs)
return type(gen), 3, partial(islice, gen)
if inspect.isgenerator(f):
return type(f), 3, partial(islice, f)
if inspect.ismethod(f) or inspect.isfunction(f):
signature = inspect.signature(f)
argument_count = len(signature.parameters)
if argument_count in (1, 3):
for parameter in signature.parameters.values():
if parameter.kind != inspect.Parameter.POSITIONAL_OR_KEYWORD:
return type(f), 0, None
return type(f), argument_count, f
return type(f), 0, None
class Sequence(ObjectProxy):
"""
OEIS Sequence Function Wrapper.
"""
def __init__(self, number, generator=None, *, meta=None):
"""
Initialize Proxy.
:param number:
:param generator:
:param meta:
"""
super().__init__(value_or(generator, empty_generator))
self._self_number = number
self._self_meta = value_or(meta, Box())
@classmethod
def from_dict(cls, meta):
"""
Create Sequence from Metadata Dictionary.
:param meta:
:return:
"""
return cls(oeis_number(meta["number"]), meta=meta)
@classmethod
def from_sequence(cls, sequence, new_generator=None, *, new_meta=None, copy=False):
"""
Generate Sequence from Existing Sequence.
:param sequence:
:param new_generator:
:param new_meta:
:param copy:
:return:
"""
if not (new_generator or new_meta):
return deepcopy(sequence) if copy else sequence
generator = value_or(new_generator, sequence.__wrapped__)
meta = value_or(new_meta, sequence.meta)
return cls(sequence.number, generator, meta=meta)
def __call__(self, *args, **kwargs):
"""
Call Internal Generator.
:param args:
:param kwargs:
:return:
"""
return self.__wrapped__(*args, **kwargs)
def get(
self,
index,
*args,
cache_result=False,
ignore_offset=False,
ignore_sample=False,
**kwargs
):
"""
Get Index into Sequence.
:param index:
:param args:
:param cache_result:
:param ignore_offset:
:param ignore_sample:
:param kwargs:
:return:
"""
if cache_result:
return NotImplemented
if is_int(index):
index = slice(index)
if isinstance(index, slice):
start, stop, step = index.start, index.stop, index.step
if not ignore_offset:
start = start - self.offset
if stop:
stop = stop - self.offset
else:
raise TypeError("expected integer or slice")
gen_type, argument_count, gen = _slice_details(self.generator, *args, **kwargs)
if not ignore_sample:
sample_length = len(self.sample)
after_sample = stop - sample_length
if start < sample_length:
yield from self.sample[start : min(stop, sample_length) : step]
if after_sample < 0:
return
start = sample_length
stop = stop
if argument_count == 3:
yield from gen(start, stop, step)
if argument_count == 1:
if stop is None and step is None:
yield gen(start)
yield from map(gen, range(start, stop, step))
raise TypeError("invalid generator")
def __getitem__(self, index, *args):
"""
Get Index into Sequence.
:param index:
:param args:
:return:
"""
return self.get(index, *args)
@property
def number(self):
"""Get Sequence Number."""
return self._self_number
@property
def short_name(self):
"""Get Sequence Short Name."""
return "A{number}".format(number=self.number)
@property
def name(self):
"""Get Sequence Full Name."""
return "A{number:06d}".format(number=self.number)
@property
def website(self):
"""Get Website for Sequence."""
return "https://oeis.org/{}".format(self.name)
@property
def __oeis_name__(self):
"""Standard OEIS Name."""
return self.name
@property
def __oeis_number__(self):
"""Standard OEIS Number."""
return self.number
@property
def meta(self):
"""Get Full Metadata Box."""
return self._self_meta
@property
def offset(self):
"""Get Sequence Offset."""
return int(self.meta.offset.split(",")[0])
@property
def description(self):
"""Get Sequence Description."""
return self.meta.name
@property
def sample(self):
"""Get Cached Sample of the Sequence."""
if not hasattr(self, "_self_sample"):
self._self_sample = list(map(int, self.meta.data.split(",")))
self._self_with_bfile = False
return self._self_sample
def sample_append(self, value):
"""Append to Sequence Sample."""
self.sample.append(value)
def sample_extend(self, values):
"""Extend Sample Sequence."""
self.sample.extend(values)
def sample_reset(self):
"""Reset Sequence Sample to Metadata Default."""
if hasattr(self, "_self_sample"):
del self._self_sample
self._self_with_bfile = False
@property
def with_bfile(self):
"""Check if B-File is Loaded into Sample."""
if not hasattr(self, "_self_with_bfile"):
self._self_with_bfile = False
return self._self_with_bfile
@property
def formulas(self):
"""Get Formulas for Sequence."""
return self.meta.formula
@classmethod
def _parse_programs(cls, others):
"""Parse Sample Program List."""
if not hasattr(cls, "_program_regex"):
cls._program_regex = re.compile(
r"^\(([^()]*)+?\).*$", re.MULTILINE | re.UNICODE
)
last_key = None
def key(line):
nonlocal last_key
try:
last_key = cls._program_regex.match(line).groups()[0]
except AttributeError:
return last_key
return last_key
clean = lambda k: lambda s: s.replace("({})".format(k), "").strip()
return {k.lower(): tuple(map(clean(k), g)) for k, g in groupby(others, key=key)}
@property
def programs(self):
"""Get OEIS Sample Programs."""
return Box(
maple=self.meta.maple,
mathematica=self.meta.mathematica,
**self._parse_programs(self.meta.program)
)
@property
def keywords(self):
"""Get OEIS Keywords."""
return self.meta.keyword.split(",")
@property
def recycled(self):
"""Check if Sequence is Recycled."""
return "recycled" in self.keywords
@property
def modified(self):
"""Get Last Modified Time."""
return datetime.fromisoformat(self.meta.time)
@property
def created(self):
"""Get Created Time."""
return datetime.fromisoformat(self.meta.created)
@classmethod
def _find_chain_references(cls, text):
"""Find references to OEIS Sequences."""
return tuple(chain.from_iterable(find_references(text)))
@classmethod
def _find_xref_keys(cls, xref):
"""Find Cross Reference Keys."""
return tuple((cls._find_chain_references(entry), entry) for entry in xref)
@property
def cross_references(self):
"""Get Cross References for Sequence."""
if not hasattr(self, "_self_xref"):
xref = self.meta.xref
if xref:
self._self_xref = self._find_xref_keys(xref)
else:
self._self_xref = tuple()
return self._self_xref
@classmethod
def _parse_comments(cls, comments):
"""Parse Comments for References."""
for comment in comments:
text = comment.strip()
yield dict(text=text, references=cls._find_chain_references(text))
@property
def comments(self):
"""Get Comments for Sequence."""
if not hasattr(self, "_self_comments"):
comments = self.meta.comment
if comments:
self._self_comments = BoxList(tuple(self._parse_comments(comments)))
else:
self._self_comments = BoxList()
return self._self_comments
@property
def finite(self):
"""Get Finiteness of Sequence."""
return NotImplemented
class SequenceFactory:
"""
OEIS Sequence Factory.
"""
__slots__ = ("session", "cache", "always_cache")
def __init__(self, *, factory=dict, session=None, always_cache=False):
"""
Initialize Sequence Factory.
:param factory:
:param session:
:param always_cache:
"""
self.cache = factory()
self.session = session
self.always_cache = always_cache
@classmethod
def from_cache(cls, cache, *, session=None, always_cache=False):
"""
Make Sequence Factory from Pre-loaded Cache.
:param cache:
:param session:
:param always_cache:
:return:
"""
return cls(factory=lambda: cache, session=session, always_cache=always_cache)
def __reduce__(self):
"""
Reduce Sequence Factory for Pickling.
:return:
"""
return self.__class__, (self.cache, self.session, self.always_cache)
def __eq__(self, other) -> bool:
"""
Equality of SequenceFactory.
:param other:
:return:
"""
if isinstance(other, type(self)):
return self.__reduce__() == other.__reduce__()
return NotImplemented
def clear(self):
"""
Clear Cache.
:return:
"""
self.cache.clear()
def __contains__(self, item) -> bool:
"""
Check if item is Stored in the Factory Cache.
:param item:
:return:
"""
return item in self.cache
def load_meta(self, key, *, check_name=False):
"""Load Metadata Dictionary from Loader."""
return oeis_entry(key, self.session, check_name=check_name)
def extend_from_bfile(self, key, sequence, *, check_name=False):
"""
Extend Sample Data for Sequence from B-File if possible.
:param key:
:param sequence:
:param check_name:
:return:
"""
data = oeis_bfile(key, self.session, check_name=check_name)
if data:
sequence.sample_extend(data[len(sequence.sample) :])
sequence._self_with_bfile = True
return sequence
def load(self, key, *, cache_result=True, with_bfile=False):
"""
Load Sequence with Default Caching.
:param key:
:param cache_result:
:param with_bfile:
:return:
"""
key = oeis_name(key)
try:
previous = self.cache[key]
if with_bfile and not previous.with_bfile:
return self.extend_from_bfile(key, previous, check_name=False)
return previous
except KeyError:
pass
meta = self.load_meta(key, check_name=False)
if not meta:
raise MissingID.from_key(key)
if with_bfile:
entry = self.extend_from_bfile(
key, Sequence.from_dict(meta), check_name=False
)
else:
entry = Sequence.from_dict(meta)
if cache_result or self.always_cache:
self.cache[key] = entry
return self.cache[key]
return entry
def safe_load(self, *args, **kwargs) -> Union[Sequence, None]:
""""""
try:
return self.load(*args, **kwargs)
except MissingID:
return None
def __call__(self, key, *args, cache_result=False, **kwargs):
"""
Load Sequence without Caching by Default.
:param key:
:param args:
:param cache_result:
:param kwargs:
:return:
"""
return self.load(
key, *args, cache_result=cache_result or self.always_cache, **kwargs
)
class Registry(MutableMapping):
"""
Sequence Registry.
"""
__slots__ = ("_factory",)
@classmethod
def from_factory(cls, factory):
"""
Make Registry from Pre-existing Factory.
:param factory:
:return:
"""
obj = object.__new__(cls)
obj._factory = factory
return obj
def __new__(cls, *, cache_factory=dict, session=None):
"""
Make new Registry.
:param cache_factory:
:param session:
:return:
"""
return cls.from_factory(SequenceFactory(factory=cache_factory, session=session))
def __repr__(self):
"""Get Registry Representation."""
return "{cls}({cache})".format(cls=type(self).__name__, cache=tuple(self.cache))
@property
def cache(self):
"""Get Factory Cache."""
return self._factory.cache
def __getattr__(self, name):
"""Get Element from Cache as Attribute."""
try:
return self[name]
except KeyError:
raise AttributeError("Missing Key: {}.".format(name))
def __contains__(self, key):
"""Check Containment of OEIS Key."""
return oeis_name(key) in self.cache
def __getitem__(self, key):
"""Get Element from Internal Cache."""
return self.cache[oeis_name(key)]
def __setitem__(self, key, value):
"""Set Element of Internal Cache."""
self.cache[key] = value
def __delitem__(self, key):
"""Delete Element from Internal Cache."""
del self.cache[oeis_name(key)]
def __iter__(self):
"""Return Iterator to Internal Cache."""
return iter(self.cache)
def __len__(self):
"""Get Length of Internal Cache."""
return len(self.cache)
def clear(self):
"""Clear Factory."""
return self._factory.clear()
def register(self, key, generator=None, *, meta=None):
"""
Register Sequence through Factory.
:param key:
:param generator:
:param meta:
:return:
"""
try:
cached_sequence = self[key]
if meta and cached_sequence.meta == meta:
return Sequence.from_sequence(cached_sequence, generator)
generator = value_or(generator, cached_sequence.__wrapped__)
except KeyError:
pass
key = oeis_name(key)
if meta is True:
meta = self._factory.load_meta(key, check_name=False)
number = oeis_number(key)
if meta and number != meta.number:
raise ValueError(
"OEIS indices don't match: {} should be {}".format(number, meta.number)
)
self[key] = Sequence(number, generator=generator, meta=meta)
return self[key]