Source code for rsvsim.people

Defines the People class and functions associated with making people and handling
the transitions between states (e.g., from susceptible to infected).

# %% Imports
import numpy as np
import sciris as sc
from collections import defaultdict
from . import version as cvv
from . import utils as cvu
from . import defaults as cvd
from . import base as cvb
from . import plotting as cvplt
from . import immunity as cvi

__all__ = ['People']

[docs]class People(cvb.BasePeople): ''' A class to perform all the operations on the people. This class is usually not invoked directly, but instead is created automatically by the sim. The only required input argument is the population size, but typically the full parameters dictionary will get passed instead since it will be needed before the People object is initialized. Note that this class handles the mechanics of updating the actual people, while BasePeople takes care of housekeeping (saving, loading, exporting, etc.). Please see the BasePeople class for additional methods. Args: pars (dict): the sim parameters, e.g. -- alternatively, if a number, interpreted as pop_size strict (bool): whether or not to only create keys that are already in self.meta.person; otherwise, let any key be set kwargs (dict): the actual data, e.g. from a popdict, being specified ::Examples:: ppl1 = cv.People(2000) sim = cv.Sim() ppl2 = cv.People( ''' def __init__(self, pars, strict=True, **kwargs): # Handle pars and population size self.set_pars(pars) self.version = cvv.__version__ # Store version info # Other initialization self.t = 0 # Keep current simulation time self._lock = False # Prevent further modification of keys self.meta = cvd.PeopleMeta() # Store list of keys and dtypes self.contacts = None self.init_contacts() # Initialize the contacts self.infection_log = [] # Record of infections - keys for ['source','target','date','layer'] # Set person properties -- all floats except for UID for key in self.meta.person: if key == 'uid': self[key] = np.arange(['pop_size'], dtype=cvd.default_int) elif key in ['n_infections']: self[key] = np.zeros(['pop_size'], dtype=cvd.default_int) elif key in ['rel_dur']: self[key] = np.ones(['pop_size'], dtype=cvd.default_float) elif key in ['age', 'weekage']: self[key] = np.full(['pop_size'], np.nan, dtype=cvd.default_int) else: self[key] = np.full(['pop_size'], np.nan, dtype=cvd.default_float) # Set health states -- only susceptible is true by default -- booleans except exposed by genotype which should return the genotype that ind is exposed to for key in self.meta.states: val = (key in ['susceptible', 'naive']) # Default value is True for susceptible and naive, false otherwise self[key] = np.full(['pop_size'], val, dtype=bool) # Set genotype states, which store info about which genotype a person is exposed to for key in self.meta.genotype_states: self[key] = np.full(['pop_size'], np.nan, dtype=cvd.default_float) for key in self.meta.by_genotype_states: self[key] = np.full((['n_genotypes'],['pop_size']), False, dtype=bool) # Set immunity and antibody states for key in self.meta.imm_states: # Everyone starts out with no immunity self[key] = np.full((['n_genotypes'],['pop_size']), 0, dtype=cvd.default_float) for key in self.meta.vacc_states: self[key] = np.zeros(['pop_size'], dtype=cvd.default_int) # Set dates and durations -- both floats for key in self.meta.dates + self.meta.durs: self[key] = np.full(['pop_size'], np.nan, dtype=cvd.default_float) # Store the dtypes used in a flat dict self._dtypes = {key: self[key].dtype for key in self.keys()} # Assign all to float by default if strict: self.lock() # If strict is true, stop further keys from being set (does not affect attributes) # Store to be computed during simulation self.init_flows() # Although we have called init(), we still need to call initialize() self.initialized = False # Handle contacts, if supplied (note: they usually are) if 'contacts' in kwargs: self.add_contacts(kwargs.pop('contacts')) # Handle all other values, e.g. age for key, value in kwargs.items(): if strict: self.set(key, value) else: self[key] = value return
[docs] def init_flows(self): ''' Initialize flows to be zero ''' self.flows = {key: 0 for key in cvd.new_result_flows} self.flows_genotype = {} for key in cvd.new_result_flows_by_genotype: self.flows_genotype[key] = np.zeros(['n_genotypes'], dtype=cvd.default_float) return
[docs] def initialize(self): ''' Perform initializations ''' self.set_prognoses() self.validate() self.check_conception() self.initialized = True return
[docs] def set_prognoses(self, new_year_inds=None): ''' Set the prognoses for each person based on age during initialization. Gets called throughout sim for people on the day they age a year. Need to reset the seed because viral loads are drawn stochastically. ''' pars = # Shorten if 'prognoses' not in pars or 'rand_seed' not in pars: errormsg = 'This people object does not have the required parameters ("prognoses" and "rand_seed"). Create a sim (or parameters), then do e.g. people.set_pars(' raise sc.KeyNotFoundError(errormsg) cvu.set_seed(pars['rand_seed']) progs = pars['prognoses'] # Shorten the name if new_year_inds is not None: inds = np.fromiter((cvu.find_cutoff(progs['age_cutoffs'], this_age) for this_age in self.age[new_year_inds]), dtype=cvd.default_int, count=len(new_year_inds)) # Convert ages to indices self.symp_prob[new_year_inds] = progs['symp_probs'][inds] # Probability of developing symptoms self.severe_prob[new_year_inds] = progs['severe_probs'][inds] * progs['comorbidities'][ inds] # Severe disease probability is modified by comorbidities self.crit_prob[new_year_inds] = progs['crit_probs'][inds] # Probability of developing critical disease self.death_prob[new_year_inds] = progs['death_probs'][inds] # Probability of death self.rel_sus[new_year_inds] = progs['sus_ORs'][inds] # Default susceptibilities self.rel_trans[new_year_inds] = progs['trans_ORs'][inds] * cvu.sample(**['beta_dist'], size=len( inds)) # Default transmissibilities, with viral load drawn from a distribution else: inds = np.fromiter((cvu.find_cutoff(progs['age_cutoffs'], this_age) for this_age in self.age), dtype=cvd.default_int, count=len(self)) # Convert ages to indices self.symp_prob[:] = progs['symp_probs'][inds] # Probability of developing symptoms self.severe_prob[:] = progs['severe_probs'][inds] * progs['comorbidities'][inds] # Severe disease probability is modified by comorbidities self.crit_prob[:] = progs['crit_probs'][inds] # Probability of developing critical disease self.death_prob[:] = progs['death_probs'][inds] # Probability of death self.rel_sus[:] = progs['sus_ORs'][inds] # Default susceptibilities self.rel_trans[:] = progs['trans_ORs'][inds] * cvu.sample(**['beta_dist'], size=len(inds)) # Default transmissibilities, with viral load drawn from a distribution return
[docs] def update_states_pre(self, t): ''' Perform all state updates at the current timestep ''' # Initialize self.t = t self.is_exp = self.true('exposed') # For storing the interim values since used in every subsequent calculation # Perform updates self.init_flows() self.flows['new_infectious'] += self.check_infectious() self.flows['new_symptomatic'] += self.check_symptomatic() self.flows['new_severe'] += self.check_severe() self.flows['new_critical'] += self.check_critical() self.flows['new_recoveries'] += self.check_recovery() new_deaths, new_known_deaths = self.check_death() self.flows['new_non_RSV_deaths'] += self.check_background_mortality() self.flows['new_deaths'] += new_deaths self.flows['new_known_deaths'] += new_known_deaths return
[docs] def update_states_post(self): ''' Perform post-timestep updates ''' self.flows['new_diagnoses'] += self.check_diagnosed() del self.is_exp # Tidy up return
[docs] def update_contacts(self): ''' Refresh dynamic contacts, e.g. community ''' # Figure out if anything needs to be done -- e.g. {'h':False, 'c':True} for lkey, is_dynam in['dynam_layer'].items(): if is_dynam: self.contacts[lkey].update(self) return self.contacts
# %% Methods for updating network attributes
[docs] def age_people(self): # TODO figure out timestep! self.weekage += 1 newyear_inds = cvu.true(self.weekage %['tspy'] == 0) self.age[newyear_inds] += 1 too_old = sc.findinds(self.age >=['max_age']) if len(too_old): self.make_die(too_old) newyear_inds = np.setdiff1d(newyear_inds, too_old) if len(newyear_inds): self.set_prognoses(new_year_inds=newyear_inds) preg_inds = cvu.true(self.pregnant) self.gestation[preg_inds] += 1 return
[docs] def check_conception(self): '''Method to determine who becomes pregnant on this time step based on age-specific fertility rate''' female_inds = sc.findinds((self.age >=['age_pregnancy'][0]) * (self.age <['age_pregnancy'][1]) * ( == 1) * (~self.pregnant) * (~self.post_partum)) fertility_data = cvd.default_fertility_data age_inds = np.fromiter((cvu.find_cutoff(fertility_data['age_cutoffs'], this_age) for this_age in self.age[female_inds]), dtype=cvd.default_int, count=len(female_inds)) # Convert ages to indices preg_probs = cvu.tsprob2ts(fertility_data['fert_rate'][age_inds], prob_ts=cvd.dpy,['timestep']) preg_inds = (np.random.random(len(preg_probs)) < preg_probs).nonzero()[0] self.pregnant[preg_inds] = True self.dur_preg[preg_inds] = np.random.uniform(['gestation'][0] * cvd.dpw,['gestation'][1] * cvd.dpw, len(preg_inds)) self.date_delivery[preg_inds] = self.t + self.dur_preg[preg_inds] return
[docs] def check_delivery(self): ''' Method to check if it's the day a woman has given birth and return inds of women who give birth today, otherwise step pregnancy forward in time''' female_inds = cvu.true(self.pregnant) has_deliver_date = cvu.idefinedi(self.date_delivery, female_inds) inds = cvu.itrue(self.t >= self.date_delivery[has_deliver_date], has_deliver_date) self.pregnant[inds] = False self.date_delivery[inds] = np.nan self.gestation[inds] = np.nan self.post_partum[inds] = True self.date_post_partum[inds] = self.t + (['dur_post_partum']*cvd.dpw) return inds
[docs] def check_post_partum(self): '''Method to check if it's the day a woman becomes eligible to get pregnant again''' post_partum_inds = cvu.true(self.post_partum) has_post_partum_date = cvu.idefinedi(self.date_post_partum, post_partum_inds) inds = cvu.itrue(self.t >= self.date_post_partum[has_post_partum_date], has_post_partum_date) self.post_partum[inds] = False self.date_post_partum[inds] = np.nan return
[docs] def check_household_shuffle(self): ''' Method to check if it's the day ''' return
[docs] def check_school_enrollment(self): '''Method to check if anyone enters or leaves school on this day''' school_pars =['school_pars'] new_contacts = cvb.Contacts(layer_keys='s') new_contacts['s']['p1'] = [] new_contacts['s']['p2'] = [] # Find everyone who is transitioning out of their school, remove them from school dict, remove their school contacts move_to_ps_inds = sc.findinds((self.age == school_pars['school_types_by_age']['ps'][0]) * ( self.sc_type ==['school_mapping']['pk'])) move_to_ss_inds = sc.findinds((self.age == school_pars['school_types_by_age']['ss'][0]) * ( self.sc_type ==['school_mapping']['ps'])) move_to_ts_inds = sc.findinds((self.age == school_pars['school_types_by_age']['ts'][0]) * ( self.sc_type ==['school_mapping']['ss'])) inds_to_move = np.concatenate((move_to_ps_inds, move_to_ss_inds, move_to_ts_inds)) if not isinstance(inds_to_move, np.ndarray): inds_to_move = sc.promotetoarray(inds_to_move) if inds_to_move.dtype != np.int64: # pragma: no cover # This is int64 since indices often come from cv.true(), which returns int64 inds_to_move = np.array(inds_to_move, dtype=np.int64) to_remove = cvu.find_contact_inds(self.contacts['s']['p1'], self.contacts['s']['p2'], inds_to_move) self.contacts['s'].pop_inds(list(to_remove)) for _, schools in self.schools_dict.items(): for id, school in schools.items(): schools[id] = [i for i in school if i not in inds_to_move] self.scid[inds_to_move] = np.nan # Now find those that are newly enrolling in prek and distribute amongst preks prek_age_inds = sc.findinds((self.age == school_pars['school_types_by_age']['pk'][0])) not_in_prek_inds = prek_age_inds[cvu.undefined(self.scid[prek_age_inds])] prek_probs = np.full(len(not_in_prek_inds), school_pars['enrollment_by_age'][school_pars['school_types_by_age']['pk'][0]][1]) add_to_prek_inds = not_in_prek_inds[(np.random.random(len(not_in_prek_inds)) < prek_probs).nonzero()[0]] inds_to_move_by_type = dict( pk=add_to_prek_inds, ps=move_to_ps_inds, ss=move_to_ss_inds, ts=move_to_ts_inds ) for sc_type, inds in inds_to_move_by_type.items(): schools = self.schools_dict[sc_type] school_ids = list(schools.keys()) school_id_probs = [1/len(school_ids)]*len(school_ids) school_inds_pp = cvu.choose_w(school_id_probs, len(inds), unique=False) school_ids_pp = [school_ids[i] for i in school_inds_pp] self.scid[inds] = school_ids_pp self.sc_student[inds] = 1 self.sc_type[inds] =['school_mapping'][sc_type] for id in school_ids: inds_to_add = inds[sc.findinds(school_ids_pp, id)] self.schools_dict[sc_type][id].append(inds_to_add) n_remaining = len(inds_to_add) while n_remaining > 0: p_count = int(cvu.n_poisson(['contacts']['s'], 1)) if p_count > n_remaining: p_count = n_remaining contacts_list = [[] for _ in range(p_count)] inds_in_class = np.random.choice(inds_to_add, p_count, replace=False) for i, ind_1 in enumerate(inds_in_class): # Add symmetric pairwise contacts in each cluster for j, ind_2 in enumerate(inds_in_class): if j > i: contacts_list[i].append(ind_2) for ind, contacts in enumerate(contacts_list): n = len(contacts) new_contacts['s']['p1'].extend([inds_in_class[ind]] * n) new_contacts['s']['p2'].extend(contacts) inds_to_add = np.setdiff1d(inds_to_add, inds_in_class) n_remaining = len(inds_to_add) new_layer = cvb.Layer(label='s') for ckey, value in new_contacts['s'].items(): new_layer[ckey] = np.array(value, dtype=new_layer.meta[ckey]) new_contacts['s'] = new_layer np.append(self.contacts['s']['p1'], new_contacts['s']['p1']) np.append(self.contacts['s']['p2'], new_contacts['s']['p2']) np.append(self.contacts['s']['beta'], new_contacts['s']['beta']) return
# %% Methods for updating state
[docs] def check_inds(self, current, date, filter_inds=None): ''' Return indices for which the current state is false and which meet the date criterion ''' if filter_inds is None: not_current = cvu.false(current) else: not_current = cvu.ifalsei(current, filter_inds) has_date = cvu.idefinedi(date, not_current) inds = cvu.itrue(self.t >= date[has_date], has_date) return inds
[docs] def check_infectious(self): ''' Check if they become infectious ''' inds = self.check_inds(self.infectious, self.date_infectious, filter_inds=self.is_exp) self.infectious[inds] = True self.infectious_genotype[inds] = self.exposed_genotype[inds] for genotype in range(['n_genotypes']): this_genotype_inds = cvu.itrue(self.infectious_genotype[inds] == genotype, inds) n_this_genotype_inds = len(this_genotype_inds) self.flows_genotype['new_infectious_by_genotype'][genotype] += n_this_genotype_inds self.infectious_by_genotype[genotype, this_genotype_inds] = True return len(inds)
[docs] def check_symptomatic(self): ''' Check for new progressions to symptomatic ''' inds = self.check_inds(self.symptomatic, self.date_symptomatic, filter_inds=self.is_exp) self.symptomatic[inds] = True return len(inds)
[docs] def check_severe(self): ''' Check for new progressions to severe ''' inds = self.check_inds(self.severe, self.date_severe, filter_inds=self.is_exp) self.severe[inds] = True return len(inds)
[docs] def check_critical(self): ''' Check for new progressions to critical ''' inds = self.check_inds(self.critical, self.date_critical, filter_inds=self.is_exp) self.critical[inds] = True return len(inds)
[docs] def check_recovery(self, inds=None, filter_inds='is_exp'): ''' Check for recovery. More complex than other functions to allow for recovery to be manually imposed for a specified set of indices. ''' # Handle more flexible options for setting indices if filter_inds == 'is_exp': filter_inds = self.is_exp if inds is None: inds = self.check_inds(self.recovered, self.date_recovered, filter_inds=filter_inds) # Now reset all disease states[inds] = False self.infectious[inds] = False self.symptomatic[inds] = False self.severe[inds] = False self.critical[inds] = False self.recovered[inds] = True self.recovered_genotype[inds] = self.exposed_genotype[inds] self.infectious_genotype[inds] = np.nan self.exposed_genotype[inds] = np.nan self.exposed_by_genotype[:, inds] = False self.infectious_by_genotype[:, inds] = False # Handle immunity aspects # Reset additional states self.susceptible[inds] = True self.diagnosed[inds] = False # Reset their diagnosis state because they might be reinfected return len(inds)
[docs] def check_death(self): ''' Check whether or not this person died on this timestep ''' inds = self.check_inds(self.dead, self.date_dead, filter_inds=self.is_exp) self.dead[inds] = True diag_inds = inds[self.diagnosed[inds]] # Check whether the person was diagnosed before dying self.known_dead[diag_inds] = True if len(inds): self.make_die(inds) return len(inds), len(diag_inds)
[docs] def check_background_mortality(self): ''' Check whether or not people died from background mortality on this timestep''' mortality_data = cvd.default_mortality_data age_inds = np.fromiter((cvu.find_cutoff(mortality_data['age_cutoffs'], this_age) for this_age in self.age), dtype=cvd.default_int, count=len(self)) # Convert ages to indices sexes = np.array(, dtype=cvd.default_int) life_expectancy = mortality_data['life_exp'][age_inds, sexes]/['tspy'] deaths = (np.random.random(len(life_expectancy)) < life_expectancy).nonzero()[0] already_dead = self.dead[deaths] deaths = deaths[~already_dead] # Unique indices in deaths that are not already dead if len(deaths): self.make_die(deaths) return len(deaths)
[docs] def make_die(self, inds): self.dead[inds] = True self.susceptible[inds] = False[inds] = False self.infectious[inds] = False self.symptomatic[inds] = False self.severe[inds] = False self.critical[inds] = False self.recovered[inds] = False self.infectious_genotype[inds] = np.nan self.exposed_genotype[inds] = np.nan self.recovered_genotype[inds] = np.nan if not isinstance(inds, np.ndarray): inds = sc.promotetoarray(inds) if inds.dtype != np.int64: # pragma: no cover # This is int64 since indices often come from cv.true(), which returns int64 inds = np.array(inds, dtype=np.int64) # remove dead people from contact network for lkey, contacts in self.contacts.items(): to_remove = cvu.find_contact_inds(contacts['p1'], contacts['p2'], inds) contacts.pop_inds(list(to_remove)) return
[docs] def check_diagnosed(self): ''' Check for new diagnoses. Since most data are reported with diagnoses on the date of the test, this function reports counts not for the number of people who received a positive test result on a day, but rather, the number of people who were tested on that day who are schedule to be diagnosed in the future. ''' # Handle people who tested today who will be diagnosed in future test_pos_inds = self.check_inds(self.diagnosed, self.date_pos_test, filter_inds=None) # Find people who will be diagnosed in future self.date_pos_test[test_pos_inds] = np.nan # Clear date of having will-be-positive test # Handle people who were actually diagnosed today diag_inds = self.check_inds(self.diagnosed, self.date_diagnosed, filter_inds=None) # Find who was actually diagnosed on this timestep self.diagnosed[diag_inds] = True # Set these people to be diagnosed return len(test_pos_inds)
# %% Methods to make events occur (infection and diagnosis)
[docs] def make_naive(self, inds, reset_vx=False): ''' Make a set of people naive. This is used during dynamic resampling. Args: inds (array): list of people to make naive reset_vx (bool): whether to reset vaccine-derived immunity ''' for key in self.meta.states: if key in ['susceptible', 'naive']: self[key][inds] = True else: if (key != 'vaccinated') or reset_vx: # Don't necessarily reset vaccination self[key][inds] = False # Reset genotype states for key in self.meta.genotype_states: self[key][inds] = np.nan for key in self.meta.by_genotype_states: self[key][:, inds] = False # Reset immunity and antibody states non_vx_inds = inds if reset_vx else inds[~self['vaccinated'][inds]] for key in self.meta.imm_states: self[key][:, non_vx_inds] = 0 for key in self.meta.vacc_states: self[key][non_vx_inds] = 0 # Reset dates for key in self.meta.dates + self.meta.durs: if (key != 'date_vaccinated') or reset_vx: # Don't necessarily reset vaccination self[key][inds] = np.nan return
[docs] def make_nonnaive(self, inds, date_recovered=-365, genotype='groupA'): ''' Make a set of people non-naive. This can be done either by setting only susceptible and naive states, or else by setting them as if they have been infected and recovered. ''' # First filter out children whose are younger than date recovered inds = inds[cvu.true(self.weekage[inds]*['timestep'] > -date_recovered)] self.make_naive(inds) # First make them naive and reset all other states dist = {'dist': 'normal', 'par1': 0, 'par2': 5 * 7 / 2.355} if['prior_wave_dist'] is None else sc.dcp(['prior_wave_dist']) # default is FWHM 5 weeks this_inf_offset_days = np.floor(cvu.sample(**dist, size=len(inds)) - date_recovered).astype(cvd.default_int) new_imm_length =['n_days'] + np.max(this_inf_offset_days) if new_imm_length > len(['imm_kin']['ab']):['imm_kin']['ab'] = cvi.precompute_waning(length=new_imm_length,['imm_decay']['ab'])['imm_kin']['mab'] = cvi.precompute_waning(length=new_imm_length,['imm_decay']['mab']) self.peak_imm[genotype, inds] =['imm_init'] self.t_imm_event[genotype,inds] = -this_inf_offset_days # Make them non-naive for key in ['susceptible', 'naive']: self[key][inds] = False self.date_recovered[inds] = date_recovered # Reset date recovered self.check_recovery(inds=inds, filter_inds=None) # Set recovered return
[docs] def infect(self, inds, hosp_max=None, icu_max=None, source=None, layer=None, genotype=0): ''' Infect people and determine their eventual outcomes. * Every infected person can infect other people, regardless of whether they develop symptoms * Infected people that develop symptoms are disaggregated into mild vs. severe (=requires hospitalization) vs. critical (=requires ICU) * Every asymptomatic, mildly symptomatic, and severely symptomatic person recovers * Critical cases either recover or die Method also deduplicates input arrays in case one agent is infected many times and stores who infected whom in infection_log list. Args: inds (array): array of people to infect hosp_max (bool): whether or not there is an acute bed available for this person icu_max (bool): whether or not there is an ICU bed available for this person source (array): source indices of the people who transmitted this infection (None if an importation or seed infection) layer (str): contact layer this infection was transmitted on genotype (int): the genotype people are being infected by Returns: count (int): number of people infected ''' if len(inds) == 0: return 0 # Remove duplicates inds, unique = np.unique(inds, return_index=True) if source is not None: source = source[unique] # Keep only susceptibles keep = self.susceptible[inds] # Unique indices in inds and source that are also susceptible inds = inds[keep] if source is not None: source = source[keep] # Deal with genotype parameters genotype_keys = ['rel_symp_prob', 'rel_severe_prob', 'rel_crit_prob', 'rel_death_prob'] infect_pars = {k:[k] for k in genotype_keys} genotype_label =['genotype_map'][genotype] if genotype: for k in genotype_keys: infect_pars[k] *=['genotype_pars'][genotype_label][k] n_infections = len(inds) durpars =['dur'] # Retrieve those with a secondary infection secondary_inds = cvu.true(self.peak_imm[:, inds].sum(axis=0)) self.rel_trans[secondary_inds] *=['dur_redux'] # Update states, genotype info, and flows self.susceptible[inds] = False self.naive[inds] = False self.recovered[inds] = False self.diagnosed[inds] = False[inds] = True self.n_infections[inds] += 1 self.exposed_genotype[inds] = genotype self.exposed_by_genotype[genotype, inds] = True self.flows['new_infections'] += len(inds) self.flows['new_reinfections'] += len(cvu.defined(self.date_recovered[inds])) # Record reinfections self.flows_genotype['new_infections_by_genotype'][genotype] += len(inds) # Record transmissions for i, target in enumerate(inds): entry = dict(source=source[i] if source is not None else None, target=target, date=self.t, layer=layer, genotype=genotype_label) self.infection_log.append(entry) # Calculate how long before this person can infect other people self.dur_exp2inf[inds] = cvu.sample(**durpars['exp2inf'], size=n_infections)*self.rel_trans[inds] self.date_exposed[inds] = self.t self.date_infectious[inds] = self.dur_exp2inf[inds] + self.t # Reset all other dates for key in ['date_symptomatic', 'date_severe', 'date_critical', 'date_diagnosed', 'date_recovered']: self[key][inds] = np.nan # Use prognosis probabilities to determine what happens to them symp_probs = infect_pars['rel_symp_prob'] * self.symp_prob[inds] # Calculate their actual probability of being symptomatic is_symp = cvu.binomial_arr(symp_probs) # Determine if they develop symptoms symp_inds = inds[is_symp] asymp_inds = inds[~is_symp] # Asymptomatic self.flows_genotype['new_symptomatic_by_genotype'][genotype] += len(symp_inds) # CASE 1: Asymptomatic: may infect others, but have no symptoms and do not die dur_asym2rec = cvu.sample(**durpars['asym2rec'], size=len(asymp_inds))*self.rel_trans[asymp_inds] self.date_recovered[asymp_inds] = self.date_infectious[asymp_inds] + dur_asym2rec # Date they recover self.dur_disease[asymp_inds] = self.dur_exp2inf[asymp_inds] + dur_asym2rec # Store how long this person had RSV # CASE 2: Symptomatic: can either be mild, severe, or critical n_symp_inds = len(symp_inds) self.dur_inf2sym[symp_inds] = cvu.sample(**durpars['inf2sym'], size=n_symp_inds)*self.rel_trans[symp_inds] # Store how long this person took to develop symptoms self.date_symptomatic[symp_inds] = self.date_infectious[symp_inds] + self.dur_inf2sym[ symp_inds] # Date they become symptomatic sev_probs = infect_pars['rel_severe_prob'] * self.severe_prob[symp_inds] # Probability of these people being severe is_sev = cvu.binomial_arr(sev_probs) # See if they're a severe or mild case sev_inds = symp_inds[is_sev] mild_inds = symp_inds[~is_sev] # Not severe self.flows_genotype['new_severe_by_genotype'][genotype] += len(sev_inds) # CASE 2.1: Mild symptoms, no hospitalization required and no probability of death dur_mild2rec = cvu.sample(**durpars['mild2rec'], size=len(mild_inds))*self.rel_trans[mild_inds] self.date_recovered[mild_inds] = self.date_symptomatic[mild_inds] + dur_mild2rec # Date they recover self.dur_disease[mild_inds] = self.dur_exp2inf[mild_inds] + self.dur_inf2sym[mild_inds] + dur_mild2rec # Store how long this person had RSV # CASE 2.2: Severe cases: hospitalization required, may become critical self.dur_sym2sev[sev_inds] = cvu.sample(**durpars['sym2sev'], size=len(sev_inds))*self.rel_trans[sev_inds] # Store how long this person took to develop severe symptoms self.date_severe[sev_inds] = self.date_symptomatic[sev_inds] + self.dur_sym2sev[ sev_inds] # Date symptoms become severe crit_probs = infect_pars['rel_crit_prob'] * self.crit_prob[sev_inds] * (['no_hosp_factor'] if hosp_max else 1.) # Probability of these people becoming critical - higher if no beds available is_crit = cvu.binomial_arr(crit_probs) # See if they're a critical case crit_inds = sev_inds[is_crit] non_crit_inds = sev_inds[~is_crit] # CASE 2.2.1 Not critical - they will recover dur_sev2rec = cvu.sample(**durpars['sev2rec'], size=len(non_crit_inds))*self.rel_trans[non_crit_inds] self.date_recovered[non_crit_inds] = self.date_severe[non_crit_inds] + dur_sev2rec # Date they recover self.dur_disease[non_crit_inds] = self.dur_exp2inf[non_crit_inds] + self.dur_inf2sym[non_crit_inds] + \ self.dur_sym2sev[ non_crit_inds] + dur_sev2rec # Store how long this person had RSV # CASE 2.2.2: Critical cases: ICU required, may die self.dur_sev2crit[crit_inds] = cvu.sample(**durpars['sev2crit'], size=len(crit_inds))*self.rel_trans[crit_inds] self.date_critical[crit_inds] = self.date_severe[crit_inds] + self.dur_sev2crit[crit_inds] # Date they become critical death_probs = infect_pars['rel_death_prob'] * self.death_prob[crit_inds] * (['no_icu_factor'] if icu_max else 1.) # Probability they'll die is_dead = cvu.binomial_arr(death_probs) # Death outcome dead_inds = crit_inds[is_dead] alive_inds = crit_inds[~is_dead] # CASE Did not die dur_crit2rec = cvu.sample(**durpars['crit2rec'], size=len(alive_inds))*self.rel_trans[alive_inds] self.date_recovered[alive_inds] = self.date_critical[alive_inds] + dur_crit2rec # Date they recover self.dur_disease[alive_inds] = self.dur_exp2inf[alive_inds] + self.dur_inf2sym[alive_inds] + self.dur_sym2sev[ alive_inds] + self.dur_sev2crit[alive_inds] + dur_crit2rec # Store how long this person had RSV # CASE Did die dur_crit2die = cvu.sample(**durpars['crit2die'], size=len(dead_inds))*self.rel_trans[dead_inds] self.date_dead[dead_inds] = self.date_critical[dead_inds] + dur_crit2die # Date of death self.dur_disease[dead_inds] = self.dur_exp2inf[dead_inds] + self.dur_inf2sym[dead_inds] + self.dur_sym2sev[ dead_inds] + self.dur_sev2crit[dead_inds] + dur_crit2die # Store how long this person had RSV self.date_recovered[dead_inds] = np.nan # If they did die, remove them from recovered # Handle immunity aspects cvi.update_peak_imm(self, inds,['imm_init'], imm_source=genotype) return n_infections # For incrementing counters
[docs] def test(self, inds, test_sensitivity=1.0, loss_prob=0.0, test_delay=0): ''' Method to test people. Typically not to be called by the user directly; see the test_num() and test_prob() interventions. Args: inds: indices of who to test test_sensitivity (float): probability of a true positive loss_prob (float): probability of loss to follow-up test_delay (int): number of days before test results are ready ''' inds = np.unique(inds) self.tested[inds] = True self.date_tested[inds] = self.t # Only keep the last time they tested is_infectious = cvu.itruei(self.infectious, inds) pos_test = cvu.n_binomial(test_sensitivity, len(is_infectious)) is_inf_pos = is_infectious[pos_test] not_diagnosed = is_inf_pos[np.isnan(self.date_diagnosed[is_inf_pos])] not_lost = cvu.n_binomial(1.0 - loss_prob, len(not_diagnosed)) final_inds = not_diagnosed[not_lost] # Store the date the person will be diagnosed, as well as the date they took the test which will come back positive self.date_diagnosed[final_inds] = self.t + test_delay self.date_pos_test[final_inds] = self.t return
# %% Analysis methods
[docs] def plot(self, *args, **kwargs): ''' Plot statistics of the population -- age distribution, numbers of contacts, and overall weight of contacts (number of contacts multiplied by beta per layer). Args: bins (arr) : age bins to use (default, 0-100 in one-year bins) width (float) : bar width font_size (float) : size of font alpha (float) : transparency of the plots fig_args (dict) : passed to pl.figure() axis_args (dict) : passed to pl.subplots_adjust() plot_args (dict) : passed to pl.plot() do_show (bool) : whether to show the plot fig (fig) : handle of existing figure to plot into ''' fig = cvplt.plot_people(people=self, *args, **kwargs) return fig
[docs] def story(self, uid, *args): ''' Print out a short history of events in the life of the specified individual. Args: uid (int/list): the person or people whose story is being regaled args (list): these people will tell their stories too **Example**:: sim = cv.Sim(pop_type='hybrid', verbose=0) sim.people.story(12) sim.people.story(795) ''' def label_lkey(lkey): ''' Friendly name for common layer keys ''' if lkey.lower() == 'a': llabel = 'default contact' if lkey.lower() == 'h': llabel = 'household' elif lkey.lower() == 's': llabel = 'school' elif lkey.lower() == 'w': llabel = 'workplace' elif lkey.lower() == 'c': llabel = 'community' else: llabel = f'"{lkey}"' return llabel uids = sc.promotetolist(uid) uids.extend(args) for uid in uids: p = self[uid] sex = 'female' if == 0 else 'male' intro = f'\nThis is the story of {uid}, a {p.age:.0f} year old {sex}' if not p.susceptible: if np.isnan(p.date_symptomatic): print(f'{intro}, who had asymptomatic RSV.') else: print(f'{intro}, who had symptomatic RSV.') else: print(f'{intro}, who did not contract RSV.') total_contacts = 0 no_contacts = [] for lkey in p.contacts.keys(): llabel = label_lkey(lkey) n_contacts = len(p.contacts[lkey]) total_contacts += n_contacts if n_contacts: print(f'{uid} is connected to {n_contacts} people in the {llabel} layer') else: no_contacts.append(llabel) if len(no_contacts): nc_string = ', '.join(no_contacts) print(f'{uid} has no contacts in the {nc_string} layer(s)') print(f'{uid} has {total_contacts} contacts in total') events = [] dates = { 'date_critical': 'became critically ill and needed ICU care', 'date_dead': 'died ☹', 'date_diagnosed': 'was diagnosed with RSV', 'date_infectious': 'became infectious', 'date_known_contact': 'was notified they may have been exposed to RSV', 'date_pos_test': 'recieved their positive test result', 'date_recovered': 'recovered', 'date_severe': 'developed severe symptoms and needed hospitalization', 'date_symptomatic': 'became symptomatic', 'date_tested': 'was tested for RSV', } for attribute, message in dates.items(): date = getattr(p, attribute) if not np.isnan(date): events.append((date, message)) for infection in self.infection_log: lkey = infection['layer'] llabel = label_lkey(lkey) if infection['target'] == uid: if lkey: events.append((infection['date'], f'was infected with RSV by {infection["source"]} via the {llabel} layer')) else: events.append((infection['date'], 'was infected with RSV as a seed infection')) if infection['source'] == uid: x = len([a for a in self.infection_log if a['source'] == infection['target']]) events.append((infection['date'], f'gave RSV to {infection["target"]} via the {llabel} layer ({x} secondary infections)')) if len(events): for day, event in sorted(events, key=lambda x: x[0]): print(f'On day {day:.0f}, {uid} {event}') else: print(f'Nothing happened to {uid} during the simulation.') return