Source code for rsvsim.immunity

Defines classes and methods for calculating immunity

import numpy as np
import sciris as sc
from . import utils as cvu
from . import defaults as cvd
from . import parameters as cvpar
from . import interventions as cvi

# %% Define genotype class -- all other functions are for internal use only

__all__ = ['genotype']

[docs]class genotype(sc.prettyobj): ''' Add a new genotype to the sim Args: genotype (str/dict): name of genotype, or dictionary of parameters specifying information about the genotype days (int/list): day(s) on which new genotype is introduced label (str): if genotype is supplied as a dict, the name of the genotype n_imports (int): the number of imports of the genotype to be added rescale (bool): whether the number of imports should be rescaled with the population **Example**:: groupA = cv.genotype('groupA', days=10) # Make genotype groupA active from day 10 groupB = cv.genotype('groupB', days=15) # Make genotype groupB active from day 15 my_geno = cv.genotype(genotype={'rel_beta': 2.5}, label='My genotype', days=20) sim = cv.Sim(genotype=[groupA, groupB, my_geno]).run() # Add them all to the sim ''' def __init__(self, genotype, days, label=None, n_imports=1, rescale=True): self.days = days # Handle inputs self.n_imports = int(n_imports) self.rescale = rescale self.index = None # Index of the genotype in the sim; set later self.label = None # genotype label (used as a dict key) self.p = None # This is where the parameters will be stored self.parse(genotype=genotype, label=label) # genotypes can be defined in different ways: process these here self.initialized = False return
[docs] def parse(self, genotype=None, label=None): ''' Unpack genotype information, which may be given as either a string or a dict ''' # Option 1: genotypes can be chosen from a list of pre-defined genotypes if isinstance(genotype, str): choices, mapping = cvpar.get_genotype_choices() known_genotype_pars = cvpar.get_genotype_pars() label = genotype.lower() for txt in ['.', ' ', 'genotype']: label = label.replace(txt, '') if label in mapping: label = mapping[label] genotype_pars = known_genotype_pars[label] else: errormsg = f'The selected genotype "{genotype}" is not implemented; choices are:\n{sc.pp(choices, doprint=False)}' raise NotImplementedError(errormsg) # Option 2: genotypes can be specified as a dict of pars elif isinstance(genotype, dict): default_genotype_pars = cvpar.get_genotype_pars(default=True) default_keys = list(default_genotype_pars.keys()) # Parse label genotype_pars = genotype label = genotype_pars.pop('label', label) # Allow including the label in the parameters if label is None: label = 'custom' # Check that valid keys have been supplied... invalid = [] for key in genotype_pars.keys(): if key not in default_keys: invalid.append(key) if len(invalid): errormsg = f'Could not parse genotype keys "{sc.strjoin(invalid)}"; valid keys are: "{sc.strjoin(cvd.genotype_pars)}"' raise sc.KeyNotFoundError(errormsg) # ...and populate any that are missing for key in default_keys: if key not in genotype_pars: genotype_pars[key] = default_genotype_pars[key] else: errormsg = f'Could not understand {type(genotype)}, please specify as a dict or a predefined genotype:\n{sc.pp(choices, doprint=False)}' raise ValueError(errormsg) # Set label and parameters self.label = label self.p = sc.objdict(genotype_pars) return
[docs] def initialize(self, sim): ''' Update genotype info in sim ''' self.days = cvi.process_days(sim, self.days) # Convert days into correct format sim['genotype_pars'][self.label] = self.p # Store the parameters self.index = list(sim['genotype_pars'].keys()).index(self.label) # Find where we are in the list sim['genotype_map'][self.index] = self.label # Use that to populate the reverse mapping self.initialized = True return
[docs] def apply(self, sim): ''' Introduce new infections with this genotype ''' for _ in cvi.find_day(self.days, sim.t, interv=self, sim=sim): # Time to introduce genotype susceptible_inds = cvu.true(sim.people.susceptible) rescale_factor = sim.rescale_vec[sim.timestep] if self.rescale else 1.0 n_imports = sc.randround(self.n_imports/rescale_factor) # Round stochastically to the nearest number of imports importation_inds = np.random.choice(susceptible_inds, n_imports) sim.people.infect(inds=importation_inds, layer='importation', genotype=self.index) return
#%% Neutralizing antibody methods def update_peak_imm(people, inds, imm_pars, imm_source): ''' Update peak immunity level This function updates the peak immunity level for individuals when an immunity event occurs. Immunity can come from maternal antibody transfer, a natural infection or vaccination. Args: people: A people object inds: Array of people indices imm_pars: Parameters from which to draw values for quantities like ['imm_init'] imm_source: index of genotype where immunity is coming from Returns: None ''' people.peak_imm[imm_source, inds] = imm_pars people.t_imm_event[imm_source, inds] = people.t return # %% Immunity methods def init_immunity(sim, create=False): ''' Initialize immunity matrices with all genotypes that will eventually be in the sim''' # Pull out all of the circulating genotypes for cross-immunity nv = sim['n_genotypes'] # If immunity values have been provided, process them if sim['immunity'] is None or create: # Firstly, initialize immunity matrix with defaults. These are then overwitten with genotype-specific values below # Susceptibility matrix is of size sim['n_genotypes']*sim['n_genotypes'] immunity = np.ones((nv, nv), dtype=cvd.default_float) # Fill with defaults # Next, overwrite these defaults with any known immunity values about specific genotypes default_cross_immunity = cvpar.get_cross_immunity() for i in range(nv): label_i = sim['genotype_map'][i] for j in range(nv): label_j = sim['genotype_map'][j] if label_i in default_cross_immunity and label_j in default_cross_immunity: immunity[j][i] = default_cross_immunity[label_j][label_i] sim['immunity'] = immunity # Next, precompute the immunity kinetics and store these for access during the sim sim['imm_kin'] = dict() sim['imm_kin']['mab'] = precompute_waning(length=sim.npts, pars=sim['imm_decay']['mab']) sim['imm_kin']['ab'] = precompute_waning(length=sim.npts, pars=sim['imm_decay']['ab']) return def check_immunity(people, genotype): ''' Calculate people's immunity from maternal antibodies, prior infections + vaccination. There are three fundamental sources of immunity all of which are genotype-specific: (1) maternal antibody transfer (2) prior exposure: degree of protection depends on genotype, prior symptoms, and time since recovery (3) vaccination: degree of protection depends on genotype, vaccine, and time since vaccination ''' # Handle parameters and indices pars = imm_kin = pars['imm_kin'] mab_inds = cvu.true(people.weekage<(cvd.wpy/2) * (people.n_infections == 0)) t_since_imm = people.t - people.t_imm_event[genotype,:].astype(cvd.default_int) current_imm = people.peak_imm[genotype,:]*imm_kin['ab'][t_since_imm] current_imm[mab_inds] = people.peak_imm[genotype, mab_inds]*imm_kin['mab'][t_since_imm[mab_inds]] people.sus_imm[genotype,:] = current_imm return #%% Methods for computing waning def precompute_waning(length, pars=None): ''' Process functional form and parameters into values: - 'growth_decay' : linear growth followed by exponential decay - 'exp_decay' : exponential decay. Parameters should be init_val and half_life (half_life can be None/nan) - 'linear_decay': linear decay A custom function can also be supplied. Args: length (float): length of array to return, i.e., for how long waning is calculated pars (dict): passed to individual immunity functions Returns: array of length 'length' of values ''' pars = sc.dcp(pars) form = pars.pop('form') choices = [ 'growth_decay', # Default if no form is provided 'exp_decay', ] # Process inputs if form is None or form == 'growth_decay': output = growth_decay(length, **pars) elif form == 'exp_decay': if pars['half_life'] is None: pars['half_life'] = np.nan output = exp_decay(length, **pars) elif callable(form): output = form(length, **pars) else: errormsg = f'The selected functional form "{form}" is not implemented; choices are: {sc.strjoin(choices)}' raise NotImplementedError(errormsg) return output def growth_decay(length, growth_time, decay_rate): ''' Returns an array of length 'length' containing the evaluated growth/decay function at each point. Uses linear growth + exponential decay. Args: length (int): number of points growth_time (int): length of time immunity grows (used to determine slope) decay_rate (float): rate of exponential decay ''' def f1(t, growth_time): '''Simple linear growth''' return (1 / growth_time) * t def f2(t, decay_rate): decayRate = np.full(len(t), fill_value=decay_rate) titre = np.zeros(len(t)) for i in range(1, len(t)): titre[i] = titre[i-1]+decayRate[i] return np.exp(-titre) length = length + 1 t1 = np.arange(growth_time, dtype=cvd.default_int) t2 = np.arange(length - growth_time, dtype=cvd.default_int) y1 = f1(t1, growth_time) y2 = f2(t2, decay_rate) y = np.concatenate([y1,y2]) y = y[0:length] return y def exp_decay(length, init_val, half_life, delay=None): ''' Returns an array of length t with values for the immunity at each time step after recovery ''' length = length+1 decay_rate = np.log(2) / half_life if ~np.isnan(half_life) else 0. if delay is not None: t = np.arange(length-delay, dtype=cvd.default_int) growth = linear_growth(delay, init_val/delay) decay = init_val * np.exp(-decay_rate * t) result = np.concatenate([growth, decay], axis=None) else: t = np.arange(length, dtype=cvd.default_int) result = init_val * np.exp(-decay_rate * t) return result def linear_decay(length, init_val, slope): ''' Calculate linear decay ''' result = -slope*np.ones(length) result[0] = init_val return result def linear_growth(length, slope): ''' Calculate linear growth ''' return slope*np.ones(length)