Source code for synthpops.data

import numpy as np
import sciris as sc
import json
import jsbeautifier
from jsonobject import *
from jsonobject.base_properties import DefaultProperty
from jsonobject.containers import JsonDict
import os

from . import logger
from . import defaults
import warnings


class PopulationAgeDistribution(JsonObject):
    """
    Json object holding one population age distribution aggregated to a fixed
    number of age bins.
    """
    # number of age bins the distribution is aggregated to
    num_bins = IntegerProperty()
    # one entry per bin: [min_age, max_age, percentage]
    distribution = ListProperty(DefaultProperty)
class SchoolSizeDistributionByType(JsonObject):
    """
    Json object holding the school size distribution for a single school type.
    """
    # label of the school type this distribution applies to
    school_type = StringProperty()
    # length should be len(location.school_size_distribution)
    size_distribution = ListProperty(DefaultProperty)
class SchoolTypeByAge(JsonObject):
    """
    Json object mapping a school type to the age range it covers.
    """
    # label of the school type
    school_type = StringProperty()
    # [min_age, max_age]
    age_range = ListProperty(DefaultProperty)
class Location(JsonObject):
    """
    Json object for a location, containing the data about the population
    needed to generate representative contact networks.

    The ``parent`` field is a DefaultProperty so it can hold either a scalar
    or a json object. The general use case is a relative or absolute file
    path, from which the parent data is parsed; a json object is allowed
    mainly for testing inheritance from a parent specified directly in the
    json.

    Note:
        The structures for the population age distribution will be updated to
        be more flexible to take in a parameter for the number of age brackets
        to generate the population age distribution structure.
    """
    location_name = StringProperty()
    data_provenance_notices = ListProperty(StringProperty)
    reference_links = ListProperty(StringProperty)
    citations = ListProperty(StringProperty)
    notes = ListProperty(StringProperty)

    # file path (str) or json object describing the parent location
    parent = DefaultProperty()

    population_age_distributions = ListProperty(PopulationAgeDistribution)

    # entries: [age, percentage]
    employment_rates_by_age = ListProperty(ListProperty(DefaultProperty))

    # entries: [age, percentage]
    enrollment_rates_by_age = ListProperty(ListProperty(DefaultProperty))

    # entries: [age_min, age_max]
    household_head_age_brackets = ListProperty(ListProperty(DefaultProperty))

    # Each entry has length len(household_head_age_brackets) + 1: the first
    # value is the family size, the remaining values are the household head
    # age counts for each household head age bracket.
    # entries: [family_size, count_1, count_2, ...]
    household_head_age_distribution_by_family_size = ListProperty(ListProperty(DefaultProperty))

    # entries: [size, percentage]
    household_size_distribution = ListProperty(ListProperty(DefaultProperty))

    # entries: [ratio_low, ratio_hi, percentage]
    ltcf_resident_to_staff_ratio_distribution = ListProperty(ListProperty(DefaultProperty))

    # entries: [num_residents_low, num_residents_hi, percentage]
    ltcf_num_residents_distribution = ListProperty(ListProperty(DefaultProperty))

    # entries: [num_staff_low, num_staff_hi, percentage]
    ltcf_num_staff_distribution = ListProperty(ListProperty(DefaultProperty))

    # entries: [age, percentage]
    ltcf_use_rate_distribution = ListProperty(ListProperty(DefaultProperty))

    # entries: [school_size_low, school_size_hi]
    school_size_brackets = ListProperty(ListProperty(DefaultProperty))

    school_size_distribution = ListProperty(DefaultProperty)

    # The length of size_distribution needs to equal the length of school_size_brackets
    school_size_distribution_by_type = ListProperty(SchoolSizeDistributionByType)

    school_types_by_age = ListProperty(SchoolTypeByAge)

    # entries: [num_personnel_low, num_personnel_hi, count]
    workplace_size_counts_by_num_personnel = ListProperty(ListProperty(DefaultProperty))

    def get_list_properties(self):
        """
        Collect the names of the properties of this location whose current
        value is a JsonArray.

        Returns:
            list: Names of the list-valued properties of the location json object.
        """
        list_properties = []
        for prop in self:
            # exact type match on purpose: only plain JsonArray values qualify
            if type(getattr(self, prop)) is JsonArray:
                list_properties.append(prop)
        return list_properties

    def get_population_age_distribution(self, nbrackets):
        """
        Look up the population age distribution aggregated to nbrackets age
        brackets. The first stored distribution whose num_bins matches is used.

        Args:
            nbrackets (int): the number of age brackets the age distribution is aggregated to

        Returns:
            list: The distribution entries of the matching population age distribution.

        Raises:
            RuntimeError: if no stored distribution has nbrackets bins.
        """
        for candidate in self.population_age_distributions:
            if candidate.num_bins == nbrackets:
                return candidate.distribution

        raise RuntimeError(f"The configured location data doesn't have a population age "
                           f"distribution with [{nbrackets}] brackets.")
def populate_parent_data_from_file_path(location, parent_file_path):
    """
    Fill the empty data fields of a location data object from the parent
    location stored at the given file path.

    Loading the parent is best-effort: if the parent cannot be loaded or
    applied, a warning (including the underlying error) is logged and the
    location is returned as it was.

    Args:
        location (json)        : json data object for the location
        parent_file_path (str) : file path to the parent location

    Returns:
        json: The location json object with necessary data fields filled from the parent location.
    """
    logger.debug(f"Loading parent location from filepath [{parent_file_path}]")
    try:
        # constraints are only checked on the fully populated child, not on the parent
        parent_obj = load_location_from_filepath(parent_file_path, check_constraints=False)
        location = populate_parent_data_from_json_obj(location, parent_obj)
    except Exception as e:
        # Catch Exception rather than using a bare except so that KeyboardInterrupt
        # and SystemExit still propagate; report the cause instead of hiding it.
        logger.warning(f"You may have an invalid data configuration: couldn't load parent "
                       f"from filepath [{parent_file_path}] for location [{location.location_name}] "
                       f"({type(e).__name__}: {e})")
    return location
def populate_parent_data_from_json_obj(location, parent):
    """
    Fill the empty list properties of a location data object with the
    corresponding non-empty values from a parent location data object.

    If the parent itself declares a parent, the parent is populated first
    (in place) so that values are inherited through the whole chain.

    Args:
        location (json) : json data object for the location
        parent (json)   : json data object for the parent location

    Returns:
        json: The location json object with necessary data fields filled from the parent location.
    """
    if parent.parent is not None:
        populate_parent_data(parent)

    for prop_name in location.get_list_properties():
        # values already set on the child always win over the parent
        if len(getattr(location, prop_name)) != 0:
            continue

        key = str(prop_name)
        if key not in parent:
            continue

        inherited = parent[key]
        if len(inherited) > 0:
            setattr(location, prop_name, inherited)

    return location
def populate_parent_data(location):
    """
    Populate a location data object with fields from its parent location,
    if a parent is specified.

    The parent field may be a file path (str) or a json object (JsonDict).
    A missing parent or an empty path leaves the location untouched.

    Args:
        location (json): json data object for the location

    Returns:
        json: The location json data object with data fields filled from the parent location.

    Raises:
        RuntimeError: if the parent field is neither None, a str nor a JsonDict.
    """
    parent = location.parent
    if parent is None:
        return location

    parent_type = type(parent)

    if parent_type is str:
        # an empty path means "no parent"
        if len(parent) == 0:
            return location
        return populate_parent_data_from_file_path(location, parent)

    if parent_type is JsonDict:
        return populate_parent_data_from_json_obj(location, Location(parent))

    raise RuntimeError(f'Invalid type for parent field: [{type(parent)}]')
def load_location_from_json(json_obj, check_constraints=None):
    """
    Build a location data object from a json object, fill in data inherited
    from its parent, and optionally run the data checks.

    Args:
        json_obj (json)          : json object containing location data
        check_constraints (bool) : whether to run the constraint and probability distribution checks; None is treated as True

    Returns:
        json: The json object with location data.
    """
    run_checks = True if check_constraints is None else check_constraints

    location = Location(json_obj)
    populate_parent_data(location)

    if run_checks:
        check_location_constraints_satisfied(location)
        check_all_probability_distribution_sums(location)
        check_all_probability_distribution_nonnegative(location)

    return location
def load_location_from_json_str(json_str, check_constraints=None):
    """
    Parse a json string and load the location data from it, with some checks made.

    Args:
        json_str (str)           : string version of the json object
        check_constraints (bool) : passed through to load_location_from_json

    Returns:
        json: The json object with location data.
    """
    return load_location_from_json(json.loads(json_str), check_constraints=check_constraints)
def get_relative_path(datadir):
    """
    Get the base path for the data folder, extended by the path components
    configured in defaults.settings.relative_path.

    Args:
        datadir (str): data folder path

    Returns:
        str: datadir joined with the configured relative path components, or
        datadir unchanged when at most one component is configured.

    Notes:
        This method may not be necessary anymore...
    """
    relative_parts = defaults.settings.relative_path
    # the components are only applied when there is more than one of them
    if len(relative_parts) > 1:
        return os.path.join(datadir, *relative_parts)
    return datadir
def get_location_attr(location, property_name):
    """
    Get the attribute of the location data object for the given property name.

    Args:
        location (json)     : the json object with location data
        property_name (str) : the property name

    Returns:
        The attribute value itself if property_name is one of location.keys().
        Otherwise the placeholder list [False, None].
    """
    if property_name not in location.keys():
        return [False, None]
    return getattr(location, property_name)
def load_location_from_filepath(rel_filepath, check_constraints=None):
    """
    Load a location data object from a file path given relative to
    defaults.settings.datadir.

    Args:
        rel_filepath (str)       : relative file path for the location data
        check_constraints (bool) : whether to run the data checks after loading; None is treated as True

    Returns:
        json: The json object with location data.
    """
    if check_constraints is None:
        check_constraints = True

    filepath = os.path.join(get_relative_path(defaults.settings.datadir), rel_filepath)
    logger.debug(f"Opening location from filepath [{filepath}]")

    # use a context manager so the file handle is closed even if parsing fails
    with open(filepath, 'r') as f:
        json_obj = json.load(f)

    return load_location_from_json(json_obj, check_constraints=check_constraints)
def save_location_to_filepath(location, abs_filepath):
    """
    Write the location data object as beautified json (indent size 2) to the
    given absolute file path.

    Args:
        location (json)    : the json object with location data
        abs_filepath (str) : absolute file path to where the json is saved

    Returns:
        None.
    """
    logger.debug(f"Saving location json to filepath [{abs_filepath}]")

    raw_json = location.to_json()

    beautifier_options = jsbeautifier.default_options()
    beautifier_options.indent_size = 2

    pretty_json = jsbeautifier.beautify(json.dumps(raw_json), beautifier_options)

    with open(abs_filepath, 'w') as out_file:
        out_file.write(pretty_json)
def check_location_constraints_satisfied(location):
    """
    Verify the schema constraints of the location data and raise if any of
    them is violated.

    Args:
        location (json): the json object with location data

    Returns:
        None.

    Raises:
        RuntimeError with a description if one of the constraints is not satisfied.
    """
    satisfied, reason = are_location_constraints_satisfied(location)
    if satisfied:
        return
    raise RuntimeError(reason)
def are_location_constraints_satisfied(location):
    """
    Run the schema constraint checks in order and stop at the first one that
    fails.

    Args:
        location (json): the json object with location data

    Returns:
        [True, None] if all constraints are satisfied.
        [False, str] if a constraint is violated. The returned str is the
        error message of the first failing check.
    """
    constraint_checks = (
        check_location_name,
        check_population_age_distributions,
        check_employment_rates_by_age,
        check_enrollment_rates_by_age,
        check_household_head_age_brackets,
        check_household_head_age_distributions_by_family_size,
        check_household_size_distribution,
        check_ltcf_resident_to_staff_ratio_distribution,
        check_ltcf_num_residents_distribution,
        check_ltcf_num_staff_distribution,
        check_school_size_brackets,
        check_school_size_distribution,
        check_school_size_distribution_by_type,
        check_school_types_by_age,
        check_workplace_size_counts_by_num_personnel,
    )

    # TODO: update this to return the combination of all the error messages
    for constraint_check in constraint_checks:
        passed, message = constraint_check(location)
        if not passed:
            return [passed, message]

    return [True, None]
def check_array_of_array_entry_lens_arr(array_of_arrays, expected_len):
    """
    Check that every entry of an array of arrays has the expected length.

    Args:
        array_of_arrays (list) : the entries to check
        expected_len (int)     : the expected length of each entry

    Returns:
        [True, None] if every entry has the expected length (or there are no entries).
        [False, str] for the first entry with a different length. The returned str is the error message.
    """
    for index, entry in enumerate(array_of_arrays):
        actual_len = len(entry)
        if actual_len != expected_len:
            return [False, f"Entry [{index}] has invalid length: [{actual_len}]; should be [{expected_len}]"]
    return [True, None]
def check_array_of_arrays_entry_lens(location, expected_len, property_name):
    """
    Check that each sub array of the named location property has the expected length.

    Args:
        location (json)     : the json object with location data
        expected_len (int)  : the expected length of each sub array
        property_name (str) : the property name

    Returns:
        [True, None] if sub array length checks pass.
        [False, str] if sub array length checks fail. The returned str is the
        error message, prefixed with the property name.
    """
    entries = get_location_attr(location, property_name)
    passed, reason = check_array_of_array_entry_lens_arr(entries, expected_len)
    if passed:
        return [True, None]
    return [False, f"For property {property_name}: {reason}"]
def check_valid_probability_distributions(property_name, valid_properties=None):
    """
    Check that property_name is one of the valid probability distributions.

    Args:
        property_name (str)            : the property name
        valid_properties (str or list) : the valid probability distributions; defaults to defaults.valid_probability_distributions

    Returns:
        None.

    Raises:
        NotImplementedError: if property_name is not among the valid properties.
    """
    candidates = defaults.valid_probability_distributions if valid_properties is None else valid_properties

    # a single str is wrapped into a list so the membership test below works
    candidates = sc.tolist(candidates)

    if property_name in candidates:
        return

    raise NotImplementedError(  # pragma: no cover
        f"{property_name} is not one of the expected probability distributions. "
        f"The list of expected probability distributions is {candidates}. "
        f"If you wish to use this method on the attribute {property_name}, "
        f"you can supply it as the parameter valid_properties={property_name}."
    )
def check_probability_distribution_sum_age_distributions(location, arr, tolerance=1e-2, **kwargs):
    """
    Check that each population age distribution has a sum equal to 1 within
    some tolerance.

    The probabilities are read from the last column of each distribution.

    Args:
        location (json)   : the json object with location data
        arr (list)        : the list of population age distributions
        tolerance (float) : difference from the sum of 1 tolerated; passed to np.isclose() as atol
        kwargs (dict)     : dictionary of values passed to np.isclose()

    Returns:
        [True, None] if there is at least one distribution and every one of
        them sums to 1 within the tolerance level.
        [False, str or None] else. The returned str concatenates the error
        messages of the failing distributions; it is None when arr is empty.
    """
    if tolerance is not None:  # pragma: no cover
        kwargs['atol'] = tolerance

    checks, msgs = [], []
    for dist in arr:  # pragma: no cover
        if 'num_bins' in dist:
            arr_sum = np.sum(np.array(dist.distribution)[:, -1])
            check = bool(np.isclose(a=1, b=arr_sum, **kwargs))
            checks.append(check)
            if not check:
                msgs.append(f"The sum of the probability distribution for the population age distribution for {location.location_name} with num_bins = {dist.num_bins} is {arr_sum:.4f}.\n")
        else:
            checks.append(False)
            msgs.append(f"The probability distribution for the population age distribution for {location.location_name} does not have num_bins.")

    msg = "".join(msgs) or None

    # Every distribution must pass; a single passing distribution must not
    # mask failures in the others. An empty list still reports False.
    return [len(checks) > 0 and all(checks), msg]
def check_probability_distribution_nonnegative_age_distributions(location, arr):
    """
    Check that each population age distribution has all non negative values.

    Args:
        location (json) : the json object with location data
        arr (list)      : the list of population age distributions

    Returns:
        [True, None] if there is at least one distribution and none of them
        contains a negative value.
        [False, str or None] else. The returned str concatenates the error
        messages of the failing distributions; it is None when arr is empty.
    """
    checks, msgs = [], []
    for dist in arr:  # pragma: no cover
        if 'num_bins' in dist:
            arr_i = np.array(dist.distribution)

            # A boolean mask selects the offending values directly. Indexing
            # the 2D array with the argwhere() result would treat column
            # indices as row indices.
            negative_mask = arr_i < 0
            check = not negative_mask.any()
            checks.append(check)
            if not check:
                negative_values = arr_i[negative_mask].tolist()
                negative_indices = np.argwhere(negative_mask).tolist()
                msgs.append(f"The probability distribution for the population age distribution for {location.location_name} with num_bins = {dist.num_bins} has some negative values, {negative_values}, at the indices {negative_indices}.\n")
        else:
            checks.append(False)
            msgs.append(f"The probability distribution for the population age distribution for {location.location_name} does not have num_bins.")

    msg = "".join(msgs) or None

    # Every distribution must pass; a single passing distribution must not
    # mask failures in the others. An empty list still reports False.
    return [len(checks) > 0 and all(checks), msg]
def check_probability_distribution_sum(location, property_name, tolerance=1e-2, valid_properties=None, **kwargs):
    """
    Check that a field representing a probability distribution has a sum
    equal to 1 within some tolerance.

    Args:
        location (json)                : the json object with location data
        property_name (str)            : the property name
        tolerance (float)              : difference from the sum of 1 tolerated; passed to np.isclose() as atol
        valid_properties (str or list) : a list of the valid probability distributions
        kwargs (dict)                  : dictionary of values passed to np.isclose()

    Returns:
        [True, None] if the sum of the probability distribution is equal to 1 within the tolerance level.
        [False, str] else. The returned str is the error message with some
        information about the check. An empty property also fails, because
        there is nothing to sum.

    Raises:
        NotImplementedError: if property_name is not a valid probability
        distribution, or if its data is neither a 1D nor a 2D array.
    """
    check_valid_probability_distributions(property_name, valid_properties)

    # is the absolute difference between the sum and the expected value of 1 less than the tolerance value?
    if tolerance is not None:
        kwargs['atol'] = tolerance

    arr = get_location_attr(location, property_name)

    if property_name == 'population_age_distributions':
        # Forward tolerance explicitly: the helper has its own tolerance
        # default, which would otherwise overwrite the atol set above.
        check, msg = check_probability_distribution_sum_age_distributions(location, arr, tolerance=tolerance, **kwargs)
        return check, msg

    if not len(arr):
        return [False, f"{location.location_name} {property_name} could not be checked for a sum close to 1."]

    arr = np.array(arr)
    if arr.ndim == 1:
        # for school size distributions
        arr_sum = sum(arr)
    elif arr.ndim == 2:
        # distribution values are in the last column if arr is 2D array
        arr_sum = np.sum(arr[:, -1])
    else:
        raise NotImplementedError(f"Could not understand an array of shape {arr.shape}: Expected a 1D or 2D array.")

    if np.isclose(a=1, b=arr_sum, **kwargs):
        return [True, None]

    # The message is built from adjacent literals so that no line-continuation
    # characters or source indentation end up inside it.
    return [False, f"The sum of the probability distribution for the property: {property_name} is {arr_sum:.4f}.\n"
                   f"We expected the sum of these probabilities to be less than {tolerance} from 1."]
def check_probability_distribution_nonnegative(location, property_name, valid_properties=None):
    """
    Check that a field representing a probability distribution has all non
    negative values.

    Args:
        location (json)                : the json object with location data
        property_name (str)            : the property name
        valid_properties (str or list) : a list of the valid probability distributions

    Returns:
        [True, None] if the values of the probability distribution are all non negative.
        [False, str] else. The returned str is the error message with some
        information about the check. An empty property also fails, because
        there is nothing to check.
    """
    check_valid_probability_distributions(property_name, valid_properties)
    values = get_location_attr(location, property_name)

    if property_name == 'population_age_distributions':
        check, msg = check_probability_distribution_nonnegative_age_distributions(location, values)
        return check, msg

    if not len(values):
        return [False, f"{location.location_name} {property_name} could not be checked for negative values."]

    values = np.array(values)
    if values.ndim == 2:
        # distribution values are in the last column if the data is a 2D array
        values = values[:, -1]

    # indices where the distribution is negative
    negative = np.argwhere(values < 0)
    if len(negative):
        return [False, f"The probability distribution for the property: {property_name} has some negative values, {values[negative]}, at the indices {negative}."]

    return [True, None]
def check_all_probability_distribution_sums(location, tolerance=1e-2, die=False, verbose=False, **kwargs):
    """
    Check that each probability distribution available to a location has a
    sum close to 1.

    The properties checked are those listed in defaults.valid_probability_distributions.

    Args:
        location (json)   : the json object with location data
        tolerance (float) : difference from the sum of 1 tolerated
        die (bool)        : raise an exception if the check fails
        verbose (bool)    : print a warning if the check fails
        kwargs (dict)     : dictionary of values passed to np.isclose()

    Returns:
        list, list: List of checks and a list of associated error messages.

    Raises:
        ValueError: if die is True and a check fails.
    """
    checks, msgs = [], []
    for property_name in defaults.valid_probability_distributions:
        check, msg = check_probability_distribution_sum(location, property_name, tolerance=tolerance, **kwargs)
        checks.append(check)
        msgs.append(msg)

        if check:
            # only report success when the check actually passed
            logger.debug(f"Check passed. The sum of the probability distribution for {property_name} is within {tolerance} of 1. ")
        elif die:  # pragma: no cover
            raise ValueError(msg)
        elif verbose:
            warnings.warn(msg)

    return checks, msgs
def check_all_probability_distribution_nonnegative(location, die=False, verbose=True):
    """
    Check that each probability distribution available to a location has
    all non negative values.

    The properties checked are those listed in defaults.valid_probability_distributions.

    Args:
        location (json) : json object with the location data
        die (bool)      : raise an exception if the check fails
        verbose (bool)  : print a warning if the check fails

    Returns:
        list, list: List of checks and a list of associated error messages.

    Raises:
        ValueError: if die is True and a check fails.
    """
    checks, msgs = [], []
    for property_name in defaults.valid_probability_distributions:
        check, msg = check_probability_distribution_nonnegative(location, property_name)
        checks.append(check)
        msgs.append(msg)

        if check:
            # only report success when the check actually passed
            logger.debug(f"Check passed. The probability distribution for {property_name} has all non negative values.")
        elif die:  # pragma: no cover
            raise ValueError(msg)
        elif verbose:
            warnings.warn(msg)

    return checks, msgs
def check_location_name(location):
    """
    Check that the location data object has a non-empty string as its
    location_name.

    Args:
        location (json): the json object with location data

    Returns:
        [True, str] if location_name is a non-empty str. The returned str specifies the location_name.
        [False, str] if location_name is None, empty, or not a str.
    """
    name = location.location_name
    # Test the type first so a non-string value (which may not support len())
    # fails the check instead of raising a TypeError.
    if isinstance(name, str) and len(name) > 0:
        return [True, f"The location_name is {name}"]
    return [False, "location_name must be specified"]
def check_population_age_distributions(location):
    """
    Check that every population age distribution is self-consistent in the
    number of brackets, and that each of its sub arrays has length 3.

    Args:
        location (json): the json object with location data

    Returns:
        [True, None] if checks pass for all distributions.
        [False, str] for the first distribution that fails a check.
    """
    for population_age_distribution in location.population_age_distributions:
        distribution = population_age_distribution.distribution
        num_bins = population_age_distribution.num_bins

        if len(distribution) != num_bins:
            return [False, f"Length for {population_age_distribution} distribution doesn't match 'num_bins': "
                           f"{len(distribution)} != {num_bins}"]

        # Only return early on failure, so that the remaining distributions
        # are validated as well instead of stopping after the first one.
        status, msg = check_array_of_array_entry_lens_arr(distribution, 3)
        if not status:
            return [False, msg]

    return [True, None]
def check_employment_rates_by_age(location):
    """
    Check that employment_rates_by_age is an array of arrays in which every
    sub array has length 2.

    Args:
        location (json): the json object with location data

    Returns:
        [True, None] if checks pass. [False, str] if checks fail.
    """
    required_len = 2
    return check_array_of_arrays_entry_lens(location, required_len, 'employment_rates_by_age')
def check_enrollment_rates_by_age(location):
    """
    Check that enrollment_rates_by_age is an array of arrays in which every
    sub array has length 2.

    Args:
        location (json): the json object with location data

    Returns:
        [True, None] if checks pass. [False, str] if checks fail.
    """
    required_len = 2
    return check_array_of_arrays_entry_lens(location, required_len, 'enrollment_rates_by_age')
def check_household_head_age_brackets(location):
    """
    Check that household_head_age_brackets is an array of arrays in which
    every sub array has length 2.

    Args:
        location (json): the json object with location data

    Returns:
        [True, None] if checks pass. [False, str] if checks fail.
    """
    required_len = 2
    return check_array_of_arrays_entry_lens(location, required_len, 'household_head_age_brackets')
def check_household_head_age_distributions_by_family_size(location):
    """
    Check that every entry of household_head_age_distribution_by_family_size
    has length equal to the number of household head age brackets plus one.

    Args:
        location (json): the json object with location data

    Returns:
        [True, None] if checks pass.
        [False, str] for the first entry with an invalid length.
    """
    required_len = len(location.household_head_age_brackets) + 1

    for index, entry in enumerate(location.household_head_age_distribution_by_family_size):
        entry_len = len(entry)
        if entry_len != required_len:
            return [False, f"Entry [{index}] in household_head_age_distribution_by_family_size has invalid length: [{entry_len}]; should be [{required_len}]"]

    return [True, None]
def check_household_size_distribution(location):
    """
    Check that household_size_distribution is an array of arrays in which
    every sub array has length 2.

    Args:
        location (json): the json object with location data

    Returns:
        [True, None] if checks pass. [False, str] if checks fail.
    """
    required_len = 2
    return check_array_of_arrays_entry_lens(location, required_len, 'household_size_distribution')
def check_ltcf_resident_to_staff_ratio_distribution(location):
    """
    Check that ltcf_resident_to_staff_ratio_distribution (long term care
    facility resident to staff ratio distribution) is an array of arrays in
    which every sub array has length 3.

    Args:
        location (json): the json object with location data

    Returns:
        [True, None] if checks pass. [False, str] if checks fail.
    """
    required_len = 3
    return check_array_of_arrays_entry_lens(location, required_len, 'ltcf_resident_to_staff_ratio_distribution')
def check_ltcf_num_residents_distribution(location):
    """
    Check that ltcf_num_residents_distribution (long term care facility
    resident size distribution) is an array of arrays in which every sub
    array has length 3.

    Args:
        location (json): the json object with location data

    Returns:
        [True, None] if checks pass. [False, str] if checks fail.
    """
    required_len = 3
    return check_array_of_arrays_entry_lens(location, required_len, 'ltcf_num_residents_distribution')
def check_ltcf_num_staff_distribution(location):
    """
    Check that ltcf_num_staff_distribution (long term care facility staff
    size distribution) is an array of arrays in which every sub array has
    length 3.

    Args:
        location (json): the json object with location data

    Returns:
        [True, None] if checks pass. [False, str] if checks fail.
    """
    required_len = 3
    return check_array_of_arrays_entry_lens(location, required_len, 'ltcf_num_staff_distribution')
def check_school_size_brackets(location):
    """
    Check that school_size_brackets is an array of arrays in which every sub
    array has length 2.

    Args:
        location (json): the json object with location data

    Returns:
        [True, None] if checks pass. [False, str] if checks fail.
    """
    required_len = 2
    return check_array_of_arrays_entry_lens(location, required_len, 'school_size_brackets')
def check_school_size_distribution(location):
    """
    Placeholder check for the school size distribution; currently always passes.

    Args:
        location (json): the json object with location data (not inspected)

    Returns:
        [True, None] always.
    """
    # TODO: decide if there is a check we should apply here.
    # DM: This should check that the school size distribution has the same
    # length as the school size brackets, otherwise we have a data inconsistency.
    outcome = [True, None]
    return outcome
def check_school_size_distribution_by_type(location):
    """
    Check that, for every school type, the size_distribution has as many
    values as there are school size brackets.

    Args:
        location (json): the json object with location data

    Returns:
        [True, None] if checks pass.
        [False, str] for the first school type whose size_distribution has an invalid length.
    """
    required_len = len(location.school_size_brackets)

    for index, by_type in enumerate(location.school_size_distribution_by_type):
        dist_len = len(by_type.size_distribution)
        if dist_len != required_len:
            return [False, f"Entry [{index} - {by_type.school_type}] in school_size_distribution_by_type has invalid length for size_distribution: [{dist_len}]; should be [{required_len}]"]

    return [True, None]
def check_school_types_by_age(location):
    """
    Check that the age_range of every entry in school_types_by_age has length 2.

    Args:
        location (json): the json object with location data

    Returns:
        [True, None] if checks pass.
        [False, str] for the first school type whose age_range has an invalid length.
    """
    required_len = 2

    for index, by_age in enumerate(location.school_types_by_age):
        range_len = len(by_age.age_range)
        if range_len != required_len:
            return [False, f"Entry [{index} - {by_age.school_type}] in school_types_by_age has invalid length for age_range: [{range_len}]; should be [{required_len}]"]

    return [True, None]
def check_workplace_size_counts_by_num_personnel(location):
    """
    Check that workplace_size_counts_by_num_personnel is an array of arrays
    in which every sub array has length 3.

    Args:
        location (json): the json object with location data

    Returns:
        [True, None] if checks pass. [False, str] if checks fail.
    """
    required_len = 3
    return check_array_of_arrays_entry_lens(location, required_len, 'workplace_size_counts_by_num_personnel')
def convert_df_to_json_array(df, cols, int_cols=None):
    """
    Convert selected columns of a pandas dataframe into a json array
    (a list of row lists).

    Args:
        df (pandas dataframe) : the dataframe with data
        cols (list)           : list of the columns to convert to the json array format
        int_cols (str or list): a str or list of columns to convert to integer values

    Returns:
        array: An array version of the pandas dataframe to be added to synthpops json data objects.
    """
    selected = df[cols]

    # normalize int_cols to a list and cast those columns to int
    int_dtypes = {col: int for col in sc.tolist(int_cols)}
    selected = selected.astype(int_dtypes)

    # dtype=object preserves each column's type in the resulting rows
    return selected.to_numpy(dtype=object).tolist()