Source code for synthpops.pop

"""
This module provides the main class for interacting with SynthPops, the Pop class.
"""

import numpy as np
import sciris as sc
from .config import logger as log
from . import version as spv
from . import defaults
from . import base as spb
from . import config as cfg
from . import sampling as spsamp
from . import data_distributions as spdata
from . import ltcfs as spltcf
from . import households as sphh
from . import schools as spsch
from . import workplaces as spw
from . import contact_networks as spcnx
from . import plotting as sppl
from . import people as spp


__all__ = ['Pop', 'make_population', 'generate_synthetic_population']


[docs]class Pop(sc.prettyobj): def __init__(self, n=None, max_contacts=None, ltcf_pars=None, school_pars=None, with_industry_code=False, with_facilities=False, use_default=False, use_two_group_reduction=True, average_LTCF_degree=20, ltcf_staff_age_min=20, ltcf_staff_age_max=60, with_school_types=False, school_mixing_type='random', average_class_size=20, inter_grade_mixing=0.1, average_student_teacher_ratio=20, average_teacher_teacher_degree=3, teacher_age_min=25, teacher_age_max=75, with_non_teaching_staff=False, average_student_all_staff_ratio=15, average_additional_staff_degree=20, staff_age_min=20, staff_age_max=75, rand_seed=None, country_location=None, state_location=None, location=None, sheet_name=None, household_method='infer_ages', smooth_ages=False, window_length=7, do_make=True ): ''' Make a full population network including both people (ages, sexes) and contacts. By default uses Seattle, Washington data. Note about the household methods available: 'infer_ages' and 'fixed_ages'. If using 'infer_ages', then the ages of individuals in the population are generated by first placing individuals into households using the age of the head of households or reference individuals (always an adult), household age mixing patterns, household sizes, and the age distribution from data (census or other sources). If using 'fixed_ages', then individuals are pre-assigned ages according to the age distribution and placed into households using the age of the head of households or reference individuals, household age mixing patterns, and household sizes. Args: n (int) : The number of people to create. max_contacts (dict) : A dictionary for maximum number of contacts per layer: keys must be "W" (work). ltcf_pars (dict) : If supplied, replace default LTCF parameters school_pars (dict) : if supplied, replace default school parameters with_industry_code (bool) : If True, assign industry codes for workplaces, currently only possible for cached files of populations in the US. with_facilities (bool) : If True, create long term care facilities, currently only available for locations in the US. use_default (bool) : If True, use default data from settings.location, settings.state, settings.country. use_two_group_reduction (bool) : If True, create long term care facilities with reduced contacts across both groups. average_LTCF_degree (float) : default average degree in long term care facilities. ltcf_staff_age_min (int) : Long term care facility staff minimum age. ltcf_staff_age_max (int) : Long term care facility staff maximum age. with_school_types (bool) : If True, creates explicit school types. school_mixing_type (str or dict) : The mixing type for schools, 'random', 'age_clustered', or 'age_and_class_clustered' if string, and a dictionary of these by school type otherwise. average_class_size (float) : The average classroom size. inter_grade_mixing (float) : The average fraction of edges rewired to create edges between grades in the same school when school_mixing_type is 'age_clustered' average_student_teacher_ratio (float) : The average number of students per teacher. average_teacher_teacher_degree (float) : The average number of contacts per teacher with other teachers. teacher_age_min (int) : The minimum age for teachers. teacher_age_max (int) : The maximum age for teachers. with_non_teaching_staff (bool) : If True, includes non teaching staff. average_student_all_staff_ratio (float) : The average number of students per staff members at school (including both teachers and non teachers). average_additional_staff_degree (float) : The average number of contacts per additional non teaching staff in schools. staff_age_min (int) : The minimum age for non teaching staff. staff_age_max (int) : The maximum age for non teaching staff. rand_seed (int) : Start point random sequence is generated from. country_location (string) : name of the country the location is in state_location (string) : name of the state the location is in location (string) : name of the location sheet_name (string) : sheet name where data is located household_method (string) : name of household generation method used; for details see above. smooth_ages (bool) : If True, use smoothed out age distribution. window_length (int) : length of window over which to average or smooth out age distribution do_make (bool) : whether to make the population Returns: network (dict): A dictionary of the full population with ages, connections, and other attributes. ''' log.debug('Pop()') # General parameters if n is None: log.warning(f"Pop size n not given, generating a population with a default size of {defaults.default_pop_size} people.") n = defaults.default_pop_size elif n < defaults.default_pop_size: log.warning(f"Pop size n: {n} is too small for synthpops to make contact networks that statistically represent real world populations. Resultant networks may not look realistic.") # Assign all the variables self.loc_pars = sc.objdict() self.school_pars = sc.objdict() self.ltcf_pars = sc.objdict() self.n = int(n) self.max_contacts = sc.mergedicts({'W': 20}, max_contacts) self.with_industry_code = with_industry_code self.rand_seed = rand_seed self.country_location = country_location self.state_location = state_location self.location = location self.sheet_name = sheet_name self.use_default = use_default # Age distribution parameters self.smooth_ages = smooth_ages self.window_length = window_length # Household parameters self.household_method = household_method # School parameters self.school_pars.with_school_types = with_school_types self.school_pars.school_mixing_type = school_mixing_type self.school_pars.average_class_size = average_class_size self.school_pars.inter_grade_mixing = inter_grade_mixing self.school_pars.average_student_teacher_ratio = average_student_teacher_ratio self.school_pars.average_teacher_teacher_degree = average_teacher_teacher_degree self.school_pars.teacher_age_min = teacher_age_min self.school_pars.teacher_age_max = teacher_age_max self.school_pars.with_non_teaching_staff = with_non_teaching_staff self.school_pars.average_student_all_staff_ratio = average_student_all_staff_ratio self.school_pars.average_additional_staff_degree = average_additional_staff_degree self.school_pars.staff_age_min = staff_age_min self.school_pars.staff_age_max = staff_age_max # LTCF parameters self.ltcf_pars.with_facilities = with_facilities self.ltcf_pars.use_two_group_reduction = use_two_group_reduction self.ltcf_pars.average_LTCF_degree = average_LTCF_degree self.ltcf_pars.ltcf_staff_age_min = ltcf_staff_age_min self.ltcf_pars.ltcf_staff_age_max = ltcf_staff_age_max # If any parameters are supplied as a dict to override defaults, merge them in now self.school_pars = sc.objdict(sc.mergedicts(self.school_pars, school_pars)) self.ltcf_pars = sc.objdict(sc.mergedicts(self.ltcf_pars, ltcf_pars)) # what are the layers generated? if self.ltcf_pars.with_facilities: self.layers = ['H', 'S', 'W', 'LTCF'] else: self.layers = ['H', 'S', 'W'] self.layer_mappings = dict(H='Households', S='Schools', W='Workplaces', LTCF='Long Term Care facilities') # Handle the seed if self.rand_seed is not None: spsamp.set_seed(self.rand_seed) # Handle data if self.country_location is None: self.country_location = defaults.settings.country_location self.state_location = defaults.settings.state_location self.location = defaults.settings.location else: print(f"========== setting country location = {country_location}") cfg.set_location_defaults(country_location) self.max_age = defaults.settings.max_age # if country is specified, and state is not, we are doing a country population if self.state_location is None: self.location = None # if sheet name is not specified, use the default if self.sheet_name is None: self.sheet_name = defaults.settings.sheet_name self.datadir = defaults.settings.datadir # Assume this has been reset... # Location parameters self.loc_pars.location = self.location self.loc_pars.state_location = self.state_location self.loc_pars.country_location = self.country_location self.loc_pars.datadir = self.datadir self.loc_pars.use_default = self.use_default # Heavy lift: make the contacts and their connections log.debug('Generating a new population...') population = self.generate() self.popdict = population log.debug('Pop(): done.') # Add summaries post hoc --- TBD: summaries during generation self.compute_information() # compute full information self.compute_summary() # then compute condensed summary # Plotting defaults self.plkwargs = sppl.plotting_kwargs() # Set metadata -- version etc. cfg.set_metadata(self) return
[docs] def generate(self): """ Actually generate the network. Returns: network (dict): A dictionary of the full population with ages, connections, and other attributes. """ log.debug('generate()') # TODO: unpack variables -- to be refactored to pass parameters directly # General parameters datadir = self.datadir location = self.location state_location = self.state_location country_location = self.country_location n = self.n sheet_name = self.sheet_name max_contacts = self.max_contacts use_default = self.use_default loc_pars = self.loc_pars # Age distribution parameters smooth_ages = self.smooth_ages window_length = self.window_length # Household parameters household_method = self.household_method # LTCF parameters use_two_group_reduction = self.ltcf_pars.use_two_group_reduction average_LTCF_degree = self.ltcf_pars.average_LTCF_degree with_facilities = self.ltcf_pars.with_facilities ltcf_staff_age_min = self.ltcf_pars.ltcf_staff_age_min ltcf_staff_age_max = self.ltcf_pars.ltcf_staff_age_max # School parameters with_school_types = self.school_pars.with_school_types school_mixing_type = self.school_pars.school_mixing_type average_class_size = self.school_pars.average_class_size inter_grade_mixing = self.school_pars.inter_grade_mixing average_student_teacher_ratio = self.school_pars.average_student_teacher_ratio average_teacher_teacher_degree = self.school_pars.average_teacher_teacher_degree teacher_age_min = self.school_pars.teacher_age_min teacher_age_max = self.school_pars.teacher_age_max with_non_teaching_staff = self.school_pars.with_non_teaching_staff average_student_all_staff_ratio = self.school_pars.average_student_all_staff_ratio average_additional_staff_degree = self.school_pars.average_additional_staff_degree staff_age_min = self.school_pars.staff_age_min staff_age_max = self.school_pars.staff_age_max # Load and store the expected age distribution of the population age_bracket_dist = spdata.read_age_bracket_distr(**loc_pars) # age distribution defined by bins or age brackets expected_age_dist = spdata.get_smoothed_single_year_age_distr(**loc_pars, window_length=self.window_length) self.expected_age_dist = expected_age_dist expected_age_dist_values = [expected_age_dist[a] for a in expected_age_dist] self.expected_age_dist_values = expected_age_dist_values # Load and store the age brackets age_brackets = spdata.get_census_age_brackets(**loc_pars) self.age_brackets = age_brackets # mapping age_by_brackets = spb.get_age_by_brackets(age_brackets) self.age_by_brackets = age_by_brackets # Load the contact matrix contact_matrices = spdata.get_contact_matrices(datadir, sheet_name=sheet_name) # Store expected contact matrices self.contact_matrices = contact_matrices # Load age brackets, and mapping dictionary that matches contact matrices contact_matrix_shape = contact_matrices[list(contact_matrices.keys())[0]].shape contact_matrix_row = contact_matrix_shape[0] cm_age_brackets = spdata.get_census_age_brackets(**loc_pars, nbrackets=contact_matrix_row) self.cm_age_brackets = cm_age_brackets cm_age_by_brackets = spb.get_age_by_brackets(cm_age_brackets) self.cm_age_by_brackets = cm_age_by_brackets # Generate an age count for the population --- this will get passed around to methods generating the different layers where people live: long term care facilities, households, agricultural living quarters, other group living arrangements age_count = sphh.generate_age_count_multinomial(n, expected_age_dist_values) # Ages left to assign to a residence ages_left_to_assign = sc.dcp(age_count) # Generate LTCFs and remove some people from the age count of people left to place in a resident by age n_nonltcf, ltcf_adjusted_age_dist, ltcf_adjusted_age_dist_values, ages_left_to_assign, facilities = spltcf.generate_ltcfs(n, with_facilities, loc_pars, expected_age_dist, ages_left_to_assign) # Generate households household_size_dist = spdata.get_household_size_distr(**loc_pars) hh_sizes = sphh.generate_household_size_count_from_fixed_pop_size(n_nonltcf, household_size_dist) hha_brackets = spdata.get_head_age_brackets(**loc_pars) hha_by_size = spdata.get_head_age_by_size_distr(**loc_pars) if household_method == 'fixed_ages': homes_dic, homes = sphh.generate_all_households_fixed_ages(n_nonltcf, hh_sizes, hha_by_size, hha_brackets, cm_age_brackets, cm_age_by_brackets, contact_matrices, ages_left_to_assign) else: log.debug("defaulting to 'infer_ages' household generation method. See method notes for description.") homes_dic, homes = sphh.generate_all_households_infer_ages(n, n_nonltcf, hh_sizes, hha_by_size, hha_brackets, cm_age_brackets, cm_age_by_brackets, contact_matrices, ltcf_adjusted_age_dist, ages_left_to_assign) # Handle homes and facilities homes = facilities + homes homes_by_uids, age_by_uid = sphh.assign_uids_by_homes(homes) # include facilities to assign ids age_by_uid_arr = np.array([age_by_uid[i] for i in range(self.n)], dtype=int) self.age_by_uid = age_by_uid_arr facilities_by_uid_lists = homes_by_uids[0:len(facilities)] # Generate school sizes school_sizes_dist_by_brackets = spdata.get_school_size_distr_by_brackets(**loc_pars) # without school type school_size_brackets = spdata.get_school_size_brackets(**loc_pars) # for right now the size distribution for all school types will use the same brackets or bins # Figure out who's going to school as a student with enrollment rates (gets called inside sp.get_uids_in_school) uids_in_school, uids_in_school_by_age, ages_in_school_count = spsch.get_uids_in_school(datadir, n_nonltcf, location, state_location, country_location, age_by_uid, homes_by_uids, use_default=use_default) # this will call in school enrollment rates if with_school_types: school_size_distr_by_type = spdata.get_school_size_distr_by_type(**loc_pars) school_type_age_ranges = spdata.get_school_type_age_ranges(**loc_pars) school_types_distr_by_age = spsch.get_school_types_distr_by_age(school_type_age_ranges) school_type_by_age = spsch.get_school_types_by_age_single(school_types_distr_by_age) student_age_lists, student_uid_lists, school_types = spsch.send_students_to_school_with_school_types(school_size_distr_by_type, school_size_brackets, uids_in_school, uids_in_school_by_age, ages_in_school_count, school_types_distr_by_age, school_type_age_ranges) else: # Get school sizes school_sizes = spsch.generate_school_sizes(school_sizes_dist_by_brackets, school_size_brackets, uids_in_school) # Assign students to school using contact matrix method - generic schools student_age_lists, student_uid_lists, school_types = spsch.send_students_to_school(school_sizes, uids_in_school, uids_in_school_by_age, ages_in_school_count, cm_age_brackets, cm_age_by_brackets, contact_matrices) school_type_by_age = None # Get employment rates employment_rates = spdata.get_employment_rates(**loc_pars) # Find people who can be workers (removing everyone who is currently a student) uids_by_age = spb.get_ids_by_age(age_by_uid) # Make a dictionary listing out uids of people by their age potential_worker_uids, potential_worker_uids_by_age, potential_worker_ages_left_count = spw.get_uids_potential_workers(student_uid_lists, employment_rates, age_by_uid) workers_by_age_to_assign_count = spw.get_workers_by_age_to_assign(employment_rates, potential_worker_ages_left_count, uids_by_age) # Removing facilities residents from potential workers potential_worker_uids, potential_worker_uids_by_age, workers_by_age_to_assign_count = spltcf.remove_ltcf_residents_from_potential_workers(facilities_by_uid_lists, potential_worker_uids, potential_worker_uids_by_age, workers_by_age_to_assign_count, age_by_uid) # Assign teachers and update school lists teacher_age_lists, teacher_uid_lists, potential_worker_uids, potential_worker_uids_by_age, workers_by_age_to_assign_count = spsch.assign_teachers_to_schools(student_age_lists, student_uid_lists, employment_rates, workers_by_age_to_assign_count, potential_worker_uids, potential_worker_uids_by_age, potential_worker_ages_left_count, average_student_teacher_ratio=average_student_teacher_ratio, teacher_age_min=teacher_age_min, teacher_age_max=teacher_age_max) # Assign non teaching staff and update who's available to work at other places non_teaching_staff_uid_lists, potential_worker_uids, potential_worker_uids_by_age, workers_by_age_to_assign_count = spsch.assign_additional_staff_to_schools(student_uid_lists, teacher_uid_lists, workers_by_age_to_assign_count, potential_worker_uids, potential_worker_uids_by_age, potential_worker_ages_left_count, average_student_teacher_ratio=average_student_teacher_ratio, average_student_all_staff_ratio=average_student_all_staff_ratio, staff_age_min=staff_age_min, staff_age_max=staff_age_max, with_non_teaching_staff=with_non_teaching_staff) # Get facility staff if with_facilities: facilities_staff_uid_lists = spltcf.assign_facility_staff(datadir, location, state_location, country_location, ltcf_staff_age_min, ltcf_staff_age_max, facilities, workers_by_age_to_assign_count, potential_worker_uids_by_age, potential_worker_uids, facilities_by_uid_lists, age_by_uid, use_default=use_default) else: facilities_staff_uid_lists = [] # Generate non-school workplace sizes needed to send everyone to work workplace_size_brackets = spdata.get_workplace_size_brackets(**loc_pars) workplace_size_distr_by_brackets = spdata.get_workplace_size_distr_by_brackets(**loc_pars) workplace_sizes = spw.generate_workplace_sizes(workplace_size_distr_by_brackets, workplace_size_brackets, workers_by_age_to_assign_count) # Assign all workers who are not staff at schools to workplaces workplace_age_lists, workplace_uid_lists, potential_worker_uids, potential_worker_uids_by_age, workers_by_age_to_assign_count = spw.assign_rest_of_workers(workplace_sizes, potential_worker_uids, potential_worker_uids_by_age, workers_by_age_to_assign_count, age_by_uid, cm_age_brackets, cm_age_by_brackets, contact_matrices) # remove facilities from homes --- have already assigned each person a uid homes_by_uids = homes_by_uids[len(facilities_by_uid_lists):] homes = homes[len(facilities_by_uid_lists):] population = spcnx.make_contacts(self, age_by_uid=age_by_uid, homes_by_uids=homes_by_uids, students_by_uid_lists=student_uid_lists, teachers_by_uid_lists=teacher_uid_lists, non_teaching_staff_uid_lists=non_teaching_staff_uid_lists, workplace_by_uid_lists=workplace_uid_lists, facilities_by_uid_lists=facilities_by_uid_lists, facilities_staff_uid_lists=facilities_staff_uid_lists, use_two_group_reduction=use_two_group_reduction, average_LTCF_degree=average_LTCF_degree, with_school_types=with_school_types, school_mixing_type=school_mixing_type, average_class_size=average_class_size, inter_grade_mixing=inter_grade_mixing, average_student_teacher_ratio=average_student_teacher_ratio, average_teacher_teacher_degree=average_teacher_teacher_degree, average_student_all_staff_ratio=average_student_all_staff_ratio, average_additional_staff_degree=average_additional_staff_degree, school_type_by_age=school_type_by_age, max_contacts=max_contacts) # Change types for key, person in population.items(): for layerkey in population[key]['contacts'].keys(): population[key]['contacts'][layerkey] = list(population[key]['contacts'][layerkey]) school_mixing_types = [self.schools_in_groups[ns]['school_mixing_type'] for ns in range(len(self.schools_in_groups))] # temporarily store some information self.homes_by_uids = homes_by_uids self.workplace_uid_lists = workplace_uid_lists self.student_uid_lists = student_uid_lists self.teacher_uid_lists = teacher_uid_lists self.non_teaching_staff_uid_lists = non_teaching_staff_uid_lists self.school_types = school_types self.school_mixing_types = school_mixing_types if self.ltcf_pars.with_facilities: self.facilities_by_uid_lists = facilities_by_uid_lists self.facilities_staff_uid_lists = facilities_staff_uid_lists sum_ltcf_res = sum([len(f) for f in self.facilities_by_uid_lists]) if sum_ltcf_res == 0: log.warning(f"Heads up: Population size and long term care facility use rates were too low, no facilities were created for this population. If you wish to include people living in this type of layer, consider using a larger population size or checking your data on long term care facility use rates. Changing pop.with_facilities to False.") self.layers.remove('LTCF') self.ltcf_pars.with_facilities = False self.set_layer_classes() self.clean_up_layer_info() return population
[docs] def set_layer_classes(self): """Add layer classes.""" self.initialize_households_list() self.populate_households(self.homes_by_uids, self.age_by_uid) self.initialize_workplaces_list() self.populate_workplaces(self.workplace_uid_lists) self.initialize_schools_list() self.populate_schools(self.student_uid_lists, self.teacher_uid_lists, self.non_teaching_staff_uid_lists, self.age_by_uid, self.school_types, self.school_mixing_types) self.populate_all_classrooms(self.schools_in_groups) if self.ltcf_pars.with_facilities: self.initialize_ltcfs_list() self.populate_ltcfs(self.facilities_by_uid_lists, self.facilities_staff_uid_lists) return
[docs] def clean_up_layer_info(self): """ Clean up temporary data from the pop object after storing them in specific layer classes. """ for key in ['workplace_uid_lists', 'student_uid_lists', 'teacher_uid_lists', 'non_teaching_staff_uid_lists', 'school_types', 'school_mixing_types', 'schools_in_groups', 'facilities_by_uid_lists', 'facilities_staff_uid_lists']: self.pop_item(key) return
[docs] def pop_item(self, key): """Pop key from self.""" self.__dict__.pop(key, None) # pop checks if the key exists as an attribute and removes it in that case. Returns a default value of None if the key does not exist
[docs] def to_dict(self): """ Export to a dictionary -- official way to get the popdict. **Example**:: popdict = pop.to_dict() """ return sc.dcp(self.popdict)
[docs] def to_json(self, filename, indent=2, **kwargs): """ Export to a JSON file. **Example**:: pop.to_json('my-pop.json') """ return sc.savejson(filename, self.popdict, indent=indent, **kwargs)
[docs] def save(self, filename, **kwargs): """ Save population to an binary, gzipped object file. **Example**:: pop.save('my-pop.pop') """ return sc.saveobj(filename, self, **kwargs)
[docs] @staticmethod def load(filename, *args, **kwargs): """ Load from disk from a gzipped pickle. Args: filename (str): the name or path of the file to load from kwargs: passed to sc.loadobj() **Example**:: pop = sp.Pop.load('my-pop.pop') """ pop = sc.loadobj(filename, *args, **kwargs) if not isinstance(pop, Pop): errormsg = f'Cannot load object of {type(pop)} as a Pop object' raise TypeError(errormsg) return pop
[docs] def initialize_households_list(self): """Initialize a new households list.""" self.households = [] return
[docs] def initialize_empty_households(self, n_households=None): """ Create a list of empty households. Args: n_households (int) : the number of households to initialize """ sphh.initialize_empty_households(self, n_households) return
[docs] def populate_households(self, households, age_by_uid): """ Populate all of the households. Store each household at the index corresponding to it's hhid. Args: households (list) : list of lists where each sublist represents a household and contains the ids of the household members age_by_uid (dict) : dictionary mapping each person's id to their age """ sphh.populate_households(self, households, age_by_uid) return
[docs] def get_household(self, hhid): """ Return household with id: hhid. Args: hhid (int) : household id number Returns: sp.Household: A populated household. """ return sphh.get_household(self, hhid)
[docs] def add_household(self, household): """ Add a household to the list of households. Args: household (sp.Household): household with at minimum the hhid, member_uids, member_ages, reference_uid, and reference_age. """ sphh.add_household(self, household) return
[docs] def initialize_workplaces_list(self): """Initialize a new workplaces list.""" self.workplaces = [] return
[docs] def initialize_empty_workplaces(self, n_workplaces=None): """ Create a list of empty workplaces. Args: n_households (int) : the number of workplaces to initialize """ sphh.initialize_empty_workplaces(self, n_workplaces) return
[docs] def populate_workplaces(self, workplaces): """ Populate all of the workplaces. Store each workplace at the index corresponding to it's wpid. Args: workplaces (list) : list of lists where each sublist represents a workplace and contains the ids of the workplace members age_by_uid (dict) : dictionary mapping each person's id to their age """ spw.populate_workplaces(self, workplaces) return
[docs] def get_workplace(self, wpid): """ Return workplace with id: wpid. Args: wpid (int) : workplace id number Returns: sp.Workplace: A populated workplace. """ return spw.get_workplace(self, wpid)
[docs] def add_workplace(self, workplace): """ Add a workplace to the list of workplaces. Args: workplace (sp.Workplace): workplace with at minimum the wpid, member_uids, member_ages, reference_uid, and reference_age. """ spw.add_workplace(self, workplace) return
[docs] def initialize_ltcfs_list(self): """Initialize a new ltcfs list.""" self.ltcfs = [] return
[docs] def initialize_empty_ltcfs(self, n_ltcfs=None): """ Create a list of empty ltcfs. Args: n_ltcfs (int) : the number of ltcfs to initialize """ spltcf.initialize_empty_ltcfs(self, n_ltcfs) return
[docs] def populate_ltcfs(self, resident_lists, staff_lists): """ Populate all of the ltcfs. Store each ltcf at the index corresponding to it's ltcfid. Args: residents_list (list) : list of lists where each sublist represents a ltcf and contains the ids of the residents staff_lists (list) : list of lists where each sublist represents a ltcf and contains the ids of the staff """ spltcf.populate_ltcfs(self, resident_lists, staff_lists) return
[docs] def get_ltcf(self, ltcfid): """ Return ltcf with id: ltcfid. Args: ltcfid (int) : ltcf id number Returns: sp.LongTermCareFacility: A populated ltcf. """ return spltcf.get_ltcf(self, ltcfid)
[docs] def add_ltcf(self, ltcf): """ Add a ltcf to the list of ltcfs. Args: ltcf (sp.LongTermCareFacility): ltcf with at minimum the ltcfid, resident_uids, staff_uids, resident_ages, staff_ages, reference_uid, and reference_age. """ spltcf.add_ltcf(self, ltcf)
[docs] def initialize_schools_list(self): """Initialize a new schools list.""" self.schools = [] return
[docs] def initialize_empty_schools(self, n_schools=None): """ Create a list of empty schools. Args: n_schools (int) : the number of schools to initialize """ spsch.initialize_empty_schools(self, n_schools) return
[docs] def populate_schools(self, student_lists, teacher_lists, non_teaching_staff_lists, age_by_uid, school_types=None, school_mixing_types=None): """ Populate all of the schools. Store each school at the index corresponding to it's scid. Args: student_lists (list) : list of lists where each sublist represents a school and contains the ids of the students teacher_lists (list) : list of lists where each sublist represents a school and contains the ids of the teachers non_teaching_staff_lists (list) : list of lists where each sublist represents a school and contains the ids of the non teaching staff age_by_uid (dict) : dictionary mapping each person's id to their age school_types (list) : list of the school types school_mixing_types (list) : list of the school mixing types """ spsch.populate_schools(self, student_lists, teacher_lists, non_teaching_staff_lists, age_by_uid, school_types, school_mixing_types) return
[docs] def get_school(self, scid): """ Return school with id: scid. Args: scid (int) : school id number Returns: sp.School: A populated school. """ return spsch.get_school(self, scid)
[docs] def add_school(self, school): """ Add a school to the list of schools. Args: school (sp.School): school """ spsch.add_school(self, school) return
[docs] def populate_all_classrooms(self, schools_in_groups): """ Populate all of the classrooms in schools for each school that has school_mixing_type equal to 'age_and_class_clustered'. Each classroom will be indexed at id clid. Args: schools_in_groups (dict) : a dictionary representing each school in terms of student_groups and teacher_groups corresponding to classrooms """ for ns in range(self.n_schools): spsch.initialize_empty_classrooms(self.schools[ns], len(schools_in_groups[ns]['student_groups'])) spsch.populate_classrooms(self.schools[ns], schools_in_groups[ns]['student_groups'], schools_in_groups[ns]['teacher_groups'], self.age_by_uid) return
[docs] def get_classroom(self, scid, clid): """ Return classroom with id: clid from school with id: scid. Args: scid (int) : school id number clid (int) : classroom id number Returns: sp.Classroom : A populated classroom. """ return spsch.get_classroom(self, scid, clid)
[docs] def compute_information(self): """Computing an advanced description of the population.""" self.information = sc.objdict() self.information.age_count = self.count_pop_ages() self.information.layer_degrees = dict() self.information.layer_stats = dict() self.information.layer_degree_description = dict() for layer in self.layers: self.information.layer_degrees[layer] = spcnx.count_layer_degree(self, layer=layer) self.information.layer_stats[layer] = self.information.layer_degrees[layer].describe()[['age', 'degree']] self.information.layer_degree_description[layer] = self.information.layer_degrees[layer].groupby('age')['degree'].describe(percentiles=[0.05, 0.25, 0.5, 0.75, 0.95]) # default percentiles to include self.information.household_sizes = self.get_household_sizes() self.information.household_size_count = self.count_household_sizes() self.information.household_heads = self.get_household_heads() self.information.household_head_ages = self.get_household_head_ages() self.information.household_head_age_count = self.count_household_head_ages() self.information.household_head_ages_by_size_count = self.get_household_head_ages_by_size() self.information.ltcf_sizes = self.get_ltcf_sizes() self.information.ltcf_size_count = self.count_ltcf_sizes() self.information.enrollment_by_age = self.count_enrollment_by_age() self.information.enrollment_by_school_type = self.count_enrollment_by_school_type() self.information.employment_by_age = self.count_employment_by_age() self.information.workplace_sizes = self.get_workplace_sizes() self.information.workplace_size_count = self.count_workplace_sizes() return
[docs] def compute_summary(self): """Compute summaries and add to pop post generation.""" self.summary = sc.objdict() self.summary.mean_age = spb.calculate_mean_from_count(self.information.age_count) self.summary.std_age = spb.calculate_std_from_count(self.information.age_count) self.summary.layers = dict() for layer in self.layers: self.summary.layers[layer] = dict() percentiles = [5, 95] self.summary.layers['H']['mean'] = np.mean(list(self.information.household_sizes.values())) self.summary.layers['H']['std'] = np.std(list(self.information.household_sizes.values())) for p in percentiles: self.summary.layers['H'][p] = np.percentile(list(self.information.household_sizes.values()), q=p) sizes = [] for s in self.information.enrollment_by_school_type.keys(): sizes.extend(self.information.enrollment_by_school_type[s]) self.summary.layers['S']['mean'] = np.mean(sizes) self.summary.layers['S']['std'] = np.std(sizes) for p in percentiles: self.summary.layers['S'][p] = np.percentile(sizes, q=p) self.summary.layers['W']['mean'] = np.mean(list(self.information.workplace_sizes.values())) self.summary.layers['W']['std'] = np.std(list(self.information.workplace_sizes.values())) for p in percentiles: self.summary.layers['W'][p] = np.percentile(list(self.information.workplace_sizes.values()), q=p)
[docs] def summarize(self, return_msg=False): """Print and optionally return a brief summary string of the pop.""" msg = "" msg += f"This networked population is created to resemble the population of {self.location + ',' if self.location is not None else ''} {self.state_location + ',' if self.state_location is not None else ''} {self.country_location if self.country_location is not None else ''}.\n" msg += f"The number of people is {self.n:.0f}.\n" msg += f"The mean age is {self.summary.mean_age:.2f} +/- {self.summary.std_age:.2f} years old.\n" msg += "\n" for layer in self.layers: s = self.information.layer_stats[layer] msg += f"Layer {layer}: {self.layer_mappings[layer]}\n" msg += f" Number of people: {len(self.information.layer_degrees[layer]):.0f}\n" msg += f" Number of edges: {self.n * s.loc[s.index == 'mean']['degree'][0] * 2:.0f} ({s.loc[s.index == 'mean']['degree'][0]:.1f} ± {s.loc[s.index == 'std']['degree'][0]:.1f} per person)\n" msg += f" Age (years): {s.loc[s.index == 'mean']['age'][0]:.1f} ({s.loc[s.index == 'min']['age'][0]:.0f}-{s.loc[s.index == 'max']['age'][0]:.0f})\n" if layer in ['H', 'S', 'W']: msg += f" {self.layer_mappings[layer].title()} size: {self.summary.layers[layer]['mean']:.1f} ± {self.summary.layers[layer]['std']:.1f} people (range is {self.summary.layers[layer][5]:.1f}-{self.summary.layers[layer][95]:.1f}).\n" msg += "\n" msg += f"The rand_seed used to generate this population is {self.rand_seed}." print(msg) if return_msg: return msg else: return
[docs] def count_pop_ages(self): """ Create an age count of the generated population post generation. Returns: dict: Dictionary of the age count of the generated population. """ return spb.count_ages(self.popdict)
# convert to work on array
[docs] def get_household_sizes(self): """ Create household sizes in the generated population post generation. Returns: dict: Dictionary of household size by household id (hhid). """ return sphh.get_household_sizes(self.popdict)
# convert to work on array
[docs] def count_household_sizes(self): """ Count of household sizes in the generated population. Returns: dict: Dictionary of the count of household sizes. """ return spb.count_values(self.information.household_sizes)
# convert to work on array
[docs] def get_household_heads(self): """Get the ids of the head of households in the generated population post generation.""" return sphh.get_household_heads(self.popdict)
[docs] def get_household_head_ages(self): """Get the age of the head of each household in the generated population post generation.""" return {hhid: self.popdict[head_id]['age'] for hhid, head_id in self.information.household_heads.items()}
[docs] def count_household_head_ages(self, bins=None): """ Count of household head ages in the generated population. Args: bins (array) : If supplied, use this to create a binned count of the household head ages. Otherwise, count discrete household head ages. Returns: dict: Dictionary of the count of household head ages. """ if bins is None: return spb.count_values(self.information.household_head_ages) else: head_ages = list(self.information.household_head_ages.values()) hist, bins = np.histogram(head_ages, bins=bins, density=0) return {i: hist[i] for i in range(len(hist))}
[docs] def get_household_head_ages_by_size(self): """ Get the count of households by size and the age of the head of the household, assuming the minimal household members id is the id of the head of the household. Returns: np.ndarray: An array with row as household size and columns as household head age brackets. """ return sphh.get_household_head_ages_by_size(self)
# convert to work on array
[docs] def get_ltcf_sizes(self, keys_to_exclude=[]): """ Create long term care facility sizes in the generated population post generation. Args: keys_to_exclude (list) : possible keys to exclude for roles in long term care facilities. See notes. Returns: dict: Dictionary of the size for each long term care facility generated. Notes: keys_to_exclude is an empty list by default, but can contain the different long term care facility roles: 'snf_res' for residents and 'snf_staff' for staff. If either role is included in the parameter keys_to_exclude, then individuals with that value equal to 1 will not be counted. """ return spltcf.get_ltcf_sizes(self.popdict, keys_to_exclude)
# convert to work on array
[docs] def count_ltcf_sizes(self, keys_to_exclude=[]): """ Count of long term care facility sizes in the generated population. Args: keys_to_exclude (list) : possible keys to exclude for roles in long term care facilities. See notes. Returns: dict: Dictionary of the count of long term care facility sizes. Notes: keys_to_exclude is an empty list by default, but can contain the different long term care facility roles: 'snf_res' for residents and 'snf_staff' for staff. If either role is included in the parameter keys_to_exclude, then individuals with that value equal to 1 will not be counted. """ return spb.count_values(self.get_ltcf_sizes(keys_to_exclude))
[docs] def count_enrollment_by_age(self): """ Create enrollment count by age for students in the generated population post generation. Returns: dict: Dictionary of the count of enrolled students by age in the generated population. """ return spsch.count_enrollment_by_age(self.popdict)
@property def enrollment_rates_by_age(self): """ Enrollment rates by age for students in the generated population. Returns: dict: Dictionary of the enrollment rates by age for students in the generated population. """ return {k: self.information.enrollment_by_age[k]/self.information.age_count[k] if self.information.age_count[k] > 0 else 0 for k in range(defaults.settings.max_age)}
[docs] def count_enrollment_by_school_type(self, *args, **kwargs): """ Create enrollment sizes by school types in the generated population post generation. Returns: list: List of generated enrollment sizes by school type. """ enrollment_by_school_type = spsch.count_enrollment_by_school_type(self.popdict, *args, **kwargs) return enrollment_by_school_type
[docs] def count_employment_by_age(self): """ Create employment count by age for workers in the generated population post generation. Returns: dict: Dictionary of the count of employed workers by age in the generated population. """ return spw.count_employment_by_age(self.popdict)
@property def employment_rates_by_age(self): """ Employment rates by age for workers in the generated population. Returns: dict: Dictionary of the employment rates by age for workers in the generated population. """ return {k: self.information.employment_by_age[k]/self.information.age_count[k] if self.information.age_count[k] > 0 else 0 for k in range(defaults.settings.max_age)} # convert to work on array
[docs] def get_workplace_sizes(self): """ Create workplace sizes in the generated population post generation. Returns: dict: Dictionary of workplace size by workplace id (wpid). """ return spw.get_workplace_sizes(self.popdict)
# convert to work on array
[docs] def count_workplace_sizes(self): """ Count of workplace sizes in the generated population. Returns: dict:Dictionary of the count of workplace sizes. """ return spb.count_values(self.information.workplace_sizes)
[docs] def get_contact_counts_by_layer(self, layer='S', **kwargs): """ Get the number of contacts by layer. Returns: dict: Dictionary of the count of contacts in the layer for the different people types in the layer. See sp.contact_networks.get_contact_counts_by_layer() for method details. """ return spcnx.get_contact_counts_by_layer(self.popdict, layer, **kwargs)
[docs] def to_people(self): ''' Convert to the alternative People representation of a population ''' ppl = spp.make_people(popdict=self.popdict, rand_seed=self.rand_seed) # Create the corresponding population return ppl
[docs] def plot_people(self, *args, **kwargs): """Placeholder example of plotting the people in a population.""" ppl = self.to_people() fig = ppl.plot(*args, **kwargs) return fig
[docs] def plot_contacts(self, *args, **kwargs): """Plot matrices of the contacts for a given layer or layers.""" fig = sppl.plot_contacts(self.popdict, *args, **kwargs) return fig
[docs] def plot_contact_counts(self, contact_counter, **kwargs): """ Plot the number of contacts by contact types as a histogram. Args: contact_counter (dict) : A dictionary with people_types as keys and value as list of counts for each type of contacts **title_prefix(str) : optional title prefix for the figure **figname (str) : name to save figure to disk **fontsize (float) : Matplotlib.figure.fontsize Returns: Matplotlib figure and axes of the histograms of contact distributions for the corresponding contact_counter. **Examples**:: pars = {'n': 10e3, 'location': 'seattle_metro', 'state_location': 'Washington', 'country_location': 'usa'} pop = sp.Pop(**pars) layer = 'S' contact_counter = pop.get_contact_counts_by_layer(layer=layer) fig, ax = pop.plot_contact_counts(contact_counter) """ return sppl.plot_contact_counts(contact_counter, **kwargs)
[docs] def plot_ages(self, **kwargs): """ Plot a comparison of the expected and generated age distribution. **Example**:: pars = {'n': 10e3, 'location':'seattle_metro', 'state_location':'Washington', 'country_location':'usa'} pop = sp.Pop(**pars) fig, ax = pop.plot_ages() """ fig, ax = sppl.plot_ages(self, **kwargs) return fig, ax
[docs] def plot_household_sizes(self, **kwargs): """ Plot a comparison of the expected and generated household size distribution. **Example**:: pars = {'n': 10e3, 'location':'seattle_metro', 'state_location':'Washington', 'country_location':'usa'} pop = sp.Pop(**pars) fig, ax = pop.plot_household_sizes() """ fig, ax = sppl.plot_household_sizes(self, **kwargs) return fig, ax
[docs] def plot_household_head_ages_by_size(self, **kwargs): """ Plot a comparison of the expected and generated age distribution of the household heads by the household size. **Examples**:: pars = {'n': 10e3, 'location':'seattle_metro', 'state_location':'Washington', 'country_location':'usa'} pop = sp.Pop(**pars) fig, ax = pop.plot_household_head_ages_by_size() kwargs = pars.copy() fig, ax = pop.plot_household_head_ages_by_size(**kwargs) """ fig, ax = sppl.plot_household_head_ages_by_size(self, **kwargs) return fig, ax
[docs] def plot_ltcf_resident_sizes(self, **kwargs): """ Plot a comparison of the expected and generated ltcf resident sizes. **Examples**:: pars = {'n': 10e3, 'location':'seattle_metro', 'state_location':'Washington', 'country_location':'usa'} pop = sp.Pop(**pars) fig, ax = pop.plot_ltcf_resident_sizes() """ fig, ax = sppl.plot_ltcf_resident_sizes(self, **kwargs) return fig, ax
# def plot_ltcf_resident_staff_ratios(self, **kwargs): # """ # Plot a comparison of the expected and generated ltcf resident to staff # ratios. # **Examples**:: # pars = {'n': 10e3, 'location':'seattle_metro', 'state_location':'Washington', 'country_location':'usa'} # pop = sp.Pop(**pars) # fig, ax = pop.plot_ltcf_resident_staff_ratios() # """ # fig, ax = sppl.plot_ltcf_resident_staff_ratios(self, **kwargs) # return fig, ax
[docs] def plot_enrollment_rates_by_age(self, **kwargs): """ Plot a comparison of the expected and generated enrollment rates by age. **Example**:: pars = {'n': 10e3, 'location':'seattle_metro', 'state_location':'Washington', 'country_location':'usa'} pop = sp.Pop(**pars) fig, ax = pop.plot_enrollment_rates_by_age() """ fig, ax = sppl.plot_enrollment_rates_by_age(self, **kwargs) return fig, ax
[docs] def plot_employment_rates_by_age(self, **kwargs): """ Plot a comparison of the expected and generated employment rates by age. **Example**:: pars = {'n': 10e3, 'location':'seattle_metro', 'state_location':'Washington', 'country_location':'usa'} pop = sp.Pop(**pars) fig, ax = pop.plot_employment_rates_by_age() """ fig, ax = sppl.plot_employment_rates_by_age(self, **kwargs) return fig, ax
[docs] def plot_school_sizes(self, *args, **kwargs): """ Plot a comparison of the expected and generated school size distributions by school type. **Example**:: pars = {'n': 10e3, 'location':'seattle_metro', 'state_location':'Washington', 'country_location':'usa'} pop = sp.Pop(**pars) fig, ax = pop.plot_school_sizes() """ fig, ax = sppl.plot_school_sizes(self, *args, **kwargs) return fig, ax
[docs] def plot_workplace_sizes(self, **kwargs): """ Plot a comparison of the expected and generated workplace sizes for workplaces that are not schools or long term care facilities. **Examples**:: pars = {'n': 10e3, 'location':'seattle_metro', 'state_location':'Washington', 'country_location':'usa'} pop = sp.Pop(**pars) fig, ax = pop.plot_ltcf_resident_sizes() """ fig, ax = sppl.plot_workplace_sizes(self, **kwargs) return fig, ax
[docs]def make_population(*args, **kwargs): ''' Interface to sp.Pop().to_dict(). Included for backwards compatibility. ''' log.debug('make_population()') deprecated = ['generate', 'datadir', 'sheet_name', 'verbose', 'plot', 'write', 'return_popdict', 'use_demography'] for key in list(kwargs.keys()): if key in deprecated: log.warning(f'You have specified parameter {key}, but this parameter is deprecated and will be ignored.') kwargs.pop(key) # Heavy lift 1: make the contacts and their connections log.debug('Generating a new population...') pop = Pop(*args, **kwargs) population = pop.to_dict() log.debug('make_population(): done.') return population
[docs]def generate_synthetic_population(*args, **kwargs): ''' For backwards compatibility only. ''' log.warning('This function is deprecated and may be removed in future releases') return make_population(*args, **kwargs)