Source code for hpvsim.data.loaders

'''
Load data
'''

#%% Housekeeping
import numpy as np
import pandas as pd
import sciris as sc
import unicodedata
import re
from .. import misc as hpm

__all__ = ['get_country_aliases', 'map_entries', 'get_age_distribution', 'get_age_distribution_over_time', 'get_total_pop', 'get_death_rates',
           'get_birth_rates', 'get_life_expectancy']


thisdir = sc.thispath(__file__)
filesdir = thisdir / 'files'
files = sc.objdict()
files.metadata = 'metadata.json'
files.age_dist = 'populations.obj'
files.birth = 'birth_rates.obj'
files.death = 'mx.obj'
files.life_expectancy = 'ex.obj'

# Cache data as a dict
cache = dict()

for k,v in files.items():
    files[k] = filesdir / v


def sanitizestr(string=None, alphanumeric=True, nospaces=True, asciify=True, lower=True, spacechar='_', symchar='_'):
    ''' Sanitize a string by asciifying, replacing spaces and symbols, and lowercasing -- to be moved to Sciris eventually '''
    string = str(string)
    if asciify:
        string = unicodedata.normalize('NFKD', string).encode('ascii', 'ignore').decode()
    if nospaces:
        string = string.replace(' ', spacechar)
    if lower:
        string = string.lower()
    if alphanumeric:
        string = re.sub('[^0-9a-zA-Z ]', symchar, string)
    return string
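
# Illustrative sketch of what sanitizestr() does with the default options (this
# example is not part of the module; the exact output depends on the flags):
#   sanitizestr("Côte d'Ivoire")  ->  'cote_d_ivoire'
# (asciify strips the accent, nospaces replaces the space, lower lowercases,
# and alphanumeric replaces the apostrophe with symchar)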


def load_file(path):
    ''' Load a data file from the local data folder -- but store in memory if already loaded '''
    strpath = str(path)
    if strpath not in cache:
        obj = sc.load(path)
        cache[strpath] = obj
    else:
        obj = cache[strpath]
    return obj
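
# A note on caching (illustrative, not part of the module's API): calling
# load_file() twice with the same path returns the same in-memory object,
# since loaded data are stored in the module-level `cache` dict:
#   obj1 = load_file(files.age_dist)  # loads from disk (if the file exists)
#   obj2 = load_file(files.age_dist)  # returned from the cache
#   assert obj1 is obj2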


def get_country_aliases(wb=False):
    ''' Define aliases for countries with odd names in the data '''
    country_mappings = {
        'Bolivia':        'Bolivia (Plurinational State of)',
        'Burkina':        'Burkina Faso',
        'Cape Verde':     'Cabo Verde',
        'Hong Kong':      'China, Hong Kong Special Administrative Region',
        'Macao':          'China, Macao Special Administrative Region',
        "Cote d'Ivoire":  "Côte d'Ivoire",
        "Cote dIvoire":   "Côte d'Ivoire",
        "Ivory Coast":    "Côte d'Ivoire",
        'DRC':            'Democratic Republic of the Congo',
        'Congo':          'Congo, Rep.',
        'Iran':           'Iran (Islamic Republic of)',
        'Laos':           "Lao People's Democratic Republic",
        'Micronesia':     'Micronesia (Federated States of)',
        'Korea':          'Republic of Korea',
        'South Korea':    'Republic of Korea',
        'Moldova':        'Republic of Moldova',
        'Russia':         'Russian Federation',
        'Palestine':      'State of Palestine',
        'Syria':          'Syrian Arab Republic',
        'Taiwan':         'Taiwan Province of China',
        'Macedonia':      'The former Yugoslav Republic of Macedonia',
        'UK':             'United Kingdom of Great Britain and Northern Ireland',
        'United Kingdom': 'United Kingdom of Great Britain and Northern Ireland',
        'Tanzania':       'United Republic of Tanzania',
        'USA':            'United States of America',
        'United States':  'United States of America',
        'Venezuela':      'Venezuela (Bolivarian Republic of)',
        'Vietnam':        'Viet Nam',
    }

    # Slightly different aliases are needed for World Bank (WB) data
    if wb:
        for key, val in country_mappings.items():
            if val == 'Democratic Republic of the Congo':
                country_mappings[key] = 'Congo, Dem. Rep.'
            if val in ["Cote d'Ivoire", "Cote dIvoire", "Côte d'Ivoire"]:
                country_mappings[key] = "Cote d'Ivoire"

    return country_mappings

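# Example of how the aliases are typically used (illustrative only; the keys
# shown here are taken from the mapping above):
#   aliases = get_country_aliases()
#   aliases['USA']      ->  'United States of America'
#   aliases['Vietnam']  ->  'Viet Nam'
#   get_country_aliases(wb=True)['DRC']  ->  'Congo, Dem. Rep.'
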
def map_entries(json, location, df=None, wb=False):
    '''
    Find a match between the JSON file and the provided location(s).

    Args:
        json (list or dict): the data being loaded
        location (list or str): the list of locations to pull from
        wb (bool): whether to use the World Bank aliases
    '''

    # The data have slightly different formats: list of dicts or just a dict
    if sc.checktype(json, dict):
        countries = [key.lower() for key in json.keys()]
    elif sc.checktype(json, 'listlike'):
        countries = [l.lower() for l in json]
    elif sc.checktype(json, pd.DataFrame):
        countries = [l.lower() for l in np.unique(json.Country.values)]

    # Set parameters
    if location is None:
        location = countries
    else:
        location = sc.promotetolist(location)

    # Define a mapping for common mistakes
    mapping = get_country_aliases(wb=wb)
    mapping = {key.lower(): val.lower() for key, val in mapping.items()}

    entries = {}
    for loc in location:
        lloc = loc.lower()
        if lloc not in countries and lloc in mapping:
            lloc = mapping[lloc]
        try:
            ind = countries.index(lloc)
            entry = list(json.values())[ind]
            entries[loc] = entry
        except ValueError as E:
            suggestions = sc.suggest(loc, countries, n=4)
            if suggestions:
                errormsg = f'Location "{loc}" not recognized, did you mean {suggestions}? ({str(E)})'
            else:
                errormsg = f'Location "{loc}" not recognized ({str(E)})'
            raise ValueError(errormsg)

    return entries

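# Minimal sketch of how map_entries() resolves a location (illustrative; the
# dict below is made up and is not one of the bundled data files):
#   data = {'United Republic of Tanzania': [1, 2, 3], 'Viet Nam': [4, 5, 6]}
#   map_entries(data, 'Tanzania')  ->  {'Tanzania': [1, 2, 3]}
# (the alias 'Tanzania' is first mapped to 'united republic of tanzania',
# then matched case-insensitively against the data's keys)
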
def get_age_distribution(location=None, year=None, total_pop_file=None, age_datafile=None):
    '''
    Load age distribution for a given country or countries.

    Args:
        location (str): name of the country to load the age distribution for
        year (int): year to load the age distribution for
        total_pop_file (str): optional filepath to save total population size for every year
        age_datafile (str): optional CSV file to load the age distribution from instead of the bundled data

    Returns:
        age_data (array): Numpy array of age distributions, or dict if multiple locations
    '''

    # Load the raw data
    if age_datafile is None:
        try:
            df = load_file(files.age_dist)
        except Exception as E:
            errormsg = 'Could not locate datafile with population sizes by country. Please run data/get_data.py first.'
            raise ValueError(errormsg) from E

        # Handle year
        if year is None:
            warnmsg = 'No year provided for the initial population age distribution, using 2000 by default'
            hpm.warn(warnmsg)
            year = 2000

        # Extract the age distribution for the given location and year
        full_df = map_entries(df, location)[location]
        raw_df = full_df[full_df["Time"] == year]
    else:
        raw_df = pd.read_csv(age_datafile)

    # Pull out the data
    result = np.array([raw_df["AgeGrpStart"], raw_df["AgeGrpStart"]+1, raw_df["PopTotal"]*1e3]).T # Data are stored in thousands

    # Optionally save total population sizes for calibration/plotting purposes
    if total_pop_file is not None:
        dd = full_df.groupby("Time").sum()["PopTotal"]
        dd = dd * 1e3
        dd = dd.astype(int)
        dd = dd.rename("n_alive")
        dd = dd.rename_axis("year")
        dd.to_csv(total_pop_file)

    return result

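# Illustrative usage (assumes the bundled data files are present and that the
# location name exists in them; 'nigeria' and 2010 are example inputs only):
#   age_dist = get_age_distribution(location='nigeria', year=2010)
#   # age_dist is an N x 3 array of [age_group_start, age_group_end, population]
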
def get_age_distribution_over_time(location=None, popage_datafile=None):
    '''
    Load age distribution for a given country or countries over time.

    Args:
        location (str): name of the country to load the age distribution for
        popage_datafile (str): optional CSV file to load the age distribution from instead of the bundled data

    Returns:
        age_data (dataframe): Pandas dataframe with age distribution over time
    '''

    # Load the raw data
    if popage_datafile is None:
        try:
            df = load_file(files.age_dist)
        except Exception as E:
            errormsg = 'Could not locate datafile with population sizes by country. Please run data/get_data.py first.'
            raise ValueError(errormsg) from E
        full_df = map_entries(df, location)[location]
    else:
        full_df = pd.read_csv(popage_datafile)

    result = full_df.rename(columns={'Time': 'year', 'AgeGrpStart': 'age'})
    result['PopTotal'] *= 1e3 # Data are stored in thousands

    return result

def get_total_pop(location=None, pop_datafile=None):
    '''
    Load total population for a given country or countries.

    Args:
        location (str or list): name of the country to load the total population for
        pop_datafile (str): optional CSV file to load the population sizes from instead of the bundled data

    Returns:
        pop_data (dataframe): Dataframe of year and pop_size columns
    '''

    # Load the raw data
    if pop_datafile is None:
        try:
            df = load_file(files.age_dist)
        except Exception as E:
            errormsg = 'Could not locate datafile with population sizes by country. Please run data/get_data.py first.'
            raise ValueError(errormsg) from E

        # Extract the total population for the given location, summed over age groups
        full_df = map_entries(df, location)[location]
        dd = full_df.groupby("Time").sum(numeric_only=True)["PopTotal"]
    else:
        dd = pd.read_csv(pop_datafile)

    dd = dd * 1e3 # Data are stored in thousands
    df = sc.dataframe(dd).reset_index().rename(columns={'Time': 'year', 'PopTotal': 'pop_size'})

    return df

def get_death_rates(location=None, by_sex=True, overall=False):
    '''
    Load death rates for a given country or countries.

    Args:
        location (str or list): name of the country or countries to load the death rates for
        by_sex (bool): whether to load death rates by sex
        overall (bool): whether to load the total (both-sexes) death rate

    Returns:
        death_rates (dict): death rates by age and sex
    '''

    # Load the raw data
    try:
        df = load_file(files.death)
    except Exception as E:
        errormsg = 'Could not locate datafile with age-specific death rates by country. Please run data/get_data.py first.'
        raise ValueError(errormsg) from E

    raw_df = map_entries(df, location)[location]

    sex_keys = []
    if by_sex: sex_keys += ['Male', 'Female']
    if overall: sex_keys += ['Both sexes']
    sex_key_map = {'Male': 'm', 'Female': 'f', 'Both sexes': 'tot'}

    years = raw_df['Time'].unique()
    result = dict()

    # Processing
    for year in years:
        result[year] = dict()
        for sk in sex_keys:
            sk_out = sex_key_map[sk]
            result[year][sk_out] = np.array(raw_df[(raw_df['Time']==year) & (raw_df['Sex']==sk)][['AgeGrpStart','mx']])
            result[year][sk_out] = result[year][sk_out][result[year][sk_out][:, 0].argsort()] # Sort by age

    return result

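# Illustrative usage (assumes the bundled mortality data are present; the
# location and year shown are example inputs only):
#   death_rates = get_death_rates(location='india', by_sex=True)
#   # death_rates[2010]['f'] is an N x 2 array of [age_group_start, mx] for females
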
def get_life_expectancy(location=None, by_sex=True, overall=False):
    '''
    Load life expectancy by age for a given country or countries.

    Args:
        location (str or list): name of the country or countries to load the life expectancy for
        by_sex (bool): whether to load life expectancy by sex
        overall (bool): whether to load the total (both-sexes) life expectancy

    Returns:
        life_expectancy (dict): life expectancy by age and sex
    '''

    # Load the raw data
    try:
        df = load_file(files.life_expectancy)
    except Exception as E:
        errormsg = 'Could not locate datafile with age-specific life expectancy by country. Please run data/get_data.py first.'
        raise ValueError(errormsg) from E

    raw_df = map_entries(df, location)[location]

    sex_keys = []
    if by_sex: sex_keys += ['Male', 'Female']
    if overall: sex_keys += ['Both sexes']
    sex_key_map = {'Male': 'm', 'Female': 'f', 'Both sexes': 'tot'}

    years = raw_df['Time'].unique()
    result = dict()

    # Processing
    for year in years:
        result[year] = dict()
        for sk in sex_keys:
            sk_out = sex_key_map[sk]
            result[year][sk_out] = np.array(raw_df[(raw_df['Time']==year) & (raw_df['Sex']==sk)][['AgeGrpStart','ex']])
            result[year][sk_out] = result[year][sk_out][result[year][sk_out][:, 0].argsort()] # Sort by age

    return result

def get_birth_rates(location=None):
    '''
    Load crude birth rates for a given country.

    Args:
        location (str or list): name of the country to load the birth rates for

    Returns:
        birth_rates (arr): years and crude birth rates
    '''

    # Load the raw data
    try:
        birth_rate_data = load_file(files.birth)
    except Exception as E:
        errormsg = 'Could not locate datafile with birth rates by country. Please run data/get_data.py first.'
        raise ValueError(errormsg) from E

    standardized = map_entries(birth_rate_data, location, wb=True)
    birth_rates, years = standardized[location], birth_rate_data['years']
    birth_rates, inds = sc.sanitize(birth_rates, returninds=True)
    years = years[inds]

    return np.array([years, birth_rates])

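# Illustrative usage (assumes the bundled birth-rate data are present; the
# location shown is an example input only):
#   birth_rates = get_birth_rates(location='kenya')
#   # birth_rates is a 2 x N array of [years, crude birth rates], with missing
#   # values removed by sc.sanitize()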