awarebayes/RecNN

View on GitHub
recnn/data/utils.py

Summary

Maintainability
A
3 hrs
Test Coverage
import numpy as np
import torch
from .pandas_backend import pd


# helper function similar to pandas.Series.rolling
def rolling_window(a, window):
    shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
    strides = a.strides + (a.strides[-1],)
    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)


def get_irsu(batch):
    items_t, ratings_t, sizes_t, users_t = (
        batch["items"],
        batch["ratings"],
        batch["sizes"],
        batch["users"],
    )
    return items_t, ratings_t, sizes_t, users_t


def batch_no_embeddings(batch, frame_size, *args, **kwargs):
    """
    Embed Batch: discrete state discrete action
    """
    items_t, ratings_t, sizes_t, users_t = get_irsu(batch)
    b_size = ratings_t.size(0)
    items = items_t[:, :-1]
    next_items = items_t[:, 1:]
    ratings = ratings_t[:, :-1]
    next_ratings = ratings_t[:, 1:]
    action = items_t[:, -1]
    reward = ratings_t[:, -1]
    done = torch.zeros(b_size)

    done[torch.cumsum(sizes_t - frame_size, dim=0) - 1] = 1
    batch = {
        "items": items,
        "next_items": next_items,
        ratings: "ratings",
        "next_ratings": next_ratings,
        "action": action,
        "reward": reward,
        "done": done,
        "meta": {"users": users_t, "sizes": sizes_t},
    }
    return batch


def batch_tensor_embeddings(batch, item_embeddings_tensor, frame_size, *args, **kwargs):
    """
    Embed Batch: continuous state continuous action
    """

    items_t, ratings_t, sizes_t, users_t = get_irsu(batch)
    items_emb = item_embeddings_tensor[items_t.long()]
    b_size = ratings_t.size(0)

    items = items_emb[:, :-1, :].view(b_size, -1)
    next_items = items_emb[:, 1:, :].view(b_size, -1)
    ratings = ratings_t[:, :-1]
    next_ratings = ratings_t[:, 1:]

    state = torch.cat([items, ratings], 1)
    next_state = torch.cat([next_items, next_ratings], 1)
    action = items_emb[:, -1, :]
    reward = ratings_t[:, -1]

    done = torch.zeros(b_size)
    done[torch.cumsum(sizes_t - frame_size, dim=0) - 1] = 1

    batch = {
        "state": state,
        "action": action,
        "reward": reward,
        "next_state": next_state,
        "done": done,
        "meta": {"users": users_t, "sizes": sizes_t},
    }
    return batch


def batch_contstate_discaction(
    batch, item_embeddings_tensor, frame_size, num_items, *args, **kwargs
):

    """
    Embed Batch: continuous state discrete action
    """

    items_t, ratings_t, sizes_t, users_t = get_irsu(batch)
    items_emb = item_embeddings_tensor[items_t.long()]
    b_size = ratings_t.size(0)

    items = items_emb[:, :-1, :].view(b_size, -1)
    next_items = items_emb[:, 1:, :].view(b_size, -1)
    ratings = ratings_t[:, :-1]
    next_ratings = ratings_t[:, 1:]

    state = torch.cat([items, ratings], 1)
    next_state = torch.cat([next_items, next_ratings], 1)
    action = items_t[:, -1]
    reward = ratings_t[:, -1]

    done = torch.zeros(b_size)
    done[torch.cumsum(sizes_t - frame_size, dim=0) - 1] = 1

    one_hot_action = torch.zeros(b_size, num_items)
    one_hot_action.scatter_(1, action.view(-1, 1), 1)

    batch = {
        "state": state,
        "action": one_hot_action,
        "reward": reward,
        "next_state": next_state,
        "done": done,
        "meta": {"users": users_t, "sizes": sizes_t},
    }
    return batch


# pads stuff to work with lstms
def padder(x):
    items_t = []
    ratings_t = []
    sizes_t = []
    users_t = []
    for i in range(len(x)):
        items_t.append(torch.tensor(x[i]["items"]))
        ratings_t.append(torch.tensor(x[i]["rates"]))
        sizes_t.append(x[i]["sizes"])
        users_t.append(x[i]["users"])
    items_t = torch.nn.utils.rnn.pad_sequence(items_t, batch_first=True).long()
    ratings_t = torch.nn.utils.rnn.pad_sequence(ratings_t, batch_first=True).float()
    sizes_t = torch.tensor(sizes_t).float()
    return {"items": items_t, "ratings": ratings_t, "sizes": sizes_t, "users": users_t}


def sort_users_itemwise(user_dict, users):
    return (
        pd.get()
        .Series(dict([(i, user_dict[i]["items"].shape[0]) for i in users]))
        .sort_values(ascending=False)
        .index
    )


def prepare_batch_dynamic_size(batch, item_embeddings_tensor, embed_batch=None):
    item_idx, ratings_t, sizes_t, users_t = get_irsu(batch)
    item_t = item_embeddings_tensor[item_idx]
    batch = {"items": item_t, "users": users_t, "ratings": ratings_t, "sizes": sizes_t}
    return batch


# Main function that is used as torch.DataLoader->collate_fn
# CollateFn docs:
# https://pytorch.org/docs/stable/data.html#working-with-collate-fn


def prepare_batch_static_size(
    batch, item_embeddings_tensor, frame_size=10, embed_batch=batch_tensor_embeddings
):
    item_t, ratings_t, sizes_t, users_t = [], [], [], []
    for i in range(len(batch)):
        item_t.append(batch[i]["items"])
        ratings_t.append(batch[i]["rates"])
        sizes_t.append(batch[i]["sizes"])
        users_t.append(batch[i]["users"])

    item_t = np.concatenate([rolling_window(i, frame_size + 1) for i in item_t], 0)
    ratings_t = np.concatenate(
        [rolling_window(i, frame_size + 1) for i in ratings_t], 0
    )

    item_t = torch.tensor(item_t)
    users_t = torch.tensor(users_t)
    ratings_t = torch.tensor(ratings_t).float()
    sizes_t = torch.tensor(sizes_t)

    batch = {"items": item_t, "users": users_t, "ratings": ratings_t, "sizes": sizes_t}

    return embed_batch(
        batch=batch,
        item_embeddings_tensor=item_embeddings_tensor,
        frame_size=frame_size,
    )


# Usually in data sets there item index is inconsistent (if you plot it doesn't look like a line)
# This function makes the index linear, allows for better compression of the data
# And also makes use of tensor[tensor] semantics

# items_embeddings_key_dict:arg - item embeddings by key
# include_zero:arg - whether to include items_embeddings_id_dict[0] = [0, 0, 0, ..., 0] (128)
# sometimes needed for rnn padding, by default True
# returns:
# items_embeddings_tensor - items_embeddings_dict compressed into tensor
# key_to_id - dict key -> index
# id_to_key - dict index -> key


def make_items_tensor(items_embeddings_key_dict):
    keys = list(sorted(items_embeddings_key_dict.keys()))
    key_to_id = dict(zip(keys, range(len(keys))))
    id_to_key = dict(zip(range(len(keys)), keys))

    items_embeddings_id_dict = {}
    for k in items_embeddings_key_dict.keys():
        items_embeddings_id_dict[key_to_id[k]] = items_embeddings_key_dict[k]
    items_embeddings_tensor = torch.stack(
        [items_embeddings_id_dict[i] for i in range(len(items_embeddings_id_dict))]
    )
    return items_embeddings_tensor, key_to_id, id_to_key


class ReplayBuffer:
    def __init__(self, buffer_size, layout):
        self.buffer = None
        self.idx = 0
        self.size = buffer_size
        self.layout = layout
        self.meta = {"step": []}
        self.flush()

    def flush(self):
        # state, action, reward, next_state
        del self.buffer
        self.buffer = [torch.zeros(i) for i in self.layout]
        self.idx = 0
        self.meta["step"] = []

    def append(self, batch):
        state, action, reward, next_state, step = (
            batch["state"],
            batch["action"],
            batch["reward"],
            batch["next_state"],
            batch["step"],
        )
        self.meta["step"].append(step)
        lower = self.idx
        upper = state.size(0) + lower
        self.buffer[0][lower:upper] = state
        self.buffer[1][lower:upper] = action
        self.buffer[2][lower:upper] = reward
        self.buffer[3][lower:upper] = next_state
        self.idx = upper

    def get(self):
        state, action, reward, next_state = self.buffer
        batch = {
            "state": state,
            "action": action,
            "reward": reward,
            "next_state": next_state,
            "meta": self.meta,
        }
        return batch

    def len(self):
        return self.idx


def get_base_batch(batch, device=torch.device("cuda"), done=True):
    b = [
        batch["state"],
        batch["action"],
        batch["reward"].unsqueeze(1),
        batch["next_state"],
    ]
    if done:
        b.append(batch["done"].unsqueeze(1))
    else:
        batch.append(torch.zeros_like(batch["reward"]))
    return [i.to(device) for i in b]