"""
This module provides the main class for interacting with SynthPops, the Pop class.
"""
import numpy as np
import sciris as sc
from .config import logger as log
from . import version as spv
from . import defaults
from . import base as spb
from . import config as cfg
from . import sampling as spsamp
from . import data_distributions as spdata
from . import ltcfs as spltcf
from . import households as sphh
from . import schools as spsch
from . import workplaces as spw
from . import contact_networks as spcnx
from . import plotting as sppl
from . import people as spp
__all__ = ['Pop', 'make_population', 'generate_synthetic_population']
[docs]class Pop(sc.prettyobj):
def __init__(self,
             n=None,
             max_contacts=None,
             ltcf_pars=None,
             school_pars=None,
             with_industry_code=False,
             with_facilities=False,
             use_default=False,
             use_two_group_reduction=True,
             average_LTCF_degree=20,
             ltcf_staff_age_min=20,
             ltcf_staff_age_max=60,
             with_school_types=False,
             school_mixing_type='random',
             average_class_size=20,
             inter_grade_mixing=0.1,
             average_student_teacher_ratio=20,
             average_teacher_teacher_degree=3,
             teacher_age_min=25,
             teacher_age_max=75,
             with_non_teaching_staff=False,
             average_student_all_staff_ratio=15,
             average_additional_staff_degree=20,
             staff_age_min=20,
             staff_age_max=75,
             rand_seed=None,
             country_location=None,
             state_location=None,
             location=None,
             sheet_name=None,
             household_method='infer_ages',
             smooth_ages=False,
             window_length=7,
             do_make=True
             ):
    '''
    Build a full population network: people (ages, sexes) plus their contacts.

    By default uses Seattle, Washington data. Two household methods are
    available, 'infer_ages' and 'fixed_ages'.

    With 'infer_ages', ages are generated by first placing individuals into
    households using the age of the head of household or reference
    individual (always an adult), household age mixing patterns, household
    sizes, and the age distribution from data (census or other sources).

    With 'fixed_ages', individuals are pre-assigned ages according to the
    age distribution and then placed into households using the age of the
    head of household or reference individual, household age mixing
    patterns, and household sizes.

    The constructor stores the parameters, resolves the location defaults,
    calls ``generate()`` and stores its result as ``self.popdict``, then
    computes the information and summary attributes.

    Args:
        n (int)                                 : The number of people to create.
        max_contacts (dict)                     : A dictionary for maximum number of contacts per layer: keys must be "W" (work).
        ltcf_pars (dict)                        : If supplied, replace default LTCF parameters.
        school_pars (dict)                      : If supplied, replace default school parameters.
        with_industry_code (bool)               : If True, assign industry codes for workplaces, currently only possible for cached files of populations in the US.
        with_facilities (bool)                  : If True, create long term care facilities, currently only available for locations in the US.
        use_default (bool)                      : If True, use default data from settings.location, settings.state, settings.country.
        use_two_group_reduction (bool)          : If True, create long term care facilities with reduced contacts across both groups.
        average_LTCF_degree (float)             : Default average degree in long term care facilities.
        ltcf_staff_age_min (int)                : Long term care facility staff minimum age.
        ltcf_staff_age_max (int)                : Long term care facility staff maximum age.
        with_school_types (bool)                : If True, creates explicit school types.
        school_mixing_type (str or dict)        : The mixing type for schools, 'random', 'age_clustered', or 'age_and_class_clustered' if string, and a dictionary of these by school type otherwise.
        average_class_size (float)              : The average classroom size.
        inter_grade_mixing (float)              : The average fraction of edges rewired to create edges between grades in the same school when school_mixing_type is 'age_clustered'.
        average_student_teacher_ratio (float)   : The average number of students per teacher.
        average_teacher_teacher_degree (float)  : The average number of contacts per teacher with other teachers.
        teacher_age_min (int)                   : The minimum age for teachers.
        teacher_age_max (int)                   : The maximum age for teachers.
        with_non_teaching_staff (bool)          : If True, includes non teaching staff.
        average_student_all_staff_ratio (float) : The average number of students per staff members at school (including both teachers and non teachers).
        average_additional_staff_degree (float) : The average number of contacts per additional non teaching staff in schools.
        staff_age_min (int)                     : The minimum age for non teaching staff.
        staff_age_max (int)                     : The maximum age for non teaching staff.
        rand_seed (int)                         : Start point random sequence is generated from.
        country_location (string)               : Name of the country the location is in.
        state_location (string)                 : Name of the state the location is in.
        location (string)                       : Name of the location.
        sheet_name (string)                     : Sheet name where data is located.
        household_method (string)               : Name of household generation method used; for details see above.
        smooth_ages (bool)                      : If True, use smoothed out age distribution.
        window_length (int)                     : Length of window over which to average or smooth out age distribution.
        do_make (bool)                          : Whether to make the population. NOTE(review): this argument is accepted but never read in this method; the population is always generated.
    '''
    log.debug('Pop()')

    # Population size: fall back to the package default, or warn if the request is below it
    if n is None:
        log.warning(f"Pop size n not given, generating a population with a default size of {defaults.default_pop_size} people.")
        n = defaults.default_pop_size
    elif n < defaults.default_pop_size:
        log.warning(f"Pop size n: {n} is too small for synthpops to make contact networks that statistically represent real world populations. Resultant networks may not look realistic.")

    # Defaults taken from the individual keyword arguments; the optional
    # school_pars / ltcf_pars dicts are merged on top and therefore win.
    school_defaults = dict(
        with_school_types=with_school_types,
        school_mixing_type=school_mixing_type,
        average_class_size=average_class_size,
        inter_grade_mixing=inter_grade_mixing,
        average_student_teacher_ratio=average_student_teacher_ratio,
        average_teacher_teacher_degree=average_teacher_teacher_degree,
        teacher_age_min=teacher_age_min,
        teacher_age_max=teacher_age_max,
        with_non_teaching_staff=with_non_teaching_staff,
        average_student_all_staff_ratio=average_student_all_staff_ratio,
        average_additional_staff_degree=average_additional_staff_degree,
        staff_age_min=staff_age_min,
        staff_age_max=staff_age_max,
    )
    ltcf_defaults = dict(
        with_facilities=with_facilities,
        use_two_group_reduction=use_two_group_reduction,
        average_LTCF_degree=average_LTCF_degree,
        ltcf_staff_age_min=ltcf_staff_age_min,
        ltcf_staff_age_max=ltcf_staff_age_max,
    )

    # Parameter containers
    self.loc_pars = sc.objdict()
    self.school_pars = sc.objdict(sc.mergedicts(school_defaults, school_pars))
    self.ltcf_pars = sc.objdict(sc.mergedicts(ltcf_defaults, ltcf_pars))

    # General parameters
    self.n = int(n)
    self.max_contacts = sc.mergedicts({'W': 20}, max_contacts)
    self.with_industry_code = with_industry_code
    self.rand_seed = rand_seed
    self.country_location = country_location
    self.state_location = state_location
    self.location = location
    self.sheet_name = sheet_name
    self.use_default = use_default

    # Age distribution and household parameters
    self.smooth_ages = smooth_ages
    self.window_length = window_length
    self.household_method = household_method

    # Layers that will be generated; LTCF only when facilities are requested
    self.layers = ['H', 'S', 'W']
    if self.ltcf_pars.with_facilities:
        self.layers.append('LTCF')
    self.layer_mappings = {'H': 'Households', 'S': 'Schools', 'W': 'Workplaces', 'LTCF': 'Long Term Care facilities'}

    # Seed the random number generator only when a seed was given
    if self.rand_seed is not None:
        spsamp.set_seed(self.rand_seed)

    # Resolve the location: either adopt the package-wide defaults wholesale,
    # or reconfigure the defaults for the requested country.
    if self.country_location is not None:
        print(f"========== setting country location = {country_location}")
        cfg.set_location_defaults(country_location)
    else:
        self.country_location = defaults.settings.country_location
        self.state_location = defaults.settings.state_location
        self.location = defaults.settings.location

    # Read after the defaults may have been reset above
    self.max_age = defaults.settings.max_age

    # A country without a state means a country-level population
    if self.state_location is None:
        self.location = None

    # Fall back to the default sheet name
    if self.sheet_name is None:
        self.sheet_name = defaults.settings.sheet_name

    self.datadir = defaults.settings.datadir  # Assume this has been reset...

    # Location parameters, passed as keyword arguments to the data readers
    self.loc_pars['location'] = self.location
    self.loc_pars['state_location'] = self.state_location
    self.loc_pars['country_location'] = self.country_location
    self.loc_pars['datadir'] = self.datadir
    self.loc_pars['use_default'] = self.use_default

    # Heavy lift: make the people and their connections
    log.debug('Generating a new population...')
    self.popdict = self.generate()
    log.debug('Pop(): done.')

    # Summaries are added post hoc: full information first, then the condensed summary
    self.compute_information()
    self.compute_summary()

    # Plotting defaults
    self.plkwargs = sppl.plotting_kwargs()

    # Metadata -- version etc.
    cfg.set_metadata(self)
def generate(self):
    """
    Actually generate the network.

    Reads the parameters stored on the object by ``__init__``, loads the
    location data, then builds in order: the age count, long term care
    facilities, households, schools (students, teachers, non teaching
    staff), facility staff, workplaces, and finally the contacts between
    people. Along the way it stores the expected age distribution, age
    brackets, contact matrices and ``age_by_uid`` on the object, creates
    the layer classes and removes the temporary lists again.

    Returns:
        network (dict): A dictionary of the full population with ages, connections, and other attributes.
    """
    log.debug('generate()')

    # TODO: unpack variables -- to be refactored to pass parameters directly

    # General parameters
    datadir = self.datadir
    location = self.location
    state_location = self.state_location
    country_location = self.country_location
    n = self.n
    sheet_name = self.sheet_name
    max_contacts = self.max_contacts
    use_default = self.use_default
    loc_pars = self.loc_pars

    # Age distribution parameters
    # NOTE(review): these two locals are not referenced again in this method;
    # the smoothed distribution below is always requested and uses self.window_length.
    smooth_ages = self.smooth_ages
    window_length = self.window_length

    # Household parameters
    household_method = self.household_method

    # LTCF parameters
    use_two_group_reduction = self.ltcf_pars.use_two_group_reduction
    average_LTCF_degree = self.ltcf_pars.average_LTCF_degree
    with_facilities = self.ltcf_pars.with_facilities
    ltcf_staff_age_min = self.ltcf_pars.ltcf_staff_age_min
    ltcf_staff_age_max = self.ltcf_pars.ltcf_staff_age_max

    # School parameters
    with_school_types = self.school_pars.with_school_types
    school_mixing_type = self.school_pars.school_mixing_type
    average_class_size = self.school_pars.average_class_size
    inter_grade_mixing = self.school_pars.inter_grade_mixing
    average_student_teacher_ratio = self.school_pars.average_student_teacher_ratio
    average_teacher_teacher_degree = self.school_pars.average_teacher_teacher_degree
    teacher_age_min = self.school_pars.teacher_age_min
    teacher_age_max = self.school_pars.teacher_age_max
    with_non_teaching_staff = self.school_pars.with_non_teaching_staff
    average_student_all_staff_ratio = self.school_pars.average_student_all_staff_ratio
    average_additional_staff_degree = self.school_pars.average_additional_staff_degree
    staff_age_min = self.school_pars.staff_age_min
    staff_age_max = self.school_pars.staff_age_max

    # Load and store the expected age distribution of the population
    # NOTE(review): age_bracket_dist is not used again in this method.
    age_bracket_dist = spdata.read_age_bracket_distr(**loc_pars)  # age distribution defined by bins or age brackets
    expected_age_dist = spdata.get_smoothed_single_year_age_distr(**loc_pars, window_length=self.window_length)
    self.expected_age_dist = expected_age_dist
    expected_age_dist_values = [expected_age_dist[a] for a in expected_age_dist]
    self.expected_age_dist_values = expected_age_dist_values

    # Load and store the age brackets
    age_brackets = spdata.get_census_age_brackets(**loc_pars)
    self.age_brackets = age_brackets
    # mapping
    age_by_brackets = spb.get_age_by_brackets(age_brackets)
    self.age_by_brackets = age_by_brackets

    # Load the contact matrix
    contact_matrices = spdata.get_contact_matrices(datadir, sheet_name=sheet_name)
    # Store expected contact matrices
    self.contact_matrices = contact_matrices

    # Load age brackets, and mapping dictionary that matches contact matrices
    # (the number of brackets is taken from the row count of the first matrix)
    contact_matrix_shape = contact_matrices[list(contact_matrices.keys())[0]].shape
    contact_matrix_row = contact_matrix_shape[0]

    cm_age_brackets = spdata.get_census_age_brackets(**loc_pars, nbrackets=contact_matrix_row)
    self.cm_age_brackets = cm_age_brackets
    cm_age_by_brackets = spb.get_age_by_brackets(cm_age_brackets)
    self.cm_age_by_brackets = cm_age_by_brackets

    # Generate an age count for the population --- this will get passed around to methods generating the different layers where people live: long term care facilities, households, agricultural living quarters, other group living arrangements
    age_count = sphh.generate_age_count_multinomial(n, expected_age_dist_values)

    # Ages left to assign to a residence (deep copy so age_count itself is untouched)
    ages_left_to_assign = sc.dcp(age_count)

    # Generate LTCFs and remove some people from the age count of people left to place in a resident by age
    n_nonltcf, ltcf_adjusted_age_dist, ltcf_adjusted_age_dist_values, ages_left_to_assign, facilities = spltcf.generate_ltcfs(n, with_facilities, loc_pars, expected_age_dist, ages_left_to_assign)

    # Generate households
    household_size_dist = spdata.get_household_size_distr(**loc_pars)
    hh_sizes = sphh.generate_household_size_count_from_fixed_pop_size(n_nonltcf, household_size_dist)
    hha_brackets = spdata.get_head_age_brackets(**loc_pars)
    hha_by_size = spdata.get_head_age_by_size_distr(**loc_pars)

    # Any household_method other than 'fixed_ages' falls through to 'infer_ages'
    if household_method == 'fixed_ages':
        homes_dic, homes = sphh.generate_all_households_fixed_ages(n_nonltcf, hh_sizes, hha_by_size, hha_brackets, cm_age_brackets, cm_age_by_brackets, contact_matrices, ages_left_to_assign)
    else:
        log.debug("defaulting to 'infer_ages' household generation method. See method notes for description.")
        homes_dic, homes = sphh.generate_all_households_infer_ages(n, n_nonltcf, hh_sizes, hha_by_size, hha_brackets, cm_age_brackets, cm_age_by_brackets, contact_matrices, ltcf_adjusted_age_dist, ages_left_to_assign)

    # Handle homes and facilities: facilities are placed first so they occupy
    # the first len(facilities) entries of homes_by_uids
    homes = facilities + homes
    homes_by_uids, age_by_uid = sphh.assign_uids_by_homes(homes)  # include facilities to assign ids
    age_by_uid_arr = np.array([age_by_uid[i] for i in range(self.n)], dtype=int)
    self.age_by_uid = age_by_uid_arr
    facilities_by_uid_lists = homes_by_uids[0:len(facilities)]

    # Generate school sizes
    school_sizes_dist_by_brackets = spdata.get_school_size_distr_by_brackets(**loc_pars)  # without school type
    school_size_brackets = spdata.get_school_size_brackets(**loc_pars)  # for right now the size distribution for all school types will use the same brackets or bins

    # Figure out who's going to school as a student with enrollment rates (gets called inside sp.get_uids_in_school)
    uids_in_school, uids_in_school_by_age, ages_in_school_count = spsch.get_uids_in_school(datadir, n_nonltcf, location, state_location, country_location, age_by_uid, homes_by_uids, use_default=use_default)  # this will call in school enrollment rates

    if with_school_types:
        school_size_distr_by_type = spdata.get_school_size_distr_by_type(**loc_pars)

        school_type_age_ranges = spdata.get_school_type_age_ranges(**loc_pars)

        school_types_distr_by_age = spsch.get_school_types_distr_by_age(school_type_age_ranges)
        school_type_by_age = spsch.get_school_types_by_age_single(school_types_distr_by_age)

        student_age_lists, student_uid_lists, school_types = spsch.send_students_to_school_with_school_types(school_size_distr_by_type,
                                                                                                             school_size_brackets,
                                                                                                             uids_in_school,
                                                                                                             uids_in_school_by_age,
                                                                                                             ages_in_school_count,
                                                                                                             school_types_distr_by_age,
                                                                                                             school_type_age_ranges)

    else:
        # Get school sizes
        school_sizes = spsch.generate_school_sizes(school_sizes_dist_by_brackets, school_size_brackets, uids_in_school)

        # Assign students to school using contact matrix method - generic schools
        student_age_lists, student_uid_lists, school_types = spsch.send_students_to_school(school_sizes,
                                                                                           uids_in_school,
                                                                                           uids_in_school_by_age,
                                                                                           ages_in_school_count,
                                                                                           cm_age_brackets,
                                                                                           cm_age_by_brackets,
                                                                                           contact_matrices)

        school_type_by_age = None

    # Get employment rates
    employment_rates = spdata.get_employment_rates(**loc_pars)

    # Find people who can be workers (removing everyone who is currently a student)
    uids_by_age = spb.get_ids_by_age(age_by_uid)  # Make a dictionary listing out uids of people by their age
    potential_worker_uids, potential_worker_uids_by_age, potential_worker_ages_left_count = spw.get_uids_potential_workers(student_uid_lists,
                                                                                                                           employment_rates,
                                                                                                                           age_by_uid)
    workers_by_age_to_assign_count = spw.get_workers_by_age_to_assign(employment_rates, potential_worker_ages_left_count, uids_by_age)

    # Removing facilities residents from potential workers
    potential_worker_uids, potential_worker_uids_by_age, workers_by_age_to_assign_count = spltcf.remove_ltcf_residents_from_potential_workers(facilities_by_uid_lists,
                                                                                                                                              potential_worker_uids,
                                                                                                                                              potential_worker_uids_by_age,
                                                                                                                                              workers_by_age_to_assign_count,
                                                                                                                                              age_by_uid)

    # Assign teachers and update school lists
    teacher_age_lists, teacher_uid_lists, potential_worker_uids, potential_worker_uids_by_age, workers_by_age_to_assign_count = spsch.assign_teachers_to_schools(student_age_lists,
                                                                                                                                                                 student_uid_lists,
                                                                                                                                                                 employment_rates,
                                                                                                                                                                 workers_by_age_to_assign_count,
                                                                                                                                                                 potential_worker_uids,
                                                                                                                                                                 potential_worker_uids_by_age,
                                                                                                                                                                 potential_worker_ages_left_count,
                                                                                                                                                                 average_student_teacher_ratio=average_student_teacher_ratio,
                                                                                                                                                                 teacher_age_min=teacher_age_min,
                                                                                                                                                                 teacher_age_max=teacher_age_max)

    # Assign non teaching staff and update who's available to work at other places
    non_teaching_staff_uid_lists, potential_worker_uids, potential_worker_uids_by_age, workers_by_age_to_assign_count = spsch.assign_additional_staff_to_schools(student_uid_lists,
                                                                                                                                                                 teacher_uid_lists,
                                                                                                                                                                 workers_by_age_to_assign_count,
                                                                                                                                                                 potential_worker_uids,
                                                                                                                                                                 potential_worker_uids_by_age,
                                                                                                                                                                 potential_worker_ages_left_count,
                                                                                                                                                                 average_student_teacher_ratio=average_student_teacher_ratio,
                                                                                                                                                                 average_student_all_staff_ratio=average_student_all_staff_ratio,
                                                                                                                                                                 staff_age_min=staff_age_min,
                                                                                                                                                                 staff_age_max=staff_age_max,
                                                                                                                                                                 with_non_teaching_staff=with_non_teaching_staff)

    # Get facility staff
    if with_facilities:
        facilities_staff_uid_lists = spltcf.assign_facility_staff(datadir,
                                                                  location,
                                                                  state_location,
                                                                  country_location,
                                                                  ltcf_staff_age_min,
                                                                  ltcf_staff_age_max,
                                                                  facilities,
                                                                  workers_by_age_to_assign_count,
                                                                  potential_worker_uids_by_age,
                                                                  potential_worker_uids,
                                                                  facilities_by_uid_lists,
                                                                  age_by_uid,
                                                                  use_default=use_default)
    else:
        facilities_staff_uid_lists = []

    # Generate non-school workplace sizes needed to send everyone to work
    workplace_size_brackets = spdata.get_workplace_size_brackets(**loc_pars)
    workplace_size_distr_by_brackets = spdata.get_workplace_size_distr_by_brackets(**loc_pars)
    workplace_sizes = spw.generate_workplace_sizes(workplace_size_distr_by_brackets, workplace_size_brackets, workers_by_age_to_assign_count)

    # Assign all workers who are not staff at schools to workplaces
    workplace_age_lists, workplace_uid_lists, potential_worker_uids, potential_worker_uids_by_age, workers_by_age_to_assign_count = spw.assign_rest_of_workers(workplace_sizes,
                                                                                                                                                               potential_worker_uids,
                                                                                                                                                               potential_worker_uids_by_age,
                                                                                                                                                               workers_by_age_to_assign_count,
                                                                                                                                                               age_by_uid,
                                                                                                                                                               cm_age_brackets,
                                                                                                                                                               cm_age_by_brackets,
                                                                                                                                                               contact_matrices)

    # remove facilities from homes --- have already assigned each person a uid
    homes_by_uids = homes_by_uids[len(facilities_by_uid_lists):]
    homes = homes[len(facilities_by_uid_lists):]

    population = spcnx.make_contacts(self,
                                     age_by_uid=age_by_uid,
                                     homes_by_uids=homes_by_uids,
                                     students_by_uid_lists=student_uid_lists,
                                     teachers_by_uid_lists=teacher_uid_lists,
                                     non_teaching_staff_uid_lists=non_teaching_staff_uid_lists,
                                     workplace_by_uid_lists=workplace_uid_lists,
                                     facilities_by_uid_lists=facilities_by_uid_lists,
                                     facilities_staff_uid_lists=facilities_staff_uid_lists,
                                     use_two_group_reduction=use_two_group_reduction,
                                     average_LTCF_degree=average_LTCF_degree,
                                     with_school_types=with_school_types,
                                     school_mixing_type=school_mixing_type,
                                     average_class_size=average_class_size,
                                     inter_grade_mixing=inter_grade_mixing,
                                     average_student_teacher_ratio=average_student_teacher_ratio,
                                     average_teacher_teacher_degree=average_teacher_teacher_degree,
                                     average_student_all_staff_ratio=average_student_all_staff_ratio,
                                     average_additional_staff_degree=average_additional_staff_degree,
                                     school_type_by_age=school_type_by_age,
                                     max_contacts=max_contacts)

    # Change types: turn each person's per-layer contacts into a list
    for key, person in population.items():
        for layerkey in population[key]['contacts'].keys():
            population[key]['contacts'][layerkey] = list(population[key]['contacts'][layerkey])

    # self.schools_in_groups is not assigned in this method; presumably it is
    # set on the object by spcnx.make_contacts above -- TODO confirm
    school_mixing_types = [self.schools_in_groups[ns]['school_mixing_type'] for ns in range(len(self.schools_in_groups))]

    # temporarily store some information
    self.homes_by_uids = homes_by_uids
    self.workplace_uid_lists = workplace_uid_lists
    self.student_uid_lists = student_uid_lists
    self.teacher_uid_lists = teacher_uid_lists
    self.non_teaching_staff_uid_lists = non_teaching_staff_uid_lists
    self.school_types = school_types
    self.school_mixing_types = school_mixing_types

    if self.ltcf_pars.with_facilities:
        self.facilities_by_uid_lists = facilities_by_uid_lists
        self.facilities_staff_uid_lists = facilities_staff_uid_lists

        # If facilities were requested but nobody ended up living in one,
        # drop the LTCF layer and switch the flag off
        sum_ltcf_res = sum([len(f) for f in self.facilities_by_uid_lists])
        if sum_ltcf_res == 0:
            log.warning(f"Heads up: Population size and long term care facility use rates were too low, no facilities were created for this population. If you wish to include people living in this type of layer, consider using a larger population size or checking your data on long term care facility use rates. Changing pop.with_facilities to False.")
            self.layers.remove('LTCF')
            self.ltcf_pars.with_facilities = False

    # Build the layer classes from the temporary lists, then discard the lists
    self.set_layer_classes()
    self.clean_up_layer_info()

    return population
def set_layer_classes(self):
    """
    Add layer classes.

    Creates and fills the household, workplace and school lists (including
    classrooms) from the temporary lists stored on the object, and the
    long term care facility list when ``ltcf_pars.with_facilities`` is set.
    """
    # Households
    self.initialize_households_list()
    self.populate_households(self.homes_by_uids, self.age_by_uid)

    # Workplaces
    self.initialize_workplaces_list()
    self.populate_workplaces(self.workplace_uid_lists)

    # Schools and their classrooms
    self.initialize_schools_list()
    school_args = (self.student_uid_lists, self.teacher_uid_lists,
                   self.non_teaching_staff_uid_lists, self.age_by_uid,
                   self.school_types, self.school_mixing_types)
    self.populate_schools(*school_args)
    self.populate_all_classrooms(self.schools_in_groups)

    # Long term care facilities are optional
    if not self.ltcf_pars.with_facilities:
        return
    self.initialize_ltcfs_list()
    self.populate_ltcfs(self.facilities_by_uid_lists, self.facilities_staff_uid_lists)
def clean_up_layer_info(self):
    """
    Clean up temporary data from the pop object after storing them in specific layer classes.

    Each listed attribute is removed through ``pop_item``, so attributes
    that were never set are skipped silently.
    """
    temporary_keys = (
        'workplace_uid_lists',
        'student_uid_lists',
        'teacher_uid_lists',
        'non_teaching_staff_uid_lists',
        'school_types',
        'school_mixing_types',
        'schools_in_groups',
        'facilities_by_uid_lists',
        'facilities_staff_uid_lists',
    )
    for attr_name in temporary_keys:
        self.pop_item(attr_name)
def pop_item(self, key):
    """
    Pop key from self.

    Removes the attribute named ``key`` from the instance dictionary if it
    exists; a missing attribute is ignored.

    Args:
        key (str) : name of the attribute to remove
    """
    # The None default keeps a missing key from raising KeyError
    vars(self).pop(key, None)
def to_dict(self):
    """
    Export to a dictionary -- official way to get the popdict.

    The returned object is produced by ``sc.dcp`` from ``self.popdict``.

    **Example**::

        popdict = pop.to_dict()
    """
    popdict_copy = sc.dcp(self.popdict)
    return popdict_copy
def to_json(self, filename, indent=2, **kwargs):
    """
    Export to a JSON file.

    Args:
        filename (str) : the name or path of the file to write
        indent (int)   : indentation passed to sc.savejson()
        kwargs         : passed to sc.savejson()

    Returns:
        Whatever sc.savejson() returns.

    **Example**::

        pop.to_json('my-pop.json')
    """
    result = sc.savejson(filename, self.popdict, indent=indent, **kwargs)
    return result
def save(self, filename, **kwargs):
    """
    Save population to a binary, gzipped object file.

    Args:
        filename (str) : the name or path of the file to write
        kwargs         : passed to sc.saveobj()

    Returns:
        Whatever sc.saveobj() returns.

    **Example**::

        pop.save('my-pop.pop')
    """
    result = sc.saveobj(filename, self, **kwargs)
    return result
@staticmethod
def load(filename, *args, **kwargs):
    """
    Load from disk from a gzipped pickle.

    Args:
        filename (str): the name or path of the file to load from
        args: passed to sc.loadobj()
        kwargs: passed to sc.loadobj()

    Returns:
        Pop: the loaded population object.

    Raises:
        TypeError: if the loaded object is not a Pop instance.

    **Example**::

        pop = sp.Pop.load('my-pop.pop')
    """
    # NOTE(review): the file is a pickle, so only load files from trusted sources.
    loaded = sc.loadobj(filename, *args, **kwargs)
    if isinstance(loaded, Pop):
        return loaded
    errormsg = f'Cannot load object of {type(loaded)} as a Pop object'
    raise TypeError(errormsg)
def initialize_households_list(self):
    """Initialize a new households list (any existing list is replaced by an empty one)."""
    self.households = list()
def initialize_empty_households(self, n_households=None):
    """
    Create a list of empty households.

    Thin wrapper around ``sphh.initialize_empty_households``.

    Args:
        n_households (int) : the number of households to initialize
    """
    sphh.initialize_empty_households(self, n_households)
def populate_households(self, households, age_by_uid):
    """
    Populate all of the households. Store each household at the index corresponding to its hhid.

    Thin wrapper around ``sphh.populate_households``.

    Args:
        households (list) : list of lists where each sublist represents a household and contains the ids of the household members
        age_by_uid (dict) : dictionary mapping each person's id to their age
    """
    sphh.populate_households(self, households, age_by_uid)
def get_household(self, hhid):
    """
    Return household with id: hhid.

    Thin wrapper around ``sphh.get_household``.

    Args:
        hhid (int) : household id number

    Returns:
        sp.Household: A populated household.
    """
    household = sphh.get_household(self, hhid)
    return household
def add_household(self, household):
    """
    Add a household to the list of households.

    Thin wrapper around ``sphh.add_household``.

    Args:
        household (sp.Household): household with at minimum the hhid, member_uids, member_ages, reference_uid, and reference_age.
    """
    sphh.add_household(self, household)
def initialize_workplaces_list(self):
    """Initialize a new workplaces list (any existing list is replaced by an empty one)."""
    self.workplaces = list()
def initialize_empty_workplaces(self, n_workplaces=None):
    """
    Create a list of empty workplaces.

    Delegates to the workplaces module (``spw``), like the other workplace
    methods of this class (``populate_workplaces``, ``get_workplace``,
    ``add_workplace``).

    Args:
        n_workplaces (int) : the number of workplaces to initialize
    """
    # The original dispatched to the households module (sphh); the sibling
    # initialize_empty_* methods each call their own layer's module.
    spw.initialize_empty_workplaces(self, n_workplaces)
    return
def populate_workplaces(self, workplaces):
    """
    Populate all of the workplaces. Store each workplace at the index corresponding to its wpid.

    Thin wrapper around ``spw.populate_workplaces``.

    Args:
        workplaces (list) : list of lists where each sublist represents a workplace and contains the ids of the workplace members
    """
    spw.populate_workplaces(self, workplaces)
def get_workplace(self, wpid):
    """
    Return workplace with id: wpid.

    Thin wrapper around ``spw.get_workplace``.

    Args:
        wpid (int) : workplace id number

    Returns:
        sp.Workplace: A populated workplace.
    """
    workplace = spw.get_workplace(self, wpid)
    return workplace
def add_workplace(self, workplace):
    """
    Add a workplace to the list of workplaces.

    Thin wrapper around ``spw.add_workplace``.

    Args:
        workplace (sp.Workplace): workplace with at minimum the wpid, member_uids, member_ages, reference_uid, and reference_age.
    """
    spw.add_workplace(self, workplace)
def initialize_ltcfs_list(self):
    """Initialize a new ltcfs list (any existing list is replaced by an empty one)."""
    self.ltcfs = list()
def initialize_empty_ltcfs(self, n_ltcfs=None):
    """
    Create a list of empty ltcfs.

    Thin wrapper around ``spltcf.initialize_empty_ltcfs``.

    Args:
        n_ltcfs (int) : the number of ltcfs to initialize
    """
    spltcf.initialize_empty_ltcfs(self, n_ltcfs)
def populate_ltcfs(self, resident_lists, staff_lists):
    """
    Populate all of the ltcfs. Store each ltcf at the index corresponding to its ltcfid.

    Thin wrapper around ``spltcf.populate_ltcfs``.

    Args:
        resident_lists (list) : list of lists where each sublist represents a ltcf and contains the ids of the residents
        staff_lists (list)    : list of lists where each sublist represents a ltcf and contains the ids of the staff
    """
    spltcf.populate_ltcfs(self, resident_lists, staff_lists)
def get_ltcf(self, ltcfid):
    """
    Return ltcf with id: ltcfid.

    Thin wrapper around ``spltcf.get_ltcf``.

    Args:
        ltcfid (int) : ltcf id number

    Returns:
        sp.LongTermCareFacility: A populated ltcf.
    """
    ltcf = spltcf.get_ltcf(self, ltcfid)
    return ltcf
def add_ltcf(self, ltcf):
    """
    Add a ltcf to the list of ltcfs.

    Thin wrapper around ``spltcf.add_ltcf``.

    Args:
        ltcf (sp.LongTermCareFacility): ltcf with at minimum the ltcfid, resident_uids, staff_uids, resident_ages, staff_ages, reference_uid, and reference_age.
    """
    spltcf.add_ltcf(self, ltcf)
    return
def initialize_schools_list(self):
    """Initialize a new schools list (any existing list is replaced by an empty one)."""
    self.schools = list()
def initialize_empty_schools(self, n_schools=None):
    """
    Create a list of empty schools.

    Thin wrapper around ``spsch.initialize_empty_schools``.

    Args:
        n_schools (int) : the number of schools to initialize
    """
    spsch.initialize_empty_schools(self, n_schools)
def populate_schools(self, student_lists, teacher_lists, non_teaching_staff_lists, age_by_uid, school_types=None, school_mixing_types=None):
    """
    Populate all of the schools. Store each school at the index corresponding to its scid.

    Thin wrapper around ``spsch.populate_schools``; all arguments are
    forwarded positionally in the order given here.

    Args:
        student_lists (list)            : list of lists where each sublist represents a school and contains the ids of the students
        teacher_lists (list)            : list of lists where each sublist represents a school and contains the ids of the teachers
        non_teaching_staff_lists (list) : list of lists where each sublist represents a school and contains the ids of the non teaching staff
        age_by_uid (dict)               : dictionary mapping each person's id to their age
        school_types (list)             : list of the school types
        school_mixing_types (list)      : list of the school mixing types
    """
    forwarded = (student_lists, teacher_lists, non_teaching_staff_lists,
                 age_by_uid, school_types, school_mixing_types)
    spsch.populate_schools(self, *forwarded)
def get_school(self, scid):
    """
    Return school with id: scid.

    Thin wrapper around ``spsch.get_school``.

    Args:
        scid (int) : school id number

    Returns:
        sp.School: A populated school.
    """
    school = spsch.get_school(self, scid)
    return school
def add_school(self, school):
    """
    Add a school to the list of schools.

    Thin wrapper around ``spsch.add_school``.

    Args:
        school (sp.School): school
    """
    spsch.add_school(self, school)
def populate_all_classrooms(self, schools_in_groups):
    """
    Populate the classrooms of every school. Each classroom will be indexed
    at id clid.

    For each school index below ``self.n_schools``, one empty classroom is
    created per student group and then filled from the school's student and
    teacher groups.

    NOTE(review): the original documentation says this applies to schools
    whose school_mixing_type is 'age_and_class_clustered'; this method
    itself does not check the mixing type -- confirm against spsch.

    Args:
        schools_in_groups (dict) : a dictionary representing each school in terms of student_groups and teacher_groups corresponding to classrooms
    """
    for ns in range(self.n_schools):
        school = self.schools[ns]
        groups = schools_in_groups[ns]
        n_classrooms = len(groups['student_groups'])
        spsch.initialize_empty_classrooms(school, n_classrooms)
        spsch.populate_classrooms(school, groups['student_groups'], groups['teacher_groups'], self.age_by_uid)
def get_classroom(self, scid, clid):
    """
    Return classroom with id: clid from school with id: scid.

    Thin wrapper around ``spsch.get_classroom``.

    Args:
        scid (int) : school id number
        clid (int) : classroom id number

    Returns:
        sp.Classroom : A populated classroom.
    """
    classroom = spsch.get_classroom(self, scid, clid)
    return classroom
def compute_summary(self):
    """
    Compute summaries and add to pop post generation.

    Fills ``self.summary`` with the mean and standard deviation of age and,
    for the 'H', 'S' and 'W' layers, the mean, standard deviation and 5th
    and 95th percentiles of the household, school and workplace sizes read
    from ``self.information``.
    """
    info = self.information

    self.summary = sc.objdict()
    self.summary.mean_age = spb.calculate_mean_from_count(info.age_count)
    self.summary.std_age = spb.calculate_std_from_count(info.age_count)

    # One (initially empty) stats dictionary per generated layer
    self.summary.layers = {layer: dict() for layer in self.layers}

    percentiles = [5, 95]

    def fill_layer_stats(layer, sizes):
        # Mean, standard deviation and percentiles of the sizes for one layer
        stats = self.summary.layers[layer]
        stats['mean'] = np.mean(sizes)
        stats['std'] = np.std(sizes)
        for p in percentiles:
            stats[p] = np.percentile(sizes, q=p)

    fill_layer_stats('H', list(info.household_sizes.values()))

    # School sizes are pooled across all school types
    school_sizes = []
    for enrollments in info.enrollment_by_school_type.values():
        school_sizes.extend(enrollments)
    fill_layer_stats('S', school_sizes)

    fill_layer_stats('W', list(info.workplace_sizes.values()))
def summarize(self, return_msg=False):
    """
    Print and optionally return a brief summary string of the pop.

    The summary lists the location, the population size, the mean age and,
    for every layer in ``self.layers``, the number of people, edges, degree
    and age statistics read from ``self.information.layer_stats``. Size
    statistics from ``self.summary.layers`` are added for the 'H', 'S' and
    'W' layers.

    Args:
        return_msg (bool) : if True, return the summary string as well as printing it

    Returns:
        str or None: the summary string if return_msg is True, otherwise None.
    """
    def stat(table, row, column):
        # Select by row label and column in one .loc call, then take the first
        # match by position. The previous `table.loc[mask][column][0]` relied
        # on integer keys being treated positionally on a string-labelled
        # Series, which pandas has deprecated.
        return table.loc[table.index == row, column].iloc[0]

    location = self.location + ',' if self.location is not None else ''
    state_location = self.state_location + ',' if self.state_location is not None else ''
    country_location = self.country_location if self.country_location is not None else ''

    parts = [
        f"This networked population is created to resemble the population of {location} {state_location} {country_location}.\n",
        f"The number of people is {self.n:.0f}.\n",
        f"The mean age is {self.summary.mean_age:.2f} +/- {self.summary.std_age:.2f} years old.\n",
        "\n",
    ]

    for layer in self.layers:
        s = self.information.layer_stats[layer]
        mean_degree = stat(s, 'mean', 'degree')
        std_degree = stat(s, 'std', 'degree')

        parts.append(f"Layer {layer}: {self.layer_mappings[layer]}\n")
        parts.append(f" Number of people: {len(self.information.layer_degrees[layer]):.0f}\n")
        # NOTE(review): the edge count is computed from self.n (the whole
        # population) times twice the mean degree, as in the original.
        parts.append(f" Number of edges: {self.n * mean_degree * 2:.0f} ({mean_degree:.1f} ± {std_degree:.1f} per person)\n")
        parts.append(f" Age (years): {stat(s, 'mean', 'age'):.1f} ({stat(s, 'min', 'age'):.0f}-{stat(s, 'max', 'age'):.0f})\n")

        # Size statistics are only computed for these three layers
        if layer in ['H', 'S', 'W']:
            sizes = self.summary.layers[layer]
            parts.append(f" {self.layer_mappings[layer].title()} size: {sizes['mean']:.1f} ± {sizes['std']:.1f} people (range is {sizes[5]:.1f}-{sizes[95]:.1f}).\n")

        parts.append("\n")

    parts.append(f"The rand_seed used to generate this population is {self.rand_seed}.")

    msg = "".join(parts)
    print(msg)
    if return_msg:
        return msg
    return None
def count_pop_ages(self):
    """
    Count the ages of the people in the generated population.

    Delegates to spb.count_ages() on this pop's popdict.

    Returns:
        dict: Dictionary of the age count of the generated population.
    """
    age_count = spb.count_ages(self.popdict)
    return age_count
# convert to work on array
def get_household_sizes(self):
    """
    Get the household sizes of the generated population.

    Delegates to sphh.get_household_sizes() on this pop's popdict.

    Returns:
        dict: Dictionary of household size by household id (hhid).
    """
    household_sizes = sphh.get_household_sizes(self.popdict)
    return household_sizes
# convert to work on array
def count_household_sizes(self):
    """
    Count the household sizes stored in self.information.household_sizes.

    Returns:
        dict: Dictionary of the count of household sizes.
    """
    sizes_by_household = self.information.household_sizes
    return spb.count_values(sizes_by_household)
# convert to work on array
def get_household_heads(self):
    """
    Get the ids of the heads of households in the generated population.

    Delegates to sphh.get_household_heads() on this pop's popdict.
    """
    household_heads = sphh.get_household_heads(self.popdict)
    return household_heads
def get_household_head_ages(self):
    """
    Get the age of the head of each household in the generated population.

    Returns:
        dict: Age of the household head keyed by household id, built from
        self.information.household_heads and the 'age' entry of each head
        in self.popdict.
    """
    head_ages = {}
    for household_id, head in self.information.household_heads.items():
        head_ages[household_id] = self.popdict[head]['age']
    return head_ages
def count_household_head_ages(self, bins=None):
    """
    Count the household head ages in the generated population.

    Args:
        bins (array) : If supplied, the bin edges used for a binned count of the household head ages. Otherwise, discrete household head ages are counted.

    Returns:
        dict: Dictionary of the count of household head ages. When bins is
        given, the keys are the bin indices in order.
    """
    head_ages_by_household = self.information.household_head_ages

    if bins is not None:
        ages = list(head_ages_by_household.values())
        # Raw counts per bin (not a density); the returned edges are not needed.
        counts, _ = np.histogram(ages, bins=bins, density=False)
        return dict(enumerate(counts))

    return spb.count_values(head_ages_by_household)
def get_household_head_ages_by_size(self):
    """
    Get the count of households by size and the age of the head of the
    household, assuming the minimal household members id is the id of the
    head of the household.

    Delegates to sphh.get_household_head_ages_by_size(), passing this pop.

    Returns:
        np.ndarray: An array with row as household size and columns as
        household head age brackets.
    """
    head_ages_by_size = sphh.get_household_head_ages_by_size(self)
    return head_ages_by_size
# convert to work on array
def get_ltcf_sizes(self, keys_to_exclude=None):
    """
    Create long term care facility sizes in the generated population post generation.

    Args:
        keys_to_exclude (list) : possible keys to exclude for roles in long term care facilities. See notes.

    Returns:
        dict: Dictionary of the size for each long term care facility generated.

    Notes:
        keys_to_exclude defaults to no exclusions (None is treated as an
        empty list), but can contain the different long term care facility
        roles: 'snf_res' for residents and 'snf_staff' for staff. If either
        role is included in the parameter keys_to_exclude, then individuals
        with that value equal to 1 will not be counted.
    """
    # Use a fresh list per call rather than a shared mutable default argument.
    if keys_to_exclude is None:
        keys_to_exclude = []
    return spltcf.get_ltcf_sizes(self.popdict, keys_to_exclude)
# convert to work on array
def count_ltcf_sizes(self, keys_to_exclude=None):
    """
    Count of long term care facility sizes in the generated population.

    Args:
        keys_to_exclude (list) : possible keys to exclude for roles in long term care facilities. See notes.

    Returns:
        dict: Dictionary of the count of long term care facility sizes.

    Notes:
        keys_to_exclude defaults to no exclusions (None is treated as an
        empty list), but can contain the different long term care facility
        roles: 'snf_res' for residents and 'snf_staff' for staff. If either
        role is included in the parameter keys_to_exclude, then individuals
        with that value equal to 1 will not be counted.
    """
    # Use a fresh list per call rather than a shared mutable default argument.
    if keys_to_exclude is None:
        keys_to_exclude = []
    ltcf_sizes = self.get_ltcf_sizes(keys_to_exclude)
    return spb.count_values(ltcf_sizes)
def count_enrollment_by_age(self):
    """
    Count enrolled students by age in the generated population.

    Delegates to spsch.count_enrollment_by_age() on this pop's popdict.

    Returns:
        dict: Dictionary of the count of enrolled students by age in the generated population.
    """
    enrollment_by_age = spsch.count_enrollment_by_age(self.popdict)
    return enrollment_by_age
@property
def enrollment_rates_by_age(self):
    """
    Enrollment rates by age for students in the generated population.

    The rate for each age in range(defaults.settings.max_age) is the
    enrollment count divided by the age count; ages with no people get 0.

    Returns:
        dict: Dictionary of the enrollment rates by age for students in the generated population.
    """
    rates = {}
    for age in range(defaults.settings.max_age):
        n_people = self.information.age_count[age]
        if n_people > 0:
            rates[age] = self.information.enrollment_by_age[age] / n_people
        else:
            # Nobody of this age: avoid dividing by zero.
            rates[age] = 0
    return rates
def count_enrollment_by_school_type(self, *args, **kwargs):
    """
    Create enrollment sizes by school types in the generated population post generation.

    Any extra positional and keyword arguments are forwarded unchanged to
    spsch.count_enrollment_by_school_type() after this pop's popdict.

    Returns:
        list: List of generated enrollment sizes by school type.
    """
    return spsch.count_enrollment_by_school_type(self.popdict, *args, **kwargs)
def count_employment_by_age(self):
    """
    Count employed workers by age in the generated population.

    Delegates to spw.count_employment_by_age() on this pop's popdict.

    Returns:
        dict: Dictionary of the count of employed workers by age in the generated population.
    """
    employment_by_age = spw.count_employment_by_age(self.popdict)
    return employment_by_age
@property
def employment_rates_by_age(self):
    """
    Employment rates by age for workers in the generated population.

    The rate for each age in range(defaults.settings.max_age) is the
    employment count divided by the age count; ages with no people get 0.

    Returns:
        dict: Dictionary of the employment rates by age for workers in the generated population.
    """
    rates = {}
    for age in range(defaults.settings.max_age):
        n_people = self.information.age_count[age]
        if n_people > 0:
            rates[age] = self.information.employment_by_age[age] / n_people
        else:
            # Nobody of this age: avoid dividing by zero.
            rates[age] = 0
    return rates
# convert to work on array
def get_workplace_sizes(self):
    """
    Get the workplace sizes of the generated population.

    Delegates to spw.get_workplace_sizes() on this pop's popdict.

    Returns:
        dict: Dictionary of workplace size by workplace id (wpid).
    """
    workplace_sizes = spw.get_workplace_sizes(self.popdict)
    return workplace_sizes
# convert to work on array
def count_workplace_sizes(self):
    """
    Count the workplace sizes stored in self.information.workplace_sizes.

    Returns:
        dict: Dictionary of the count of workplace sizes.
    """
    sizes_by_workplace = self.information.workplace_sizes
    return spb.count_values(sizes_by_workplace)
def to_people(self):
    """
    Convert to the alternative People representation of a population.

    Builds the result with spp.make_people() from this pop's popdict and
    rand_seed.
    """
    return spp.make_people(popdict=self.popdict, rand_seed=self.rand_seed)
def plot_people(self, *args, **kwargs):
    """
    Placeholder example of plotting the people in a population.

    Converts the pop with to_people() and forwards all arguments to the
    resulting object's plot() method, returning whatever it returns.
    """
    people = self.to_people()
    return people.plot(*args, **kwargs)
def plot_ages(self, **kwargs):
    """
    Plot a comparison of the expected and generated age distribution.

    Keyword arguments are forwarded to sppl.plot_ages() together with this pop.

    **Example**::

        pars = {'n': 10e3, 'location':'seattle_metro', 'state_location':'Washington', 'country_location':'usa'}
        pop = sp.Pop(**pars)
        fig, ax = pop.plot_ages()
    """
    figure, axes = sppl.plot_ages(self, **kwargs)
    return figure, axes
def plot_household_sizes(self, **kwargs):
    """
    Plot a comparison of the expected and generated household size distribution.

    Keyword arguments are forwarded to sppl.plot_household_sizes() together
    with this pop.

    **Example**::

        pars = {'n': 10e3, 'location':'seattle_metro', 'state_location':'Washington', 'country_location':'usa'}
        pop = sp.Pop(**pars)
        fig, ax = pop.plot_household_sizes()
    """
    figure, axes = sppl.plot_household_sizes(self, **kwargs)
    return figure, axes
def plot_household_head_ages_by_size(self, **kwargs):
    """
    Plot a comparison of the expected and generated age distribution of the
    household heads by the household size.

    Keyword arguments are forwarded to
    sppl.plot_household_head_ages_by_size() together with this pop.

    **Examples**::

        pars = {'n': 10e3, 'location':'seattle_metro', 'state_location':'Washington', 'country_location':'usa'}
        pop = sp.Pop(**pars)
        fig, ax = pop.plot_household_head_ages_by_size()
        kwargs = pars.copy()
        fig, ax = pop.plot_household_head_ages_by_size(**kwargs)
    """
    figure, axes = sppl.plot_household_head_ages_by_size(self, **kwargs)
    return figure, axes
def plot_ltcf_resident_sizes(self, **kwargs):
    """
    Plot a comparison of the expected and generated ltcf resident sizes.

    Keyword arguments are forwarded to sppl.plot_ltcf_resident_sizes()
    together with this pop.

    **Examples**::

        pars = {'n': 10e3, 'location':'seattle_metro', 'state_location':'Washington', 'country_location':'usa'}
        pop = sp.Pop(**pars)
        fig, ax = pop.plot_ltcf_resident_sizes()
    """
    figure, axes = sppl.plot_ltcf_resident_sizes(self, **kwargs)
    return figure, axes
# def plot_ltcf_resident_staff_ratios(self, **kwargs):
# """
# Plot a comparison of the expected and generated ltcf resident to staff
# ratios.
# **Examples**::
# pars = {'n': 10e3, 'location':'seattle_metro', 'state_location':'Washington', 'country_location':'usa'}
# pop = sp.Pop(**pars)
# fig, ax = pop.plot_ltcf_resident_staff_ratios()
# """
# fig, ax = sppl.plot_ltcf_resident_staff_ratios(self, **kwargs)
# return fig, ax
def plot_enrollment_rates_by_age(self, **kwargs):
    """
    Plot a comparison of the expected and generated enrollment rates by age.

    Keyword arguments are forwarded to sppl.plot_enrollment_rates_by_age()
    together with this pop.

    **Example**::

        pars = {'n': 10e3, 'location':'seattle_metro', 'state_location':'Washington', 'country_location':'usa'}
        pop = sp.Pop(**pars)
        fig, ax = pop.plot_enrollment_rates_by_age()
    """
    figure, axes = sppl.plot_enrollment_rates_by_age(self, **kwargs)
    return figure, axes
def plot_employment_rates_by_age(self, **kwargs):
    """
    Plot a comparison of the expected and generated employment rates by age.

    Keyword arguments are forwarded to sppl.plot_employment_rates_by_age()
    together with this pop.

    **Example**::

        pars = {'n': 10e3, 'location':'seattle_metro', 'state_location':'Washington', 'country_location':'usa'}
        pop = sp.Pop(**pars)
        fig, ax = pop.plot_employment_rates_by_age()
    """
    figure, axes = sppl.plot_employment_rates_by_age(self, **kwargs)
    return figure, axes
def plot_school_sizes(self, *args, **kwargs):
    """
    Plot a comparison of the expected and generated school size distributions by school type.

    Positional and keyword arguments are forwarded to
    sppl.plot_school_sizes() after this pop.

    **Example**::

        pars = {'n': 10e3, 'location':'seattle_metro', 'state_location':'Washington', 'country_location':'usa'}
        pop = sp.Pop(**pars)
        fig, ax = pop.plot_school_sizes()
    """
    figure, axes = sppl.plot_school_sizes(self, *args, **kwargs)
    return figure, axes
def plot_workplace_sizes(self, **kwargs):
    """
    Plot a comparison of the expected and generated workplace sizes for
    workplaces that are not schools or long term care facilities.

    Keyword arguments are forwarded to sppl.plot_workplace_sizes() together
    with this pop.

    **Examples**::

        pars = {'n': 10e3, 'location':'seattle_metro', 'state_location':'Washington', 'country_location':'usa'}
        pop = sp.Pop(**pars)
        fig, ax = pop.plot_workplace_sizes()
    """
    figure, axes = sppl.plot_workplace_sizes(self, **kwargs)
    return figure, axes
def make_population(*args, **kwargs):
    '''
    Interface to sp.Pop().to_dict(). Included for backwards compatibility.

    Keyword arguments named in the deprecated list below are dropped with a
    warning; everything else is passed on to Pop().
    '''
    log.debug('make_population()')

    deprecated = ('generate', 'datadir', 'sheet_name', 'verbose', 'plot', 'write', 'return_popdict', 'use_demography')

    # Collect the offending keys first so kwargs is not modified while being iterated.
    ignored_keys = [key for key in kwargs if key in deprecated]
    for key in ignored_keys:
        log.warning(f'You have specified parameter {key}, but this parameter is deprecated and will be ignored.')
        del kwargs[key]

    # Heavy lift 1: make the contacts and their connections
    log.debug('Generating a new population...')
    population = Pop(*args, **kwargs).to_dict()

    log.debug('make_population(): done.')
    return population
def generate_synthetic_population(*args, **kwargs):
    '''
    For backwards compatibility only.

    Logs a deprecation warning, then forwards all arguments to
    make_population() and returns its result.
    '''
    log.warning('This function is deprecated and may be removed in future releases')
    population = make_population(*args, **kwargs)
    return population