jason-neal/eniric

View on GitHub
tests/test_legacy.py

Summary

Maintainability
A
0 mins
Test Coverage
import numpy as np
import pytest
from astropy import units as u
from hypothesis import given, strategies as st

from eniric.atmosphere import Atmosphere
from eniric.legacy import RVprec_calc_masked, RVprec_calc_weights_masked, mask_clumping


def test_RV_prec_masked(test_spec):
    """Test same precision results between past pre-clumped version and mask clump version."""
    wav = test_spec[0]
    flux = test_spec[1]
    mask = test_spec[2]
    if mask is not None:
        mask = mask.round().astype(bool)
    else:
        mask = np.ones_like(wav)
    print(mask)

    # Pre clumping as done in Figueira et al. 2016
    wav_masked, flux_masked = mask_clumping(wav, flux, mask)
    rv_chunks = RVprec_calc_masked(wav_masked, flux_masked, mask=None)

    rv_masked = RVprec_calc_masked(wav, flux, mask)

    assert rv_masked.value == rv_chunks.value
    assert isinstance(rv_masked, u.Quantity)
    assert rv_masked.unit == u.m / u.s


@given(st.lists(st.booleans(), min_size=5, max_size=300))
def test_mask_clumping_of_mask(mask):
    """Masking mask show return all ones."""
    wav_clumped, flux_clumped = mask_clumping(mask, mask, mask)
    assert len(wav_clumped) == len(flux_clumped)
    for wav_i, flux_i in zip(wav_clumped, flux_clumped):
        assert np.all(wav_i)
        assert np.all(flux_i)
    # sum of masked_clumped should equal mask sum
    mask_sum = np.sum(mask)
    assert np.sum([np.sum(wav_i) for wav_i in wav_clumped]) == mask_sum
    assert np.sum([np.sum(flux_i) for flux_i in flux_clumped]) == mask_sum


def test_manual_clumping():
    """Test properties of clumping function using manually defined masked_arrays."""
    wav = np.arange(15)
    flux = np.arange(15, 30)
    mask = np.array([1, 1, 1, 1, 0, 0, 0, 1, 1, 1, 0, 1, 1, 1, 0])
    mask_bool = np.array([1, 1, 1, 1, 0, 0, 0, 1, 1, 1, 0, 1, 1, 1, 0], dtype=bool)

    wav_masked, flux_masked = mask_clumping(wav, flux, mask)
    wav_masked_bool, flux_masked_bool = mask_clumping(wav, flux, mask_bool)

    expected_wav = [np.arange(0, 4), np.arange(7, 10), np.arange(11, 14)]
    expected_flux = [np.arange(15, 19), np.arange(22, 25), np.arange(26, 29)]
    for i, __ in enumerate(wav_masked):
        assert np.allclose(wav_masked[i], expected_wav[i])
        assert np.allclose(flux_masked[i], expected_flux[i])
        assert np.allclose(wav_masked_bool[i], expected_wav[i])
        assert np.allclose(flux_masked_bool[i], expected_flux[i])

    assert len(expected_wav) == len(wav_masked)
    assert len(expected_flux) == len(flux_masked)


def test_legacy_RV_warns_nonfinite(grad_flag):
    """Some warning tests."""
    with pytest.warns(RuntimeWarning, match="divide by zero"):
        RVprec_calc_masked(
            np.array([1, 2, 3, 4]),
            np.array([1, 2, 3, 4]),
            np.array([0, 1, 0, 0]),
            grad=grad_flag,
        )


@pytest.mark.xfail(ModuleNotFoundError, reason="Issue with Starfish install.")
@pytest.mark.parametrize("band", ["H", "J", "K"])
def test_weights_clumping_grad(testing_spectrum, grad_flag, band):
    # Test masked clumping verse weight mask with new gradients
    # Test on an actual spectrum to check sizes of difference
    wav, flux = testing_spectrum
    atm = Atmosphere.from_band(band)
    mask = np.ones_like(wav)
    mask = atm.at(wav).mask

    clumped = RVprec_calc_masked(wav, flux, mask=mask, grad=grad_flag)
    weighted = RVprec_calc_weights_masked(wav, flux, mask=mask, grad=grad_flag)

    # They are not exactly the same by are within a specific percentage.
    ratio = (weighted.value - clumped.value) / clumped.value

    if grad_flag:
        # The difference for the new gradient is smaller.
        assert abs(ratio) < 0.008
    else:
        assert abs(ratio) < 0.04
    assert clumped.unit == weighted.unit