Source code for emod_api.demographics.demographics_base

import json
import math
import os
import pathlib
import sys
import tempfile
from collections import Counter
from functools import partial
from typing import List, Iterable, Any, Dict, Union

import numpy as np
import pandas as pd
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

from emod_api.demographics import DemographicsTemplates as DT
from emod_api.demographics.BaseInputFile import BaseInputFile
from emod_api.demographics.DemographicsTemplates import CrudeRate, DemographicsTemplatesConstants, YearlyRate
from emod_api.demographics.Node import Node
from emod_api.demographics.PropertiesAndAttributes import IndividualProperty
from emod_api.demographics.age_distribution_old import AgeDistributionOld as AgeDistribution
from emod_api.demographics.demographic_exceptions import InvalidNodeIdException
from emod_api.demographics.mortality_distribution_old import MortalityDistributionOld as MortalityDistribution
from emod_api.migration import migration


[docs]class DemographicsBase(BaseInputFile): """ Base class for :py:obj:`emod_api:emod_api.demographics.Demographics` and :py:obj:`emod_api:emod_api.demographics.DemographicsOverlay`. """ DEFAULT_NODE_NAME = 'default_node'
class UnknownNodeException(ValueError):
    """Raised when a requested node id or node name does not exist in the demographics object."""
class DuplicateNodeIdException(Exception):
    """Raised when two or more nodes of a demographics object share the same node id."""
class DuplicateNodeNameException(Exception):
    """Raised when two or more nodes of a demographics object share the same node name."""
def __init__(self, nodes: List[Node], idref: str, default_node: Node = None):
    """
    Store the nodes, set up the default node representation and verify id/name uniqueness.

    Args:
        nodes: the non-default Node objects; each must have an id > 0.
        idref: forwarded to the BaseInputFile constructor.
        default_node: optional Node object to use as the default node; its id must be 0 and its name is
            overwritten with DEFAULT_NODE_NAME. When None, the defaults are kept in the self.raw dict instead.

    Raises:
        InvalidNodeIdException: if a non-default node has an id <= 0 or the default node has an id other than 0.
    """
    super().__init__(idref=idref)
    # TODO: node ids should be required to be UNIQUE to prevent later failures when running EMOD. Any update to
    #  self.nodes should trigger a check/error if needed.
    self.nodes = nodes
    self.implicits = list()
    self.migration_files = list()

    # verify that the provided non-default nodes have ids > 0 (report the first offender)
    offending_node = next((candidate for candidate in self.nodes if candidate.id <= 0), None)
    if offending_node is not None:
        raise InvalidNodeIdException(f"Non-default nodes must have integer ids > 0 . Found id: {offending_node.id}")

    # Build the default node if not provided
    metadata = self.generate_headers()
    if default_node is not None:
        # HIV style
        self.default_node = default_node
        self.default_node.name = self.DEFAULT_NODE_NAME
        if self.default_node.id != 0:
            raise InvalidNodeIdException(f"Default nodes must have an id of 0. It is {self.default_node.id} .")
        self.metadata = metadata
        # TODO: remove the following setting of birth_rate on the default node once this EMOD binary issue is fixed
        #  https://github.com/InstituteforDiseaseModeling/DtkTrunk/issues/4009
        self.get_node_by_id(node_id=0).birth_rate = 0
    else:
        # use raw attribute, current malaria/other disease style
        # currently all non-HIV disease route
        self.default_node = None
        self.metadata = None
        # TODO: remove the BirthRate entry on the default node once this EMOD binary issue is fixed
        #  https://github.com/InstituteforDiseaseModeling/DtkTrunk/issues/4009
        self.raw = {
            "Defaults": {
                "NodeAttributes": {"BirthRate": 0},
                "IndividualAttributes": dict(),
                "NodeID": 0,
                "IndividualProperties": list(),
            },
            "Metadata": metadata,
        }

    # enforce unique node ids and names
    self.verify_demographics_integrity()


def _select_node_dicts(self, node_ids=None):
    """
    Return raw node dictionaries: the 'Defaults' dict when node_ids is None, otherwise every entry of
    self.raw["Nodes"] whose "NodeID" is contained in node_ids.
    """
    if node_ids is None:
        return [self.raw['Defaults']]
    return [entry for entry in self.raw["Nodes"] if entry["NodeID"] in node_ids]

# TODO: example of node-node update() call, make sure this still works after changing Updateable.update()
#  Or do we really need this?? (only used in tests or maybe emodpy-malaria; don't know for the latter)
def apply_overlay(self, overlay_nodes: list):
    """
    Update existing nodes with the overlay nodes that share their forced_id.

    Args:
        overlay_nodes: nodes to lay over the existing nodes; matched to self.nodes by forced_id. If several
            overlay nodes share a forced_id, the last one is used.

    Returns:
        None
    """
    # forced_id -> overlay node (later entries replace earlier ones with the same forced_id)
    overlay_by_forced_id = {overlay.forced_id: overlay for overlay in overlay_nodes}

    for existing in self.nodes:
        matching_overlay = overlay_by_forced_id.get(existing.forced_id)
        if matching_overlay:
            existing.update(matching_overlay)
def send(self, write_to_this, return_from_forked_sender=False):
    """
    Write the demographics (as json) to a destination specified by the caller. It must be a pipe file
    descriptor, a file path, or a writable file 'handle'.

    Args:
        write_to_this: File descriptor (int), file path (str), or file handle.
        return_from_forked_sender: Defaults to False. Only applies to the file path case. Set to True if the
            caller will handle exiting of the fork; otherwise sys.exit() is called after writing.

    Example::

        1) Send over named pipe client code
        # Named pipe solution 1, uses os.open, not open.
        import tempfile
        tmpfile = tempfile.NamedTemporaryFile().name
        os.mkfifo(tmpfile)

        fifo_reader = os.open(tmpfile, os.O_RDONLY | os.O_NONBLOCK)
        fifo_writer = os.open(tmpfile, os.O_WRONLY | os.O_NONBLOCK)
        demog.send(fifo_writer)
        os.close(fifo_writer)
        data = os.read(fifo_reader, int(1e6))

        2) Send over named pipe client code version 2 (forking)
        import tempfile
        tmpfile = tempfile.NamedTemporaryFile().name
        os.mkfifo(tmpfile)

        process_id = os.fork()
        # parent stays here, child is the sender
        if process_id:
            # reader
            fifo_reader = open(tmpfile, "r")
            data = fifo_reader.read()
            fifo_reader.close()
        else:
            # writer
            demog.send(tmpfile)

        3) Send over file.
        import tempfile
        tmpfile = tempfile.NamedTemporaryFile().name
        # We create the file handle and we pass it to the other module which writes to it.
        with open(tmpfile, "w") as ipc:
            demog.send(ipc)

        # Assuming the above worked, we read the file from disk.
        with open(tmpfile, "r") as ipc:
            read_data = ipc.read()

        os.remove(tmpfile)

    Returns:
        N/A

    Raises:
        ValueError: if writing to the destination fails; the original exception is attached as the cause.
    """
    # bool is deliberately not treated as a file descriptor, hence the exact type check for int
    if type(write_to_this) is int:
        # Case 1: gonna say this is a pipe
        data_as_bytes = json.dumps(self.to_dict()).encode('utf-8')
        try:
            os.write(write_to_this, data_as_bytes)
        except Exception as ex:
            raise ValueError(str(ex) + "\n\nException encountered while trying to write demographics json to "
                                       "inferred pipe handle.") from ex
    elif isinstance(write_to_this, str):
        # Case 2: we've been passed a filepath to use to open a named pipe
        data_as_str = json.dumps(self.to_dict())
        try:
            # the context manager closes the handle even if the write fails
            with open(write_to_this, "w") as fifo_writer:
                fifo_writer.write(data_as_str)
        except Exception as ex:
            raise ValueError(str(ex) + f"\n\nException encountered while trying to write demographics json to pipe "
                                       f"based on name {write_to_this}.") from ex
        if return_from_forked_sender:
            return
        # the sender is assumed to be a forked child that is done once the data is written
        sys.exit()
    else:
        # Case 3: with(open(some_path)) as write_to_this
        try:
            json.dump(self.to_dict(), write_to_this)
        except Exception as ex:
            raise ValueError(str(ex) + f"\n\nException encountered while trying to write demographics json to "
                                       f"inferred file based on {write_to_this}.") from ex
@property
def node_ids(self):
    """
    The ids of the (geographic) nodes in self.nodes, in order.
    """
    return [current.id for current in self.nodes]


@property
def node_count(self):
    """
    Deprecated: the number of (geographic) nodes. Emits a DeprecationWarning; use len(demog.nodes) instead.
    """
    import warnings
    warnings.warn(message="node_count is a deprecated property of Node objects, use len(demog.nodes) instead.",
                  category=DeprecationWarning,
                  stacklevel=2)
    return len(self.nodes)

# TODO: this is deprecated because it is (was) odd, searching by id THEN name.
#  Remove and replace with get_node_by_name() (by_id implemented already, below)
#  https://github.com/InstituteforDiseaseModeling/emod-api/issues/690
def get_node(self, nodeid: int) -> Node:
    """
    Deprecated alias of get_node_by_id(); emits a DeprecationWarning and delegates to it.

    Args:
        nodeid: an id to use in retrieving the requested Node object. None or 0 for 'the default node'.

    Returns:
        a Node object
    """
    import warnings
    deprecation_text = ("get_node() is a deprecated function of Node objects, use get_node_by_id() instead. "
                        "(e.g. demographics.get_node_by_id(node_id=4))")
    warnings.warn(message=deprecation_text, category=DeprecationWarning, stacklevel=2)
    return self.get_node_by_id(node_id=nodeid)
def verify_demographics_integrity(self):
    """
    Run all known validity checks on this demographics object: node id uniqueness first, then node name
    uniqueness. The individual checks raise if they detect a problem.
    """
    self._verify_node_id_uniqueness()
    self._verify_node_name_uniqueness()
@staticmethod
def _duplicates_check(items: Iterable[Any]) -> List[Any]:
    """
    Detect and return the duplicates in the provided iterable.

    Args:
        items: a collection of (hashable) items to search for duplicates

    Returns:
        a list of the items that occur more than once, each listed a single time, in first-seen order
    """
    return [item for item, count in Counter(items).items() if count > 1]


def _verify_node_id_uniqueness(self):
    """
    Raise DuplicateNodeIdException if any node id (default node included) is used more than once.
    """
    duplicate_ids = self._duplicates_check(items=[node.id for node in self._all_nodes])
    if duplicate_ids:
        duplicates_str = ", ".join(str(node_id) for node_id in duplicate_ids)
        raise self.DuplicateNodeIdException(f"Duplicate node ids detected: {duplicates_str}")


def _verify_node_name_uniqueness(self):
    """
    Raise DuplicateNodeNameException if any node name (default node included) is used more than once.
    """
    duplicate_names = self._duplicates_check(items=[node.name for node in self._all_nodes])
    if duplicate_names:
        duplicates_str = ", ".join(str(node_name) for node_name in duplicate_names)
        raise self.DuplicateNodeNameException(f"Duplicate node names detected: {duplicates_str}")


@property
def _all_nodes(self) -> List[Node]:
    """
    All Node objects: self.nodes followed by the default node object, if there is one.
    """
    # only HIV is using a default node object right now, malaria currently uses self.raw
    # None protection if users are using self.raw default node access
    default_node = [] if self.default_node is None else [self.default_node]
    return self.nodes + default_node


@property
def _all_node_names(self) -> List[str]:
    """The names of all nodes returned by _all_nodes, in the same order."""
    return [node.name for node in self._all_nodes]


@property
def _all_nodes_by_name(self) -> Dict[str, Node]:
    """Mapping of node name to Node for all nodes returned by _all_nodes."""
    return {node.name: node for node in self._all_nodes}


@property
def _all_node_ids(self) -> List[int]:
    """The ids of all nodes returned by _all_nodes, in the same order."""
    return [node.id for node in self._all_nodes]


@property
def _all_nodes_by_id(self) -> Dict[int, Node]:
    """Mapping of node id to Node for all nodes returned by _all_nodes."""
    return {node.id: node for node in self._all_nodes}
def get_node_by_id(self, node_id: int) -> Node:
    """
    Look up a single Node object by its node id (delegates to get_nodes_by_id()).

    Args:
        node_id: a node_id to use in retrieving the requested Node object. None or 0 for 'the default node'.

    Returns:
        a Node object
    """
    matches = self.get_nodes_by_id(node_ids=[node_id])
    found_nodes = list(matches.values())
    return found_nodes[0]
def get_nodes_by_id(self, node_ids: List[int]) -> Dict[int, Node]:
    """
    Returns the Node objects requested by their node id. The provided node_ids collection is not modified.

    Args:
        node_ids: node ids to use in retrieving Node objects. None (as the whole argument or as an entry)
            or 0 for 'the default node'.

    Returns:
        a dict with id: node entries

    Raises:
        UnknownNodeException: if any requested id does not exist in this demographics object.
    """
    # replace every None id (default node) request with 0, working on a copy so the caller's list is untouched
    if node_ids is None:
        requested_ids = [0]
    else:
        requested_ids = [0 if node_id is None else node_id for node_id in node_ids]

    # build the id -> node mapping once; the property recomputes it on every access
    nodes_by_id = self._all_nodes_by_id

    missing_node_ids = [node_id for node_id in requested_ids if node_id not in nodes_by_id]
    if missing_node_ids:
        msg = ', '.join(str(node_id) for node_id in missing_node_ids)
        raise self.UnknownNodeException(f"The following node id(s) were requested but do not exist in this demographics "
                                        f"object:\n{msg}")

    return {node_id: node for node_id, node in nodes_by_id.items() if node_id in requested_ids}
def get_node_by_name(self, node_name: str) -> Node:
    """
    Look up a single Node object by its node name (delegates to get_nodes_by_name()).

    Args:
        node_name: a node_name to use in retrieving the requested Node object. None for 'the default node'.

    Returns:
        a Node object
    """
    matches = self.get_nodes_by_name(node_names=[node_name])
    found_nodes = list(matches.values())
    return found_nodes[0]
def get_nodes_by_name(self, node_names: List[str]) -> Dict[str, Node]:
    """
    Returns the Node objects requested by their node name. The provided node_names collection is not modified.

    Args:
        node_names: node names to use in retrieving Node objects. None (as the whole argument or as an
            entry) for 'the default node'.

    Returns:
        a dict with name: node entries

    Raises:
        UnknownNodeException: if any requested name does not exist in this demographics object, or if the
            default node is requested via None but no default node object is set.
    """
    if node_names is None:
        node_names = [None]

    # replace every None name (default node) request with the default node's name, working on a copy so the
    # caller's list is untouched
    if any(node_name is None for node_name in node_names):
        if self.default_node is None:
            raise self.UnknownNodeException("The default node was requested by name (None) but this demographics "
                                            "object has no default node object.")
        default_name = self.default_node.name
        requested_names = [default_name if node_name is None else node_name for node_name in node_names]
    else:
        requested_names = list(node_names)

    # build the name -> node mapping once; the property recomputes it on every access
    nodes_by_name = self._all_nodes_by_name

    missing_node_names = [node_name for node_name in requested_names if node_name not in nodes_by_name]
    if missing_node_names:
        msg = ', '.join(str(node_name) for node_name in missing_node_names)
        raise self.UnknownNodeException(f"The following node name(s) were requested but do not exist in this demographics "
                                        f"object:\n{msg}")

    return {node_name: node for node_name, node in nodes_by_name.items() if node_name in requested_names}
def SetMigrationPattern(self, pattern: str = "rwd"):
    """
    Set migration pattern. Migration is enabled implicitly.
    It's unusual for the user to need to set this directly; normally used by emodpy.

    Args:
        pattern: Possible values are "rwd" for Random Walk Diffusion and "srt" for Single Round Trips
            (case-insensitive).

    Raises:
        ValueError: if pattern is not one of the possible values (only checked when self.implicits is set).
    """
    if self.implicits is not None:
        normalized_pattern = pattern.lower()
        if normalized_pattern == "srt":
            self.implicits.append(DT._set_migration_pattern_srt)
        elif normalized_pattern == "rwd":
            self.implicits.append(DT._set_migration_pattern_rwd)
        else:
            # the message is formatted here; exceptions do not apply %-style arguments themselves
            raise ValueError(f'Unknown migration pattern: {pattern}. Possible values are "rwd" and "srt".')
def _SetRegionalMigrationFileName(self, file_name):
    """
    Register an implicit config function that sets the regional migration file name.

    Args:
        file_name: Path to migration file.
    """
    if self.implicits is None:
        return
    regional_setter = partial(DT._set_regional_migration_filenames, file_name=file_name)
    self.implicits.append(regional_setter)


def _SetLocalMigrationFileName(self, file_name):
    """
    Register an implicit config function that sets the local migration file name.

    Args:
        file_name: Path to migration file.
    """
    if self.implicits is None:
        return
    local_setter = partial(DT._set_local_migration_filename, file_name=file_name)
    self.implicits.append(local_setter)


def _SetDemographicFileNames(self, file_names):
    """
    Register an implicit config function that sets the demographic file names.

    Args:
        file_names: Paths to demographic files.
    """
    if self.implicits is None:
        return
    demographics_setter = partial(DT._set_demographic_filenames, file_names=file_names)
    self.implicits.append(demographics_setter)
def SetRoundTripMigration(self, gravity_factor, probability_of_return=1.0, id_ref='short term commuting migration'):
    """
    Set commuter/seasonal/temporary/round-trip migration rates. You can use the x_Local_Migration configuration
    parameter to tune/calibrate.

    A gravity-model local migration file is written to a temporary ".bin" path, which is recorded in
    self.migration_files; the matching config settings are registered as implicit functions.

    Args:
        gravity_factor: 'Big G' in gravity equation. Combines with 1, 1, and -2 as the other exponents.
        probability_of_return: Likelihood that an individual who 'commuter migrates' will return to the node
            of origin during the next migration (not timestep). Defaults to 1.0. Aka, travel, shed, return.
        id_ref: Text string that appears in the migration file itself; needs to match corresponding
            demographics file.

    Raises:
        ValueError: if gravity_factor is negative or probability_of_return is outside [0, 1].
    """
    if gravity_factor < 0:
        raise ValueError(f"gravity factor can't be negative.")
    if probability_of_return < 0 or probability_of_return > 1.0:
        raise ValueError(f"probability_of_return parameter passed by not a probability: {probability_of_return}")

    gravity_params = [gravity_factor, 1.0, 1.0, -2.0]
    local_migration = migration._from_demog_and_param_gravity(self,
                                                              gravity_params=gravity_params,
                                                              id_ref=id_ref,
                                                              migration_type=migration.Migration.LOCAL)
    # only the generated temporary name is used; ".bin" is appended to it to form the output path
    output_path = tempfile.NamedTemporaryFile().name + ".bin"
    local_migration.to_file(output_path)
    self.migration_files.append(output_path)

    if self.implicits is not None:
        return_probability_setter = partial(DT._set_local_migration_roundtrip_probability,
                                            probability_of_return=probability_of_return)
        self.implicits.append(return_probability_setter)
        file_name_setter = partial(DT._set_local_migration_filename,
                                   file_name=pathlib.PurePath(output_path).name)
        self.implicits.append(file_name_setter)
    self.SetMigrationPattern("srt")
def SetOneWayMigration(self, rates_path, id_ref='long term migration'):
    """
    Set one way migration. You can use the x_Regional_Migration configuration parameter to tune/calibrate.

    The rates are read from the csv file and written as a regional migration file to a temporary ".bin"
    path, which is recorded in self.migration_files; the matching config settings are registered as implicit
    functions.

    Args:
        rates_path: Path to csv file with node-to-node migration rates. Format is: source (node id),
            destination (node id),rate.
        id_ref: Text string that appears in the migration file itself; needs to match corresponding
            demographics file.
    """
    regional_migration = migration.from_csv(pathlib.Path(rates_path),
                                            id_ref=id_ref,
                                            mig_type=migration.Migration.REGIONAL)
    # only the generated temporary name is used; ".bin" is appended to it to form the output path
    output_path = tempfile.NamedTemporaryFile().name + ".bin"
    regional_migration.to_file(output_path)
    self.migration_files.append(output_path)

    if self.implicits is not None:
        no_return_setter = partial(DT._set_regional_migration_roundtrip_probability, probability_of_return=0.0)
        self.implicits.append(no_return_setter)
        file_name_setter = partial(DT._set_regional_migration_filenames,
                                   file_name=pathlib.PurePath(output_path).name)
        self.implicits.append(file_name_setter)
    self.SetMigrationPattern("srt")
def SetSimpleVitalDynamics(self, crude_birth_rate=CrudeRate(40), crude_death_rate=CrudeRate(20), node_ids=None):
    """
    Set fertility, mortality, and initial age with single birth rate and single mortality rate.

    Delegates, in order, to SetBirthRate(), SetMortalityRate() and
    SetEquilibriumAgeDistFromBirthAndMortRates().

    Args:
        crude_birth_rate: Birth rate, per year per kiloperson.
        crude_death_rate: Mortality rate, per year per kiloperson.
        node_ids: Optional list of nodes to limit these settings to.
    """
    # fertility
    self.SetBirthRate(crude_birth_rate, node_ids)
    # mortality
    self.SetMortalityRate(crude_death_rate, node_ids)
    # initial ages consistent with the two rates above
    self.SetEquilibriumAgeDistFromBirthAndMortRates(crude_birth_rate, crude_death_rate, node_ids)
# TODO: is this useful in a way that warrants a special-case function in emodpy? # https://github.com/InstituteforDiseaseModeling/emod-api-old/issues/790
def SetEquilibriumVitalDynamics(self, crude_birth_rate=CrudeRate(40), node_ids=None):
    """
    Set fertility, mortality, and initial age using one rate for both births and deaths, to achieve a steady
    state population.

    Args:
        crude_birth_rate: Birth rate; the same value is used as the mortality rate.
        node_ids: Optional list of nodes to limit these settings to.
    """
    equilibrium_rate = crude_birth_rate
    self.SetSimpleVitalDynamics(equilibrium_rate, equilibrium_rate, node_ids)
# TODO: is this useful in a way that warrants a special-case function in emodpy? # https://github.com/InstituteforDiseaseModeling/emod-api-old/issues/791
def SetEquilibriumVitalDynamicsFromWorldBank(self, wb_births_df, country, year, node_ids=None):
    """
    Set steady-state fertility, mortality, and initial age with rates from world bank, for given country and
    year.

    Args:
        wb_births_df: Pandas dataframe with World Bank birth rate by country and year. It is read via a
            'Country Name' column and one column per year (named by the year as a string).
        country: Country to pick from World Bank dataset.
        year: Year to pick from World Bank dataset.
        node_ids: Optional list of nodes to limit these settings to.

    Raises:
        ValueError: if the birth rate for the given country and year cannot be obtained from the dataframe.
    """
    try:
        country_rows = wb_births_df[wb_births_df['Country Name'] == country]
        # the first matching row wins; the value is used as-is (births per 1000 pop per year)
        first_rate = country_rows[str(year)].tolist()[0]
        birth_rate = CrudeRate(first_rate)
    except Exception as ex:
        raise ValueError(f"Exception trying to find {year} and {country} in dataframe.\n{ex}")
    self.SetEquilibriumVitalDynamics(birth_rate, node_ids)
def SetDefaultIndividualAttributes(self):
    """
    Deprecated. Reset the default IndividualAttributes and fill them from the DemographicsTemplates
    functions called below.

    NOTE: This is very Measles-ish. We might want to move into MeaslesDemographics
    """
    import warnings
    deprecation_text = ('SetDefaultIndividualAttributes() is deprecated. Default nodes should now be represented by Node '
                        'objects and passed to the Demographics object during the constructor call. They can be modified '
                        'afterward, if needed.')
    warnings.warn(deprecation_text, DeprecationWarning, stacklevel=2)

    self.raw['Defaults']['IndividualAttributes'] = {}
    template_setters = (
        DT.NoInitialPrevalence,
        DT.AgeStructureUNWPP,  # Age distribution from UNWPP
        DT.MortalityStructureNigeriaDHS,  # Mortality rates carried over from Nigeria DHS
        DT.DefaultSusceptibilityDistribution,
    )
    for apply_template in template_setters:
        apply_template(self)
def SetMinimalNodeAttributes(self):
    """
    Deprecated. Set the default node attributes without a birth rate, by calling
    SetDefaultNodeAttributes(birth=False).
    """
    import warnings
    deprecation_text = ('SetMinimalNodeAttributes() is deprecated. Default nodes should now be represented by Node '
                        'objects and passed to the Demographics object during the constructor call. They can be modified '
                        'afterward, if needed.')
    warnings.warn(deprecation_text, DeprecationWarning, stacklevel=2)
    self.SetDefaultNodeAttributes(birth=False)
# WB is births per 1000 pop per year # DTK is births per person per day.
def SetBirthRate(self, birth_rate, node_ids=None):
    """
    Deprecated. Set the birth rate on the default node (node_ids is None) or on the specified nodes.
    Turn on Vital Dynamics and Births implicitly.

    Args:
        birth_rate: a rate object providing get_dtk_rate(), or a plain number, which is wrapped in a CrudeRate.
        node_ids: Optional list of node ids to limit this setting to. Defaults to the default node.
    """
    import warnings
    warnings.warn('SetBirthRate() is deprecated. Default nodes should now be represented by Node '
                  'objects and passed to the Demographics object during the constructor call. They can be modified '
                  'afterward, if needed.', DeprecationWarning, stacklevel=2)

    # isinstance (rather than an exact type check) also accepts numeric subclasses such as numpy.float64
    if isinstance(birth_rate, (int, float)):
        birth_rate = CrudeRate(birth_rate)
    dtk_birthrate = birth_rate.get_dtk_rate()

    if node_ids is None:
        self.raw['Defaults']['NodeAttributes']["BirthRate"] = dtk_birthrate
    else:
        for node_id in node_ids:
            self.get_node_by_id(node_id=node_id).birth_rate = dtk_birthrate

    # guarded like the other setters of this class, which tolerate self.implicits being None
    if self.implicits is not None:
        self.implicits.append(DT._set_population_dependent_birth_rate)
def SetMortalityRate(self, mortality_rate: CrudeRate, node_ids: List[int] = None):
    """
    Deprecated. Set a constant mortality rate on the default node (node_ids is None) or on the specified
    nodes. Turn on Enable_Natural_Mortality implicitly.

    Args:
        mortality_rate: a rate object providing get_dtk_rate(), or a plain number, which is wrapped in a
            CrudeRate.
        node_ids: Optional list of node ids to limit this setting to. Defaults to the default node.
    """
    import warnings
    warnings.warn('SetMortalityRate() is deprecated. Please use the emodpy Demographics method: '
                  'set_mortality_distribution()', DeprecationWarning, stacklevel=2)

    # isinstance (rather than an exact type check) also accepts numeric subclasses such as numpy.float64
    if isinstance(mortality_rate, (int, float)):
        mortality_rate = CrudeRate(mortality_rate)
    mortality_rate = mortality_rate.get_dtk_rate()

    if node_ids is None:
        setting = {"MortalityDistribution": DT._ConstantMortality(mortality_rate).to_dict()}
        self.SetDefaultFromTemplate(setting)
    else:
        for node_id in node_ids:
            # a fresh distribution object per node, so nodes do not share one instance
            distribution = DT._ConstantMortality(mortality_rate)
            self.get_node_by_id(node_id=node_id)._set_mortality_complex_distribution(distribution)

    if self.implicits is not None:
        self.implicits.append(DT._set_mortality_age_gender)
def SetMortalityDistribution(self, distribution: MortalityDistribution = None, node_ids: List[int] = None):
    """
    Deprecated. Set a mortality distribution on the default node (node_ids is None) or on each of the
    specified nodes. Turn on Enable_Natural_Mortality implicitly.

    Args:
        distribution: the mortality distribution to set
        node_ids: a list of node_ids

    Returns:
        None
    """
    import warnings
    deprecation_text = ('SetMortalityDistribution() is deprecated. Please use the emodpy Demographics method: '
                        'set_mortality_distribution()')
    warnings.warn(deprecation_text, DeprecationWarning, stacklevel=2)

    if node_ids is not None:
        for target_id in node_ids:
            target_node = self.get_node_by_id(node_id=target_id)
            target_node._set_mortality_complex_distribution(distribution)
    else:
        serialized = distribution.to_dict()
        self.raw["Defaults"]["IndividualAttributes"]["MortalityDistribution"] = serialized

    if self.implicits is not None:
        self.implicits.append(DT._set_mortality_age_gender)
def SetMortalityDistributionFemale(self, distribution: MortalityDistribution = None, node_ids: List[int] = None):
    """
    Deprecated. Set a female mortality distribution on the default node (node_ids is None) or on each of the
    specified nodes. Turn on Enable_Natural_Mortality implicitly.

    Args:
        distribution: the female mortality distribution to set
        node_ids: a list of node_ids

    Returns:
        None
    """
    import warnings
    deprecation_text = ('SetMortalityDistributionFemale() is deprecated. Please use the emodpy Demographics method: '
                        'set_mortality_distribution()')
    warnings.warn(deprecation_text, DeprecationWarning, stacklevel=2)

    if node_ids is not None:
        for target_id in node_ids:
            target_node = self.get_node_by_id(node_id=target_id)
            target_node._set_mortality_female_complex_distribution(distribution)
    else:
        serialized = distribution.to_dict()
        self.raw["Defaults"]["IndividualAttributes"]["MortalityDistributionFemale"] = serialized

    if self.implicits is not None:
        self.implicits.append(DT._set_mortality_age_gender)
def SetMortalityDistributionMale(self, distribution: MortalityDistribution = None, node_ids: List[int] = None):
    """
    Deprecated. Set a male mortality distribution on the default node (node_ids is None) or on each of the
    specified nodes. Turn on Enable_Natural_Mortality implicitly.

    Args:
        distribution: the male mortality distribution to set
        node_ids: a list of node_ids

    Returns:
        None
    """
    import warnings
    deprecation_text = ('SetMortalityDistributionMale() is deprecated. Please use the emodpy Demographics method: '
                        'set_mortality_distribution()')
    warnings.warn(deprecation_text, DeprecationWarning, stacklevel=2)

    if node_ids is not None:
        for target_id in node_ids:
            target_node = self.get_node_by_id(node_id=target_id)
            target_node._set_mortality_male_complex_distribution(distribution)
    else:
        serialized = distribution.to_dict()
        self.raw["Defaults"]["IndividualAttributes"]["MortalityDistributionMale"] = serialized

    if self.implicits is not None:
        self.implicits.append(DT._set_mortality_age_gender)
def SetMortalityOverTimeFromData(self, data_csv, base_year, node_ids: List = None):
    """
    Deprecated. Set default mortality rates for all nodes or per node. Turn on mortality configs implicitly.
    You can use the x_Other_Mortality configuration parameter to tune/calibrate.

    The same distribution is applied to males and females.

    Args:
        data_csv: Path to csv file with the mortality rates by calendar year and age bucket. One row per
            age bin, one column per year; the second column header is read as the first year and the last
            column header as the last year.
        base_year: The calendar year the sim is treating as the base.
        node_ids: Optional list of node ids to apply this to. Defaults to all.

    Returns:
        None

    Raises:
        ValueError: if base_year is outside [0, 2050], the year columns are not increasing, no column whose
            name starts with 'age' exists, the age bin values cannot be parsed, or several node ids are
            given for a single-node demographics object.
        KeyError: if a column for one of the years between the first and last year is missing.
    """
    import warnings
    warnings.warn('SetMortalityOverTimeFromData() is deprecated. Please use the emodpy Demographics method: '
                  'set_mortality_distribution()', DeprecationWarning, stacklevel=2)

    if node_ids is None:
        node_ids = []
    if base_year < 0:
        raise ValueError(f"User passed negative value of base_year: {base_year}.")
    if base_year > 2050:
        raise ValueError(f"User passed too large value of base_year: {base_year}.")

    # Load csv. Convert rate arrays into DTK-compatible JSON structures.
    df = pd.read_csv(data_csv)
    header = df.columns
    year_start = int(header[1])  # someone's going to come along with 1990.5, etc. Sigh.
    year_end = int(header[-1])
    if year_end <= year_start:
        raise ValueError(f"Failed check that {year_end} is greater than {year_start} in csv dataset.")

    num_years = year_end - year_start + 1
    rel_years = list()
    for year in range(year_start, year_start + num_years):
        # every year in the range needs its own column, otherwise rates and year bins would not line up
        if str(year) not in df.columns:
            raise KeyError(str(year))
        rel_years.append(year - base_year)

    # the last column whose name starts with "age" (case-insensitive) holds the age bins
    age_key = None
    for trykey in df.keys():
        if trykey.lower().startswith("age"):
            age_key = trykey
    # check before using the key; indexing the dataframe with None would raise an unrelated KeyError
    if age_key is None:
        raise ValueError(f"Failed to find 'Age_Bin' (or similar) column in the csv dataset. Cannot process.")
    raw_age_bins = list(df[age_key])

    num_age_bins = len(raw_age_bins)
    age_bins = list()
    try:
        for age_bin in raw_age_bins:
            # an age bin looks like "<left>-<right>"; only the left edge is used
            left_age = float(age_bin.split("-")[0])
            age_bins.append(left_age)
    except Exception as ex:
        raise ValueError(f"Ran into error processing the values in the Age-Bin column. {ex}") from ex

    # one array of mortality rates (by year) per age bin; everything after the first column of a row is
    # taken as rates -- NOTE(review): this presumes the age bin column is the first column, confirm
    rows_as_columns = df.transpose()
    rates = [list(rows_as_columns[idx][1:]) for idx in range(len(age_bins))]

    num_pop_groups = [num_age_bins, num_years]
    pop_groups = [age_bins, rel_years]

    distrib = MortalityDistribution(
        result_values=rates,
        axis_names=["age", "year"],
        axis_scale_factors=[365, 1],
        axis_units="N/A",
        num_distribution_axes=len(num_pop_groups),
        num_population_groups=num_pop_groups,
        population_groups=pop_groups,
        result_scale_factor=2.74e-06,
        result_units="annual deaths per 1000 individuals"
    )

    if not node_ids:
        self.raw["Defaults"]["IndividualAttributes"]["MortalityDistributionMale"] = distrib.to_dict()
        self.raw["Defaults"]["IndividualAttributes"]["MortalityDistributionFemale"] = distrib.to_dict()
    else:
        if len(self.nodes) == 1 and len(node_ids) > 1:
            raise ValueError(f"User specified several node ids for single node demographics setup.")
        for node_id in node_ids:
            self.get_node_by_id(node_id=node_id)._set_mortality_male_complex_distribution(distrib)
            self.get_node_by_id(node_id=node_id)._set_mortality_female_complex_distribution(distrib)

    if self.implicits is not None:
        self.implicits.append(DT._set_mortality_age_gender_year)
def SetAgeDistribution(self, distribution: AgeDistribution, node_ids: List[int] = None):
    """
    Deprecated. Set an age distribution on the default node (node_ids is None) or on each of the specified
    nodes. Sets distribution type to COMPLEX implicitly.

    Args:
        distribution: age distribution
        node_ids: a list of node_ids

    Returns:
        None
    """
    import warnings
    deprecation_text = "SetAgeDistibution is deprecated. Please use emodpy Demographics.set_age_distribution instead."
    warnings.warn(deprecation_text, DeprecationWarning, stacklevel=2)

    if node_ids is not None:
        for target_id in node_ids:
            target_node = self.get_node_by_id(node_id=target_id)
            target_node._set_age_complex_distribution(distribution)
    else:
        serialized = distribution.to_dict()
        self.raw["Defaults"]["IndividualAttributes"]["AgeDistribution"] = serialized

    if self.implicits is not None:
        self.implicits.append(DT._set_age_complex)
def SetDefaultNodeAttributes(self, birth=True):
    """
    Deprecated. Replace the default NodeAttributes with Altitude, Airport, Region and Seaport values and,
    if requested, set a default birth rate afterwards.

    Args:
        birth: when True (the default), SetBirthRate() is called with YearlyRate(math.log(1.03567)).
    """
    import warnings
    deprecation_text = ('SetDefaultNodeAttributes() is deprecated. Default nodes should now be represented by Node '
                        'objects and passed to the Demographics object during the constructor call. They can be modified '
                        'afterward, if needed.')
    warnings.warn(deprecation_text, DeprecationWarning, stacklevel=2)

    default_attributes = dict()
    default_attributes["Altitude"] = 0
    default_attributes["Airport"] = 1  # why are these still needed?
    default_attributes["Region"] = 1
    default_attributes["Seaport"] = 1
    self.raw['Defaults']['NodeAttributes'] = default_attributes

    if not birth:
        return
    self.SetBirthRate(YearlyRate(math.log(1.03567)))
def SetDefaultProperties(self):
    """
    Deprecated. Set a bunch of defaults: default node attributes, default individual attributes (via the two
    corresponding setters) and an empty default IndividualProperties list.
    """
    import warnings
    deprecation_text = ('SetDefaultProperties() is deprecated. Default nodes should now be represented by Node objects '
                        'and passed to the Demographics object during the constructor call. They can be modified '
                        'afterward, if needed.')
    warnings.warn(deprecation_text, DeprecationWarning, stacklevel=2)

    self.SetDefaultNodeAttributes()
    self.SetDefaultIndividualAttributes()

    # Distributions for initialization of immunity, risk heterogeneity, etc.
    self.raw['Defaults']['IndividualProperties'] = list()
def SetDefaultFromTemplate(self, template, setter_fn=None):
    """
    Deprecated. Merge the input template (raw json) into the default IndividualAttributes and register
    setter_fn, if given, as an implicit config function.

    The template should always be constructed by a function in DemographicsTemplates. Eventually this
    function will be hidden and only accessed via separate application-specific API functions.

    Args:
        template: dict of entries to add to (or overwrite in) the default IndividualAttributes.
        setter_fn: optional function to append to self.implicits (skipped if self.implicits is None).
    """
    import warnings
    deprecation_text = ('SetDefaultFromTemplate() is deprecated. Please use the emodpy Demographics methods: '
                        'set_XYZ_distribution() as needed and other object-based setting functions')
    warnings.warn(deprecation_text, DeprecationWarning, stacklevel=2)

    individual_attributes = self.raw['Defaults']['IndividualAttributes']
    individual_attributes.update(template)

    should_register = self.implicits is not None and setter_fn is not None
    if should_register:
        self.implicits.append(setter_fn)
# TODO: is this useful in a way that warrants a special-case function in emodpy built around set_age_distribution? # https://github.com/InstituteforDiseaseModeling/emod-api-old/issues/788
def SetEquilibriumAgeDistFromBirthAndMortRates(self, CrudeBirthRate=CrudeRate(40), CrudeMortRate=CrudeRate(20),
                                               node_ids=None):
    """
    Deprecated. Set the initial ages of the population to a sensible equilibrium profile based on the
    specified input birth and death rates. Note this does not set the fertility and mortality rates.

    Args:
        CrudeBirthRate: birth rate used to derive the equilibrium age distribution.
        CrudeMortRate: mortality rate used to derive the equilibrium age distribution.
        node_ids: Optional list of node ids to limit this setting to. Defaults to the default node.
    """
    import warnings
    warnings.warn('SetEquilibriumAgeDistFromBirthAndMortRates() is deprecated. Please use the emodpy Demographics method: '
                  'set_age_distribution()', DeprecationWarning, stacklevel=2)

    yearly_birth_rate = YearlyRate(CrudeBirthRate)
    yearly_mortality_rate = YearlyRate(CrudeMortRate)
    dist = DT._EquilibriumAgeDistFromBirthAndMortRates(yearly_birth_rate, yearly_mortality_rate)
    setter_fn = DT._set_age_complex

    if node_ids is None:
        # SetDefaultFromTemplate() registers setter_fn itself
        self.SetDefaultFromTemplate(dist, setter_fn)
    else:
        # per-node setting needs a distribution object rather than the raw template dict
        new_dist = AgeDistribution()
        dist = new_dist.from_dict(dist["AgeDistribution"])
        for node in node_ids:
            self.get_node_by_id(node_id=node)._set_age_complex_distribution(dist)
        # guarded like the other setters of this class, which tolerate self.implicits being None
        if self.implicits is not None:
            self.implicits.append(setter_fn)
def SetInitialAgeExponential(self, rate=0.0001068, description=""):
    """
    Deprecated. Set the initial age of the population to an exponential distribution with a specified rate.

    Args:
        rate: rate of the exponential distribution
        description: description, why was this distribution chosen. If empty, a default description that
            names the rate is generated.
    """
    import warnings
    warnings.warn('SetInitialAgeExponential() is deprecated. Please use the emodpy Demographics method: '
                  'set_age_distribution()', DeprecationWarning, stacklevel=2)

    if not description:
        # f-string so the actual rate (not the literal text "{rate}") ends up in the description
        description = f"Initial ages set to draw from exponential distribution with {rate}"

    setting = {"AgeDistributionFlag": 3,
               "AgeDistribution1": rate,
               "AgeDistribution2": 0,
               "AgeDistribution_Description": description}
    self.SetDefaultFromTemplate(setting, DT._set_age_simple)
def SetInitialAgeLikeSubSaharanAfrica(self, description=""):
    """
    Set the initial age of the population to an overly simplified structure that sort of looks like
    sub-Saharan Africa. This delegates to SetInitialAgeExponential with its default rate.

    :param description: description, why was this age chosen? A stock description is used when empty.
    """
    import warnings

    warnings.warn(
        'SetInitialAgeLikeSubSaharanAfrica() is deprecated. Please use the emodpy Demographics method: '
        'set_age_distribution()',
        DeprecationWarning,
        stacklevel=2,
    )

    stock_description = ("Setting initial age distribution like Sub Saharan Africa, drawing from exponential "
                         "distribution.")
    # The rate is intentionally not passed so the exponential setter falls back to its own default.
    self.SetInitialAgeExponential(description=description or stock_description)
def SetOverdispersion(self, new_overdispersion_value, nodes: List = None):
    """
    Store the infectivity overdispersion value in the default NodeAttributes.

    A config setter that switches ``Enable_Infection_Rate_Overdispersion`` to 1 is appended to
    ``self.implicits`` when implicit tracking is available.

    Args:
        new_overdispersion_value: value written to ``InfectivityOverdispersion`` in the defaults.
        nodes: accepted for interface compatibility; this implementation does not consult it and always
            writes to the defaults.
    """
    def enable_overdispersion(config):
        print("DEBUG: Setting 'Enable_Infection_Rate_Overdispersion' to 1.")
        config.parameters.Enable_Infection_Rate_Overdispersion = 1
        return config

    implicit_setters = self.implicits
    if implicit_setters is not None:
        implicit_setters.append(enable_overdispersion)

    default_node_attributes = self.raw['Defaults']['NodeAttributes']
    default_node_attributes["InfectivityOverdispersion"] = new_overdispersion_value
def SetInitPrevFromUniformDraw(self, min_init_prev, max_init_prev, description=""):
    """
    Set Initial Prevalence (one value per node) drawn from a uniform distribution.

    The work is delegated to ``DT.InitPrevUniform``.

    :param min_init_prev: minimal initial prevalence
    :param max_init_prev: maximal initial prevalence
    :param description: description, why were these parameters chosen? A description quoting both bounds is
        generated when empty.
    """
    import warnings

    warnings.warn(
        'SetInitPrevFromUniformDraw() is deprecated. Please use the emodpy Demographics method: '
        'set_prevalence_distribution()',
        DeprecationWarning,
        stacklevel=2,
    )

    if description:
        prevalence_note = description
    else:
        prevalence_note = f"Drawing prevalence from uniform distribution, min={min_init_prev} and max={max_init_prev}"

    DT.InitPrevUniform(self, min_init_prev, max_init_prev, prevalence_note)
def AddMortalityByAgeSexAndYear(self, age_bin_boundaries_in_years: List[float],
                                year_bin_boundaries: List[float],
                                male_mortality_rates: List[List[float]],
                                female_mortality_rates: List[List[float]]):
    """
    Build male and female mortality distributions indexed by age and year and set them on this object.

    Args:
        age_bin_boundaries_in_years: age bin boundaries, in years. One row of rates is required per entry.
        year_bin_boundaries: year bin boundaries. Each row of rates needs one value per entry.
        male_mortality_rates: male rates, one list per age bin, each holding one rate per year bin.
        female_mortality_rates: female rates, same layout as ``male_mortality_rates``.

    Raises:
        AssertionError: if the rate arrays do not match the number of age bins or year bins.
    """
    import warnings
    warnings.warn('AddMortalityByAgeSexAndYear() is deprecated. Please use the emodpy Demographics method: '
                  'set_mortality_distribution()', DeprecationWarning, stacklevel=2)

    # The messages are f-strings so the offending sizes are actually reported; each one names the
    # array (male/female) that failed the check.
    assert len(age_bin_boundaries_in_years) == len(male_mortality_rates), \
        f"One array with distributions per age bin is required. \n number of age bins " \
        f"= {len(age_bin_boundaries_in_years)} number of male mortality rates = {len(male_mortality_rates)} "
    assert len(age_bin_boundaries_in_years) == len(female_mortality_rates), \
        f"One array with distributions per age bin is required. \n number of age bins " \
        f"= {len(age_bin_boundaries_in_years)} number of female mortality rates = {len(female_mortality_rates)} "

    for yearly_mort_rate in male_mortality_rates:
        assert len(year_bin_boundaries) == len(yearly_mort_rate), \
            f"The number of year bins must be equal the number of male mortality rates per year.\n" \
            f"number of year bins = {len(year_bin_boundaries)} " \
            f"number of male mortality rates = {len(yearly_mort_rate)} "
    for yearly_mort_rate in female_mortality_rates:
        assert len(year_bin_boundaries) == len(yearly_mort_rate), \
            f"The number of year bins must be equal the number of female mortality rates per year.\n" \
            f"number of year bins = {len(year_bin_boundaries)} " \
            f"number of female mortality rates = {len(yearly_mort_rate)} "

    axis_names = ["age", "year"]
    axis_scale_factors = [365, 1]
    num_population_groups = [len(age_bin_boundaries_in_years), len(year_bin_boundaries)]
    population_groups = [age_bin_boundaries_in_years, year_bin_boundaries]

    mort_distr_male = MortalityDistribution(axis_names=axis_names,
                                            axis_scale_factors=axis_scale_factors,
                                            num_population_groups=num_population_groups,
                                            population_groups=population_groups,
                                            # result_scale_factor=result_values * scale_factor
                                            result_scale_factor=1.0,
                                            result_values=male_mortality_rates)
    self.SetMortalityDistributionMale(mort_distr_male)

    mort_distr_female = MortalityDistribution(axis_names=axis_names,
                                              axis_scale_factors=axis_scale_factors,
                                              num_population_groups=num_population_groups,
                                              population_groups=population_groups,
                                              # result_scale_factor=result_values * scale_factor
                                              result_scale_factor=1.0,
                                              result_values=female_mortality_rates)
    self.SetMortalityDistributionFemale(mort_distr_female)

    if self.implicits is not None:
        self.implicits.append(DT._set_mortality_age_gender_year)
def SetFertilityOverTimeFromParams(self, years_region1, years_region2, start_rate, inflection_rate, end_rate,
                                   node_ids: List = None):
    """
    Set fertility rates that vary over time based on a model with two linear regions.

    Note that fertility rates use GFR units: babies born per 1000 women of child-bearing age annually. You can
    use the x_Birth configuration parameter to tune/calibrate.

    Refer to the following diagram.

    .. figure:: images/fertility_over_time_doc.png

    Args:
        years_region1: The number of years covered by the first linear region. So if this represents
            1850 to 1960, years_region1 would be 110.
        years_region2: The number of years covered by the second linear region. So if this represents
            1960 to 2020, years_region2 would be 60.
        start_rate: The fertility rate at t=0.
        inflection_rate: The fertility rate in the year where the two linear regions meet.
        end_rate: The fertility rate at the end of the period covered by region1 + region2.
        node_ids: Optional list of node ids to apply this to. Defaults to all.

    Returns:
        rates array (Just in case user wants to do something with them like inspect or plot.)

    Raises:
        ValueError: if any of the numeric arguments is negative, or if several node ids are given for a
            single-node demographics setup.
    """
    import warnings

    warnings.warn(
        'SetFertilityOverTimeFromParams() is deprecated. Please use the emodpy-hiv Demographics method: '
        'set_fertility_distribution()',
        DeprecationWarning,
        stacklevel=2,
    )

    # Checked in declaration order so the first offending argument is the one reported.
    checked_arguments = (
        ("years_region1", years_region1),
        ("years_region2", years_region2),
        ("start_rate", start_rate),
        ("inflection_rate", inflection_rate),
        ("end_rate", end_rate),
    )
    for argument_name, argument_value in checked_arguments:
        if argument_value < 0:
            raise ValueError(f"{argument_name} can't be negative.")

    # One rate per year: linear from start_rate towards inflection_rate, then towards end_rate.
    first_region = [start_rate + (inflection_rate-start_rate)*(i/years_region1) for i in range(years_region1)]
    second_region = [inflection_rate + (end_rate-inflection_rate)*(i/years_region2) for i in range(years_region2)]
    rates = first_region + second_region

    # OK, now we put this into the nasty complex fertility structure
    dist = DT.get_fert_dist_from_rates(rates)

    if node_ids:
        if len(self.nodes) == 1 and len(node_ids) > 1:
            raise ValueError("User specified several node ids for single node demographics setup.")
        for node_id in node_ids:
            self.get_node_by_id(node_id=node_id)._set_fertility_complex_distribution(dist)
        # NOTE(review): registered here only; the default path registers the setter via SetDefaultFromTemplate.
        if self.implicits is not None:
            self.implicits.append(DT._set_fertility_age_year)
    else:
        dist_dict = dist.to_dict()
        if "FertilityDistribution" in dist_dict:
            full_dict = dist_dict
        else:
            # Wrap the bare distribution under the key the defaults expect.
            full_dict = {"FertilityDistribution": dist.to_dict()}
        self.SetDefaultFromTemplate(full_dict, DT._set_fertility_age_year)

    return rates
def infer_natural_mortality(self,
                            file_male,
                            file_female,
                            interval_fit: List[Union[int, float]] = None,
                            which_point='mid',
                            predict_horizon=2050,
                            csv_out=False,
                            n=0,  # I don't know what this means
                            results_scale_factor=1.0/365.0) -> [Dict, Dict]:
    """
    Calculate the expected natural mortality by age, sex, and year from data, predicting what it would
    have been without disease (HIV-only).

    A linear regression is fitted per (sex, age bin) to the log mortality inside ``interval_fit`` and
    evaluated at the data years between ``interval_fit[0]`` and ``predict_horizon``. For every row the smaller
    of the observed and the extrapolated value is kept. The resulting distributions are returned; this method
    does not write them into the demographics object, it only registers the matching config setter in
    ``self.implicits``.

    Args:
        file_male: csv input (anything ``pandas.read_csv`` accepts) holding the columns 'Age (x)',
            'Central death rate m(x,n)', 'Age interval (n)' and 'Period' for males.
        file_female: same as ``file_male``, for females.
        interval_fit: ``[first_year, last_year]`` used to fit the regression. Defaults to ``[1970, 1980]``.
        which_point: which year represents a 'Period' range: 'mid', 'start' or 'end'.
        predict_horizon: last year for which extrapolated values are produced.
        csv_out: when truthy, used as the file-name suffix for the 'Male...' and 'Female...' csv dumps.
        n: index into the internal list of estimates. That list holds a single entry, so only 0 (or -1)
            is usable.
        results_scale_factor: value stored as 'ResultScaleFactor' in both returned dictionaries.

    Returns:
        ``(dict_female, dict_male)``: the female and male mortality distributions as plain dictionaries.

    Raises:
        ValueError: if ``which_point`` is not one of 'mid', 'start', 'end'.
    """
    from collections import OrderedDict
    from sklearn.linear_model import LinearRegression
    from functools import reduce
    import warnings
    warnings.warn('infer_natural_mortality() is deprecated. Please use modern country model loading.',
                  DeprecationWarning, stacklevel=2)

    if interval_fit is None:
        interval_fit = [1970, 1980]

    name_conversion_dict = {'Age (x)': 'Age',
                            'Central death rate m(x,n)': 'Mortality_mid',
                            'Age interval (n)': 'Interval',
                            'Period': 'Years'
                            }
    sex_dict = {'Male': 0, 'Female': 1}

    def construct_interval(x, y):
        return x, x + y

    def generate_dict_order(tuple_list, which_entry=1):
        # Maps every tuple to one of its entries (by default the second one), used as a sort key.
        my_unordered_list = tuple_list.apply(lambda x: x[which_entry])
        dict_to_order = OrderedDict(zip(tuple_list, my_unordered_list))
        return dict_to_order

    def map_year(x_tuple, flag='mid'):
        valid_entries_loc = ['mid', 'end', 'start']

        if flag not in valid_entries_loc:
            raise ValueError('invalid endpoint specified')

        if flag == 'mid':
            return (x_tuple[0] + x_tuple[1]) / 2.0
        elif flag == 'start':
            return x_tuple[0]
        else:
            return x_tuple[1]

    df_mort_male = pd.read_csv(file_male, usecols=name_conversion_dict)
    df_mort_male['Sex'] = 'Male'
    df_mort_female = pd.read_csv(file_female, usecols=name_conversion_dict)
    df_mort_female['Sex'] = 'Female'
    df_mort = pd.concat([df_mort_male, df_mort_female], axis=0)
    df_mort.rename(columns=name_conversion_dict, inplace=True)
    df_mort['Years'] = df_mort['Years'].apply(lambda x: tuple(
        [float(zz) for zz in x.split('-')]))  # this might be a bit too format specific (ie dashes in input)

    # log transform the data and drop unneeded columns
    df_mort['log_Mortality_mid'] = df_mort['Mortality_mid'].apply(lambda x: np.log(x))
    df_mort['Age'] = df_mort[['Age', 'Interval']].apply(lambda zz: construct_interval(*zz), axis=1)

    year_order_dict = generate_dict_order(df_mort['Years'])
    age_order_dict = generate_dict_order(df_mort['Age'])
    df_mort['sortby2'] = df_mort['Age'].map(age_order_dict)
    df_mort['sortby1'] = df_mort['Sex'].map(sex_dict)
    df_mort['sortby3'] = df_mort['Years'].map(year_order_dict)
    df_mort.sort_values(['sortby1', 'sortby2', 'sortby3'], inplace=True)
    df_mort.drop(columns=['Mortality_mid', 'Interval', 'sortby1', 'sortby2', 'sortby3'], inplace=True)

    # convert to years (and to string for age_list due to really annoying practical slicing reasons
    df_mort['Years'] = df_mort['Years'].apply(lambda x: map_year(x, which_point))
    df_mort['Age'] = df_mort['Age'].apply(lambda x: str(x))
    # Rows up to the start of the fit interval are kept aside and re-attached after the extrapolation.
    df_before_time = df_mort[df_mort['Years'].between(0, interval_fit[0])].copy()

    df_mort.set_index(['Sex', 'Age'], inplace=True)
    sex_list = list(set(df_mort.index.get_level_values('Sex')))
    age_list = list(set(df_mort.index.get_level_values('Age')))

    df_list = []
    for sex in sex_list:
        for age in age_list:
            tmp_data = df_mort.loc[(sex, age, slice(None)), :]
            extrap_model = make_pipeline(StandardScaler(with_mean=False), LinearRegression())

            first_extrap_df = tmp_data[tmp_data['Years'].between(interval_fit[0], interval_fit[1])]
            # NOTE(review): column 0 is expected to be 'Years' and column 1 'log_Mortality_mid'; this relies
            # on the column order of the input files - confirm.
            xx = tmp_data[tmp_data['Years'].between(interval_fit[0], predict_horizon)].values[:, 0]

            values = first_extrap_df.values
            extrap_model.fit(values[:, 0].reshape(-1, 1), values[:, 1])

            extrap_predictions = extrap_model.predict(xx.reshape(-1, 1))

            loc_df = pd.DataFrame.from_dict({'Sex': sex, 'Age': age, 'Years': xx, 'Extrap': extrap_predictions})
            loc_df.set_index(['Sex', 'Age', 'Years'], inplace=True)

            df_list.append(loc_df.copy())

    df_e1 = pd.concat(df_list, axis=0)

    df_list_final = [df_mort, df_e1]
    df_total = reduce(lambda left, right: pd.merge(left, right, on=['Sex', 'Age', 'Years']), df_list_final)

    df_total = df_total.reset_index(inplace=False).set_index(['Sex', 'Age'], inplace=False)

    # Undo the log transform for both the extrapolated and the observed values.
    df_total['Extrap'] = df_total['Extrap'].apply(np.exp)
    df_total['Data'] = df_total['log_Mortality_mid'].apply(np.exp)
    df_before_time['Data'] = df_before_time['log_Mortality_mid'].apply(np.exp)

    df_before_time.set_index(['Sex', 'Age'], inplace=True)
    df_total = pd.concat([df_total, df_before_time], axis=0, join='outer', sort=True)

    df_total.reset_index(inplace=True)
    # NOTE(review): 'Age' holds strings at this point while age_order_dict appears to be keyed by tuples -
    # confirm this mapping produces the intended sort key.
    df_total['sortby2'] = df_total['Age'].map(age_order_dict)
    df_total['sortby1'] = df_total['Sex'].map(sex_dict)
    df_total.sort_values(by=['sortby1', 'sortby2', 'Years'], inplace=True)
    df_total.drop(columns=['sortby1', 'sortby2'], inplace=True)

    estimates_list = []
    estimates_list.append(df_total.copy())
    # estimates_list = [df_total.copy()] alternative

    def min_not_nan(x_list):
        loc_in = list(filter(lambda x: not np.isnan(x), x_list))
        return np.min(loc_in)

    # This was in another function before
    df = estimates_list[n]
    # Keep the smaller of observed and extrapolated mortality, ignoring missing values.
    df['FE'] = df[['Data', 'Extrap']].apply(min_not_nan, axis=1)
    # The age interval was stringified above as "(start, end)"; keep its second entry as an int.
    df['Age'] = df['Age'].apply(lambda x: int(x.split(',')[1].split(')')[0]))
    male_df = df[df['Sex'] == 'Male']
    female_df = df[df['Sex'] == 'Female']

    male_df.set_index(['Sex', 'Age', 'Years'], inplace=True)
    female_df.set_index(['Sex', 'Age', 'Years'], inplace=True)
    male_data = male_df['FE']
    female_data = female_df['FE']

    # Pivot the years into columns: rows are age bins, columns are years.
    male_data = male_data.unstack(-1)
    male_data.sort_index(level='Age', inplace=True)
    female_data = female_data.unstack(-1)
    female_data.sort_index(level='Age', inplace=True)

    years_out_male = list(male_data.columns)
    years_out_female = list(female_data.columns)

    age_out_male = list(male_data.index.get_level_values('Age'))
    # Read the female age bins from the female table (previously taken from the male table).
    age_out_female = list(female_data.index.get_level_values('Age'))

    male_output = male_data.values
    female_output = female_data.values

    if csv_out:
        male_data.to_csv(f'Male{csv_out}')
        female_data.to_csv(f'Female{csv_out}')

    # TBD: This is the part that should use base file functionality

    dict_female = {'AxisNames': ['age', 'year'],
                   'AxisScaleFactors': [365.0, 1],
                   'AxisUnits': ['years', 'years'],
                   'PopulationGroups': [age_out_female, years_out_female],
                   'ResultScaleFactor': results_scale_factor,
                   'ResultUnits': 'annual deaths per capita',
                   'ResultValues': female_output.tolist()
                   }

    dict_male = {'AxisNames': ['age', 'year'],
                 'AxisScaleFactors': [365.0, 1],
                 'AxisUnits': ['years', 'years'],
                 'PopulationGroups': [age_out_male, years_out_male],
                 'ResultScaleFactor': results_scale_factor,
                 'ResultUnits': 'annual deaths per capita',
                 'ResultValues': male_output.tolist()
                 }
    self.implicits.append(DT._set_mortality_age_gender_year)
    return dict_female, dict_male