Source code for rsvsim.utils

'''
Numerical utilities for running RSVSim.

These include the viral load, transmissibility, and infection calculations
at the heart of the integration loop.
'''

#%% Housekeeping

import numba as nb # For faster computations
import numpy as np # For numerics
import random # Used only for resetting the seed
import sciris as sc # For additional utilities
import scipy.stats as sps # For distributions
from .settings import options as cvo # To set options
from . import defaults as cvd # To set default types


# What functions are externally visible -- note, this gets populated in each section below
__all__ = []

# Set dtypes -- note, these cannot be changed after import since Numba functions are precompiled
nbbool  = nb.bool_
nbint   = cvd.nbint
nbfloat = cvd.nbfloat

# Specify whether to allow parallel Numba calculation -- 10% faster for safe and 20% faster for random, but the random number stream becomes nondeterministic for the latter
safe_opts = [1, '1', 'safe']
full_opts = [2, '2', 'full']
safe_parallel = cvo.numba_parallel in safe_opts + full_opts
rand_parallel = cvo.numba_parallel in full_opts
if cvo.numba_parallel not in [0, 1, 2, '0', '1', '2', 'none', 'safe', 'full']:
    errormsg = f'Numba parallel must be "none", "safe", or "full", not "{cvo.numba_parallel}"'
    raise ValueError(errormsg)
cache = cvo.numba_cache # Turning this off can help switching parallelization options


#%% The core RSVsim functions -- compute the infections

@nb.njit(             (nbint, nbfloat[:], nbfloat[:],     nbfloat[:], nbfloat,   nbfloat,    nbfloat), cache=cache, parallel=safe_parallel)
def compute_viral_load(t,     time_start, time_recovered, time_dead,  frac_time, load_ratio, high_cap): # pragma: no cover
    '''
    Calculate viral load for time t.

    Args:
        t: (int) timestep
        time_start: (float[]) individuals' infectious date
        time_recovered: (float[]) individuals' recovered date
        time_dead: (float[]) individuals' death date
        frac_time: (float) fraction of time in high load
        load_ratio: (float) ratio for high to low viral load
        high_cap: (float) cap on the number of days with high viral load

    Returns:
        load (float): viral load
    '''

    # Get the end date from recover or death
    n = len(time_dead)
    time_stop = np.ones(n, dtype=cvd.default_float)*time_recovered # This is needed to make a copy
    inds = ~np.isnan(time_dead)
    time_stop[inds] = time_dead[inds]

    # Calculate which individuals with be high past the cap and when it should happen
    infect_days_total = time_stop-time_start
    trans_day = frac_time*infect_days_total
    inds = trans_day > high_cap
    cap_frac = high_cap/infect_days_total[inds]

    # Get corrected time to switch from high to low
    trans_point = np.ones(n,dtype=cvd.default_float)*frac_time
    trans_point[inds] = cap_frac

    # Calculate load
    load = np.ones(n, dtype=cvd.default_float) # allocate an array of ones with the correct dtype
    early = (t-time_start)/infect_days_total < trans_point # are we in the early or late phase
    load = (load_ratio * early + load * ~early)/(load+frac_time*(load_ratio-load)) # calculate load

    return load


@nb.njit(            (nbfloat[:], nbfloat[:], nbbool[:], nbbool[:], nbfloat,    nbfloat[:], nbbool[:], nbbool[:], nbfloat,    nbfloat[:]), cache=cache, parallel=safe_parallel)
def compute_trans_sus(rel_trans,  rel_sus,    inf,       sus,       beta_layer, viral_load, symp,      diag,      asymp_factor, immunity_factors): # pragma: no cover
    ''' Calculate relative transmissibility and susceptibility '''
    f_asymp   =  symp + ~symp * asymp_factor # Asymptomatic factor, changes e.g. [0,1] with a factor of 0.8 to [0.8,1.0]
    rel_trans = rel_trans * inf * f_asymp * beta_layer * viral_load # Recalculate transmissibility
    rel_sus   = rel_sus * sus * (1-immunity_factors) # Recalculate susceptibility
    return rel_trans, rel_sus


@nb.njit(             (nbfloat,  nbint[:], nbint[:],  nbfloat[:],  nbfloat[:], nbfloat[:]), cache=cache, parallel=rand_parallel)
def compute_infections(beta,     sources,  targets,   layer_betas, rel_trans,  rel_sus): # pragma: no cover
    '''
    Compute who infects whom

    The heaviest step of the model, taking about 50% of the total time -- figure
    out who gets infected on this timestep. Cannot be easily parallelized since
    random numbers are used.
    '''
    source_trans     = rel_trans[sources] # Pull out the transmissibility of the sources (0 for non-infectious people)
    inf_inds         = source_trans.nonzero()[0] # Infectious indices -- remove noninfectious people
    betas            = beta * layer_betas[inf_inds] * source_trans[inf_inds] * rel_sus[targets[inf_inds]] # Calculate the raw transmission probabilities
    nonzero_inds     = betas.nonzero()[0] # Find nonzero entries
    nonzero_inf_inds = inf_inds[nonzero_inds] # Map onto original indices
    nonzero_betas    = betas[nonzero_inds] # Remove zero entries from beta
    nonzero_sources  = sources[nonzero_inf_inds] # Remove zero entries from the sources
    nonzero_targets  = targets[nonzero_inf_inds] # Remove zero entries from the targets
    transmissions    = (np.random.random(len(nonzero_betas)) < nonzero_betas).nonzero()[0] # Compute the actual infections!
    source_inds      = nonzero_sources[transmissions]
    target_inds      = nonzero_targets[transmissions] # Filter the targets on the actual infections
    return source_inds, target_inds


@nb.njit((nbint[:], nbint[:], nb.int64[:]), cache=cache)
def find_contacts(p1, p2, inds): # pragma: no cover
    """
    Numba for Layer.find_contacts()

    A set is returned here rather than a sorted array so that custom tracing interventions can efficiently
    add extra people. For a version with sorting by default, see Layer.find_contacts(). Indices must be
    an int64 array since this is what's returned by true() etc. functions by default.
    """
    pairing_partners = set()
    inds = set(inds)
    for i in range(len(p1)):
        if p1[i] in inds:
            pairing_partners.add(p2[i])
        if p2[i] in inds:
            pairing_partners.add(p1[i])
    return pairing_partners


@nb.njit((nbint[:], nbint[:], nb.int64[:]), cache=cache)
def find_contact_inds(p1, p2, inds): # pragma: no cover
    """
    Numba for Layer.find_contacts()

    A set is returned here rather than a sorted array so that custom tracing interventions can efficiently
    add extra people. For a version with sorting by default, see Layer.find_contacts(). Indices must be
    an int64 array since this is what's returned by true() etc. functions by default.
    """
    pairing_partner_inds = set()
    inds = set(inds)
    for i in range(len(p1)):
        if p1[i] in inds:
            pairing_partner_inds.add(i)
        if p2[i] in inds:
            pairing_partner_inds.add(i)
    return pairing_partner_inds


def compute_beta(pars, timestep):
    '''

    Seasonality functional form from https://www.ncbi.nlm.nih.gov/pmc/articles/PMC4072624/, which returns beta based on
    timestep.

    '''

    if pars['seasonality']:
        return pars['beta']*(1 + pars['beta_seasonality']*np.cos((2*np.pi*timestep/52) + pars['phase_shift']))
    else:
        return pars['beta']

#%% Sampling and seed methods

__all__ += ['sample', 'get_pdf', 'set_seed']


[docs]def sample(dist=None, par1=None, par2=None, size=None, **kwargs): ''' Draw a sample from the distribution specified by the input. The available distributions are: - 'uniform' : uniform distribution from low=par1 to high=par2; mean is equal to (par1+par2)/2 - 'normal' : normal distribution with mean=par1 and std=par2 - 'lognormal' : lognormal distribution with mean=par1 and std=par2 (parameters are for the lognormal distribution, *not* the underlying normal distribution) - 'normal_pos' : right-sided normal distribution (i.e. only positive values), with mean=par1 and std=par2 *of the underlying normal distribution* - 'normal_int' : normal distribution with mean=par1 and std=par2, returns only integer values - 'lognormal_int' : lognormal distribution with mean=par1 and std=par2, returns only integer values - 'poisson' : Poisson distribution with rate=par1 (par2 is not used); mean and variance are equal to par1 - 'neg_binomial' : negative binomial distribution with mean=par1 and k=par2; converges to Poisson with k=∞ Args: dist (str): the distribution to sample from par1 (float): the "main" distribution parameter (e.g. mean) par2 (float): the "secondary" distribution parameter (e.g. std) size (int): the number of samples (default=1) kwargs (dict): passed to individual sampling functions Returns: A length N array of samples **Examples**:: cv.sample() # returns Unif(0,1) cv.sample(dist='normal', par1=3, par2=0.5) # returns Normal(μ=3, σ=0.5) cv.sample(dist='lognormal_int', par1=5, par2=3) # returns a lognormally distributed set of values with mean 5 and std 3 Notes: Lognormal distributions are parameterized with reference to the underlying normal distribution (see: https://docs.scipy.org/doc/numpy-1.14.0/reference/generated/numpy.random.lognormal.html), but this function assumes the user wants to specify the mean and std of the lognormal distribution. Negative binomial distributions are parameterized with reference to the mean and dispersion parameter k (see: https://en.wikipedia.org/wiki/Negative_binomial_distribution). The r parameter of the underlying distribution is then calculated from the desired mean and k. For a small mean (~1), a dispersion parameter of ∞ corresponds to the variance and standard deviation being equal to the mean (i.e., Poisson). For a large mean (e.g. >100), a dispersion parameter of 1 corresponds to the standard deviation being equal to the mean. ''' # Some of these have aliases, but these are the "official" names choices = [ 'uniform', 'normal', 'normal_pos', 'normal_int', 'lognormal', 'lognormal_int', 'poisson', 'neg_binomial', ] # Ensure it's an integer if size is not None: size = int(size) # Compute distribution parameters and draw samples # NB, if adding a new distribution, also add to choices above if dist in ['unif', 'uniform']: samples = np.random.uniform(low=par1, high=par2, size=size, **kwargs) elif dist in ['norm', 'normal']: samples = np.random.normal(loc=par1, scale=par2, size=size, **kwargs) elif dist == 'normal_pos': samples = np.abs(np.random.normal(loc=par1, scale=par2, size=size, **kwargs)) elif dist == 'normal_int': samples = np.round(np.abs(np.random.normal(loc=par1, scale=par2, size=size, **kwargs))) elif dist == 'poisson': samples = n_poisson(rate=par1, n=size, **kwargs) # Use Numba version below for speed elif dist == 'neg_binomial': samples = n_neg_binomial(rate=par1, dispersion=par2, n=size, **kwargs) # Use custom version below elif dist in ['lognorm', 'lognormal', 'lognorm_int', 'lognormal_int']: if par1>0: mean = np.log(par1**2 / np.sqrt(par2**2 + par1**2)) # Computes the mean of the underlying normal distribution sigma = np.sqrt(np.log(par2**2/par1**2 + 1)) # Computes sigma for the underlying normal distribution samples = np.random.lognormal(mean=mean, sigma=sigma, size=size, **kwargs) else: samples = np.zeros(size) if '_int' in dist: samples = np.round(samples) else: errormsg = f'The selected distribution "{dist}" is not implemented; choices are: {sc.newlinejoin(choices)}' raise NotImplementedError(errormsg) return samples
[docs]def get_pdf(dist=None, par1=None, par2=None): ''' Return a probability density function for the specified distribution. This is used for example by test_num to retrieve the distribution of times from symptom-to-swab for testing. For example, for Washington State, these values are dist='lognormal', par1=10, par2=170. ''' choices = [ 'none', 'uniform', 'lognormal', ] if dist in ['None', 'none', None]: return None elif dist == 'uniform': pdf = sps.uniform(loc=par1, scale=par2) elif dist == 'lognormal': mean = np.log(par1**2 / np.sqrt(par2 + par1**2)) # Computes the mean of the underlying normal distribution sigma = np.sqrt(np.log(par2/par1**2 + 1)) # Computes sigma for the underlying normal distribution pdf = sps.lognorm(sigma, loc=-0.5, scale=np.exp(mean)) else: choicestr = '\n'.join(choices) errormsg = f'The selected distribution "{dist}" is not implemented; choices are: {choicestr}' raise NotImplementedError(errormsg) return pdf
[docs]def set_seed(seed=None): ''' Reset the random seed -- complicated because of Numba, which requires special syntax to reset the seed. This function also resets Python's built-in random number generated. Args: seed (int): the random seed ''' @nb.njit((nbint,), cache=cache) def set_seed_numba(seed): return np.random.seed(seed) def set_seed_regular(seed): return np.random.seed(seed) # Dies if a float is given if seed is not None: seed = int(seed) set_seed_regular(seed) # If None, reinitializes it if seed is None: # Numba can't accept a None seed, so use our just-reinitialized Numpy stream to generate one seed = np.random.randint(1e9) set_seed_numba(seed) random.seed(seed) # Finally, reset Python's built-in random number generator, just in case (used by SynthPops) return
#%% Probabilities -- mostly not jitted since performance gain is minimal __all__ += ['n_binomial', 'binomial_filter', 'binomial_arr', 'n_multinomial', 'poisson', 'n_poisson', 'n_neg_binomial', 'choose', 'choose_r', 'choose_w']
[docs]def n_binomial(prob, n): ''' Perform multiple binomial (Bernolli) trials Args: prob (float): probability of each trial succeeding n (int): number of trials (size of array) Returns: Boolean array of which trials succeeded **Example**:: outcomes = cv.n_binomial(0.5, 100) # Perform 100 coin-flips ''' return np.random.random(n) < prob
[docs]def binomial_filter(prob, arr): # No speed gain from Numba ''' Binomial "filter" -- the same as n_binomial, except return the elements of arr that succeeded. Args: prob (float): probability of each trial succeeding arr (array): the array to be filtered Returns: Subset of array for which trials succeeded **Example**:: inds = cv.binomial_filter(0.5, np.arange(20)**2) # Return which values out of the (arbitrary) array passed the coin flip ''' return arr[(np.random.random(len(arr)) < prob).nonzero()[0]]
[docs]def binomial_arr(prob_arr): ''' Binomial (Bernoulli) trials each with different probabilities. Args: prob_arr (array): array of probabilities Returns: Boolean array of which trials on the input array succeeded **Example**:: outcomes = cv.binomial_arr([0.1, 0.1, 0.2, 0.2, 0.8, 0.8]) # Perform 6 trials with different probabilities ''' return np.random.random(len(prob_arr)) < prob_arr
[docs]def n_multinomial(probs, n): # No speed gain from Numba ''' An array of multinomial trials. Args: probs (array): probability of each outcome, which usually should sum to 1 n (int): number of trials Returns: Array of integer outcomes **Example**:: outcomes = cv.multinomial(np.ones(6)/6.0, 50)+1 # Return 50 die-rolls ''' return np.searchsorted(np.cumsum(probs), np.random.random(n))
[docs]@nb.njit((nbfloat,), cache=cache, parallel=rand_parallel) # Numba hugely increases performance def poisson(rate): ''' A Poisson trial. Args: rate (float): the rate of the Poisson process **Example**:: outcome = cv.poisson(100) # Single Poisson trial with mean 100 ''' return np.random.poisson(rate, 1)[0]
[docs]@nb.njit((nbfloat, nbint), cache=cache, parallel=rand_parallel) # Numba hugely increases performance def n_poisson(rate, n): ''' An array of Poisson trials. Args: rate (float): the rate of the Poisson process (mean) n (int): number of trials **Example**:: outcomes = cv.n_poisson(100, 20) # 20 Poisson trials with mean 100 ''' return np.random.poisson(rate, n)
[docs]def n_neg_binomial(rate, dispersion, n, step=1): # Numba not used due to incompatible implementation ''' An array of negative binomial trials. See cv.sample() for more explanation. Args: rate (float): the rate of the process (mean, same as Poisson) dispersion (float): dispersion parameter; lower is more dispersion, i.e. 0 = infinite, ∞ = Poisson n (int): number of trials step (float): the step size to use if non-integer outputs are desired **Example**:: outcomes = cv.n_neg_binomial(100, 1, 50) # 50 negative binomial trials with mean 100 and dispersion roughly equal to mean (large-mean limit) outcomes = cv.n_neg_binomial(1, 100, 20) # 20 negative binomial trials with mean 1 and dispersion still roughly equal to mean (approximately Poisson) ''' nbn_n = dispersion nbn_p = dispersion/(rate/step + dispersion) samples = np.random.negative_binomial(n=nbn_n, p=nbn_p, size=n)*step return samples
[docs]@nb.njit((nbint, nbint), cache=cache) # Numba hugely increases performance def choose(max_n, n): ''' Choose a subset of items (e.g., people) without replacement. Args: max_n (int): the total number of items n (int): the number of items to choose **Example**:: choices = cv.choose(5, 2) # choose 2 out of 5 people with equal probability (without repeats) ''' return np.random.choice(max_n, n, replace=False)
[docs]@nb.njit((nbint, nbint), cache=cache) # Numba hugely increases performance def choose_r(max_n, n): ''' Choose a subset of items (e.g., people), with replacement. Args: max_n (int): the total number of items n (int): the number of items to choose **Example**:: choices = cv.choose_r(5, 10) # choose 10 out of 5 people with equal probability (with repeats) ''' return np.random.choice(max_n, n, replace=True)
[docs]def choose_w(probs, n, unique=True): # No performance gain from Numba ''' Choose n items (e.g. people), each with a probability from the distribution probs. Args: probs (array): list of probabilities, should sum to 1 n (int): number of samples to choose unique (bool): whether or not to ensure unique indices **Example**:: choices = cv.choose_w([0.2, 0.5, 0.1, 0.1, 0.1], 2) # choose 2 out of 5 people with nonequal probability. ''' probs = np.array(probs) n_choices = len(probs) n_samples = int(n) probs_sum = probs.sum() if probs_sum: # Weight is nonzero, rescale probs = probs/probs_sum else: # Weights are all zero, choose uniformly probs = np.ones(n_choices)/n_choices return np.random.choice(n_choices, n_samples, p=probs, replace=not(unique))
#%% Simple array operations __all__ += ['true', 'false', 'defined', 'undefined', 'itrue', 'ifalse', 'idefined', 'iundefined', 'itruei', 'ifalsei', 'idefinedi', 'iundefinedi']
[docs]def true(arr): ''' Returns the indices of the values of the array that are true: just an alias for arr.nonzero()[0]. Args: arr (array): any array **Example**:: inds = cv.true(np.array([1,0,0,1,1,0,1])) # Returns array([0, 3, 4, 6]) ''' return arr.nonzero()[0]
[docs]def false(arr): ''' Returns the indices of the values of the array that are false. Args: arr (array): any array **Example**:: inds = cv.false(np.array([1,0,0,1,1,0,1])) ''' return np.logical_not(arr).nonzero()[0]
[docs]def defined(arr): ''' Returns the indices of the values of the array that are not-nan. Args: arr (array): any array **Example**:: inds = cv.defined(np.array([1,np.nan,0,np.nan,1,0,1])) ''' return (~np.isnan(arr)).nonzero()[0]
[docs]def undefined(arr): ''' Returns the indices of the values of the array that are not-nan. Args: arr (array): any array **Example**:: inds = cv.defined(np.array([1,np.nan,0,np.nan,1,0,1])) ''' return np.isnan(arr).nonzero()[0]
[docs]def itrue(arr, inds): ''' Returns the indices that are true in the array -- name is short for indices[true] Args: arr (array): a Boolean array, used as a filter inds (array): any other array (usually, an array of indices) of the same size **Example**:: inds = cv.itrue(np.array([True,False,True,True]), inds=np.array([5,22,47,93])) ''' return inds[arr]
[docs]def ifalse(arr, inds): ''' Returns the indices that are true in the array -- name is short for indices[false] Args: arr (array): a Boolean array, used as a filter inds (array): any other array (usually, an array of indices) of the same size **Example**:: inds = cv.ifalse(np.array([True,False,True,True]), inds=np.array([5,22,47,93])) ''' return inds[np.logical_not(arr)]
[docs]def idefined(arr, inds): ''' Returns the indices that are defined in the array -- name is short for indices[defined] Args: arr (array): any array, used as a filter inds (array): any other array (usually, an array of indices) of the same size **Example**:: inds = cv.idefined(np.array([3,np.nan,np.nan,4]), inds=np.array([5,22,47,93])) ''' return inds[~np.isnan(arr)]
[docs]def iundefined(arr, inds): ''' Returns the indices that are undefined in the array -- name is short for indices[undefined] Args: arr (array): any array, used as a filter inds (array): any other array (usually, an array of indices) of the same size **Example**:: inds = cv.iundefined(np.array([3,np.nan,np.nan,4]), inds=np.array([5,22,47,93])) ''' return inds[np.isnan(arr)]
[docs]def itruei(arr, inds): ''' Returns the indices that are true in the array -- name is short for indices[true[indices]] Args: arr (array): a Boolean array, used as a filter inds (array): an array of indices for the original array **Example**:: inds = cv.itruei(np.array([True,False,True,True,False,False,True,False]), inds=np.array([0,1,3,5])) ''' return inds[arr[inds]]
[docs]def ifalsei(arr, inds): ''' Returns the indices that are false in the array -- name is short for indices[false[indices]] Args: arr (array): a Boolean array, used as a filter inds (array): an array of indices for the original array **Example**:: inds = cv.ifalsei(np.array([True,False,True,True,False,False,True,False]), inds=np.array([0,1,3,5])) ''' return inds[np.logical_not(arr[inds])]
[docs]def idefinedi(arr, inds): ''' Returns the indices that are defined in the array -- name is short for indices[defined[indices]] Args: arr (array): any array, used as a filter inds (array): an array of indices for the original array **Example**:: inds = cv.idefinedi(np.array([4,np.nan,0,np.nan,np.nan,4,7,4,np.nan]), inds=np.array([0,1,3,5])) ''' return inds[~np.isnan(arr[inds])]
[docs]def iundefinedi(arr, inds): ''' Returns the indices that are undefined in the array -- name is short for indices[defined[indices]] Args: arr (array): any array, used as a filter inds (array): an array of indices for the original array **Example**:: inds = cv.iundefinedi(np.array([4,np.nan,0,np.nan,np.nan,4,7,4,np.nan]), inds=np.array([0,1,3,5])) ''' return inds[np.isnan(arr[inds])]
def find_cutoff(age_cutoffs, age): ''' Find which age bin each person belongs to -- e.g. with standard age bins 0, 10, 20, etc., ages [5, 12, 4, 58] would be mapped to indices [0, 1, 0, 5]. Age bins are not guaranteed to be uniform width, which is why this can't be done as an array operation. ''' return np.nonzero(age_cutoffs <= age)[0][-1] # Index of the age bin to use __all__ += ['tsprob2ts']
[docs]def tsprob2ts(prob, prob_ts=365, model_ts=7): ''' Converts probability from a given timestep to the model timestep Args: prob (float): probability prob_ts: timestep of probability (in days) prob_ts: timestep of model (in days) Returns: probability in model timestep Examples: monthly_prob = cvu.tsprob2ts(0.012, prob_ts=365, model_ts=30) # converts annual to monthly probability weekly_prob = cvu.tsprob2ts(0.012, prob_ts=365, model_ts=7) # converts annual to weekly prob ''' model_timesteps_per_prob_timesteps = model_ts/prob_ts prob_timestep = 1 - ((1-np.minimum(1, prob))**model_timesteps_per_prob_timesteps) return prob_timestep