Source code for emodpy_hiv.demographics.country_models

from pathlib import Path
from typing import Dict

import pandas as pd

from emod_api.demographics.PropertiesAndAttributes import IndividualAttributes, IndividualProperty

from emodpy_hiv.demographics import DemographicsTemplates
from emodpy_hiv.demographics.hiv_demographics import HIVDemographics

from emodpy_hiv.country_model import DefaultZambiaData
_data_root = DefaultZambiaData.data_root.parent


# TODO: this registry should not be represented like this long-term. It is just a convenient way to organize
#  country model settings during development.
#
# Maps a country model name to the settings read by load_country_model_demographics_default(). The "*_file"
# entries are file names; the loader resolves them relative to `_data_root / <country model name>`.
_registry = {
    "zambia": {
        # CSV read with pandas and handed to HIVDemographics.from_population_dataframe()
        "initial_population_file": "initial_population.csv",
        # CSV read with pandas and handed to _age_distributions_from_df()
        "age_distribution_file": "initial_age_distribution.csv",
        # passed to demographics.set_fertility(path_to_csv=...)
        "fertility_file": "parsed_fertility.csv",
        # passed to demographics.set_mortality(file_male=..., file_female=...)
        "male_mortality_file": "parsed_mortality--male.csv",
        "female_mortality_file": "parsed_mortality--female.csv",
        # society name passed to DemographicsTemplates.get_society_dict()
        "default_society_template": "PFA-Southern-Africa",
        # each dict below is converted with IndividualProperty.from_dict() and set on the default node
        # NOTE(review): "initial_distribution" presumably lines up index-by-index with "values" (the lengths
        #  match for every entry here) -- confirm against IndividualProperty.from_dict()
        "individual_properties": [
            {
                "property": "Risk",
                "values": ["LOW", "MEDIUM", "HIGH"],
                "initial_distribution": [0.85, 0.15, 0],
                "transitions": None,
                "transmission_matrix": None
            },
            {
                "property": "CascadeState",
                "values": ["", "ARTStaging", "ARTStagingDiagnosticTest", "LinkingToART", "LinkingToPreART", "OnART", "OnPreART",
                  "HCTTestingLoop", "HCTUptakeAtDebut", "HCTUptakePostDebut", "TestingOnANC", "TestingOnChild6w",
                  "TestingOnSymptomatic", "LostForever"],
                "initial_distribution": [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
                "transitions": None,
                "transmission_matrix": None
            },
            {
                "property": "Accessibility",
                "values": ["Yes", "No"],
                "initial_distribution": [0.8, 0.2],
                "transitions": None,
                "transmission_matrix": None
            }
        ],
        # not read by load_country_model_demographics_default() at present
        "node_attributes": []  # TODO: how to include/load node properties? (node attributes?)
    }
}

########################################################################################################################
# TODO:
#  NOTE: The following is simply copy/pasted in for now to enable sketch of 'country models' to work. This is NOT
#  the canonical place for this code.

import re


class AgeBin:
    """
    An age range with a start and an end age, written in interval notation such as ``[15:49)``.

    The string form is ``[<start><delimiter><end>)``. Any delimiter is accepted when parsing as long as it
    contains no digits and no '.' characters, e.g. ``[15:49)`` and ``[15, 49)`` both parse to start=15, end=49.
    Whole-number ages are stored as int, fractional ages as float.
    """

    class InvalidAgeBinFormat(Exception):
        """Raised when a string cannot be parsed into an AgeBin."""

    class NotMergeable(Exception):
        """Raised when AgeBins cannot be merged (no bins given, or bins are not age-adjacent)."""

    STR_FORMAT = '[%s%s%s)'

    # e.g. [15, 49) -> [(15)(, )(49)) delimiter must contain no numeric characters or '.'
    # Raw string so that '\[' and '\)' reach the regex engine without being invalid string escapes.
    SPLIT_REGEX = re.compile(r'^\[(?P<start>[0-9.]+)(?P<delimiter>[^0-9.]+)(?P<end>[0-9.]+)\)$')
    DEFAULT_DELIMITER = ':'
    ALL = 'all'  # not referenced inside this class

    def __init__(self, start, end, delimiter=None):
        """
        :param start: lower age bound; a number or a numeric string
        :param end: upper age bound; a number or a numeric string
        :param delimiter: text placed between start and end by __str__; DEFAULT_DELIMITER if not given/empty
        """
        self.start = self._to_number(start)
        self.end = self._to_number(end)
        self.delimiter = delimiter or self.DEFAULT_DELIMITER

    @staticmethod
    def _to_number(value):
        """Convert value to an int when it is a whole number, otherwise to a float (never truncating)."""
        try:
            as_int = int(value)
        except ValueError:
            # e.g. the string '0.5', which int() refuses to parse
            return float(value)
        # int() silently truncates non-integral numbers (int(0.5) == 0). Text that int() accepted is already
        # a whole number; for numeric input only use the int when no information was lost.
        if isinstance(value, (str, bytes, bytearray)) or as_int == value:
            return as_int
        return float(value)

    def merge(self, other_bin):
        """
        Create a single AgeBin representing two adjacent AgeBins. Keeps delimiter of 'self'.

        :param other_bin: merge self with this other AgeBin object or AgeBin string (self is lower age than
            other_bin)
        :return: an AgeBin object with delimiter set to self.delimiter (not other_bin.delimiter)
        :raises NotMergeable: if self.end is not equal to other_bin.start
        """
        other_bin = other_bin if isinstance(other_bin, AgeBin) else AgeBin.from_string(other_bin)
        if self.end != other_bin.start:
            raise self.NotMergeable('AgeBin objects must be age-adjacent to be merged: %s %s' % (self, other_bin))
        return type(self)(start=self.start, end=other_bin.end, delimiter=self.delimiter)

    def contains(self, other_bin):
        """
        Is other_bin contained within the bounds of self?

        :param other_bin: an AgeBin object or AgeBin string
        :return: True/False
        """
        other_bin = other_bin if isinstance(other_bin, AgeBin) else AgeBin.from_string(other_bin)
        return self.start <= other_bin.start and self.end >= other_bin.end

    def to_tuple(self):
        """:return: the tuple (start, end)"""
        return (self.start, self.end)

    def __str__(self):
        return self.STR_FORMAT % (self.start, self.delimiter, self.end)

    def __repr__(self):
        return self.__str__()

    def __eq__(self, other):
        # Equality is by age bounds only; the delimiter is presentation and is ignored.
        try:
            return self.start == other.start and self.end == other.end
        except AttributeError:
            # not an AgeBin-like object: let Python fall back to its default comparison instead of crashing
            return NotImplemented

    def __ne__(self, other):
        return not (self == other)

    @classmethod
    def _split_string(cls, str):
        """Split an AgeBin string into its (start, delimiter, end) text pieces."""
        match = cls.SPLIT_REGEX.match(str)
        # match is None for non-matching text; the resulting TypeError is handled by from_string()
        return match['start'], match['delimiter'], match['end']

    @classmethod
    def from_string(cls, str):
        """
        Parse an AgeBin from its string form, e.g. '[15:49)'.

        :param str: the text to parse
        :return: a new AgeBin using the delimiter found in the text
        :raises InvalidAgeBinFormat: if the text is not a validly formatted AgeBin
        """
        try:
            start, delimiter, end = cls._split_string(str=str)
            # ValueError covers text that passes the regex but is not a number, e.g. '1.2.3' or '.'
            age_bin = cls(start=start, end=end, delimiter=delimiter)
        except (KeyError, IndexError, TypeError, ValueError) as e:
            example = cls(15, 49)
            raise cls.InvalidAgeBinFormat('Required AgeBin format is e.g.: %s' % example) from e
        return age_bin

    @classmethod
    def merge_bins(cls, bins):
        """
        Merge any number of AgeBins into one AgeBin spanning all of them.

        :param bins: AgeBin objects and/or AgeBin strings, in any order
        :return: the merged AgeBin
        :raises NotMergeable: if no bins are given or the bins, sorted by start age, are not all age-adjacent
        """
        # tolerant of string and object representations
        parsed = [item if isinstance(item, AgeBin) else cls.from_string(item) for item in bins]
        if not parsed:
            raise cls.NotMergeable('No AgeBins provided for merging.')
        parsed.sort(key=lambda b: b.start)
        merged_bin = parsed[0]
        for next_bin in parsed[1:]:
            merged_bin = merged_bin.merge(next_bin)
        return merged_bin

    @classmethod
    def can_upsample_bins(cls, bins, target_bin):
        """
        Can the given bins be combined to exactly cover target_bin?

        :param bins: AgeBin objects and/or AgeBin strings; bins outside of target_bin are ignored
        :param target_bin: the AgeBin (object or string) to be covered
        :return: True/False
        """
        # tolerant of string and object representations
        parsed = [item if isinstance(item, AgeBin) else cls.from_string(item) for item in bins]
        target_bin = target_bin if isinstance(target_bin, AgeBin) else cls.from_string(target_bin)

        # remove bins not within our target age range
        candidates = [item for item in parsed if target_bin.contains(item)]

        # merge what is left over to see if it matches target_bin
        try:
            merged_bin = cls.merge_bins(candidates)
        except cls.NotMergeable:
            return False
        return merged_bin == target_bin
# TODO: move this into emodpy-hiv/emod-api?? Will need to move the AgeBin class, too
# TODO: add file format verification checks (e.g. consecutive age bins, totals add up to 1.0, etc)
# TODO: document file format
def _age_distributions_from_df(df: pd.DataFrame) -> Dict[int, IndividualAttributes.AgeDistribution]:
    """
    Build one AgeDistribution per node from a table of per-age-bin population fractions.

    The dataframe must have the columns 'node_id', 'age_bin' (AgeBin strings, e.g. '[0:5)') and
    'population_fraction'. No validation of the rows is performed here (see TODOs above).

    :param df: the age distribution table
    :return: a dict of node id -> AgeDistribution. Rows with node_id 0 (all nodes) are stored under the key None.
    """
    distributions_by_node = {}

    # one distribution per node_id so that per-node and/or Default (global) age fraction data can be set
    for raw_node_id, node_rows in df.groupby('node_id'):
        bins = [AgeBin.from_string(text) for text in node_rows['age_bin']]

        # EMOD format: the start age of every bin followed by the end age of the last bin
        ages = [*(age_bin.start for age_bin in bins), bins[-1].end]

        # per-(end_age) cumulative fraction of the population, starting from 0 at the first age
        running_total = 0
        cumulative = [running_total]
        for fraction in node_rows['population_fraction']:
            running_total += fraction
            cumulative.append(running_total)
        cumulative[-1] = 1  # just in case of odd floating point issues

        distribution = IndividualAttributes.AgeDistribution()
        distribution.from_dict({
            'ResultValues': ages,
            'DistributionValues': cumulative,
            'NumDistributionAxes': 0,
            'ResultUnits': 'years',
            'ResultScaleFactor': 365
        })

        # mapping 0 (all nodes) to None
        distributions_by_node[None if raw_node_id == 0 else raw_node_id] = distribution

    return distributions_by_node


########################################################################################################################
# TODO: this is the "Standard Model" for country X
def load_country_model_demographics_default(country_model: str) -> HIVDemographics:
    """
    Build the default HIVDemographics object for a named country model.

    Settings are looked up in _registry and the data files are read from `_data_root / country_model`. The
    result is built from the initial population file and then has the society template, concurrency, pair
    formation and relationship parameters, age distributions, fertility, mortality and individual properties
    applied, in that order.

    :param country_model: name of the country model, a key of _registry
    :return: the configured HIVDemographics object
    :raises ValueError: if country_model is not a known country model name
    """
    settings = _registry.get(country_model)
    if settings is None:
        available = ", ".join(_registry)
        raise ValueError(f"Unknown country model named: {country_model} . Available models: {available}")

    country_data_root = _data_root.joinpath(country_model)

    def data_file(key: str) -> Path:
        # full path of the data file that is registered under the given settings key
        return Path(country_data_root, settings[key])

    # (relationship type, risk group, then the remaining positional arguments exactly as they are passed on)
    concurrency_settings = (
        ("COMMERCIAL", "LOW", 0, 0, 1, 1, None),
        ("COMMERCIAL", "MEDIUM", 0, 0, 1, 1, None),
        ("COMMERCIAL", "HIGH", 59, 59, 1, 1, None),
        ("TRANSITORY", "LOW", 2, 2, 0.3556898557563289, 0.11640023094861798, None),
        ("TRANSITORY", "MEDIUM", 2.8758243237888, 2.8758243237888, 0.7875892836289242, 0.7430282943619352, None),
        ("TRANSITORY", "HIGH", 1, 1, 1, 1, None),
        ("INFORMAL", "LOW", 1.3185438939448315, 1.3185438939448315, 0.512265014420052, 0.28546727987372494, None),
        ("INFORMAL", "MEDIUM", 2.4343140081277106, 2.4343140081277106, 0.32876586481783143, 0.42145844365099366, None),
        ("INFORMAL", "HIGH", 1, 1, 1, 1, None),
        ("MARITAL", "LOW", 1, 1, 0, 0, None),
        ("MARITAL", "MEDIUM", 1.203640498339868, 1.203640498339868, 1, 1, None),
        ("MARITAL", "HIGH", 1, 1, 1, 1, None),
    )

    # the TRANSITORY, INFORMAL and MARITAL relationship types all share this one matrix object
    assortivity_tim = [
        [0.7032523334501651, 0.29674766654983487, 0],
        [0.29674766654983487, 0.7032523334501651, 0.7032523334501651],
        [0, 0.7032523334501651, 0.29674766654983487],
    ]
    assortivity_com = [
        [1, 1, 1],
        [1, 1, 1],
        [1, 1, 1],
    ]
    # (relationship type, second positional argument, assortivity matrix)
    pair_formation_settings = (
        ("TRANSITORY", 0.0011197414231317777, assortivity_tim),
        ("INFORMAL", 0.0001561736059703799, assortivity_tim),
        ("MARITAL", 0.00018145743996196627, assortivity_tim),
        ("COMMERCIAL", 0.15, assortivity_com),
    )

    # (relationship type, coital_act_rate, condom_usage_min, condom_usage_mid, condom_usage_rate,
    #  duration_scale, duration_heterogeneity); condom_usage_max comes from the per-node table below
    relationship_settings = (
        ("TRANSITORY", 0.33, 0, 2003.4846871749717, 2.6862110225260216, 0.956774771214, 0.833333333),
        ("INFORMAL", 0.33, 0, 1992.052744500361, 0.2952473824125923, 2.03104913138, 0.75),
        ("MARITAL", 0.33, 0, 1995.8605054635884, 1.6202355778150024, 22.154455184937, 0.666666667),
        ("COMMERCIAL", 0.0027397260273972603, 0.5, 1999.5, 1, 0.01917808219, 1),
    )
    # one row per node (rows are applied to node ids 1, 2, 3, ... in order); the columns line up with the
    # rows of relationship_settings
    condom_usage_max_by_node = (
        (0.7433197389199494, 0.5229475709021313, 0.3, 0.85),
        (0.46623684268980436, 0.26419727712640767, 0.3, 0.85),
        (0.46130107019244415, 0.042222393165696724, 0.3, 0.85),
        (0.7400316675706575, 0.5597670617942341, 0.3, 0.85),
        (0.3079437471527026, 0.5632160018635154, 0.3, 0.85),
        (0.4992941638997009, 0.2448900326712238, 0.3, 0.85),
        (0.4917707746908634, 0.3528077863526073, 0.3, 0.85),
        (0.538504255943748, 0.5079383809835986, 0.3, 0.85),
        (0.7479019895876177, 0.5714360413594817, 0.3, 0.85),
        (0.4713025697735299, 0.10411030389531929, 0.3, 0.85),
    )

    # start from the initial population data
    demographics = HIVDemographics.from_population_dataframe(df=pd.read_csv(data_file("initial_population_file")))

    # apply the selected society template
    demographics.society = DemographicsTemplates.get_society_dict(society_name=settings["default_society_template"])

    for concurrency_args in concurrency_settings:
        demographics.set_concurrency_params_by_type_and_risk(*concurrency_args)

    for relationship_type, formation_value, matrix in pair_formation_settings:
        demographics.set_pair_formation_parameters(relationship_type, formation_value,
                                                   assortivity_matrix=matrix, node_ids=None)

    for node_number, usage_max_row in enumerate(condom_usage_max_by_node, start=1):
        for relationship, usage_max in zip(relationship_settings, usage_max_row):
            relationship_type, act_rate, usage_min, usage_mid, usage_rate, scale, heterogeneity = relationship
            demographics.set_relationship_parameters(relationship_type,
                                                     coital_act_rate=act_rate,
                                                     condom_usage_min=usage_min,
                                                     condom_usage_mid=usage_mid,
                                                     condom_usage_max=usage_max,
                                                     condom_usage_rate=usage_rate,
                                                     duration_scale=scale,
                                                     duration_heterogeneity=heterogeneity,
                                                     node_ids=[node_number])

    # initialize starting age distributions from file data
    age_table = pd.read_csv(data_file("age_distribution_file"))
    for node_key, distribution in _age_distributions_from_df(df=age_table).items():
        # a key of None is passed on as node_ids=None; any other key as a one-element list
        target_nodes = None if node_key is None else [node_key]
        demographics.SetAgeDistribution(distribution=distribution, node_ids=target_nodes)

    # load and apply fertility and mortality data
    demographics.set_fertility(path_to_csv=data_file("fertility_file"))
    male_mortality_file = data_file("male_mortality_file")
    female_mortality_file = data_file("female_mortality_file")
    demographics.set_mortality(file_male=male_mortality_file, file_female=female_mortality_file)

    # Load initial individual properties
    # TODO: ensure that in hiv_workflow that when we set IPs, (e.g. Accessibility) we don't overwrite this data by
    #  default (modify only)
    demographics.default_node.individual_properties.individual_properties = [
        IndividualProperty.from_dict(ip_settings) for ip_settings in settings["individual_properties"]
    ]

    return demographics