Source code for synthpops.workplaces

from copy import deepcopy
from collections import Counter
import sciris as sc
import numpy as np
from . import base as spb
from . import sampling as spsamp
from .config import logger as log
from . import defaults


# Public names exported by this module; the list is extended further down
# once the workplace accessor functions are introduced.
__all__ = ['count_employment_by_age', 'get_workplace_sizes',
           'get_employment_rates_by_age',
           'get_generated_workplace_size_distribution',
           'Workplace',
           ]


class Workplace(spb.LayerGroup):
    """
    A single workplace, stored as a layer group.

    All storage and validation logic is inherited from ``spb.LayerGroup``;
    this subclass only fixes the layer name used during validation.

    Args:
        kwargs (dict): data dictionary of the workplace
    """

    def __init__(self, wpid=None, **kwargs):
        """
        Build a workplace and validate it immediately.

        Args:
            wpid (int)              : workplace id
            **member_uids (np.array): ids of workplace members
        """
        # The base class stores wpid together with any other supplied fields.
        super().__init__(wpid=wpid, **kwargs)
        self.validate()

    def validate(self):
        """
        Validate the workplace data by delegating to the base class with
        ``layer_str='workplace'``.
        """
        super().validate(layer_str='workplace')
# Also export the functions that read, add, initialize and populate
# workplaces on a population object.
__all__ += ['get_workplace', 'add_workplace', 'initialize_empty_workplaces', 'populate_workplaces']
def get_workplace(pop, wpid):
    """
    Return workplace with id: wpid.

    Args:
        pop (sp.Pop) : population
        wpid (int)   : workplace id number; must index into ``pop.workplaces``

    Returns:
        sp.Workplace: The workplace stored at position ``wpid``.

    Raises:
        TypeError: if ``wpid`` is not an ``int``.
        IndexError: if ``wpid`` is negative or not smaller than the number of
            stored workplaces.
    """
    if not isinstance(wpid, int):
        raise TypeError(f"wpid must be an int. Instead supplied wpid with type: {type(wpid)}.")
    # Negative ids are rejected explicitly: Python list indexing would
    # otherwise wrap around and silently return a workplace whose id does not
    # match the one requested.
    if not 0 <= wpid < len(pop.workplaces):
        raise IndexError(f"Workplace id (wpid): {wpid} out of range. There are {len(pop.workplaces)} workplaces stored in this object.")
    return pop.workplaces[wpid]
def add_workplace(pop, workplace):
    """
    Append a workplace to the population's list of workplaces.

    The workplace's ``'wpid'`` entry is overwritten when needed so that it
    equals the position the workplace will occupy in ``pop.workplaces``, and
    ``pop.n_workplaces`` is refreshed afterwards.

    Args:
        pop (sp.Pop)             : population
        workplace (sp.Workplace) : workplace with at minimum the wpid and member_uids.

    Raises:
        ValueError: if ``workplace`` is not a ``Workplace`` instance.
    """
    if not isinstance(workplace, Workplace):
        raise ValueError('workplace is not a sp.Workplace object.')

    # The id must match the index at which the workplace is about to be stored.
    next_wpid = len(pop.workplaces)
    if workplace['wpid'] != next_wpid:
        workplace['wpid'] = next_wpid

    pop.workplaces.append(workplace)
    pop.n_workplaces = len(pop.workplaces)
def initialize_empty_workplaces(pop, n_workplaces=None):
    """
    Reset the population's workplaces to a list of empty ``Workplace`` objects.

    Args:
        pop (sp.Pop)       : population
        n_workplaces (int) : the number of workplaces to initialize; any value
                             that is not an ``int`` (including ``None``) is
                             treated as 0
    """
    # None is not an int, so a single isinstance check covers both conditions.
    count = n_workplaces if isinstance(n_workplaces, int) else 0
    pop.n_workplaces = count
    pop.workplaces = [Workplace() for _ in range(pop.n_workplaces)]
def populate_workplaces(pop, workplaces):
    """
    Populate all of the workplaces, storing each one at the index that
    corresponds to its wpid.

    Args:
        pop (sp.Pop)      : population
        workplaces (list) : list of lists where each sublist represents a
                            workplace and contains the ids of the workplace
                            members

    Notes:
        The population's workplace list is first re-created with exactly
        ``len(workplaces)`` empty workplaces, so anything stored there
        beforehand is replaced by the input workplaces.
    """
    # Start from a fresh list with one empty slot per incoming workplace.
    initialize_empty_workplaces(pop, len(workplaces))

    log.debug("Populating workplaces.")

    for index, members in enumerate(workplaces):
        new_wp = Workplace()
        new_wp.set_layer_group(wpid=index, member_uids=members)
        # Store a deep copy at the slot named by the workplace's own id.
        pop.workplaces[new_wp['wpid']] = sc.dcp(new_wp)
def get_uids_potential_workers(student_uid_lists, employment_rates, age_by_uid):
    """
    Get IDs for everyone who could be a worker by removing those who are
    students and those who can't be employed officially.

    Args:
        student_uid_lists (list) : A list of lists where each sublist represents a school with the IDs of students in the school.
        employment_rates (dict)  : The employment rates by age.
        age_by_uid (dict)        : A dictionary mapping ID to age for individuals in the population.

    Returns:
        A dictionary of potential workers mapping their ID to their age, a
        dictionary mapping age to the list of IDs for potential workers with
        that age, and a dictionary mapping age to the count of potential
        workers left to assign to a workplace for that age. The two by-age
        dictionaries have keys for ages 15 through 100.
    """
    log.debug('get_uids_potential_workers()')
    # Work on a copy so the caller's age_by_uid mapping is left untouched.
    potential_worker_uids = deepcopy(age_by_uid)
    potential_worker_uids_by_age = {}
    potential_worker_ages_left_count = {}

    for a in range(101):
        if a >= 15:
            potential_worker_uids_by_age[a] = []
            potential_worker_ages_left_count[a] = 0

    # remove students from any potential workers since the model assumes student and worker status are exclusive
    for students in student_uid_lists:
        for uid in students:
            potential_worker_uids.pop(uid, None)

    # remove anyone whose age has no employment rate entry
    for uid in age_by_uid:
        if age_by_uid[uid] not in employment_rates:
            potential_worker_uids.pop(uid, None)

    for uid in potential_worker_uids:
        ai = potential_worker_uids[uid]
        # potential_worker_uids[uid] may yield persons who are not of valid working age.
        # Indexing potential_worker_uids_by_age with such an age would raise a KeyError.
        # Since the keys of potential_worker_uids_by_age are the valid work ages, skip invalid workers.
        if ai in potential_worker_uids_by_age.keys():
            potential_worker_uids_by_age[ai].append(uid)
            potential_worker_ages_left_count[ai] += 1

    # shuffle workers around!
    for ai in potential_worker_uids_by_age:
        np.random.shuffle(potential_worker_uids_by_age[ai])

    return potential_worker_uids, potential_worker_uids_by_age, potential_worker_ages_left_count


def generate_workplace_sizes(workplace_size_distr_by_bracket, workplace_size_brackets, workers_by_age_to_assign_count):
    """
    Given a number of individuals employed, generate a list of workplace sizes
    to place everyone in a workplace.

    Args:
        workplace_size_distr_by_bracket (dict) : The distribution of binned workplace sizes.
        workplace_size_brackets (dict)         : A dictionary of workplace size brackets.
        workers_by_age_to_assign_count (dict)  : A dictionary mapping age to the count of employed individuals of that age.

    Returns:
        A list of workplace sizes, in shuffled order, whose total equals the
        number of workers to assign.
    """
    nworkers = np.sum([workers_by_age_to_assign_count[a] for a in workers_by_age_to_assign_count])

    # normalize workplace_size_distr_by_bracket because it's likely a count rather than distribution
    workplace_size_distr_by_bracket = spb.norm_dic(workplace_size_distr_by_bracket)
    sorted_brackets = sorted(workplace_size_brackets.keys())
    prob_by_sorted_brackets = [workplace_size_distr_by_bracket[b] for b in sorted_brackets]

    workplace_sizes = []

    # Draw a size bracket, then a size within it, until every worker has a slot.
    while nworkers > 0:
        size_bracket = np.random.choice(sorted_brackets, p=prob_by_sorted_brackets)
        size = np.random.choice(workplace_size_brackets[size_bracket])
        nworkers -= size
        workplace_sizes.append(size)
    # The last draw may overshoot; nworkers is then negative, so adding it
    # trims the final workplace down to the number of workers actually left.
    if nworkers < 0:
        workplace_sizes[-1] = workplace_sizes[-1] + nworkers
    np.random.shuffle(workplace_sizes)
    return workplace_sizes


def get_workers_by_age_to_assign(employment_rates, potential_worker_ages_left_count, uids_by_age):
    """
    Get the number of people to assign to a workplace by age using those left
    who can potentially go to work and employment rates by age.

    Args:
        employment_rates (dict)                 : A dictionary of employment rates by age.
        potential_worker_ages_left_count (dict) : A dictionary of the count of workers to assign by age.
        uids_by_age (dict)                      : A dictionary mapping age to the list of ids with that age.

    Returns:
        A dictionary with a count of workers to assign to a workplace, keyed
        by the ages 0 through 100.
    """
    log.debug('get_workers_by_age_to_assign()')

    workers_by_age_to_assign_count = dict.fromkeys(np.arange(101), 0)
    for a in potential_worker_ages_left_count:
        if a in employment_rates:
            try:
                c = int(employment_rates[a] * len(uids_by_age[a]))
            except Exception:
                # Best effort: if the target count cannot be computed for this
                # age (e.g. the age is missing from uids_by_age), assign no
                # one. Unlike a bare ``except``, this lets KeyboardInterrupt
                # and SystemExit propagate.
                c = 0
            # Never assign more people than are actually available at this age.
            number_of_people_who_can_be_assigned = min(c, potential_worker_ages_left_count[a])
            workers_by_age_to_assign_count[a] = number_of_people_who_can_be_assigned

    return workers_by_age_to_assign_count


def assign_rest_of_workers(workplace_sizes, potential_worker_uids, potential_worker_uids_by_age, workers_by_age_to_assign_count, age_by_uid, age_brackets, age_by_brackets, contact_matrices):
    """
    Assign the rest of the workers to non-school workplaces.

    For each workplace a first worker is drawn by age in proportion to the
    remaining counts; further workers are drawn by age bracket using the row
    of the workplace contact matrix that belongs to the first worker's
    bracket. The input dictionaries ``potential_worker_uids``,
    ``potential_worker_uids_by_age`` and ``workers_by_age_to_assign_count``
    are modified in place as people are placed.

    Args:
        workplace_sizes (list)                : list of workplace sizes
        potential_worker_uids (dict)          : dictionary of potential workers mapping their id to their age
        potential_worker_uids_by_age (dict)   : dictionary mapping age to the list of worker ids with that age
        workers_by_age_to_assign_count (dict) : dictionary of the count of workers left to assign by age
        age_by_uid (dict)                     : dictionary mapping id to age for all individuals in the population (not used in this function body)
        age_brackets (dict)                   : dictionary mapping age bracket keys to age bracket range
        age_by_brackets (dict)                : dictionary mapping age to the age bracket range it falls in
        contact_matrices (dict)               : dictionary of age specific contact matrix for different physical contact settings; only the 'W' entry is read

    Returns:
        List of lists where each sublist is a workplace with the ages of
        workers, list of lists where each sublist is a workplace with the ids
        of workers, dictionary of potential workers left mapping id to age,
        dictionary mapping age to a list of potential workers left of that
        age, dictionary mapping age to the count of workers left to assign.
    """
    log.debug('assign_rest_of_workers()')
    workplace_age_lists = []
    workplace_uid_lists = []

    worker_age_keys = workers_by_age_to_assign_count.keys()
    sorted_worker_age_keys = sorted(worker_age_keys)

    # make a copy of the workplace matrix to sample from and modify as people get placed into workplaces and removed from the pool of potential workers
    w_contact_matrix = contact_matrices['W'].copy()

    # turn off the likelihood to meet those unemployed in the workplace because the matrices are not an exact match for the population under study
    for b in age_brackets:
        workers_left_in_bracket = [workers_by_age_to_assign_count[a] for a in age_brackets[b]]
        number_of_workers_left_in_bracket = np.sum(workers_left_in_bracket)
        if number_of_workers_left_in_bracket == 0:
            b = min(b, w_contact_matrix.shape[1] - 1)  # Ensure it doesn't go past the end of the array
            w_contact_matrix[:, b] = 0

    for n, size in enumerate(workplace_sizes):
        # Stop as soon as nobody is left to assign or no candidate ids remain.
        workers_by_age_to_assign_distr = spb.norm_dic(workers_by_age_to_assign_count)
        if sum(workers_by_age_to_assign_distr.values()) == 0:
            break
        if sum([len(v) for v in potential_worker_uids_by_age.values()]) == 0:
            break

        new_work, new_work_uids = [], []

        # Pick the age of the first worker in proportion to the remaining counts.
        a_prob = [workers_by_age_to_assign_count[a] for a in sorted_worker_age_keys]
        a_prob = np.array(a_prob)
        a_prob = a_prob / np.sum(a_prob)
        achoice = np.random.choice(a=sorted_worker_age_keys, p=a_prob)

        aindex = achoice

        uid = potential_worker_uids_by_age[aindex][0]
        potential_worker_uids_by_age[aindex].remove(uid)
        potential_worker_uids.pop(uid, None)
        workers_by_age_to_assign_count[aindex] -= 1
        workers_by_age_to_assign_distr = spb.norm_dic(workers_by_age_to_assign_count)

        new_work.append(aindex)
        new_work_uids.append(uid)

        # Bracket probabilities for coworkers come from the first worker's row.
        bindex = age_by_brackets[aindex]
        bindex = min(bindex, w_contact_matrix.shape[0] - 1)  # Ensure it doesn't go past the end of the array
        b_prob = w_contact_matrix[bindex, :]
        sum_b_prob = np.sum(b_prob)
        if sum_b_prob > 0:  # pragma: no cover
            b_prob = b_prob / sum_b_prob

        # Cap the workplace size by the people who are still available.
        if size > len(potential_worker_uids) - 1:  # pragma: no cover
            size = len(potential_worker_uids) - 1
        workers_left_count = np.sum([workers_by_age_to_assign_count[a] for a in workers_by_age_to_assign_count])
        if size > workers_left_count:
            size = workers_left_count + 1

        # not enough people left over to try to match age mixing patterns in the last workplace so grab everyone who will get placed in order
        if len(potential_worker_uids) <= size or workers_left_count <= size:
            for ai in workers_by_age_to_assign_count:
                for i in range(workers_by_age_to_assign_count[ai]):  # do not change this during the loop but afterwards, and if 0 then no one will be placed
                    uid = potential_worker_uids_by_age[ai][0]
                    new_work.append(ai)
                    new_work_uids.append(uid)
                    potential_worker_uids_by_age[ai].remove(uid)
                    potential_worker_uids.pop(uid, None)
                workers_by_age_to_assign_count[ai] = 0  # set to zero now that everyone will be placed in this last workplace
            workers_by_age_to_assign_distr = spb.norm_dic(workers_by_age_to_assign_count)

        else:
            # The first worker is already placed, so fill the remaining size - 1 slots.
            for i in range(1, size):
                bi = spsamp.fast_choice(b_prob)
                workers_left_in_bracket = [workers_by_age_to_assign_count[a] for a in age_brackets[bi] if len(potential_worker_uids_by_age[a]) > 0]
                if np.sum(b_prob):  # pragma: no cover
                    loop_b_prob = sc.dcp(b_prob)  # Make a copy to avoid overwriting the original
                    while np.sum(workers_left_in_bracket) == 0:
                        loop_b_prob[bi] = 0  # Don't pick the same bracket ever again
                        bi = spsamp.fast_choice(loop_b_prob)
                        workers_left_in_bracket = [workers_by_age_to_assign_count[a] for a in age_brackets[bi] if len(potential_worker_uids_by_age[a]) > 0]
                # Within the chosen bracket, pick an age weighted by remaining counts.
                a_prob = [workers_by_age_to_assign_count[a] for a in age_brackets[bi]]
                ai = age_brackets[bi][spsamp.fast_choice(a_prob)]

                uid = potential_worker_uids_by_age[ai][0]
                new_work.append(ai)
                new_work_uids.append(uid)
                potential_worker_uids_by_age[ai].remove(uid)
                potential_worker_uids.pop(uid, None)
                workers_by_age_to_assign_count[ai] -= 1
                workers_by_age_to_assign_distr = spb.norm_dic(workers_by_age_to_assign_count)

                # if there's no one left in the bracket, then you should turn this bracket off in the contact matrix
                workers_left_in_bracket = [workers_by_age_to_assign_count[a] for a in age_brackets[bi]]
                if np.sum(workers_left_in_bracket) == 0:
                    w_contact_matrix[:, bi] = 0.

                # since the matrix was modified, calculate the bracket probabilities again
                b_prob = w_contact_matrix[bindex, :]
                if np.sum(b_prob) > 0:  # pragma: no cover
                    b_prob = b_prob / np.sum(b_prob)

        log.debug(f' Progress: {n}, {Counter(new_work)}')
        workplace_age_lists.append(new_work)
        workplace_uid_lists.append(new_work_uids)

    return workplace_age_lists, workplace_uid_lists, potential_worker_uids, potential_worker_uids_by_age, workers_by_age_to_assign_count
def count_employment_by_age(popdict):
    """
    Get employment count by age for workers in the popdict. Workers can be in
    different possible layers: as staff in long term care facilities (LTCF),
    as teachers or staff in schools (S), or as workers in other workplaces (W).

    A person counts as employed when any of ``'ltcf_staff'``, ``'sc_teacher'``
    or ``'sc_staff'`` is truthy, or when ``'wpid'`` is not ``None``.

    Args:
        popdict (dict) : population dictionary

    Returns:
        dict: Dictionary of the count of employed people by age in popdict,
        with one key per age from 0 up to (not including)
        ``defaults.settings.max_age``.
    """
    employment_count_by_age = dict.fromkeys(np.arange(0, defaults.settings.max_age), 0)
    for person in popdict.values():
        # Workplace ids are list indices that start at 0, so wpid must be
        # compared against None; a truthiness test would miss everyone
        # working in workplace 0.
        is_employed = (
            person['ltcf_staff']
            or person['sc_teacher']
            or person['sc_staff']
            or person['wpid'] is not None
        )
        if is_employed:
            employment_count_by_age[person['age']] += 1
    return employment_count_by_age
def get_employment_rates_by_age(employment_count_by_age, age_count):
    """
    Compute the employment rate for every age present in ``age_count``.

    Args:
        employment_count_by_age (dict) : dictionary of the count of employed people
        age_count (dict)               : dictionary of the age count

    Returns:
        dict: Dictionary of the employment rates by age, keyed in ascending
        age order. Ages with nobody in them get a rate of 0.
    """
    rates = {}
    for age in sorted(age_count.keys()):
        if age_count[age] > 0:
            rates[age] = employment_count_by_age[age] / age_count[age]
        else:
            # Nobody of this age: avoid dividing by zero.
            rates[age] = 0
    return rates
def get_workplace_sizes(popdict):
    """
    Count how many people in popdict belong to each regular workplace.

    A person is counted when their ``'wpid'`` entry is not ``None``. This only
    includes workplaces that are not long term care facilities (LTCF) or
    schools (S).

    Args:
        popdict (dict) : population dictionary

    Returns:
        dict: Dictionary of the generated workplace sizes for each regular
        workplace, keyed by workplace id.
    """
    # NOTE: sizes are plain head counts; a per-industry breakdown is not
    # tracked here.
    workplace_sizes = {}
    for person in popdict.values():
        wpid = person['wpid']
        if wpid is None:
            continue
        workplace_sizes[wpid] = workplace_sizes.get(wpid, 0) + 1
    return workplace_sizes
def get_generated_workplace_size_distribution(workplace_sizes, bins):
    """
    Get workplace size distribution.

    The sizes are histogrammed into ``bins`` and the counts are normalized so
    they sum to 1. When no size falls inside any bin (including when there
    are no workplaces at all), the all-zero counts are returned as they are.

    Args:
        workplace_sizes (dict): generated workplace sizes by workplace id (wpid)
        bins (list)           : workplace size bins

    Returns:
        dict: Dictionary of generated workplace size distribution, keyed by
        bin index.
    """
    generated_workplace_sizes = list(workplace_sizes.values())
    hist, _ = np.histogram(generated_workplace_sizes, bins=bins, density=False)

    # Guard on the actual denominator: sizes that all lie outside the bins
    # give a histogram that sums to zero even though the sizes themselves do
    # not, and dividing by that zero would produce NaN.
    total = hist.sum()
    if total > 0:
        generated_workplace_size_dist = {i: hist[i] / total for i in range(len(hist))}
    else:
        generated_workplace_size_dist = {i: hist[i] for i in range(len(hist))}
    return generated_workplace_size_dist