Source code for hpvsim.people

'''
Defines the People class and functions associated with making people and handling
the transitions between states (e.g., from susceptible to infected).
'''

#%% Imports
import numpy as np
import sciris as sc
from . import utils as hpu
from . import defaults as hpd
from . import base as hpb
from . import population as hppop
from . import plotting as hpplt
from . import parameters as hppar
from . import immunity as hpimm


__all__ = ['People']

[docs] class People(hpb.BasePeople): ''' A class to perform all the operations on the people -- usually not invoked directly. This class is usually created automatically by the sim. The only required input argument is the population size, but typically the full parameters dictionary will get passed instead since it will be needed before the People object is initialized. However, ages, contacts, etc. will need to be created separately -- see ``hpv.make_people()`` instead. Note that this class handles the mechanics of updating the actual people, while ``hpv.BasePeople`` takes care of housekeeping (saving, loading, exporting, etc.). Please see the BasePeople class for additional methods. Args: pars (dict): the sim parameters, e.g. sim.pars -- alternatively, if a number, interpreted as n_agents strict (bool): whether or not to only create keys that are already in self.meta.person; otherwise, let any key be set pop_trend (dataframe): a dataframe of years and population sizes, if available kwargs (dict): the actual data, e.g. from a popdict, being specified **Examples**:: ppl1 = hpv.People(2000) sim = hpv.Sim() ppl2 = hpv.People(sim.pars) ''' #%% Basic methods def __init__(self, pars, strict=True, pop_trend=None, pop_age_trend=None, **kwargs): # Initialize the BasePeople, which also sets things up for filtering super().__init__(pars) # Handle pars and settings # Other initialization self.pop_trend = pop_trend self.pop_age_trend = pop_age_trend self.init_contacts() # Initialize the contacts self.ng = self.pars['n_genotypes'] self.na = len(self.pars['age_bin_edges'])-1 self.lag_bins = np.linspace(0,50,51) self.rship_lags = dict() for lkey in self.layer_keys(): self.rship_lags[lkey] = np.zeros(len(self.lag_bins)-1, dtype=hpd.default_float) # Store age bins self.age_bin_edges = self.pars['age_bin_edges'] # Age bins for age results if strict: self.lock() # If strict is true, stop further keys from being set (does not affect attributes) # Store flows to be computed during simulation self.init_flows() # Although we have called init(), we still need to call initialize() self.initialized = False # Store kwargs here for now, to be dealt with during initialize() self.kwargs = kwargs return
[docs] def init_flows(self): ''' Initialize flows to be zero ''' df = hpd.default_float self.flows = {key: 0 for key in hpd.flow_keys} self.genotype_flows = {key: np.zeros(self.ng, dtype=df) for key in hpd.genotype_flow_keys} self.age_flows = {key: np.zeros(self.na, dtype=df) for key in hpd.flow_keys} self.sex_flows = {f'{key}' : np.zeros(2, dtype=df) for key in hpd.by_sex_keys} self.demographic_flows = {f'{key}' : 0 for key in hpd.dem_keys} return
[docs] def scale_flows(self, inds): ''' Return the scaled versions of the flows -- replacement for len(inds) followed by scale factor multiplication ''' return self.scale[inds].sum()
[docs] def increment_age(self): ''' Let people age by one timestep ''' self.age[self.alive] += self.dt return
[docs] def initialize(self, sim_pars=None): ''' Perform initializations ''' super().initialize() # Initialize states # Handle partners and contacts kwargs = self.kwargs if 'partners' in kwargs: self.partners[:] = kwargs.pop('partners') # Store the desired concurrency if 'current_partners' in kwargs: self.current_partners[:] = kwargs.pop('current_partners') # Store current actual number - updated each step though for ln,lkey in enumerate(self.layer_keys()): self.rship_start_dates[ln,self.current_partners[ln]>0] = 0 if 'contacts' in kwargs: self.add_contacts(kwargs.pop('contacts')) # Also updated each step self.n_rships[:] = self.current_partners self.ever_partnered[:] = self.current_partners.sum(axis=0)>0 # Handle all other values, e.g. age for key,value in kwargs.items(): if self._lock: self.set(key, value) elif key in self._data: self[key][:] = value else: self[key] = value # Set the scale factor self.scale[:] = sim_pars['pop_scale'] # Additional validation self.validate(sim_pars=sim_pars) # First, check that essential-to-match parameters match self.initialized = True return
[docs] def update_states_pre(self, t, year=None): ''' Perform all state updates at the current timestep ''' # Initialize self.t = t self.dt = self.pars['dt'] self.init_flows() # Let people age by one time step self.increment_age() # Perform updates that are not genotype-specific update_freq = max(1, int(self.pars['dt_demog'] / self.pars['dt'])) # Ensure it's an integer not smaller than 1 if t % update_freq == 0: # Apply death rates from other causes other_deaths, deaths_female, deaths_male = self.apply_death_rates(year=year) self.demographic_flows['other_deaths'] = other_deaths self.sex_flows['other_deaths_by_sex'][0] = deaths_female self.sex_flows['other_deaths_by_sex'][1] = deaths_male # Add births new_births = self.add_births(year=year, sex_ratio=self.pars['sex_ratio']) self.demographic_flows['births'] = new_births # Check migration migration = self.check_migration(year=year) self.demographic_flows['migration'] = migration # Perform updates that are genotype-specific for g in range(self.pars['n_genotypes']): self.check_clearance(g) # check for clearance (need to do this first) # Perform updates that are not genotype specific deaths_by_age, deaths = self.check_cancer_deaths() self.flows['cancer_deaths'] = deaths self.age_flows['cancer_deaths'] = deaths_by_age # Before applying interventions or new infections, calculate the pool of susceptibles self.sus_pool = self.susceptible.all(axis=0) # True for people with no infection at the start of the timestep return
[docs] def update_states_post(self, t, year=None): ''' State updates at the end of the current timestep ''' ng = self.pars['n_genotypes'] for g in range(ng): for key in ['cins','cancers']: # update flows cases_by_age, cases = self.check_progress(key, g) self.flows[key] += cases # Increment flows (summed over all genotypes) self.genotype_flows[key][g] = cases # Store flows by genotype self.age_flows[key] += cases_by_age # Increment flows by age (summed over all genotypes)
#%% Disease progression methods
[docs] def set_prognoses(self, inds, g, gpars, dt): ''' Assigns prognoses for all infected women on day of infection. ''' # Set length of infection, which is moderated by any prior cell-level immunity sev_imm = self.sev_imm[g, inds] # Determine how long before precancerous cell changes dur_precin = hpu.sample(**gpars['dur_precin'], size=len(inds))*(1-sev_imm) # Sample from distribution self.dur_precin[g, inds] = dur_precin self.dur_infection[g, inds] = dur_precin # Probability of progressing cin_probs = hppar.compute_severity(dur_precin, rel_sev=self.rel_sev[inds], pars=gpars['cin_fn']) cin_bools = hpu.binomial_arr(cin_probs) cin_inds = inds[cin_bools] nocin_inds = inds[~cin_bools] # Duration of CIN age_mod = np.ones(len(cin_inds)) age_mod[self.age[cin_inds] >= self.pars['age_risk']['age']] = self.pars['age_risk']['risk'] dur_cin = hpu.sample(**gpars['dur_cin'], size=len(cin_inds))* age_mod self.dur_cin[g, cin_inds] = dur_cin self.dur_infection[g, cin_inds] += dur_cin # Set date of clearance for those who don't develop precancer self.date_clearance[g, nocin_inds] = self.t + sc.randround(self.dur_precin[g, nocin_inds]/dt) # Set date of onset of precancer for those who develop precancer self.date_cin[g, cin_inds] = self.t + sc.randround(self.dur_precin[g, cin_inds]/dt) # Set infection severity and outcomes self.set_severity(inds[cin_bools], g, gpars, dt) return
[docs] def set_severity(self, inds, g, gpars, dt, set_sev=True): ''' Set severity levels for individual women Args: inds: indices of women to set severity for g: genotype index dt: timestep set_sev: whether or not to set initial severity ''' # Calculate the probability of cancer for each woman dur_cin = self.dur_cin[g, inds] if gpars["cancer_fn"].get('method') == 'cin_integral': cancer_pars = sc.mergedicts(gpars["cancer_fn"], gpars["cin_fn"]) else: cancer_pars = gpars["cancer_fn"] cancer_prob = hppar.compute_severity(dur_cin, rel_sev=self.rel_sev[inds], pars=cancer_pars) # Calculate probability of cancer n_extra = self.pars['ms_agent_ratio'] cancer_scale = self.pars['pop_scale'] / n_extra if n_extra == 1: cancer_prob_arr = hppar.compute_severity(dur_cin, rel_sev=self.rel_sev[inds], pars=cancer_pars) # Multiscale version elif n_extra > 1: # Firstly, determine who will transform based on severity values, and scale them to create more agents is_cancer = hpu.binomial_arr(cancer_prob) # Select who transforms - NB, this array gets extended later cancer_inds = inds[is_cancer] # Indices of those who transform self.scale[cancer_inds] = cancer_scale # Shrink the weight of the original agents, but otherwise leave them the same # Create extra disease severity values for the extra agents full_size = (len(inds), n_extra) # Main axis is indices, but include columns for multiscale agents extra_dur_cin = hpu.sample(**gpars['dur_cin'], size=full_size) extra_dur_precin = hpu.sample(**gpars['dur_precin'], size=full_size) extra_rel_sevs = np.ones(full_size)*self.rel_sev[inds][:,None] extra_cin_probs = hppar.compute_severity(extra_dur_precin, rel_sev=extra_rel_sevs, pars=gpars['cin_fn']) extra_cin_bools = hpu.binomial_arr(extra_cin_probs[:,1:]) extra_cancer_probs = hppar.compute_severity(extra_dur_cin, rel_sev=extra_rel_sevs, pars=cancer_pars) # Calculate probability of cancer extra_cancer_probs[:,1:][~extra_cin_bools] = 0 # Based on the extra severity values, determine additional transformation probabilities extra_cancer_bools = hpu.binomial_arr(extra_cancer_probs[:,1:]) extra_cancer_bools *= self.level0[inds, None] # Don't allow existing cancer agents to make more cancer agents extra_cancer_counts = extra_cancer_bools.sum(axis=1) # Find out how many new cancer cases we have n_new_agents = extra_cancer_counts.sum() # Total number of new agents if n_new_agents: # If we have more than 0, proceed extra_source_lists = [] for i, count in enumerate(extra_cancer_counts): ii = inds[i] if count: # At least 1 new cancer agent, plus person is not already a cancer agent extra_source_lists.append([ii] * int(count)) # Duplicate the current index count times extra_source_inds = np.concatenate(extra_source_lists).flatten() # Assemble the sources for these new agents n_new_agents = len(extra_source_inds) # The same as above, *unless* a cancer agent tried to spawn more cancer agents # Create the new agents and assign them the same properties as the existing agents new_inds = self._grow(n_new_agents) for state in self.meta.states_to_set: if state.ndim == 1: self[state.name][new_inds] = self[state.name][extra_source_inds] elif state.ndim == 2: self[state.name][:, new_inds] = self[state.name][:, extra_source_inds] # Reset the states for the new agents self.level0[new_inds] = False self.level1[new_inds] = True self.scale[new_inds] = cancer_scale # Add the new indices onto the existing vectors inds = np.append(inds, new_inds) is_cancer = np.append(is_cancer, np.full(len(new_inds), fill_value=True)) new_dur_precin = extra_dur_precin[:, 1:][extra_cancer_bools] new_dur_cin = extra_dur_cin[:, 1:][extra_cancer_bools] new_dur_infection = new_dur_precin + new_dur_cin self.dur_precin[g, new_inds] = new_dur_precin self.dur_cin[g, new_inds] = new_dur_cin self.dur_infection[g, new_inds] = new_dur_infection self.date_infectious[g, new_inds] = self.t self.date_cin[g, new_inds] = self.t + sc.randround(new_dur_precin / dt) dur_cin = np.append(dur_cin, new_dur_cin) # Finally, create an array for storing the transformation probabilities. # We've already figured out who's going to transform, so we fill the array with 1s for those who do. cancer_prob_arr = np.zeros(len(inds)) cancer_prob_arr[is_cancer] = 1 # Make sure inds that got assigned cancer above dont get stochastically missed # Determine who goes to cancer is_cancer = hpu.binomial_arr(cancer_prob_arr) cancer_inds = inds[is_cancer] no_cancer_inds = inds[~is_cancer] # Indices of those who eventually heal lesion/clear infection # Set date of clearance for those who don't go to cancer dur_inf = self.dur_infection[g, inds] time_to_clear = dur_inf[~is_cancer] self.date_clearance[g, no_cancer_inds] = np.fmax(self.date_clearance[g, no_cancer_inds], self.date_exposed[g, no_cancer_inds] + sc.randround(time_to_clear / dt)) # Set dates for those who go to cancer. # Set date of onset of precancer and eventual severity outcomes for those who develop precancer dur_cin_transformed = dur_cin[is_cancer] # Duration of episomal infection for those who transform self.date_cancerous[g, cancer_inds] = self.date_cin[g, cancer_inds] + sc.randround(dur_cin_transformed/dt) self.dur_infection[g, cancer_inds] = self.dur_precin[g, cancer_inds] + self.dur_cin[g, cancer_inds] dur_cancer = hpu.sample(**self.pars['dur_cancer'], size=len(cancer_inds)) self.date_dead_cancer[cancer_inds] = self.date_cancerous[g, cancer_inds] + sc.randround(dur_cancer / dt) self.dur_cancer[g, cancer_inds] = dur_cancer return
#%% Methods for updating partnerships
[docs] def dissolve_partnerships(self, t=None): ''' Dissolve partnerships ''' n_dissolved = dict() for lno,lkey in enumerate(self.layer_keys()): layer = self.contacts[lkey] to_dissolve = (~self['alive'][layer['m']]) + (~self['alive'][layer['f']]) + ( (self.t*self.pars['dt']) > layer['end']).astype(bool) dissolved = layer.pop_inds(to_dissolve) # Remove them from the contacts list # Update current number of partners unique, counts = hpu.unique(np.concatenate([dissolved['f'],dissolved['m']])) self.current_partners[lno,unique] -= counts self.rship_end_dates[lno, unique] = self.t n_dissolved[lkey] = len(dissolved['f']) return n_dissolved # Return the number of dissolved partnerships by layer
[docs] def create_partnerships(self, tind, mixing, layer_probs, f_cross_layer, m_cross_layer, dur_pship, acts, age_act_pars): ''' Create partnerships. All the hard work of creating the contacts is done by hppop.make_contacts, which in turn relies on hpu.create_edgelist for creating the edgelist. This method is just a light wrapper that passes in the arguments in the right format and the updates relationship info stored in the People class. ''' # Initialize new_pships = dict() # Loop over layers lno=0 for lkey in self.layer_keys(): pship_args = dict( lno=lno, tind=tind, partners=self.partners[lno], current_partners=self.current_partners, ages=self.age, debuts=self.debut, is_female=self.is_female, is_active=self.is_active, mixing=mixing[lkey], layer_probs=layer_probs[lkey], f_cross_layer=f_cross_layer, m_cross_layer=m_cross_layer, durations=dur_pship[lkey], acts=acts[lkey], age_act_pars=age_act_pars[lkey], cluster=self.cluster, add_mixing=self.pars['add_mixing'] ) new_pships[lkey], current_partners, new_pship_inds, new_pship_counts = hppop.make_contacts(**pship_args) # Update relationship info if len(new_pship_inds)>0: self.ever_partnered[new_pship_inds] = True self.current_partners[:] = current_partners if len(new_pship_inds): self.rship_start_dates[lno, new_pship_inds] = self.t self.n_rships[lno, new_pship_inds] += new_pship_counts lags = self.rship_start_dates[lno, new_pship_inds] - self.rship_end_dates[lno, new_pship_inds] self.rship_lags[lkey] += np.histogram(lags, self.lag_bins)[0] lno += 1 self.add_contacts(new_pships) return
#%% Methods for updating state
[docs] def check_inds(self, current, date, filter_inds=None): ''' Return indices for which the current state is false and which meet the date criterion ''' if filter_inds is None: not_current = hpu.false(current) else: not_current = hpu.ifalsei(current, filter_inds) has_date = hpu.idefinedi(date, not_current) inds = hpu.itrue(self.t >= date[has_date], has_date) return inds
[docs] def check_inds_true(self, current, date, filter_inds=None): ''' Return indices for which the current state is true and which meet the date criterion ''' if filter_inds is None: current_inds = hpu.true(current) else: current_inds = hpu.itruei(current, filter_inds) has_date = hpu.idefinedi(date, current_inds) inds = hpu.itrue(self.t >= date[has_date], has_date) return inds
[docs] def check_progress(self, what, genotype): ''' Wrapper function for all the new progression checks ''' if what=='cins': cases_by_age, cases = self.check_cin(genotype) elif what=='cancers': cases_by_age, cases = self.check_cancer(genotype) return cases_by_age, cases
[docs] def check_cin(self, genotype): ''' Check for new progressions to CIN ''' # Only include infectious females who haven't already cleared CIN or progressed to cancer filters = self.infectious[genotype,:]*self.is_female_alive*~(self.date_clearance[genotype,:]<=self.t) filter_inds = filters.nonzero()[0] inds = self.check_inds(self.cin[genotype,:], self.date_cin[genotype,:], filter_inds=filter_inds) self.cin[genotype, inds] = True # Age calculations cases_by_age = np.histogram(self.age[inds], bins=self.age_bin_edges, weights=self.scale[inds])[0] return cases_by_age, self.scale_flows(inds)
[docs] def check_cancer(self, genotype): ''' Check for new progressions to cancer ''' not_current = hpu.ifalsei(self.cancerous[genotype,:], hpu.true(self.cin[genotype,:])) has_date = hpu.idefinedi(self.date_cancerous[genotype,:], not_current) inds = hpu.itrue(self.t >= self.date_cancerous[genotype,has_date], has_date) # Set infectious states self.susceptible[:, inds] = False # No longer susceptible to any genotype self.infectious[:, inds] = False # No longer counted as infectious with any genotype self.inactive[:,inds] = True # If this person has any other infections from any other genotypes, set them to inactive self.date_clearance[:, inds] = np.nan # Remove their clearance dates for all genotypes # Deal with dysplasia states and dates for g in range(self.ng): if g != genotype: self.date_cancerous[g, inds] = np.nan # Remove their date of cancer for all genotypes but the one currently causing cancer self.date_cin[g, inds] = np.nan # Set the properties related to cell changes and disease severity markers self.cancerous[genotype, inds] = True self.cin[:, inds] = False # No longer counted as episomal with any genotype # Age results cases_by_age = np.histogram(self.age[inds], bins=self.age_bin_edges, weights=self.scale[inds])[0] return cases_by_age, self.scale_flows(inds)
[docs] def check_cancer_deaths(self): ''' Check for new deaths from cancer ''' filter_inds = self.true('cancerous') inds = self.check_inds(self.dead_cancer, self.date_dead_cancer, filter_inds=filter_inds) self.remove_people(inds, cause='cancer') cases_by_age = np.histogram(self.age[inds], bins=self.age_bin_edges, weights=self.scale[inds])[0] # check which of these were detected by symptom or screening self.flows['detected_cancer_deaths'] += self.scale_flows(hpu.true(self.detected_cancer[inds])) return cases_by_age, self.scale_flows(inds)
[docs] def check_clearance(self, genotype): ''' Check for HPV clearance. ''' f_filter_inds = (self.is_female_alive & self.infectious[genotype,:]).nonzero()[-1] m_filter_inds = (self.is_male_alive & self.infectious[genotype,:]).nonzero()[-1] f_inds = self.check_inds_true(self.infectious[genotype,:], self.date_clearance[genotype,:], filter_inds=f_filter_inds) m_inds = self.check_inds_true(self.infectious[genotype,:], self.date_clearance[genotype,:], filter_inds=m_filter_inds) m_cleared_inds = m_inds # All males clear # For females, determine who clears and who controls if self.pars['hpv_control_prob']>0: latent_probs = np.full(len(f_inds), self.pars['hpv_control_prob'], dtype=hpd.default_float) latent_bools = hpu.binomial_arr(latent_probs) latent_inds = f_inds[latent_bools] if len(latent_inds): self.susceptible[genotype, latent_inds] = False # should already be false self.infectious[genotype, latent_inds] = False self.inactive[genotype, latent_inds] = True self.date_clearance[genotype, latent_inds] = np.nan self.date_latent[genotype, latent_inds] = self.t f_cleared_inds = f_inds[~latent_bools] else: f_cleared_inds = f_inds cleared_inds = np.array(m_cleared_inds.tolist()+f_cleared_inds.tolist()) # Now reset disease states if len(cleared_inds): self.susceptible[genotype, cleared_inds] = True self.infectious[genotype, cleared_inds] = False self.inactive[genotype, cleared_inds] = False # should already be false if len(f_cleared_inds): hpimm.update_peak_immunity(self, f_cleared_inds, imm_pars=self.pars, imm_source=genotype) # update immunity self.date_reactivated[genotype, f_cleared_inds] = np.nan # Whether infection is controlled on not, clear all cell changes and severity markeres self.cin[genotype, f_inds] = False self.date_cin[genotype, f_inds] = np.nan self.dur_cin[genotype, f_inds] = np.nan self.dur_precin[genotype, f_inds] = np.nan self.dur_infection[genotype, f_inds] = np.nan return
[docs] def apply_death_rates(self, year=None): ''' Apply death rates to remove people from the population NB people are not actually removed to avoid issues with indices ''' death_pars = self.pars['death_rates'] all_years = np.array(list(death_pars.keys())) base_year = all_years[0] age_bins = death_pars[base_year]['m'][:,0] age_inds = np.digitize(self.age, age_bins)-1 death_probs = np.empty(len(self), dtype=hpd.default_float) year_ind = sc.findnearest(all_years, year) nearest_year = all_years[year_ind] mx_f = death_pars[nearest_year]['f'][:,1]*self.pars['dt_demog'] mx_m = death_pars[nearest_year]['m'][:,1]*self.pars['dt_demog'] death_probs[self.is_female] = mx_f[age_inds[self.is_female]] death_probs[self.is_male] = mx_m[age_inds[self.is_male]] death_probs[self.age>100] = 1 # Just remove anyone >100 death_probs[~self.alive] = 0 death_probs *= self.pars['rel_death'] # Adjust overall death probabilities # Get indices of people who die of other causes death_inds = hpu.true(hpu.binomial_arr(death_probs)) deaths_female = self.scale_flows(hpu.true(self.is_female[death_inds])) deaths_male = self.scale_flows(hpu.true(self.is_male[death_inds])) other_deaths = self.remove_people(death_inds, cause='other') # Apply deaths return other_deaths, deaths_female, deaths_male
[docs] def add_births(self, year=None, new_births=None, ages=0, immunity=None, sex_ratio=0.5): ''' Add more people to the population Specify either the year from which to retrieve the birth rate, or the absolute number of new people to add. Must specify one or the other. People are added in-place to the current `People` instance. ''' assert (year is None) != (new_births is None), 'Must set either year or n_births, not both' if new_births is None: years = self.pars['birth_rates'][0] rates = self.pars['birth_rates'][1] this_birth_rate = self.pars['rel_birth']*np.interp(year, years, rates)*self.pars['dt_demog']/1e3 new_births = sc.randround(this_birth_rate*self.n_alive_level0) # Crude births per 1000 if new_births>0: # Generate other characteristics of the new people uids, sexes, debuts, rel_sev, partners, cluster = hppop.set_static(new_n=new_births, existing_n=len(self), pars=self.pars, sex_ratio=sex_ratio) # Grow the arrays` new_inds = self._grow(new_births) self.uid[new_inds] = uids self.age[new_inds] = ages self.scale[new_inds] = self.pars['pop_scale'] self.sex[new_inds] = sexes self.debut[new_inds] = debuts self.rel_sev[new_inds] = rel_sev self.partners[:,new_inds] = partners self.cluster[new_inds] = cluster if immunity is not None: self.nab_imm[:,new_inds] = immunity return new_births*self.pars['pop_scale'] # These are not indices, so they scale differently
[docs] def check_migration(self, year=None): """ Check if people need to immigrate/emigrate in order to make the population size correct. """ if self.pars['use_migration'] and self.pop_trend is not None: # Pull things out sim_start = self.pars['start'] sim_pop0 = self.pars['n_agents'] data_years = self.pop_trend.year.values data_pop = self.pop_trend.pop_size.values data_min = data_years[0] data_max = data_years[-1] age_dist_data = self.pop_age_trend[self.pop_age_trend.year == int(year)] # No migration if outside the range of the data if year < data_min: return 0 elif year > data_max: return 0 if sim_start < data_min: # Figure this out later, can't use n_agents then errormsg = 'Starting the sim earlier than the data is not hard, but has not been done yet' raise NotImplementedError(errormsg) # Do basic calculations data_pop0 = np.interp(sim_start, data_years, data_pop) scale = sim_pop0 / data_pop0 # Scale factor alive_inds = hpu.true(self.alive_level0) alive_ages = self.age[alive_inds].astype(int) # Return ages for everyone level 0 and alive count_ages = np.bincount(alive_ages, minlength=age_dist_data.shape[0]) # Bin and count them expected = age_dist_data['PopTotal'].values*scale # Compute how many of each age we would expect in population difference = (expected-count_ages).astype(int) # Compute difference between expected and simulated for each age n_migrate = np.sum(difference) # Compute total migrations (in and out) ages_to_remove = hpu.true(difference<0) # Ages where we have too many, need to apply emigration n_to_remove = difference[ages_to_remove] # Determine number of agents to remove for each age ages_to_add = hpu.true(difference>0) # Ages where we have too few, need to apply imigration n_to_add = difference[ages_to_add] # Determine number of agents to add for each age ages_to_add_list = np.repeat(ages_to_add, n_to_add) self.add_births(new_births=len(ages_to_add_list), ages=np.array(ages_to_add_list)) # Remove people remove_frac = n_to_remove / count_ages[ages_to_remove] remove_probs = np.zeros(len(self)) for ind,rf in enumerate(remove_frac): age = ages_to_remove[ind] inds_this_age = hpu.true((self.age>=age) * (self.age<age+1) * self.alive_level0) remove_probs[inds_this_age] = -rf migrate_inds = hpu.choose_w(remove_probs, -n_to_remove.sum()) self.remove_people(migrate_inds, cause='emigration') # Remove people else: n_migrate = 0 return n_migrate*self.pars['pop_scale'] # These are not indices, so they scale differently
#%% Methods to make events occur (death, infection, others TBC)
[docs] def make_naive(self, inds): ''' Make a set of people naive. This is used during dynamic resampling. Args: inds (array): list of people to make naive ''' for key in self.meta.states: if key in ['susceptible']: self[key][:, inds] = True elif key in ['other_dead']: self[key][inds] = False else: self[key][:, inds] = False # Reset immunity for key in self.meta.imm_states: self[key][:, inds] = 0 # Reset dates for key in self.meta.dates + self.meta.durs: self[key][:, inds] = np.nan return
[docs] def infect(self, inds, g=None, layer=None): ''' Infect people and determine their eventual outcomes. Method also deduplicates input arrays in case one agent is infected many times and stores who infected whom in infection_log list. Args: inds (array): array of people to infect g (int): int of genotype to infect people with layer (str): contact layer this infection was transmitted on Returns: count (int): number of people infected ''' if len(inds) == 0: return 0 # Check whether anyone is already infected with genotype - this should not happen because we only # infect susceptible people if len(hpu.true(self.infectious[g,inds])): errormsg = f'Attempting to reinfect the following agents who are already infected with genotype {g}: {hpu.itruei(self.infectious[g,:],inds)}' raise ValueError(errormsg) dt = self.pars['dt'] # Set date of infection and exposure base_t = self.t self.date_infectious[g,inds] = base_t if layer != 'reactivation': self.date_exposed[g,inds] = base_t # Count reinfections and remove any previous dates self.genotype_flows['reinfections'][g] += self.scale_flows((~np.isnan(self.date_clearance[g, inds])).nonzero()[-1]) self.flows['reinfections'] += self.scale_flows((~np.isnan(self.date_clearance[g, inds])).nonzero()[-1]) self.n_infections[g,inds] += 1 for key in ['date_clearance']: self[key][g, inds] = np.nan # Count reactivations and adjust latency status if layer == 'reactivation': self.genotype_flows['reactivations'][g] += self.scale_flows(inds) self.flows['reactivations'] += self.scale_flows(inds) self.age_flows['reactivations'] += np.histogram(self.age[inds], bins=self.age_bin_edges, weights=self.scale[inds])[0] self.latent[g, inds] = False # Adjust states -- no longer latent self.date_reactivated[g,inds] = base_t # Update states, genotype info, and flows self.susceptible[g, inds] = False # no longer susceptible self.infectious[g, inds] = True # now infectious self.inactive[g, inds] = False # no longer inactive # Add to flow results. Note, we only count these infectious in the results if they happened at this timestep if layer != 'seed_infection' and layer !='reactivation': # Create overall flows self.flows['infections'] += self.scale_flows(inds) # Add the total count to the total flow data self.genotype_flows['infections'][g] += self.scale_flows(inds) # Add the count by genotype to the flow data self.age_flows['infections'][:] += np.histogram(self.age[inds], bins=self.age_bin_edges, weights=self.scale[inds])[0] # Create by-sex flows infs_female = self.scale_flows(hpu.true(self.is_female[inds])) infs_male = self.scale_flows(hpu.true(self.is_male[inds])) self.sex_flows['infections_by_sex'][0] += infs_female self.sex_flows['infections_by_sex'][1] += infs_male # Now use genotype-specific prognosis probabilities to determine what happens. # Only women can progress beyond infection. f_inds = hpu.itruei(self.is_female,inds) m_inds = hpu.itruei(self.is_male,inds) # Compute disease progression for females if len(f_inds)>0: gpars = self.pars['genotype_pars'][g] self.set_prognoses(f_inds, g, gpars, dt) # Compute infection clearance for males if len(m_inds)>0: dur_infection = hpu.sample(**self.pars['dur_infection_male'], size=len(m_inds)) self.dur_infection[g, m_inds] = dur_infection self.date_clearance[g, m_inds] = self.date_infectious[g, m_inds] + np.ceil(dur_infection/dt) # Date they clear HPV infection (interpreted as the timestep on which they recover) return self.scale_flows(inds) # For incrementing counters
[docs] def remove_people(self, inds, cause=None): ''' Remove people - used for death and migration ''' if cause == 'other': self.date_dead_other[inds] = self.t self.dead_other[inds] = True elif cause == 'cancer': self.dead_cancer[inds] = True elif cause == 'emigration': self.emigrated[inds] = True elif cause == 'hiv': self.dead_hiv[inds] = True else: errormsg = f'Cause of death must be one of "other", "cancer", "emigration", or "hiv", not {cause}.' raise ValueError(errormsg) # Set states to false self.alive[inds] = False for state in self.meta.genotype_stock_keys: self[state][:, inds] = False for state in self.meta.intv_stock_keys: self[state][inds] = False for state in self.meta.other_stock_keys: self[state][inds] = False # Wipe future dates future_dates = [date.name for date in self.meta.dates] for future_date in future_dates: ndims = len(self[future_date].shape) if ndims == 1: iinds = (self[future_date][inds] > self.t).nonzero()[-1] if len(iinds): self[future_date][inds[iinds]] = np.nan elif ndims == 2: genotypes_to_clear, iinds = (self[future_date][:, inds] >= self.t).nonzero() if len(iinds): self[future_date][genotypes_to_clear, inds[iinds]] = np.nan return self.scale_flows(inds)
#%% Analysis methods
[docs] def plot(self, *args, **kwargs): ''' Plot statistics of the population -- age distribution, numbers of contacts, and overall weight of contacts (number of contacts multiplied by beta per layer). Args: bins (arr) : age bins to use (default, 0-100 in one-year bins) width (float) : bar width font_size (float) : size of font alpha (float) : transparency of the plots fig_args (dict) : passed to pl.figure() axis_args (dict) : passed to pl.subplots_adjust() plot_args (dict) : passed to pl.plot() do_show (bool) : whether to show the plot fig (fig) : handle of existing figure to plot into ''' fig = hpplt.plot_people(people=self, *args, **kwargs) return fig
[docs] def story(self, uid, *args): ''' Print out a short history of events in the life of the specified individual. Args: uid (int/list): the person or people whose story is being regaled args (list): these people will tell their stories too **Example**:: sim = hpv.Sim(pop_type='hybrid', verbose=0) sim.run() sim.people.story(12) sim.people.story(795) ''' def label_lkey(lkey): ''' Friendly name for common layer keys ''' if lkey.lower() == 'a': llabel = 'default contact' if lkey.lower() == 'm': llabel = 'marital' elif lkey.lower() == 'c': llabel = 'casual' else: llabel = f'"{lkey}"' return llabel uids = sc.promotetolist(uid) uids.extend(args) for uid in uids: p = self[uid] sex = 'female' if p.sex == 0 else 'male' intro = f'\nThis is the story of {uid}, a {p.age:.0f} year old {sex}.' intro += f'\n{uid} became sexually active at age {p.debut:.0f}.' if not p.susceptible: if ~np.isnan(p.date_infectious): print(f'{intro}\n{uid} contracted HPV on timestep {p.date_infectious} of the simulation.') else: print(f'{intro}\n{uid} did not contract HPV during the simulation.') total_contacts = 0 no_contacts = [] for lkey in p.contacts.keys(): llabel = label_lkey(lkey) n_contacts = len(p.contacts[lkey]) total_contacts += n_contacts if n_contacts: print(f'{uid} is connected to {n_contacts} people in the {llabel} layer') else: no_contacts.append(llabel) if len(no_contacts): nc_string = ', '.join(no_contacts) print(f'{uid} has no contacts in the {nc_string} layer(s)') print(f'{uid} has {total_contacts} contacts in total') events = [] dates = { 'date_HPV_clearance' : 'HPV cleared', } for attribute, message in dates.items(): date = getattr(p,attribute) if not np.isnan(date): events.append((date, message)) if len(events): for timestep, event in sorted(events, key=lambda x: x[0]): print(f'On timestep {timestep:.0f}, {uid} {event}') else: print(f'Nothing happened to {uid} during the simulation.') return