# -*- coding: utf-8 -*-
# oujago/NumpyDL: applications/chatbot/model.py


import os
import json
import re
import npdl
import numpy as np
from npdl.initializations import _one

token2idx_path = "data/token2idx.json"
idx2token_path = "data/idx2token.json"
param_path = "data/params.npy"
max_sent_size = np.int32(50)
idx_start = np.int32(1)
idx_end = np.int32(2)
idx_unk = np.int32(3)  # unknown

token_start = "<start>"
token_end = "<end>"
token_unk = "<unk>"

if not os.path.exists(token2idx_path) or \
        not os.path.exists(idx2token_path) or \
        not os.path.exists(param_path):
    raise ValueError('Please download the pre-trained models and put them in the "data" directory.')


class Utils:
    # vocabulary lookup tables, loaded once at import time
    with open(token2idx_path) as _f:
        token2idx = json.load(_f)
    with open(idx2token_path) as _f:
        idx2token = json.load(_f)

    @staticmethod
    def clean_str(s):
        # strip hyphens first, then collapse runs of whitespace, so the
        # whitespace tokenizer never sees double spaces
        s = re.sub(r"-", "", s)
        s = re.sub(r"\s+", " ", s)
        return s

    @staticmethod
    def load_params(hi):
        """Load pre-trained weights; ``hi`` is the LSTM hidden size. Each
        packed matrix stores the four gate blocks (f, i, o, g) side by side
        and is sliced below in chunks of ``hi`` columns; note that the saved
        ``*_W`` arrays fill the layers' ``U_*`` attributes and ``*_U`` fill
        ``W_*``.
        """
        # the parameters live in one object array; newer NumPy versions
        # refuse to load it without allow_pickle=True
        (embed_words,
         linear_weights,
         en_lstm1_W, en_lstm1_U, en_lstm1_b,
         en_lstm2_W, en_lstm2_U, en_lstm2_b,
         de_lstm1_W, de_lstm1_U, de_lstm1_b,
         de_lstm2_W, de_lstm2_U, de_lstm2_b) = np.load(param_path, allow_pickle=True)

        params = {'embed_words': embed_words,

                  'en_lstm1': {'U_f': en_lstm1_W[:, : hi * 1],
                               'U_i': en_lstm1_W[:, hi * 1: hi * 2],
                               'U_o': en_lstm1_W[:, hi * 2: hi * 3],
                               'U_g': en_lstm1_W[:, hi * 3:],

                               'W_f': en_lstm1_U[:, : hi * 1],
                               'W_i': en_lstm1_U[:, hi * 1: hi * 2],
                               'W_o': en_lstm1_U[:, hi * 2: hi * 3],
                               'W_g': en_lstm1_U[:, hi * 3:],

                               'b_f': en_lstm1_b[: hi * 1],
                               'b_i': en_lstm1_b[hi * 1: hi * 2],
                               'b_o': en_lstm1_b[hi * 2: hi * 3],
                               'b_g': en_lstm1_b[hi * 3:]},

                  'en_lstm2': {'U_f': en_lstm2_W[:, : hi * 1],
                               'U_i': en_lstm2_W[:, hi * 1: hi * 2],
                               'U_o': en_lstm2_W[:, hi * 2: hi * 3],
                               'U_g': en_lstm2_W[:, hi * 3:],

                               'W_f': en_lstm2_U[:, : hi * 1],
                               'W_i': en_lstm2_U[:, hi * 1: hi * 2],
                               'W_o': en_lstm2_U[:, hi * 2: hi * 3],
                               'W_g': en_lstm2_U[:, hi * 3:],

                               'b_f': en_lstm2_b[: hi * 1],
                               'b_i': en_lstm2_b[hi * 1: hi * 2],
                               'b_o': en_lstm2_b[hi * 2: hi * 3],
                               'b_g': en_lstm2_b[hi * 3:]},

                  'de_lstm1': {'U_f': de_lstm1_W[:, : hi * 1],
                               'U_i': de_lstm1_W[:, hi * 1: hi * 2],
                               'U_o': de_lstm1_W[:, hi * 2: hi * 3],
                               'U_g': de_lstm1_W[:, hi * 3:],

                               'W_f': de_lstm1_U[:, : hi * 1],
                               'W_i': de_lstm1_U[:, hi * 1: hi * 2],
                               'W_o': de_lstm1_U[:, hi * 2: hi * 3],
                               'W_g': de_lstm1_U[:, hi * 3:],

                               'b_f': de_lstm1_b[: hi * 1],
                               'b_i': de_lstm1_b[hi * 1: hi * 2],
                               'b_o': de_lstm1_b[hi * 2: hi * 3],
                               'b_g': de_lstm1_b[hi * 3:]},

                  'de_lstm2': {'U_f': de_lstm2_W[:, : hi * 1],
                               'U_i': de_lstm2_W[:, hi * 1: hi * 2],
                               'U_o': de_lstm2_W[:, hi * 2: hi * 3],
                               'U_g': de_lstm2_W[:, hi * 3:],

                               'W_f': de_lstm2_U[:, : hi * 1],
                               'W_i': de_lstm2_U[:, hi * 1: hi * 2],
                               'W_o': de_lstm2_U[:, hi * 2: hi * 3],
                               'W_g': de_lstm2_U[:, hi * 3:],

                               'b_f': de_lstm2_b[: hi * 1],
                               'b_i': de_lstm2_b[hi * 1: hi * 2],
                               'b_o': de_lstm2_b[hi * 2: hi * 3],
                               'b_g': de_lstm2_b[hi * 3:]},

                  'linear': linear_weights}

        return params
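
    # Worked example of the slicing above: with the default hi = 512, the
    # four slices pick columns [0:512], [512:1024], [1024:1536], and [1536:]
    # of each packed matrix for the f, i, o, g gates respectively; the saved
    # matrices presumably hold 4 * hi = 2048 gate columns in total.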

    @staticmethod
    def tokenize(s):
        """A very raw whitespace tokenizer."""
        s = Utils.clean_str(s)
        return s.strip().split(" ")

    @staticmethod
    def tokens2idxs(tokens):
        # out-of-vocabulary tokens fall back to the <unk> index
        rev = [str(Utils.token2idx.get(t, int(idx_unk))) for t in tokens]
        return rev

    @staticmethod
    def idxs2tokens(idxs):
        # indices missing from the vocabulary are silently dropped
        rez = []
        for idx in idxs:
            if str(idx) in Utils.idx2token:
                rez.append(Utils.idx2token[str(idx)])
        return rez

    @staticmethod
    def cut_and_pad(ilist, max_size=max_sent_size):
        # truncate to max_size, then right-pad with the "0" padding index
        ilist = ilist[:max_size]
        rez = ilist + ["0"] * (max_size - len(ilist))
        return rez

    @staticmethod
    def cut_end(s):
        # truncate the reply at the first <end> token, dropping the space
        # that precedes it
        fid = s.find(token_end)
        if fid == -1:
            return s
        elif fid == 0:
            return ""
        else:
            return s[:fid - 1]

    @staticmethod
    def get_mask(data):
        # 1 for real tokens, 0 for padding positions
        mask = (np.not_equal(data, 0)).astype("int32")
        return mask
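

# A minimal round trip through the helpers above (the token indices shown
# are hypothetical; real values depend on the downloaded vocabulary):
#
#     tokens = Utils.tokenize("how are you")    # -> ["how", "are", "you"]
#     idxs = Utils.tokens2idxs(tokens)          # -> e.g. ["17", "9", "24"]
#     padded = Utils.cut_and_pad(idxs)          # -> 50 entries, "0"-padded
#     " ".join(Utils.idxs2tokens([17, 9, 24]))  # -> "how are you"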


class Seq2Seq:
    """A two-layer LSTM encoder-decoder that shares one embedding between
    the encoder and the decoder."""

    def __init__(self, hidden_size=512, nb_seq=max_sent_size):
        print("Loading parameters ...")
        params = Utils.load_params(hidden_size)
        print("Loading is done.")

        # embedding
        self.embedding = npdl.layers.Embedding(params['embed_words'], nb_seq=nb_seq)
        self.embedding.connect_to()
        self.embedding.embed_words = params['embed_words']

        # encoder LSTM 1
        self.encoder_lstm1 = npdl.layers.LSTM(n_out=hidden_size, n_in=hidden_size,
                                              return_sequence=True, nb_seq=nb_seq)
        self.encoder_lstm1.connect_to(self.embedding)
        self.encoder_lstm1.U_f = params['en_lstm1']['U_f']
        self.encoder_lstm1.U_i = params['en_lstm1']['U_i']
        self.encoder_lstm1.U_o = params['en_lstm1']['U_o']
        self.encoder_lstm1.U_g = params['en_lstm1']['U_g']
        self.encoder_lstm1.W_f = params['en_lstm1']['W_f']
        self.encoder_lstm1.W_i = params['en_lstm1']['W_i']
        self.encoder_lstm1.W_o = params['en_lstm1']['W_o']
        self.encoder_lstm1.W_g = params['en_lstm1']['W_g']
        self.encoder_lstm1.b_f = params['en_lstm1']['b_f']
        self.encoder_lstm1.b_i = params['en_lstm1']['b_i']
        self.encoder_lstm1.b_o = params['en_lstm1']['b_o']
        self.encoder_lstm1.b_g = params['en_lstm1']['b_g']

        # encoder LSTM 2
        self.encoder_lstm2 = npdl.layers.LSTM(n_out=hidden_size, return_sequence=False)
        self.encoder_lstm2.connect_to(self.encoder_lstm1)
        self.encoder_lstm2.U_f = params['en_lstm2']['U_f']
        self.encoder_lstm2.U_i = params['en_lstm2']['U_i']
        self.encoder_lstm2.U_o = params['en_lstm2']['U_o']
        self.encoder_lstm2.U_g = params['en_lstm2']['U_g']
        self.encoder_lstm2.W_f = params['en_lstm2']['W_f']
        self.encoder_lstm2.W_i = params['en_lstm2']['W_i']
        self.encoder_lstm2.W_o = params['en_lstm2']['W_o']
        self.encoder_lstm2.W_g = params['en_lstm2']['W_g']
        self.encoder_lstm2.b_f = params['en_lstm2']['b_f']
        self.encoder_lstm2.b_i = params['en_lstm2']['b_i']
        self.encoder_lstm2.b_o = params['en_lstm2']['b_o']
        self.encoder_lstm2.b_g = params['en_lstm2']['b_g']

        # decoder LSTM 1
        self.decoder_lstm1 = npdl.layers.LSTM(n_out=hidden_size, n_in=hidden_size,
                                              return_sequence=True)
        self.decoder_lstm1.connect_to(self.embedding)
        self.decoder_lstm1.U_f = params['de_lstm1']['U_f']
        self.decoder_lstm1.U_i = params['de_lstm1']['U_i']
        self.decoder_lstm1.U_o = params['de_lstm1']['U_o']
        self.decoder_lstm1.U_g = params['de_lstm1']['U_g']
        self.decoder_lstm1.W_f = params['de_lstm1']['W_f']
        self.decoder_lstm1.W_i = params['de_lstm1']['W_i']
        self.decoder_lstm1.W_o = params['de_lstm1']['W_o']
        self.decoder_lstm1.W_g = params['de_lstm1']['W_g']
        self.decoder_lstm1.b_f = params['de_lstm1']['b_f']
        self.decoder_lstm1.b_i = params['de_lstm1']['b_i']
        self.decoder_lstm1.b_o = params['de_lstm1']['b_o']
        self.decoder_lstm1.b_g = params['de_lstm1']['b_g']

        # decoder LSTM 2
        self.decoder_lstm2 = npdl.layers.LSTM(n_out=hidden_size, return_sequence=False)
        self.decoder_lstm2.connect_to(self.decoder_lstm1)
        self.decoder_lstm2.U_f = params['de_lstm2']['U_f']
        self.decoder_lstm2.U_i = params['de_lstm2']['U_i']
        self.decoder_lstm2.U_o = params['de_lstm2']['U_o']
        self.decoder_lstm2.U_g = params['de_lstm2']['U_g']
        self.decoder_lstm2.W_f = params['de_lstm2']['W_f']
        self.decoder_lstm2.W_i = params['de_lstm2']['W_i']
        self.decoder_lstm2.W_o = params['de_lstm2']['W_o']
        self.decoder_lstm2.W_g = params['de_lstm2']['W_g']
        self.decoder_lstm2.b_f = params['de_lstm2']['b_f']
        self.decoder_lstm2.b_i = params['de_lstm2']['b_i']
        self.decoder_lstm2.b_o = params['de_lstm2']['b_o']
        self.decoder_lstm2.b_g = params['de_lstm2']['b_g']

        # output projection (the softmax is omitted at inference time, since
        # argmax over the logits gives the same prediction)
        self.linear = npdl.layers.Linear(n_out=self.embedding.embed_words.shape[0])
        self.linear.connect_to(self.decoder_lstm2)
        self.linear.W = params['linear']

    def forward(self, idxs, masks):
        ##############################
        # Encode
        ##############################

        # promote 1-D inputs to a batch of size one
        idxs = idxs[None, :] if np.ndim(idxs) == 1 else idxs
        masks = masks[None, :] if np.ndim(masks) == 1 else masks

        # embed the input words
        embeds = self.embedding.forward(idxs)

        # encoder lstm 1
        en_lstm1_res = self.encoder_lstm1.forward(embeds, masks)

        # encoder lstm 2 (only its final states are needed, see below)
        self.encoder_lstm2.forward(en_lstm1_res, masks)

        ##############################
        # Decode
        ##############################

        # initialize the decoder with the encoder's final cell/hidden states
        decodes = []
        c1_pre = self.encoder_lstm1.c0
        h1_pre = self.encoder_lstm1.h0
        c2_pre = self.encoder_lstm2.c0
        h2_pre = self.encoder_lstm2.h0
        idx = idx_start
        masks = _one((1, 1))  # a single decode step is always unmasked

        # greedy decoding: feed each prediction back in as the next input
        while True:
            # embed the previous token
            idx = np.array([[idx]], dtype='int32')
            embedded = self.embedding.forward(idx)

            # decoder lstm 1
            de_res1 = self.decoder_lstm1.forward(embedded, masks, c1_pre, h1_pre)
            c1_pre, h1_pre = self.decoder_lstm1.c0, self.decoder_lstm1.h0

            # decoder lstm 2
            de_res2 = self.decoder_lstm2.forward(de_res1, masks, c2_pre, h2_pre)
            c2_pre, h2_pre = self.decoder_lstm2.c0, self.decoder_lstm2.h0

            # linear layer
            out = self.linear.forward(de_res2)
            idx = np.argmax(out[0])

            # stop at <end>, or once the reply hits the length limit
            if idx == idx_end:
                break
            decodes.append(idx)
            if len(decodes) >= max_sent_size:
                break

        return decodes

    def utter(self, sentence):
        # parse the text into a padded, *reversed* index sequence; feeding
        # the source sentence in reverse is a common seq2seq trick
        idxs = Utils.cut_and_pad(Utils.tokens2idxs(Utils.tokenize(sentence)))[::-1]
        idxs = np.asarray(idxs, dtype='int32')
        masks = Utils.get_mask(idxs)

        idxs = self.forward(idxs, masks)

        # parse idxs to text
        tokens = Utils.idxs2tokens(idxs)
        sentence = Utils.cut_end(' '.join(tokens))
        return sentence
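

# A minimal sketch of how the model might be driven interactively; the
# hidden size 512 matches the default above, everything else is assumed:
if __name__ == '__main__':
    bot = Seq2Seq(hidden_size=512)
    while True:
        line = input("you> ").strip()
        if not line:
            break
        print("bot>", bot.utter(line))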