natlas/cyclicprng

View on GitHub
cyclicprng.py

Summary

Maintainability
A
0 mins
Test Coverage
from threading import Lock
import random
import sympy
from datetime import datetime


mutex = Lock()


def modexp(b, e, m):
    bits = [(e >> bit) & 1 for bit in range(e.bit_length())]
    s = b
    v = 1
    for bit in bits:
        if bit == 1:
            v *= s
            v %= m
        s *= s
        s %= m
    return v


class CyclicPRNG:
    """
        For more information about the existence of this class and why it does what it does,
        see https://github.com/natlas/natlas/wiki/Host-Coverage-Scanning-Strategy
        Edge case behavior:
            size < 1 -> Raise ValueError
            size = 1 -> Always return 1
            size = 2 -> The generator is always 2 (equivalent to consistent=true)
    """

    size = 0
    modulus = 0
    modulus_factors = {}
    generator = 0
    start = 0
    end = 0
    current = 0
    cycle_start_time = None
    completed_cycle_count = 0
    consistent = False
    notify = []

    def __init__(
        self, cycle_size: int, consistent: bool = False, event_handler: callable = None
    ):
        """
            Initialize PRNG that restarts after cycle_size calls
        """
        self.size = cycle_size
        if self.size < 1:
            raise ValueError(
                "Random Number Generator must be given a positive non-zero integer"
            )
        if event_handler:
            self.register_event_handler(event_handler)

        self._init_cyclic_group()
        self._init_generator()
        self._init_permutation()
        self.cycle_start_time = datetime.utcnow()
        self.consistent = consistent
        self._emit_event("init")

    def register_event_handler(self, func: callable):
        """
            A simple event handler to be notified of events in the cyclic PRNG
            func must be a callable that takes a single parameter
        """
        if not callable(func):
            raise TypeError("Event handler must be callable")
        self.notify.append(func)

    def unregister_event_handler(self, func: callable):
        """
            Removes an event handler from the cyclic PRNG
        """
        if func in self.notify:
            self.notify.remove(func)

    def clear_event_handlers(self):
        """
            Clear all event handlers from the cyclic PRNG
        """
        self.notify.clear()

    def _emit_event(self, event):
        for func in self.notify:
            func(event)

    def _init_cyclic_group(self):
        def next_prime(num):
            num = num + 1 if (num % 2) == 0 else num + 2
            while not sympy.isprime(num):
                num = num + 2
            return num

        self.modulus = next_prime(self.size)
        self.modulus_factors = sympy.factorint(self.modulus - 1)

    def _init_generator(self):
        """
            Find a generator for the whole cyclic group.
        """
        found = False
        base = 0
        """
            If modulus is 3 then the only generator we can use is 2.
            Otherwise we infinite loop because always self.generator == base
        """
        if self.modulus == 3:
            self.generator = 2
            return
        while not found:
            base = random.randint(2, self.modulus - 1)
            found = self.generator != base and all(
                modexp(base, int((self.modulus - 1) / factor), self.modulus) != 1
                for factor in self.modulus_factors
            )
        self.generator = base

    def _cycle_until_in_range(self, element):
        """
            Cycle the element until it is self.size or less
        """
        while element > self.size:
            element = (element * self.generator) % self.modulus
        return element

    def _init_permutation(self):
        """
            Create a new permutation of the scope
        """
        exp = random.randint(2, self.modulus - 1)
        self.end = self._cycle_until_in_range(modexp(self.generator, exp, self.modulus))
        self.start = self._cycle_until_in_range(
            (self.end * self.generator) % self.modulus
        )
        self.current = self.start

    def _restart_cycle(self):
        """
            Restart PRNG Cycle
        """
        if self.consistent:
            self.current = self.start
        else:
            self._init_generator()
            self._init_permutation()
        self.cycle_start_time = datetime.utcnow()
        self.completed_cycle_count += 1
        self._emit_event("restart")

    def get_random(self):
        """
            Gets the next random number from the permutation
        """
        if self.size <= 1:
            return 1
        mutex.acquire()
        value = self.current
        self.current = self._cycle_until_in_range(
            (self.current * self.generator) % self.modulus
        )
        if value == self.end:
            self._restart_cycle()
        mutex.release()
        return value