Source code for emod_api.demographics.Demographics

import json
import math
import numpy as np
import os
import pandas as pd
import tempfile
import pathlib
import sys

from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

import emod_api.demographics.grid_construction as grid
from emod_api.demographics.BaseInputFile import BaseInputFile
from emod_api.demographics.Node import Node
from emod_api.demographics.PropertiesAndAttributes import IndividualAttributes, IndividualProperty, IndividualProperties, NodeAttributes
from emod_api.demographics import DemographicsTemplates as DT
from emod_api.demographics.DemographicsTemplates import CrudeRate, YearlyRate, DtkRate
from emod_api.demographics.DemographicsInputDataParsers import node_ID_from_lat_long, duplicate_nodeID_check
from typing import List
from functools import partial
from emod_api.migration import migration

# Just make once-static methods module-level functions


[docs]def from_template_node(lat=0, lon=0, pop=1000000, name="Erewhon", forced_id=1): """ Create a single-node Demographics instance from a few params. """ new_nodes = [Node(lat, lon, pop, forced_id=forced_id, name=name)] return Demographics(nodes=new_nodes)
# MOVE TO demographics/DemographicsInputDataParsers.py
[docs]def from_file(base_file): """ Create a Demographics instance from an existing demographics file. """ with open(base_file, "rb") as src: raw = json.load(src) nodes = [] # Load the nodes for node in raw["Nodes"]: nodes.append(Node.from_data(node)) # Load the idref idref = raw["Metadata"]["IdReference"] # Create the file return Demographics(nodes, idref, base_file)
[docs]def get_node_ids_from_file(demographics_file): """ Get a list of node ids from a demographics file. """ d = from_file(demographics_file) return sorted(d.node_ids)
[docs]def get_node_pops_from_params(tot_pop, num_nodes, frac_rural): """ Get a list of node populations from the params used to create a sparsely parameterized multi-node Demographics instance. """ # Generate node sizes nsizes = np.exp( -np.log(np.random.rand(num_nodes - 1))) nsizes = frac_rural * nsizes / np.sum(nsizes) nsizes = np.minimum(nsizes, 100 / tot_pop) nsizes = frac_rural * nsizes / np.sum(nsizes) nsizes = np.insert(nsizes, 0, 1 - frac_rural) npops = ((np.round(tot_pop * nsizes, 0)).astype(int)).tolist() return npops
[docs]def from_params(tot_pop=1000000, num_nodes=100, frac_rural=0.3, id_ref="from_params", random_2d_grid=False): """ Create an EMOD-compatible Demographics object with the population and numbe of nodes specified. Args: tot_pop: The total population. num_nodes: Number of nodes. Can be defined as a two-dimensional grid of nodes [longitude, latitude]. The distance to the next neighbouring node is 1. frac_rural: Determines what fraction of the population gets put in the 'rural' nodes, which means all nodes besides node 1. Node 1 is the 'urban' node. id_ref: Facility name random_2d_grid: Create a random distanced grid with num_nodes nodes. Returns: Object of type Demographics """ if frac_rural > 1.0: raise ValueError( f"frac_rural can't be greater than 1.0" ) if frac_rural < 0.0: raise ValueError( f"frac_rural can't be less than 0" ) if frac_rural == 0.0: frac_rural = 1e-09 if random_2d_grid: total_nodes = num_nodes ucellb = np.array([[1.0, 0.0], [-0.5, 0.86603]]) nlocs = np.random.rand(num_nodes, 2) nlocs[0, :] = 0.5 nlocs = np.round(np.matmul(nlocs, ucellb), 4) else: if isinstance(num_nodes, int): lon_grid = num_nodes lat_grid = 1 else: lon_grid = num_nodes[0] # east/west lat_grid = num_nodes[1] # north/south total_nodes = lon_grid * lat_grid nlocs = [[i, j] for i in range(lon_grid) for j in range(lat_grid)] nodes = [] npops = get_node_pops_from_params( tot_pop, total_nodes, frac_rural ) # Add nodes to demographics for idx, lat_lon in enumerate(nlocs): nodes.append(Node(lat=lat_lon[0], lon=lat_lon[1], pop=npops[idx], forced_id=idx + 1)) return Demographics(nodes=nodes, idref=id_ref)
def _create_grid_files( point_records_file_in, final_grid_files_dir, site ): """ Purpose: Create grid file (as csv) from records file. Author: pselvaraj """ # create paths first... output_filename = f"{site}_grid.csv" if not os.path.exists(final_grid_files_dir): os.mkdir(final_grid_files_dir) out_path = os.path.join(final_grid_files_dir, output_filename ) if not os.path.exists( out_path ): # Then manip data... #logging.info("Reading data...") print( f"{out_path} not found so we are going to create it." ) print( f"Reading {point_records_file_in}." ) point_records = pd.read_csv(point_records_file_in, encoding="iso-8859-1") point_records.rename(columns={'longitude': 'lon', 'latitude': 'lat'}, inplace=True) if not 'pop' in point_records.columns: point_records['pop'] = [5.5] * len(point_records) if 'hh_size' in point_records.columns: point_records['pop'] = point_records['hh_size'] # point_records = point_records[point_records['pop']>0] x_min, y_min, x_max, y_max = grid.get_bbox(point_records) point_records = point_records[ (point_records.lon >= x_min) & (point_records.lon <= x_max) & (point_records.lat >= y_min) & ( point_records.lat <= y_max)] gridd, grid_id_2_cell_id, origin, final = grid.construct(x_min, y_min, x_max, y_max) gridd.to_csv(os.path.join(final_grid_files_dir, f"{site}_grid.csv")) with open(os.path.join(final_grid_files_dir, f"{site}_grid_id_2_cell_id.json"), "w") as g_f: json.dump(grid_id_2_cell_id, g_f, indent=3) point_records[['gcid', 'gidx', 'gidy']] = point_records.apply( grid.point_2_grid_cell_id_lookup, args=(grid_id_2_cell_id, origin,), axis=1).apply(pd.Series) grid_pop = point_records.groupby(['gcid', 'gidx', 'gidy'])['pop'].apply(np.sum).reset_index() grid_pop['pop'] = grid_pop['pop'].apply(lambda x: round(x/5)) grid_final = pd.merge(gridd, grid_pop, on='gcid') grid_final['node_label'] = list(grid_final.index) grid_final = grid_final[grid_final['pop'] > 5] grid_final.to_csv(os.path.join(final_grid_files_dir, output_filename )) print( f"{out_path} gridded population file created or found." ) return out_path
[docs]def from_csv(input_file, res=30/3600, id_ref="from_csv"): """ Create an EMOD-compatible Demographics instance from a csv population-by-node file. """ def get_value(row, headers): for h in headers: if row.get(h) is not None: return float(row.get(h)) return None if not os.path.exists(input_file): print(f"{input_file} not found.") return print( f"{input_file} found and being read for demographics.json file creation." ) node_info = pd.read_csv(input_file, encoding='iso-8859-1') out_nodes = [] for index, row in node_info.iterrows(): pop = 0 if 'under5_pop' in row: pop = int(6*row['under5_pop']) if pop<25000: continue else: pop = int(row['pop']) latitude_headers = ["lat", "latitude", "LAT", "LATITUDE", "Latitude", "Lat"] lat = get_value(row, latitude_headers) longitude_headers = ["lon", "longitude", "LON", "LONGITUDE", "Longitude", "Lon"] lon = get_value(row, longitude_headers) birth_rate_headers = ["birth", "Birth", "birth_rate", "birthrate", "BirthRate", "Birth_Rate", "BIRTH", "birth rate", "Birth Rate"] birth_rate = get_value(row, birth_rate_headers) if birth_rate is not None and birth_rate < 0.0: raise ValueError("Birth rate defined in " + input_file + " must be greater 0.") node_id = row.get('node_id') if node_id is not None and int(node_id) == 0: raise ValueError( "Node ids can not be '0'." ) forced_id = int(node_id) if (node_id is not None) else int(node_ID_from_lat_long(lat, lon, res)) place_name = "" if 'loc' in row: place_name = str(row['loc']) meta = {} """ meta = {'dot_name': (row['ADM0_NAME']+':'+row['ADM1_NAME']+':'+row['ADM2_NAME']), 'GUID': row['GUID'], 'density': row['under5_pop_weighted_density']} """ node_attributes = NodeAttributes(name=place_name, birth_rate=birth_rate) node = Node(lat, lon, pop, node_attributes=node_attributes, forced_id=forced_id, meta=meta) out_nodes.append(node) out_nodes = duplicate_nodeID_check(out_nodes) for node in out_nodes: if node.id == 1639001798: #Not sure why this node causes issues, just dump it for speed. Probably an issue in the duplicate nodeID check remel = node out_nodes.remove(remel) return Demographics(nodes=out_nodes, idref=id_ref)
[docs]def from_pop_csv( pop_filename_in, pop_filename_out="spatial_gridded_pop_dir", site="No_Site" ): # This should be a 'raw' ungridded file that needs to be converted into a pop_grid using _create_grid_files from household lat-lons # Don't do this if the grid file already exists. grid_file_path = _create_grid_files( pop_filename_in, pop_filename_out, site ) print( f"{grid_file_path} grid file created." ) return from_csv( grid_file_path )
[docs]class Demographics(BaseInputFile): """ This class is a container of data necessary to produce a EMOD-valid demographics input file. It can be initialized from an existing valid demographics.joson type file or from an array of valid Nodes. """ def __init__(self, nodes, idref="Gridded world grump2.5arcmin", base_file=None): """ A class to create demographics. :param nodes: list of Nodes :param idref: A name/reference :param base_file: A demographics file in json format """ super(Demographics, self).__init__(idref) self._nodes = nodes self.idref = idref self.raw = None self.implicits = list() self.migration_files = list() if base_file: with open(base_file, "rb") as src: self.raw = json.load(src) else: meta = self.generate_headers() self.raw = {"Metadata": meta, "Defaults": {}} self.SetMinimalNodeAttributes() self.raw["Defaults"]["IndividualAttributes"] = {} # Uniform prevalence between .1 and .3 #self.raw["Defaults"]["IndividualAttributes"].update( DT.InitPrevUniform() ) #self.raw["Defaults"]["IndividualAttributes"].update( DT.InitSusceptConstant() ) self.raw["Defaults"]["IndividualAttributes"].update( DT.NoRisk() ) #self.raw["Defaults"]["IndividualAttributes"].update( DT.InitAgeUniform() ) DT.NoInitialPrevalence(self) # does this need to be called? #DT.InitSusceptConstant(self) # move this to Measles and maybe other subclasses #DT.NoRisk() self.raw["Defaults"]["IndividualProperties"] = [] DT.InitAgeUniform(self)
[docs] def apply_overlay(self, overlay_nodes: list): """ :param overlay_nodes: Overlay list of nodes over existing nodes in demographics :return: """ map_ids_overlay = {} # map node_id to overlay node_id for node in overlay_nodes: map_ids_overlay[node.forced_id] = node for index, node in enumerate(self.nodes): if map_ids_overlay.get(node.forced_id): self.nodes[index].update(map_ids_overlay[node.forced_id])
[docs] def to_dict(self): self.raw["Nodes"] = [] for node in self._nodes: d = node.to_dict() d.update(node.meta) self.raw["Nodes"].append(d) # Update node count self.raw["Metadata"]["NodeCount"] = len(self._nodes) return self.raw
[docs] def generate_file(self, name="demographics.json"): """ Write the contents of the instance to an EMOD-compatible (JSON) file. """ with open(name, "w") as output: json.dump(self.to_dict(), output, indent=3, sort_keys=True) return name
[docs] def send( self, write_to_this, return_from_forked_sender=False ): """ Write data to a file descriptor as specified by the caller. It must be a pipe, a filename, or a file 'handle' Args: write_to_this: File pointer, file path, or file handle. return_from_forked_sender: Defaults to False. Only applies to pipes. Set to true if caller will handle exiting of fork. Example:: 1) Send over named pipe client code # Named pipe solution 1, uses os.open, not open. import tempfile tmpfile = tempfile.NamedTemporaryFile().name os.mkfifo( tmpfile ) fifo_reader = os.open( tmpfile, os.O_RDONLY | os.O_NONBLOCK ) fifo_writer = os.open( tmpfile, os.O_WRONLY | os.O_NONBLOCK ) demog.send( fifo_writer ) os.close( fifo_writer ) data = os.read( fifo_reader, int(1e6) ) 2) Send over named pipe client code version 2 (forking) import tempfile tmpfile = tempfile.NamedTemporaryFile().name os.mkfifo( tmpfile ) process_id = os.fork() # parent stays here, child is the sender if process_id: # reader fifo_reader = open( tmpfile, "r" ) data = fifo_reader.read() fifo_reader.close() else: # writer demog.send( tmpfile ) 3) Send over file. import tempfile tmpfile = tempfile.NamedTemporaryFile().name # We create the file handle and we pass it to the other module which writes to it. with open( tmpfile, "w" ) as ipc: demog.send( ipc ) # Assuming the above worked, we read the file from disk. with open( tmpfile, "r" ) as ipc: read_data = ipc.read() os.remove( tmpfile ) Returns: N/A """ if type(write_to_this) is int: # Case 1: gonna say this is a pipe data_as_bytes = json.dumps( self.to_dict() ).encode('utf-8') # Sending demographics to pipe try: os.write(write_to_this, data_as_bytes) except Exception as ex: raise ValueError( str(ex) + "\n\nException encountered while trying to write demographics json to inferred pipe handle." ) elif type(write_to_this) is str: # Case 2: we've been passed a filepath ot use to open a named pipe #print( "Serializing demographics object to json string." ) data_as_str = json.dumps( self.to_dict() ) # Sending demographics to named pipe try: fifo_writer = open( write_to_this, "w" ) fifo_writer.write( data_as_str ) fifo_writer.close() if return_from_forked_sender: return else: sys.exit() except Exception as ex: raise ValueError( str(ex) + f"\n\nException encountered while trying to write demographics json to pipe based on name {write_to_this}." ) else: # Case 3: with( open( some_path ) ) as write_to_this try: json.dump( self.to_dict(), write_to_this ) except Exception as ex: raise ValueError( str(ex) + f"\n\nException encountered while trying to write demographics json to inferred file based on {write_to_this}." )
@property def node_ids(self): """ Return the list of (geographic) node ids. """ return [node.to_dict()["NodeID"] for node in self._nodes] @property def nodes(self): return self._nodes @nodes.setter def nodes(self, values): self._nodes = values @property def node_count(self): """ Return the number of (geographic) nodes. """ return len(self._nodes)
[docs] def get_node(self, nodeid): """ Return the node idendified by nodeid. Search either name or actual id :param nodeid: :return: """ for node in self._nodes: if node.id == nodeid or node.name == nodeid: return node raise ValueError( "No nodes available with the id: %s. Available nodes (%s)" % (nodeid, ", ".join([str(node.name) for node in self._nodes])) )
[docs] def SetMigrationPattern(self, pattern: str = "rwd"): """ Set migration pattern. Migration is enabled implicitly. It's unusual for the user to need to set this directly; normally used by emodpy. Args: pattern: Possible values are "rwd" for Random Walk Diffusion and "srt" for Single Round Trips. """ if self.implicits is not None: if pattern.lower() == "srt": self.implicits.append(DT._set_migration_pattern_srt) elif pattern.lower() == "rwd": self.implicits.append(DT._set_migration_pattern_rwd) else: raise ValueError('Unknown migration pattern: %s. Possible values are "rwd" and "srt".', pattern)
def _SetRegionalMigrationFileName(self, file_name): """ Set path to migration file. Args: file_name: Path to migration file. """ if self.implicits is not None: self.implicits.append(partial(DT._set_regional_migration_filenames, file_name=file_name)) def _SetLocalMigrationFileName(self, file_name): """ Set path to migration file. Args: file_name: Path to migration file. """ if self.implicits is not None: self.implicits.append(partial(DT._set_local_migration_filename, file_name=file_name)) def _SetDemographicFileNames(self, file_names): """ Set paths to demographic file. Args: file_names: Paths to demographic files. """ if self.implicits is not None: self.implicits.append(partial(DT._set_demographic_filenames, file_names=file_names))
[docs] def SetRoundTripMigration(self, gravity_factor, probability_of_return=1.0, id_ref='short term commuting migration'): """ Set commuter/seasonal/temporary/round-trip migration rates. You can use the x_Local_Migration configuration parameter to tune/calibrate. Args: gravity_factor: 'Big G' in gravity equation. Combines with 1, 1, and -2 as the other exponents. probability_of_return: Likelihood that an individual who 'commuter migrates' will return to the node of origin during the next migration (not timestep). Defaults to 1.0. Aka, travel, shed, return." id_ref: Text string that appears in the migration file itself; needs to match corresponding demographics file. """ if gravity_factor<0: raise ValueError( f"gravity factor can't be negeative." ) gravity_params = [gravity_factor, 1.0, 1.0, -2.0] if probability_of_return<0 or probability_of_return>1.0: raise ValueError( f"probability_of_return parameter passed by not a probability: {probability_of_return}" ) mig = migration._from_demog_and_param_gravity(self, gravity_params=gravity_params, id_ref=id_ref, migration_type=migration.Migration.LOCAL) #migration_file_path = "commuter_migration.bin" migration_file_path = tempfile.NamedTemporaryFile().name + ".bin" mig.to_file(migration_file_path) self.migration_files.append( migration_file_path ) if self.implicits is not None: self.implicits.append(partial(DT._set_local_migration_roundtrip_probability, probability_of_return=probability_of_return)) self.implicits.append(partial(DT._set_local_migration_filename, file_name=pathlib.PurePath(migration_file_path).name)) self.SetMigrationPattern( "srt" )
[docs] def SetOneWayMigration(self, rates_path, id_ref='long term migration'): """ Set one way migration. You can use the x_Regional_Migration configuration parameter to tune/calibrate. Args: rates_path: Path to csv file with node-to-node migration rates. Format is: source (node id),destination (node id),rate. id_ref: Text string that appears in the migration file itself; needs to match corresponding demographics file. """ import pathlib mig = migration.from_csv( pathlib.Path( rates_path ), id_ref=id_ref, mig_type=migration.Migration.REGIONAL ) migration_file_path = tempfile.NamedTemporaryFile().name + ".bin" mig.to_file(migration_file_path) self.migration_files.append( migration_file_path ) if self.implicits is not None: self.implicits.append(partial(DT._set_regional_migration_roundtrip_probability, probability_of_return=0.0)) self.implicits.append(partial(DT._set_regional_migration_filenames, file_name=pathlib.PurePath(migration_file_path).name)) self.SetMigrationPattern( "srt" )
[docs] def SetSimpleVitalDynamics(self, crude_birth_rate=CrudeRate(40), crude_death_rate=CrudeRate(20), node_ids=None): """ Set fertility, mortality, and initial age with single birth rate and single mortality rate. Args: crude_birth_rate: Birth rate, per year per kiloperson. crude_death_rate: Mortality rate, per year per kiloperson. node_ids: Optional list of nodes to limit these settings to. """ self.SetBirthRate(crude_birth_rate, node_ids) self.SetMortalityRate(crude_death_rate, node_ids) self.SetEquilibriumAgeDistFromBirthAndMortRates(crude_birth_rate, crude_death_rate, node_ids)
[docs] def SetEquilibriumVitalDynamics(self, crude_birth_rate=CrudeRate(40), node_ids=None): """ Set fertility, mortality, and initial age with single rate and mortality to achieve steady state population. Args: crude_birth_rate: Birth rate. And mortality rate. node_ids: Optional list of nodes to limit these settings to. """ self.SetSimpleVitalDynamics(crude_birth_rate, crude_birth_rate, node_ids)
[docs] def SetEquilibriumVitalDynamicsFromWorldBank(self, wb_births_df, country, year, node_ids=None ): """ Set steady-state fertility, mortality, and initial age with rates from world bank, for given country and year. Args: wb_births_df: Pandas dataframe with World Bank birth rate by country and year. country: Country to pick from World Bank dataset. year: Year to pick from World Bank dataset. node_ids: Optional list of nodes to limit these settings to. """ try: birth_rate = CrudeRate(wb_births_df[wb_births_df['Country Name'] == country][str(year)].tolist()[0]) #result_scale_factor = 2.74e-06 # assuming world bank units for input #birth_rate *= result_scale_factor # from births per 1000 pop per year to per person per day except Exception as ex: raise ValueError( f"Exception trying to find {year} and {country} in dataframe.\n{ex}" ) self.SetEquilibriumVitalDynamics(birth_rate, node_ids)
[docs] def SetIndividualAttributesWithFertMort(self, crude_birth_rate=CrudeRate(40), crude_mort_rate=CrudeRate(20)): self.raw['Defaults']['IndividualAttributes'] = {} DT.NoInitialPrevalence( self ) DT.EveryoneInitiallySusceptible( self ) if type(crude_birth_rate) is float or type(crude_birth_rate) is int: crude_birth_rate=CrudeRate(crude_birth_rate) if type(crude_mort_rate) is float or type(crude_mort_rate) is int: crude_mort_rate=CrudeRate(crude_mort_rate) self.SetSimpleVitalDynamics(crude_birth_rate, crude_mort_rate)
[docs] def AddIndividualPropertyAndHINT(self, Property: str, Values: List[str], InitialDistribution:List[float] = None, TransmissionMatrix:List[List[float]] = None, Transitions: List=None): """ Add Individual Properties, including an optional HINT configuration matrix. Args: Property: property (if property already exists an exception is raised). Values: property values. InitialDistribution: initial distribution. TransmissionMatrix: transmission matrix. Returns: N/A/ """ if 'IndividualProperties' not in self.raw['Defaults']: self.raw['Defaults']['IndividualProperties'] = [] if any([Property == x['Property'] for x in self.raw['Defaults']['IndividualProperties']]): raise ValueError("Property Type '{0}' already present in IndividualProperties list".format(Property)) else: # Check if Property is in whitelist. If not, auto-set Disable_IP_Whitelist iplist = ["Age_Bin", "Accessibility", "Geographic", "Place", "Risk", "QualityOfCare", "HasActiveTB", "InterventionStatus"] if Property not in iplist: # print("Need to set Disable_IP_Whitelist in config.") def update_config( config ): config.parameters["Disable_IP_Whitelist"] = 1 return config if self.implicits is not None: self.implicits.append( update_config ) transmission_matrix = None if TransmissionMatrix is not None: transmission_matrix = {"Route": "Contact", "Matrix": TransmissionMatrix} individual_property = IndividualProperty(property=Property, values=Values, initial_distribution=InitialDistribution, transitions=Transitions, transmission_matrix=transmission_matrix) self.raw['Defaults']['IndividualProperties'].append(individual_property.to_dict()) if TransmissionMatrix is not None: def update_config(config): config.parameters.Enable_Heterogeneous_Intranode_Transmission = 1 return config if self.implicits is not None: self.implicits.append(update_config)
[docs] def AddAgeDependentTransmission( self, Age_Bin_Edges_In_Years = [0, 1, 2, -1], TransmissionMatrix = [[1.0, 1.0, 1.0], [1.0, 1.0, 1.0], [1.0, 1.0, 1.0]]): """ Set up age-based HINT. Since ages are a first class property of an agent, Age_Bin is a special case of HINT. We don't specify a distribution, but we do specify the age bin edges, in units of years. So if Age_Bin_Edges_In_Years = [ 0, 10, 65, -1 ] it means you'll have 3 age buckets: 0-10, 10-65, & 65+. Always 'book-end' with 0 and -1. Args: Age_Bin_Edges_In_Years: array (or list) of floating point values, representing the age bucket bounderies. TransmissionMatrix: 2-D array of floating point values, representing epi connectedness of the age buckets. """ if Age_Bin_Edges_In_Years[0] != 0: raise ValueError( "First value of 'Age_Bin_Edges_In_Years' must be 0." ) if Age_Bin_Edges_In_Years[-1] != -1: raise ValueError( "Last value of 'Age_Bin_Edges_In_Years' must be -1." ) num_age_buckets = len(Age_Bin_Edges_In_Years)-1 if len(TransmissionMatrix) != num_age_buckets: raise ValueError( f"Number of rows of TransmissionMatrix ({len(TransmissionMatrix)}) must match number of age buckets ({num_age_buckets})." ) for idx in range(len(TransmissionMatrix)): num_cols = len(TransmissionMatrix[idx]) if num_cols != num_age_buckets: raise ValueError( f"Number of columns of TransmissionMatrix ({len(TransmissionMatrix[idx])}) must match number of age buckets ({num_age_buckets})." ) self.AddIndividualPropertyAndHINT("Age_Bin", Age_Bin_Edges_In_Years, None, TransmissionMatrix) def update_config(config): config.parameters.Enable_Heterogeneous_Intranode_Transmission = 1 return config if self.implicits is not None: self.implicits.append(update_config)
[docs] def SetDefaultIndividualAttributes(self): """ NOTE: This is very Measles-ish. We might want to move into MeaslesDemographics """ self.raw['Defaults']['IndividualAttributes'] = {} DT.NoInitialPrevalence(self) #Age distribution from UNWPP DT.AgeStructureUNWPP(self) #Mortality rates carried over from Nigeria DHS DT.MortalityStructureNigeriaDHS(self) DT.DefaultSusceptibilityDistribution(self)
[docs] def SetMinimalNodeAttributes(self): self.SetDefaultNodeAttributes(birth=False)
# WB is births per 1000 pop per year # DTK is births per person per day.
[docs] def SetBirthRate(self, birth_rate, node_ids=None): """ Set Default birth rate to birth_rate. Turn on Vital Dynamics and Births implicitly. """ if type(birth_rate) is float or type(birth_rate) is int: birth_rate = CrudeRate(birth_rate) dtk_birthrate = birth_rate.get_dtk_rate() if node_ids is None: self.raw['Defaults']['NodeAttributes'].update({ "BirthRate": dtk_birthrate }) else: for node_id in node_ids: self.get_node(node_id).birth_rate = dtk_birthrate self.implicits.append(DT._set_enable_births)
[docs] def SetMortalityRate(self, mortality_rate: CrudeRate, node_ids: List[int] = None): """ Set constant mortality rate to mort_rate. Turn on Enable_Natural_Mortality implicitly. """ #yearly_mortality_rate = YearlyRate(mortality_rate) if type(mortality_rate) is float or type(mortality_rate) is int: mortality_rate = CrudeRate(mortality_rate) mortality_rate = mortality_rate.get_dtk_rate() if node_ids is None: #setting = {"MortalityDistribution": DT._ConstantMortality(yearly_mortality_rate).to_dict()} setting = {"MortalityDistribution": DT._ConstantMortality(mortality_rate).to_dict()} self.SetDefaultFromTemplate(setting) else: for node_id in node_ids: #distribution = DT._ConstantMortality(yearly_mortality_rate) distribution = DT._ConstantMortality(mortality_rate) self.get_node(node_id)._set_mortality_distribution(distribution) if self.implicits is not None: self.implicits.append(DT._set_mortality_age_gender)
[docs] def SetMortalityDistribution(self, distribution: IndividualAttributes.MortalityDistribution = None, node_ids: List[int] = None): """ Set a default mortality distribution for all nodes or per node. Turn on Enable_Natural_Mortality implicitly. Args: distribution: distribution node_ids: a list of node_ids Returns: None """ if node_ids is None: self.raw["Defaults"]["IndividualAttributes"]["MortalityDistribution"] = distribution.to_dict() else: for node_id in node_ids: self.get_node(node_id)._set_mortality_distribution(distribution) if self.implicits is not None: self.implicits.append(DT._set_mortality_age_gender)
[docs] def SetMortalityOverTimeFromData(self, data_csv, base_year, node_ids=[]): """ Set default mortality rates for all nodes or per node. Turn on mortality configs implicitly. You can use the x_Other_Mortality configuration parameter to tune/calibrate. Args: data_csv: Path to csv file with the mortality rates by calendar year and age bucket. base_year: The calendar year the sim is treating as the base. node_ids: Optional list of node ids to apply this to. Defaults to all. Returns: None """ if base_year<0: raise ValueError( f"User passed negative value of base_year: {base_year}." ) if base_year>2050: raise ValueError( f"User passed too large value of base_year: {base_year}." ) # Load csv. Convert rate arrays into DTK-compatiable JSON structures. rates = [] # array of arrays, but leave that for a minute df = pd.read_csv( data_csv ) header = df.columns year_start = int(header[1]) # someone's going to come along with 1990.5, etc. Sigh. year_end = int(header[-1]) if year_end <= year_start: raise ValueError( f"Failed check that {year_end} is greater than {year_start} in csv dataset." ) num_years = year_end-year_start+1 rel_years = list() for year in range(year_start,year_start+num_years): mort_data = list( df[str(year)] ) rel_years.append( year-base_year ) age_key = None for trykey in df.keys(): if trykey.lower().startswith( "age" ): age_key = trykey raw_age_bins = list(df[age_key]) if age_key is None: raise ValueError( f"Failed to find 'Age_Bin' (or similar) column in the csv dataset. Cannot process." ) num_age_bins = len(raw_age_bins) age_bins = list() try: for age_bin in raw_age_bins: left_age = float(age_bin.split("-")[0]) age_bins.append( left_age ) except Exception as ex: raise ValueError( f"Ran into error processing the values in the Age-Bin column. {ex}" ) for idx in range(len(age_bins)): # 18 of these # mort_data is the array of mortality rates (by year bin) for age_bin mort_data = list( df.transpose()[idx][1:] ) rates.append( mort_data ) # 28 of these, 1 for each year, eg num_pop_groups = [ num_age_bins, num_years ] pop_groups = [ age_bins, rel_years ] distrib = IndividualAttributes.MortalityDistribution( result_values = rates, axis_names = ["age","year"], axis_scale_factors = [365,1], axis_units="N/A", num_distribution_axes=len(num_pop_groups), num_population_groups=num_pop_groups, population_groups=pop_groups, result_scale_factor=2.74e-06, result_units="annual deaths per 1000 individuals" ) if not node_ids: self.raw["Defaults"]["IndividualAttributes"]["MortalityDistributionMale"] = distrib.to_dict() self.raw["Defaults"]["IndividualAttributes"]["MortalityDistributionFemale"] = distrib.to_dict() else: if len(self.nodes) == 1 and len(node_ids)>1: raise ValueError( f"User specified several node ids for single node demographics setup." ) for node_id in node_ids: self.get_node(node_id)._set_mortality_distribution_male(distrib) self.get_node(node_id)._set_mortality_distribution_female(distrib) if self.implicits is not None: self.implicits.append(DT._set_mortality_age_gender_year)
[docs] def SetAgeDistribution(self, distribution: IndividualAttributes.AgeDistribution, node_ids: List[int] = None): """ Set a default age distribution for all nodes or per node. Sets distribution type to COMPLEX implicitly. Args: distribution: age distribution node_ids: a list of node_ids Returns: None """ if node_ids is None: self.raw["Defaults"]["IndividualAttributes"]["AgeDistribution"] = distribution.to_dict() else: for node_id in node_ids: self.get_node(node_id)._set_age_distribution(distribution) if self.implicits is not None: self.implicits.append(DT._set_age_complex)
[docs] def SetDefaultNodeAttributes(self, birth=True): """ Set the default NodeAttributes (Altitude, Airport, Region, Seaport), optionally including birth, which is most important actually. """ self.raw['Defaults']['NodeAttributes'] = { "Altitude": 0, "Airport": 1, # why are these still needed? "Region": 1, "Seaport": 1 } if birth: self.SetBirthRate( YearlyRate(math.log(1.03567)) )
[docs] def SetDefaultIndividualProperties(self): """ Initialize Individual Properties to empty. """ self.raw['Defaults']['IndividualProperties'] = []
[docs] def SetDefaultProperties(self): """ Set a bunch of defaults (age structure, initial susceptibility and initial prevalencec) to sensible values. """ self.SetDefaultNodeAttributes() self.SetDefaultIndividualAttributes() #Distributions for initialization of immunity, risk heterogeneity, etc. self.SetDefaultIndividualProperties() #Individual properties like accessibility, for targeting interventions
[docs] def SetDefaultPropertiesFertMort(self, crude_birth_rate = CrudeRate(40), crude_mort_rate = CrudeRate(20)): """ Set a bunch of defaults (birth rates, death rates, age structure, initial susceptibility and initial prevalencec) to sensible values. """ self.SetDefaultNodeAttributes() self.SetDefaultIndividualAttributes() #Distributions for initialization of immunity, risk heterogeneity, etc. self.SetBirthRate(crude_birth_rate) self.SetMortalityRate(crude_mort_rate)
#self.SetDefaultIndividualProperties() #Individual properties like accessibility, for targeting interventions
[docs] def SetDefaultFromTemplate(self, template, setter_fn=None): """ Add to the default IndividualAttributes using the input template (raw json) and set corresponding config values per the setter_fn. The template should always be constructed by a function in DemographicsTemplates. Eventually this function will be hidden and only accessed via separate application-specific API functions such as the ones below. """ # TBD: Add some error checking. Make sure IndividualAttributes are Individual, not individual. self.raw['Defaults']['IndividualAttributes'].update(template) if self.implicits is not None and setter_fn is not None: self.implicits.append(setter_fn)
[docs] def SetNodeDefaultFromTemplate(self, template, setter_fn): """ Add to the default NodeAttributes using the input template (raw json) and set corresponding config values per the setter_fn. The template should always be constructed by a function in DemographicsTemplates. Eventually this function will be hidden and only accessed via separate application-specific API functions such as the ones below. """ # TBD: Add some error checking. Make sure NodeAttributes are Node, not individual. self.raw['Defaults']['NodeAttributes'].update(template) if self.implicits is not None: self.implicits.append(setter_fn)
[docs] def SetEquilibriumAgeDistFromBirthAndMortRates( self, CrudeBirthRate=CrudeRate(40), CrudeMortRate=CrudeRate(20), node_ids=None ): """ Set the inital ages of the population to a sensible equilibrium profile based on the specified input birth and death rates. Note this does not set the fertility and mortality rates. """ yearly_birth_rate = YearlyRate(CrudeBirthRate) yearly_mortality_rate = YearlyRate(CrudeMortRate) dist = DT._EquilibriumAgeDistFromBirthAndMortRates(yearly_birth_rate,yearly_mortality_rate) setter_fn = DT._set_age_complex if node_ids is None: self.SetDefaultFromTemplate( dist, setter_fn ) else: new_dist = IndividualAttributes.AgeDistribution() dist = new_dist.from_dict( dist["AgeDistribution"] ) for node in node_ids: self.get_node(node)._set_age_distribution( dist ) self.implicits.append(setter_fn)
[docs] def SetInitialAgeExponential(self, rate=0.0001068, description=""): """ Set the initial age of the population to an exponential distribution with a specified rate. :param rate: rate :param description: description, why was this distribution chosen """ if not description: description = "Initial ages set to draw from exponential distribution with {rate}" setting = {"AgeDistributionFlag": 3, "AgeDistribution1": rate, "AgeDistribution2": 0, "AgeDistribution_Description": description} self.SetDefaultFromTemplate( setting, DT._set_age_simple )
[docs] def SetInitialAgeLikeSubSaharanAfrica( self, description="" ): """ Set the initial age of the population to a overly simplified structure that sort of looks like sub-Saharan Africa. This uses the SetInitialAgeExponential. :param description: description, why was this age chosen? """ if not description: description = f"Setting initial age distribution like Sub Saharan Africa, drawing from exponential distribution." self.SetInitialAgeExponential(description=description) # use default rate
[docs] def SetOverdispersion( self, new_overdispersion_value, nodes=[] ): """ Set the overdispersion value for the specified nodes (all if empty). """ def enable_overdispersion( config ): print( "DEBUG: Setting 'Enable_Infection_Rate_Overdispersion' to 1." ) config.parameters.Enable_Infection_Rate_Overdispersion = 1 return config if self.implicits is not None: self.implicits.append( enable_overdispersion ) self.raw['Defaults']['NodeAttributes']["InfectivityOverdispersion"] = new_overdispersion_value
[docs] def SetConstantSusceptibility( self ): """ Set the initial susceptibilty for each new individual to a constant value of 1.0. """ DT.InitSusceptConstant(self)
[docs] def SetInitPrevFromUniformDraw( self, min_init_prev, max_init_prev, description="" ): """ Set Initial Prevalence (one value per node) drawn from an uniform distribution. :param min_init_prev: minimal initial prevalence :param max_init_prevalence: maximal initial prevalence :param description: description, why were these parameters chosen? """ if not description: description = f"Drawing prevalence from uniform distribution, min={min_init_prev} and max={max_init_prev}" DT.InitPrevUniform( self, min_init_prev, max_init_prev, description )
[docs] def SetConstantRisk( self, risk=1, description="" ): """ Set the initial risk for each new individual to the same value, defaults to full risk :param risk: risk :param description: description, why was this parameter chosen? """ if not description: description = f"Risk is set to constant, risk={risk}" if risk == 1: DT.FullRisk( self, description ) else: # Could add a DT.ConstantRisk but I like using less code. DT.InitRiskUniform( self, risk, risk, description )
[docs] def SetHeteroRiskUniformDist( self, min_risk=0, max_risk=1 ): """ Set the initial risk for each new individual to a value drawn from a uniform distribution. """ DT.InitRiskUniform( self, min_lim=min_risk, max_lim=max_risk)
[docs] def SetHeteroRiskLognormalDist( self, mean=1.0, sigma=0 ): """ Set the initial risk for each new individual to a value drawn from a log-normal distribution. """ DT.InitRiskLogNormal( self, mean=mean, sigma=sigma)
[docs] def SetHeteroRiskExponDist( self, mean=1.0 ): """ Set the initial risk for each new individual to a value drawn from an exponential distribution. """ DT.InitRiskExponential( self, mean=mean )
def _SetInfectivityMultiplierByNode( self, node_id_to_multplier ): raise ValueError( "Not Yet Implemented." )
[docs] def SetFertilityOverTimeFromParams( self, years_region1, years_region2, start_rate, inflection_rate, end_rate, node_ids=[] ): """ Set fertility rates that vary over time based on a model with two linear regions. Note that fertility rates use GFR units: babies born per 1000 women of child-bearing age annually. You can use the x_Birth configuration parameter to tune/calibrate. Refer to the following diagram. .. figure:: images/fertility_over_time_doc.png Args: years_region1: The number of years covered by the first linear region. So if this represents 1850 to 1960, years_region1 would be 110. years_region2: The number of years covered by the second linear region. So if this represents 1960 to 2020, years_region2 would be 60. start_rate: The fertility rate at t=0. inflection_rate: The fertility rate in the year where the two linear regions meet. end_rate: The fertility rate at the end of the period covered by region1 + region2. node_ids: Optional list of node ids to apply this to. Defaults to all. Returns: rates array (Just in case user wants to do something with them like inspect or plot.) """ rates = [] if years_region1<0: raise ValueError( "years_region1 can't be negative." ) if years_region2<0: raise ValueError( "years_region2 can't be negative." ) if start_rate<0: raise ValueError( "start_rate can't be negative." ) if inflection_rate<0: raise ValueError( "inflection_rate can't be negative." ) if end_rate<0: raise ValueError( "end_rate can't be negative." ) for i in range(years_region1): rate = start_rate + (inflection_rate-start_rate)*(i/years_region1) rates.append(rate) for i in range(years_region2): rate = inflection_rate + (end_rate-inflection_rate)*(i/years_region2) rates.append(rate) # OK, now we put this into the nasty complex fertility structure dist = DT.get_fert_dist_from_rates( rates ) if not node_ids: dist_dict = dist.to_dict() if "FertilityDistribution" not in dist_dict: full_dict = { "FertilityDistribution": dist.to_dict() } else: full_dict = dist_dict self.SetDefaultFromTemplate( full_dict, DT._set_fertility_age_year ) else: if len(self.nodes) == 1 and len(node_ids)>1: raise ValueError( f"User specified several node ids for single node demographics setup." ) for node_id in node_ids: self.get_node(node_id)._set_fertility_distribution( dist ) if self.implicits is not None: self.implicits.append(DT._set_fertility_age_year ) return rates
[docs] def infer_natural_mortality( self, file_male, file_female, interval_fit=[1970, 1980], which_point='mid', predict_horizon=2050, csv_out=False, n=0, # I don't know what this means results_scale_factor=1.0/365.0): """ Calculate and set the expected natural mortality by age, sex, and year from data, predicting what it would have been without disease (usually HIV). """ from collections import OrderedDict from sklearn.linear_model import LinearRegression from functools import reduce name_conversion_dict = {'Age (x)': 'Age', 'Central death rate m(x,n)': 'Mortality_mid', 'Age interval (n)': 'Interval', 'Period': 'Years' } sex_dict = {'Male': 0, 'Female': 1} def construct_interval(x, y): return (x, x + y) def midpoint(x, y): return (x + y) / 2.0 def generate_dict_order(tuple_list, which_entry=1): my_unordered_list = tuple_list.apply(lambda x: x[which_entry]) dict_to_order = OrderedDict(zip(tuple_list, my_unordered_list)) return dict_to_order def map_year(x_tuple, flag='mid'): valid_entries_loc = ['mid', 'end', 'start'] if flag not in valid_entries_loc: raise ValueError('invalid endpoint specified') if flag == 'mid': return (x_tuple[0] + x_tuple[1]) / 2.0 elif flag == 'start': return x_tuple[0] else: return x_tuple[1] df_mort_male = pd.read_csv(file_male, usecols=name_conversion_dict) df_mort_male['Sex'] = 'Male' df_mort_female = pd.read_csv(file_female, usecols=name_conversion_dict) df_mort_female['Sex'] = 'Female' df_mort = pd.concat([df_mort_male, df_mort_female], axis=0) df_mort.rename(columns=name_conversion_dict, inplace=True) df_mort['Years'] = df_mort['Years'].apply(lambda xx: tuple( [float(zz) for zz in xx.split('-')])) # this might be a bit too format specific (ie dashes in input) # log transform the data and drop unneeded columns df_mort['log_Mortality_mid'] = df_mort['Mortality_mid'].apply(lambda x: np.log(x)) df_mort['Age'] = df_mort[['Age', 'Interval']].apply(lambda zz: construct_interval(*zz), axis=1) year_order_dict = generate_dict_order(df_mort['Years']) age_order_dict = generate_dict_order(df_mort['Age']) df_mort['sortby2'] = df_mort['Age'].map(age_order_dict) df_mort['sortby1'] = df_mort['Sex'].map(sex_dict) df_mort['sortby3'] = df_mort['Years'].map(year_order_dict) df_mort.sort_values(['sortby1', 'sortby2', 'sortby3'], inplace=True) df_mort.drop(columns=['Mortality_mid', 'Interval', 'sortby1', 'sortby2', 'sortby3'], inplace=True) # convert to years (and to string for age_list due to really annoying practical slicing reasons df_mort['Years'] = df_mort['Years'].apply(lambda x: map_year(x, which_point)) df_mort['Age'] = df_mort['Age'].apply(lambda x: str(x)) df_before_time = df_mort[df_mort['Years'].between(0, interval_fit[0])].copy() df_mort.set_index(['Sex', 'Age'], inplace=True) sex_list = list(set(df_mort.index.get_level_values('Sex'))) age_list = list(set(df_mort.index.get_level_values('Age'))) df_list = [] df_list_future = [] for sex in sex_list: for age in age_list: tmp_data = df_mort.loc[(sex, age, slice(None)), :] extrap_model = make_pipeline(StandardScaler(with_mean=False), LinearRegression()) first_extrap_df = tmp_data[tmp_data['Years'].between(interval_fit[0], interval_fit[1])] XX = tmp_data[tmp_data['Years'].between(interval_fit[0], predict_horizon)].values[:, 0] values = first_extrap_df.values extrap_model.fit(values[:, 0].reshape(-1, 1), values[:, 1]) extrap_predictions = extrap_model.predict(XX.reshape(-1, 1)) loc_df = pd.DataFrame.from_dict({'Sex': sex, 'Age': age, 'Years': XX, 'Extrap': extrap_predictions}) loc_df.set_index(['Sex', 'Age', 'Years'], inplace=True) df_list.append(loc_df.copy()) df_e1 = pd.concat(df_list, axis=0) df_list_final = [df_mort, df_e1] df_total = reduce(lambda left, right: pd.merge(left, right, on=['Sex', 'Age', 'Years']), df_list_final) df_total = df_total.reset_index(inplace=False).set_index(['Sex', 'Age'], inplace=False) df_total['Extrap'] = df_total['Extrap'].apply(np.exp) df_total['Data'] = df_total['log_Mortality_mid'].apply(np.exp) df_before_time['Data'] = df_before_time['log_Mortality_mid'].apply(np.exp) df_before_time.set_index(['Sex', 'Age'], inplace=True) df_total = pd.concat([df_total, df_before_time], axis=0, join='outer', sort=True) df_total.reset_index(inplace=True) df_total['sortby2'] = df_total['Age'].map(age_order_dict) df_total['sortby1'] = df_total['Sex'].map(sex_dict) df_total.sort_values(by=['sortby1', 'sortby2', 'Years'], inplace=True) df_total.drop(columns=['sortby1', 'sortby2'], inplace=True) estimates_list = [] estimates_list.append(df_total.copy()) #estimates_list = [df_total.copy()] alternative def min_not_nan(x_list): loc_in = list(filter(lambda x: not np.isnan(x), x_list)) return np.min(loc_in) # This was in another function before df = estimates_list[n] df['FE'] = df[['Data', 'Extrap']].apply(min_not_nan, axis=1) df['Age'] = df['Age'].apply(lambda x: int(x.split(',')[1].split(')')[0])) male_df = df[df['Sex'] == 'Male'] female_df = df[df['Sex'] == 'Female'] male_df.set_index(['Sex','Age', 'Years'], inplace=True) female_df.set_index(['Sex','Age','Years'], inplace=True) male_data = male_df['FE'] female_data = female_df['FE'] male_data = male_data.unstack(-1) male_data.sort_index(level ='Age', inplace=True) female_data = female_data.unstack(-1) female_data.sort_index(level='Age', inplace=True) years_out_male = list(male_data.columns) years_out_female =list( female_data.columns) age_out_male = list(male_data.index.get_level_values('Age')) age_out_female = list(male_data.index.get_level_values('Age')) male_output= male_data.values female_output = female_data.values if csv_out: male_data.to_csv(f'Male{csv_out}') female_data.to_csv(f'Female{csv_out}') # TBD: This is the part that should use base file functionality dict_female = {'NumPopulationGroups': list(female_data.shape), 'AxisNames': ['age', 'year'], 'AxisScaleFactors': [365.0, 1], 'AxisUnits': ['years', 'years'], 'NumDistributionAxes': 2, 'PopulationGroups': [age_out_female, years_out_female], 'ResultScaleFactor': results_scale_factor, 'ResultUnits': 'annual deaths per capita', 'ResultValues': female_output.tolist() } dict_male = {'NumPopulationGroups': list(male_data.shape), 'AxisNames': ['age', 'year'], 'AxisScaleFactors': [365.0, 1], 'AxisUnits': ['years', 'years'], 'NumDistributionAxes': 2, 'PopulationGroups': [age_out_male, years_out_male], 'ResultScaleFactor': results_scale_factor, 'ResultUnits': 'annual deaths per capita', 'ResultValues': male_output.tolist() } male_mort = { "MortalityDistributionMale" : dict_male } female_mort = { "MortalityDistributionFemale" : dict_female } self.SetDefaultFromTemplate( male_mort, DT._set_mortality_age_gender_year ) self.SetDefaultFromTemplate( female_mort )
[docs]class DemographicsOverlay: def __init__(self, nodes=None, meta_data: dict = None, individual_attributes=None, node_attributes=None, mortality_distribution=None): self.nodes = nodes self.individual_attributes = individual_attributes self.node_attributes = node_attributes self.meta_data = meta_data self.mortality_distribution = mortality_distribution
[docs] def to_dict(self): assert self.nodes out = {"Defaults": {}} if self.individual_attributes: out["Defaults"]["IndividualAttributes"] = self.individual_attributes.to_dict() if self.node_attributes: out["Defaults"]["NodeAttributes"] = self.node_attributes.to_dict() if self.meta_data: out["Metadata"] = self.meta_data # there is no metadata class nodes_list = [] for n in self.nodes: nodes_list.append({"NodeID": n}) out["Nodes"] = nodes_list return out
[docs] def to_file(self, file_name="demographics_overlay.json"): with open(file_name, "w") as demo_override_f: json.dump(self.to_dict(), demo_override_f, indent=4)