albarji/proxTV

View on GitHub
prox_tv/__init__.py

Summary

Maintainability
D
2 days
Test Coverage
r"""**proxTV** is a toolbox implementing blazing fast implementations of Total Variation proximity operators.

The library provides efficient solvers for the following Total Variation proximity problems:

**Standard (l1) Total Variation on a 1-dimensional signal**

    .. figure::  img/TV1.png

**Quadratic (l2) Total Variation on a 1-dimensional signal**

    .. figure::  img/TV2.png

**lp-norm Total Variation on a 1-dimensional signal**

    .. figure::  img/TVp.png

**Weighted Total Variation on a 1-dimensional signal**

    .. figure::  img/TV1w.png

**Anisotropic Total Variation on a 2-dimensional signal**

    .. figure::  img/TV2D.png

**lp-norm Anisotropic Total Variation on a 2-dimensional signal**

    .. figure::  img/TV2Dp.png

**Weighted Anisotropic Total Variation on a 2-dimensional signal**

    .. figure::  img/TV2Dw.png

**Anisotropic Total Variation on a 3-dimensional signal**

    .. figure::  img/TV3D.png

**Generalized N-dimensional Anisotropic Total Variation**

    .. figure::  img/TVND.png
    (with X(di) every possible 1-dimensional slice of X following dimension di)


As a general rule the functions in this package have the form ``tv<a>[w]_<b>d``, where:

    * ``<a>`` : 1, 2 or p, if the norm is :math:`\ell_1`, :math:`\ell_2` or the general :math:`\ell_p`
    * ``[w]`` : if present, the methods accepts a weighted norm.
    * ``<b>`` : 1 or 2, if the expected signals is 1- or 2-dimensional.

The only expection is the function **tvgen** that solves generalized
Total Variation problems, recommended only to advanced users.

As the underlying library uses FORTRAN-style matrices (column-order), the given
matrices will be converted to this format if necessary.

If you find this toolbox useful please reference the following papers:

* Fast Newton-type Methods for Total Variation Regularization. Alvaro Barbero, Suvrit Sra. ICML 2011 proceedings.

* Modular proximal optimization for multidimensional total-variation regularization. Alvaro Barbero, Suvrit Sra. http://arxiv.org/abs/1411.0589

"""

import numpy as np
from _prox_tv import ffi, lib

# The maximum number of returned info parameters.
_N_INFO = 3


def _call(fn, *args):
    args_m = []
    for arg in args:
        if isinstance(arg, np.ndarray):
            args_m.append(ffi.cast("double *", arg.ctypes.data))
        else:
            args_m.append(arg)
    return fn(*args_m)


def force_float_scalar(x):
    r"""Forces an scalar value into float format

    Parameters
    ----------
    x: scalar value to check

    Returns
    -------
    float
        Float representation of the provided value
    """
    if not isinstance(x, float):
        return float(x)
    else:
        return x


def force_float_matrix(x):
    r"""Forces a numpy matrix into float format

    Parameters
    ----------
    x: numpy array
        matrix to check

    Returns
    -------
    numpy array
        Float representation of the provided matrix
    """
    # Check input is numpy array
    if not isinstance(x, np.ndarray):
        try:
            x = np.array(x)
        except Exception:
            raise TypeError("Input must be a numpy matrix or compatible object")
    # Enforce float type
    if x.dtype != np.dtype('float64'):
        return x.astype('float')
    else:
        return x


def tv1_1d(x, w, method='hybridtautstring', sigma=0.05, maxbacktracks=None):
    r"""1D proximal operator for :math:`\ell_1`.

    Specifically, this optimizes the following program:

    .. math::

        \mathrm{min}_y \frac{1}{2} \|x-y\|^2 + w \sum_i |y_i - y_{i+1}|.

    Parameters
    ----------
    x : numpy array
        The signal we are approximating.
    w : float
        The non-negative weight in the optimization problem.
    method : str
        The algorithm to be used, one of:

        * ``'classictautstring'`` - classic Taut String method
        * ``'linearizedtautstring'`` - linearized Taut String method
        * ``'hybridtautstring'`` - hybrid classic+linear Taut String method
        * ``'pn'`` - projected Newton.
        * ``'condat'`` - Condat's segment construction method.
        * ``'dp'`` - Johnson's dynamic programming algorithm.
        * ``'condattautstring'`` - Condat's implementation of classic taut
            string method.
        * ``'kolgomorov'`` - Kolmogorov et al's message passing method.

    sigma : float
        Tolerance for sufficient descent (used only if ``method='pn'``).
        
    maxbacktracks: float
        Backtrack steps before switching (used only if ``method='hybridtautstring'``)

    Returns
    -------
    numpy array
        The solution of the optimization problem.
    """
    METHODS = {
        'classictautstring' : _classictautstring_TV1,
        'linearizedtautstring' : _linearizedtautstring_TV1,
        'hybridtautstring' : _hybridtautstring_TV1,
        'pn' : _pn_TV1,
        'condat' : _condat_TV1,
        'dp' : _dp_TV1,
        'condattautstring' : _condattautstring_TV1,
        'kolmogorov' : _kolmogorov_TV1
    }
    assert method in METHODS
    assert w >= 0
    w = force_float_scalar(w)
    x = force_float_matrix(x)
    y = np.zeros(np.size(x))
    METHODS[method](x, w, y, sigma=sigma, maxbacktracks=maxbacktracks)
    return y

def _classictautstring_TV1(x, w, y, **kwargs):
    """Classic taut string method for TV1 proximity"""
    _call(lib.classicTautString_TV1, x, np.size(x), w, y)

def _linearizedtautstring_TV1(x, w, y, **kwargs):
    """Linearized taut string method for TV1 proximity"""
    _call(lib.linearizedTautString_TV1, x, w, y, np.size(x))

def _hybridtautstring_TV1(x, w, y, **kwargs):
    """Hybrid taut string method for TV1 proximity"""
    if kwargs['maxbacktracks'] is None:
        _call(lib.hybridTautString_TV1, x, np.size(x), w, y)    
    else:
        _call(lib.hybridTautString_TV1_custom, x, np.size(x), w, y, 
              kwargs['maxbacktracks']) 

def _pn_TV1(x, w, y, **kwargs):
    """Projected Newton method for TV1 proximity"""
    info = np.zeros(_N_INFO)  # Holds [num of iterations, gap]
    _call(lib.PN_TV1, x, w, y, info, np.size(x), kwargs['sigma'], ffi.NULL)

def _condat_TV1(x, w, y, **kwargs):
    """Condat's method for TV1 proximity"""
    _call(lib.TV1D_denoise, x, y, np.size(x), w)

def _condattautstring_TV1(x, w, y, **kwargs):
    """Condat's implementation of the taut string method for TV1 proximity"""
    _call(lib.TV1D_denoise_tautstring, x, y, np.size(x), w)

def _kolmogorov_TV1(x, w, y, **kwargs):
    """Kolmogorov's method for TV1 proximity"""
    _call(lib.SolveTVConvexQuadratic_a1_nw, np.size(x), x, w, y)

def _dp_TV1(x, w, y, **kwargs):
    """Johnson's dynamic programming method for TV1 proximity"""
    _call(lib.dp, np.size(x), x, w, y)

def tv1w_1d(x, w, method='tautstring', sigma=0.05):
    r"""Weighted 1D proximal operator for :math:`\ell_1`.

    Specifically, this optimizes the following program:

    .. math::

        \mathrm{min}_y \frac{1}{2} \|x-y\|^2 + \sum_i w_i |y_i - y_{i+1}|.

    Parameters
    ----------
    x : numpy array
        The signal we are approximating.
    w : numpy array
        The non-negative weights in the optimization problem.
    method : str
        Either ``'tautstring'`` or ``'pn'`` (projected Newton).
    sigma : float
        Tolerance for sufficient descent (used only if ``method='pn'``).

    Returns
    -------
    numpy array
        The solution of the optimization problem.
    """
    assert np.all(w >= 0)
    assert np.size(x)-1 == np.size(w)
    w = force_float_matrix(w)
    x = force_float_matrix(x)
    y = np.zeros(np.size(x))
    if method == 'tautstring':
        _call(lib.tautString_TV1_Weighted, x, w, y, np.size(x))
    else:
        info = np.zeros(_N_INFO)  # Holds [num of iterations, gap]
        _call(lib.PN_TV1_Weighted,
              x, w, y, info, np.size(x), sigma, ffi.NULL)
    return y


def tv2_1d(x, w, method='mspg'):
    r"""1D proximal operator for :math:`\ell_2`.

    Specifically, this optimizes the following program:

    .. math::

        \mathrm{min}_y \frac{1}{2} \|x-y\|^2 + w \sum_i (y_i - y_{i+1})^2.

    Parameters
    ----------
    x : numpy array
        The signal we are approximating.
    w : float
        The non-negative weight in the optimization problem.
    method : str
        One of the following:

        * ``'ms'`` - More-Sorenson.
        * ``'pg'`` - Projected gradient.
        * ``'mspg'`` - More-Sorenson + projected gradient.

    Returns
    -------
    numpy array
        The solution of the optimization problem.
    """
    METHODS = {
        'ms' : _ms_tv2,
        'pg' : _pg_tv2,
        'mspg': _mspg_tv2
    }
    assert w >= 0
    assert method in METHODS

    w = force_float_scalar(w)
    x = force_float_matrix(x)
    info = np.zeros(_N_INFO)
    y = np.zeros(np.size(x), order='F')
    METHODS[method](x, w, y, info)
    return y

def _ms_tv2(x, w, y, info):
    """More-Sorensen method for TV2 proximity"""
    _call(lib.more_TV2, x, w, y, info, np.size(x))

def _pg_tv2(x, w, y, info):
    """Projected Gradient method for TV2 proximity"""
    _call(lib.PG_TV2, x, w, y, info, np.size(x))

def _mspg_tv2(x, w, y, info):
    """More-Sorensen + Projected Gradient hybrid method for TV2 proximity"""
    _call(lib.morePG_TV2, x, w, y, info, np.size(x), ffi.NULL)

def tvp_1d(x, w, p, method='gpfw', max_iters=0):
    r"""1D proximal operator for any :math:`\ell_p` norm.

    Specifically, this optimizes the following program:

    .. math::

        \mathrm{min}_y \frac{1}{2} \|x-y\|^2 + w \|y_i - y_{i+1}\|_p.

    Parameters
    ----------
    x : numpy array
        The signal we are approximating.
    w : float
        The non-negative weight in the optimization problem.
    method : str
        The method to be used, one of the following:

         * ``'gp'`` - gradient projection
         * ``'fw'`` - Frank-Wolfe
         * ``'gpfw'`` - hybrid gradient projection + Frank-Wolfe

    Returns
    -------
    numpy array
        The solution of the optimization problem.
    """
    methods = {
        'gp': lib.GP_TVp,
        'fw': lib.FW_TVp,
        'gpfw': lib.GPFW_TVp,
    }
    assert method in methods
    assert w >= 0
    assert p >= 1
    w = force_float_scalar(w)
    p = force_float_scalar(p)
    x = force_float_matrix(x)
    info = np.zeros(_N_INFO)
    y = np.zeros(np.size(x), order='F')
    _call(methods[method], x, w, y, info, np.size(x), p, ffi.NULL)
    return y


def tv1_2d(x, w, n_threads=1, max_iters=0, method='dr'):
    r"""2D proximal oprator for :math:`\ell_1`.

    Specifically, this optimizes the following program:

    .. math::

        \mathrm{min}_y \frac{1}{2} \|x-y\|^2 +
                       w \sum_{i,j} (|y_{i, j} - y_{i, j+1}| +
                       |y_{i,j} - y_{i+1,j}|).

    Parameters
    ----------
    x : numpy array
        The signal we are approximating.
    w : float
        The non-negative weight in the optimization problem.
    str : method
        One of the following:

        * ``'dr'`` - Douglas Rachford splitting.
        * ``'pd'`` - Proximal Dykstra splitting.
        * ``'yang'`` - Yang's algorithm.
        * ``'condat'`` - Condat's gradient.
        * ``'chambolle-pock'`` - Chambolle-Pock's gradient.
        * ``'kolmogorov'`` - Kolmogorov's splitting.

    n_threads : int
        Number of threads, used only for Proximal Dykstra
            and Douglas-Rachford.

    Returns
    -------
    numpy array
        The solution of the optimization problem.
    """
    METHODS = {
        'yang' : _yang_tv2d,
        'dr' : _dr_tv2d,
        'pd' : _pd_tv2d,
        'kolmogorov' : _kolmogorov_tv2d,
        'condat': _condat_tv2d,
        'chambolle-pock':  _chambollepock_tv2d,
        'chambolle-pock-acc': _chambollepockacc_tv2d,
    }
    assert w >= 0
    assert method in METHODS
    x = np.asfortranarray(x, dtype='float64')
    w = force_float_scalar(w)
    y = np.asfortranarray(np.zeros(x.shape))
    info = np.zeros(_N_INFO)
    METHODS[method](x, w, y, max_iters, info, n_threads=n_threads)
    return y

def _yang_tv2d(x, w, y, max_iters, info, **kwargs):
    """Yang's method for 2D TV proximity"""
    _call(lib.Yang2_TV, x.shape[0], x.shape[1], x, w, y, max_iters, info)

def _dr_tv2d(x, w, y, max_iters, info, **kwargs):
    """Douglas Rachford parallel projections method for 2D TV proximity"""
    _call(lib.DR2_TV, x.shape[0], x.shape[1], x, w, w, 1.0, 1.0, y,
              kwargs['n_threads'], max_iters, info)

def _pd_tv2d(x, w, y, max_iters, info, **kwargs):
    """Proximal dykstra method for 2D TV proximity"""
    _call(lib.PD2_TV, x, (w, w), (1, 1), (1, 2), y, info, x.shape, 2, 2,
              kwargs['n_threads'], max_iters)

def _kolmogorov_tv2d(x, w, y, max_iters, info, **kwargs):
    """Kolmogorov's method for 2D TV proximity"""
    _call(lib.Kolmogorov2_TV, x.shape[0], x.shape[1], x, w, y, max_iters, 
              info)

def _condat_tv2d(x, w, y, max_iters, info, **kwargs):
    """Condat's method for 2D TV proximity"""
    _condatchambollepock_tv2d(x, w, y, max_iters, info, algorithm=0)

def _chambollepock_tv2d(x, w, y, max_iters, info, **kwargs):
    """Chambolle and Pock's method for 2D TV proximity"""
    _condatchambollepock_tv2d(x, w, y, max_iters, info, algorithm=1)

def _chambollepockacc_tv2d(x, w, y, max_iters, info, **kwargs):
    """Accelerated Chambolle and Pock's method for 2D TV proximity"""
    _condatchambollepock_tv2d(x, w, y, max_iters, info, algorithm=2)

def _condatchambollepock_tv2d(x, w, y, max_iters, info, **kwargs):
    """Wrapper function for general implementaton of Chambolle-Pock + Condat"""
    _call(lib.CondatChambollePock2_TV, x.shape[0], x.shape[1], x, w, y, 
          kwargs['algorithm'], max_iters, info)

def tv1w_2d(x, w_col, w_row, max_iters=0, n_threads=1):
    r"""2D weighted proximal operator for :math:`\ell_1` using DR splitting.

    Specifically, this optimizes the following program:

    .. math::

        \mathrm{min}_y \frac{1}{2} \|x-y\|^2 +
                       \sum_{i,j} w^c_{i, j} |y_{i,j} - y_{i, j+1}| +
                       w^r_{i, j} |y_{i,j}-y_{i+1,j}|.

    Parameters
    ----------
    x : numpy array
        The MxN matrix we are approximating.
    w_col : numpy array
        The (M-1) x N matrix of column weights :math:`w^c`.
    w_row : numpy array
        The M x (N-1) matrix of row weights :math:`w^r`.

    Returns
    -------
    numpy array
        The MxN solution of the optimization problem.
    """
    assert np.all(w_col >= 0)
    assert np.all(w_row >= 0)
    M, N = x.shape
    assert w_col.shape == (M-1, N)
    assert w_row.shape == (M, N-1)
    x = np.asfortranarray(x, dtype='float64')
    y = np.zeros(x.shape, order='F')
    w_col = np.asfortranarray(w_col, dtype='float64')
    w_row = np.asfortranarray(w_row, dtype='float64')
    info = np.zeros(_N_INFO)
    _call(lib.DR2L1W_TV, M, N, x, w_col, w_row, y, n_threads, max_iters, info)
    return y


def tvp_2d(x, w_col, w_row, p_col, p_row, n_threads=1, max_iters=0):
    r"""2D proximal operator for any :math:`\ell_p` norm.

    Specifically, this optimizes the following program:

    .. math::

        \mathrm{min}_y \frac{1}{2}\|x-y\|^2 + w^r \|D_\mathrm{row}(y)\|_{p_1} +
                                              w^c \|D_\mathrm{col}(y) \|_{p_2},

    where :math:`\mathrm D_{row}` and :math:`\mathrm D_{col}` take the
    differences accross rows and columns respectively.

    Parameters
    ----------
    y : numpy array
        The matrix signal we are approximating.
    p_col : float
        Column norm.
    p_row : float
        Row norm.
    w_col : float
        Column penalty.
    w_row : float
        Row penalty.

    Returns
    -------
    numpy array
        The solution of the optimization problem.
    """
    assert w_col >= 0
    assert w_row >= 0
    assert p_col >= 1
    assert p_row >= 1

    info = np.zeros(_N_INFO)
    x = np.asfortranarray(x, dtype='float64')
    w_col = force_float_scalar(w_col)
    w_row = force_float_scalar(w_row)
    p_col = force_float_scalar(p_col)
    p_row = force_float_scalar(p_row)
    y = np.zeros(np.shape(x), order='F')
    _call(lib.DR2_TV,
          x.shape[0], x.shape[1], x, w_col, w_row, p_col, p_row, y,
          n_threads, max_iters, info)
    return y


def tvgen(x, ws, ds, ps, n_threads=1, max_iters=0):
    r"""General TV proximal operator for multidimensional signals

    Specifically, this optimizes the following program:

    .. math::

      \min_X \frac{1}{2} ||X-Y||^2_2 + \sum_i w_i \sum_j TV^{1D}(X(d_i)_j,p_i)

    where :math:`X(d_i)_j` every possible 1-dimensional fiber of X following
    the dimension d_i, and :math:`TV^{1D}(z,p)` the 1-dimensional
    :math:`\ell_p`-norm total variation over the given fiber :math:`z`.
    The user can specify the number :math:`i` of penalty terms.

    Parameters
    ----------
    x : numpy array
        The matrix signal we are approximating.
    ws : iterable
        Weights to apply in each penalty term.
    ds : iterable
        Dimensions over which to apply each penalty term.
        Must be of equal length to ws.
    ps : iterable
        Norms to apply in each penalty term.
        Must be of equal length to ws.
    n_threads : int
        number of threads to use in the computation
    max_iters : int
        maximum number of iterations to run the solver.

    Returns
    -------
    numpy array
        The solution of the optimization problem.
    """
    assert len(ws) == len(ds)
    assert len(ws) == len(ps)
    assert n_threads >= 1
    assert max_iters >= 0

    info = np.zeros(_N_INFO)
    x = np.asfortranarray(x, dtype='float64')
    ws = force_float_matrix(ws)
    ps = force_float_matrix(ps)
    y = np.zeros(np.shape(x), order='F')

    # Run algorithm depending on the structure of the data and the requested
    # penalties.

    # Bidimensional signal with one penalty term across each dimension (full
    # 2-dimensional TV proximity): Douglas-Rachford splitting
    if len(ds) == 2 & ds[0] == 1 & ds[1] == 2:
        _call(lib.DR2_TV,
              x.shape[0], x.shape[1], x, ws[0], ws[1], ps[0], ps[1], y,
              n_threads, max_iters, info)
    # 2 arbitrary terms: Proximal Dykstra
    elif len(ws) == 2:
        _call(lib.PD2_TV,
              x, ws, ps, ds, y, info, x.shape, len(x.shape), 2, n_threads,
              max_iters)
    # More terms: Parallel Proximal Dykstra
    else:
        _call(lib.PD_TV,
              x, ws, ps, ds, y, info, x.shape, len(x.shape), len(ws),
              n_threads, max_iters)

    return y