'''
Defines classes and methods for calculating immunity
'''
import numpy as np
import sciris as sc
from . import utils as cvu
from . import defaults as cvd
from . import parameters as cvpar
from . import interventions as cvi
# %% Define genotype class -- all other functions are for internal use only
__all__ = ['genotype']
[docs]class genotype(sc.prettyobj):
'''
Add a new genotype to the sim
Args:
genotype (str/dict): name of genotype, or dictionary of parameters specifying information about the genotype
days (int/list): day(s) on which new genotype is introduced
label (str): if genotype is supplied as a dict, the name of the genotype
n_imports (int): the number of imports of the genotype to be added
rescale (bool): whether the number of imports should be rescaled with the population
**Example**::
groupA = cv.genotype('groupA', days=10) # Make genotype groupA active from day 10
groupB = cv.genotype('groupB', days=15) # Make genotype groupB active from day 15
my_geno = cv.genotype(genotype={'rel_beta': 2.5}, label='My genotype', days=20)
sim = cv.Sim(genotype=[groupA, groupB, my_geno]).run() # Add them all to the sim
'''
def __init__(self, genotype, days, label=None, n_imports=1, rescale=True):
self.days = days # Handle inputs
self.n_imports = int(n_imports)
self.rescale = rescale
self.index = None # Index of the genotype in the sim; set later
self.label = None # genotype label (used as a dict key)
self.p = None # This is where the parameters will be stored
self.parse(genotype=genotype, label=label) # genotypes can be defined in different ways: process these here
self.initialized = False
return
[docs] def parse(self, genotype=None, label=None):
''' Unpack genotype information, which may be given as either a string or a dict '''
# Option 1: genotypes can be chosen from a list of pre-defined genotypes
if isinstance(genotype, str):
choices, mapping = cvpar.get_genotype_choices()
known_genotype_pars = cvpar.get_genotype_pars()
label = genotype.lower()
for txt in ['.', ' ', 'genotype']:
label = label.replace(txt, '')
if label in mapping:
label = mapping[label]
genotype_pars = known_genotype_pars[label]
else:
errormsg = f'The selected genotype "{genotype}" is not implemented; choices are:\n{sc.pp(choices, doprint=False)}'
raise NotImplementedError(errormsg)
# Option 2: genotypes can be specified as a dict of pars
elif isinstance(genotype, dict):
default_genotype_pars = cvpar.get_genotype_pars(default=True)
default_keys = list(default_genotype_pars.keys())
# Parse label
genotype_pars = genotype
label = genotype_pars.pop('label', label) # Allow including the label in the parameters
if label is None:
label = 'custom'
# Check that valid keys have been supplied...
invalid = []
for key in genotype_pars.keys():
if key not in default_keys:
invalid.append(key)
if len(invalid):
errormsg = f'Could not parse genotype keys "{sc.strjoin(invalid)}"; valid keys are: "{sc.strjoin(cvd.genotype_pars)}"'
raise sc.KeyNotFoundError(errormsg)
# ...and populate any that are missing
for key in default_keys:
if key not in genotype_pars:
genotype_pars[key] = default_genotype_pars[key]
else:
errormsg = f'Could not understand {type(genotype)}, please specify as a dict or a predefined genotype:\n{sc.pp(choices, doprint=False)}'
raise ValueError(errormsg)
# Set label and parameters
self.label = label
self.p = sc.objdict(genotype_pars)
return
[docs] def initialize(self, sim):
''' Update genotype info in sim '''
self.days = cvi.process_days(sim, self.days) # Convert days into correct format
sim['genotype_pars'][self.label] = self.p # Store the parameters
self.index = list(sim['genotype_pars'].keys()).index(self.label) # Find where we are in the list
sim['genotype_map'][self.index] = self.label # Use that to populate the reverse mapping
self.initialized = True
return
[docs] def apply(self, sim):
''' Introduce new infections with this genotype '''
for _ in cvi.find_day(self.days, sim.t, interv=self, sim=sim): # Time to introduce genotype
susceptible_inds = cvu.true(sim.people.susceptible)
rescale_factor = sim.rescale_vec[sim.timestep] if self.rescale else 1.0
n_imports = sc.randround(self.n_imports/rescale_factor) # Round stochastically to the nearest number of imports
importation_inds = np.random.choice(susceptible_inds, n_imports)
sim.people.infect(inds=importation_inds, layer='importation', genotype=self.index)
return
#%% Neutralizing antibody methods
def update_peak_imm(people, inds, imm_pars, imm_source):
'''
Update peak immunity level
This function updates the peak immunity level for individuals when an immunity event occurs.
Immunity can come from maternal antibody transfer, a natural infection or vaccination.
Args:
people: A people object
inds: Array of people indices
imm_pars: Parameters from which to draw values for quantities like ['imm_init']
imm_source: index of genotype where immunity is coming from
Returns: None
'''
people.peak_imm[imm_source, inds] = imm_pars
people.t_imm_event[imm_source, inds] = people.t
return
# %% Immunity methods
def init_immunity(sim, create=False):
''' Initialize immunity matrices with all genotypes that will eventually be in the sim'''
# Pull out all of the circulating genotypes for cross-immunity
nv = sim['n_genotypes']
# If immunity values have been provided, process them
if sim['immunity'] is None or create:
# Firstly, initialize immunity matrix with defaults. These are then overwitten with genotype-specific values below
# Susceptibility matrix is of size sim['n_genotypes']*sim['n_genotypes']
immunity = np.ones((nv, nv), dtype=cvd.default_float) # Fill with defaults
# Next, overwrite these defaults with any known immunity values about specific genotypes
default_cross_immunity = cvpar.get_cross_immunity()
for i in range(nv):
label_i = sim['genotype_map'][i]
for j in range(nv):
label_j = sim['genotype_map'][j]
if label_i in default_cross_immunity and label_j in default_cross_immunity:
immunity[j][i] = default_cross_immunity[label_j][label_i]
sim['immunity'] = immunity
# Next, precompute the immunity kinetics and store these for access during the sim
sim['imm_kin'] = dict()
sim['imm_kin']['mab'] = precompute_waning(length=sim.npts, pars=sim['imm_decay']['mab'])
sim['imm_kin']['ab'] = precompute_waning(length=sim.npts, pars=sim['imm_decay']['ab'])
return
def check_immunity(people, genotype):
'''
Calculate people's immunity from maternal antibodies, prior infections + vaccination.
There are three fundamental sources of immunity all of which are genotype-specific:
(1) maternal antibody transfer
(2) prior exposure: degree of protection depends on genotype, prior symptoms, and time since recovery
(3) vaccination: degree of protection depends on genotype, vaccine, and time since vaccination
'''
# Handle parameters and indices
pars = people.pars
imm_kin = pars['imm_kin']
mab_inds = cvu.true(people.weekage<(cvd.wpy/2) * (people.n_infections == 0))
t_since_imm = people.t - people.t_imm_event[genotype,:].astype(cvd.default_int)
current_imm = people.peak_imm[genotype,:]*imm_kin['ab'][t_since_imm]
current_imm[mab_inds] = people.peak_imm[genotype, mab_inds]*imm_kin['mab'][t_since_imm[mab_inds]]
people.sus_imm[genotype,:] = current_imm
return
#%% Methods for computing waning
def precompute_waning(length, pars=None):
'''
Process functional form and parameters into values:
- 'growth_decay' : linear growth followed by exponential decay
- 'exp_decay' : exponential decay. Parameters should be init_val and half_life (half_life can be None/nan)
- 'linear_decay': linear decay
A custom function can also be supplied.
Args:
length (float): length of array to return, i.e., for how long waning is calculated
pars (dict): passed to individual immunity functions
Returns:
array of length 'length' of values
'''
pars = sc.dcp(pars)
form = pars.pop('form')
choices = [
'growth_decay', # Default if no form is provided
'exp_decay',
]
# Process inputs
if form is None or form == 'growth_decay':
output = growth_decay(length, **pars)
elif form == 'exp_decay':
if pars['half_life'] is None:
pars['half_life'] = np.nan
output = exp_decay(length, **pars)
elif callable(form):
output = form(length, **pars)
else:
errormsg = f'The selected functional form "{form}" is not implemented; choices are: {sc.strjoin(choices)}'
raise NotImplementedError(errormsg)
return output
def growth_decay(length, growth_time, decay_rate):
'''
Returns an array of length 'length' containing the evaluated growth/decay
function at each point.
Uses linear growth + exponential decay.
Args:
length (int): number of points
growth_time (int): length of time immunity grows (used to determine slope)
decay_rate (float): rate of exponential decay
'''
def f1(t, growth_time):
'''Simple linear growth'''
return (1 / growth_time) * t
def f2(t, decay_rate):
decayRate = np.full(len(t), fill_value=decay_rate)
titre = np.zeros(len(t))
for i in range(1, len(t)):
titre[i] = titre[i-1]+decayRate[i]
return np.exp(-titre)
length = length + 1
t1 = np.arange(growth_time, dtype=cvd.default_int)
t2 = np.arange(length - growth_time, dtype=cvd.default_int)
y1 = f1(t1, growth_time)
y2 = f2(t2, decay_rate)
y = np.concatenate([y1,y2])
y = y[0:length]
return y
def exp_decay(length, init_val, half_life, delay=None):
'''
Returns an array of length t with values for the immunity at each time step after recovery
'''
length = length+1
decay_rate = np.log(2) / half_life if ~np.isnan(half_life) else 0.
if delay is not None:
t = np.arange(length-delay, dtype=cvd.default_int)
growth = linear_growth(delay, init_val/delay)
decay = init_val * np.exp(-decay_rate * t)
result = np.concatenate([growth, decay], axis=None)
else:
t = np.arange(length, dtype=cvd.default_int)
result = init_val * np.exp(-decay_rate * t)
return result
def linear_decay(length, init_val, slope):
''' Calculate linear decay '''
result = -slope*np.ones(length)
result[0] = init_val
return result
def linear_growth(length, slope):
''' Calculate linear growth '''
return slope*np.ones(length)