Shevraar/jovabot

View on GitHub
modules/learn/__init__.py

Summary

Maintainability
A
0 mins
Test Coverage
# coding=utf-8
import json
import logging
import random
import re
import os
from fuzzywuzzy import process

# Optional backends: each HAVE_* flag records whether the corresponding
# client library could be imported in this environment. Only ImportError is
# treated as "backend not installed"; anything else (KeyboardInterrupt,
# SystemExit, ...) is allowed to propagate instead of being swallowed.
try:
    import pymongo

    HAVE_MONGO_DB = True
except ImportError:
    HAVE_MONGO_DB = False

try:
    import redis

    HAVE_REDIS = True
except ImportError:
    HAVE_REDIS = False

try:
    from uwsgi import sharedarea_read, sharedarea_write, \
                      sharedarea_write32, sharedarea_read32

    HAVE_SHARED_AREA = True
except ImportError:
    HAVE_SHARED_AREA = False

# The plain in-memory dictionary backend is opt-in: it is enabled only when
# the USE_DICT_DB environment variable is set to exactly '1'.
use_dict_db = os.environ.get('USE_DICT_DB')
HAVE_DICT_DB = use_dict_db == '1'


# noinspection PyMethodMayBeStatic
class JovaLearnNone(object):
    """Fallback backend used when no storage backend is available.

    Nothing is ever stored: learning always fails and every lookup comes
    back empty.
    """

    def __init__(self):
        pass

    def jova_learn(self, tit, tat):
        """Print the request and return False, since nothing is stored."""
        print('tit [{}] for tat[{}]'.format(tit, tat))
        return False

    def jova_keys(self):
        """Return an empty list: this backend never knows any trigger.

        An empty list (rather than None) keeps callers that iterate or
        shuffle the keys working.
        """
        return []

    def jova_answer_for_key(self, key):
        """Print the requested key and return None (no answer available)."""
        print('answer for [{}]'.format(key))
        return None

    def clear(self):
        """Do nothing: there is no stored data to remove."""
        pass

    def get_all(self, key):
        """Return an empty list, mirroring ``get_all`` of the real backends."""
        return []


class JovaLearnSharedMemory(object):
    """Backend that keeps the learned data in a uWSGI shared area.

    The whole trigger -> answers mapping is serialized as one JSON document.
    The payload length is stored with ``sharedarea_write32`` at offset 0 and
    the JSON payload itself starts at offset 4.
    """

    def __init__(self):
        self.data = {}
        # NOTE(review): the shared area id is chosen at random between 10
        # and 20 - confirm this matches the areas configured in uWSGI and
        # that every worker ends up using the same id.
        self.id = random.randint(10, 20)

    def jova_learn(self, tit, tat):
        """Add answer ``tat`` for trigger ``tit`` and persist the mapping.

        Returns whatever ``write_shared_area`` returns (currently None).
        """
        # Refresh from the shared area first, so entries written through it
        # since our last read are not overwritten by a stale local copy.
        self.read_shared_area()
        self.data.setdefault(tit, []).append(tat)
        return self.write_shared_area()

    def jova_keys(self):
        """Return the known triggers as a new list."""
        self.read_shared_area()
        # A list (not the dict itself) so callers can shuffle or index it.
        return list(self.data)

    def jova_answer_for_key(self, key):
        """Return a random answer stored for ``key``, or None if unknown."""
        self.read_shared_area()
        if key in self.data:
            return random.choice(self.data[key])
        return None

    def write_shared_area(self):
        """Serialize ``self.data`` to JSON and store it in the shared area."""
        payload = json.dumps(self.data)

        # write payload len
        sharedarea_write32(self.id, 0, len(payload))

        # write payload
        sharedarea_write(self.id, 4, payload)

    def read_shared_area(self):
        """Reload ``self.data`` from the shared area.

        A stored length of zero (or less) is treated as "no data".
        """
        # read payload len
        len_ = sharedarea_read32(self.id, 0)

        # read payload
        if len_ > 0:
            json_ = sharedarea_read(self.id, 4, len_)
            self.data = json.loads(json_)
        else:
            self.data = {}

    def clear(self):
        """Mark the shared area as empty by storing a zero payload length."""
        # write zero len
        sharedarea_write32(self.id, 0, 0)

    def get_all(self, key):
        """Return the list of answers stored for ``key`` ([] if unknown)."""
        self.read_shared_area()
        if key in self.data:
            return self.data[key]
        return []


class JovaLearnMongoDb(object):
    """Backend storing trigger/answer pairs in MongoDB.

    Each pair is one document ``{"key": <trigger>, "text": <answer>}`` in
    the ``tit_for_tat`` collection of the ``jovabot`` database.
    """

    def __init__(self):
        self.client = pymongo.MongoClient()
        self.db = self.client.jovabot
        self.tit_for_tat = self.db.tit_for_tat

    def jova_learn(self, tit, tat):
        """Insert a new document mapping trigger ``tit`` to answer ``tat``."""
        tit_tat = {"key": tit, "text": tat}
        self.tit_for_tat.insert_one(tit_tat)

    def jova_keys(self):
        """Return the distinct triggers present in the collection."""
        return self.tit_for_tat.distinct("key")

    def jova_answer_for_key(self, key):
        """Return a random answer stored for ``key``, or None if there is none."""
        # Query once and reuse the result: a second query costs another
        # round trip and may come back empty, which would make
        # random.choice raise IndexError.
        answers = self.get_all(key)
        if answers:
            return random.choice(answers)
        return None

    def clear(self):
        """Drop the whole ``tit_for_tat`` collection."""
        self.tit_for_tat.drop()

    def get_all(self, key):
        """Return the list of every answer text stored for ``key``."""
        return [x['text'] for x in self.tit_for_tat.find({"key": key})]


class JovaLearnRedis(object):
    """Backend storing, for every trigger, a Redis set of possible answers."""

    def __init__(self):
        self.client = redis.Redis('localhost')

    @staticmethod
    def _to_text(value):
        """Decode a bytes reply to text; return any other value unchanged."""
        if isinstance(value, bytes):
            return value.decode('utf-8')
        return value

    def jova_learn(self, tit, tat):
        """Add answer ``tat`` to the set stored under trigger ``tit``."""
        self.client.sadd(tit, tat)

    def jova_keys(self):
        """Return every key of the current Redis database as a list of text.

        NOTE(review): this assumes the database holds nothing but learned
        triggers (``clear`` flushes the whole database as well) - confirm.
        """
        return [self._to_text(k) for k in self.client.keys()]

    def jova_answer_for_key(self, key):
        """Return a random member of the set stored under ``key``.

        The reply is decoded to text; None is passed through when the
        client returns None.
        """
        return self._to_text(self.client.srandmember(key))

    def clear(self):
        """Flush the whole current Redis database."""
        self.client.flushdb()

    def get_all(self, key):
        """Return the set of answers stored under ``key``, decoded to text."""
        return {self._to_text(v) for v in self.client.smembers(key)}


class JovaLearnDict(object):
    """Backend keeping the learned data in a plain in-process dictionary.

    ``self.learned`` maps each trigger to the list of its answers; nothing
    is persisted outside of this object.
    """

    def __init__(self):
        self.learned = {}

    def jova_learn(self, tit, tat):
        """Append answer ``tat`` to the answers known for trigger ``tit``."""
        self.learned.setdefault(tit, []).append(tat)

    def jova_keys(self):
        """Return the known triggers as a new list.

        A list copy (not a dict view) so callers can shuffle or index it
        without touching the stored data.
        """
        return list(self.learned)

    def jova_answer_for_key(self, key):
        """Return a random answer for ``key``, or None if none is known."""
        answers = self.learned.get(key)
        if not answers:
            # Unknown trigger: random.choice(None) would raise TypeError.
            return None
        return random.choice(answers)

    def clear(self):
        """Forget everything that has been learned."""
        self.learned.clear()

    def get_all(self, key):
        """Return the list of answers for ``key`` ([] if it is unknown)."""
        return self.learned.get(key, [])


# Active storage backend; stays None until init() has been called.
impl = None


def init():
    """Pick the storage backend and store an instance of it in ``impl``.

    Backends are tried in a fixed priority order (mongo-db, redis, uwsgi
    shared area, dictionary); the first one whose availability flag is set
    wins. When none is available the do-nothing JovaLearnNone is used.
    """
    global impl

    candidates = (
        (HAVE_MONGO_DB, JovaLearnMongoDb,
         'learn module uses mongo-db backend'),
        (HAVE_REDIS, JovaLearnRedis,
         'learn module uses redis backend'),
        (HAVE_SHARED_AREA, JovaLearnSharedMemory,
         'learn module uses uwsgi shared area backend'),
        (HAVE_DICT_DB, JovaLearnDict,
         'learn module uses dictionary backend'),
    )

    for available, backend_cls, note in candidates:
        if available:
            impl = backend_cls()
            logging.debug(note)
            return

    impl = JovaLearnNone()
    logging.debug('learn module not available')


def get_answer(message):
    """Entry point of the learn module for an incoming chat message.

    :param message: the text of the incoming message
    :return: None for messages starting with '/'; the result of
        ``jova_learn`` when the message contains 'se ti dico'; otherwise
        the 2-tuple ``(answer, 'jovaize')`` where ``answer`` is the result
        of ``jova_answer_learned``.
    """
    if message.startswith('/'):
        return None
    logging.debug('{} :: message [{}]'.format(__file__, message))
    # The former "and '/'" operand was a constant-true string, so the test
    # has always depended on the substring check alone.
    if 'se ti dico' in message:
        return jova_learn(message)
    # NOTE(review): a tuple is returned even when no answer was found, i.e.
    # (None, 'jovaize'), which is truthy - confirm callers expect that.
    return jova_answer_learned(message), 'jovaize'


def jova_answer_learned(message):
    """Answer a message of the form 'jova[,] <text>' from the learned data.

    :param message: the text of the incoming message
    :return: the result of ``jova_fuzzy_answer`` for ``<text>``, or None
        when the message does not match or the lookup raises.
    """
    rx = r'jova,?\s(.+)$'
    m = re.match(rx, message)
    logging.debug('message [{}] :: match [{}]'.format(message, m))
    if not m:
        return None
    try:
        k = m.group(1)
        logging.debug('key => [{}]'.format(k))
        return jova_fuzzy_answer(k)
    except Exception:
        # Best effort: a failing lookup is logged and must not break the
        # bot, but KeyboardInterrupt/SystemExit are no longer swallowed.
        logging.exception('jova_answer_learned error')

    return None


def jova_learn(message):
    """Learn a trigger/answer pair from a teaching message.

    The message must look like
    'jova[,] se ti dico|dicono <trigger> tu rispondi <answer>' and the
    trigger must be longer than 3 characters.

    :param message: the text of the incoming message
    :return: the tuple ``('BQADBAADkgADwThpBr2dKDwqptsXAg', 'sticker')``
        once the pair has been handed to the backend; None when the
        message does not match, the trigger is too short, or the backend
        raises.
    """
    rx = r'jova,?\sse ti (?:dico|dicono)\s([\w\s\?\!\']+)\stu rispondi\s([\w\s\?\!\']+)'
    m = re.match(rx, message)

    if not m:
        return None

    # The pattern has exactly two mandatory groups, so a successful match
    # always yields both of them.
    trigger, answer = m.groups()
    if len(trigger) > 3:
        try:
            logging.info('learning to answer [{0}] to the trigger [{1}]'
                         .format(answer, trigger))
            impl.jova_learn(trigger, answer)
            # file_id = BQADBAADkgADwThpBr2dKDwqptsXAg - SAITAMA_OK
            return 'BQADBAADkgADwThpBr2dKDwqptsXAg', 'sticker'
        except Exception:
            # Best effort: a backend failure is logged, not propagated, but
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            logging.exception('jova_learn error')
    return None


def jova_fuzzy_answer(key):
    """Return a learned answer for the known trigger closest to ``key``.

    The best match is picked with ``process.extractOne`` and is used only
    when its score is greater than 80.

    :param key: the text to look up among the learned triggers
    :return: the backend's answer for the best matching trigger, or None
        when there are no triggers or no match is close enough.
    """
    # Work on a private list: backends may hand back None, a dict or a dict
    # view, none of which random.shuffle accepts, and shuffling in place
    # must not reorder data owned by the backend.
    keys = list(impl.jova_keys() or [])
    if not keys:
        return None
    random.shuffle(keys)
    choosen_key = process.extractOne(key, keys)
    logging.debug('choosen_key => [{}]'.format(choosen_key))
    if not choosen_key:
        return None
    if choosen_key[1] > 80:
        return impl.jova_answer_for_key(choosen_key[0])
    logging.warning('no close match for the provided key[{}]'.format(key))
    return None


def clear():
    """Ask the active backend to remove everything it has learned."""
    backend = impl
    backend.clear()


def get_all(key):
    """Return whatever the active backend's ``get_all`` gives for ``key``."""
    backend = impl
    answers = backend.get_all(key)
    return answers