'''
File for storing utilities and probability calculators needed to run FP model
'''
import numpy as np
import sciris as sc
import numba as nb
from . import defaults as fpd
from . import version as fpv
# Specify all externally visible things this file defines
__all__ = ['set_seed', 'bt', 'bc', 'rbt', 'mt', 'sample']
[docs]
def set_seed(seed=None):
''' Reset the random seed -- complicated because of Numba '''
@nb.njit
def set_seed_numba(seed):
return np.random.seed(seed)
def set_seed_regular(seed):
return np.random.seed(seed)
if seed is not None:
set_seed_numba(seed)
set_seed_regular(seed)
return
[docs]
@nb.njit((nb.float64,), cache=True) # These types can also be declared as a dict, but performance is much slower...?
def bt(prob):
''' A simple Bernoulli (binomial) trial '''
return np.random.random() < prob # Or rnd.random() < prob, np.random.binomial(1, prob), which seems slower
[docs]
@nb.njit((nb.float64, nb.int64), cache=True)
def bc(prob, repeats):
''' A binomial count '''
return np.random.binomial(repeats, prob) # Or (np.random.rand(repeats) < prob).sum()
[docs]
@nb.njit((nb.float64, nb.int64), cache=True)
def rbt(prob, repeats):
''' A repeated Bernoulli (binomial) trial '''
return np.random.binomial(repeats, prob) > 0 # Or (np.random.rand(repeats) < prob).any()
[docs]
@nb.njit((nb.float64[:],), cache=True)
def mt(probs):
''' A multinomial trial '''
return np.searchsorted(np.cumsum(probs), np.random.random())
@nb.njit((nb.float64[:], nb.int64), cache=True)
def n_multinomial(probs, n):
'''
An array of multinomial trials.
Equivalent to, but faster than, `np.random.choice(len(probs), size=n, p=probs)`
Args:
probs (array): probability of each outcome, which usually should sum to 1
n (int): number of trials
Returns:
Array of integer outcomes
**Example**::
outcomes = fp.n_multinomial(np.ones(6)/6.0, 50)+1 # Return 50 die-rolls
'''
return np.searchsorted(np.cumsum(probs), np.random.random(n))
def n_binomial(prob, n):
'''
Perform multiple binomial (Bernolli) trials
Args:
prob (float): probability of each trial succeeding
n (int): number of trials (size of array)
Returns:
Boolean array of which trials succeeded
**Example**::
outcomes = cv.n_binomial(0.5, 100) # Perform 100 coin-flips
'''
return np.random.random(n) < prob
def binomial_arr(prob_arr): # No speed gain from Numba
'''
Binomial (Bernoulli) trials each with different probabilities.
Args:
prob_arr (array): array of probabilities
Returns:
Boolean array of which trials on the input array succeeded
**Example**::
outcomes = cv.binomial_arr([0.1, 0.1, 0.2, 0.2, 0.8, 0.8]) # Perform 6 trials with different probabilities
'''
return np.random.random(len(prob_arr)) < prob_arr
def annprob2ts(prob_annual, timestep=1):
''' Convert an annual probability into a timestep probability '''
prob_timestep = 1 - ((1-np.minimum(1,prob_annual))**(timestep/fpd.mpy))
return prob_timestep
@nb.njit((nb.float64[:], nb.float64, nb.float64), cache=True)
def numba_miscarriage_prob(miscarriage_rates, age, resolution):
'''Run interpolation eval to check for probability of miscarriage here'''
miscarriage_prob = miscarriage_rates[int(round(age*resolution))]
return miscarriage_prob
def set_metadata(obj):
''' Set standard metadata for an object '''
obj.created = sc.now()
obj.version = fpv.__version__
obj.git_info = sc.gitinfo(verbose=False)
return
[docs]
def sample(dist='uniform', par1=0, par2=1, size=1, **kwargs):
'''
Draw a sample from the distribution specified by the input. The available
distributions are:
- 'uniform' : uniform distribution from low=par1 to high=par2; mean is equal to (par1+par2)/2
- 'normal' : normal distribution with mean=par1 and std=par2
- 'lognormal' : lognormal distribution with mean=par1 and std=par2 (parameters are for the lognormal distribution, *not* the underlying normal distribution)
- 'normal_pos' : right-sided normal distribution (i.e. only positive values), with mean=par1 and std=par2 *of the underlying normal distribution*
- 'normal_int' : normal distribution with mean=par1 and std=par2, returns only integer values
- 'lognormal_int' : lognormal distribution with mean=par1 and std=par2, returns only integer values
- 'poisson' : Poisson distribution with rate=par1 (par2 is not used); mean and variance are equal to par1
- 'neg_binomial' : negative binomial distribution with mean=par1 and k=par2; converges to Poisson with k=∞
Args:
dist (str): the distribution to sample from
par1 (float): the "main" distribution parameter (e.g. mean)
par2 (float): the "secondary" distribution parameter (e.g. std)
size (int): the number of samples (default=1)
kwargs (dict): passed to individual sampling functions
Returns:
A length N array of samples
**Examples**::
fp.sample() # returns Unif(0,1)
fp.sample(dist='normal', par1=3, par2=0.5) # returns Normal(μ=3, σ=0.5)
fp.sample(dist='lognormal_int', par1=5, par2=3) # returns a lognormally distributed set of values with mean 5 and std 3
Notes:
Lognormal distributions are parameterized with reference to the underlying normal distribution (see:
https://docs.scipy.org/doc/numpy-1.14.0/reference/generated/numpy.random.lognormal.html), but this
function assumes the user wants to specify the mean and std of the lognormal distribution.
Negative binomial distributions are parameterized with reference to the mean and dispersion parameter k
(see: https://en.wikipedia.org/wiki/Negative_binomial_distribution). The r parameter of the underlying
distribution is then calculated from the desired mean and k. For a small mean (~1), a dispersion parameter
of ∞ corresponds to the variance and standard deviation being equal to the mean (i.e., Poisson). For a
large mean (e.g. >100), a dispersion parameter of 1 corresponds to the standard deviation being equal to
the mean.
'''
# Some of these have aliases, but these are the "official" names
choices = [
'uniform',
'normal',
'normal_pos',
'normal_int',
'lognormal',
'lognormal_int',
]
# Ensure it's an integer
if size is not None:
size = int(size)
# Compute distribution parameters and draw samples
# NB, if adding a new distribution, also add to choices above
if dist in ['unif', 'uniform']: samples = np.random.uniform(low=par1, high=par2, size=size, **kwargs)
elif dist in ['norm', 'normal']: samples = np.random.normal(loc=par1, scale=par2, size=size, **kwargs)
elif dist == 'normal_pos': samples = np.abs(np.random.normal(loc=par1, scale=par2, size=size, **kwargs))
elif dist == 'normal_int': samples = np.round(np.abs(np.random.normal(loc=par1, scale=par2, size=size, **kwargs)))
elif dist in ['lognorm', 'lognormal', 'lognorm_int', 'lognormal_int']:
if par1>0:
mean = np.log(par1**2 / np.sqrt(par2**2 + par1**2)) # Computes the mean of the underlying normal distribution
sigma = np.sqrt(np.log(par2**2/par1**2 + 1)) # Computes sigma for the underlying normal distribution
samples = np.random.lognormal(mean=mean, sigma=sigma, size=size, **kwargs)
else:
samples = np.zeros(size)
if '_int' in dist:
samples = np.round(samples)
else:
errormsg = f'The selected distribution "{dist}" is not implemented; choices are: {sc.newlinejoin(choices)}'
raise NotImplementedError(errormsg)
return samples
def piecewise_linear(x, x0, y0, m1, m2):
'''
Compute a two-part piecewise linear function at given x values.
This function calculates the values of a piecewise linear function defined
by two slopes (k1 and k2), a point of intersection (x0, y0), and an array
of x values. The function returns an array of corresponding y values based
on the piecewise linear function.
Args:
x (array-like)
The array of x values at which the piecewise linear function is evaluated
x0 (float)
The x-coordinate of the point of intersection between the two linear segments (inflection point)
y0 (float)
The y-coordinate of the point of intersection between the two linear segments
m1 (float)
The slope of the first linear segment (for x < x0).
m2 (float)
The slope of the second linear segment (for x >= x0).
Returns:
y : ndarray
An array of y values corresponding to the piecewise linear function
evaluated at the input x values.
**Examples**::
>>> x_values = np.array([1, 2, 3, 4, 5])
>>> y_values = piecewise_linear(x_values, 3, 2, 1, -1)
'''
return np.piecewise(x, [x < x0], [lambda x:m1*x + y0-m1*x0, lambda x:m2*x + y0-m2*x0])
def logistic_5p(x, a, b, c, d, e):
'''
A logistic function with 5 parameters (5p) that enables asymmetry
Args:
x (array-like)
The array of x values at which the function is evaluated
a (float):
Minimum value (baseline) as x -> -infinity.
d (float):
Maximum value (saturation) as x -> infinity.
b (float):
Slope parameter
c (float):
Value of x at which the function reaches the midpoint between a and d.
e (float):
Exponent parameter controlling asymmetry.
- If e = 1, the curve is symmetric.
- If e > 1, the curve asymptotes toward "a" more quickly than it asymptotes toward "d."
- If e < 1, the curve asymptotes toward "d" more quickly than it asymptotes toward "a."
Returns:
y : (ndarray)
An array of y values corresponding to the piecewise linear function
evaluated at the input x values.
'''
return d + ((a - d)/(1.0 + np.exp(b*(x-c)))**e)