IBM/pytorchpipe

ptp/components/models/general_usage/recurrent_neural_network.py

# Copyright (C) tkornuta, IBM Corporation 2019
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

__author__ = "Tomasz Kornuta"

import torch

from ptp.configuration.configuration_error import ConfigurationError
from ptp.components.models.model import Model
from ptp.data_types.data_definition import DataDefinition


class RecurrentNeuralNetwork(Model): 
    """
    Recurrent neural network (RNN/LSTM/GRU) model. Depending on the configuration it can consume dense input sequences or operate autoregressively, and can return dense or last predictions, optionally passed through a linear output layer with a log-softmax non-linearity.
    """
    def __init__(self, name, config):
        """
        Initializes the model.

        :param name: Name of the model (taken from the configuration file).

        :param config: Dictionary of parameters (read from configuration ``.yaml`` file).
        :type config: ``ptp.configuration.ConfigInterface``
        """
        # Call constructors of parent classes.
        Model.__init__(self, name, RecurrentNeuralNetwork, config)

        # Get input mode from the configuration.
        self.input_mode = self.config["input_mode"]
        if self.input_mode not in ['Dense','Autoregression_First', 'Autoregression_None']:
            raise ConfigurationError("Invalid 'input_mode' (current '{}', available {})".format(self.input_mode, ['Dense','Autoregression_First', 'Autoregression_None']))

        # Get prediction mode from configuration.
        self.prediction_mode = self.config["prediction_mode"]
        if self.prediction_mode not in ['Dense','Last', 'None']:
            raise ConfigurationError("Invalid 'prediction_mode' (current '{}', available {})".format(self.prediction_mode, ['Dense','Last', 'None']))

        # Get source of initial hidden state from configuration.
        self.initial_state = self.config["initial_state"]
        if self.initial_state not in ["Zero", "Trainable", "Input"]:
            raise ConfigurationError("Invalid 'initial_state' of the hidden state (current '{}', available {})".format(self.initial_state, ["Zero", "Trainable", "Input"]))

        # Make sure that the input-output combination is valid.
        if self.prediction_mode == 'None' and 'Autoregression' in self.input_mode:
            raise ConfigurationError("Invalid combination of 'input_mode' and prediction_mode' (current '{}' and '{}')".format(self.input_mode, self.prediction_mode))
        # TODO: Any others?

        # If we are returning any predictions, set up the right stream and variables.
        if self.prediction_mode != "None":
            self.key_predictions = self.stream_keys["predictions"]
            # Retrieve output (prediction) size from global params.
            self.prediction_size = self.globals["prediction_size"]
            # Check whether it is ok.
            if type(self.prediction_size) == list:
                if len(self.prediction_size) == 1:
                    self.prediction_size = self.prediction_size[0]
                else:
                    raise ConfigurationError("RNN prediction size '{}' must be a single dimension (current '{}')".format(self.key_prediction_size, self.prediction_size))

        # If we are accepting any inputs, set up the right stream and variables.
        if "None" not in self.input_mode:
            # Retrieve stream key.
            self.key_inputs = self.stream_keys["inputs"]
            # Retrieve input size from global variables.
            self.key_input_size = self.global_keys["input_size"]
            self.input_size = self.globals["input_size"]
            if type(self.input_size) == list:
                if len(self.input_size) == 1:
                    self.input_size = self.input_size[0]
                else:
                    raise ConfigurationError("RNN input size '{}' must be a single dimension (current {})".format(self.key_input_size, self.input_size))
        else:
            # No separate input stream: we are in autoregression mode,
            # so use the prediction size as the input size.
            self.input_size = self.prediction_size

        # Setup options for autoregression.
        if "Autoregression" in self.input_mode:
            assert self.input_size == self.prediction_size, "Autoregression mode requires input_size == prediction_size."
            # Get max length from configuration.
            self.max_autoregression_length = self.config["max_autoregression_length"]

        # Get number of layers from config.
        self.num_layers = self.config["num_layers"]

        # Retrieve hidden size from configuration.
        self.hidden_size = self.config["hidden_size"]
        if type(self.hidden_size) == list:
            if len(self.hidden_size) == 1:
                self.hidden_size = self.hidden_size[0]
            else:
                raise ConfigurationError("RNN hidden_size must be a single dimension (current {})".format(self.hidden_size))
        
        # Get dropout rate value from config.
        dropout_rate = self.config["dropout_rate"]

        # Create RNN depending on the configuration
        self.cell_type = self.config["cell_type"]
        if self.cell_type in ['LSTM', 'GRU']:
            # Create rnn cell.
            self.rnn_cell = getattr(torch.nn, self.cell_type)(self.input_size, self.hidden_size, self.num_layers, dropout=dropout_rate, batch_first=True)
        else:
            try:
                # Retrieve the non-linearity.
                nonlinearity = {'RNN_TANH': 'tanh', 'RNN_RELU': 'relu'}[self.cell_type]
                # Create rnn cell.
                self.rnn_cell = torch.nn.RNN(self.input_size, self.hidden_size, self.num_layers, nonlinearity=nonlinearity, dropout=dropout_rate, batch_first=True)
            except KeyError:
                raise ConfigurationError( "Invalid RNN type, available options for 'cell_type' are ['LSTM', 'GRU', 'RNN_TANH', 'RNN_RELU'] (currently '{}')".format(self.cell_type))
        
        # Initial state tensors for a single sample: 2 x [NUM_LAYERS x 1 x HIDDEN_SIZE].
        h0 = torch.zeros(self.num_layers, 1, self.hidden_size)
        c0 = torch.zeros(self.num_layers, 1, self.hidden_size)

        # Check if initial state (h0/c0) is zero, trainable, or coming from input stream.
        self.init_hidden = None
        if self.initial_state == "Trainable":
            self.logger.info("Using trainable initial (h0/c0) state")
            # Initialize a single vector used as the hidden state, using xavier initialization.
            torch.nn.init.xavier_uniform_(h0)
            # It will be trainable, i.e. the system will learn the right initial state.
            self.init_hidden = torch.nn.Parameter(h0, requires_grad=True)
            # Initialize the memory cell in a similar way.
            if self.cell_type == 'LSTM':
                torch.nn.init.xavier_uniform_(c0)
                self.init_memory = torch.nn.Parameter(c0, requires_grad=True)
        elif self.initial_state == "Zero":
            self.logger.info("Using zero initial (h0/c0) state")
            # Still wrap it in a Parameter, so that models with zero and trainable initial states can load each other's checkpoints.
            self.init_hidden = torch.nn.Parameter(h0, requires_grad=False)
            if self.cell_type == 'LSTM':
                self.init_memory = torch.nn.Parameter(c0, requires_grad=False)
        else: # "Input" means that it will be taken from the "input_state" stream.
            # Get adequate key mappings.
            self.key_input_state = self.stream_keys["input_state"]
            self.logger.info("Will read initial (h0/c0) state from stream '{}".format(self.key_input_state))


        # Setup for outputs.
        # Last state.
        self.output_last_state = self.config["output_last_state"]
        if self.output_last_state:
            self.key_output_state = self.stream_keys["output_state"]
        
        self.logger.info("Initializing RNN with input size = {}, hidden size = {} and prediction size = {}".format(self.input_size, self.hidden_size, self.prediction_size))

        # Setup for the output layer (and associated non-linearities).
        self.use_output_layer = self.config["use_output_layer"]
        if self.use_output_layer:
            # Create dropout layer.
            self.dropout = torch.nn.Dropout(dropout_rate)
            # Create the layer.
            self.activation2output = torch.nn.Linear(self.hidden_size, self.prediction_size)
        
        # Setup for the final non-linearity.
        self.use_logsoftmax = self.config["use_logsoftmax"]
        if self.use_logsoftmax:
            if self.prediction_mode == "Dense":
                # Used when returning dense predictions, i.e. every output of the unfolded model.
                self.log_softmax = torch.nn.LogSoftmax(dim=2)
            else:
                # Used when returning only the last output.
                self.log_softmax = torch.nn.LogSoftmax(dim=1)


    def initialize_hiddens_state(self, batch_size):
        """
        Function initializes hidden states, depending on the cell type.
        """
        if self.cell_type == 'LSTM':
            # Return tuple (hidden_state, memory_cell).
            return (self.init_hidden.expand(self.num_layers, batch_size, self.hidden_size).contiguous(),
                self.init_memory.expand(self.num_layers, batch_size, self.hidden_size).contiguous() )
        else:
            # Return hidden_state.
            return self.init_hidden.expand(self.num_layers, batch_size, self.hidden_size).contiguous()


    def activation_to_output_pass(self, activations):
        """
        Function propagates hidden state "activations" through the output layer (that pass can be optionally turned off).
        When the output layer is disabled, the activations are returned unchanged.
        """
        if not self.use_output_layer:
            # Output layer disabled - pass activations through unchanged.
            return activations

        # Use dropout when using output layer.
        output = self.dropout(activations)

        # Remember the original shape [BATCH_SIZE x SEQ_LEN x HIDDEN_SIZE].
        shape = activations.shape

        # Reshape to 2D tensor [BATCH_SIZE * SEQ_LEN x HIDDEN_SIZE].
        output = output.contiguous().view(-1, shape[2])

        # Propagate data through the output layer [BATCH_SIZE * SEQ_LEN x PREDICTION_SIZE].
        output = self.activation2output(output)

        # Reshape back to 3D tensor [BATCH_SIZE x SEQ_LEN x PREDICTION_SIZE].
        output = output.view(shape[0], shape[1], output.size(1))

        return output


    def input_data_definitions(self):
        """ 
        Function returns a dictionary with definitions of input data that are required by the component.

        :return: dictionary containing input data definitions (each of type :py:class:`ptp.utils.DataDefinition`).
        """
        d = {}
        # Input depending on the input_mode
        if self.input_mode == "Dense":
            d[self.key_inputs] = DataDefinition([-1, -1, self.input_size], [torch.Tensor], "Batch of inputs, each being a sequence of items [BATCH_SIZE x SEQ_LEN x INPUT_SIZE]")
        elif self.input_mode == "Autoregression_First":
            d[self.key_inputs] = DataDefinition([-1, self.input_size], [torch.Tensor], "Batch of inputs, each being a single item [BATCH_SIZE x INPUT_SIZE]")
        #else: Autoregression_None: no inputs.

        # Input hidden state
        if self.initial_state == "Input":
            if self.cell_type == "LSTM":
                d[self.key_input_state] = DataDefinition([-1, 2, self.num_layers, self.hidden_size], [torch.Tensor], "Batch of LSTM last hidden states (h0/c0) passed from another LSTM that will be used as initial [BATCH_SIZE x 2 x NUM_LAYERS x HIDDEN_SIZE]")
            else:
                d[self.key_input_state] = DataDefinition([-1, self.num_layers, self.hidden_size], [torch.Tensor], "Batch of RNN last hidden states passed from another RNN that will be used as initial [BATCH_SIZE x NUM_LAYERS x HIDDEN_SIZE]")

        return d

    def output_data_definitions(self):
        """ 
        Function returns a dictionary with definitions of output data produced by the component.

        :return: dictionary containing output data definitions (each of type :py:class:`ptp.utils.DataDefinition`).
        """
        d = {}
    
        # Output: predictions stream.
        if self.prediction_mode == "Dense":
            d[self.key_predictions] = DataDefinition([-1, -1, self.prediction_size], [torch.Tensor], "Batch of predictions, each represented as sequence of probability distributions over classes [BATCH_SIZE x SEQ_LEN x PREDICTION_SIZE]")
        elif self.prediction_mode == "Last": # "Last"
            # Only last prediction.
            d[self.key_predictions] = DataDefinition([-1, self.prediction_size], [torch.Tensor], "Batch of predictions, each represented as a single probability distribution over classes [BATCH_SIZE x PREDICTION_SIZE]")
        # Else: no predictions.

        # Output: hidden state stream.
        if self.output_last_state:
            if self.cell_type == "LSTM":
                d[self.key_output_state] = DataDefinition([-1, 2, self.num_layers, self.hidden_size], [torch.Tensor], "Batch of LSTM final hidden states (h0/c0) [BATCH_SIZE x 2 x NUM_LAYERS x HIDDEN_SIZE]")
            else:
                d[self.key_output_state] = DataDefinition([-1, self.num_layers, self.hidden_size], [torch.Tensor], "Batch of RNN final hidden states [BATCH_SIZE x NUM_LAYERS x HIDDEN_SIZE]")

        return d

    def forward(self, data_streams):
        """
        Forward pass of the model.

        :param data_streams: DataStreams({'inputs', 'predictions', ...}), where:

            - inputs: expected inputs [BATCH_SIZE x SEQ_LEN x INPUT_SIZE],
            - predictions: returned output with predictions (log_probs) [BATCH_SIZE x SEQ_LEN x PREDICTION_SIZE]
        """
        
        inputs = None
        batch_size = None

        # Get inputs
        if "None" in self.input_mode:
            # Must be in autoregressive mode - retrieve batch_size from initial hidden state from encoder.
            batch_size = data_streams[self.key_input_state][0].shape[1]
            # Set a zero input item that will seed the sequence [BATCH_SIZE x 1 x INPUT_SIZE].
            inputs = torch.zeros(batch_size, 1, self.input_size, requires_grad=False).type(self.app_state.FloatTensor)

        else:
            # Get inputs [BATCH_SIZE x SEQ_LEN x INPUT_SIZE]
            inputs = data_streams[self.key_inputs]
            if inputs.dim() == 2:
                inputs = inputs.unsqueeze(1)
            batch_size = inputs.shape[0]
        #print("{}: input shape: {}, device: {}\n".format(self.name, inputs.shape, inputs.device))

        # Get initial state, depending on the settings.
        if self.initial_state == "Input":
            # Initialize hidden state from inputs - as last hidden state from external component.
            hidden = data_streams[self.key_input_state]
            # Flip batch and num_layer dims so batch will be third/second!
            if self.cell_type  == 'LSTM':
                # For LSTM: [BATCH_SIZE x NUM_LAYERS x 2 x HIDDEN_SIZE]  -> [2 x NUM_LAYERS x BATCH_SIZE x HIDDEN_SIZE]
                hidden = hidden.transpose(0,2)
            else: 
                # For others: [BATCH_SIZE x NUM_LAYERS x HIDDEN_SIZE] -> [NUM_LAYERS x BATCH_SIZE x HIDDEN_SIZE]
                hidden = hidden.transpose(0,1)
        else:
            hidden = self.initialize_hiddens_state(batch_size)
        #print("{}: hidden shape: {}, device: {}\n".format(self.name, hidden.shape, hidden.device))

        activations = []

        # Check out operation mode.
        if "Autoregression" in self.input_mode: 
            # Autoregressive mode - feed the outputs back as inputs.
            activations_partial, hidden = self.rnn_cell(inputs, hidden)
            activations_partial = self.activation_to_output_pass(activations_partial)
            activations += [activations_partial]

            # Feed back the outputs iteratively
            for i in range(self.max_autoregression_length - 1):
                activations_partial, hidden = self.rnn_cell(activations_partial, hidden)
                activations_partial = self.activation_to_output_pass(activations_partial)
                # Add the single step output into list
                if self.prediction_mode == "Dense":
                    activations += [activations_partial]
            # Reassemble all the outputs from list into an output sequence
            if self.prediction_mode == "Dense":
                outputs = torch.cat(activations, 1)
                # Log softmax - along PREDICTION dim.
                if self.use_logsoftmax:
                    outputs = self.log_softmax(outputs)
                # Add predictions to datadict.
                data_streams.publish({self.key_predictions: outputs})
            elif self.prediction_mode == "Last":
                # Take only the last activations.
                outputs = activations_partial.squeeze(1)
                if self.use_logsoftmax:
                    outputs = self.log_softmax(outputs)
                # Add predictions to datadict.
                data_streams.publish({self.key_predictions: outputs})

        else:
            # Normal mode - feed the entire input sequence at once
            activations, hidden = self.rnn_cell(inputs, hidden)

            if self.prediction_mode == "Dense":
                # Pass every activation through the output layer.
                outputs = self.activation_to_output_pass(activations)
                
                # Log softmax - along PREDICTION dim.
                if self.use_logsoftmax:
                    outputs = self.log_softmax(outputs)

                # Add predictions to datadict.
                data_streams.publish({self.key_predictions: outputs})
            elif self.prediction_mode == "Last":
                outputs = self.activation_to_output_pass(activations.contiguous()[:, -1, :].unsqueeze(1))
                outputs = outputs.squeeze(1)

                # Log softmax - along PREDICTION dim.
                if self.use_logsoftmax:
                    outputs = self.log_softmax(outputs)
                    
                # Add predictions to datadict.
                data_streams.publish({self.key_predictions: outputs})
            elif self.prediction_mode == "None":
                # Nothing, since we don't want to keep the RNN's outputs
                pass

        if self.output_last_state:
            # Flip batch and num_layer dims so batch will be first!
            if self.cell_type  == 'LSTM':
                # For LSTM: [2 x NUM_LAYERS x BATCH_SIZE x HIDDEN_SIZE] -> [BATCH_SIZE x NUM_LAYERS x 2 x HIDDEN_SIZE] 
                hidden = hidden.transpose(0,2)
            else: 
                # For others: [NUM_LAYERS x BATCH_SIZE x HIDDEN_SIZE] -> [BATCH_SIZE x NUM_LAYERS x HIDDEN_SIZE] 
                hidden = hidden.transpose(0,1)
            # Export last hidden state.
            data_streams.publish({self.key_output_state: hidden})
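
As an illustration of the autoregressive unrolling that forward() implements, here is a minimal, self-contained sketch written against plain PyTorch (it does not use the ptp framework). All variable names and sizes below are assumptions chosen for the example, not part of pytorchpipe; the config keys mentioned in the comments refer to the options read in __init__ above.

# Illustrative sketch only - not part of pytorchpipe. All sizes below are assumptions.
import torch

batch_size, hidden_size, prediction_size, num_layers, max_len = 4, 16, 8, 1, 5

# GRU working in batch-first mode, mirroring the cell created in __init__ ('cell_type: GRU').
rnn_cell = torch.nn.GRU(prediction_size, hidden_size, num_layers, batch_first=True)
# Output projection from hidden activations to predictions ('use_output_layer: True').
activation2output = torch.nn.Linear(hidden_size, prediction_size)

# Zero initial hidden state [NUM_LAYERS x BATCH_SIZE x HIDDEN_SIZE] ('initial_state: Zero').
hidden = torch.zeros(num_layers, batch_size, hidden_size)
# Zero seed input, a single-item sequence [BATCH_SIZE x 1 x PREDICTION_SIZE] ('input_mode: Autoregression_None').
step_input = torch.zeros(batch_size, 1, prediction_size)

outputs = []
for _ in range(max_len):
    # One step of the RNN: activations [BATCH_SIZE x 1 x HIDDEN_SIZE].
    activations, hidden = rnn_cell(step_input, hidden)
    # Project activations to the prediction space and feed them back as the next input.
    step_input = activation2output(activations)
    outputs.append(step_input)

# 'prediction_mode: Dense': concatenate along the sequence dim -> [BATCH_SIZE x MAX_LEN x PREDICTION_SIZE].
dense_predictions = torch.cat(outputs, dim=1)
print(dense_predictions.shape)  # torch.Size([4, 5, 8])

The real component additionally applies dropout before the output layer, an optional log-softmax on the predictions, and can export the final hidden state to another component via the 'output_state' stream.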