Source code for rsvsim.population

'''
Defines functions for making the population.
'''

#%% Imports
import numpy as np # Needed for a few things not provided by pl
import sciris as sc
import os
import json
from collections import defaultdict
from . import requirements as cvreq
from . import utils as cvu
from . import misc as cvm
from . import data as cvdata
from . import defaults as cvd
from . import parameters as cvpar
from . import people as cvppl
from . import base as cvb


# Specify all externally visible functions this file defines
__all__ = ['make_people', 'make_randpop', 'make_random_contacts',
           'make_microstructured_contacts', 'make_hybrid_contacts',
           'make_synthpop']


[docs]def make_people(sim, popdict=None, save_pop=False, popfile=None, die=True, reset=False, verbose=None, **kwargs): ''' Make the actual people for the simulation. Usually called via sim.initialize(), but can be called directly by the user. Args: sim (Sim) : the simulation object; population parameters are taken from the sim object popdict (dict) : if supplied, use this population dictionary instead of generating a new one save_pop (bool) : whether to save the population to disk popfile (bool) : if so, the filename to save to die (bool) : whether or not to fail if synthetic populations are requested but not available reset (bool) : whether to force population creation even if self.popdict/self.people exists verbose (bool) : level of detail to print kwargs (dict) : passed to make_randpop() or make_synthpop() Returns: people (People): people ''' # Set inputs and defaults pop_size = int(sim['pop_size']) # Shorten pop_type = sim['pop_type'] # Shorten if verbose is None: verbose = sim['verbose'] if popfile is None: popfile = sim.popfile # Check which type of population to produce if pop_type == 'synthpops': if not cvreq.check_synthpops(): # pragma: no cover errormsg = f'You have requested "{pop_type}" population, but synthpops is not available; please use random, clustered, or hybrid' if die: raise ValueError(errormsg) else: print(errormsg) pop_type = 'random' location = sim['location'] if location: # pragma: no cover print(f'Warning: not setting ages or contacts for "{location}" since synthpops contacts are pre-generated') # Actually create the population if sim.people and not reset: return sim.people # If it's already there, just return elif sim.popdict and not reset: popdict = sim.popdict # Use stored one sim.popdict = None # Once loaded, remove elif popdict is None: # Main use case: no popdict is supplied # Create the population if pop_type in ['random', 'clustered', 'hybrid']: popdict = make_randpop(sim, microstructure=pop_type, **kwargs) elif pop_type == 'synthpops': popdict = make_synthpop(sim, **kwargs) elif pop_type is None: # pragma: no cover errormsg = 'You have set pop_type=None. This is fine, but you must ensure sim.popdict exists before calling make_people().' raise ValueError(errormsg) else: # pragma: no cover errormsg = f'Population type "{pop_type}" not found; choices are random, clustered, hybrid, or synthpops' raise ValueError(errormsg) # Ensure prognoses are set if sim['prognoses'] is None: sim['prognoses'] = cvpar.get_prognoses() # Actually create the people people = cvppl.People(sim.pars, uid=popdict['uid'], age=popdict['age'], weekage=popdict['weekage'], hhid=popdict['hhid'], sex=popdict['sex'], scid=popdict['scid'], sc_type=popdict['sc_type'], sc_student=popdict['sc_student'], contacts=popdict['contacts']) # List for storing the people people.schools_dict = popdict['schools_dict'] average_age = sum(popdict['age']/pop_size) sc.printv(f'Created {pop_size} people, average age {average_age:0.2f} years', 2, verbose) if save_pop: if popfile is None: # pragma: no cover errormsg = 'Please specify a file to save to using the popfile kwarg' raise FileNotFoundError(errormsg) else: filepath = sc.makefilepath(filename=popfile) cvm.save(filepath, people) if verbose: print(f'Saved population of type "{pop_type}" with {pop_size:n} people to {filepath}') return people
[docs]def make_randpop(pars, use_age_data=True, use_household_data=True, microstructure='random', **kwargs): ''' Make a random population, with contacts. This function returns a "popdict" dictionary, which has the following (required) keys: - uid: an array of (usually consecutive) integers of length N, uniquely identifying each agent - age: an array of floats of length N, the age in years of each agent - sex: an array of integers of length N (not currently used, so does not have to be binary) - contacts: list of length N listing the contacts; see make_random_contacts() for details - layer_keys: a list of strings representing the different contact layers in the population; see make_random_contacts() for details Args: pars (dict): the parameter dictionary or simulation object use_age_data (bool): whether to use location-specific age data use_household_data (bool): whether to use location-specific household size data microstructure (bool): whether or not to use the microstructuring algorithm to group contacts kwargs (dict): passed to contact creation method (e.g., make_hybrid_contacts) Returns: popdict (dict): a dictionary representing the population, with the following keys for a population of N agents with M contacts between them: ''' pop_size = int(pars['pop_size']) # Number of people # Load age data and household demographics based on 2018 Seattle demographics by default, or country if available age_data = cvd.default_age_data location = pars['location'] if location is not None: if pars['verbose']: print(f'Loading location-specific data for "{location}"') if use_age_data: try: age_data = cvdata.get_age_distribution(location) except ValueError as E: print(f'Could not load age data for requested location "{location}" ({str(E)}), using default') if use_household_data: try: household_size = cvdata.get_household_size(location) if 'h' in pars['contacts']: pars['contacts']['h'] = household_size - 1 # Subtract 1 because e.g. each person in a 3-person household has 2 contacts else: keystr = ', '.join(list(pars['contacts'].keys())) print(f'Warning; not loading household size for "{location}" since no "h" key; keys are "{keystr}". Try "hybrid" population type?') except ValueError as E: if pars['verbose']>=2: # These don't exist for many locations, so skip the warning by default print(f'Could not load household size data for requested location "{location}" ({str(E)}), using default') # Handle sexes and ages uids = np.arange(pop_size, dtype=cvd.default_int) sexes = np.random.binomial(1, pars['sex_ratio'], pop_size) age_data_min = age_data[:,0] age_data_max = age_data[:,1] + 1 # Since actually e.g. 69.999 age_data_range = age_data_max - age_data_min age_data_prob = age_data[:,2] age_data_prob /= age_data_prob.sum() # Ensure it sums to 1 age_bins = cvu.n_multinomial(age_data_prob, pop_size) # Choose age bins ages = np.round(age_data_min[age_bins] + age_data_range[age_bins]*np.random.random(pop_size)) # Uniformly distribute within this age bin weekages = np.round([age * cvd.wpy for age in ages] + np.random.uniform(1, cvd.wpy, pop_size)) # Store output popdict = {} popdict['uid'] = uids popdict['age'] = ages popdict['sex'] = sexes popdict['weekage'] = weekages # Actually create the contacts if microstructure == 'random': contacts, layer_keys = make_random_contacts(pop_size, pars['contacts'], **kwargs) elif microstructure == 'clustered': contacts, layer_keys, _ = make_microstructured_contacts(pop_size, pars['contacts'], **kwargs) elif microstructure == 'hybrid': contacts, layer_keys, _ = make_hybrid_contacts(pop_size, ages, pars['contacts'], **kwargs) else: # pragma: no cover errormsg = f'Microstructure type "{microstructure}" not found; choices are random, clustered, or hybrid' raise NotImplementedError(errormsg) popdict['contacts'] = contacts popdict['layer_keys'] = layer_keys return popdict
[docs]def make_random_contacts(pop_size, contacts, overshoot=1.2, dispersion=None): ''' Make random static contacts. Args: pop_size (int): number of agents to create contacts between (N) contacts (dict): a dictionary with one entry per layer describing the average number of contacts per person for that layer overshoot (float): to avoid needing to take multiple Poisson draws dispersion (float): if not None, use a negative binomial distribution with this dispersion parameter instead of Poisson to make the contacts Returns: contacts_list (list): a list of length N, where each entry is a dictionary by layer, and each dictionary entry is the UIDs of the agent's contacts layer_keys (list): a list of layer keys, which is the same as the keys of the input "contacts" dictionary ''' # Preprocessing pop_size = int(pop_size) # Number of people contacts = sc.dcp(contacts) layer_keys = list(contacts.keys()) contacts_list = [] # Precalculate contacts n_across_layers = np.sum(list(contacts.values())) n_all_contacts = int(pop_size*n_across_layers*overshoot) # The overshoot is used so we won't run out of contacts if the Poisson draws happen to be higher than the expected value all_contacts = cvu.choose_r(max_n=pop_size, n=n_all_contacts) # Choose people at random p_counts = {} for lkey in layer_keys: if dispersion is None: p_count = cvu.n_poisson(contacts[lkey], pop_size) # Draw the number of Poisson contacts for this person else: p_count = cvu.n_neg_binomial(rate=contacts[lkey], dispersion=dispersion, n=pop_size) # Or, from a negative binomial p_counts[lkey] = np.array((p_count/2.0).round(), dtype=cvd.default_int) # Make contacts count = 0 for p in range(pop_size): contact_dict = {} for lkey in layer_keys: n_contacts = p_counts[lkey][p] contact_dict[lkey] = all_contacts[count:count+n_contacts] # Assign people count += n_contacts contacts_list.append(contact_dict) return contacts_list, layer_keys
[docs]def make_microstructured_contacts(pop_size, contacts): ''' Create microstructured contacts -- i.e. for households ''' # Preprocessing -- same as above pop_size = int(pop_size) # Number of people contacts = sc.dcp(contacts) contacts.pop('c', None) # Remove community layer_keys = list(contacts.keys()) contacts_list = [{c:[] for c in layer_keys} for p in range(pop_size)] # Pre-populate for layer_name, cluster_size in contacts.items(): # Initialize cluster_dict = dict() # Add dictionary for this layer n_remaining = pop_size # Make clusters - each person belongs to one cluster contacts_dict = defaultdict(set) # Use defaultdict of sets for convenience while initializing. Could probably change this as part of performance optimization # Loop over the clusters cluster_id = -1 while n_remaining > 0: cluster_id += 1 # Assign cluster id this_cluster = cvu.poisson(cluster_size) # Sample the cluster size if this_cluster > n_remaining: this_cluster = n_remaining # Indices of people in this cluster cluster_indices = (pop_size-n_remaining)+np.arange(this_cluster) cluster_dict[cluster_id] = cluster_indices for i in cluster_indices: # Add symmetric pairwise contacts in each cluster for j in cluster_indices: if j > i: contacts_dict[i].add(j) n_remaining -= this_cluster for key in contacts_dict.keys(): contacts_list[key][layer_name] = np.array(list(contacts_dict[key]), dtype=cvd.default_int) clusters = {layer_name: cluster_dict} return contacts_list, layer_keys, clusters
[docs]def make_hybrid_contacts(pop_size, ages, contacts, school_ages=None): ''' Create "hybrid" contacts -- microstructured contacts for households and random contacts for schools, both of which have extremely basic age structure. A combination of both make_random_contacts() and make_microstructured_contacts(). ''' # Handle inputs and defaults layer_keys = ['h', 's', 'c'] contacts = sc.mergedicts({'h':4, 's':20,'c':20}, contacts) # Ensure essential keys are populated if school_ages is None: school_ages = [6, 22] # Create the empty contacts list -- a list of {'h':[], 's':[], 'w':[]} contacts_list = [{key:[] for key in layer_keys} for i in range(pop_size)] # Start with the household contacts for each person h_contacts, _, clusters = make_microstructured_contacts(pop_size, {'h':contacts['h']}) # Make community contacts c_contacts, _ = make_random_contacts(pop_size, {'c':contacts['c']}) # Get the indices of people in each age bin ages = np.array(ages) s_inds = sc.findinds((ages >= school_ages[0]) * (ages < school_ages[1])) # Create the school and work contacts for each person s_contacts, _ = make_random_contacts(len(s_inds), {'s':contacts['s']}) # Construct the actual lists of contacts for i in range(pop_size): contacts_list[i]['h'] = h_contacts[i]['h'] # Copy over household contacts -- present for everyone for i,ind in enumerate(s_inds): contacts_list[ind]['s'] = s_inds[s_contacts[i]['s']] # Copy over school contacts for i in range(pop_size): contacts_list[i]['c'] = c_contacts[i]['c'] # Copy over community contacts -- present for everyone return contacts_list, layer_keys, clusters
[docs]def make_synthpop(sim=None, population=None, layer_mapping=None, community_contacts=None, **kwargs): ''' Make a population using SynthPops, including contacts. Usually called automatically, but can also be called manually. Either a simulation object or a population must be supplied; if a population is supplied, transform it into the correct format; otherwise, create the population and then transform it. Args: sim (Sim): a rsvsim simulation object population (list): a pre-generated SynthPops population (otherwise, create a new one) layer_mapping (dict): a custom mapping from SynthPops layers to rsvsim layers community_contacts (int): if a simulation is not supplied, create this many community contacts on average kwargs (dict): passed to sp.make_population() **Example**:: sim = cv.Sim(pop_type='synthpops') sim.popdict = cv.make_synthpop(sim) sim.run() ''' try: import synthpops as sp # Optional import except ModuleNotFoundError as E: # pragma: no cover errormsg = 'Please install the optional SynthPops module first, e.g. pip install synthpops' # Also caught in make_people() raise ModuleNotFoundError(errormsg) from E # Handle layer mapping default_layer_mapping = {'H':'h', 'S':'s', 'C':'c'} # Remap keys from old names to new names layer_mapping = sc.mergedicts(default_layer_mapping, layer_mapping) # Handle other input arguments if population is None: if sim is None: # pragma: no cover errormsg = 'Either a simulation or a population must be supplied' raise ValueError(errormsg) pop_size = sim['pop_size'] population = sp.make_population(n=pop_size, rand_seed=sim['rand_seed'], **kwargs) if community_contacts is None: if sim is not None: community_contacts = sim['contacts']['c'] else: # pragma: no cover errormsg = 'If a simulation is not supplied, the number of community contacts must be specified' raise ValueError(errormsg) filename = sim.pars['pop_data_file'] + '.json' filepath = os.path.join('..', 'rsvsim', 'data', filename) f = open(filepath, 'r') json_obj = json.load(f) school_types_by_age = json_obj['school_types_by_age'] school_types_by_age_list = [] for type in school_types_by_age: school_types_by_age_list.append(type['school_type']) school_mapping = dict() for i, key in enumerate(school_types_by_age_list): school_mapping[key] = i # Create the basic lists pop_size = len(population) uids, ages, sexes, hhids, scids, sc_types, sc_students, contacts = [], [], [], [], [], [], [], [] schools_dict = dict() for uid,person in population.items(): uids.append(uid) ages.append(person['age']) sexes.append(person['sex']) hhids.append(person['hhid']) scids.append(person['scid']) sc_students.append(person['sc_student']) sc_types.append(school_mapping[person['sc_type']] if person['sc_type'] is not None else np.nan) if person['scid'] is not None: if person['sc_type'] not in schools_dict.keys(): schools_dict[person['sc_type']]=dict() if person['scid'] in schools_dict[person['sc_type']].keys(): schools_dict[person['sc_type']][person['scid']].append(uid) else: schools_dict[person['sc_type']][person['scid']] = [uid] # Replace contact UIDs with ints uid_mapping = {uid:u for u,uid in enumerate(uids)} for uid in uids: iid = uid_mapping[uid] # Integer UID person = population.pop(uid) uid_contacts = sc.dcp(person['contacts']) int_contacts = {} for spkey in uid_contacts.keys(): if spkey in layer_mapping.keys(): lkey = layer_mapping[spkey] # Map the SynthPops key into a rsvsim layer key int_contacts[lkey] = [] for cid in uid_contacts[spkey]: # Contact ID icid = uid_mapping[cid] # Integer contact ID if icid > iid: # Don't add duplicate contacts int_contacts[lkey].append(icid) int_contacts[lkey] = np.array(int_contacts[lkey], dtype=cvd.default_int) else: f'Could not find key "{spkey}" in layer mapping "{layer_mapping}"' contacts.append(int_contacts) # Add community contacts c_contacts, _ = make_random_contacts(pop_size, {'c':community_contacts}) for i in range(int(pop_size)): contacts[i]['c'] = c_contacts[i]['c'] # Copy over community contacts -- present for everyone weekages = np.round([age * cvd.wpy for age in ages] + np.random.uniform(1, cvd.wpy, pop_size)) # Finalize popdict = {} popdict['uid'] = np.array(list(uid_mapping.values()), dtype=cvd.default_int) popdict['age'] = np.array(ages) popdict['weekage'] = np.array(weekages) popdict['sex'] = np.array(sexes) popdict['contacts'] = sc.dcp(contacts) popdict['layer_keys'] = list(layer_mapping.values()) popdict['hhid'] = np.array(hhids) popdict['scid'] = np.array(scids) popdict['sc_type'] = np.array(sc_types) popdict['sc_student'] = np.array(sc_students) popdict['schools_dict'] = schools_dict return popdict
def add_birth_cohort(sim, mom_inds): '''Adds new births to population, adds births to households based on index of mothers, and transfers mabs''' orig_people = sim.people uids = np.arange(len(orig_people), len(orig_people) + len(mom_inds), dtype=cvd.default_int) ages = np.zeros(len(mom_inds), dtype=cvd.default_int) sexes = np.random.binomial(1, sim.pars['sex_ratio'], len(mom_inds)) hhids = sim.people.hhid[mom_inds] layer_keys = orig_people.contacts.keys() contacts_list = [{key: [] for key in layer_keys} for _ in range(len(mom_inds))] household_contacts = orig_people.contacts['h'] for i, ind in enumerate(mom_inds): if not isinstance(ind, np.ndarray): ind = sc.promotetoarray(ind) if ind.dtype != np.int64: # pragma: no cover # This is int64 since indices often come from cv.true(), which returns int64 ind = np.array(ind, dtype=np.int64) mom_household_contacts = cvu.find_contacts(household_contacts['p1'], household_contacts['p2'], ind) contacts_list[i]['h'] = list(ind) + list(mom_household_contacts) sim_pars = sim.pars sim_pars['pop_size'] = len(mom_inds) new_contacts = cvb.Contacts(layer_keys=layer_keys) for lkey in layer_keys: new_contacts[lkey]['p1'] = [] # Person 1 of the contact pair new_contacts[lkey]['p2'] = [] # Person 2 of the contact pair for p, cdict in enumerate(contacts_list): for lkey, p_contacts in cdict.items(): n = len(p_contacts) new_contacts[lkey]['p1'].extend([uids[p]]*n) new_contacts[lkey]['p2'].extend(p_contacts) for lkey in layer_keys: new_layer = cvb.Layer(label=lkey) for ckey, value in new_contacts[lkey].items(): new_layer[ckey] = np.array(value, dtype=new_layer.meta[ckey]) new_contacts[lkey] = new_layer new_babies = cvppl.People(sim_pars, uid=uids, age=ages, weekage=ages, sex=sexes, hhid=hhids, contacts=new_contacts) new_babies.set_prognoses() # add contacts for lkey, val in orig_people.contacts.items(): np.append(val['p1'], new_babies.contacts[lkey]['p1']) np.append(val['p2'], new_babies.contacts[lkey]['p2']) np.append(val['beta'], new_babies.contacts[lkey]['beta']) # do maternal antibody cord transfer mom_immunity = orig_people.sus_imm[:,mom_inds] new_babies.peak_imm[:] = mom_immunity * sim_pars['cord_transfer'] new_babies.t_imm_event[:] = sim.t orig_people += new_babies sim.people = orig_people sim.pars['pop_size'] = len(sim.people) return new_babies