kianxineki/basicevents

View on GitHub
basicevents.py

Summary

Maintainability
A
2 hrs
Test Coverage
from __future__ import print_function
import threading
import traceback
from time import sleep
from multiprocessing import Queue, Process, cpu_count
import signal


class Events(object):
    subs = {}
    queue = Queue()
    logger = print

    @staticmethod
    def _run_event(event, *args, **kwargs):
        try:
            for func in Events.subs[event]:
                try:
                    func(*args, **kwargs)
                except:
                    Events.logger(traceback.format_exc())
        except:
            pass

    @staticmethod
    def add_subscribe(event, func):
        if event not in Events.subs:
            Events.subs[event] = []
        Events.subs[event].append(func)

    @staticmethod
    def subscribe(event):
        def wrap_function(func):
            Events.add_subscribe(event, func)
            return func
        return wrap_function

    @staticmethod
    def send_queue(*args, **kwargs):
        Events.queue.put((args, kwargs))

    @staticmethod
    def send_thread(*args, **kwargs):
        threading.Thread(target=Events._run_event,
                         args=args, kwargs=kwargs).start()

    @staticmethod
    def send_blocking(*args, **kwargs):
        Events._run_event(*args, **kwargs)

    # default send
    send = send_queue


def __run_queue(stop_function=lambda: True):
    proccess_queue = True

    def signal_handler(signal, frame):
        def waiting_dead(stop_function):
            while True:
                if stop_function():
                    print("send stop")
                    for x in range(cpu_count()):
                        send("STOP")
                    break
                else:
                    sleep(2)
        print('basicevent stopping')
        Process(target=waiting_dead, args=(stop_function,)).start()

    signal.signal(signal.SIGINT, signal_handler)
    # os.setpgrp()  # kill non propagate
    while proccess_queue:
        args, kwargs = Events.queue.get()

        if args[0] == "STOP":
            proccess_queue = False
            Events.logger("basicevent stopped")
        else:
            Events._run_event(*args, **kwargs)


def run(stop_function=lambda: True):
    for x in range(cpu_count()):
        Process(target=__run_queue, args=(stop_function,)).start()


# deprecated
events = Events

# avoids having to import Events
add_subscribe = Events.add_subscribe
subscribe = Events.subscribe
send = Events.send
send_queue = Events.send_queue
send_thread = Events.send_thread
send_blocking = Events.send_blocking