ejhigson/dyPolyChord

View on GitHub
dyPolyChord/nlive_allocation.py

Summary

Maintainability
A
0 mins
Test Coverage
#!/usr/bin/env python
"""
Functions for selecting the number of live points in order to maximise
calculation accuracy.
"""
import warnings
import itertools
import numpy as np
import nestcheck.ns_run_utils
import nestcheck.data_processing


def allocate(init_run, samp_tot, dynamic_goal, smoothing_filter=None):
    """Calculates an allocation of life points for dynamic run, checks the
    output allocation and the smoothing applied, and returns the information
    needed for the dynamic run in a dictionary.

    Parameters
    ----------
    init_run: dict
        Initial exploratory run in nestcheck format (see
        http://nestcheck.readthedocs.io/en/latest/api.html for more
        information).
    samp_tot: int
        Total number of samples (including both the initial exploratory run and
        the second dynamic run).
    dynamic_goal: float
    smoothing_filter: function or None, optional
        Smoothing for nlive array. Set to None for no smoothing.

    Returns
    -------
    dyn_info: dict
    """
    assert np.all(np.diff(init_run['logl']) >= 0), (
        'the minimum logl diff is {}'.format(np.diff(init_run['logl']).min()))
    # Calculate nlive allocation with and without smoothing
    nlives = dyn_nlive_array(init_run, samp_tot, dynamic_goal,
                             smoothing_filter=smoothing_filter)
    nlives_unsmoothed = dyn_nlive_array(init_run, samp_tot, dynamic_goal,
                                        smoothing_filter=None)
    # Perform some checks
    if dynamic_goal == 0:
        if not np.all(np.diff(nlives) <= 0):
            assert np.all(np.diff(nlives_unsmoothed) <= 0)
            warnings.warn((
                'Smoothing has added turning points to nlive allocation when '
                'dynamic_goal=0. I am using the unsmoothed nlives '
                'instead.'), UserWarning)
            nlives = nlives_unsmoothed
        assert nlives[0] == nlives.max(), str(nlives)
        assert np.all(np.diff(nlives) <= 0), (
            'When targeting evidence, nlive should monotincally decrease!'
            + ' nlives = ' + str(nlives))
    if dynamic_goal == 1 and nlives[0] != 0:
        warnings.warn(
            'dynamic_goal=1 but the first allocated nlive point is {0}. '
            'For most likelihoods we expect this to equal zero (although '
            'it may be nonzero if there is significant posterior mass '
            'at the edge of the prior).'.format(nlives[0]), UserWarning)
    assert nlives.min() == 0
    # Find number of blocks where nlive is nonzero
    nonzero_blocks = np.asarray(
        [k for k, g in itertools.groupby(nlives != 0)])
    assert nonzero_blocks.sum() == 1, (
        'nlive becomes zero then becomes nonzero! nonzero_blocks='
        + str(nonzero_blocks))
    # Get the indexes of nlives points which are different to the previous
    # points (i.e. remove consecutive duplicates, keeping first occurance)
    inds_to_use = np.concatenate(
        (np.asarray([0]), np.where(np.diff(nlives) != 0)[0] + 1))
    assert (count_turning_points(nlives) ==
            count_turning_points(nlives[inds_to_use]))
    # Check logl = approx -inf is mapped to the starting number of live points
    nlives_dict = {-1.e100: int(nlives[0])}
    for ind in inds_to_use:
        nlives_dict[init_run['logl'][ind]] = int(nlives[ind])
    # Store the nlive allocations for dyn_info
    dyn_info = {'init_nlive_allocation': nlives,
                'init_nlive_allocation_unsmoothed': nlives_unsmoothed,
                'nlives_dict': nlives_dict,
                'peak_start_ind': np.where(nlives > 0)[0][0]}
    return dyn_info


def dyn_nlive_array(init_run, samp_tot, dynamic_goal, smoothing_filter=None):
    r"""Calculate the dynamic nlive allocation from the theoretical, point
    importance-based allocation. This allows for the samples taken in the
    initial run, including areas where more samples than were needed have been
    taken (meaning there are fewer samples available for the regions with peak
    importance). Also perform optional smoothing (carried out *before*
    rounding).

    Normalisation uses the the formulae for the expected number of samples:

    .. math:: N_\mathrm{samp} = \int n(\log X) \mathrm{d}\log X

    where :math:`n` is the local number of live points.

    See Appendix F of "Dynamic nested sampling: an improved algorithm for
    parameter estimation and evidence calculation" (Higson et al., 2019) for
    more information.

    Parameters
    ----------
    init_run: dict
        Initial exploratory run in nestcheck format (see
        http://nestcheck.readthedocs.io/en/latest/api.html for more
        information).
    samp_tot: int
        Total number of samples (including both the initial exploratory run and
        the second dynamic run).
    dynamic_goal: float
    smoothing_filter: function or None, optional
        Smoothing for nlive array. Set to None for no smoothing.

    Returns
    -------
    nlive_array: 1d numpy array
        Number of live points corresponding to each likelihood in
        init_run['logl'].
    """
    assert samp_tot > init_run['logl'].shape[0]
    # Calculate the importance of each point
    importance = sample_importance(init_run, dynamic_goal)
    # Calculate theoretical nlive allocation, which is proportional to
    # importance and normalised to produce an expected samp_tot samples
    logx_init = nestcheck.ns_run_utils.get_logx(init_run['nlive_array'])
    norm = samp_tot / np.abs(np.trapz(importance, x=logx_init))
    importance_nlive = importance * norm
    # Account for the points already sampled
    importance_nlive -= init_run['nlive_array'][0]
    if smoothing_filter is None:
        nlive_array = importance_nlive
    else:
        nlive_array = smoothing_filter(importance_nlive)
    nlive_array = np.clip(nlive_array, 0, None)
    # Renormalise to account for nlives below zero (i.e. regions where we have
    # already taken too many samples) as we cannot take negative samples.
    samp_remain = samp_tot - init_run['logl'].shape[0]
    nlive_array *= samp_remain / np.abs(np.trapz(nlive_array, x=logx_init))
    return np.rint(nlive_array)


def sample_importance(run, dynamic_goal):
    """
    Calculate the importance of each sample in the run.

    See "Dynamic nested sampling: an improved algorithm for parameter
    estimation and evidence calculation" (Higson et al., 2019) for more
    information.

    Parameters
    ----------
    run: dict
        Nested sampling run in nestcheck format (see
        http://nestcheck.readthedocs.io/en/latest/api.html for more
        information)
    dynamic_goal: float or int
        Specifies how much computational effort to put into improving evidence
        and parameter estimation accuracy.

    Returns
    -------
    importance: 1d numpy array
        Importance of each sample. Normalised to np.sum(importance) = 1.
    """
    assert 0 <= dynamic_goal <= 1, (
        'dynamic_goal={0} not in [0,1]'.format(dynamic_goal))
    logw = run['logl'] + nestcheck.ns_run_utils.get_logx(run['nlive_array'])
    w_rel = np.exp(logw - logw.max())
    param_imp = w_rel / np.sum(w_rel)
    if dynamic_goal == 1:
        return param_imp
    z_imp = np.cumsum(w_rel)
    z_imp = z_imp.max() - z_imp
    z_imp /= np.sum(z_imp)
    assert z_imp[0] == z_imp.max()
    if dynamic_goal == 0:
        return z_imp
    return (dynamic_goal * param_imp) + ((1 - dynamic_goal) * z_imp)


def count_turning_points(array):
    """Returns number of turning points the input sequence of values.

    Parameters
    ----------
    array: 1d numpy array

    Returns
    -------
    int
        Number of turning points.
    """
    # Remove adjacent duplicates only
    uniq = np.asarray([k for k, g in itertools.groupby(array)])
    # Count turning points
    return (np.diff(np.sign(np.diff(uniq))) != 0).sum()