'''
Defines the People class and functions associated with making people and handling
the transitions between states (e.g., from susceptible to infected).
'''
#%% Imports
import numpy as np
import sciris as sc
from collections import defaultdict
from . import version as cvv
from . import utils as cvu
from . import defaults as cvd
from . import base as cvb
from . import plotting as cvplt
from . import immunity as cvi
__all__ = ['People']
[docs]
class People(cvb.BasePeople):
    '''
    A class to perform all the operations on the people -- usually not invoked directly.

    This class is usually created automatically by the sim. The only required input
    argument is the population size, but typically the full parameters dictionary
    will get passed instead since it will be needed before the People object is
    initialized. However, ages, contacts, etc. will need to be created separately --
    see ``cv.make_people()`` instead.

    Note that this class handles the mechanics of updating the actual people, while
    ``cv.BasePeople`` takes care of housekeeping (saving, loading, exporting, etc.).
    Please see the BasePeople class for additional methods.

    Args:
        pars (dict): the sim parameters, e.g. sim.pars -- alternatively, if a number, interpreted as pop_size
        strict (bool): whether or not to only create keys that are already in self.meta.person; otherwise, let any key be set
        kwargs (dict): the actual data, e.g. from a popdict, being specified

    **Examples**::

        ppl1 = cv.People(2000)

        sim = cv.Sim()
        ppl2 = cv.People(sim.pars)
    '''
def __init__(self, pars, strict=True, **kwargs):
    '''
    Allocate every per-person array and set up the bookkeeping attributes.

    Args:
        pars (dict): the sim parameters (or a number, interpreted as pop_size by set_pars)
        strict (bool): if True, lock the set of keys once the arrays have been created
        kwargs (dict): initial data; 'contacts' is handled specially, everything else is stored by key
    '''

    # Resolve the parameters first, since the population size comes from them
    self.set_pars(pars)
    self.version = cvv.__version__ # Version of the package that created this object

    # Basic bookkeeping
    self.t = 0 # Current simulation time
    self._lock = False # Keys may still be added at this point
    self.meta = cvd.PeopleMeta() # Lists of keys, grouped by kind
    self.contacts = None
    self.init_contacts()
    self.infection_log = [] # One entry per infection: source, target, date, layer

    n_people = self.pars['pop_size']

    def per_person(fill, dtype):
        ''' One value per person '''
        return np.full(n_people, fill, dtype=dtype)

    def per_variant(fill, dtype):
        ''' One row per variant, one column per person '''
        return np.full((self.pars['n_variants'], n_people), fill, dtype=dtype)

    # Person properties: UIDs and infection counters are integers, the rest are NaN floats
    integer_counters = ['n_infections', 'n_breakthroughs']
    for key in self.meta.person:
        if key == 'uid':
            values = np.arange(n_people, dtype=cvd.default_int)
        elif key in integer_counters:
            values = per_person(0, cvd.default_int)
        else:
            values = per_person(np.nan, cvd.default_float)
        self[key] = values

    # Health states: boolean, and only 'susceptible' and 'naive' start out True
    for key in self.meta.states:
        self[key] = per_person(key in ['susceptible', 'naive'], bool)

    # Variant states: which variant a person has, plus a boolean matrix by variant
    for key in self.meta.variant_states:
        self[key] = per_person(np.nan, cvd.default_float)
    for key in self.meta.by_variant_states:
        self[key] = per_variant(False, bool)

    # Immunity, antibody and vaccination states all start at zero
    for key in self.meta.imm_states:
        self[key] = per_variant(0, cvd.default_float)
    for key in self.meta.nab_states:
        nab_dtype = cvd.default_float if key != 't_nab_event' else cvd.default_int
        self[key] = per_person(0, nab_dtype)
    for key in self.meta.vacc_states:
        self[key] = per_person(0, cvd.default_int)

    # Dates and durations are NaN floats until an event sets them
    for key in self.meta.dates + self.meta.durs:
        self[key] = per_person(np.nan, cvd.default_float)

    # Remember the dtype of every array created above
    self._dtypes = {key: self[key].dtype for key in self.keys()}

    # In strict mode no further keys may be set (attributes are unaffected)
    if strict:
        self.lock()

    # Flows are computed during the simulation
    self.init_flows()

    # initialize() still has to be called separately
    self.initialized = False

    # Contacts are usually supplied, and are added rather than stored by key
    if 'contacts' in kwargs:
        self.add_contacts(kwargs.pop('contacts'))

    # Everything else (e.g. age) is stored by key
    for key, value in kwargs.items():
        if not strict:
            self[key] = value
        else:
            self.set(key, value)

    # Internal cache of quarantine requests, keyed by the timestep they start on: {t: [(person, quarantine_end_day)]}
    self._pending_quarantine = defaultdict(list)

    return
[docs]
def init_flows(self):
    ''' Reset all flow counters: scalar flows to 0, by-variant flows to zero arrays '''
    self.flows = dict.fromkeys(cvd.new_result_flows, 0)
    self.flows_variant = {
        key: np.zeros(self.pars['n_variants'], dtype=cvd.default_float)
        for key in cvd.new_result_flows_by_variant
    }
    return
[docs]
def initialize(self, sim_pars=None):
    '''
    Finish setting up the people for a particular simulation.

    Args:
        sim_pars (dict): the parameters of the simulation these people belong to
    '''
    # Check that the parameters are compatible before overwriting the stored ones
    self.validate(sim_pars=sim_pars)
    self.set_pars(sim_pars)

    # Prognoses depend on the parameters that have just been stored
    self.set_prognoses()

    self.initialized = True
[docs]
def set_prognoses(self):
    '''
    Assign age-dependent prognoses to every person during initialization.

    The random seed is reset first because the viral load component of
    transmissibility is drawn stochastically.

    Raises:
        sc.KeyNotFoundError: if the parameters lack "prognoses" or "rand_seed"
    '''
    pars = self.pars

    if not ('prognoses' in pars and 'rand_seed' in pars):
        errormsg = 'This people object does not have the required parameters ("prognoses" and "rand_seed"). Create a sim (or parameters), then do e.g. people.set_pars(sim.pars).'
        raise sc.KeyNotFoundError(errormsg)

    cvu.set_seed(pars['rand_seed'])

    progs = pars['prognoses']
    age_bins = np.digitize(self.age, progs['age_cutoffs']) - 1 # Index of each person's age bracket

    # Progression probabilities looked up by age bracket
    self.symp_prob[:] = progs['symp_probs'][age_bins]
    self.severe_prob[:] = progs['severe_probs'][age_bins] * progs['comorbidities'][age_bins] # Modified by comorbidities
    self.crit_prob[:] = progs['crit_probs'][age_bins]
    self.death_prob[:] = progs['death_probs'][age_bins]

    # Default susceptibility, and transmissibility scaled by a sampled viral load
    self.rel_sus[:] = progs['sus_ORs'][age_bins]
    viral_load = cvu.sample(**self.pars['beta_dist'], size=len(age_bins))
    self.rel_trans[:] = progs['trans_ORs'][age_bins] * viral_load

    return
[docs]
def update_states_pre(self, t):
    '''
    Apply all state transitions that are due at timestep ``t``.

    Args:
        t (int): the current timestep
    '''
    # Store the time and cache the exposed people, who are used by most of the checks
    self.t = t
    self.is_exp = self.true('exposed')

    # Start the flows from zero for this timestep
    self.init_flows()

    # Disease progression, in order; each check returns the indices that changed state
    progressions = (
        ('new_infectious', self.check_infectious),
        ('new_symptomatic', self.check_symptomatic),
        ('new_severe', self.check_severe),
        ('new_critical', self.check_critical),
        ('new_recoveries', self.check_recovery),
    )
    for flow_key, check in progressions:
        self.flows[flow_key] += len(check())

    self.check_exit_iso()

    # Deaths are reported as counts rather than indices
    _, n_deaths, n_known_deaths = self.check_death()
    self.flows['new_deaths'] += n_deaths
    self.flows['new_known_deaths'] += n_known_deaths

    if self.pars['use_waning']:
        cvi.check_immunity(self)

    return
[docs]
def update_states_post(self):
    ''' Apply the updates that happen at the end of a timestep: diagnoses, quarantine and isolation '''
    flows = self.flows
    flows['new_diagnoses'] += len(self.check_diagnosed())
    flows['new_quarantined'] += self.check_quar() # check_quar() already returns a count
    flows['new_isolated'] += len(self.check_enter_iso())

    # The cached exposed indices are only valid for this timestep
    delattr(self, 'is_exp')
    return
#%% Methods for updating state
[docs]
def check_inds(self, current, date, filter_inds=None):
    '''
    Find the people who are not yet in a state but whose date for it has arrived.

    Args:
        current (array): boolean array of who is currently in the state
        date (array): the date on which each person enters the state
        filter_inds (array): if supplied, only these indices are considered

    Returns:
        Indices for which ``current`` is false, ``date`` is defined, and ``self.t >= date``
    '''
    if filter_inds is not None:
        candidates = cvu.ifalsei(current, filter_inds)
    else:
        candidates = cvu.false(current)
    dated = cvu.idefinedi(date, candidates)
    is_due = self.t >= date[dated]
    return cvu.itrue(is_due, dated)
[docs]
def check_infectious(self):
    '''
    Move exposed people whose infectious date has arrived into the infectious state.

    Returns:
        Indices of the people who became infectious on this timestep
    '''
    inds = self.check_inds(self.infectious, self.date_infectious, filter_inds=self.is_exp)
    self.infectious[inds] = True
    self.infectious_variant[inds] = self.exposed_variant[inds]

    # Record the newly infectious people against the variant they carry
    new_variants = self.infectious_variant[inds]
    for variant in range(self.pars['n_variants']):
        variant_inds = cvu.itrue(new_variants == variant, inds)
        self.flows_variant['new_infectious_by_variant'][variant] += len(variant_inds)
        self.infectious_by_variant[variant, variant_inds] = True
    return inds
[docs]
def check_symptomatic(self):
    ''' Mark exposed people whose symptomatic date has arrived; returns their indices '''
    newly_symptomatic = self.check_inds(self.symptomatic, self.date_symptomatic, filter_inds=self.is_exp)
    self.symptomatic[newly_symptomatic] = True
    return newly_symptomatic
[docs]
def check_severe(self):
    ''' Mark exposed people whose severe date has arrived; returns their indices '''
    newly_severe = self.check_inds(self.severe, self.date_severe, filter_inds=self.is_exp)
    self.severe[newly_severe] = True
    return newly_severe
[docs]
def check_critical(self):
    ''' Mark exposed people whose critical date has arrived; returns their indices '''
    newly_critical = self.check_inds(self.critical, self.date_critical, filter_inds=self.is_exp)
    self.critical[newly_critical] = True
    return newly_critical
[docs]
def check_recovery(self, inds=None, filter_inds='is_exp'):
    '''
    Move people into the recovered state.

    This is more flexible than the other checks so that recovery can be imposed
    manually on a given set of people.

    Args:
        inds (array): people to recover; if None, those whose recovery date has arrived
        filter_inds (array/str): indices to restrict the search to; 'is_exp' means the cached exposed indices

    Returns:
        Indices of the people who were set to recovered
    '''
    # Resolve the default filter, then find who is due to recover if not told explicitly
    if filter_inds == 'is_exp':
        filter_inds = self.is_exp
    if inds is None:
        inds = self.check_inds(self.recovered, self.date_recovered, filter_inds=filter_inds)

    # Clear every disease state
    for state in ('exposed', 'infectious', 'symptomatic', 'severe', 'critical'):
        getattr(self, state)[inds] = False
    self.recovered[inds] = True

    # Remember which variant they recovered from before clearing the variant info
    self.recovered_variant[inds] = self.exposed_variant[inds]
    self.infectious_variant[inds] = np.nan
    self.exposed_variant[inds] = np.nan
    self.exposed_by_variant[:, inds] = False
    self.infectious_by_variant[:, inds] = False

    # With waning immunity, recovered people can be infected (and diagnosed) again
    if self.pars['use_waning']:
        self.susceptible[inds] = True
        self.diagnosed[inds] = False

    return inds
[docs]
def check_death(self):
    '''
    Apply deaths that are due on this timestep.

    Returns:
        A tuple of (indices of people who died, number who died, number who died and had been diagnosed)
    '''
    inds = self.check_inds(self.dead, self.date_dead, filter_inds=self.is_exp)
    self.dead[inds] = True

    # Deaths are only "known" for people who were diagnosed beforehand
    diag_inds = inds[self.diagnosed[inds]]
    self.known_dead[diag_inds] = True

    # Dead people leave every other state
    cleared_states = (
        'susceptible', 'exposed', 'infectious', 'symptomatic', 'severe',
        'critical', 'known_contact', 'quarantined', 'recovered',
    )
    for state in cleared_states:
        getattr(self, state)[inds] = False
    for variant_state in ('infectious_variant', 'exposed_variant', 'recovered_variant'):
        getattr(self, variant_state)[inds] = np.nan

    return inds, len(inds), len(diag_inds)
[docs]
def check_diagnosed(self):
    '''
    Check for new diagnoses.

    Since most data are reported with diagnoses on the date of the test, the
    indices returned are not the people who received a positive result today,
    but the people who were tested today and are scheduled to be diagnosed in
    the future.

    Returns:
        Indices of undiagnosed people whose positive-test date has arrived
    '''
    # People whose positive test was taken: report them, then clear the date so they are reported once
    pending_positive = self.check_inds(self.diagnosed, self.date_pos_test, filter_inds=None)
    self.date_pos_test[pending_positive] = np.nan

    # People whose diagnosis date has arrived actually become diagnosed
    diagnosed_now = self.check_inds(self.diagnosed, self.date_diagnosed, filter_inds=None)
    self.diagnosed[diagnosed_now] = True

    return pending_positive
[docs]
def check_quar(self):
    '''
    Update the quarantine state: start pending quarantines, then release people who are due.

    Returns:
        The number of people who newly entered quarantine on this timestep
    '''
    n_entering = 0

    # Process the quarantine requests scheduled to start today
    for person, release_day in self._pending_quarantine[self.t]:
        if self.quarantined[person]:
            # Already in quarantine: keep whichever end date is later
            self.date_end_quarantine[person] = max(self.date_end_quarantine[person], release_day)
            continue
        # Unclear whether recovered should be included in the exclusions here
        excluded = self.dead[person] or self.recovered[person] or self.diagnosed[person] or self.isolated[person]
        if excluded:
            continue
        self.quarantined[person] = True
        self.date_quarantined[person] = self.t
        self.date_end_quarantine[person] = release_day
        n_entering += 1

    # Quarantine only applies to people who are not yet diagnosed; once diagnosed
    # they are isolating instead, so their quarantine ends today
    diagnosed_today = cvu.true(self.quarantined & (self.date_diagnosed == self.t))
    self.date_end_quarantine[diagnosed_today] = self.t

    # Release everyone whose quarantine end date has arrived. check_inds() looks for
    # people whose state is false, hence the inverted array (a double negative)
    released = self.check_inds(~self.quarantined, self.date_end_quarantine, filter_inds=None)
    self.quarantined[released] = False

    return n_entering
def check_enter_iso(self):
    '''
    Put everyone diagnosed today into isolation for the duration of their infection.

    Returns:
        Indices of the people who entered isolation
    '''
    diagnosed_today = cvu.true(self.date_diagnosed == self.t)
    self.isolated[diagnosed_today] = True
    self.date_end_isolation[diagnosed_today] = self.date_recovered[diagnosed_today] # Isolation lasts until recovery
    return diagnosed_today
[docs]
def check_exit_iso(self):
    '''
    Release anyone whose isolation end date has arrived.

    Returns:
        Indices of the people who left isolation
    '''
    # check_inds() looks for people whose state is false, hence the inverted array (a double negative)
    released = self.check_inds(~self.isolated, self.date_end_isolation, filter_inds=None)
    self.isolated[released] = False
    return released
#%% Methods to make events occur (infection and diagnosis)
[docs]
def make_naive(self, inds, reset_vx=False):
    '''
    Return a set of people to the naive state. This is used during dynamic resampling.

    Args:
        inds (array): list of people to make naive
        reset_vx (bool): whether to reset vaccine-derived immunity
    '''
    meta = self.meta

    # Health states: susceptible and naive become True, all others False.
    # The vaccinated state is left alone unless reset_vx is set.
    for key in meta.states:
        if key in ['susceptible', 'naive']:
            self[key][inds] = True
        elif reset_vx or key != 'vaccinated':
            self[key][inds] = False

    # Variant states
    for key in meta.variant_states:
        self[key][inds] = np.nan
    for key in meta.by_variant_states:
        self[key][:, inds] = False

    # Immunity and antibody states: people who remain vaccinated keep theirs
    if reset_vx:
        non_vx_inds = inds
    else:
        non_vx_inds = inds[~self['vaccinated'][inds]]
    for key in meta.imm_states:
        self[key][:, non_vx_inds] = 0
    for key in meta.nab_states + meta.vacc_states:
        self[key][non_vx_inds] = 0

    # Dates and durations; the vaccination date is kept unless reset_vx is set
    for key in meta.dates + meta.durs:
        if reset_vx or key != 'date_vaccinated':
            self[key][inds] = np.nan

    return
[docs]
def make_nonnaive(self, inds, set_recovered=False, date_recovered=0):
    '''
    Make a set of people non-naive.

    This either just clears the susceptible and naive states, or additionally
    sets the people as if they had been infected and had recovered.

    Args:
        inds (array): people to make non-naive
        set_recovered (bool): whether to also mark them as recovered
        date_recovered (int): recovery date to assign if set_recovered is True
    '''
    # Start from a clean slate, then clear the two states that make_naive() turned on
    self.make_naive(inds)
    self['susceptible'][inds] = False
    self['naive'][inds] = False

    if not set_recovered:
        return

    self.date_recovered[inds] = date_recovered
    self.check_recovery(inds=inds, filter_inds=None)
    return
[docs]
def infect(self, inds, hosp_max=None, icu_max=None, source=None, layer=None, variant=0):
    '''
    Infect people and determine their eventual outcomes.

        * Every infected person can infect other people, regardless of whether they develop symptoms
        * Infected people that develop symptoms are disaggregated into mild vs. severe (=requires hospitalization) vs. critical (=requires ICU)
        * Every asymptomatic, mildly symptomatic, and severely symptomatic person recovers
        * Critical cases either recover or die
        * If the simulation is being run with waning, this method also sets/updates agents' neutralizing antibody levels

    Method also deduplicates input arrays in case one agent is infected many times
    and stores who infected whom in infection_log list.

    Args:
        inds     (array): array of people to infect
        hosp_max (bool):  if true, the probability of becoming critical is multiplied by ``pars['no_hosp_factor']``
        icu_max  (bool):  if true, the probability of death is multiplied by ``pars['no_icu_factor']``
        source   (array): source indices of the people who transmitted this infection (None if an importation or seed infection)
        layer    (str):   contact layer this infection was transmitted on
        variant  (int):   the variant people are being infected by

    Returns:
        inds (array): the deduplicated indices of the people who were actually infected,
        i.e. those who were susceptible (the input is returned unchanged if it is empty)
    '''

    # If no infections, short-circuit
    if len(inds) == 0:
        return inds

    # Remove duplicates
    inds, unique = np.unique(inds, return_index=True)
    if source is not None:
        source = source[unique]

    # Keep only susceptibles
    keep = self.susceptible[inds] # Unique indices in inds and source that are also susceptible
    inds = inds[keep]
    if source is not None:
        source = source[keep]

    # Deal with variant parameters
    variant_keys = ['rel_symp_prob', 'rel_severe_prob', 'rel_crit_prob', 'rel_death_prob']
    infect_pars = {k:self.pars[k] for k in variant_keys}
    variant_label = self.pars['variant_map'][variant]
    if variant: # Variant 0 uses the base parameters unmodified
        for k in variant_keys:
            infect_pars[k] *= self.pars['variant_pars'][variant_label][k]
    durpars = self.pars['dur']

    # Retrieve those with a breakthrough infection (defined nabs)
    breakthrough_inds = inds[cvu.true(self.peak_nab[inds])]
    if len(breakthrough_inds):
        no_prior_breakthrough = (self.n_breakthroughs[breakthrough_inds] == 0) # We only adjust transmissibility for the first breakthrough
        new_breakthrough_inds = breakthrough_inds[no_prior_breakthrough]
        self.rel_trans[new_breakthrough_inds] *= self.pars['trans_redux']

    # Update states, variant info, and flows
    n_infections = len(inds)
    self.susceptible[inds] = False
    self.naive[inds] = False
    self.recovered[inds] = False
    self.diagnosed[inds] = False
    self.exposed[inds] = True
    self.n_infections[inds] += 1
    self.n_breakthroughs[breakthrough_inds] += 1
    self.exposed_variant[inds] = variant
    self.exposed_by_variant[variant, inds] = True
    self.flows['new_infections'] += n_infections
    self.flows['new_reinfections'] += len(cvu.defined(self.date_recovered[inds])) # Record reinfections; must happen before date_recovered is reset below
    self.flows_variant['new_infections_by_variant'][variant] += n_infections

    # Record transmissions
    for i, target in enumerate(inds):
        entry = dict(source=source[i] if source is not None else None, target=target, date=self.t, layer=layer, variant=variant_label)
        self.infection_log.append(entry)

    # Calculate how long before this person can infect other people
    self.dur_exp2inf[inds] = cvu.sample(**durpars['exp2inf'], size=n_infections)
    self.date_exposed[inds] = self.t
    self.date_infectious[inds] = self.dur_exp2inf[inds] + self.t

    # Reset all other dates
    for key in ['date_symptomatic', 'date_severe', 'date_critical', 'date_diagnosed', 'date_recovered']:
        self[key][inds] = np.nan

    # Use prognosis probabilities to determine what happens to them
    symp_probs = infect_pars['rel_symp_prob']*self.symp_prob[inds]*(1-self.symp_imm[variant, inds]) # Calculate their actual probability of being symptomatic
    is_symp = cvu.binomial_arr(symp_probs) # Determine if they develop symptoms
    symp_inds = inds[is_symp]
    asymp_inds = inds[~is_symp] # Asymptomatic
    self.flows_variant['new_symptomatic_by_variant'][variant] += len(symp_inds)

    # CASE 1: Asymptomatic: may infect others, but have no symptoms and do not die
    dur_asym2rec = cvu.sample(**durpars['asym2rec'], size=len(asymp_inds))
    self.date_recovered[asymp_inds] = self.date_infectious[asymp_inds] + dur_asym2rec # Date they recover
    self.dur_disease[asymp_inds] = self.dur_exp2inf[asymp_inds] + dur_asym2rec # Store how long this person had COVID-19

    # CASE 2: Symptomatic: can either be mild, severe, or critical
    n_symp_inds = len(symp_inds)
    self.dur_inf2sym[symp_inds] = cvu.sample(**durpars['inf2sym'], size=n_symp_inds) # Store how long this person took to develop symptoms
    self.date_symptomatic[symp_inds] = self.date_infectious[symp_inds] + self.dur_inf2sym[symp_inds] # Date they become symptomatic
    sev_probs = infect_pars['rel_severe_prob'] * self.severe_prob[symp_inds]*(1-self.sev_imm[variant, symp_inds]) # Probability of these people being severe
    is_sev = cvu.binomial_arr(sev_probs) # See if they're a severe or mild case
    sev_inds = symp_inds[is_sev]
    mild_inds = symp_inds[~is_sev] # Not severe
    self.flows_variant['new_severe_by_variant'][variant] += len(sev_inds)

    # CASE 2.1: Mild symptoms, no hospitalization required and no probability of death
    dur_mild2rec = cvu.sample(**durpars['mild2rec'], size=len(mild_inds))
    self.date_recovered[mild_inds] = self.date_symptomatic[mild_inds] + dur_mild2rec # Date they recover
    self.dur_disease[mild_inds] = self.dur_exp2inf[mild_inds] + self.dur_inf2sym[mild_inds] + dur_mild2rec # Store how long this person had COVID-19

    # CASE 2.2: Severe cases: hospitalization required, may become critical
    self.dur_sym2sev[sev_inds] = cvu.sample(**durpars['sym2sev'], size=len(sev_inds)) # Store how long this person took to develop severe symptoms
    self.date_severe[sev_inds] = self.date_symptomatic[sev_inds] + self.dur_sym2sev[sev_inds] # Date symptoms become severe
    crit_probs = infect_pars['rel_crit_prob'] * self.crit_prob[sev_inds] * (self.pars['no_hosp_factor'] if hosp_max else 1.) # Probability of these people becoming critical - higher if no beds available
    is_crit = cvu.binomial_arr(crit_probs) # See if they're a critical case
    crit_inds = sev_inds[is_crit]
    non_crit_inds = sev_inds[~is_crit]

    # CASE 2.2.1 Not critical - they will recover
    dur_sev2rec = cvu.sample(**durpars['sev2rec'], size=len(non_crit_inds))
    self.date_recovered[non_crit_inds] = self.date_severe[non_crit_inds] + dur_sev2rec # Date they recover
    self.dur_disease[non_crit_inds] = self.dur_exp2inf[non_crit_inds] + self.dur_inf2sym[non_crit_inds] + self.dur_sym2sev[non_crit_inds] + dur_sev2rec # Store how long this person had COVID-19

    # CASE 2.2.2: Critical cases: ICU required, may die
    self.dur_sev2crit[crit_inds] = cvu.sample(**durpars['sev2crit'], size=len(crit_inds))
    self.date_critical[crit_inds] = self.date_severe[crit_inds] + self.dur_sev2crit[crit_inds] # Date they become critical
    death_probs = infect_pars['rel_death_prob'] * self.death_prob[crit_inds] * (self.pars['no_icu_factor'] if icu_max else 1.)# Probability they'll die
    is_dead = cvu.binomial_arr(death_probs) # Death outcome
    dead_inds = crit_inds[is_dead]
    alive_inds = crit_inds[~is_dead]

    # CASE 2.2.2.1: Did not die
    dur_crit2rec = cvu.sample(**durpars['crit2rec'], size=len(alive_inds))
    self.date_recovered[alive_inds] = self.date_critical[alive_inds] + dur_crit2rec # Date they recover
    self.dur_disease[alive_inds] = self.dur_exp2inf[alive_inds] + self.dur_inf2sym[alive_inds] + self.dur_sym2sev[alive_inds] + self.dur_sev2crit[alive_inds] + dur_crit2rec # Store how long this person had COVID-19

    # CASE 2.2.2.2: Did die
    dur_crit2die = cvu.sample(**durpars['crit2die'], size=len(dead_inds))
    self.date_dead[dead_inds] = self.date_critical[dead_inds] + dur_crit2die # Date of death
    self.dur_disease[dead_inds] = self.dur_exp2inf[dead_inds] + self.dur_inf2sym[dead_inds] + self.dur_sym2sev[dead_inds] + self.dur_sev2crit[dead_inds] + dur_crit2die # Store how long this person had COVID-19
    self.date_recovered[dead_inds] = np.nan # If they did die, remove them from recovered

    # Handle immunity aspects
    if self.pars['use_waning']:
        symp = dict(asymp=asymp_inds, mild=mild_inds, sev=sev_inds)
        cvi.update_peak_nab(self, inds, nab_pars=self.pars, symp=symp)

    return inds # For incrementing counters
[docs]
def test(self, inds, test_sensitivity=1.0, loss_prob=0.0, test_delay=0):
    '''
    Test a set of people. Typically not to be called by the user directly;
    see the test_num() and test_prob() interventions.

    Args:
        inds: indices of who to test
        test_sensitivity (float): probability of a true positive
        loss_prob (float): probability of loss to follow-up
        test_delay (int): number of days before test results are ready

    Returns:
        Indices of the people tested who will be diagnosed as a result of this test
    '''
    inds = np.unique(inds)
    self.tested[inds] = True
    self.date_tested[inds] = self.t # Only the most recent test date is kept

    # Only infectious people can test positive, and only with probability test_sensitivity
    infectious_inds = cvu.itruei(self.infectious, inds)
    tests_positive = cvu.n_binomial(test_sensitivity, len(infectious_inds))
    positive_inds = infectious_inds[tests_positive]

    # Skip anyone who already has a diagnosis date, then drop those lost to follow-up
    no_diagnosis_date = np.isnan(self.date_diagnosed[positive_inds])
    candidates = positive_inds[no_diagnosis_date]
    followed_up = cvu.n_binomial(1.0-loss_prob, len(candidates))
    final_inds = candidates[followed_up]

    # Store when they will be diagnosed, and when they took the test that will come back positive
    self.date_diagnosed[final_inds] = self.t + test_delay
    self.date_pos_test[final_inds] = self.t

    return final_inds
[docs]
def schedule_quarantine(self, inds, start_date=None, period=None):
    '''
    Schedule a quarantine. Typically not called by the user directly except
    via a custom intervention; see the contact_tracing() intervention instead.

    This only records a request to quarantine each person on the start date for
    a period of time. Whether they are on an existing quarantine that gets extended,
    or are no longer eligible for quarantine, is checked by check_quar() when the
    start date is reached.

    Args:
        inds (int): indices of who to quarantine
        start_date (int): day to begin quarantine (defaults to the current day, ``self.t``)
        period (int): quarantine duration (defaults to ``pars['quar_period']``)
    '''
    if start_date is None:
        start_date = self.t
    else:
        start_date = int(start_date)

    if period is None:
        period = self.pars['quar_period']
    else:
        period = int(period)

    end_date = start_date + period
    for ind in inds:
        self._pending_quarantine[start_date].append((ind, end_date))
    return
#%% Analysis methods
[docs]
def plot(self, *args, **kwargs):
    '''
    Plot statistics of the population -- age distribution, numbers of contacts,
    and overall weight of contacts (number of contacts multiplied by beta per
    layer).

    All arguments are forwarded to the plot_people() function of the plotting module.

    Args:
        bins      (arr)   : age bins to use (default, 0-100 in one-year bins)
        width     (float) : bar width
        font_size (float) : size of font
        alpha     (float) : transparency of the plots
        fig_args  (dict)  : passed to pl.figure()
        axis_args (dict)  : passed to pl.subplots_adjust()
        plot_args (dict)  : passed to pl.plot()
        do_show   (bool)  : whether to show the plot
        fig       (fig)   : handle of existing figure to plot into

    Returns:
        The figure returned by plot_people()
    '''
    # Pass the people object positionally: with people=self as a keyword, any
    # forwarded positional argument would also bind to the first parameter and
    # raise "got multiple values for argument 'people'".
    # NOTE(review): assumes `people` is the first parameter of plot_people() -- confirm.
    fig = cvplt.plot_people(self, *args, **kwargs)
    return fig
[docs]
def story(self, uid, *args):
    '''
    Print out a short history of events in the life of the specified individual.

    For each person this prints whether they contracted COVID, how many contacts
    they have in each layer, and a chronological list of their dated events and
    of the infections they received or caused (taken from the infection log).

    Args:
        uid (int/list): the person or people whose story is being regaled
        args (list): these people will tell their stories too

    **Example**::

        sim = cv.Sim(pop_type='hybrid', verbose=0)
        sim.run()
        sim.people.story(12)
        sim.people.story(795)
    '''

    layer_names = {
        'a': 'default contact',
        'h': 'household',
        's': 'school',
        'w': 'workplace',
        'c': 'community',
    }

    def label_lkey(lkey):
        ''' Friendly name for common layer keys; anything else is shown quoted '''
        # Layers may be None (e.g. infections with no layer recorded), so only
        # look up string keys; a lookup table also ensures 'a' keeps its label
        if isinstance(lkey, str):
            friendly = layer_names.get(lkey.lower())
            if friendly is not None:
                return friendly
        return f'"{lkey}"'

    # Dated attributes of a person and how to describe each of them
    dates = {
        'date_critical'       : 'became critically ill and needed ICU care',
        'date_dead'           : 'died ☹',
        'date_diagnosed'      : 'was diagnosed with COVID',
        'date_end_quarantine' : 'ended quarantine',
        'date_infectious'     : 'became infectious',
        'date_known_contact'  : 'was notified they may have been exposed to COVID',
        'date_pos_test'       : 'recieved their positive test result',
        'date_quarantined'    : 'entered quarantine',
        'date_recovered'      : 'recovered',
        'date_severe'         : 'developed severe symptoms and needed hospitalization',
        'date_symptomatic'    : 'became symptomatic',
        'date_tested'         : 'was tested for COVID',
        'date_vaccinated'     : 'was vaccinated against COVID',
    }

    uids = sc.tolist(uid)
    uids.extend(args)

    for uid in uids:

        p = self[uid]
        sex = 'female' if p.sex == 0 else 'male'

        intro = f'\nThis is the story of {uid}, a {p.age:.0f} year old {sex}'

        # Headline: did they get COVID, and with or without symptoms?
        if not p.susceptible:
            if np.isnan(p.date_symptomatic):
                print(f'{intro}, who had asymptomatic COVID.')
            else:
                print(f'{intro}, who had symptomatic COVID.')
        else:
            print(f'{intro}, who did not contract COVID.')

        # Contacts per layer
        total_contacts = 0
        no_contacts = []
        for lkey in p.contacts.keys():
            llabel = label_lkey(lkey)
            n_contacts = len(p.contacts[lkey])
            total_contacts += n_contacts
            if n_contacts:
                print(f'{uid} is connected to {n_contacts} people in the {llabel} layer')
            else:
                no_contacts.append(llabel)
        if len(no_contacts):
            nc_string = ', '.join(no_contacts)
            print(f'{uid} has no contacts in the {nc_string} layer(s)')
        print(f'{uid} has {total_contacts} contacts in total')

        # Collect (day, description) pairs from the person's dates...
        events = []
        for attribute, message in dates.items():
            date = getattr(p, attribute)
            if not np.isnan(date):
                events.append((date, message))

        # ...and from the infection log, both as target and as source
        for infection in self.infection_log:
            lkey = infection['layer']
            llabel = label_lkey(lkey)
            if infection['target'] == uid:
                if lkey:
                    events.append((infection['date'], f'was infected with COVID by {infection["source"]} via the {llabel} layer'))
                else:
                    events.append((infection['date'], 'was infected with COVID as a seed infection'))

            if infection['source'] == uid:
                # Number of onward infections caused by the person this one infected
                x = len([a for a in self.infection_log if a['source'] == infection['target']])
                events.append((infection['date'], f'gave COVID to {infection["target"]} via the {llabel} layer ({x} secondary infections)'))

        # Print the events in chronological order
        if len(events):
            for day, event in sorted(events, key=lambda x: x[0]):
                print(f'On day {day:.0f}, {uid} {event}')
        else:
            print(f'Nothing happened to {uid} during the simulation.')

    return