# Source code for synthpops.schools

"""
This module generates school contacts by class and grade in flexible ways.
Contacts can be clustered into classes and also mixed across the grade and
across the school.

H. Guclu et al. (2016) show that mixing across grades is low for public schools
in elementary and middle schools. Mixing across grades is however higher in high
schools.

Functions in this module are flexible to allow users to specify the inter-grade
mixing (for 'age_clustered' school_mixing_type), and to choose whether contacts
are clustered within a grade. Clustering contacts across different grades is not
supported because there is no data to suggest that this happens commonly.
"""
from collections import Counter
from itertools import combinations

import sciris as sc
import numpy as np
import networkx as nx
import logging

from . import data_distributions as spdata
from . import defaults

from . import base as spb
from . import sampling as spsamp
from . import contact_networks as spcnx
from .config import logger as log


# Public names exported by a star-import of this module.
# NOTE(review): several of the listed functions are not defined in the part of
# the module visible here -- presumably they are defined further down; confirm.
__all__ = ['get_school_type_labels', 'count_enrollment_by_school_type',
           'get_generated_school_size_distributions', 'count_enrollment_by_age',
           'get_enrollment_rates_by_age',
           'School',
           'Classroom',
           ]


class School(spb.LayerGroup):
    """
    A class for individual schools and methods to operate on each.

    Args:
        kwargs (dict): data dictionary of the school
    """

    # NOTE(review): the np.array defaults below are created once, when the
    # class body is executed, and are shared by every call that omits them.
    # validate() passes them through sc.promotetoarray() -- confirm that this
    # returns a fresh array so instances never share storage.
    def __init__(self, scid=None, sc_type=None, school_mixing_type=None,
                 student_uids=np.array([], dtype=int),
                 teacher_uids=np.array([], dtype=int),
                 non_teaching_staff_uids=np.array([], dtype=int),
                 **kwargs):
        """
        Class constructor for a school; all data are handed to the base layer group.

        Args:
            **scid (int)                         : id of the school
            **sc_type (str)                      : school type defined by grade/age ranges
            **school_mixing_type (str)           : the mixing type of the school, 'random', 'age_clustered', or 'age_and_class_clustered' if str. Else, None. See sp.schools.add_school_edges() for more information.
            **student_uids (np.array)            : ids of student members
            **teacher_uids (np.array)            : ids of teacher members
            **non_teaching_staff_uids (np.array) : ids of non_teaching_staff members
        """
        super().__init__(scid=scid, sc_type=sc_type, school_mixing_type=school_mixing_type,
                         student_uids=student_uids, teacher_uids=teacher_uids,
                         non_teaching_staff_uids=non_teaching_staff_uids, **kwargs)
        self.validate()

    def validate(self):
        """
        Check that information supplied to make a school is valid and update
        to the correct type if necessary.

        Raises:
            TypeError: if a uid list cannot be converted to an int array, if
                scid is neither an int nor None, or if sc_type is neither a
                str nor None.
        """
        for key in ['student_uids', 'teacher_uids', 'non_teaching_staff_uids']:
            if key in self.keys():
                try:
                    self[key] = sc.promotetoarray(self[key], dtype=int)
                except Exception as e:
                    # Only real conversion failures are translated; a bare
                    # ``except`` would also turn KeyboardInterrupt/SystemExit
                    # into a TypeError.
                    errmsg = f"Could not convert school key {key} to an np.array() with type int. This key only takes arrays with int values."
                    raise TypeError(errmsg) from e

        for key in ['scid']:
            if key in self.keys():
                value = self[key]
                if value is not None and not isinstance(value, int):
                    errmsg = f"Error: Expected type int or None for school key {key}. Instead the type of this value is {type(value)}."
                    raise TypeError(errmsg)

        for key in ['sc_type']:
            if key in self.keys():
                value = self[key]
                if value is not None and not isinstance(value, str):
                    errmsg = f"Error: Expected type str or None for school key {key}."
                    raise TypeError(errmsg)

    @property
    def member_uids(self):
        """
        Return ids of all school members: students, teachers, and non teaching staff.

        Returns:
            np.ndarray : school member ids
        """
        return np.concatenate((self['student_uids'], self['teacher_uids'], self['non_teaching_staff_uids']))

    def member_ages(self, age_by_uid):
        """
        Return ages of all school members: students, teachers, and non teaching staff.

        Args:
            age_by_uid (np.ndarray) : mapping of age to uid

        Returns:
            np.ndarray : school member ages
        """
        return np.concatenate((self.student_ages(age_by_uid),
                               self.teacher_ages(age_by_uid),
                               self.non_teaching_staff_ages(age_by_uid)))

    def student_ages(self, age_by_uid):
        """
        Return student ages in the school.

        Args:
            age_by_uid (np.ndarray) : mapping of age to uid

        Returns:
            np.ndarray : student ages in school
        """
        return super().member_ages(age_by_uid, self['student_uids'])

    def teacher_ages(self, age_by_uid):
        """
        Return teacher ages in the school.

        Args:
            age_by_uid (np.ndarray) : mapping of age to uid

        Returns:
            np.ndarray : teacher ages in school
        """
        return super().member_ages(age_by_uid, self['teacher_uids'])

    def non_teaching_staff_ages(self, age_by_uid):
        """
        Return non-teaching staff ages in the school.

        Args:
            age_by_uid (np.ndarray) : mapping of age to uid

        Returns:
            np.ndarray : non-teaching staff ages in school
        """
        return super().member_ages(age_by_uid, self['non_teaching_staff_uids'])

    def __len__(self):
        """Return the length as the number of members in the school."""
        return len(self.member_uids)

    def get_classroom(self, clid):
        """
        Return the classroom indexed at clid if school_mixing_type is equal to
        'age_and_class_clustered'.

        Args:
            clid (int) : classroom id number

        Returns:
            sp.Classroom : the classroom indexed at clid, or None when the
            school uses any other mixing type.

        Raises:
            TypeError: if clid is not an int.
            IndexError: if clid is not smaller than the number of classrooms.
        """
        if self['school_mixing_type'] != 'age_and_class_clustered':
            return None

        if not isinstance(clid, int):
            raise TypeError("clid must be an int.")
        if len(self['classrooms']) <= clid:
            raise IndexError(f"Classroom id (clid): {clid} out of range.")
        return self['classrooms'][clid]
class Classroom(spb.LayerGroup):
    """
    A class for individual classrooms and methods to operate on each.

    Args:
        kwargs (dict): data dictionary of the classroom
    """

    # NOTE(review): the np.array defaults are evaluated once at class
    # definition time and shared between calls; validate() re-promotes them via
    # sc.promotetoarray() -- confirm that yields a fresh array per instance.
    def __init__(self, clid=None,
                 student_uids=np.array([], dtype=int),
                 teacher_uids=np.array([], dtype=int),
                 **kwargs):
        """
        Class constructor for a classroom; all data are handed to the base layer group.

        Args:
            **clid (int)              : id of the classroom
            **student_uids (np.array) : ids of student members
            **teacher_uids (np.array) : ids of teacher members
        """
        super().__init__(clid=clid, student_uids=student_uids, teacher_uids=teacher_uids, **kwargs)
        self.validate()

    def validate(self):
        """
        Check that information supplied to make a classroom is valid and update
        to the correct type if necessary.

        Raises:
            TypeError: if a uid list cannot be converted to an int array, or if
                clid is neither an int nor None.
        """
        for key in ['student_uids', 'teacher_uids']:
            if key in self.keys():
                try:
                    self[key] = sc.promotetoarray(self[key], dtype=int)
                except Exception as e:
                    # Narrowed from a bare ``except`` so that interpreter-exit
                    # signals are not converted into a TypeError.
                    errmsg = f"Could not convert classroom key {key} to a np.array()"
                    raise TypeError(errmsg) from e

        for key in ['clid']:
            if key in self.keys():
                value = self[key]
                if value is not None and not isinstance(value, int):
                    errmsg = f"Error: Expected type int or None for classroom key {key}."
                    raise TypeError(errmsg)

    @property
    def member_uids(self):
        """
        Return ids of all classroom members: students and teachers.

        Returns:
            np.ndarray : classroom member ids
        """
        return np.concatenate((self['student_uids'], self['teacher_uids']))

    def member_ages(self, age_by_uid):
        """
        Return ages of all classroom members: students and teachers.

        Args:
            age_by_uid (np.ndarray) : mapping of age to uid

        Returns:
            np.ndarray : classroom member ages
        """
        return np.concatenate((self.student_ages(age_by_uid), self.teacher_ages(age_by_uid)))

    def student_ages(self, age_by_uid):
        """
        Return student ages in the classroom.

        Args:
            age_by_uid (np.ndarray) : mapping of age to uid

        Returns:
            np.ndarray : student ages in classroom
        """
        return super().member_ages(age_by_uid, self['student_uids'])

    def teacher_ages(self, age_by_uid):
        """
        Return teacher ages in the classroom.

        Args:
            age_by_uid (np.ndarray) : mapping of age to uid

        Returns:
            np.ndarray : teacher ages in classroom
        """
        return super().member_ages(age_by_uid, self['teacher_uids'])

    def __len__(self):
        """Return the length as the number of members in the classroom."""
        return len(self.member_uids)
def get_school(pop, scid):
    """
    Return school with id: scid.

    Args:
        pop (sp.Pop) : population
        scid (int)   : school id number

    Returns:
        sp.School: A populated school.

    Raises:
        TypeError: if scid is not an int.
        IndexError: if scid is not smaller than the number of schools.
    """
    if not isinstance(scid, int):
        raise TypeError("scid must be an int.")
    if len(pop.schools) <= scid:
        raise IndexError(f"School id (scid): {scid} out of range.")
    return pop.schools[scid]


def get_classroom(pop, scid, clid):
    """
    Return the classroom indexed at clid if school_mixing_type is equal to
    'age_and_class_clustered'.

    Args:
        pop (sp.Pop) : population
        scid (int)   : school id number
        clid (int)   : classroom id number

    Returns:
        sp.Classroom: A populated classroom.
    """
    school = get_school(pop, scid)
    return school.get_classroom(clid)


def add_school(pop, school):
    """
    Add a school to the list of schools.

    Args:
        pop (sp.Pop)       : population
        school (sp.School) : school

    Raises:
        ValueError: if school is not a sp.School.
    """
    if not isinstance(school, School):
        raise ValueError('school is not a sp.School')

    # ensure scid to match the index in the list
    next_index = len(pop.schools)
    if school['scid'] != next_index:
        school['scid'] = next_index

    pop.schools.append(school)
    pop.n_schools = len(pop.schools)
    return


def add_classroom(school, classroom):
    """
    Add a classroom to the school.

    Args:
        school (sp.School)       : school
        classroom (sp.Classroom) : classroom

    Raises:
        ValueError: if school is not a sp.School or classroom is not a
            sp.Classroom.
    """
    if not isinstance(school, School):
        raise ValueError('school is not a sp.School')

    if not isinstance(classroom, Classroom):
        raise ValueError('classroom is not a sp.Classroom')

    # ensure clid to match the index in the list. The classroom is what gets
    # renumbered here -- the school's own scid must not be touched.
    next_index = len(school['classrooms'])
    if classroom['clid'] != next_index:
        classroom['clid'] = next_index

    school['classrooms'].append(classroom)
    school['n_classrooms'] = len(school['classrooms'])
    return


def initialize_empty_schools(pop, n_schools=None):
    """
    Array of empty schools.

    Args:
        pop (sp.Pop)    : population
        n_schools (int) : the number of schools to initialize; anything that
                          is not an int is treated as zero
    """
    if n_schools is not None and isinstance(n_schools, int):
        pop.n_schools = n_schools
    else:
        pop.n_schools = 0

    pop.schools = [School() for _ in range(pop.n_schools)]
    return


def initialize_empty_classrooms(school, n_classrooms=None):
    """
    Array of empty classrooms. Does nothing unless the school's
    school_mixing_type is 'age_and_class_clustered'.

    Args:
        school (sp.School) : school
        n_classrooms (int) : the number of classrooms to initialize; anything
                             that is not an int is treated as zero
    """
    if school['school_mixing_type'] == 'age_and_class_clustered':
        if n_classrooms is not None and isinstance(n_classrooms, int):
            school['n_classrooms'] = n_classrooms
        else:
            school['n_classrooms'] = 0

        school['classrooms'] = [Classroom() for _ in range(school['n_classrooms'])]
    return


def populate_schools(pop, student_lists, teacher_lists, non_teaching_staff_lists, age_by_uid, school_types=None, school_mixing_types=None):
    """
    Populate all of the schools. Store each school at the index corresponding
    to its scid.

    Args:
        pop (sp.Pop)                    : population
        student_lists (list)            : list of lists where each sublist represents a school and contains the ids of the students
        teacher_lists (list)            : list of lists where each sublist represents a school and contains the ids of the teachers
        non_teaching_staff_lists (list) : list of lists where each sublist represents a school and contains the ids of the non teaching staff
        age_by_uid (dict)               : dictionary mapping each person's id to their age (not used by this function)
        school_types (list)             : list of the school types
        school_mixing_types (list)      : list of the school mixing types
    """
    n_schools = len(student_lists)
    initialize_empty_schools(pop, n_schools)

    log.debug("Populating schools.")

    if school_types is None:
        school_types = [None for _ in range(n_schools)]

    if school_mixing_types is None:
        school_mixing_types = [None for _ in range(n_schools)]

    for ns in range(n_schools):
        kwargs = dict(scid=ns,
                      sc_type=school_types[ns],
                      school_mixing_type=school_mixing_types[ns],
                      student_uids=student_lists[ns],
                      teacher_uids=teacher_lists[ns],
                      non_teaching_staff_uids=non_teaching_staff_lists[ns],
                      )
        school = School()
        school.set_layer_group(**kwargs)
        # store a deep copy so the population does not share state with the
        # temporary object built here
        pop.schools[school['scid']] = sc.dcp(school)

    return


def populate_classrooms(school, student_lists, teacher_lists, age_by_uid):
    """
    Populate all of the classrooms in a school if school_mixing_type ==
    'age_and_class_clustered'. Store each classroom at the index corresponding
    to its clid.

    Args:
        school (sp.School)   : school
        student_lists (list) : list of lists where each sublist represents a classroom and contains the ids of the students
        teacher_lists (list) : list of lists where each sublist represents a classroom and contains the ids of the teachers
        age_by_uid (dict)    : dictionary mapping each person's id to their age (not used by this function)
    """
    if school['school_mixing_type'] == 'age_and_class_clustered':
        n_classrooms = len(student_lists)

        # grow the classroom list when it cannot hold every class
        if len(school['classrooms']) < n_classrooms:
            log.debug("Reinitializing list of classrooms")
            initialize_empty_classrooms(school, n_classrooms)

        log.debug("Populating classrooms.")

        for nc in range(n_classrooms):
            kwargs = dict(clid=nc,
                          student_uids=student_lists[nc],
                          teacher_uids=teacher_lists[nc],
                          )
            classroom = Classroom()
            classroom.set_layer_group(**kwargs)
            school['classrooms'][classroom['clid']] = sc.dcp(classroom)
    return
def get_school_type_labels():
    """
    Return the display labels for each school type key.

    Returns:
        dict: A new dictionary mapping the school type key ('pk', 'es', 'ms',
        'hs', 'uv') to its human-readable label.
    """
    school_type_labels = {
        'pk': 'Pre-school',
        'es': 'Elementary School',
        'ms': 'Middle School',
        'hs': 'High School',
        'uv': 'University',
    }
    return school_type_labels
def get_uids_in_school(datadir, n, location, state_location, country_location, age_by_uid=None, homes_by_uids=None, folder_name=None, use_default=False): """ Identify who in the population is attending school based on enrollment rates by age. Args: datadir (string) : The file path to the data directory. n (int) : The number of people in the population. location (string) : The name of the location. state_location (string) : The name of the state the location is in. country_location (string) : The name of the country the location is in. age_by_uid (dict) : A dictionary mapping ID to age for all individuals in the population. homes_by_uids (list) : A list of lists where each sublist is a household and the IDs of the household members. folder_name (string) : The name of the folder the location is in, e.g. 'contact_networks' use_default (bool) : If True, try to first use the other parameters to find data specific to the location under study; otherwise, return default data drawing from default_location, default_state, default_country. Returns: A dictionary of students in schools mapping their ID to their age, a dictionary of students in school mapping age to the list of IDs with that age, and a dictionary mapping age to the number of students with that age. """ uids_in_school = {} uids_in_school_by_age = {} ages_in_school_count = dict.fromkeys(np.arange(101), 0) rates = spdata.get_school_enrollment_rates(datadir, location=location, state_location=state_location, country_location=country_location, use_default=use_default) for a in np.arange(101): uids_in_school_by_age[a] = [] # go through homes and make a list of uids going to school as students, this should preserve ordering of students by homes and so create schools with siblings going to the same school for home in homes_by_uids: for uid in home: a = age_by_uid[uid] if rates[a] > 0: b = np.random.binomial(1, rates[a]) # ask each person if they'll be a student - probably could be done in a faster, more aggregate way. 
if b: uids_in_school[uid] = a uids_in_school_by_age[a].append(uid) ages_in_school_count[a] += 1 return uids_in_school, uids_in_school_by_age, ages_in_school_count def send_students_to_school_with_school_types(school_size_distr_by_type, school_size_brackets, uids_in_school, uids_in_school_by_age, ages_in_school_count, school_types_distr_by_age, school_type_age_ranges): """ A method to send students to school together. This method uses the dictionaries school_types_distr_by_age, school_type_age_ranges, and school_size_distr_by_type to first determine the type of school based on the age of a sampled reference student. Then the school type is used to determine the age range of the school. After that, the size of the school is then sampled conditionally on the school type and then the rest of the students are chosen from the lists of students available in the dictionary uids_in_school_by_age. This method is not perfect and requires a strict definition of school type by age. For now, it is not able to model mixed school types such as schools with Kindergarten through Grade 8 (K-8), or Kindergarten through Grade 12. These mixed types of schools may be common in some settings and this feature may be added later. Args: school_size_distr_by_type (dict) : A dictionary of school size distributions binned by size groups or brackets for each school type. school_size_brackets (dict) : A dictionary of school size brackets. uids_in_school (dict) : A dictionary of students in school mapping ID to age. uids_in_school_by_age (dict) : A dictionary of students in school mapping age to the list of IDs with that age. ages_in_school_count (dict) : A dictionary mapping age to the number of students with that age. school_types_distr_by_age (dict) : A dictionary of the school type for each age. school_type_age_ranges (dict) : A dictionary of the age range for each school type. 
Returns: Two lists of lists and third flat list, the first where each sublist is the ages of students in the same school, and the second is the same list but with the IDs of each student in place of their age. The third is a list of the school types for each school, where each school has a single string to represent it's school type. """ student_age_lists = [] student_uid_lists = [] school_types = [] sorted_size_brackets = sorted(school_size_brackets.keys()) ages_in_school_distr = spb.norm_dic(ages_in_school_count) age_keys = list(ages_in_school_count.keys()) while len(uids_in_school): new_student_ages = [] new_student_uids = [] aindex = age_keys[spsamp.fast_choice(ages_in_school_distr.values())] uid = uids_in_school_by_age[aindex][0] uids_in_school_by_age[aindex].remove(uid) uids_in_school.pop(uid, None) ages_in_school_count[aindex] -= 1 ages_in_school_distr = spb.norm_dic(ages_in_school_count) new_student_ages.append(aindex) new_student_uids.append(uid) school_types_possible = sorted(school_types_distr_by_age[aindex].keys()) prob = [school_types_distr_by_age[aindex][s] for s in school_types_possible] school_type = np.random.choice(school_types_possible, p=prob, size=1)[0] school_type_age_range = school_type_age_ranges[school_type] school_size_distr = school_size_distr_by_type[school_type] prob_by_sorted_size_brackets = [school_size_distr[b] for b in sorted_size_brackets] size_bracket = np.random.choice(sorted_size_brackets, p=prob_by_sorted_size_brackets) size = np.random.choice(school_size_brackets[size_bracket]) size -= 1 potential_student_ages = [] for a in school_type_age_range: potential_student_ages.extend([a] * ages_in_school_count[a]) if size >= len(potential_student_ages): size = len(potential_student_ages) school_age_count = {a: ages_in_school_count[a] for a in school_type_age_range} other_schools = [ns for ns in range(len(student_uid_lists)) if school_types[ns] == school_type] log.debug(f"other schools to merge with {other_schools} {school_type} {size} 
{school_size_brackets[0][0]}") # school is too small, try to merge it without another school of the same type if (size < school_size_brackets[0][0]) & (len(other_schools) > 0): log.debug(f'School size ({size + 1}) smaller than minimum school size {school_size_brackets[0][0]}. Will try now to merge with another school of the same type already made.') # another random school of the same type rns = other_schools[spsamp.fast_choice(np.ones(len(other_schools)))] for n, a in enumerate(school_type_age_range): count = len(uids_in_school_by_age[a]) school_uids_in_age = uids_in_school_by_age[a] new_student_ages.extend([a for i in range(count)]) new_student_uids.extend(school_uids_in_age) ages_in_school_count[a] -= count # add to a previously generated school, add their ages and their uids, school type was already determined student_age_lists[rns].extend(new_student_ages) student_uid_lists[rns].extend(new_student_uids) else: log.debug(f'School size ({size + 1}) smaller than minimum school size {school_size_brackets[0][0]} but there are no other schools of the same type to merge with, so creating this one with however many students are available.') for n, a in enumerate(school_type_age_range): count = len(uids_in_school_by_age[a]) school_uids_in_age = uids_in_school_by_age[a] new_student_ages.extend([a for i in range(count)]) new_student_uids.extend(school_uids_in_age) ages_in_school_count[a] -= count # add new school to lists although smaller than expected from school size distribution data student_age_lists.append(new_student_ages) student_uid_lists.append(new_student_uids) school_types.append(school_type) else: chosen = np.random.choice(potential_student_ages, size=size, replace=False) school_age_count = Counter(chosen) for n, a in enumerate(school_type_age_range): count = school_age_count[a] school_uids_in_age = uids_in_school_by_age[a][:count] uids_in_school_by_age[a] = uids_in_school_by_age[a][count:] new_student_ages += [a for i in range(count)] new_student_uids += 
school_uids_in_age ages_in_school_count[a] -= count # have created a new school and now adding the school with students to the lists for each data type (age, uid, and school type) student_age_lists.append(new_student_ages) student_uid_lists.append(new_student_uids) school_types.append(school_type) # having placed the students in the appropriate school, either a new one or an old one when sizes are too small, remove these students from those available to place in future schools for uid in new_student_uids: uids_in_school.pop(uid, None) ages_in_school_distr = spb.norm_dic(ages_in_school_count) return student_age_lists, student_uid_lists, school_types # adding edges to the popdict, either from an edgelist or groups (groups are better when you have fully connected graphs - no need to enumerate for n*(n-1)/2 edges!) def add_contacts_from_edgelist(popdict, edgelist, setting): """ Add contacts to popdict from edges in an edgelist. Note that this simply adds to the contacts already in the layer and does not overwrite the contacts. Args: popdict (dict) : dict of people edgelist (list) : list of edges setting (str) : social setting layer Returns: Updated popdict. """ for e in edgelist: i, j = e popdict[i]['contacts'][setting].add(j) popdict[j]['contacts'][setting].add(i) return popdict def add_contacts_from_group(popdict, group, setting): """ Add contacts to popdict from fully connected group. Note that this simply adds to the contacts already in the layer and does not overwrite the contacts. Args: popdict (dict) : dict of people group (list) : list of people in group setting (str) : social setting layer Returns: Updated popdict. 
""" for i in group: popdict[i]['contacts'][setting] = popdict[i]['contacts'][setting].union(group) popdict[i]['contacts'][setting].remove(i) return popdict def generate_random_contacts_for_additional_school_members(school_uids, additional_school_member_uids, average_additional_school_members_degree=20): """ Generate random contacts for additional school members. This might be people like non teaching staff such as principals, administrative staff, cleaning staff, or school nurses. Args: school_uids (list) : list of uids of individuals already in the school additional_school_member_uids (list) : list of uids of the additional school member who do not have contacts yet or for whom more contacts are needed average_additional_school_members_degree (float) : average degree for the additional school members Returns: List of edges for the additional school members in school. """ edges = [] all_school_uids = school_uids.copy() + additional_school_member_uids.copy() for uid in additional_school_member_uids: k = np.random.poisson(average_additional_school_members_degree) possible_neighbors = all_school_uids.copy() possible_neighbors.remove(uid) new_neighbours = np.random.choice(possible_neighbors, k) for j in new_neighbours: e = (uid, j) edges.append(e) return edges def generate_random_classes_by_grade_in_school(student_uids, student_ages, age_by_uid, grade_age_mapping, age_grade_mapping, average_class_size=20, inter_grade_mixing=0.1): """ Generate edges for contacts mostly within the same age/grade. Edges are randomly distributed so that clustering is roughly average_class_size/size of the grade. Inter grade mixing is done by rewiring edges, specifically swapping endpoints of pairs of randomly sampled edges. 
Args: student_uids (list) : list of uids of students in the school student_ages (list) : list of the ages of the students in the school age_by_uid (dict) : dict mapping uid to age grade_age_mapping (dict) : dict mapping grade to an age age_grade_mapping (dict) : dict mapping age to a grade average_class_size (float) : average class size inter_grade_mixing (float) : percent of edges that rewired to create edges across grades in schools when school_mixing_type is 'age_clustered' Returns: List of edges between students in school. """ # what are the ages in the school age_counter = Counter(student_ages) age_keys = sorted(age_counter.keys()) age_keys_indices = {a: i for i, a in enumerate(age_keys)} # create a dictionary with the list of uids for each age/grade uids_in_school_by_age = {} for a in age_keys: uids_in_school_by_age[a] = [] for uid in student_uids: a = age_by_uid[uid] uids_in_school_by_age[a].append(uid) age_groups_smaller_than_degree = False for a in uids_in_school_by_age: if average_class_size > len(uids_in_school_by_age[a]): age_groups_smaller_than_degree = True # create a graph of contacts in the school G = nx.Graph() for a in uids_in_school_by_age: # for Erdos Renyi graph of N nodes and average degree k, p is essentially the density of all possible edges --> p = # edges / # all possible edges. With average degree k, # of edges is roughly N * k / 2 and # of all possible edges is N * (N-1) / 2, which leads us to k = (N - 1) * p or, in Stirling's Approx. k = N * p, that is p = k / N Ga = spcnx.random_graph_model(uids_in_school_by_age[a], average_class_size) for e in Ga.edges(): i, j = e # add each edge to the overall school graph G.add_edge(uids_in_school_by_age[a][i], uids_in_school_by_age[a][j]) # make sure all students are in the graph by adding those without an edge yet missing_uids = set(student_uids) - set(G.nodes()) G.add_nodes_from(missing_uids) # flag was turned on to indicate that the average degree is too low. How can we add more edges? 
do the following: create a second random graph across the entire school. Loop over everyone and grab edges as necessary. Loop again to remove edges if it's too many. if age_groups_smaller_than_degree: G = add_random_contacts_from_graph(G, average_class_size) # rewire some edges between people within the same grade/age to now being edges across grades/ages E = list(G.edges()) np.random.shuffle(E) nE = int(len(E) / 2.) # we'll loop over edges in pairs so only need to loop over half the length missed_rewiring = 0 for n in range(nE): if np.random.binomial(1, p=inter_grade_mixing): i = 2 * n j = 2 * n + 1 ei = E[i] ej = E[j] ei1, ei2 = ei ej1, ej2 = ej # try to switch from ei1-ei2, ej1-ej2 to ei1-ej2, ej1-ei2 if ei1 != ej1 and ei2 != ej2 and ei1 != ej2 and ej1 != ei2: new_ei = (ei1, ej2) new_ej = (ei2, ej1) # instead try to switch from ei1-ei2, ej1-ej2 to ei1-ej1, ei2-ej2 elif ei1 != ej2 and ei2 != ej1 and ei1 != ej1 and ej2 != ei2: new_ei = (ei1, ej1) new_ej = (ei2, ej2) else: missed_rewiring += 1 continue G.remove_edges_from([ei, ej]) G.add_edges_from([new_ei, new_ej]) # calculate school age mixing and print some debugging statements if logging.getLevelName(log.level) == 'DEBUG': # pragma: no cover print(f"clustering within age/grade clustered school: {nx.transitivity(G)}") print(f"missed rewiring {missed_rewiring} edge pairs out of {nE} possible pairs.") ecount = np.zeros((len(age_keys), len(age_keys))) for e in G.edges(): i, j = e age_i = age_by_uid[i] index_i = age_keys_indices[age_i] age_j = age_by_uid[j] index_j = age_keys_indices[age_j] ecount[index_i][index_j] += 1 ecount[index_j][index_i] += 1 print(f"within school age mixing matrix\n {ecount}") return list(G.edges()) def generate_clustered_classes_by_grade_in_school(student_uids, student_ages, age_by_uid, grade_age_mapping, age_grade_mapping, average_class_size=20, return_edges=False): """ Generate edges for contacts mostly within the same age/grade. 
Edges are randomly distributed so that clustering is roughly average_class_size/size of the grade. The last classroom created may be much smaller than the average_class_size. Args: student_uids (list) : list of uids of students in the school student_ages (list) : list of the ages of the students in the school age_by_uid (dict) : dict mapping uid to age grade_age_mapping (dict) : dict mapping grade to an age age_grade_mapping (dict) : dict mapping age to a grade average_class_size (float) : average class size return_edges (bool) : If True, return edges, else return two groups of contacts - students and teachers for each class Returns: List of edges between students in school or groups of contacts. """ # what are the ages in the school age_counter = Counter(student_ages) age_keys = sorted(age_counter.keys()) age_keys_indices = {a: i for i, a in enumerate(age_keys)} # create a dictionary with the list of uids for each age/grade uids_in_school_by_age = {} for a in age_keys: uids_in_school_by_age[a] = [] for uid in student_uids: a = age_by_uid[uid] uids_in_school_by_age[a].append(uid) G = nx.Graph() nodes_left = [] groups = [] for a in uids_in_school_by_age: nodes = sc.dcp(uids_in_school_by_age[a]) np.random.shuffle(nodes) while len(nodes) > 0: cluster_size = np.random.poisson(average_class_size) if cluster_size > len(nodes): # gather the last group of nodes into a pool to choose from afterwards nodes_left += list(nodes) break group = nodes[:cluster_size] if cluster_size > 0: groups.append(group) nodes = nodes[cluster_size:] # shuffle the students left over to place into classrooms np.random.shuffle(nodes_left) while len(nodes_left) > 0: cluster_size = np.random.poisson(average_class_size) if cluster_size > len(nodes_left): cluster_size = len(nodes_left) break group = nodes_left[:cluster_size] if cluster_size > 0: groups.append(group) nodes_left = nodes_left[cluster_size:] # with some school sizes and parameter values you may not have made any classrooms yet if 
len(groups) == 0: groups.append(nodes_left[:cluster_size]) nodes_left = nodes_left[cluster_size:] else: for i in nodes_left: ng = spsamp.fast_choice(np.ones(len(groups))) # choose one of the other classes to add to groups[ng].append(i) if return_edges: # pragma: no cover for ng in range(len(groups)): group = groups[ng] Gn = nx.complete_graph(len(group)) for e in Gn.edges(): i, j = e node_i = group[i] node_j = group[j] G.add_edge(node_i, node_j) if logging.getLevelName(log.level) == 'DEBUG': # pragma: no cover if return_edges: ecount = np.zeros((len(age_keys), len(age_keys))) for e in G.edges(): i, j = e age_i = age_by_uid[i] index_i = age_keys_indices[age_i] age_j = age_by_uid[j] index_j = age_keys_indices[age_j] ecount[index_i][index_j] += 1 ecount[index_j][index_i] += 1 print(f"within school age mixing matrix\n{ecount}") if return_edges: return list(G.edges()) else: # if returning groups, much easier to add to population dictionaries and assign teachers to a single class return groups def generate_edges_between_teachers(teacher_uids, average_teacher_teacher_degree): """ Generate edges between teachers. Args: teachers (list) : a list of teachers average_teacher_teacher_degree (int) : average number of contacts with other teachers Return: List of edges between teachers. """ edges = [] if average_teacher_teacher_degree > len(teacher_uids): eiter = combinations(teacher_uids, 2) edges = [e for e in eiter] else: G = spcnx.random_graph_model(teacher_uids, average_teacher_teacher_degree) for e in G.edges(): i, j = e teacher_i = teacher_uids[i] teacher_j = teacher_uids[j] e = (teacher_i, teacher_j) edges.append(e) return edges def generate_edges_for_teachers_in_random_classes(student_uids, student_ages, teacher_uids, age_by_uid, average_student_teacher_ratio=20, average_teacher_teacher_degree=4): """ Generate edges for teachers, including to both students and other teachers at the same school. Well mixed contacts within the same age/grade, some cross grade mixing. 
Teachers are clustered by grade mostly. Args: student_uids (list) : list of uids of students in the school student_ages (list) : list of the ages of the students in the school teacher_uids (list) : list of teachers in the school age_by_uid (dict) : dict mapping uid to age grade_age_mapping (dict) : dict mapping grade to an age age_grade_mapping (dict) : dict mapping age to a grade average_student_teacher_ratio (float) : average number of students per teacher average_teacher_teacher_degree (float) : average number of contacts with other teachers Return: List of edges connected to teachers. """ age_keys = list(set(student_ages)) # create a dictionary with the list of uids for each age/grade uids_in_school_by_age = {} for a in age_keys: uids_in_school_by_age[a] = [] for uid in student_uids: a = age_by_uid[uid] uids_in_school_by_age[a].append(uid) edges = [] teachers_assigned = [] available_teachers = sc.dcp(teacher_uids) for a in uids_in_school_by_age: n_teachers_needed = int(np.round(len(uids_in_school_by_age[a]) / average_student_teacher_ratio, 1)) n_teachers_needed = max(1, n_teachers_needed) # at least one teacher if n_teachers_needed > len(available_teachers) + len(teachers_assigned): n_teachers_needed = len(available_teachers) + len(teachers_assigned) selected_teachers = available_teachers + teachers_assigned elif n_teachers_needed > len(available_teachers): selected_teachers = available_teachers n_teachers_needed = n_teachers_needed - len(available_teachers) selected_teachers += list(np.random.choice(teachers_assigned, replace=False, size=n_teachers_needed)) else: selected_teachers = np.random.choice(available_teachers, replace=False, size=n_teachers_needed) for t in selected_teachers: available_teachers.remove(t) teachers_assigned.append(t) # only adds one teacher per student for student in uids_in_school_by_age[a]: teacher = np.random.choice(selected_teachers) e = (student, teacher) edges.append(e) # some teachers left so add them as contacts to other 
students for teacher in available_teachers: n_students = max(1, np.random.poisson(average_student_teacher_ratio)) if n_students > len(student_uids): n_students = len(student_uids) selected_students = np.random.choice(student_uids, replace=False, size=n_students) for student in selected_students: e = (student, teacher) edges.append(e) teachers_assigned.append(teacher) available_teachers = [] teacher_teacher_edges = generate_edges_between_teachers(teachers_assigned, average_teacher_teacher_degree) edges += teacher_teacher_edges G = nx.Graph() G.add_edges_from(edges) for s in student_uids: log.debug(f"student {s}, age: {age_by_uid[s]}, has {G.degree(s)} contacts with teachers") for t in teachers_assigned: log.debug(f"teacher {t}, age: {age_by_uid[t]}, has {G.degree(t)} contacts with students") # not returning student-student contacts return edges def generate_edges_for_teachers_in_clustered_classes(groups, teacher_uids, average_teacher_teacher_degree=4, return_edges=False): """ Generate edges for teachers, including to both students and other teachers at the same school. Students and teachers are clustered into disjoint classes. Args: groups (list) : list of lists of students, clustered into groups mostly by grade teacher_uids (list) : list of teachers in the school average_teacher_teacher_degree (float) : average number of contacts with other teachers return_edges (bool) : If True, return edges, else return two groups of contacts - students and teachers for each class Return: List of edges connected to teachers. 
""" edges = [] teacher_groups = [] np.random.shuffle(groups) # shuffle the clustered groups of students / classes so that the classes aren't ordered from youngest to oldest available_teachers = sc.dcp(teacher_uids) # have exactly as many teachers as needed if len(groups) == len(available_teachers): for ng, t in enumerate(available_teachers): teacher_groups.append([t]) available_teachers = [] # you don't have enough teachers to cover the classes so break the extra groups up elif len(groups) > len(available_teachers): n_groups_to_break = len(groups) - len(available_teachers) # grab the last cluster and split it up and spread the students to the other groups for ngb in range(n_groups_to_break): group_to_break = groups[-1] for student in group_to_break: ng = np.random.randint(len(groups) - 1) # find another class to join groups[ng].append(student) groups = groups[:-1] for ng, t in enumerate(available_teachers): teacher_groups.append([t]) available_teachers = [] elif len(groups) < len(available_teachers): for ng, group in enumerate(groups): # class size already determines that each class gets at least one teacher and make that a list - maybe we can add other teachers some other way teacher_groups.append([available_teachers[ng]]) available_teachers = available_teachers[len(groups):] # spread extra teachers among the classes for t in available_teachers: ng = np.random.randint(len(groups)) teacher_groups[ng].append(t) available_teachers = [] # create edges between students and teachers for ng, group in enumerate(groups): for student in group: for teacher in teacher_groups[ng]: e = (student, teacher) edges.append(e) if return_edges: teacher_teacher_edges = [] for ng, teacher_group in enumerate(teacher_groups): teacher_teacher_edges += generate_edges_between_teachers(teacher_group, average_teacher_teacher_degree) edges += teacher_teacher_edges # not returning student-student contacts return edges else: return groups, teacher_groups def 
generate_random_contacts_across_school(all_school_uids, average_class_size): """ Generate edges for contacts in a school where everyone mixes randomly. Assuming class and thus class size determines effective contacts. Args: all_school_uids (list) : list of uids of individuals in the school average_class_size (int) : average class size or number of contacts in school Returns: List of edges between individuals in school. """ edges = [] G = spcnx.random_graph_model(all_school_uids, average_class_size) # undirected graph for u, uid in enumerate(all_school_uids): es = [(uid, all_school_uids[v]) for v in G.neighbors(u)] edges.extend(es) return edges def add_school_edges(popdict, student_uids, student_ages, teacher_uids, non_teaching_staff_uids, age_by_uid, grade_age_mapping, age_grade_mapping, average_class_size=20, inter_grade_mixing=0.1, average_student_teacher_ratio=20, average_teacher_teacher_degree=3, average_additional_staff_degree=20, school_mixing_type='random'): """ Generate edges for teachers, including to both students and other teachers at the same school. When school_mixing_type is 'age_clustered' then inter_grade_mixing will rewire a fraction of the edges between students in the same age or grade to be edges with any other student in the school. When school_mixing_type is 'random' or 'age_and_class_clustered', inter_grade_mixing has no effect. 
Args: popdict (dict) : dictionary of people student_uids (list) : list of uids of students in the school student_ages (list) : list of the ages of the students in the school teacher_uids (list) : list of teachers in the school non_teaching_staff_uids (list) : list of non teaching staff in the school age_by_uid (dict) : dict mapping uid to age grade_age_mapping (dict) : dict mapping grade to an age age_grade_mapping (dict) : dict mapping age to a grade average_class_size (float) : average class size inter_grade_mixing (float) : percent of edges that rewired to create edges across grades in schools when school_mixing_type is 'age_clustered' average_student_teacher_ratio (float) : average number of students per teacher average_teacher_teacher_degree (float) : average number of contacts with other teachers average_additional_staff_degree (float) : The average number of contacts per additional non teaching staff in schools. school_mixing_type(str) : 'random' for well mixed schools, 'age_clustered' for well mixed within the same grade and some intermixing with other grades, 'age_and_class_clustered' for disjoint classes in a school by age or grade Return: Updated popdict with edges generated in schools. Notes: average_teacher_teacher_degree will not be used in school_mixing_type == 'random' scenario. """ # completely random contacts across the school, no guarantee of contact with a teacher, much like universities available_school_mixing_types = ['random', 'age_clustered', 'age_and_class_clustered'] if school_mixing_type not in available_school_mixing_types: print(f"school_mixing_type: {school_mixing_type} 'does not exist. 
Please change this to one of: {available_school_mixing_types}") if school_mixing_type == 'random': school_uids = [] school_uids.extend(student_uids) school_uids.extend(teacher_uids) edges = generate_random_contacts_across_school(school_uids, average_class_size) add_contacts_from_edgelist(popdict, edges, 'S') student_groups = [student_uids] teacher_groups = [teacher_uids] # random contacts across a grade in the school, most edges will across the same age group, much like middle schools or high schools, the inter_grade_mixing parameter is a tuning parameter, students get at least one teacher as a contact elif school_mixing_type == 'age_clustered': edges = generate_random_classes_by_grade_in_school(student_uids, student_ages, age_by_uid, grade_age_mapping, age_grade_mapping, average_class_size, inter_grade_mixing) teacher_edges = generate_edges_for_teachers_in_random_classes(student_uids, student_ages, teacher_uids, age_by_uid, average_student_teacher_ratio, average_teacher_teacher_degree) edges += teacher_edges add_contacts_from_edgelist(popdict, edges, 'S') student_groups = [student_uids] teacher_groups = [teacher_uids] # completely clustered into classes by age, one teacher per class at least elif school_mixing_type == 'age_and_class_clustered': student_groups = generate_clustered_classes_by_grade_in_school(student_uids, student_ages, age_by_uid, grade_age_mapping, age_grade_mapping, average_class_size=average_class_size, return_edges=False) student_groups_2 = sc.dcp(student_groups) student_groups, teacher_groups = generate_edges_for_teachers_in_clustered_classes(student_groups, teacher_uids, average_teacher_teacher_degree=average_teacher_teacher_degree) sum_diff = sum([len(group) for group in student_groups]) - sum([len(group) for group in student_groups_2]) assert sum_diff == 0, f'Check failed. sum of the differences between student groups is not zero. 
Total school enrollment changed between the step of creating student groups and assigning teachers to each group. sum is {sum_diff}' for ng in range(len(student_groups)): student_group = student_groups[ng] teacher_group = teacher_groups[ng] group = student_group group += teacher_group add_contacts_from_group(popdict, group, 'S') log.debug(f"average_class_size, {average_class_size}, 'class_group sizes', {[len(group) for group in student_groups]}") # additional edges between teachers in different classes - makes distinct clusters connected - this may add edges again between teachers in the same class teacher_edges = generate_edges_between_teachers(teacher_uids, average_teacher_teacher_degree) add_contacts_from_edgelist(popdict, teacher_edges, 'S') all_school_uids = [] all_school_uids.extend(student_uids) all_school_uids.extend(teacher_uids) additional_staff_edges = generate_random_contacts_for_additional_school_members(all_school_uids, non_teaching_staff_uids, average_additional_staff_degree) add_contacts_from_edgelist(popdict, additional_staff_edges, 'S') return popdict, student_groups, teacher_groups def get_school_types_distr_by_age(school_type_age_ranges): """ Return probabilities of school type for each age. For now assuming no overlapping of grades between school types. Return: A dictionary of default probabilities for the school type likely for each age. """ school_types_distr_by_age = {} for a in range(101): school_types_distr_by_age[a] = dict.fromkeys(list(school_type_age_ranges.keys()), 0.) for k in school_type_age_ranges.keys(): for a in school_type_age_ranges[k]: school_types_distr_by_age[a][k] = 1. return school_types_distr_by_age def get_school_types_by_age_single(school_types_distr_by_age): """ Return school type by age by assigning the school type with the highest probability. Return: A dictionary of default school type by age. 
""" school_types_by_age_single = {} for a in range(101): values_to_keys = {school_types_distr_by_age[a][k]: k for k in school_types_distr_by_age[a]} max_v = max(values_to_keys.keys()) max_k = values_to_keys[max_v] if max_v != 0: school_types_by_age_single[a] = max_k return school_types_by_age_single def get_school_type_data(datadir, location, state_location, country_location, use_default=False): """ Get location specific distributions on school type data if it's available for all the distributions of interest, otherwise return default data if use_default. Args: datadir (string) : file path to the data directory location (string) : name of the location state_location (string) : name of the state the location is in country_location (string) : name of the country the location is in use_default (bool) : if True, try to first use the other parameters to find data specific to the location under study, otherwise returns default data drawing from Seattle, Washington. Returns: 3 dictionaries necessary to generate schools by the type of school (i.e. elementary, middle, high school, etc.). 
""" school_size_distr_by_type = spdata.get_school_size_distr_by_type(datadir, location=location, state_location=state_location, country_location=country_location, use_default=use_default) school_size_brackets = spdata.get_school_size_brackets(datadir, location=location, state_location=state_location, country_location=country_location, use_default=use_default) # for right now the size distribution for all school types will use the same brackets or bins school_type_age_ranges = spdata.get_school_type_age_ranges(datadir, location=location, state_location=state_location, country_location=country_location, use_default=use_default) # if use_default: # school_size_distr_by_type = spdata.get_default_school_size_distr_by_type() # school_size_brackets = spdata.get_default_school_size_distr_brackets() # school_type_age_ranges = spdata.get_default_school_type_age_ranges() # else: # raise ValueError(f"Data unavailable for the location specified. Please check input strings or set use_default to True to use default values.") return school_size_distr_by_type, school_size_brackets, school_type_age_ranges def assign_teachers_to_schools(student_age_lists, student_uid_lists, employment_rates, workers_by_age_to_assign_count, potential_worker_uids, potential_worker_uids_by_age, potential_worker_ages_left_count, average_student_teacher_ratio=20, teacher_age_min=25, teacher_age_max=75): """ Assign teachers to each school according to the average student-teacher ratio. 
Args: student_age_lists (list) : list of lists where each sublist is a school with the ages of the students within student_uid_lists (list) : list of lists where each sublist is a school with the ids of the students within employment_rates (dict) : employment rates by age workers_by_age_to_assign_count (dict) : dictionary of the count of workers left to assign by age potential_worker_uids (dict) : dictionary of potential workers mapping their id to their age potential_worker_uids_by_age (dict) : dictionary mapping age to the list of worker ids with that age potential_worker_ages_left_count (dict) : dictionary of the count of potential workers left that can be assigned by age average_student_teacher_ratio (float) : The average number of students per teacher teacher_age_min (int) : The minimum age for teachers teacher_age_max (int) : The maximum age for teachers Returns: List of lists of schools with the ages of individuals in each, lists of lists of schools with the ids of individuals in each, dictionary of potential workers mapping id to their age, dictionary mapping age to the list of potential workers of that age, dictionary with the count of workers left to assign for each age after teachers have been assigned. 
""" log.debug('assign_teachers_to_schools()') # matrix method will already get some teachers into schools so student_teacher_ratio should be higher all_teachers = dict.fromkeys(np.arange(101), 0) teacher_age_lists = [] teacher_uid_lists = [] for n in range(len(student_age_lists)): student_ages = student_age_lists[n] student_uids = student_uid_lists[n] # size = len(school_uids) size = len(student_ages) nteachers = int(size / float(average_student_teacher_ratio)) nteachers = max(1, nteachers) # log.debug(f"nteachers {nteachers}, student-teacher ratio, {(size / nteachers):.4f}") teacher_ages = [] teacher_uids = [] for nt in range(nteachers): a = spsamp.sample_from_range(workers_by_age_to_assign_count, teacher_age_min, teacher_age_max) uid = potential_worker_uids_by_age[a][0] teacher_ages.append(a) all_teachers[a] += 1 potential_worker_uids_by_age[a].remove(uid) workers_by_age_to_assign_count[a] -= 1 potential_worker_ages_left_count[a] -= 1 potential_worker_uids.pop(uid, None) teacher_ages.append(a) teacher_uids.append(uid) teacher_age_lists.append(teacher_ages) teacher_uid_lists.append(teacher_uids) if logging.getLevelName(log.level) == 'DEBUG': print(f"nteachers {nteachers}, student-teacher ratio, {(size / nteachers):.4f}") print(f"school with teachers {sorted(student_uids)}") print(f"nkids: {(np.array(student_ages) <= 19).sum()}, n20=>: {(np.array(student_ages) > 19).sum()}") print(f"kid-adult ratio: {np.divide((np.array(student_ages) <= 19).sum() , (np.array(student_ages) > 19).sum())}") return teacher_age_lists, teacher_uid_lists, potential_worker_uids, potential_worker_uids_by_age, workers_by_age_to_assign_count def assign_additional_staff_to_schools(student_uid_lists, teacher_uid_lists, workers_by_age_to_assign_count, potential_worker_uids, potential_worker_uids_by_age, potential_worker_ages_left_count, average_student_teacher_ratio=20, average_student_all_staff_ratio=15, staff_age_min=20, staff_age_max=75, with_non_teaching_staff=False): """ Assign additional 
staff to each school according to the average student to all staff ratio. Args: student_uid_lists (list) : list of lists where each sublist is a school with the ids of the students within teacher_uid_lists (list) : list of lists where each sublist is a school with the ids of the teachers within workers_by_age_to_assign_count (dict) : dictionary of the count of workers left to assign by age potential_worker_uids (dict) : dictionary of potential workers mapping their id to their age potential_worker_uids_by_age (dict) : dictionary mapping age to the list of worker ids with that age potential_worker_ages_left_count (dict) : dictionary of the count of potential workers left that can be assigned by age average_student_teacher_ratio (float) : The average number of students per teacher. average_student_all_staff_ratio (float) : The average number of students per staff members at school (including both teachers and non teachers). staff_age_min (int) : The minimum age for non teaching staff. staff_age_max (int) : The maximum age for non teaching staff. with_non_teaching_staff (bool) : If True, includes non teaching staff. Returns: List of lists of schools with the ids of non teaching staff for each school, dictionary of potential workers mapping id to their age, dictionary mapping age to the list of potential workers of that age, dictionary with the count of workers left to assign for each age after teachers have been assigned. 
""" log.debug('assign_additional_staff_to_schools()') # with_non_teaching_staff is False so this method will not select anyone to be a non teaching staff member at schools - thus return empty lists for non_teaching_staff_uids if not with_non_teaching_staff: log.debug(f"with_non_teaching_staff: {with_non_teaching_staff}, so this method does not produce additional staff") non_teaching_staff_uid_lists = [[] for student_list in student_uid_lists] return non_teaching_staff_uid_lists, potential_worker_uids, potential_worker_uids_by_age, workers_by_age_to_assign_count if average_student_teacher_ratio < average_student_all_staff_ratio: errormsg = f"The ratio of students to all staff at school ({average_student_all_staff_ratio}) must be lower than or equal to the ratio students to teachers at school ({average_student_teacher_ratio}). All staff includes both teaching and non teaching staff, so if the student to all staff ratio is greater than the student to teacher ratio then this would expect there to be more teachers than all possible staff in a school." raise ValueError(errormsg) n_students_list = [len(student_list) for student_list in student_uid_lists] # what is the number of students in each school n_teachers_list = [len(teacher_list) for teacher_list in teacher_uid_lists] # what is the number of teachers in each school if average_student_all_staff_ratio == 0: raise ValueError(f"The ratio of students to all staff at school is {average_student_all_staff_ratio}. This would mean no students at the school. 
Try another value greater than 0 and less than the average_student_teacher_ratio: {average_student_teacher_ratio}.") else: n_all_staff_list = [max(1, int(i/average_student_all_staff_ratio)) for i in n_students_list] # need at least one staff member n_non_teaching_staff_list = [n_all_staff_list[i] - n_teachers_list[i] for i in range(len(n_students_list))] min_n_non_teaching_staff = min(n_non_teaching_staff_list) # log.debug(f"list of number of students per school: {n_students_list}") # log.debug(f"list of number of teachers per school: {n_teachers_list}") # log.debug(f"list of number of all staff expected per school: {n_all_staff_list}") # log.debug(f"list of number of non teaching staff expected per school: {n_non_teaching_staff_list}") if min_n_non_teaching_staff <= 0: errormsg = f"At least one school expects only 1 non teaching staff member. Either check the average_student_teacher_ratio ({average_student_teacher_ratio}) and the average_student_all_staff_ratio ({average_student_all_staff_ratio}) if you do not expect this to be the case, or some of the generated schools may have too few staff members." 
log.debug(errormsg) n_non_teaching_staff_list = [i if i > 0 else 1 for i in n_non_teaching_staff_list] # force one extra staff member beyond teachers non_teaching_staff_uid_lists = [] for i in range(len(n_non_teaching_staff_list)): n_non_teaching_staff = n_non_teaching_staff_list[i] # how many non teaching staff for the school non_teaching_staff_uids_in_this_school = [] for j in range(n_non_teaching_staff): a = spsamp.sample_from_range(workers_by_age_to_assign_count, staff_age_min, staff_age_max) uid = potential_worker_uids_by_age[a][0] workers_by_age_to_assign_count[a] -= 1 potential_worker_ages_left_count[a] -= 1 potential_worker_uids.pop(uid, None) potential_worker_uids_by_age[a].remove(uid) non_teaching_staff_uids_in_this_school.append(uid) non_teaching_staff_uid_lists.append(non_teaching_staff_uids_in_this_school) return non_teaching_staff_uid_lists, potential_worker_uids, potential_worker_uids_by_age, workers_by_age_to_assign_count def add_random_contacts_from_graph(G, average_degree): """ Add additional edges at random to achieve the expected or desired average degree. Args: G (networkx Graph) : networkx Graph object average_degree (int) : expected or desired average degree Returns: Updated networkx Graph object with additional edges added at random. 
""" nodes = G.nodes() ordered_node_ids = {node: node_id for node_id, node in enumerate(nodes)} ids_to_ordered_nodes = {node_id: node for node_id, node in enumerate(nodes)} if len(nodes) == 0: return G p = average_degree / len(nodes) G2 = spcnx.random_graph_model(nodes, average_degree) for node in nodes: ordered_node_id = ordered_node_ids[node] extra_neighbors = list(G2.neighbors(ordered_node_id)) extra_edges_needed = len(extra_neighbors) - G.degree(node) if extra_edges_needed > 0: extra_neighbors_to_add = np.random.choice(extra_neighbors, extra_edges_needed) for j in extra_neighbors_to_add: neighbor = ids_to_ordered_nodes[j] G.add_edge(node, neighbor) # in case you've added too many edges, let's remove a few - likely to not be hit for node in nodes: ordered_node_id = ordered_node_ids[node] extra_edges_to_remove = G.degree(node) - G2.degree(ordered_node_id) extra_edges_to_remove = int(extra_edges_to_remove / 2.) if extra_edges_to_remove > 0: extra_neighbors_to_remove = np.random.choice(extra_neighbors, extra_edges_to_remove) for j in extra_neighbors_to_remove: neighbor = ids_to_ordered_nodes[j] if G.has_edge(node, neighbor): G.remove_edge(node, neighbor) return G # %% Things added to enable not-by-type and random def generate_school_sizes(school_size_distr_by_bracket, school_size_brackets, uids_in_school): """ Given a number of students in school, generate a list of school sizes to place everyone in a school. Args: school_size_distr_by_bracket (dict) : The distribution of binned school sizes. school_size_brackets (dict) : A dictionary of school size brackets. uids_in_school (dict) : A dictionary of students in school mapping ID to age. Returns: A list of school sizes whose sum is the length of ``uids_in_school``. 
""" ns = len(uids_in_school) sorted_brackets = sorted(school_size_brackets.keys()) prob_by_sorted_brackets = [school_size_distr_by_bracket[b] for b in sorted_brackets] school_sizes = [] while ns > 0: size_bracket = np.random.choice(sorted_brackets, p=prob_by_sorted_brackets) # size = np.random.choice(school_size_brackets[size_bracket]) # creates some schools that are much smaller than expected so use average instead size = int(np.mean(school_size_brackets[size_bracket])) # use average school size to avoid schools with very small sizes ns -= size school_sizes.append(size) if ns < 0: school_sizes[-1] = school_sizes[-1] + ns np.random.shuffle(school_sizes) return school_sizes def send_students_to_school(school_sizes, uids_in_school, uids_in_school_by_age, ages_in_school_count, age_brackets, age_by_brackets, contact_matrices): """ A method to send students to school together. Using the matrices to construct schools is not a perfect method so some things are more forced than the matrix method alone would create. This method models schools using matrices and so it does not create explicit school types. Args: school_sizes (list) : A list of school sizes. uids_in_school (dict) : A dictionary of students in school mapping ID to age. uids_in_school_by_age (dict) : A dictionary of students in school mapping age to the list of IDs with that age. ages_in_school_count (dict) : A dictionary mapping age to the number of students with that age. age_brackets (dict) : A dictionary mapping age bracket keys to age bracket range. age_by_brackets(dict) : A dictionary mapping age to the age bracket range it falls within. contact_matrices (dict) : A dictionary of age specific contact matrix for different physical contact settings. Returns: Two lists of lists and third flat list, the first where each sublist is the ages of students in the same school, and the second is the same list but with the IDs of each student in place of their age. 
The third is a list of the school types for each school, where each school has a single string to represent it's school type. """ log.debug('send_students_to_school()') school_age_lists = [] school_uid_lists = [] school_types = [] ages_in_school_distr = spb.norm_dic(ages_in_school_count) left_in_bracket = spb.get_aggregate_ages(ages_in_school_count, age_by_brackets) for n, size in enumerate(school_sizes): if len(uids_in_school) == 0: # no more students left to send to school! break ages_in_school_distr = spb.norm_dic(ages_in_school_count) new_school = [] new_school_uids = [] aindex = spsamp.fast_choice(ages_in_school_distr.values()) bindex = age_by_brackets[aindex] # reference students under 20 to prevent older adults from being reference students (otherwise we end up with schools with too many adults and kids mixing because the matrices represent the average of the patterns and not the bimodal mixing of adult students together at school and a small number of teachers at school with their students) if bindex >= 4: if np.random.binomial(1, p=0.7): aindex = spsamp.fast_choice(ages_in_school_distr.values()) uid = uids_in_school_by_age[aindex][0] uids_in_school_by_age[aindex].remove(uid) uids_in_school.pop(uid, None) ages_in_school_count[aindex] -= 1 ages_in_school_distr = spb.norm_dic(ages_in_school_count) new_school.append(aindex) new_school_uids.append(uid) log.debug(f"reference school age {aindex}, school size {size}, students left {len(uids_in_school)}, {left_in_bracket}") bindex = age_by_brackets[aindex] b_prob = contact_matrices['S'][bindex, :] left_in_bracket[bindex] -= 1 # fewer students than school size so everyone else is in one school if len(uids_in_school) < size: for uid in uids_in_school: ai = uids_in_school[uid] new_school.append(int(ai)) new_school_uids.append(uid) uids_in_school_by_age[ai].remove(uid) ages_in_school_count[ai] -= 1 left_in_bracket[age_by_brackets[ai]] -= 1 uids_in_school = {} log.debug(f"last school, size from distribution: {size}, 
size generated {len(new_school)}") else: bi_min = max(0, bindex-1) bi_max = bindex + 1 for i in range(1, size): if len(uids_in_school) == 0: break # no one left to send? should only choose other students from the mixing matrices, not teachers so don't create schools with if sum([left_in_bracket[bi] for bi in range(bi_min, bi_max+1)]) == 0: break bi = spsamp.sample_single_arr(b_prob) while left_in_bracket[bi] == 0 or np.abs(bindex - bi) > 1: bi = spsamp.sample_single_arr(b_prob) ai = spsamp.sample_from_range(ages_in_school_distr, age_brackets[bi][0], age_brackets[bi][-1]) uid = uids_in_school_by_age[ai][0] # grab the next student in line new_school.append(ai) new_school_uids.append(uid) uids_in_school_by_age[ai].remove(uid) uids_in_school.pop(uid, None) ages_in_school_count[ai] -= 1 ages_in_school_distr = spb.norm_dic(ages_in_school_count) left_in_bracket[bi] -= 1 school_age_lists.append(new_school) school_uid_lists.append(new_school_uids) school_types.append(None) new_school = np.array(new_school) kids = new_school <= 19 if logging.getLevelName(log.level) == 'DEBUG': print(f"new school size {len(new_school)}, ages: {sorted(new_school)}, nkids: {kids.sum()}, n20=>: {len(new_school) - kids.sum()}, kid-adult ratio: {np.divide(kids.sum() , (len(new_school) - kids.sum()) )}") log.debug(f"people in school {np.sum([len(school) for school in school_age_lists])}, left to send: {len(uids_in_school)}") return school_age_lists, school_uid_lists, school_types
def count_enrollment_by_age(popdict):
    """
    Tally the enrolled students in the popdict by their age.

    A person counts as an enrolled student when their 'scid' is not None and
    their 'sc_student' flag is truthy.

    Args:
        popdict (dict): population dictionary; each person is read through the
                        keys 'scid', 'sc_student' and 'age'

    Returns:
        dict: Dictionary mapping each age in 0, ..., defaults.settings.max_age - 1
        to the number of enrolled students of that age (0 where there are none).
    """
    # one zeroed counter per age; the upper bound of np.arange is exclusive
    ages = np.arange(0, defaults.settings.max_age)
    enrollment_count_by_age = dict.fromkeys(ages, 0)

    for person in popdict.values():
        is_enrolled = person['scid'] is not None and person['sc_student']
        if not is_enrolled:
            continue
        enrollment_count_by_age[person['age']] += 1

    return enrollment_count_by_age
def get_enrollment_rates_by_age(enrollment_count_by_age, age_count):
    """
    Compute the fraction of people of each age who are enrolled in school.

    Args:
        enrollment_count_by_age (dict) : dictionary of the count of enrolled students by age
        age_count (dict)               : dictionary of the count of people by age

    Returns:
        dict: Dictionary of the enrollment rates by age, with one entry per key
        of ``age_count`` inserted in ascending age order. Ages with a count of
        zero (or less) get a rate of 0.
    """
    rates = {}
    for age in sorted(age_count):
        n_people = age_count[age]
        if n_people > 0:
            rates[age] = enrollment_count_by_age[age] / n_people
        else:
            # nobody of this age, so the enrollment count is not even looked up
            rates[age] = 0
    return rates
def count_enrollment_by_school_type(popdict, **kwargs):
    """
    Get enrollment sizes by school types in popdict.

    Only people whose 'scid' is not None and whose 'sc_student' flag is truthy
    are counted. The type of a school is the 'sc_type' of the last such student
    encountered for that 'scid'.

    Args:
        popdict (dict)             : population dictionary
        **with_school_types (bool) : If True, return enrollment by school types as defined in the popdict. Otherwise, combine all enrollment sizes for a school type of None. Defaults to False.
        **keys_to_exclude (list)   : school types to exclude from the combining step, i.e. they keep their own entry. Defaults to an empty list.

    Returns:
        dict: Dictionary of generated enrollment sizes by school type, mapping
        each school type to the list of enrollment counts of its schools.
    """
    with_school_types = kwargs.get('with_school_types', False)
    keys_to_exclude = kwargs.get('keys_to_exclude')
    excluded = set(keys_to_exclude) if keys_to_exclude is not None else set()

    # enrollment and type of every school that has at least one student
    schools = {}
    for person in popdict.values():
        if person['scid'] is not None and person['sc_student']:
            school = schools.setdefault(person['scid'], {'enrolled': 0})
            school['sc_type'] = person['sc_type']
            school['enrolled'] += 1

    enrollment_by_school_type = {}
    for school in schools.values():
        enrollment_by_school_type.setdefault(school['sc_type'], []).append(school['enrolled'])

    if not with_school_types:
        combined = enrollment_by_school_type.setdefault(None, [])
        # iterate over a snapshot of the keys because entries are popped inside the loop
        for sc_type in list(enrollment_by_school_type):
            # None is the merge target itself: merging it into itself and then
            # popping it would throw away every school that has no type
            if sc_type is None or sc_type in excluded:
                continue
            combined.extend(enrollment_by_school_type.pop(sc_type))

    return enrollment_by_school_type
def get_generated_school_size_distributions(enrollment_by_school_type, bins):
    """
    Get school size distributions by type.

    For each school type the enrollment sizes are binned with np.histogram and
    the bin counts are normalized to sum to 1. When no size falls inside the
    bins (no schools of that type, or every size lies outside the bin range)
    the raw all-zero counts are returned instead, so nothing is divided by zero.

    Args:
        enrollment_by_school_type (dict) : generated enrollment sizes by school types
        bins (list)                      : school size bins

    Returns:
        dict: Dictionary of generated school size distribution by school type,
        mapping each school type to a dictionary of bin index to the share of
        that type's schools whose size falls in the bin.
    """
    generated_school_size_dist = dict()
    for sc_type, sizes in enrollment_by_school_type.items():
        # np.histogram also returns the bin edges it used; for the documented
        # list input these equal ``bins``, and rebinding them keeps every
        # school type on the same edges
        hist, bins = np.histogram(sizes, bins=bins)
        n_binned = hist.sum()
        if n_binned > 0:
            generated_school_size_dist[sc_type] = {i: hist[i] / n_binned for i in range(len(hist))}
        else:
            # normalize by what was actually binned: sizes can be non-empty and
            # still all fall outside the bins
            generated_school_size_dist[sc_type] = {i: hist[i] for i in range(len(hist))}
    return generated_school_size_dist