Source code for poliosim.model

'''
Defines the full Poliosim model: the states of each person and the functions for
updating them (People class), plus the dynamics of the simulation itself
(Sim class).
'''

#%% Imports
import numpy as np
import numba as nb
import scipy.stats as stats
import sciris as sc
from . import utils as psu
from . import base as psb
from . import plotting as psplt
from . import parameters as pspar


__all__ = ['States', 'People', 'Sim', 'initialize_immunity', 'prob_infection']


#%% Define all properties of people

class States(sc.prettyobj):
    ''' For storing all the keys relating to a person and people '''

    # Set the properties of a person
    person = [
        'uid',                          # Int
        'age',                          # Float
        'sex',                          # Float
        'symp_prob',                    # Float
        'viral_shed',                   # Float
        'household_id',                 # Int
        'strain_type',                  # Int (see pspar.strain_map()), initialize ??
        't_infection',                  # Int days, time since last infection, initialize 0
        'current_immunity',             # Float, initialize 1.0
        'prechallenge_immunity',        # Float, initialize 1.0
        'postchallenge_peak_immunity',  # Float, initialize None
        'shed_duration',                # Int (days?), initialize 0
        'log10_peak_cid50',             # Float, initialize ??
        'exposure_count',               # Int, count the number of exposures
        'sus_para_exp_count',           # Int, count the number of exposures that may result in paralysis
        'ri_dose_count',                # Int, initialize 0
        'ri_take_count',                # Int, initialize 0
        'sia_dose_count',               # Int, initialize 0
        'sia_take_count',               # Int, initialize 0
        'rel_trans',                    # Float
    ]

    # Set the states that a person can be in: these are all booleans per person -- used in people.py
    states = [
        'naive',
        'exposed',
        'symptomatic',
        'susceptible_to_paralysis',  # Int 0/1 (boolish), initialize true
        'IPV_naive',                 # Int 0/1 (boolish), initialize false
        'is_shed',                   # Int 0/1 (boolish), initialize false
        'tested',
        'diagnosed',
        'recovered',
        'known_contact',
        'quarantined',
    ]

    # Set the dates various events took place: these are floats per person -- used in people.py
    dates = [f'date_{state}' for state in states]  # Convert each state into a date
    dates += [
        'date_pos_test',        # Store the date when a person tested which will come back positive
        'date_end_quarantine',  # Store the date when a person comes out of quarantine
        'date_first_exposed',   # Since can have multiple exposures, and date_exposed is the last one
        'date_end_isolation',   # The date a person comes out of isolation following a diagnosis. Isolation is not the same as quarantine.
    ]

    # Duration of different states: these are floats per person -- used in people.py
    durs = [
        'dur_exp2inf',
        'dur_inf2sym',
        'dur_disease',
    ]

    all_states = person + states + dates + durs

#%% Defaults

# A subset of the above states are used for results
result_stocks = {
    'naive':       'Number immunonaive',
    'exposed':     'Number exposed',
    'is_shed':     'Number shedding',
    'symptomatic': 'Number symptomatic',
    'diagnosed':   'Number of confirmed cases',
    'quarantined': 'Number in quarantine',
    'recovered':   'Number recovered',
}

# The types of result that are counted as flows -- used in sim.py; value is the label suffix
result_flows = {
    'infections':                'infections',
    'sus_para_infs':             'susceptible-to-paralysis infections',
    'tests':                     'tests',
    'diagnoses':                 'diagnoses',
    'recoveries':                'recoveries',
    'symptomatic':               'symptomatic cases',
    'quarantined':               'quarantined people',
    'symp_triggered_surveilled': 'surveilled based on symptomatic trigger',
}

# Define these here as well
new_result_flows = [f'new_{key}' for key in result_flows.keys()]
cum_result_flows = [f'cum_{key}' for key in result_flows.keys()]


#%% Define the Poliosim-specific People object
class People(psb.BasePeople):
    '''
    A class to perform all the operations on the people. This class is usually
    not invoked directly, but instead is created automatically by the sim. Most
    initialization happens in BasePeople.

    Args:
        pars   (dict): the sim parameters, e.g. sim.pars -- must have pop_size and n_days keys
        strict (bool): whether or not to only create keys that are already in self.meta.person; otherwise, let any key be set
        kwargs (dict): the actual data, e.g. from a popdict, being specified
    '''

    def __init__(self, pars=None, strict=True, **kwargs):

        # Handle pars and population size
        super().__init__()
        pars = sc.mergedicts({'pop_size':0, 'n_days':0}, pars)
        self.pars = pars # Equivalent to self.set_pars(pars)
        pop_size = pars['pop_size']

        # Other initialization
        self.t = 0 # Keep current simulation time
        self._lock = False # Prevent further modification of keys
        self.meta = States() # Store list of keys and dtypes
        self.contacts = None
        self.init_contacts() # Initialize the contacts
        self.infection_log = [] # Record of infections - keys for ['source','target','date','layer']
        self.trace_log = [] # Record of traces - keys ['source','target','date','layer']
        self.exposure_log = [[] for i in range(pop_size)] # Record each infection date
        self.sus_para_exp_log = [[] for i in range(pop_size)] # Record each susceptible-to-paralysis infection date

        # Set person properties
        init_states = dir(self) # Initial states, before we assign everything
        for key in self.meta.person:
            if key in ['uid', 'household_id']:
                self[key] = np.arange(pop_size, dtype=psu.default_int)
            elif key in ['t_infection', 'shed_duration', 'exposure_count', 'strain_type', 'ri_dose_count', 'ri_take_count', 'sia_dose_count', 'sia_take_count']:
                self[key] = np.full(pop_size, 0, dtype=psu.default_int)
            elif key in ['current_immunity', 'prechallenge_immunity']:
                self[key] = np.full(pop_size, 1.0, dtype=psu.default_float)
            elif key in ['postchallenge_peak_immunity']:
                self[key] = np.full(pop_size, None, dtype=psu.default_float)
            elif key in ['log10_peak_cid50']:
                self[key] = np.full(pop_size, 0.0, dtype=psu.default_float)
            else:
                self[key] = np.full(pop_size, np.nan, dtype=psu.default_float)

        # Set health states -- only naive and susceptible_to_paralysis are true by default -- booleans
        for key in self.meta.states:
            if key in ['naive', 'susceptible_to_paralysis']:
                self[key] = np.full(pop_size, 1, dtype=bool)
            else:
                self[key] = np.full(pop_size, False, dtype=bool)

        # Set dates and durations -- both floats
        for key in self.meta.dates + self.meta.durs:
            self[key] = np.full(pop_size, np.nan, dtype=psu.default_float)

        # Final states, after we assign everything
        final_states = dir(self)
        self._keys = [s for s in final_states if s not in init_states]

        # Store the dtypes used in a flat dict
        self._dtypes = {key:self[key].dtype for key in self.keys()} # Assign all to float by default
        self._lock = True # Stop further keys from being set (does not affect attributes)

        # Store flows to be computed during simulation
        self.flows = {key:0 for key in new_result_flows}

        # Although we have called init(), we still need to call initialize()
        self.initialized = False

        # Handle contacts, if supplied (note: they usually are)
        if 'contacts' in kwargs:
            self.add_contacts(kwargs.pop('contacts'))

        # Handle all other values, e.g. age -- NB, not assigned to _keys by default
        for key,value in kwargs.items():
            if strict:
                self.set(key, value)
            else:
                self[key] = value

        return

    def initialize(self, seed_infections=None):
        ''' Perform initializations '''
        if seed_infections is None:
            seed_infections = True
        self.set_prognoses()
        self.validate()
        # The seed infections need to have a non-zero 'time since infection'
        # when we hit the zero-th time step. So we'll use -1.
        self.t = -1
        if seed_infections:
            self.seed_infections()
        self.t = 0
        self.initialized = True
        return

    def seed_infections(self, target_inds=None):
        ''' Create the seed infections '''
        if target_inds is None:
            target_inds = psu.choose(self.pars['pop_size'], self.pars['pop_infected'])
        self.infect(inds=target_inds, layer='seed_infection')
        return

    def set_prognoses(self):
        '''
        Set the prognoses for each person based on age during initialization. Need
        to reset the seed because viral loads are drawn stochastically.
        '''
        pars = self.pars # Shorten

        def find_cutoff(age_cutoffs, age):
            '''
            Find which age bin each person belongs to -- e.g. with standard age
            bins 0, 10, 20, etc., ages [5, 12, 4, 58] would be mapped to indices
            [0, 1, 0, 5]. Age bins are not guaranteed to be uniform width, which
            is why this can't be done as an array operation.
            '''
            return np.nonzero(age_cutoffs <= age)[0][-1]  # Index of the age bin to use

        psu.set_seed(pars['rand_seed'])
        progs = pars['prognoses'] # Shorten the name
        inds = np.fromiter((find_cutoff(progs['age_cutoffs'], this_age) for this_age in self.age), dtype=psu.default_int, count=len(self)) # Convert ages to indices
        self.symp_prob[:] = progs['symp_probs'][inds] # Probability of developing symptoms
        return

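    # Illustrative sketch (not part of the model): find_cutoff() returns the index
    # of the last cutoff that is <= age. Assuming the cutoffs are sorted ascending,
    # the same mapping from the docstring can be checked with np.searchsorted:
    #
    #   import numpy as np
    #   age_cutoffs = np.arange(0, 90, 10)  # Hypothetical example cutoffs
    #   ages = np.array([5, 12, 4, 58])
    #   inds = np.searchsorted(age_cutoffs, ages, side='right') - 1
    #   print(inds)  # -> [0 1 0 5], matching the docstring example
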
    def update_states_pre(self, t):
        ''' Perform all state updates at the current timestep '''

        # Initialize
        self.t = t
        self.flows = {key:0 for key in new_result_flows}

        # Perform updates
        self.update_polio()
        self.flows['new_symptomatic'] += self.check_symptomatic()

        return

    def update_states_post(self):
        ''' Perform post-timestep updates '''
        self.flows['new_diagnoses'] += self.check_diagnosed()
        self.flows['new_quarantined'] += self.check_quar()
        return

    def get_sabin_betas(self):
        return self.pars['sabin_scale_parameters'][self.strain_type]

    def attempt_canonical_infection(self, attempt_date, dose=10**6, beta=1.0):
        '''
        Determine whether candidate targets will be infected based on their
        current immunity state, and if so, infect them. Uses unmodified betas
        and infection probabilities from the reference model.
        '''
        # Cause compute_infections() to use unmodified dose calculations.
        beta = psu.default_float(beta)
        beta_layer = psu.default_float(1.0)

        current_immunities = self.current_immunity
        sabin_betas = self.get_sabin_betas()

        # Dummy sources
        sources = np.arange(len(self), dtype=psu.default_int)

        # Remapped targets indexes into candidate_target_inds. We are doing this
        # to get all the inputs to compute_infections() lined up correctly.
        remapped_targets = np.arange(len(self), dtype=psu.default_int)
        source_rel_trans = np.full(len(self), dose, dtype=np.float32)
        layer_betas = np.full(len(self), 1.0, dtype=np.float32)
        is_shed = self.is_shed

        _, infected_mapped_inds = compute_infections(beta, beta_layer, sources, remapped_targets, layer_betas, source_rel_trans, current_immunities, sabin_betas, is_shed)

        infected = self.filter(inds=infected_mapped_inds)
        if len(infected):
            people = self.unfilter()
            people.polio_infect(inds=infected.inds)
            infected.date_is_shed = attempt_date

        return infected.inds

    def reset_polio(self):
        ''' This isn't used internally, but is used by user scripts '''
        self.naive = True
        self.susceptible_to_paralysis = True # Titers <8 are considered to be susceptible to paralysis
        self.IPV_naive = False # Received an IPV dose, but does not shed
        self.is_shed = False
        self.strain_type = -1 # Placeholder for an invalid integer value
        self.t_infection = 0 # Days, time since last infection
        self.current_immunity = 1 # Naive
        self.prechallenge_immunity = 1
        self.postchallenge_peak_immunity = np.nan
        self.shed_duration = 0
        self.rel_trans = 0.0
        return

    def update_polio(self, t_days=None, inds=None):
        ''' Update each polio agent '''
        if t_days is None:
            t_days = self.t

        # Update immunity and shedding
        naive, not_naive = self.filter_tf(self.naive)
        naive.t_infection = 0
        # TODO: We should be able to use 0 in the maximum() call below, and
        # then check the assertion below. Need to look into this.
        not_naive.t_infection = np.maximum(1, t_days - not_naive.date_is_shed)
        # assert not np.any(self.t_infection[inds[not_naive_inds]] == 0), \
        #     "t_infection needs to be at least 1; we've screwed up a calculation somewhere"

        # TODO: is this calculate_current_immunity() redundant with doing it elsewhere in the state processing?
        people = self.unfilter()
        people.calculate_current_immunity(not_naive.inds)
        self.calculate_viral_shed()

        expired_infection = not_naive.filter(not_naive.t_infection >= not_naive.shed_duration)
        if len(expired_infection):
            new_recovered = expired_infection.filter(~expired_infection.recovered * expired_infection.is_shed)
            expired_infection.check_recovered()
            self.flows['new_recoveries'] += len(new_recovered) # We are actually finding recoveries here

        if self.pars['verbose'] > 1:
            print(f'  POLIO DEBUG: # infected = {sum(self.exposed)}; viral_shed = {np.mean(self.viral_shed):0.2f} +/- {np.std(self.viral_shed):0.2f}')

        return

#%% Methods for updating state
    def check_symptomatic(self):
        ''' Check for new progressions to symptomatic '''
        symp = self.filter(~self.symptomatic * (self.t >= self.date_symptomatic) * self.exposed)
        # symp = self.filter_date(self.date_symptomatic)
        symp.symptomatic = True
        return len(symp)

    def check_recovered(self):
        ''' Check for and/or do recovery = not infectious any more '''
        self.exposed = False
        self.is_shed = False
        self.recovered = True
        return len(self)

    def check_diagnosed(self):
        '''
        Check for new diagnoses. Since most data are reported with diagnoses on
        the date of the test, this function reports counts not for the number of
        people who received a positive test result on a day, but rather, the
        number of people who were tested on that day who are scheduled to be
        diagnosed in the future.
        '''
        # Handle people who tested today who will be diagnosed in future
        not_diag = ~self.diagnosed
        test_pos = self.filter((self.t >= self.date_pos_test) * not_diag) # Find people who will be diagnosed in future
        test_pos.date_pos_test = np.nan # Clear date of having will-be-positive test

        # Handle people who were actually diagnosed today
        diag = self.filter((self.t >= self.date_diagnosed) * not_diag) # Find who was actually diagnosed on this timestep
        diag.diagnosed = True # Set these people to be diagnosed
        diag.quarantined = False # If you are diagnosed, you are isolated, not in quarantine
        diag.date_end_isolation = self.t + self.pars['quar_period'] # We are re-using quar_period here for isolation period.
        diag.date_end_quarantine = np.nan # Clear end quarantine time

        # Check for the end of isolation - on all who are diagnosed.
        # Diagnosed = isolated (not the same as quarantined).
        end_iso = self.filter((self.t >= self.date_end_isolation) * self.diagnosed, reset=True) # NB: maybe can refactor
        end_iso.diagnosed = False # Release from isolation
        end_iso.date_diagnosed = np.nan # Clear date diagnosed
        end_iso.date_end_isolation = np.nan # Clear end isolation time

        return len(test_pos)

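    # Illustrative timeline (hypothetical numbers, not part of the model): how the
    # two dates relate under this reporting convention.
    #
    #   test_day, test_delay = 5, 3
    #   date_pos_test  = test_day              # Counted in new_diagnoses on day 5 (the test date)
    #   date_diagnosed = test_day + test_delay # The diagnosed flag itself flips on day 8
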
    def get_quarantine_subtargets(self, quarantine_subtarget, quarantine_candidate_inds):
        '''
        A small helper function.

        Args:
            quarantine_subtarget: either a dict with keys 'inds' and 'vals', or a
                function taking (people, quarantine_candidate_inds) and returning
                such a dict. The key 'inds' should be a copy of
                'quarantine_candidate_inds', and 'vals' should be the associated
                probability of quarantining each of the quarantine candidates.
            quarantine_candidate_inds: array of the indices of the candidates to
                potentially be quarantined.
        '''
        # Validation
        if callable(quarantine_subtarget):
            quarantine_subtarget = quarantine_subtarget(self, quarantine_candidate_inds)
        if 'inds' not in quarantine_subtarget:
            errormsg = f'The quarantine_subtarget dict must have keys "inds" and "vals", but you supplied {quarantine_subtarget}'
            raise ValueError(errormsg)

        # Handle the two options of type
        if callable(quarantine_subtarget['inds']): # A function has been provided
            subtarget_inds = quarantine_subtarget['inds'](self, quarantine_candidate_inds) # Call the function to get the indices
        else:
            subtarget_inds = quarantine_subtarget['inds'] # The indices are supplied directly

        # Validate the values
        if callable(quarantine_subtarget['vals']): # A function has been provided
            subtarget_vals = quarantine_subtarget['vals'](self, quarantine_candidate_inds) # Call the function to get the values
        else:
            subtarget_vals = quarantine_subtarget['vals'] # The values are supplied directly

        if sc.isiterable(subtarget_inds):
            # Basic sanity check on the returned indices. What we really should do is check
            # that the arrays are element-wise identical, but don't want to take the performance hit.
            if len(subtarget_inds) != len(quarantine_candidate_inds):
                errormsg = f'Length of subtargeting indices ({len(subtarget_inds)}) does not match length of quarantine candidate indices ({len(quarantine_candidate_inds)})'
                raise ValueError(errormsg)
        if sc.isiterable(subtarget_vals):
            if len(subtarget_vals) != len(subtarget_inds):
                errormsg = f'Length of subtargeting indices ({len(subtarget_inds)}) does not match length of values ({len(subtarget_vals)})'
                raise ValueError(errormsg)

        return subtarget_inds, subtarget_vals

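    # Illustrative sketch (hypothetical function and values, not part of the
    # model): a quarantine_subtarget that halves the quarantine probability for
    # over-50s; it would be supplied via pars['quarantine_subtarget'], which is
    # what check_quar() reads below.
    #
    #   import numpy as np
    #   def age_subtarget(people, candidate_inds):
    #       vals = np.where(people.age[candidate_inds] > 50, 0.5, 1.0)
    #       return {'inds': candidate_inds, 'vals': vals}
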
    def check_quar(self):
        ''' Check for who gets put into quarantine '''

        # Perform quarantine - on all who have a date_known_contact (Filter to those not already diagnosed?)
        quar = self.filter((self.t >= self.date_known_contact) * ~self.diagnosed * ~self.quarantined * ~self.recovered) # Not diagnosed, quarantined, or recovered

        # If the simulation is configured to do quarantine sub-targeting, we will
        # filter down quar_inds accordingly.
        if self.pars.get("quarantine_subtarget") is not None:
            quarantine_subtarget = self.pars["quarantine_subtarget"]
            _, quarantine_probs = self.get_quarantine_subtargets(quarantine_subtarget, quar.inds)
            quar = quar.binomial(quarantine_probs)

        if len(quar) > 0:
            quar.quarantine() # Put people in quarantine

        # Check for the end of quarantine - on all who are quarantined
        end_quar = self.filter((self.t >= self.date_end_quarantine) * self.quarantined, reset=True)
        end_quar.quarantined = False # Release from quarantine
        end_quar.date_end_quarantine = np.nan # Clear end quarantine time

        n_quarantined = len(quar)

        return n_quarantined

#%% Methods to make events occur (infection and diagnosis)
    def make_naive(self):
        ''' Make person naive. This is used during dynamic resampling '''
        for key in self.meta.states:
            if key in ['naive', 'susceptible_to_paralysis']:
                self[key] = True
            else:
                self[key] = False

        for key in self.meta.dates + self.meta.durs:
            self[key] = np.nan

        return

    def polio_infect(self, inds):
        '''
        State changes associated with polio infection. Indices rather than
        filtering are used here for speed.
        '''
        self.is_shed[inds] = True
        self.naive[inds] = False
        self.IPV_naive[inds] = False
        self.prechallenge_immunity[inds] = self.current_immunity[inds]
        no_paralysis_inds = inds[self.prechallenge_immunity[inds] > self.pars['max_paralysis_immunity']]
        self.susceptible_to_paralysis[no_paralysis_inds] = False
        self.calculate_peak_immunity(inds)
        self.calculate_current_immunity(inds)
        self.calculate_shed_duration(inds)
        return

    def infect(self, inds, source=None, layer=None):
        '''
        Infect people and determine their eventual outcomes. Polio-specific state
        changes are in polio_infect(). Indices rather than filtering are used
        here for speed.

        Args:
            inds   (array): array of people to infect
            source (array): source indices of the people who transmitted this infection (None if an importation or seed infection)
            layer  (str):   contact layer this infection was transmitted on

        Returns:
            count (int): number of people infected
        '''
        # Remove duplicates
        unique = np.unique(inds, return_index=True)[1]
        inds = inds[unique]
        if source is not None:
            source = source[unique]

        n_infections = len(inds)
        durpars = self.pars['dur']

        # Set states
        self.naive[inds] = False
        self.recovered[inds] = False
        self.exposed[inds] = True
        self.date_exposed[inds] = self.t
        self.date_first_exposed[inds] = np.fmin(self.date_first_exposed[inds], self.t) # Replace nans with self.t, otherwise leave alone
        self.exposure_count[inds] += 1 # Increment the infection count
        sus_para_infs = 0 # NB: refactor
        for i in inds:
            self.exposure_log[i].append(self.t) # And record all events
            if self.susceptible_to_paralysis[i]:
                self.sus_para_exp_log[i].append(self.t) # Record here too
                sus_para_infs += 1

        # Actually infect
        self.polio_infect(inds=inds)

        # Record transmissions
        target_current_immunities = self.current_immunity
        self.flows['new_infections'] += n_infections
        self.flows['new_sus_para_infs'] += sus_para_infs
        for i, target in enumerate(inds):
            self.infection_log.append(dict(source=source[i] if source is not None else None,
                                           target=target,
                                           date=self.t,
                                           layer=layer,
                                           source_immunity=self.current_immunity[source[i]] if source is not None else None,
                                           target_immunity=target_current_immunities[target],
                                           source_viral_shed=self.viral_shed[source[i]] if source is not None else None,
                                           target_exposure_count=self.exposure_count[target]))

        # Calculate how long before this person can infect other people
        self.dur_exp2inf[inds] = psu.sample(**durpars['exp2inf'], size=n_infections)
        self.date_is_shed[inds] = self.dur_exp2inf[inds] + self.t

        # Use prognosis probabilities to determine what happens to them.
        # All the infections will stop after the shed duration.
        # 'recovered' means not shedding, but symptoms may still persist.
        self.date_recovered[inds] = self.date_is_shed[inds] + self.shed_duration[inds]

        # Store how long this person had infection.
        # 'dur_disease' means duration infected.
        self.dur_disease[inds] = self.dur_exp2inf[inds] + self.shed_duration[inds]

        # Calculate their probability of being symptomatic
        symp_probs = self.pars['rel_symp_prob']*self.symp_prob[inds]*self.susceptible_to_paralysis[inds]
        is_symp = psu.binomial_arr(symp_probs)
        symp_inds = inds[is_symp] # Determine if they develop symptoms

        # Set symptomatic date for symptomatic cases.
        # Symptomatic means permanently paralyzed; we don't ever turn off symptoms.
        # Symptomatic date triggers downstream TTQ from the symptomatic individual(s), if TTQ enabled.
        self.dur_inf2sym[symp_inds] = psu.sample(**durpars['inf2sym'], size=len(symp_inds)) # Store how long this person took to develop symptoms
        self.date_symptomatic[symp_inds] = self.date_is_shed[symp_inds] + self.dur_inf2sym[symp_inds] # Date they become symptomatic

        return n_infections # For incrementing counters

    def calculate_peak_immunity(self, inds, a=4.82, b=-0.30, c=3.31, d=-0.32):
        ''' Immunity immediately post-infection '''
        Nabs = self.prechallenge_immunity[inds]
        means = a + b * np.log2(Nabs)
        stdevs = np.sqrt(np.maximum(c + d * np.log2(Nabs), 0))
        theta_Nabs = np.exp(np.random.normal(loc=means, scale=stdevs))
        # Prevent immunity from decreasing due to challenge
        self.postchallenge_peak_immunity[inds] = self.prechallenge_immunity[inds] * np.maximum(1, theta_Nabs)
        return

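    # Worked example (deterministic sketch at the median, not part of the model):
    # for a naive person, Nabs = 1, so log2(Nabs) = 0, giving mean = a = 4.82 and
    # stdev = sqrt(c) ~= 1.82. The median multiplicative boost is exp(4.82) ~= 124,
    # so the post-challenge peak immunity is ~124x the prechallenge value of 1.
    #
    #   import numpy as np
    #   a, c = 4.82, 3.31
    #   print(np.exp(a), np.sqrt(c))  # -> ~123.97, ~1.82
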
    def calculate_current_immunity(self, inds, rate=0.87):
        ''' Immunity after t months have passed since exposure '''

        # Immunity mode 0: suppress immunity model.
        if self.pars['immunity_mode'] == 0:
            self.current_immunity[inds] = 1.0
            return

        months_since_infection = self.t_infection[inds] / 30
        floor_months_since_infection = np.floor(months_since_infection)

        # Case: month >= 1 -- later_inds selects the subset of inds
        later_inds = inds[floor_months_since_infection != 0]
        self.current_immunity[later_inds] = np.maximum(1, self.postchallenge_peak_immunity[later_inds] * ((self.t_infection[later_inds]/30) ** -rate))

        # Case: month == 0 -- zero_inds selects the subset of inds
        zero_inds = inds[floor_months_since_infection == 0]
        self.current_immunity[zero_inds] = np.maximum(1, self.postchallenge_peak_immunity[zero_inds])

        return

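    # Worked example (hypothetical numbers, not part of the model): with a
    # post-challenge peak of 124, the power-law waning gives
    # 124 * 6**-0.87 ~= 26 at 6 months and 124 * 12**-0.87 ~= 14 at 12 months,
    # never dropping below the floor of 1 (naive).
    #
    #   import numpy as np
    #   months = np.array([1, 6, 12, 60])
    #   print(np.maximum(1, 124 * months**-0.87))  # -> ~[124., 26.1, 14.3, 3.5]
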
    def calculate_shed_duration(self, inds, u=30.3, delta=1.16, sigma=1.86):
        '''
        Probability of shedding given Nab at time t (days post infection);
        assumes that individual was infected at t = 0; time is measured in days.
        Equation S1 in Famulare 2018 PLOS Bio paper.

        delta_t = time (days) since last infection -- survival curve follows
        lognormal distribution.
        '''
        # Default u and sigma
        U = np.full(len(inds), u)
        SIGMA = np.full(len(inds), sigma)

        WPV_inds = np.nonzero(self.strain_type[inds] == pspar.strain_map('WPV'))[0]

        # Override u and sigma for WPV
        U[WPV_inds] = 43.0
        SIGMA[WPV_inds] = 1.69

        mu = np.log(U) - np.log(delta) * np.log2(self.prechallenge_immunity[inds])
        std = np.log(SIGMA)

        # scipy.stats has a weird implementation of parameters:
        # the shape parameter (s) is the same as the stdev,
        # and the scale parameter is the same as e^(mu)
        q = np.random.uniform(0, 1, len(inds))
        # Inverse lognormal survival curve sampling
        self.shed_duration[inds] = stats.lognorm.isf(q, s=std, scale=np.exp(mu))
        return

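    # Worked example (median of the survival curve, not part of the model): since
    # the median is exp(mu) = u / delta**log2(Nab), a naive person (Nab = 1) sheds
    # for a median of u = 30.3 days, while Nab = 256 gives 30.3 / 1.16**8 ~= 9.2 days.
    #
    #   import numpy as np
    #   for Nab in [1, 256]:
    #       print(30.3 / 1.16**np.log2(Nab))  # -> 30.3, then ~9.24
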
    def calculate_viral_shed(self, eta=1.65, v=0.17, epsilon=0.32):
        ''' Virus shed per gram; time is in months! '''
        # TODO: is t_infection below being used properly or does
        # it need to be converted from days to months?
        is_shed, not_shed = self.filter_tf(self.is_shed)
        self.calculate_log10_peak_cid50()
        t_inf = is_shed.t_infection
        log_t_inf = np.log(t_inf)
        exponent = eta - (0.5*v**2) - ((log_t_inf - eta)**2) / (2 * (v + epsilon*log_t_inf)**2)
        predicted_concentration = 10**is_shed.log10_peak_cid50 * np.exp(exponent) / t_inf
        # TODO: correct to mix exponents?
        # predicted_concentration = 10 ** is_shed.log10_peak_cid50 \
        #     * np.exp(
        #         eta - (0.5 * v ** 2) -
        #         ((np.log(t_inf) - eta) ** 2)
        #         /
        #         (2 * (v + epsilon * np.log(t_inf)) ** 2)
        #     ) / t_inf
        is_shed.viral_shed = predicted_concentration
        not_shed.viral_shed = 0
        return

    def calculate_log10_peak_cid50(self, k=0.056, Smax=6.7, Smin=4.3, tau=12):
        ''' Returns the peak log10(cid50/g) given prior immunity; age is in months! '''
        over5mo, under5mo = self.filter_tf(self.age*12 >= 6) # TODO: this should be >5, not >=6 since months is not an integer

        # Override the over 5 months, leaving the rest with the Smax they were initialized with
        over5Smax = (Smax - Smin) * np.exp((7 - over5mo.age*12) / tau) + Smin
        over5mo.log10_peak_cid50 = (1 - k * np.log2(over5mo.prechallenge_immunity)) * over5Smax # TODO: remove duplication
        under5mo.log10_peak_cid50 = (1 - k * np.log2(under5mo.prechallenge_immunity)) * Smax
        return

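    # Worked example (hypothetical ages, not part of the model): for a naive
    # person (prechallenge_immunity = 1, so the log2 term vanishes), a 2-month-old
    # peaks at Smax = 6.7 log10(cid50/g), while a 24-month-old peaks at
    # (6.7 - 4.3) * exp((7 - 24)/12) + 4.3 ~= 4.9.
    #
    #   import numpy as np
    #   k, Smax, Smin, tau = 0.056, 6.7, 4.3, 12
    #   age_months = 24
    #   print((Smax - Smin) * np.exp((7 - age_months) / tau) + Smin)  # -> ~4.88
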
    def test(self, inds, test_sensitivity=1.0, loss_prob=0.0, test_delay=0):
        '''
        Method to test people

        Args:
            inds: indices of who to test
            test_sensitivity (float): probability of a true positive
            loss_prob (float): probability of loss to follow-up
            test_delay (int): number of days before test results are ready

        Returns:
            Whether or not this person tested positive
        '''
        inds = np.unique(inds)
        tested = self.filter(inds=inds, reset=True)
        tested.tested = True
        tested.date_tested = self.t # Only keep the last time they tested

        is_test_shed = tested.filter(self.is_shed)
        is_shed_pos = is_test_shed.binomial(test_sensitivity)
        not_diagnosed = is_shed_pos.filter(np.isnan(is_shed_pos.date_diagnosed))
        not_lost = not_diagnosed.binomial(1.0-loss_prob)

        # Store the date the person will be diagnosed, as well as the date they took the test which will come back positive
        not_lost.date_diagnosed = self.t + test_delay
        not_lost.date_pos_test = self.t

        return

    def quarantine(self):
        '''
        Quarantine the people in the current filter (as selected by check_quar())
        starting on the current day. If a person is already quarantined, this
        will extend their quarantine.
        '''
        self.quarantined = True
        self.date_quarantined = self.t
        self.date_end_quarantine = self.t + self.pars['quar_period']
        self.date_known_contact = np.nan # Clear date
        return

    def trace(self, inds, trace_probs, trace_time):
        '''
        Trace the contacts of the people provided

        Args:
            inds (array): indices of whose contacts to trace
            trace_probs (dict): probability of being able to trace people at each contact layer - should have the same keys as contacts
            trace_time (dict): days it'll take to trace people at each contact layer - should have the same keys as contacts
        '''
        # Extract the indices of the people who'll be contacted
        traceable_layers = {k:v for k,v in trace_probs.items() if v != 0.} # Only trace if there's a non-zero tracing probability
        for lkey, this_trace_prob in traceable_layers.items():
            if self.pars['beta_layer'][lkey]: # Skip if beta is 0 for this layer
                this_trace_time = trace_time[lkey]
                contact_sources = dict()

                # Find all the contacts of these people
                inds_list = []
                for k1,k2 in [['p1','p2'],['p2','p1']]: # Loop over the contact network in both directions -- k1,k2 are the keys
                    in_k1 = np.isin(self.contacts[lkey][k1], inds).nonzero()[0] # Get all the indices of the pairs that each person is in
                    contacted = self.contacts[lkey][k2][in_k1]
                    inds_list.append(contacted) # Find their pairing partner
                    if len(contacted) > 0: # TODO: this could maybe be made to run faster
                        for contact_target in contacted:
                            # Store the first contact source... there could be multiple on this cycle.
                            contact_sources[contact_target] = self.contacts[lkey][k1][in_k1][0]
                edge_inds = np.unique(np.concatenate(inds_list)) # Find all edges

                # Check contacts
                contact_inds = psu.binomial_filter(this_trace_prob, edge_inds) # Filter the indices according to the probability of being able to trace this layer
                if len(contact_inds):
                    self.known_contact[contact_inds] = True
                    self.date_known_contact[contact_inds] = np.fmin(self.date_known_contact[contact_inds], self.t+this_trace_time)
                    for ind in contact_inds:
                        self.trace_log.append(dict(source=contact_sources[ind], target=ind, date=self.t, layer=lkey))
        return

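    # Illustrative sketch (hypothetical layer keys and values, not part of the
    # model): trace_probs and trace_time are per-layer dicts with the same keys
    # as the contact layers, e.g.
    #
    #   trace_probs = {'h': 1.0, 'c': 0.2}  # Trace all household, 20% of community contacts
    #   trace_time  = {'h': 0,   'c': 2}    # Household contacts known same day; community after 2 days
    #   people.trace(inds, trace_probs, trace_time)
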
#%% Analysis methods
    def plot(self, *args, **kwargs):
        '''
        Plot statistics of the population -- age distribution, numbers of contacts,
        and overall weight of contacts (number of contacts multiplied by beta per layer).
        '''
        fig = psplt.plot_people(people=self, *args, **kwargs)
        return fig

#%% Everything else in this file is contained in the Sim class
class Sim(psb.BaseSim):
    '''
    The Sim class handles the running of the simulation: the number of children,
    number of time points, and the parameters of the simulation.

    Args:
        pars     (dict):   parameters to modify from their default values
        datafile (str/df): filename of (Excel, CSV) data file to load, or a pandas dataframe of the data
        datacols (list):   list of column names of the data to load
        label    (str):    the name of the simulation (useful to distinguish in batch runs)
        simfile  (str):    the filename for this simulation, if it's saved (default: creation date)
        popfile  (str):    the filename to load/save the population for this simulation
        load_pop (bool):   whether to load the population from the named file
        save_pop (bool):   whether to save the population to the named file
        kwargs   (dict):   passed to make_pars()

    **Examples**::

        sim = ps.Sim()
        sim = ps.Sim(pop_size=10e3, datafile='my_data.xlsx')
    '''

    def __init__(self, pars=None, datafile=None, datacols=None, label=None,
                 simfile=None, popfile=None, load_pop=False, save_pop=False, **kwargs):

        # Create the object
        default_pars = pspar.make_pars() # Start with default pars
        super().__init__(default_pars) # Initialize and set the parameters as attributes

        # Set attributes
        self.label         = label    # The label/name of the simulation
        self.created       = None     # The datetime the sim was created
        self.simfile       = simfile  # The filename of the sim
        self.datafile      = datafile # The name of the data file
        self.popfile       = popfile  # The population file
        self.load_pop      = load_pop # Whether to load the population
        self.save_pop      = save_pop # Whether to save the population
        self.data          = None     # The actual data
        self.popdict       = None     # The population dictionary
        self.t             = None     # The current time in the simulation
        self.people        = None     # Initialize these here so methods that check their length can see they're empty
        self.results       = {}       # For storing results
        self.initialized   = False    # Whether or not initialization is complete
        self.complete      = False    # Whether a simulation has completed running
        self.results_ready = False    # Whether or not results are ready
        self._orig_pars    = None     # Store original parameters to optionally restore at the end of the simulation

        # Now update everything
        self.set_metadata(simfile)         # Set the simulation date and filename
        self.update_pars(pars, **kwargs)   # Update the parameters, if provided
        self.load_data(datafile, datacols) # Load the data, if provided
        if self.load_pop:
            self.load_population(popfile)  # Load the population, if provided

        return

    def initialize(self, **kwargs):
        '''
        Perform all initializations, including validating the parameters, setting
        the random number seed, creating the results structure, initializing the
        people, validating the layer parameters (which requires the people), and
        initializing the interventions.

        Args:
            kwargs (dict): passed to init_people
        '''
        self.t = 0 # The current time index
        self.validate_pars() # Ensure parameters have valid values
        self.set_seed() # Reset the random seed before the population is created
        self.init_results() # Create the results structure
        self.init_people(save_pop=self.save_pop, load_pop=self.load_pop, popfile=self.popfile, **kwargs) # Create all the people (slow)
        self.validate_layer_pars() # Once the population is initialized, validate the layer parameters again
        self.init_interventions() # Initialize the interventions
        self.init_analyzers() # ...and the analyzers
        self.set_seed() # Reset the random seed again so the random number stream is consistent
        self.initialized = True
        self.complete = False
        self.results_ready = False
        return

    def init_results(self):
        '''
        Create the main results structure. We differentiate between flows, stocks,
        and cumulative results.

        The prefix "new" is used for flow variables, i.e. counting new events
        (infections/recoveries) on each timestep. The prefix "n" is used for stock
        variables, i.e. counting the total number in any given state
        (sus/inf/rec/etc) on any particular timestep. The prefix "cum" is used for
        cumulative variables, i.e. counting the total number that have ever been
        in a given state at some point in the sim.
        '''

        def init_res(*args, **kwargs):
            ''' Initialize a single result object '''
            output = psb.Result(*args, **kwargs, npts=self.npts)
            return output

        dcols = sc.objdict(
            naive                     = '#5e7544',
            infections                = '#c75649',
            is_shed                   = '#aabbcc',
            exposed                   = '#c75649', # Duplicate
            sus_para_infs             = '#f75649', # Brighter
            tests                     = '#aaa8ff',
            diagnoses                 = '#8886cc',
            diagnosed                 = '#8886cc', # Duplicate
            recoveries                = '#799956',
            recovered                 = '#799956', # Duplicate
            symptomatic               = '#c1ad71',
            quarantined               = '#5f1914',
            symp_triggered_surveilled = '#b0bb0b',
        )

        # Flows and cumulative flows
        for key, label in result_flows.items():
            self.results[f'cum_{key}'] = init_res(f'Cumulative {label}', color=dcols[key]) # Cumulative variables -- e.g. "Cumulative infections"
        for key, label in result_flows.items(): # Repeat to keep all the cumulative keys together
            self.results[f'new_{key}'] = init_res(f'Number of new {label}', color=dcols[key]) # Flow variables -- e.g. "Number of new infections"

        # Stock variables
        for key, label in result_stocks.items():
            self.results[f'n_{key}'] = init_res(label, color=dcols[key])
        self.results['n_naive'].scale = 'static'

        # Other variables
        self.results['prevalence'] = init_res('Prevalence', scale=False)
        self.results['incidence'] = init_res('Incidence', scale=False)
        self.results['r_eff'] = init_res('Effective reproductive number', scale=False)
        self.results['doubling_time'] = init_res('Doubling time', scale=False)
        self.results['test_yield'] = init_res('Testing yield', scale=False)

        # Populate the rest of the results
        if self['rescale']:
            scale = 1
        else:
            scale = self['pop_scale']
        self.rescale_vec = scale * np.ones(self.npts) # Not included in the results, but used to scale them
        self.results['t'] = self.tvec
        self.results['date'] = self.datevec
        self.results_ready = False

        return

    def rescale(self):
        ''' Dynamically rescale the population -- used during step() '''
        if self['rescale']:
            raise NotImplementedError('This has not been updated for polio')
            pop_scale = self['pop_scale']
            current_scale = self.rescale_vec[self.t]
            if current_scale < pop_scale: # We have room to rescale
                not_sus_inds = self.people.false('naive') # Find everyone not susceptible
                n_not_sus = len(not_sus_inds) # Number of people who are not susceptible
                n_people = len(self.people) # Number of people overall
                current_ratio = n_not_sus / n_people # Current proportion not susceptible
                threshold = self['rescale_threshold'] # Threshold to trigger rescaling
                if current_ratio > threshold: # Check if we've reached point when we want to rescale
                    max_ratio = pop_scale / current_scale # We don't want to exceed the total population size
                    proposed_ratio = max(current_ratio / threshold, self['rescale_factor']) # The proposed ratio to rescale: the rescale factor, unless we've exceeded it
                    scaling_ratio = min(proposed_ratio, max_ratio) # We don't want to scale by more than the maximum ratio
                    self.rescale_vec[self.t:] *= scaling_ratio # Update the rescaling factor from here on
                    n = int(round(n_not_sus * (1.0 - 1.0 / scaling_ratio))) # For example, rescaling by 2 gives n = 0.5*not_sus_inds
                    choices = psu.choose(max_n=n_not_sus, n=n) # Choose who to make susceptible again
                    new_sus_inds = not_sus_inds[choices] # Convert these back into indices for people
                    self.people.make_naive(new_sus_inds) # Make people susceptible again
        return

    def step(self):
        '''
        Step the simulation forward in time. Usually, the user would use sim.run()
        rather than calling sim.step() directly.
        '''
        # Set the time and if we have reached the end of the simulation, then do nothing
        if self.complete:
            raise psb.AlreadyRunError('Simulation already complete (call sim.initialize() to re-run)')
        t = self.t

        # Perform initial operations
        self.rescale() # Check if we need to rescale
        people = self.people # Shorten this for later use
        people.update_states_pre(t=t) # Update the state of everyone and count the flows
        contacts = people.update_contacts() # Compute new contacts

        # Randomly infect some people (imported infections)
        n_imports = psu.poisson(self['n_imports']) # Imported cases
        if n_imports > 0:
            importation_inds = psu.choose(max_n=len(people), n=n_imports)
            people.infect(inds=importation_inds, layer='importation')

        # Apply interventions
        for intervention in self['interventions']:
            intervention(self) # If it's a function, call it directly

        people.update_states_post() # Check for state changes after interventions

        # Compute the probability of transmission
        beta = psu.default_float(self['beta']*self['scale_beta'])
        asymp_factor = psu.default_float(self['asymp_factor'])
        viral_shed = people.viral_shed
        symp = people.symptomatic
        diag = people.diagnosed
        quar = people.quarantined
        is_shed = people.is_shed
        sabin_betas = self['sabin_scale_parameters'][people.strain_type]
        current_immunity = people.current_immunity

        for lkey, layer in contacts.items():
            p1 = layer['p1']
            p2 = layer['p2']
            betas = layer['beta']

            # Compute relative transmission and susceptibility
            iso_factor = psu.default_float(self['iso_factor'][lkey])
            quar_factor = psu.default_float(self['quar_factor'][lkey])
            beta_layer = psu.default_float(self['beta_layer'][lkey])
            rel_trans = compute_trans(viral_shed, is_shed, symp, diag, quar, asymp_factor, iso_factor, quar_factor)
            people.rel_trans = rel_trans

            # Calculate actual transmission
            all_sources = np.array([], dtype=psu.default_int)
            all_targets = np.array([], dtype=psu.default_int)
            for sources, targets in [[p1, p2], [p2, p1]]: # Loop over the contact network from p1->p2 and p2->p1
                source_inds, target_inds = compute_infections(beta, beta_layer, sources, targets, betas, rel_trans, current_immunity, sabin_betas, is_shed)
                all_sources = np.concatenate([all_sources, source_inds])
                all_targets = np.concatenate([all_targets, target_inds])
            people.infect(inds=all_targets, source=all_sources, layer=lkey) # Actually infect people

        # Update counts for this time step: stocks
        for key in result_stocks.keys():
            self.results[f'n_{key}'][t] = people.count(key)

        # Update counts for this time step: flows
        for key, count in people.flows.items():
            self.results[key][t] += count

        # Apply analyzers -- same syntax as interventions
        for analyzer in self['analyzers']:
            analyzer(self)

        # Tidy up
        self.t += 1
        if self.t == self.npts:
            self.complete = True

        return

    def run(self, do_plot=False, until=None, restore_pars=True, reset_seed=True, verbose=None):
        '''
        Run the simulation.

        Args:
            do_plot      (bool):    whether to plot
            until        (int/str): day or date to run until
            restore_pars (bool):    whether to make a copy of the parameters before the run and restore it after, so runs are repeatable
            reset_seed   (bool):    whether to reset the random number stream immediately before run
            verbose      (float):   level of detail to print, e.g. 0 = no output, 0.2 = print every 5th day, 1 = print every day

        Returns:
            A pointer to the sim object (with results modified in-place)
        '''
        # Initialization steps -- start the timer, initialize the sim and the seed, and check that the sim hasn't been run
        T = sc.tic()

        if not self.initialized:
            self.initialize()
            self._orig_pars = sc.dcp(self.pars) # Create a copy of the parameters, to restore after the run, in case they are dynamically modified

        if verbose is None:
            verbose = self['verbose']

        if reset_seed:
            # Reset the RNG. If the simulation is newly created, then the RNG will be reset by sim.initialize() so the use case
            # for resetting the seed here is if the simulation has been partially run, and changing the seed is required
            self.set_seed()

        until = self.npts if until is None else self.day(until)
        if until > self.npts:
            raise psb.AlreadyRunError(f'Requested to run until t={until} but the simulation end is t={self.npts}')
        if self.complete:
            raise psb.AlreadyRunError('Simulation is already complete (call sim.initialize() to re-run)')
        if self.t >= until: # NB. At the start, self.t is None so this check must occur after initialization
            raise psb.AlreadyRunError(f'Simulation is currently at t={self.t}, requested to run until t={until} which has already been reached')

        # Main simulation loop
        while self.t < until:

            # Check if we were asked to stop
            elapsed = sc.toc(T, output=True)
            if self['timelimit'] and elapsed > self['timelimit']:
                sc.printv(f"Time limit ({self['timelimit']} s) exceeded; call sim.finalize() to compute results if desired", 1, verbose)
                return
            elif self['stopping_func'] and self['stopping_func'](self):
                sc.printv("Stopping function terminated the simulation; call sim.finalize() to compute results if desired", 1, verbose)
                return

            # Print progress
            if verbose:
                simlabel = f'"{self.label}": ' if self.label else ''
                string = f'  Running {simlabel}{self.datevec[self.t]} ({self.t:2.0f}/{self.pars["n_days"]}) ({elapsed:0.2f} s) '
                if verbose >= 2:
                    sc.heading(string)
                elif verbose > 0:
                    if not (self.t % int(1.0/verbose)):
                        sc.progressbar(self.t+1, self.npts, label=string, length=20, newline=True)

            # Do the heavy lifting -- actually run the model!
            self.step()

        # If simulation reached the end, finalize the results
        if self.complete:
            self.finalize(verbose=verbose, restore_pars=restore_pars)
            sc.printv(f'Run finished after {elapsed:0.2f} s.\n', 1, verbose)

        return self

    def finalize(self, verbose=None, restore_pars=True):
        ''' Compute final results '''

        if self.results_ready:
            # Because the results are rescaled in-place, finalizing the sim cannot be run more than once or
            # otherwise the scale factor will be applied multiple times
            raise psb.AlreadyRunError('Simulation has already been finalized')

        # Scale the results
        for reskey in self.result_keys():
            if self.results[reskey].scale == 'dynamic':
                self.results[reskey].values *= self.rescale_vec
            elif self.results[reskey].scale == 'static':
                self.results[reskey].values *= self['pop_scale']

        # Calculate cumulative results
        for key in result_flows.keys():
            self.results[f'cum_{key}'].values[:] = np.cumsum(self.results[f'new_{key}'].values)
        self.results['cum_infections'].values += self['pop_infected'] * self.rescale_vec[0] # Include initially infected people

        # Finalize interventions and analyzers
        self.finalize_interventions()
        self.finalize_analyzers()

        # Final settings
        self.results_ready = True # Set this first so self.summary() knows to print the results
        self.t -= 1 # During the run, this keeps track of the next step; restore this to be the final day of the sim

        # Perform calculations on results
        self.compute_results(verbose=verbose) # Calculate the rest of the results
        self.results = sc.objdict(self.results) # Convert results to an objdict to allow e.g. sim.results.diagnoses

        if restore_pars and self._orig_pars:
            preserved = ['analyzers', 'interventions']
            orig_pars_keys = list(self._orig_pars.keys()) # Get a list of keys so we can iterate over them
            for key in orig_pars_keys:
                if key not in preserved:
                    self.pars[key] = self._orig_pars.pop(key) # Restore everything except for the analyzers and interventions

        # Optionally print summary output
        if verbose: # Verbose is any non-zero value
            if verbose > 0: # Verbose is any positive number
                self.summarize() # Print medium-length summary of the sim
            else:
                self.brief() # Print brief summary of the sim

        return

    def compute_results(self, verbose=None):
        ''' Perform final calculations on the results '''
        self.compute_prev_inci()
        self.compute_summary(verbose=verbose)
        return

    def compute_prev_inci(self):
        '''
        Compute prevalence and incidence. Prevalence is the current number of
        infected people divided by the number of people who are alive. Incidence
        is the number of new infections per day divided by the susceptible
        population.
        '''
        n_exposed = self.results['n_exposed'].values # Number of people currently infected
        n_alive = self.scaled_pop_size # Number of people still alive
        n_susceptible = self.results['n_naive'].values # Number of people still susceptible
        new_infections = self.results['new_infections'].values # Number of new infections
        self.results['prevalence'][:] = n_exposed / n_alive # Calculate the prevalence
        self.results['incidence'][:] = new_infections / n_susceptible # Calculate the incidence
        return

#%% The core Poliosim functions -- compute the infections

# Set dtypes -- note, these cannot be changed after import since Numba functions are precompiled
nbbool = nb.bool_
nbint = nb.int32
nbfloat = nb.float32


@nb.njit((nbfloat[:], nbbool[:], nbbool[:], nbbool[:], nbbool[:], nbfloat, nbfloat, nbfloat), cache=True)
def compute_trans(viral_shed, inf, symp, diag, quar, asymp_factor, iso_factor, quar_factor):
    ''' Calculate relative transmissibility and susceptibility '''
    f_asymp = symp + ~symp * asymp_factor # Asymptomatic factor, changes e.g. [0,1] with a factor of 0.8 to [0.8,1.0]
    f_iso = ~diag + diag * iso_factor # Isolation factor, changes e.g. [0,1] with a factor of 0.2 to [1,0.2]
    f_quar = ~quar + quar * quar_factor # Quarantine, changes e.g. [0,1] with a factor of 0.5 to [1,0.5]
    rel_trans = viral_shed * inf * f_quar * f_asymp * f_iso # Transmissibility
    return rel_trans


def compute_infections(beta, beta_layer, sources, targets, layer_betas, rel_trans, current_immunities, sabin_betas, is_shed):
    ''' Compute the actual infections -- NB, the non-Numba version is almost as fast '''

    # Candidate transmission indices -- anyone shedding; this step takes 75% of the total time of this function!
    candidates = np.nonzero(rel_trans[sources]>0.0)[0]

    # No transmissions
    if len(candidates) == 0:
        source_inds = sources[:0] # Empty array (but of the right type)
        target_inds = targets[:0]

    # There were transmissions, proceed
    else:

        # Pull out candidates
        source_candidates = sources[candidates]
        target_candidates = targets[candidates]

        # Calculate beta
        doses = beta * rel_trans[source_candidates]
        modifier = beta_layer * layer_betas[candidates]
        target_sabin_betas = sabin_betas[target_candidates]

        # Calculate target
        target_current_immunities = current_immunities[target_candidates]
        target_is_not_shedding = np.logical_not(is_shed[target_candidates]) # Zero out probabilities if the target is currently shedding

        # Calculate infections
        p_infections = target_is_not_shedding * prob_infection(beta=target_sabin_betas, current_immunities=target_current_immunities, doses=doses)
        draws = np.random.random(size=len(target_candidates))
        trials = draws < p_infections * modifier
        transmissions = trials.nonzero()[0] # Find actual transmissions

        # Turn into indices
        transmission_inds = candidates[transmissions]
        source_inds = sources[transmission_inds]
        target_inds = targets[transmission_inds]

    return source_inds, target_inds

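# Illustrative sketch (hypothetical factor value, not part of the model): how the
# boolean arrays in compute_trans() become multiplicative factors. With
# asymp_factor=0.8, an asymptomatic person (symp=False) transmits at 0.8x while a
# symptomatic person is unmodified; the iso and quar factors work the same way.
#
#   import numpy as np
#   symp = np.array([False, True])
#   asymp_factor = 0.8
#   f_asymp = symp + ~symp * asymp_factor
#   print(f_asymp)  # -> [0.8 1. ]
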
def prob_infection(beta, current_immunities, doses=10**6, alpha=0.44, gamma=0.46):
    prob = 1 - (1 + doses / beta) ** (-alpha * current_immunities ** -gamma)
    return prob

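# Illustrative sketch (hypothetical beta value, not part of the model): the
# dose-response curve saturates for naive people and is damped by immunity. With
# beta=14 and the default dose of 10**6, a naive person (immunity 1) is infected
# with probability ~0.99, while an immunity of 512 drops this to ~0.24.
#
#   print(prob_infection(beta=14.0, current_immunities=1.0))    # -> ~0.993
#   print(prob_infection(beta=14.0, current_immunities=512.0))  # -> ~0.24
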
def initialize_immunity(people, sia_coverage, ri_coverage, ri_ages_years):

    def vprint(string):
        ''' Only print if verbose >= 1 '''
        if people.pars['verbose'] >= 1:
            print(string)

    # Initialize immunity for <5's
    vprint('initializing immunities < 5')
    vprint(f"sia_coverage: {sia_coverage} ri_coverage: {ri_coverage}")

    u5, o5 = people.filter_tf(people.age < 5.0) # Split people into under and over 5
    assert len(u5) > 0, "There are no people < 5"
    vprint(f"Number people < 5: {len(u5)}")

    # Set number of annual campaigns
    camp_per_yr = 10
    # Determine campaign interval (in days)
    camp_interval_days = 365.0 / camp_per_yr
    # We're checking for immunizations each day, so make the campaign days
    # matchable ints. We can't use dtype=int on np.arange because it rounds the
    # interval instead of rounding the result (e.g. 36.5 days becomes 37 days for all steps)
    campaign_days = np.arange(-5.0 * 365.0, 0, camp_interval_days)

    u5.reset_polio()
    u5.strain_type = pspar.strain_map('S1')

    # Create a copy of the age data that we can manipulate throughout
    u5_age_days = u5.age * 365

    # Initialize immunizations via immunization campaigns
    if sia_coverage > 0:
        for day in campaign_days:
            vprint(f"\tcampaign day = {day}")
            people_alive = u5.filter(u5_age_days > abs(day))
            if not len(people_alive):
                continue
            people_alive.update_polio(day)
            # Figure out which inds get inoculated during this campaign.
            inoculated = people_alive.binomial(sia_coverage, as_filter=True)
            vprint(f"\t\tvax campaign inoculated {len(inoculated)}")
            if len(inoculated) > 0:
                inoculated.sia_dose_count += 1
                sia_inoculation_take_inds = inoculated.attempt_canonical_infection(day)
                vprint(f"\t\t# sia take: {len(sia_inoculation_take_inds)}")
                people.filter(inds=sia_inoculation_take_inds).sia_take_count += 1

    if ri_coverage > 0:
        # Routine immunization ages in years and days
        ri_age_days = [round(x * 365) for x in ri_ages_years]
        # Routine immunizations are based on age, not calendar date, so immunizations
        # are possible on any day. The age data provided to the model are age at t=0.
        # Because immunization procedures are different for age <5 yrs, roll back the time
        # to t-5 years.
        for day in range(-5 * 365, 0):
            # Five years before t=0 not all individuals are born.
            people_alive = u5.filter(u5_age_days > abs(day))
            current_age_days = np.floor(u5_age_days[people_alive.subinds]).astype(int) - abs(day)
            vprint(f"\tday: {day}\tnum people < 5 alive: {len(people_alive)}")
            # If nobody is born, there's no one to immunize
            if not len(people_alive):
                continue
            people_alive.update_polio(day)
            # Get a list of all individuals whose age matches the RI target ages
            for ri_age_day in ri_age_days:
                ri_eligible = people_alive.filter(current_age_days == ri_age_day)
                # Not all eligible individuals will receive immunizations
                inoculated = ri_eligible.binomial(ri_coverage, as_filter=True)
                vprint(f"\tri inoculated (age: {ri_age_day}) {len(inoculated)}")
                if len(inoculated) > 0:
                    inoculated.ri_dose_count += 1
                    ri_inoculation_take_inds = inoculated.attempt_canonical_infection(day)
                    people.filter(inds=ri_inoculation_take_inds).ri_take_count += 1
                    vprint(f"\t# ri take: {len(ri_inoculation_take_inds)}")

    u5.update_polio(0.0) # Restore people < 5 to the state we want them in
    u5.is_shed = False # Make sure to turn off shedding
    u5.strain_type = pspar.strain_map('WPV') # Change strain type to WPV

    # Initialize immunity for >5's
    def taniuchi_decay_immunity(age, initial_Nab=580, rate=0.24):
        '''
        Average empirical decay in immunity observed in household contacts
        > 5 years from Taniuchi et al (2018); age in years
        '''
        return initial_Nab * (1 + (12. * age - 30)) ** -rate

    vprint('initializing immunities >= 5')
    vprint(f"Number people < 5: {len(u5)}")
    vprint(f"Number people >= 5: {len(o5)}")
    o5.current_immunity = taniuchi_decay_immunity(o5.age) # Estimate immunity
    o5.postchallenge_peak_immunity = 580 # Fix immunity settings
    o5.prechallenge_immunity = 1 # Fix immunity settings
    o5.is_shed = False # Make sure to turn off shedding
    o5.date_is_shed = -1 * (o5.age - 5.0) * 365 # Estimate time since last infection assuming it ended at 5 yo # TODO refactor

    # Set exposure status to False since we've messed with shedding status. We'll re-infect on the next step.
    people.exposed[:] = False # TODO why is this here

    mean_u5_immunity = np.mean(u5.current_immunity)
    mean_o5_immunity = np.mean(o5.current_immunity)
    mean_all_immunity = np.mean(people.current_immunity)
    count_u5_pcp_immunity_nan = len(sc.findinds(np.isnan(u5.postchallenge_peak_immunity)))
    count_all_pcp_immunity_nan = len(sc.findinds(np.isnan(people.postchallenge_peak_immunity)))
    mean_u5_pcp_immunity = np.nanmean(u5.postchallenge_peak_immunity)
    mean_all_pcp_immunity = np.nanmean(people.postchallenge_peak_immunity)
    vprint(f"mean_u5_immunity: [{mean_u5_immunity}]")
    vprint(f"mean_o5_immunity: [{mean_o5_immunity}]")
    vprint(f"mean_all_immunity: [{mean_all_immunity}]")
    vprint(f"count_u5_pcp_immunity_nan: [{count_u5_pcp_immunity_nan}]")
    vprint(f"count_all_pcp_immunity_nan: [{count_all_pcp_immunity_nan}]")
    vprint(f"mean_pcp_u5_immunity: [{mean_u5_pcp_immunity}]")
    vprint(f"mean_pcp_all_immunity: [{mean_all_pcp_immunity}]")

    return