Source code for synthpops.ltcfs

"""
Modeling Seattle Metro Long Term Care Facilities

"""

import numpy as np
import sciris as sc
from collections import Counter
from .config import logger as log, checkmem
from . import defaults as spd
from . import sampling as spsamp
from . import data_distributions as spdata
from . import base as spb


[docs]def generate_ltcfs(n, with_facilities, loc_pars, expected_age_dist, ages_left_to_assign): """ Generate residents living in long term care facilities and their ages. Args: n (int) : The number of people to generate in the population with_facilities (bool) : If True, create long term care facilities, currently only available for locations in the US. loc_pars (dict) : A dictionary of location parameters expected_age_dist (dict) : The expected age distribution ages_left_to_assign (dic) : The counter of ages for the generated population left to place in a residence """ log.debug('generate_ltcfs()') # initialize an empty list for facilities facilities = [] # If not using facilities, skip everything here if with_facilities: # what the ltcf user rates by age? ltcf_rates_by_age = spdata.get_long_term_care_facility_use_rates(loc_pars.datadir, country_location=loc_pars.country_location, state_location=loc_pars.state_location) # generate the count of ltcf users by age and make a list of all users represented by their age expected_users_by_age = dict.fromkeys(expected_age_dist.keys(), 0) # make a list of all resident ages all_residents = [] for a in expected_users_by_age: expected_users_by_age[a] = np.random.binomial(ages_left_to_assign[a], ltcf_rates_by_age[a]) # use the rates to sample the number of ltcf residents by age all_residents.extend([a] * expected_users_by_age[a]) # shuffle resident ages np.random.shuffle(all_residents) # how big are long term care facilities resident_size_dist = spb.norm_dic(spdata.get_long_term_care_facility_residents_distr(**loc_pars)) resident_size_brackets = spdata.get_long_term_care_facility_residents_distr_brackets(**loc_pars) size_bracket_keys = sorted(resident_size_dist.keys()) size_dist = [resident_size_dist[k] for k in size_bracket_keys] # create facilities while len(all_residents) > 0: b = spsamp.fast_choice(size_dist) size = np.random.choice(resident_size_brackets[b]) if size > len(all_residents): size = len(all_residents) new_facility = all_residents[:size] facilities.append(new_facility) all_residents = all_residents[size:] # what's the age distribution and count of people left to place in a residence? ltcf_adjusted_age_dist = sc.dcp(expected_age_dist) for a in ltcf_adjusted_age_dist: ltcf_adjusted_age_dist[a] -= expected_users_by_age[a] / n ltcf_adjusted_age_dist[a] = max(ltcf_adjusted_age_dist[a], 0) ages_left_to_assign[a] -= expected_users_by_age[a] ltcf_adjusted_age_dist_values = np.array([ltcf_adjusted_age_dist[a] for a in ltcf_adjusted_age_dist.keys()]) n_nonltcf = int(n - sum([len(facililty) for facililty in facilities])) else: n_nonltcf = n ltcf_adjusted_age_dist = sc.dcp(expected_age_dist) ltcf_adjusted_age_dist_values = np.array([ltcf_adjusted_age_dist[a] for a in ltcf_adjusted_age_dist]) return n_nonltcf, ltcf_adjusted_age_dist, ltcf_adjusted_age_dist_values, ages_left_to_assign, facilities
[docs]def assign_facility_staff(datadir, location, state_location, country_location, ltcf_staff_age_min, ltcf_staff_age_max, facilities, workers_by_age_to_assign_count, potential_worker_uids_by_age, potential_worker_uids, facilities_by_uids, age_by_uid, use_default=False): """ Assign Long Term Care Facility staff to the generated facilities with residents. Args: datadir (string) : The file path to the data directory. location : name of the location state_location (string) : name of the state the location is in country_location (string) : name of the country the location is in ltcf_staff_age_min (int) : Long term care facility staff minimum age. ltcf_staff_age_max (int) : Long term care facility staff maximum age. facilities (list) : A list of lists where each sublist is a facility with the resident ages workers_by_age_to_assign_count (dict) : A dictionary mapping age to the count of employed individuals of that age. potential_worker_uids (dict) : dictionary of potential workers mapping their id to their age facilities (list) : A list of lists where each sublist is a facility with the resident IDs age_by_uid (dict) : dictionary mapping id to age for all individuals in the population use_default (bool) : If True, try to first use the other parameters to find data specific to the location under study; otherwise, return default data drawing from default_location, default_state, default_country. Returns: list: A list of lists with the facility staff IDs for each facility. """ log.debug('assign_facility_staff()') resident_to_staff_ratio_distr = spdata.get_long_term_care_facility_resident_to_staff_ratios_distr(datadir, location=location, state_location=state_location, country_location=country_location, use_default=use_default) resident_to_staff_ratio_distr = spb.norm_dic(resident_to_staff_ratio_distr) resident_to_staff_ratio_brackets = spdata.get_long_term_care_facility_resident_to_staff_ratios_brackets(datadir, location=location, state_location=state_location, country_location=country_location, use_default=use_default) facilities_staff = [] facilities_staff_uids = [] sorted_ratio_keys = sorted([k for k in resident_to_staff_ratio_distr.keys()]) ratio_array = [resident_to_staff_ratio_distr[k] for k in sorted_ratio_keys] staff_age_range = np.arange(ltcf_staff_age_min, ltcf_staff_age_max + 1) for nf, fc in enumerate(facilities): n_residents = len(fc) s = spsamp.fast_choice(ratio_array) s_range = resident_to_staff_ratio_brackets[s] resident_staff_ratio = s_range[spsamp.fast_choice(s_range)] n_staff = int(np.ceil(n_residents / resident_staff_ratio)) new_staff, new_staff_uids = [], [] for i in range(n_staff): a_prob = np.array([workers_by_age_to_assign_count[a] for a in staff_age_range]) a_prob = a_prob / np.sum(a_prob) aindex = np.random.choice(a=staff_age_range, p=a_prob) uid = potential_worker_uids_by_age[aindex][0] potential_worker_uids_by_age[aindex].remove(uid) potential_worker_uids.pop(uid, None) workers_by_age_to_assign_count[aindex] -= 1 new_staff.append(aindex) new_staff_uids.append(uid) facilities_staff.append(new_staff) facilities_staff_uids.append(new_staff_uids) return facilities_staff_uids
[docs]def remove_ltcf_residents_from_potential_workers(facilities_by_uids, potential_worker_uids, potential_worker_uids_by_age, workers_by_age_to_assign_count, age_by_uid): """ Remove facilities residents from potential workers Args: facilities_by_uids (list) : A list of lists, where each sublist represents a skilled nursing or long term care facility and the ids of the residents living within it potential_worker_uids (dict) : dictionary of potential workers mapping their id to their age potential_worker_uids_by_age (dict) : dictionary mapping age to the list of worker ids with that age workers_by_age_to_assign_count (dict) : dictionary of the count of workers left to assign by age age_by_uid_dic (dict) : dictionary mapping id to age for all individuals in the population Returns: Updated dictionaries for potential worker ids, lists of potential worker ids mapped to age, and the number of workers left to assign by age. """ log.debug('remove_ltcf_residents_from_potential_workers()') for nf, fc in enumerate(facilities_by_uids): for uid in fc: aindex = age_by_uid[uid] if uid in potential_worker_uids: # pragma: no cover potential_worker_uids_by_age[aindex].remove(uid) potential_worker_uids.pop(uid, None) if workers_by_age_to_assign_count[aindex] > 0: workers_by_age_to_assign_count[aindex] -= 1 return potential_worker_uids, potential_worker_uids_by_age, workers_by_age_to_assign_count
# Age resampling method
[docs]def ltcf_resample_age(exp_age_distr, a): """ Resampling younger ages to better match data Args: exp_age_distr (dict) : age distribution age (int) : age as an integer Returns: Resampled age as an integer. Notes: This is not always necessary, but is mostly used to smooth out sharp edges in the age distribution when spsamp.resample_age() produces too many of one year and under produces the surrounding ages. For example, new borns (0 years old) may be over produced, and 1 year olds under produced, so this function can be customized to correct for that. It is currently customized to model well the age distribution for Seattle, Washington. """ # exp_age_distr = np.array(list(exp_age_distr_dict.values()), dtype=np.float64) a = spsamp.resample_age(exp_age_distr, a) if a == 7: if np.random.binomial(1, p=0.25): a = spsamp.resample_age(exp_age_distr, a) if a == 6: if np.random.binomial(1, p=0.25): a = spsamp.resample_age(exp_age_distr, a) if a == 5: if np.random.binomial(1, p=0.2): a = spsamp.resample_age(exp_age_distr, a) if a == 0: if np.random.binomial(1, p=0.0): a = spsamp.resample_age(exp_age_distr, a) if a == 1: if np.random.binomial(1, p=0.1): a = spsamp.resample_age(exp_age_distr, a) if a == 2: if np.random.binomial(1, p=0.0): a = spsamp.resample_age(exp_age_distr, a) if a == 4: if np.random.binomial(1, p=0.1): a = spsamp.resample_age(exp_age_distr, a) return a
[docs]def get_ltcf_sizes(popdict, keys_to_exclude=[]): """ Get long term care facility sizes, including both residents and staff. Args: popdict (dict) : population dictionary keys_to_exclude (list) : possible keys to exclude for roles in long term care facilities. See notes. Returns: dict: Dictionary of the size for each long term care facility generated. Notes: keys_to_exclude is an empty list by default, but can contain the different long term care facility roles: 'ltcf_res' for residents and 'ltcf_staff' for staff. If either role is included in the parameter keys_to_exclude, then individuals with that value equal to 1 will not be counted. """ log.debug('get_ltcf_sizes()') ltcf_sizes = dict() for i, person in popdict.items(): if person['ltcfid'] is not None: ltcf_sizes.setdefault(person['ltcfid'], 0) # include facility residents if person['ltcf_res'] is not None and 'ltcf_res' not in keys_to_exclude: ltcf_sizes[person['ltcfid']] += 1 # include facility staff elif person['ltcf_staff'] is not None and 'ltcf_staff' not in keys_to_exclude: ltcf_sizes[person['ltcfid']] += 1 return ltcf_sizes
[docs]class LongTermCareFacility(spb.LayerGroup): """ A class for individual long term care facilities and methods to operate on each. Args: kwargs (dict): data dictionary of the long term care facility """ def __init__(self, ltcfid=None, resident_uids=np.array([], dtype=int), staff_uids=np.array([], dtype=int), **kwargs): """ Class constructor for empty long term care facility (ltcf). Args: **ltcfid (int) : ltcf id **resident_uids (np.array) : ids of ltcf members **staff_uids (np.array) : ages of ltcf members """ super().__init__(ltcfid=ltcfid, resident_uids=resident_uids, staff_uids=staff_uids, **kwargs) self.validate() return
[docs] def validate(self): """ Check that information supplied to make a long term care facility is valid and update to the correct type if necessary. """ for key in ['resident_uids', 'staff_uids']: if key in self.keys(): try: self[key] = sc.promotetoarray(self[key], dtype=int) except: errmsg = f"Error: Could not convert ltcf key {key} to an np.array() with type int. This key only takes arrays with int values." raise TypeError(errmsg) for key in ['ltcfid']: if key in self.keys(): if not isinstance(self[key], (int, np.int32, np.int64)): if self[key] is not None: errmsg = f"Error: Expected type int or None for ltcf key {key}. Instead the type of this value is {type(self[key])}." raise TypeError(errmsg) return
@property def member_uids(self): """ Return ids of all ltcf members: residents and staff. Returns: np.ndarray : ltcf member ids """ return np.concatenate((self['resident_uids'], self['staff_uids']))
[docs] def member_ages(self, age_by_uid): """ Return ages of all ltcf members: residents and staff. Args: age_by_uid (np.ndarray) : mapping of age to uid Returns: np.ndarray : ltcf member ages """ return np.concatenate((self.resident_ages(age_by_uid), self.staff_ages(age_by_uid)))
def __len__(self): """Return the length as the number of members in the ltcf.""" return len(self.member_uids)
[docs] def resident_ages(self, age_by_uid): """ Return ages of ltcf residents. Args: age_by_uid (np.ndarray) : mapping of age to uid Returns: np.ndarray : ltcf resident ages """ return super().member_ages(age_by_uid, self['resident_uids'])
[docs] def staff_ages(self, age_by_uid): """ Return ages of ltcf staff. Args: age_by_uid (np.ndarray) : mapping of age to uid Returns: np.ndarray : ltcf staff ages """ return super().member_ages(age_by_uid, self['staff_uids'])
[docs]def get_ltcf(pop, ltcfid): """ Return ltcf with id: ltcfid. Args: pop (sp.Pop) : population ltcfid (int) : ltcf id number Returns: sp.LongTermCareFacility: A populated ltcf. """ if not isinstance(ltcfid, int): raise TypeError(f"ltcfid must be an int. Instead supplied wpid with type: {type(ltcfid)}.") if len(pop.ltcfs) <= ltcfid: raise IndexError(f"Ltcf id (ltcfid): {ltcfid} out of range. There are {len(pop.ltcfs)} ltcfs stored in this object.") return pop.ltcfs[ltcfid]
[docs]def add_ltcf(pop, ltcf): """ Add a ltcf to the list of ltcfs. Args: pop (sp.Pop) : population ltcf (sp.LongTermCareFacility) : ltcf with at minimum the ltcfid, resident_uids and staff_uids. """ if not isinstance(ltcf, LongTermCareFacility): raise ValueError('ltcf is not a sp.LongTermCareFacility object.') # ensure ltcfid to match the index in the list if ltcf['ltcfid'] != len(pop.ltcfs): ltcf['ltcfid'] = len(pop.ltcfs) pop.ltcfs.append(ltcf) pop.n_ltcfs = len(pop.ltcfs) return
[docs]def initialize_empty_ltcfs(pop, n_ltcfs=None): """ Array of empty ltcfs. Args: pop (sp.Pop) : population n_ltcfs (int) : the number of ltcfs to initialize """ if n_ltcfs is not None and isinstance(n_ltcfs, int): pop.n_ltcfs = n_ltcfs else: pop.n_ltcfs = 0 pop.ltcfs = [LongTermCareFacility() for nl in range(pop.n_ltcfs)] return
[docs]def populate_ltcfs(pop, resident_lists, staff_lists): """ Populate all of the ltcfs. Store each ltcf at the index corresponding to it's ltcfid. Args: pop (sp.Pop) : population residents_list (list) : list of lists where each sublist represents a ltcf and contains the ids of the residents staff_lists (list) : list of lists where each sublist represents a ltcf and contains the ids of the staff """ initialize_empty_ltcfs(pop, len(resident_lists)) log.debug("Populating ltcfs.") # now populate ltcfs for nl, residents in enumerate(resident_lists): lf = [] lf.extend(residents) lf.extend(staff_lists[nl]) kwargs = dict(ltcfid=nl, resident_uids=residents, staff_uids=staff_lists[nl], ) ltcf = LongTermCareFacility() ltcf.set_layer_group(**kwargs) pop.ltcfs[ltcf['ltcfid']] = sc.dcp(ltcf) return