tensorflow/models

View on GitHub
research/lstm_object_detection/lstm/rnn_decoder_test.py

Summary

Maintainability
C
1 day
Test Coverage
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""Tests for lstm_object_detection.lstm.rnn_decoder."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import tensorflow.compat.v1 as tf

from tensorflow.contrib import layers as contrib_layers
from tensorflow.contrib import rnn as contrib_rnn
from lstm_object_detection.lstm import rnn_decoder


class MockRnnCell(contrib_rnn.RNNCell):
  """Deterministic stand-in for an RNN cell, used to test the decoders.

  Calling the cell concatenates the inputs with the first state entry along
  axis 3, doubles the first state entry and passes the second state entry
  through unchanged, so expected results can be recomputed with NumPy.
  """

  def __init__(self, input_size, num_units):
    """Stores the sizes the cell reports.

    Args:
      input_size: Number of output channels produced by `pre_bottleneck`;
        also added to `num_units` to form `output_size`.
      num_units: Value reported as `state_size`.
    """
    self._input_size = input_size
    self._num_units = num_units
    self._filter_size = [3, 3]

  def __call__(self, inputs, state_tuple):
    """Runs one mock step.

    Args:
      inputs: Tensor concatenated with `state_tuple[0]` along axis 3.
      state_tuple: Indexable pair of state tensors.

    Returns:
      A tuple `(outputs, new_state_tuple)` where `outputs` is the
      concatenation described above and `new_state_tuple` is
      `(2 * state_tuple[0], state_tuple[1])`.
    """
    outputs = tf.concat([inputs, state_tuple[0]], axis=3)
    new_state_tuple = (tf.multiply(state_tuple[0], 2), state_tuple[1])
    return outputs, new_state_tuple

  # RNNCell declares `state_size` and `output_size` as properties; overriding
  # them with plain methods would make attribute access return a bound method
  # instead of the size.
  @property
  def state_size(self):
    """Returns the `num_units` given at construction."""
    return self._num_units

  @property
  def output_size(self):
    """Returns `input_size + num_units`, as given at construction."""
    return self._input_size + self._num_units

  def pre_bottleneck(self, inputs, state, input_index):
    """Applies a separable-convolution bottleneck to `inputs` and `state`.

    Args:
      inputs: Tensor concatenated with `state` along axis 3.
      state: Tensor concatenated with `inputs` along axis 3.
      input_index: Integer used to name the variable scope
        (`bottleneck_<input_index>`), so each index gets its own variables,
        reused across calls via `tf.AUTO_REUSE`.

    Returns:
      The output of a separable 2-D convolution with `input_size` outputs, a
      3x3 kernel, relu6 activation and no normalizer.
    """
    with tf.variable_scope('bottleneck_%d' % input_index, reuse=tf.AUTO_REUSE):
      inputs = contrib_layers.separable_conv2d(
          tf.concat([inputs, state], 3),
          self._input_size,
          self._filter_size,
          depth_multiplier=1,
          activation_fn=tf.nn.relu6,
          normalizer_fn=None)
    return inputs


class RnnDecoderTest(tf.test.TestCase):
  """Tests `rnn_decoder.rnn_decoder` using `MockRnnCell` as the cell."""

  # Dimensions shared by every test case in this class.
  _BATCH_SIZE = 2
  _NUM_UNITS = 64
  _WIDTH = 8
  _HEIGHT = 10
  _INPUT_CHANNELS = 128

  def _check_decoder(self, num_unroll):
    """Builds, runs and verifies the decoder unrolled `num_unroll` times.

    The same random input tensor is fed at every step and one random tensor
    is used for both entries of the initial state tuple. Each step is checked
    against what `MockRnnCell` computes from the previous step's state: the
    output is the input concatenated with the first state entry, the first
    state entry doubles and the second state entry is unchanged.

    Args:
      num_unroll: Number of decoder steps to unroll.
    """
    state_shape = (
        self._BATCH_SIZE, self._WIDTH, self._HEIGHT, self._NUM_UNITS)
    output_shape = (self._BATCH_SIZE, self._WIDTH, self._HEIGHT,
                    self._INPUT_CHANNELS + self._NUM_UNITS)

    initial_state = tf.random_normal(state_shape)
    inputs = tf.random_normal(
        [self._BATCH_SIZE, self._WIDTH, self._HEIGHT, self._INPUT_CHANNELS])

    rnn_cell = MockRnnCell(self._INPUT_CHANNELS, self._NUM_UNITS)
    outputs, states = rnn_decoder.rnn_decoder(
        decoder_inputs=[inputs] * num_unroll,
        initial_state=(initial_state, initial_state),
        cell=rnn_cell)

    self.assertEqual(len(outputs), num_unroll)
    self.assertEqual(len(states), num_unroll)
    with tf.Session() as sess:
      sess.run(tf.global_variables_initializer())
      # Fetch everything in a single run so the random inputs and the decoder
      # results come from the same sample.
      (outputs_results, states_results, inputs_results,
       initial_state_results) = sess.run(
           (outputs, states, inputs, initial_state))

      previous_state = (initial_state_results, initial_state_results)
      for output, state in zip(outputs_results, states_results):
        self.assertEqual(output.shape, output_shape)
        self.assertAllEqual(
            output,
            np.concatenate((inputs_results, previous_state[0]), axis=3))
        self.assertEqual(state[0].shape, state_shape)
        self.assertEqual(state[1].shape, state_shape)
        self.assertAllEqual(state[0], np.multiply(previous_state[0], 2.0))
        self.assertAllEqual(state[1], previous_state[1])
        previous_state = state

  def test_rnn_decoder_single_unroll(self):
    """Verifies outputs and states for a single decoder step."""
    self._check_decoder(num_unroll=1)

  def test_rnn_decoder_multiple_unroll(self):
    """Verifies outputs and states for three chained decoder steps."""
    self._check_decoder(num_unroll=3)


class MultiInputRnnDecoderTest(tf.test.TestCase):
  """Tests `rnn_decoder.multi_input_rnn_decoder` using `MockRnnCell`."""

  # Dimensions shared by every test case in this class.
  _BATCH_SIZE = 2
  _NUM_UNITS = 12
  _WIDTH = 8
  _HEIGHT = 10
  _INPUT_CHANNELS_LARGE = 24
  _INPUT_CHANNELS_SMALL = 12
  _BOTTLENECK_CHANNELS = 20

  def _run_decoder(self, num_unroll, selection_strategy=None):
    """Builds the two-input decoder graph and evaluates it once.

    Args:
      num_unroll: Number of decoder steps to unroll.
      selection_strategy: Optional strategy string forwarded to the decoder.
        When None the argument is omitted so the decoder's own default is
        used.

    Returns:
      A tuple `(outputs_results, states_results, initial_states_results)` of
      evaluated values: per-step outputs, per-step state tuples and the
      initial state tuple, all taken from the same session run.
    """
    spatial = [self._BATCH_SIZE, self._WIDTH, self._HEIGHT]
    initial_state_c = tf.random_normal(spatial + [self._NUM_UNITS])
    initial_state_h = tf.random_normal(spatial + [self._NUM_UNITS])
    initial_state = (initial_state_c, initial_state_h)
    inputs_large = tf.random_normal(spatial + [self._INPUT_CHANNELS_LARGE])
    inputs_small = tf.random_normal(spatial + [self._INPUT_CHANNELS_SMALL])

    optional_kwargs = {}
    if selection_strategy is not None:
      optional_kwargs['selection_strategy'] = selection_strategy

    rnn_cell = MockRnnCell(self._BOTTLENECK_CHANNELS, self._NUM_UNITS)
    outputs, states = rnn_decoder.multi_input_rnn_decoder(
        decoder_inputs=[[inputs_large] * num_unroll,
                        [inputs_small] * num_unroll],
        initial_state=initial_state,
        cell=rnn_cell,
        sequence_step=tf.zeros([self._BATCH_SIZE]),
        pre_bottleneck=True,
        **optional_kwargs)

    self.assertEqual(len(outputs), num_unroll)
    self.assertEqual(len(states), num_unroll)
    with tf.Session() as sess:
      sess.run(tf.global_variables_initializer())
      # A single run keeps the random initial state consistent with the
      # decoder results computed from it.
      return sess.run((outputs, states, initial_state))

  def _assert_result_shapes(self, outputs_results, states_results):
    """Checks the shape of every step's output and of both state entries."""
    output_shape = (self._BATCH_SIZE, self._WIDTH, self._HEIGHT,
                    self._BOTTLENECK_CHANNELS + self._NUM_UNITS)
    state_shape = (
        self._BATCH_SIZE, self._WIDTH, self._HEIGHT, self._NUM_UNITS)
    for output, state in zip(outputs_results, states_results):
      self.assertEqual(output.shape, output_shape)
      self.assertEqual(state[0].shape, state_shape)
      self.assertEqual(state[1].shape, state_shape)

  def _assert_state_updated(self, new_state, old_state):
    """Checks `new_state` is what `MockRnnCell` produces from `old_state`."""
    self.assertAllEqual(new_state[0], np.multiply(old_state[0], 2))
    self.assertAllEqual(new_state[1], old_state[1])

  def test_rnn_decoder_single_unroll(self):
    """Verifies shapes and the state update for a single decoder step."""
    outputs_results, states_results, initial_states_results = (
        self._run_decoder(num_unroll=1))
    self._assert_result_shapes(outputs_results, states_results)
    # The first step should always update state.
    self._assert_state_updated(states_results[0], initial_states_results)

  def test_rnn_decoder_multiple_unroll(self):
    """Verifies the first state update and all shapes over three steps."""
    outputs_results, states_results, initial_states_results = (
        self._run_decoder(num_unroll=3))
    # The first step should always update state.
    self._assert_state_updated(states_results[0], initial_states_results)
    self._assert_result_shapes(outputs_results, states_results)

  def test_rnn_decoder_multiple_unroll_with_skip(self):
    """Verifies that with 'SKIP2' the state changes only on key frames."""
    skip = 2
    outputs_results, states_results, initial_states_results = (
        self._run_decoder(num_unroll=5, selection_strategy='SKIP%d' % skip))
    self._assert_result_shapes(outputs_results, states_results)

    previous_state = initial_states_results
    for i, state in enumerate(states_results):
      # State only updates during key frames
      if i % (skip + 1) == 0:
        self._assert_state_updated(state, previous_state)
      else:
        self.assertAllEqual(state[0], previous_state[0])
        self.assertAllEqual(state[1], previous_state[1])
      previous_state = state


# Run the test cases in this module when the file is executed directly.
if __name__ == '__main__':
  tf.test.main()