tensorflow/models

View on GitHub
official/modeling/fast_training/experimental/tf2_utils_2x_wide.py

Summary

Maintainability
A
55 mins
Test Coverage
# Copyright 2024 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Stacking model horizontally."""

from absl import logging
import numpy as np
import tensorflow as tf, tf_keras


def expand_vector(v: np.ndarray) -> np.ndarray:
  """Duplicates every entry of `v` along its last axis.

  Each element is emitted twice in a row, so `[x0, x1]` becomes
  `[x0, x0, x1, x1]`. Leading (batch) dimensions are left untouched.

  For arrays of rank >= 2 the result equals
  `2 * expand_1_axis(v, epsilon=0.0, axis=-1)`; unlike `expand_1_axis`, the
  values are not halved here.

  Args:
    v: Array of shape [..., a].

  Returns:
    Array of shape [..., 2 * a].
  """
  doubled = np.repeat(v, repeats=2, axis=-1)
  return doubled


def expand_1_axis(w: np.ndarray,
                  epsilon: float,
                  axis: int) -> np.ndarray:
  """Expands either the first or last dimension of w.

  Every slice along `axis` is split into two adjacent slices holding
  `w / 2 + noise` and `w / 2 - noise`, so each pair still sums to the original
  slice. The noise is drawn from the global NumPy random state with a
  per-entry standard deviation of `|w| * epsilon`.

  If `axis = 0`, the following expression will be satisfied:
  matmul(x, w) ==
      matmul(expand_vector(x), expand_1_axis(w, axis=0))

  If `axis = -1` and `epsilon = 0.0`, the following constraint will be
  satisfied:
  expand_vector(matmul(x, w)) ==
      2 * matmul(x, expand_1_axis(w, epsilon=0.0, axis=-1))

  Args:
    w: Numpy array of shape [a_0, a_1, ..., a_i-1, a_i] with rank >= 1. For a
      rank-1 array the first and last dimension are the same axis.
    epsilon: Symmetric Noise added to expanded tensor.
    axis: Must be either 0 or -1.

  Returns:
    Expanded numpy array, with the size of `axis` doubled.

  Raises:
    ValueError: If `axis` is neither 0 nor -1, or if `w` is a scalar.
  """

  if axis not in (0, -1):
    raise ValueError(
        "Only support expanding the first or the last dimension. "
        "Got: {}".format(axis)
    )

  rank = len(w.shape)
  if rank == 0:
    raise ValueError("Cannot expand a scalar; `w` must have rank >= 1.")

  # One noise sample per original entry, shared by both halves of the split.
  d_w = np.random.normal(np.zeros_like(w), np.fabs(w) * epsilon, w.shape)
  d_w = np.repeat(d_w, 2, axis=axis)

  # Alternating +1 / -1 along the expanded axis only; every other axis has
  # size 1 so the mask broadcasts against `d_w` for any rank >= 1.
  sign_shape = [1] * rank
  sign_shape[axis] = -1
  sign_flip = np.tile(np.array([1, -1]), w.shape[axis]).reshape(sign_shape)

  # The two copies receive opposite noise, so it cancels when they are summed.
  d_w *= sign_flip
  return (np.repeat(w, 2, axis=axis) + d_w) / 2


def expand_2_axes(w: np.ndarray,
                  epsilon: float) -> np.ndarray:
  """Expands the first and last dimension of w.

  Every entry is replicated into a 2 x 2 patch (over the first and last axes)
  holding half of its value. Noise with per-entry standard deviation
  `|w| * epsilon`, drawn from the global NumPy random state, is added with
  opposite signs to the two copies along the first axis, so it cancels when
  those copies are summed.

  This operation satisfies the following expression:
  expand_vector(matmul(x, w)) == matmul(expand_vector(x), expand_2_axes(w))

  Args:
    w: Numpy array of shape [a_0, a_1, ..., a_i-1, a_i] with rank >= 2.
    epsilon: Symmetric Noise added to expanded tensor.

  Returns:
    Expanded numpy array of shape [2 * a_0, a_1, ..., a_i-1, 2 * a_i].

  Raises:
    ValueError: If `w` has rank lower than 2, where the first and last
      dimension are not distinct axes.
  """
  rank = len(w.shape)
  if rank < 2:
    raise ValueError(
        "expand_2_axes requires an array of rank >= 2. "
        "Got shape: {}".format(w.shape)
    )

  # One noise sample per original entry, copied into its whole 2 x 2 patch.
  d_w = np.random.normal(np.zeros_like(w), np.fabs(w) * epsilon, w.shape)
  d_w = np.repeat(np.repeat(d_w, 2, axis=0), 2, axis=-1)

  # Alternating +1 / -1 along the first axis; trailing size-1 axes let the
  # mask broadcast over the remaining dimensions of `d_w`.
  sign_flip = np.tile(np.array([1, -1]), w.shape[0])
  sign_flip = sign_flip.reshape([-1] + [1] * (rank - 1))
  d_w *= sign_flip

  return (np.repeat(np.repeat(w, 2, axis=0), 2, axis=-1) + d_w) / 2


def var_to_var(var_from: tf.Variable,
               var_to: tf.Variable,
               epsilon: float):
  """Assigns `var_to` a widened copy of `var_from`.

  If both variables have the same shape, the value is copied unchanged. If
  both are rank 1, `var_to` receives `expand_vector(var_from)`; `epsilon` is
  not used in that case. Otherwise, with `var_from` of shape
  (a, b, ..., y, z), only the first and last dimensions are inspected and the
  shape of `var_to` must start and end like one of (a, ..., z * 2),
  (a * 2, ..., z * 2), or (a * 2, ..., z).

  If the shape of `var_to` is (a, ..., 2 * z):
    For any x, tf.matmul(x, var_to) ~= expand_vector(tf.matmul(x, var_from)) / 2
    Note that there will be noise added to the left hand side, if
    epsilon != 0.
  If the shape of `var_to` is (2 * a, ..., z):
    For any x, tf.matmul(expand_vector(x), var_to) == tf.matmul(x, var_from)
  If the shape of `var_to` is (2 * a, ..., 2 * z):
    For any x, tf.matmul(expand_vector(x), var_to) ==
        expand_vector(tf.matmul(x, var_from))

  Args:
    var_from: input variable to expand.
    var_to: output variable; it is updated in place through `assign`.
    epsilon: the noise ratio that will be added, when splitting `var_from`.

  Raises:
    ValueError: If the first/last dimensions of the two shapes match none of
      the supported patterns.
  """
  src_shape = var_from.shape
  dst_shape = var_to.shape

  # Nothing to widen: copy the value across as-is.
  if src_shape == dst_shape:
    var_to.assign(var_from)
    return

  src_values = var_from.numpy()

  if len(src_shape) == len(dst_shape) == 1:
    # Rank-1 variables are simply duplicated element-wise, without halving.
    widened = expand_vector(src_values)
  else:
    first_src, last_src = src_shape[0], src_shape[-1]
    first_dst, last_dst = dst_shape[0], dst_shape[-1]

    if first_dst == 2 * first_src and last_dst == last_src:
      # Only the first dimension doubled.
      widened = expand_1_axis(src_values, epsilon=epsilon, axis=0)
    elif first_dst == first_src and last_dst == 2 * last_src:
      # Only the last dimension doubled.
      widened = expand_1_axis(src_values, epsilon=epsilon, axis=-1)
    elif first_dst == 2 * first_src and last_dst == 2 * last_src:
      # Both the first and the last dimension doubled.
      widened = expand_2_axes(src_values, epsilon=epsilon)
    else:
      raise ValueError(
          "Shape not supported, {}, {}".format(src_shape, dst_shape))

  var_to.assign(widened)


def model_to_model_2x_wide(model_from: tf.Module,
                           model_to: tf.Module,
                           epsilon: float = 0.1):
  """Expands a model to a wider version.

  The trainable variables of the two models are paired up in order and each
  pair is passed to `var_to_var`. The expansion is intended to leave the
  output of the model unchanged (up to floating-point error) when the wide
  model is fed `expand_vector` of the original input.
  For example:
  ```
  model_narrow = tf_keras.Sequential()
  model_narrow.add(tf_keras.Input(shape=(3,)))
  model_narrow.add(tf_keras.layers.Dense(4))
  model_narrow.add(tf_keras.layers.Dense(1))

  model_wide = tf_keras.Sequential()
  model_wide.add(tf_keras.Input(shape=(6,)))
  model_wide.add(tf_keras.layers.Dense(8))
  model_wide.add(tf_keras.layers.Dense(1))

  model_to_model_2x_wide(model_narrow, model_wide)
  assert model_narrow([[1, 2, 3]]) == model_wide([[1, 1, 2, 2, 3, 3]])
  ```

  We assume that `model_from` and `model_to` have the same architecture and
  differ only in widths. If they expose a different number of trainable
  variables, a warning is logged and only the leading pairs are expanded.

  Args:
    model_from: input model to expand.
    model_to: output model whose variables will be assigned expanded values
      according to `model_from`.
    epsilon: the noise ratio that will be added, when splitting `var_from`.
  """
  vars_from = model_from.trainable_variables
  vars_to = model_to.trainable_variables

  if len(vars_from) != len(vars_to):
    # zip() stops at the shorter sequence, which would otherwise leave the
    # surplus variables untouched without any indication.
    logging.warning(
        "model_from has %d trainable variables but model_to has %d; only the "
        "first %d pairs will be expanded.",
        len(vars_from), len(vars_to), min(len(vars_from), len(vars_to)))

  for w_from, w_to in zip(vars_from, vars_to):
    logging.info("expanding %s %s to %s %s",
                 w_from.name, w_from.shape, w_to.name, w_to.shape)
    var_to_var(w_from, w_to, epsilon=epsilon)