'''
Defines the Sim class, the core class of the FP model (FPsim).
'''
#%% Imports
import numpy as np # Needed for a few things not provided by pl
import pylab as pl
import seaborn as sns
import sciris as sc
import pandas as pd
from .settings import options as fpo
from . import utils as fpu
from . import defaults as fpd
from . import base as fpb
from . import parameters as fpp
# Specify all externally visible things this file defines
__all__ = ['People', 'Sim', 'MultiSim', 'parallel']
#%% Define classes
def arr(n=None, val=0):
''' Shortcut for defining an empty array with the correct value and data type '''
if isinstance(val, np.ndarray):
assert len(val) == n
arr = val
elif isinstance(val, list):
arr = [[] for _ in range(n)]
else:
dtype = object if isinstance(val, str) else None
arr = np.full(shape=n, fill_value=val, dtype=dtype)
return arr
[docs]
class People(fpb.BasePeople):
'''
Class for all the people in the simulation.
'''
def __init__(self, pars, n=None, **kwargs):
# Initialization
super().__init__()
self.pars = pars # Set parameters
d = sc.mergedicts(fpd.person_defaults, kwargs) # d = defaults
if n is None:
n = int(self.pars['n_agents'])
# Basic states
init_states = dir(self)
self.uid = arr(n, np.arange(n))
self.age = arr(n, np.float64(d['age'])) # Age of the person (in years)
self.age_by_group = arr(n, np.float64(d['age_by_group'])) # Age by which method bin the age falls into, as integer
self.sex = arr(n, d['sex']) # Female (0) or male (1)
self.parity = arr(n, d['parity']) # Number of children
self.method = arr(n, d['method']) # Contraceptive method 0-9, see pars['methods']['map'], excludes LAM as method
self.barrier = arr(n, d['barrier']) # Reason for non-use
self.alive = arr(n, d['alive'])
self.pregnant = arr(n, d['pregnant'])
self.fertile = arr(n, d['fertile']) # assigned likelihood of remaining childfree throughout reproductive years
# Sexual and reproductive history
self.sexually_active = arr(n, d['sexually_active'])
self.sexual_debut = arr(n, d['sexual_debut'])
self.sexual_debut_age = arr(n, np.float64(d['sexual_debut_age'])) # Age at first sexual debut in years, If not debuted, -1
self.fated_debut = arr(n, np.float64(d['debut_age']))
self.first_birth_age = arr(n, np.float64(d['first_birth_age'])) # Age at first birth. If no births, -1
self.lactating = arr(n, d['lactating'])
self.gestation = arr(n, d['gestation'])
self.preg_dur = arr(n, d['preg_dur'])
self.stillbirth = arr(n, d['stillbirth']) # Number of stillbirths
self.miscarriage = arr(n, d['miscarriage']) # Number of miscarriages
self.abortion = arr(n, d['abortion']) # Number of abortions
self.pregnancies = arr(n, d['pregnancies']) #Number of conceptions (before abortion)
self.months_inactive = arr(n, d['months_inactive']) # Number of months an agents has been sexually inactive once debuted
self.postpartum = arr(n, d['postpartum'])
self.mothers = arr(n, d['mothers'])
self.short_interval = arr(n, d['short_interval']) # Number of short birth intervals
self.secondary_birth = arr(n, d['secondary_birth']) # Number of secondary live birth
self.postpartum_dur = arr(n, d['postpartum_dur']) # Tracks # months postpartum
self.lam = arr(n, d['lam']) # Separately tracks lactational amenorrhea, can be using both LAM and another method
self.breastfeed_dur = arr(n, d['breastfeed_dur'])
self.breastfeed_dur_total = arr(n, d['breastfeed_dur_total'])
self.children = arr(n, []) # Indices of children -- list of lists
self.dobs = arr(n, []) # Dates of births -- list of lists
self.still_dates = arr(n, []) # Dates of stillbirths -- list of lists
self.miscarriage_dates = arr(n, []) # Dates of miscarriages -- list of lists
self.abortion_dates = arr(n, []) # Dates of abortions -- list of lists
self.short_interval_dates = arr(n, []) # age of agents at short birth interval -- list of lists
# Fecundity variation
fv = [self.pars['fecundity_var_low'], self.pars['fecundity_var_high']]
self.personal_fecundity = arr(n, np.random.random(n)*(fv[1]-fv[0])+fv[0]) # Stretch fecundity by a factor bounded by [f_var[0], f_var[1]]
self.remainder_months = arr(n, d['remainder_months'])
# Empowerment-related sociodemographic attributes
self.partnered = arr(n, d['partnered']) # Whether a person is in a relationship or not
self.partnership_age = arr(n, d['partnership_age']) # Age at first partnership in years, initialised from data
self.urban = arr(n, d['urban']) # Whether a person lives in rural or urban setting
self.paid_employment = arr(n, d['paid_employment']) # Whether a person has a paid job or not
self.control_over_wages = arr(n, d['control_over_wages']) # Decision making autonomy over major household purchases
self.sexual_autonomy = arr(n, d['sexual_autonomy']) # Ability to refuse sex
# Empowerment-education attributes
self.edu_objective = arr(n, d['edu_objective']) # Highest-ideal level of education to be completed (in years), could be individualised or constant across agents
self.edu_attainment = arr(n, d['edu_attainment']) # Current level of education achieved in years
self.edu_dropout = arr(n, d['edu_dropout']) # Whether a person has dropped out of the edu system, before reaching their goal
self.edu_interrupted = arr(n, d['edu_interrupted']) # Whether a person/woman has had their education temporarily interrupted, but can resume
self.edu_completed = arr(n, d['edu_completed']) # Whether a person/woman has reached their education goals
self.edu_started = arr(n, d['edu_started']) # Whether a person/woman has started thier education
# Store keys
final_states = dir(self)
self._keys = [s for s in final_states if s not in init_states]
return
[docs]
def update_method(self):
'''
Uses a switching matrix from DHS data to decide based on a person's original method their probability of changing to a
new method and assigns them the new method. Currently allows switching on whole calendar years to enter function.
Matrix serves as an initiation, discontinuation, continuation, and switching matrix. Transition probabilities are for 1 year and
only for women who have not given birth within the last 6 months.
'''
methods = self.pars['methods']
method_map = methods['map']
annual = methods['adjusted']['annual']
orig_methods = self.method
m = len(method_map)
switching_events = np.zeros((m, m), dtype=int)
switching_events_ages = {}
for key in fpd.method_age_map.keys():
switching_events_ages[key] = np.zeros((m, m), dtype=int)
# Method switching depends both on agent age and also on their current method, so we need to loop over both
for key,(age_low, age_high) in fpd.method_age_map.items():
match_low = (self.age >= age_low) # CK: TODO: refactor into single method
match_high = (self.age < age_high)
match_low_high = match_low * match_high
for m in method_map.values():
match_m = (orig_methods == m)
match = match_m * match_low_high
this_method = self.filter(match)
old_method = this_method.method.copy()
matrix = annual[key]
choices = matrix[m]
choices = choices/choices.sum()
new_methods = fpu.n_multinomial(choices, match.sum())
this_method.method = new_methods
for i in range(len(old_method)):
x = old_method[i]
y = new_methods[i]
switching_events[x, y] += 1
switching_events_ages[key][x, y] += 1
if self.pars['track_switching']:
self.step_results_switching['annual'] += switching_events # CK: TODO: remove this extra result and combine with step_results
for key in fpd.method_age_map.keys():
self.step_results['switching_annual'][key] += switching_events_ages[key]
return
[docs]
def update_method_pp(self):
'''
Utilizes data from birth to allow agent to initiate a method postpartum coming from birth by
3 months postpartum and then initiate, continue, or discontinue a method by 6 months postpartum.
Next opportunity to switch methods will be on whole calendar years, whenever that falls.
'''
# TODO- Probabilities need to be adjusted for postpartum women on the next annual draw in "get_method" since they may be less than one year
# Probability of initiating a postpartum method at 0-3 months postpartum
# Transitional probabilities are for the first 3 month time period after delivery from DHS data
methods = self.pars['methods']
pp0to1 = methods['adjusted']['pp0to1']
pp1to6 = methods['adjusted']['pp1to6']
methods_map = methods['map']
orig_methods = self.method
m = len(methods_map)
switching_events = np.zeros((m, m), dtype=int)
switching_events_ages = {}
for key in fpd.method_age_map.keys():
switching_events_ages[key] = np.zeros((m, m), dtype=int)
postpartum1 = (self.postpartum_dur == 0)
postpartum6 = (self.postpartum_dur == 6)
# In first time step after delivery, choice is by age but not previous method (since just gave birth)
# All women are coming from birth and on no method to start, either will stay on no method or initiate a method
for key, (age_low, age_high) in fpd.method_age_map.items():
match_low = (self.age >= age_low)
match_high = (self.age < age_high)
low_parity = (self.parity < self.pars['high_parity'])
high_parity = (self.parity >= self.pars['high_parity'])
match = (self.postpartum * postpartum1 * match_low * match_high * low_parity)
match_high_parity = (self.postpartum * postpartum1 * match_low * match_high * high_parity)
this_method = self.filter(match)
this_method_high_parity = self.filter(match_high_parity)
old_method = this_method.method.copy()
old_method_high_parity = sc.dcp(this_method_high_parity.method)
choices = pp0to1[key]
choices_high_parity = sc.dcp(choices)
choices_high_parity[0] *= self.pars['high_parity_nonuse']
choices_high_parity = choices_high_parity / choices_high_parity.sum()
new_methods = fpu.n_multinomial(choices, len(this_method))
new_methods_high_parity = fpu.n_multinomial(choices_high_parity, len(this_method_high_parity))
this_method.method = np.array(new_methods, dtype=np.int64)
this_method_high_parity.method = np.array(new_methods_high_parity, dtype=np.int64)
for i in range(len(old_method)):
x = old_method[i]
y = new_methods[i]
switching_events[x, y] += 1
switching_events_ages[key][x, y] += 1
for i in range(len(old_method_high_parity)):
x = old_method_high_parity[i]
y = new_methods_high_parity[i]
switching_events[x, y] += 1
switching_events_ages[key][x, y] += 1
# At 6 months, choice is by previous method and by age
# Allow initiation, switching, or discontinuing with matrix at 6 months postpartum
# Transitional probabilities are for 5 months, 1-6 months after delivery from DHS data
for key,(age_low, age_high) in fpd.method_age_map.items():
match_low = (self.age >= age_low)
match_high = (self.age < age_high)
match_postpartum_age = self.postpartum * postpartum6 * match_low * match_high
for m in methods_map.values():
match_m = (orig_methods == m)
match = match_m * match_postpartum_age
this_method = self.filter(match)
old_method = self.method[match].copy()
matrix = pp1to6[key]
choices = matrix[m]
new_methods = fpu.n_multinomial(choices, match.sum())
this_method.method = new_methods
for i in range(len(old_method)):
x = old_method[i]
y = new_methods[i]
switching_events[x, y] += 1
switching_events_ages[key][x, y] += 1
if self.pars['track_switching']:
self.step_results_switching['postpartum'] += switching_events
for key in fpd.method_age_map.keys():
self.step_results['switching_postpartum'][key] += switching_events_ages[key]
return
[docs]
def update_methods(self):
'''If eligible (age 15-49 and not pregnant), choose new method or stay with current one'''
if not (self.i % self.pars['method_timestep']): # Allow skipping timesteps
postpartum = (self.postpartum) * (self.postpartum_dur <= 6)
pp = self.filter(postpartum)
non_pp = self.filter(~postpartum)
pp.update_method_pp() # Update method for
age_diff = non_pp.ceil_age - non_pp.age
whole_years = ((age_diff < (1/fpd.mpy)) * (age_diff > 0))
birthdays = non_pp.filter(whole_years)
birthdays.update_method()
return
[docs]
def check_mortality(self):
'''Decide if person dies at a timestep'''
timestep = self.pars['timestep']
trend_val = self.pars['mortality_probs']['gen_trend']
age_mort = self.pars['age_mortality']
f_spline = age_mort['f_spline'] * trend_val
m_spline = age_mort['m_spline'] * trend_val
over_one = self.filter(self.age >= 1)
female = over_one.filter(over_one.is_female)
male = over_one.filter(over_one.is_male)
f_ages = female.int_age
m_ages = male.int_age
f_mort_prob = fpu.annprob2ts(f_spline[f_ages], timestep)
m_mort_prob = fpu.annprob2ts(m_spline[m_ages], timestep)
f_died = female.binomial(f_mort_prob, as_filter=True)
m_died = male.binomial(m_mort_prob, as_filter=True)
for died in [f_died, m_died]:
died.alive = False,
died.pregnant = False,
died.gestation = False,
died.sexually_active = False,
died.lactating = False,
died.postpartum = False,
died.lam = False,
died.breastfeed_dur = 0,
self.step_results['deaths'] += len(died)
return
[docs]
def check_partnership(self):
'''
Decide if an agent has reached their age at first partnership. Age-based data from DHS.
'''
is_not_partnered = self.partnered == 0
reached_partnership_age = self.age >= self.partnership_age
first_timers = self.filter(is_not_partnered * reached_partnership_age)
first_timers.partnered = True
[docs]
def check_sexually_active(self):
'''
Decide if agent is sexually active based either on month postpartum or age if
not postpartum. Postpartum and general age-based data from DHS.
'''
# Set postpartum probabilities
match_low = self.postpartum_dur >= 0
match_high = self.postpartum_dur <= self.pars['postpartum_dur']
pp_match = self.postpartum * match_low * match_high
non_pp_match = ((self.age >= self.fated_debut) * (~pp_match))
pp = self.filter(pp_match)
non_pp = self.filter(non_pp_match)
# Adjust for postpartum women's birth spacing preferences
pref = self.pars['spacing_pref'] # Shorten since used a lot
spacing_bins = pp.postpartum_dur / pref['interval'] # Main calculation -- divide the duration by the interval
spacing_bins = np.array(np.minimum(spacing_bins, pref['n_bins']), dtype=int) # Convert to an integer and bound by longest bin
probs_pp = self.pars['sexual_activity_pp']['percent_active'][pp.postpartum_dur]
probs_pp *= pref['preference'][spacing_bins] # Actually adjust the probability -- check the overall probability with print(pref['preference'][spacing_bins].mean())
# Set non-postpartum probabilities
probs_non_pp = self.pars['sexual_activity'][non_pp.int_age]
# Evaluate likelihood in this time step of being sexually active
# Can revert to active or not active each timestep
pp.sexually_active = fpu.binomial_arr(probs_pp)
non_pp.sexually_active = fpu.binomial_arr(probs_non_pp)
# Set debut to True if sexually active for the first time
# Record agent age at sexual debut in their memory
never_sex = non_pp.sexual_debut == 0
now_active = non_pp.sexually_active == 1
first_debut = non_pp.filter(now_active * never_sex)
first_debut.sexual_debut = True
first_debut.sexual_debut_age = first_debut.age
active_sex = self.sexually_active == 1
debuted = self.sexual_debut == 1
active = self.filter(active_sex * debuted)
inactive = self.filter(~active_sex * debuted)
active.months_inactive = 0
inactive.months_inactive += 1
inactive_year = self.months_inactive >= 12
sexually_infrequent = self.filter(inactive_year)
#print (f'Age: {sexually_infrequent.age}')
#print (f'Debuted?: {sexually_infrequent.sexual_debut}')
#print (f'Debut age: {sexually_infrequent.sexual_debut_age}')
#print (f'Months inactive: {sexually_infrequent.months_inactive}')
#print (f'On method?: {sexually_infrequent.method}')
return
[docs]
def check_conception(self):
'''
Decide if person (female) becomes pregnant at a timestep.
'''
all_ppl = self.unfilter() # For complex array operations
active = self.filter(self.sexually_active * self.fertile)
lam = active.filter(active.lam)
nonlam = active.filter(~active.lam)
preg_probs = np.zeros(len(all_ppl)) # Use full array
# Find monthly probability of pregnancy based on fecundity and any use of contraception including LAM - from data
pars = self.pars # Shorten
preg_eval_lam = pars['age_fecundity'][lam.int_age_clip] * lam.personal_fecundity
preg_eval_nonlam = pars['age_fecundity'][nonlam.int_age_clip] * nonlam.personal_fecundity
method_eff = np.array(list(pars['methods']['eff'].values()))[nonlam.method]
lam_eff = pars['LAM_efficacy']
lam_probs = fpu.annprob2ts((1-lam_eff) * preg_eval_lam, pars['timestep'])
nonlam_probs = fpu.annprob2ts((1-method_eff) * preg_eval_nonlam, pars['timestep'])
preg_probs[lam.inds] = lam_probs
preg_probs[nonlam.inds] = nonlam_probs
# Adjust for decreased likelihood of conception if nulliparous vs already gravid - from PRESTO data
nullip = active.filter(active.parity == 0) # Nulliparous
preg_probs[nullip.inds] *= pars['fecundity_ratio_nullip'][nullip.int_age_clip]
# Adjust for probability of exposure to pregnancy episode at this timestep based on age and parity - encapsulates background factors - experimental and tunable
preg_probs *= pars['exposure_factor']
preg_probs *= pars['exposure_age'][all_ppl.int_age_clip]
preg_probs *= pars['exposure_parity'][np.minimum(all_ppl.parity, fpd.max_parity)]
# Use a single binomial trial to check for conception successes this month
conceived = active.binomial(preg_probs[active.inds], as_filter=True)
self.step_results['pregnancies'] += len(conceived) # track all pregnancies
unintended = conceived.filter(conceived.method != 0)
self.step_results['unintended_pregs'] += len(unintended) # track pregnancies due to method failure
# Check for abortion
is_abort = conceived.binomial(pars['abortion_prob'])
abort = conceived.filter(is_abort)
preg = conceived.filter(~is_abort)
# Update states
all_ppl = self.unfilter()
abort.postpartum = False
abort.abortion += 1 # Add 1 to number of abortions agent has had
abort.postpartum_dur = 0
for i in abort.inds: # Handle adding dates
all_ppl.abortion_dates[i].append(all_ppl.age[i])
self.step_results['abortions'] = len(abort)
# Make selected agents pregnant
preg.make_pregnant()
if self.pars['track_as']:
pregnant_boolean = np.full(len(self), False)
pregnant_boolean[np.searchsorted(self.uid, preg.uid)] = True
pregnant_age_split = self.log_age_split(binned_ages_t=[self.age_by_group], channel='pregnancies', numerators=[pregnant_boolean], denominators=None)
for key in pregnant_age_split:
self.step_results[key] = pregnant_age_split[key]
return
[docs]
def make_pregnant(self):
'''
Update the selected agents to be pregnant
'''
pregdur = [self.pars['preg_dur_low'], self.pars['preg_dur_high']]
self.pregnant = True
self.gestation = 1 # Start the counter at 1
self.preg_dur = np.random.randint(pregdur[0], pregdur[1]+1, size=len(self)) # Duration of this pregnancy
self.postpartum = False
self.postpartum_dur = 0
self.reset_breastfeeding() # Stop lactating if becoming pregnant
self.method = 0
return
[docs]
def check_lam(self):
'''
Check to see if postpartum agent meets criteria for LAM in this time step
'''
max_lam_dur = self.pars['max_lam_dur']
lam_candidates = self.filter((self.postpartum) * (self.postpartum_dur <= max_lam_dur))
probs = self.pars['lactational_amenorrhea']['rate'][lam_candidates.postpartum_dur]
lam_candidates.lam = lam_candidates.binomial(probs)
not_postpartum = self.postpartum == 0
over5mo = self.postpartum_dur > max_lam_dur
not_breastfeeding = self.breastfeed_dur == 0
not_lam = self.filter(not_postpartum + over5mo + not_breastfeeding)
not_lam.lam = False
return
[docs]
def update_breastfeeding(self):
'''
Track breastfeeding, and update time of breastfeeding for individual pregnancy.
Agents are randomly assigned a duration value based on a gumbel distribution drawn from the 2018 DHS variable for breastfeeding months. The mean (mu) and the std dev (beta) are both drawn from that distribution in the DHS data.
'''
mu, beta = self.pars['breastfeeding_dur_mu'], self.pars['breastfeeding_dur_beta']
breastfeed_durs = abs(np.random.gumbel(mu, beta, size=len(self)))
breastfeed_durs = np.ceil(breastfeed_durs)
breastfeed_finished_inds = self.breastfeed_dur >= breastfeed_durs
breastfeed_finished = self.filter(breastfeed_finished_inds)
breastfeed_continue = self.filter(~breastfeed_finished_inds)
breastfeed_finished.reset_breastfeeding()
breastfeed_continue.breastfeed_dur += self.pars['timestep']
return
[docs]
def update_postpartum(self):
'''Track duration of extended postpartum period (0-24 months after birth). Only enter this function if agent is postpartum'''
# Stop postpartum episode if reach max length (set to 24 months)
pp_done = self.filter(self.postpartum_dur >= self.pars['postpartum_dur'])
pp_done.postpartum = False
pp_done.postpartum_dur = 0
# Count the state of the agent for postpartum -- # TOOD: refactor, what is this loop doing?
pp = self.filter(self.postpartum)
for key,(pp_low, pp_high) in fpd.postpartum_map.items():
this_pp_bin = pp.filter((pp.postpartum_dur >= pp_low) * (pp.postpartum_dur < pp_high))
self.step_results[key] += len(this_pp_bin)
pp.postpartum_dur += self.pars['timestep']
return
[docs]
def update_pregnancy(self):
'''Advance pregnancy in time and check for miscarriage'''
preg = self.filter(self.pregnant)
preg.gestation += self.pars['timestep']
# Check for miscarriage at the end of the first trimester
end_first_tri = preg.filter(preg.gestation == self.pars['end_first_tri'])
miscarriage_probs = self.pars['miscarriage_rates'][end_first_tri.int_age_clip]
miscarriage = end_first_tri.binomial(miscarriage_probs, as_filter=True)
# Reset states and track miscarriages
all_ppl = self.unfilter()
miscarriage.pregnant = False
miscarriage.miscarriage += 1 # Add 1 to number of miscarriages agent has had
miscarriage.postpartum = False
miscarriage.gestation = 0 # Reset gestation counter
for i in miscarriage.inds: # Handle adding dates
all_ppl.miscarriage_dates[i].append(all_ppl.age[i])
self.step_results['miscarriages'] = len(miscarriage)
return
[docs]
def reset_breastfeeding(self):
'''Stop breastfeeding, calculate total lifetime duration so far, and reset lactation episode to zero'''
self.lactating = False
self.breastfeed_dur_total += self.breastfeed_dur
self.breastfeed_dur = 0
return
[docs]
def check_maternal_mortality(self):
'''Check for probability of maternal mortality'''
prob = self.pars['mortality_probs']['maternal'] * self.pars['maternal_mortality_factor']
is_death = self.binomial(prob)
death = self.filter(is_death)
death.alive = False
self.step_results['maternal_deaths'] += len(death)
self.step_results['deaths'] += len(death)
return death
[docs]
def check_infant_mortality(self):
'''Check for probability of infant mortality (death < 1 year of age)'''
death_prob = (self.pars['mortality_probs']['infant'])
if len(self) > 0:
age_inds = sc.findnearest(self.pars['infant_mortality']['ages'], self.age)
death_prob = death_prob * (self.pars['infant_mortality']['age_probs'][age_inds])
is_death = self.binomial(death_prob)
death = self.filter(is_death)
self.step_results['infant_deaths'] += len(death)
death.reset_breastfeeding()
return death
[docs]
def check_delivery(self):
'''Decide if pregnant woman gives birth and explore maternal mortality and child mortality'''
# Update states
deliv = self.filter(self.gestation == self.preg_dur)
if len(deliv): # check for any deliveries
deliv.pregnant = False
deliv.gestation = 0 # Reset gestation counter
deliv.lactating = True
deliv.postpartum = True # Start postpartum state at time of birth
deliv.breastfeed_dur = 0 # Start at 0, will update before leaving timestep in separate function
deliv.postpartum_dur = 0
# Handle stillbirth
still_prob = self.pars['mortality_probs']['stillbirth']
rate_ages = self.pars['stillbirth_rate']['ages']
age_ind = np.searchsorted(rate_ages, deliv.age, side="left")
prev_idx_is_less = ((age_ind == len(rate_ages))|(np.fabs(deliv.age - rate_ages[np.maximum(age_ind-1, 0)]) < np.fabs(deliv.age - rate_ages[np.minimum(age_ind, len(rate_ages)-1)])))
age_ind[prev_idx_is_less] -= 1 # adjusting for quirks of np.searchsorted
still_prob = still_prob * (self.pars['stillbirth_rate']['age_probs'][age_ind]) if len(self) > 0 else 0
is_stillborn = deliv.binomial(still_prob)
stillborn = deliv.filter(is_stillborn)
stillborn.stillbirth += 1 # Track how many stillbirths an agent has had
stillborn.lactating = False # Set agents of stillbith to not lactate
self.step_results['stillbirths'] = len(stillborn)
if self.pars['track_as']:
stillbirth_boolean = np.full(len(self), False)
stillbirth_boolean[np.searchsorted(self.uid, stillborn.uid)] = True
self.step_results['stillbirth_ages'] = self.age_by_group
self.step_results['as_stillbirths'] = stillbirth_boolean
# Add dates of live births and stillbirths separately for agent to remember
all_ppl = self.unfilter()
live = deliv.filter(~is_stillborn)
short_interval = 0
secondary_birth = 0
for i in live.inds: # Handle DOBs
all_ppl.dobs[i].append(all_ppl.age[i]) # Used for birth spacing only, only add one baby to dob -- CK: can't easily turn this into a Numpy operation
if len(all_ppl.dobs[i]) == 1:
all_ppl.first_birth_age[i] = all_ppl.age[i]
if (len(all_ppl.dobs[i]) > 1) and all_ppl.age[i] >= self.pars['low_age_short_int'] and all_ppl.age[i] < self.pars['high_age_short_int']:
secondary_birth += 1
if ((all_ppl.dobs[i][-1] - all_ppl.dobs[i][-2]) < (self.pars['short_int'] / fpd.mpy)):
all_ppl.short_interval_dates[i].append(all_ppl.age[i])
all_ppl.short_interval[i] += 1
short_interval += 1
self.step_results['short_intervals'] += short_interval
self.step_results['secondary_births'] += secondary_birth
for i in stillborn.inds: # Handle adding dates
all_ppl.still_dates[i].append(all_ppl.age[i])
# Add age of agents at birth with short birth interval
#for i in live.inds: # Handle DOBs
#if len(all_ppl.dobs[i]) > 1:
#for d in range(len(all_ppl.dobs[i]) - 1):
#if (all_ppl.dobs[i][d + 1] - all_ppl.dobs[i][d]) < self.pars['short_int']:
#short_interval_age = all_ppl.dobs[i][d+1].append(all_ppl.age[i][d+1])
# Handle twins
is_twin = live.binomial(self.pars['twins_prob'])
twin = live.filter(is_twin)
self.step_results['births'] += 2*len(twin) # only add births to population if born alive
twin.parity += 2 # Add 2 because matching DHS "total children ever born (alive) v201"
# Handle singles
single = live.filter(~is_twin)
self.step_results['births'] += len(single)
single.parity += 1
#Calculate total births
self.step_results['total_births'] = len(stillborn) + self.step_results['births']
live_age = live.age
for key, (age_low, age_high) in fpd.age_bin_map.items():
birth_bins = np.sum((live_age >= age_low) * (live_age < age_high))
self.step_results['birth_bins'][key] += birth_bins
if self.pars['track_as']:
total_women_delivering = np.full(len(self), False)
total_women_delivering[np.searchsorted(self.uid, live.uid)] = True
self.step_results['mmr_age_by_group'] = self.age_by_group
# Check mortality
maternal_deaths = live.check_maternal_mortality() # Mothers of only live babies eligible to match definition of maternal mortality ratio
if self.pars['track_as']:
maternal_deaths_bool = np.full(len(self), False)
maternal_deaths_bool[np.searchsorted(self.uid, maternal_deaths.uid)] = True
total_infants_bool = np.full(len(self), False)
total_infants_bool[np.searchsorted(self.uid, live.uid)] = True
i_death = live.check_infant_mortality()
# Save infant deaths and totals into age buckets
if self.pars['track_as']:
infant_deaths_bool = np.full(len(self), False)
infant_deaths_bool[np.searchsorted(self.uid, i_death.uid)] = True
self.step_results['imr_age_by_group'] = self.age_by_group # age groups have to be in same context as imr
self.step_results['imr_numerator'] = infant_deaths_bool # we need to track these over time to be summed by year
self.step_results['imr_denominator'] = total_infants_bool
self.step_results['mmr_numerator'] = maternal_deaths_bool
self.step_results['mmr_denominator'] = total_women_delivering
live_births_age_split = self.log_age_split(binned_ages_t=[self.age_by_group], channel='births', numerators=[total_women_delivering], denominators=None)
for key in live_births_age_split:
self.step_results[key] = live_births_age_split[key]
# TEMP -- update children, need to refactor
r = sc.dictobj(**self.step_results)
new_people = r.births - r.infant_deaths # Do not add agents who died before age 1 to population
children_map = sc.ddict(int)
for i in live.inds:
children_map[i] += 1
for i in twin.inds:
children_map[i] += 1
for i in i_death.inds:
children_map[i] -= 1
assert sum(list(children_map.values())) == new_people
start_ind = len(all_ppl)
for mother,n_children in children_map.items():
end_ind = start_ind+n_children
children = list(range(start_ind, end_ind))
all_ppl.children[mother] += children
start_ind = end_ind
return
[docs]
def update_age(self):
'''Advance age in the simulation'''
self.age += self.pars['timestep'] / fpd.mpy # Age the person for the next timestep
self.age = np.minimum(self.age, self.pars['max_age'])
return
[docs]
def update_education(self):
'''Advance education attainment in the simulation, determine if agents have completed their educationm,
'''
# Filter people who have not: completed education, dropped out or had their education interrupted
students = self.filter((self.edu_started & ~self.edu_completed & ~self.edu_dropout & ~self.edu_interrupted))
# Advance education attainment
students.edu_attainment += self.pars['timestep'] / fpd.mpy
# Check who will experience an interruption
students.interrupt_education()
# Make some students dropout based on dropout | parity probabilities
par1 = students.filter(students.parity == 1)
par1.dropout_education('1') # Women with parity 1
par2plus = students.filter(students.parity >= 2)
par2plus.dropout_education('2+') # Women with parity 2+
[docs]
def graduate(self):
completed_inds = sc.findinds(self.edu_attainment >= self.edu_objective)
# NOTE: the two lines below were necessary because edu_completed was not being updating as expected
tmp = self.edu_completed
tmp[completed_inds] = True
self.edu_completed = tmp
[docs]
def start_education(self):
'''
Begin education
'''
new_students = self.filter(~self.edu_started & (self.age >= self.pars["education"]["age_start"]))
new_students.edu_started = True
[docs]
def interrupt_education(self):
'''
Interrupt education due to pregnancy. This method hinders education progression if a
woman is pregnant and towards the end of the first trimester
'''
# Hinder education progression if a woman is pregnant and towards the end of the first trimester
pregnant_students = self.filter(self.pregnant)
end_first_tri = pregnant_students.filter(pregnant_students.gestation == self.pars['end_first_tri'])
# Disrupt education
end_first_tri.edu_interrupted = True
[docs]
def resume_education(self):
'''
# Basic mechanism to resume education post-pregnancy:
# If education was interrupted due to pregnancy, resume after 9 months pospartum ()
#TODO: check if there's any evidence supporting this assumption
'''
# Basic mechanism to resume education post-pregnancy:
# If education was interrupted due to pregnancy, resume after 9 months pospartum
pospartum_students = self.filter(self.postpartum & self.edu_interrupted & ~self.edu_completed & ~self.edu_dropout)
resume_inds = sc.findinds(pospartum_students.postpartum_dur > 0.5 * self.pars['postpartum_dur'])
tmp = pospartum_students.edu_interrupted
tmp[resume_inds] = False
pospartum_students.edu_interrupted = tmp
[docs]
def dropout_education(self, parity):
dropout_dict = self.pars['education']['edu_dropout_probs'][parity]
age_cutoffs = np.hstack((dropout_dict['age'], dropout_dict['age'].max() + 1))
age_inds = np.digitize(self.age, age_cutoffs) - 1
# Decide who will dropout
self.edu_dropout = fpu.binomial_arr(dropout_dict['percent'][age_inds])
[docs]
def update_age_bin_totals(self):
'''
Count how many total live women in each 5-year age bin 10-50, for tabulating ASFR
'''
for key, (age_low, age_high) in fpd.age_bin_map.items():
this_age_bin = self.filter((self.age >= age_low) * (self.age < age_high))
self.step_results['age_bin_totals'][key] += len(this_age_bin)
return
[docs]
def log_age_split(self, binned_ages_t, channel, numerators, denominators=None):
counts_dict = {}
results_dict = {}
if denominators is not None: # true when we are calculating rates (like cpr)
for timestep_index in range(len(binned_ages_t)):
if len(denominators[timestep_index]) == 0:
counts_dict[f"age_true_counts_{timestep_index}"] = {}
counts_dict[f"age_false_counts_{timestep_index}"] = {}
else:
binned_ages = binned_ages_t[timestep_index]
binned_ages_true = binned_ages[np.logical_and(numerators[timestep_index], denominators[timestep_index])]
if len(numerators[timestep_index]) == 0:
binned_ages_false = [] # ~[] doesnt make sense
else:
binned_ages_false = binned_ages[np.logical_and(~numerators[timestep_index], denominators[timestep_index])]
counts_dict[f"age_true_counts_{timestep_index}"] = dict(zip(*np.unique(binned_ages_true, return_counts=True)))
counts_dict[f"age_false_counts_{timestep_index}"] = dict(zip(*np.unique(binned_ages_false, return_counts=True)))
age_true_counts = {}
age_false_counts = {}
for age_counts_dict_key in counts_dict:
for index in counts_dict[age_counts_dict_key]:
age_true_counts[index] = 0 if index not in age_true_counts else age_true_counts[index]
age_false_counts[index] = 0 if index not in age_false_counts else age_false_counts[index]
if 'false' in age_counts_dict_key:
age_false_counts[index] += counts_dict[age_counts_dict_key][index]
else:
age_true_counts[index] += counts_dict[age_counts_dict_key][index]
for index, age_str in enumerate(fpd.age_specific_channel_bins):
scale = 1
if channel == "imr":
scale = 1000
elif channel == "mmr":
scale = 100000
if index not in age_true_counts:
results_dict[f"{channel}_{age_str}"] = 0
elif index in age_true_counts and index not in age_false_counts:
results_dict[f"{channel}_{age_str}"] = 1.0 * scale
else:
results_dict[f"{channel}_{age_str}"] = (age_true_counts[index] / (age_true_counts[index] + age_false_counts[index])) * scale
else: # true when we are calculating counts (like pregnancies)
for timestep_index in range(len(binned_ages_t)):
if len(numerators[timestep_index]) == 0:
counts_dict[f"age_counts_{timestep_index}"] = {}
else:
binned_ages = binned_ages_t[timestep_index]
binned_ages_true = binned_ages[numerators[timestep_index]]
counts_dict[f"age_counts_{timestep_index}"] = dict(zip(*np.unique(binned_ages_true, return_counts=True)))
age_true_counts = {}
for age_counts_dict_key in counts_dict:
for index in counts_dict[age_counts_dict_key]:
age_true_counts[index] = 0 if index not in age_true_counts else age_true_counts[index]
age_true_counts[index] += counts_dict[age_counts_dict_key][index]
for index, age_str in enumerate(fpd.age_specific_channel_bins):
if index not in age_true_counts:
results_dict[f"{channel}_{age_str}"] = 0
else:
results_dict[f"{channel}_{age_str}"] = age_true_counts[index]
return results_dict
[docs]
def track_mcpr(self):
'''
Track for purposes of calculating mCPR at the end of the timestep after all people are updated
Not including LAM users in mCPR as this model counts all women passively using LAM but
DHS data records only women who self-report LAM which is much lower.
Follows the DHS definition of mCPR
'''
modern_methods = sc.findinds(list(self.pars['methods']['modern'].values()))
method_age = (self.pars['method_age'] <= self.age)
fecund_age = self.age < self.pars['age_limit_fecundity']
denominator = method_age * fecund_age * self.is_female * (self.alive)
numerator = np.isin(self.method, modern_methods)
no_method_mcpr = np.sum((self.method == 0) * denominator)
on_method_mcpr = np.sum(numerator * denominator)
self.step_results['no_methods_mcpr'] += no_method_mcpr
self.step_results['on_methods_mcpr'] += on_method_mcpr
if self.pars['track_as']:
as_result_dict = self.log_age_split(binned_ages_t=[self.age_by_group], channel='mcpr', numerators=[numerator], denominators=[denominator])
for key in as_result_dict:
self.step_results[key] = as_result_dict[key]
return
[docs]
def track_cpr(self):
'''
Track for purposes of calculating newer ways to conceptualize contraceptive prevalence
at the end of the timestep after all people are updated
Includes women using any method of contraception, including LAM
Denominator of possible users includes all women aged 15-49
'''
denominator = ((self.pars['method_age'] <= self.age) * (self.age < self.pars['age_limit_fecundity']) * (
self.sex == 0) * (self.alive))
numerator = self.method != 0
no_method_cpr = np.sum((self.method == 0) * denominator)
on_method_cpr = np.sum(numerator * denominator)
self.step_results['no_methods_cpr'] += no_method_cpr
self.step_results['on_methods_cpr'] += on_method_cpr
if self.pars['track_as']:
as_result_dict = self.log_age_split(binned_ages_t=[self.age_by_group], channel='cpr', numerators=[numerator], denominators=[denominator])
for key in as_result_dict:
self.step_results[key] = as_result_dict[key]
return
[docs]
def track_acpr(self):
'''
Track for purposes of calculating newer ways to conceptualize contraceptive prevalence
at the end of the timestep after all people are updated
Denominator of possible users excludes pregnant women and those not sexually active in the last 4 weeks
Used to compare new metrics of contraceptive prevalence and eventually unmet need to traditional mCPR definitions
'''
denominator = ((self.pars['method_age'] <= self.age) * (self.age < self.pars['age_limit_fecundity']) * (
self.sex == 0) * (self.pregnant == 0) * (self.sexually_active == 1) * (self.alive))
numerator = self.method != 0
no_method_cpr = np.sum((self.method == 0) * denominator)
on_method_cpr = np.sum(numerator * denominator)
self.step_results['no_methods_acpr'] += no_method_cpr
self.step_results['on_methods_acpr'] += on_method_cpr
if self.pars['track_as']:
as_result_dict = self.log_age_split(binned_ages_t=[self.age_by_group], channel='acpr', numerators=[numerator], denominators=[denominator])
for key in as_result_dict:
self.step_results[key] = as_result_dict[key]
return
[docs]
def init_step_results(self):
self.step_results = dict(
deaths = 0,
births = 0,
stillbirths = 0,
total_births = 0,
short_intervals = 0,
secondary_births = 0,
maternal_deaths = 0,
infant_deaths = 0,
on_methods_mcpr = 0,
no_methods_mcpr = 0,
on_methods_cpr = 0,
no_methods_cpr = 0,
on_methods_acpr = 0,
no_methods_acpr = 0,
as_stillbirths = [],
imr_numerator = [],
imr_denominator = [],
mmr_numerator = [],
mmr_denominator = [],
pp0to5 = 0,
pp6to11 = 0,
pp12to23 = 0,
total_women_fecund = 0,
pregnancies = 0,
unintended_pregs = 0,
birthday_fraction = None,
birth_bins = {},
age_bin_totals = {},
switching_annual = {},
switching_postpartum = {},
imr_age_by_group = [],
mmr_age_by_group = [],
stillbirth_ages = []
)
if self.pars['track_as']:
as_keys = dict(
as_stillbirths=[],
imr_numerator=[],
imr_denominator=[],
mmr_numerator=[],
mmr_denominator=[],
imr_age_by_group=[],
mmr_age_by_group = [],
stillbirth_ages = []
)
self.step_results.update(as_keys)
as_channels = ['acpr', 'cpr', 'mcpr', 'stillbirths', "births", "pregnancies"]
for age_specific_channel in as_channels:
for age_range in fpd.age_specific_channel_bins:
self.step_results[f"{age_specific_channel}_{age_range}"] = 0
for key in fpd.age_bin_map.keys():
self.step_results['birth_bins'][key] = 0
self.step_results['age_bin_totals'][key] = 0
m = len(self.pars['methods']['map'])
def mm_zeros():
''' Return an array of m x m zeros '''
return np.zeros((m, m), dtype=int)
if self.pars['track_switching']:
for key in fpd.method_age_map.keys():
self.step_results['switching_annual'][key] = mm_zeros()
self.step_results['switching_postpartum'][key] = mm_zeros()
self.step_results['switching'] = dict(
annual = mm_zeros(),
postpartum = mm_zeros(),
)
return
[docs]
def update(self):
'''
Update the person's state for the given timestep.
t is the time in the simulation in years (ie, 0-60), y is years of simulation (ie, 1960-2010)'''
self.init_step_results() # Initialize outputs
alive_start = self.filter(self.alive)
alive_start.check_mortality() # Decide if person dies at this t in the simulation
alive_check = self.filter(self.alive) # Reselect live agents after exposure to general mortality
# Update pregnancy with maternal mortality outcome
preg = alive_check.filter(alive_check.pregnant)
preg.check_delivery() # Deliver with birth outcomes if reached pregnancy duration
# Reselect for live agents after exposure to maternal mortality
alive_now = self.filter(self.alive)
fecund = alive_now.filter((alive_now.sex == 0) * (alive_now.age < alive_now.pars['age_limit_fecundity']))
nonpreg = fecund.filter(~fecund.pregnant)
lact = fecund.filter(fecund.lactating)
if self.pars['restrict_method_use'] == 1:
methods = nonpreg.filter((nonpreg.age >= nonpreg.fated_debut) * (nonpreg.months_inactive < 12))
else:
methods = nonpreg.filter(nonpreg.age >= self.pars['method_age'])
# Check if has reached their age at first partnership and set partnered attribute to True.
# TODO: decide whether this is the optimal place to perform this update, and how it may interact with sexual debut age
alive_now.check_partnership()
# Update everything else
preg.update_pregnancy() # Advance gestation in timestep, handle miscarriage
nonpreg.check_sexually_active()
methods.update_methods()
nonpreg.update_postpartum() # Updates postpartum counter if postpartum
lact.update_breastfeeding()
nonpreg.check_lam()
nonpreg.check_conception() # Decide if conceives and initialize gestation counter at 0
# Update education
if self.pars['education'] is not None:
alive_now_f = self.filter(self.is_female)
alive_now_f.start_education() # Check if anyone needs to start school
alive_now_f.update_education() # Advance attainment, determine who reaches their objective, who dropouts, who has their education interrupted
alive_now_f.resume_education() # Determine who goes back to school after an interruption
alive_now_f.graduate() # Check if anyone achieves their education goal
# Update results
fecund.update_age_bin_totals()
self.track_mcpr()
self.track_cpr()
self.track_acpr()
age_min = self.age >= 15 # CK: TODO: remove hardcoding
age_max = self.age < self.pars['age_limit_fecundity']
self.step_results['total_women_fecund'] = np.sum(self.is_female * age_min * age_max)
# Age person at end of timestep after tabulating results
alive_now.update_age() # Important to keep this here so birth spacing gets recorded accurately
# Storing ages by method age group
age_bins = [0] + [max(fpd.age_specific_channel_bins[key]) for key in fpd.age_specific_channel_bins]
self.age_by_group = np.digitize(self.age, age_bins) - 1
return self.step_results
#%% Plotting helper functions
def fixaxis(useSI=True, set_lim=True, legend=True):
''' Format the axis using SI units and limits '''
if legend:
pl.legend() # Add legend
if set_lim:
sc.setylim()
if useSI:
sc.SIticks()
return
def tidy_up(fig, do_show=None, do_save=None, filename=None):
''' Helper function to handle the slightly complex logic of showing, saving, returing -- not for users '''
# Handle inputs
if do_show is None: do_show = fpo.show
if do_save is None: do_save = fpo.save
backend = pl.get_backend()
# Handle show
if backend == 'agg': # Cannot show plots for a non-interactive backend
do_show = False
if do_show: # Now check whether to show, and atually do it
pl.show()
# Handle saving
if do_save:
if isinstance(do_save, str): # No figpath provided - see whether do_save is a figpath
filename = sc.makefilepath(filename) # Ensure it's valid, including creating the folder
sc.savefig(fig=fig, filename=filename) # Save the figure
# Handle close
if fpo.close and not do_show:
pl.close(fig)
# Return the figure or figures unless we're in Jupyter
if not fpo.returnfig:
return
else:
return fig
#%% Sim class
[docs]
class Sim(fpb.BaseSim):
'''
The Sim class handles the running of the simulation: the creation of the
population and the dynamics of the epidemic. This class handles the mechanics
of the actual simulation, while BaseSim takes care of housekeeping (saving,
loading, exporting, etc.). Please see the BaseSim class for additional methods.
Args:
pars (dict): parameters to modify from their default values
location (str): name of the location (country) to look for data file to load
label (str): the name of the simulation (useful to distinguish in batch runs)
track_children (bool): whether to track links between mothers and their children (slow, so disabled by default)
kwargs (dict): additional parameters; passed to ``fp.make_pars()``
**Examples**::
sim = fp.Sim()
sim = fp.Sim(n_agents=10e3, location='senegal', label='My small Seneagl sim')
'''
def __init__(self, pars=None, location=None, label=None, track_children=False, **kwargs):
# Check parameters
loc_pars = fpp.pars(location)
pars = sc.mergedicts(loc_pars, pars)
mismatches = [key for key in kwargs.keys() if key not in fpp.par_keys]
if len(mismatches):
errormsg = f'Key(s) {mismatches} not found; available keys are {fpp.par_keys}'
raise sc.KeyNotFoundError(errormsg)
super().__init__(pars, location=location, **kwargs) # Initialize and set the parameters as attributes
self.initialized = False
self.already_run = False
self.test_mode = False
self.label = label
self.track_children = track_children
fpu.set_metadata(self) # Set version, date, and git info
return
[docs]
def initialize(self, force=False):
if force or not self.initialized:
fpu.set_seed(self['seed'])
self.init_results()
self.init_people()
return self
[docs]
def init_results(self):
resultscols = ['t', 'pop_size_months','pregnancies', 'births', 'deaths', 'stillbirths', 'miscarriages','abortions', 'total_births', 'maternal_deaths', 'infant_deaths',
'cum_maternal_deaths', 'cum_infant_deaths', 'on_methods_mcpr', 'no_methods_mcpr', 'on_methods_cpr', 'no_methods_cpr', 'on_methods_acpr',
'no_methods_acpr', 'mcpr', 'cpr', 'acpr', 'pp0to5', 'pp6to11', 'pp12to23', 'nonpostpartum', 'total_women_fecund', 'unintended_pregs', 'birthday_fraction',
'total_births_10-14', 'total_births_15-19', 'total_births_20-24', 'total_births_25-29', 'total_births_30-34', 'total_births_35-39', 'total_births_40-44',
'total_births_45-49', 'total_women_10-14', 'total_women_15-19', 'total_women_20-24', 'total_women_25-29', 'total_women_30-34', 'total_women_35-39',
'total_women_40-44', 'total_women_45-49', 'short_intervals','secondary_births','proportion_short_interval']
self.results = {}
for key in resultscols:
self.results[key] = np.zeros(int(self.npts))
self.results['tfr_years'] = [] # CK: TODO: refactor into loop with keys
self.results['tfr_rates'] = []
self.results['pop_size'] = []
self.results['mcpr_by_year'] = []
self.results['cpr_by_year'] = []
self.results['method_failures_over_year'] = []
self.results['infant_deaths_over_year'] = []
self.results['total_births_over_year'] = []
self.results['live_births_over_year'] = []
self.results['stillbirths_over_year'] = []
self.results['miscarriages_over_year'] = []
self.results['abortions_over_year'] = []
self.results['pregnancies_over_year'] = []
self.results['short_intervals_over_year'] = []
self.results['secondary_births_over_year'] = []
self.results['risky_pregs_over_year'] = []
self.results['maternal_deaths_over_year'] = []
self.results['proportion_short_interval_by_year'] = []
self.results['mmr'] = []
self.results['imr'] = []
self.results['birthday_fraction'] = []
self.results['asfr'] = {}
self.results['method_usage'] = []
for key in fpd.age_bin_map.keys():
self.results['asfr'][key] = []
self.results[f"tfr_{key}"] = []
if self['track_switching']:
m = len(self['methods']['map'])
keys = [
'switching_events_annual',
'switching_events_postpartum',
'switching_events_<18',
'switching_events_18-20',
'switching_events_21-25',
'switching_events_26-35',
'switching_events_>35',
'switching_events_pp_<18',
'switching_events_pp_18-20',
'switching_events_pp_21-25',
'switching_events_pp_26-35',
'switching_events_pp_>35',
]
for key in keys:
self.results[key] = {} # CK: TODO: refactor
for p in range(self.npts):
self.results[key][p] = np.zeros((m, m), dtype=int)
if self.pars['track_as']:
self.results['imr_age_by_group'] = []
self.results['mmr_age_by_group'] = []
self.results['stillbirth_ages'] = []
for age_specific_channel in ['acpr', 'cpr', 'mcpr', 'pregnancies', 'births', 'imr_numerator', 'imr_denominator', 'mmr_numerator', 'mmr_denominator', 'imr', 'mmr', 'as_stillbirths', 'stillbirths']:
for age_group in fpd.age_specific_channel_bins:
if 'numerator' in age_specific_channel or 'denominator' in age_specific_channel or 'as_' in age_specific_channel:
self.results[f"{age_specific_channel}"] = []
else:
self.results[f"{age_specific_channel}_{age_group}"] = []
return
[docs]
def get_age_sex(self, n):
''' For an ex nihilo person, figure out if they are male and female, and how old '''
pyramid = self['age_pyramid']
self.m_frac = pyramid[:,1].sum() / pyramid[:,1:3].sum()
ages = np.zeros(n)
sexes = np.random.random(n) < self.m_frac # Pick the sex based on the fraction of men vs. women
f_inds = sc.findinds(sexes == 0)
m_inds = sc.findinds(sexes == 1)
age_data_min = pyramid[:,0]
age_data_max = np.append(pyramid[1:,0], self['max_age'])
age_data_range = age_data_max - age_data_min
for i,inds in enumerate([m_inds, f_inds]):
if len(inds):
age_data_prob = pyramid[:,i+1]
age_data_prob = age_data_prob/age_data_prob.sum() # Ensure it sums to 1
age_bins = fpu.n_multinomial(age_data_prob, len(inds)) # Choose age bins
ages[inds] = age_data_min[age_bins] + age_data_range[age_bins]*np.random.random(len(inds)) # Uniformly distribute within this age bin
return ages, sexes
[docs]
def initialize_urban(self, n, urban_prop):
"""Get initial distribution of urban"""
urban = np.ones(n, dtype=bool)
if urban_prop is not None:
urban = fpu.n_binomial(urban_prop, n)
return urban
[docs]
def initialize_empowerment(self, n, ages, sexes):
"""Get initial distribution of women's empowerment metrics/attributes"""
# NOTE: we assume that either probabilities or metrics in empowerment_dict are defined over all possible ages
# from 0 to 100 years old.
empowerment_dict = self['empowerment']
# Empowerment dictionary
empowerment = {}
empowerment['paid_employment'] = np.zeros(n, dtype=bool)
empowerment['sexual_autonomy'] = np.zeros(n, dtype=float)
empowerment['control_over_wages'] = np.zeros(n, dtype=float)
if empowerment_dict is not None:
# Find only female agents
f_inds = sc.findinds(sexes == 0)
# Get ages from women
f_ages = ages[f_inds]
# Create age bins
age_cutoffs = np.hstack((empowerment_dict['age'], empowerment_dict['age'].max()+1))
age_inds = np.digitize(f_ages, age_cutoffs)-1
# Paid employment
paid_employment_probs = empowerment_dict['paid_employment']
empowerment['paid_employment'][f_inds] = fpu.binomial_arr(paid_employment_probs[age_inds])
for metric in ['control_over_wages', 'sexual_autonomy']:
empowerment[metric][f_inds] = empowerment_dict[metric][age_inds]
return empowerment
[docs]
def initialize_education(self, n, ages, sexes, urban):
"""Get initial distribution of education goal, attainment and whether
a woman has reached their education goal"""
education_dict = self['education']
# Initialise individual education attainment - number of education years completed at start of simulation
# Assess whether a woman has completed her education based on the values of the two previous attributes
# Education dictionary
education = {'edu_objective': np.zeros(n, dtype=float),
'edu_attainment': np.zeros(n, dtype=float),
'edu_started' : np.zeros(n, dtype=bool),
'edu_completed': np.zeros(n, dtype=bool),
'edu_droput': np.zeros(n, dtype=bool)}
if education_dict is not None:
# Initialise individual education objectives from a 2d array of probs with dimensions (urban, edu_years)
f_inds_urban = sc.findinds(sexes == 0, urban == True)
f_inds_rural = sc.findinds(sexes == 0, urban == False)
# Set objectives
probs_rural = education_dict['edu_objective'][1, :]
probs_urban = education_dict['edu_objective'][0, :]
edu_years = np.arange(len(probs_rural))
education['edu_objective'][f_inds_rural] = np.random.choice(edu_years, size=len(f_inds_rural), p=probs_rural) # Probs in rural settings
education['edu_objective'][f_inds_urban] = np.random.choice(edu_years, size=len(f_inds_urban), p=probs_urban) # Probs in urban settings
# Initialise education attainment - ie, current state of education at the start of the simulation
f_inds = sc.findinds(sexes == 0)
# Get ages for female agents and round them so we can use them as indices
f_ages = np.floor(ages[f_inds]).astype(int)
# Set the initial number of education years an agent has based on her age
education['edu_attainment'][f_inds] = np.floor((education_dict['edu_attainment'][f_ages]))
# Check people who started their education
started_inds = sc.findinds(education['edu_attainment'][f_inds] > 0.0)
# Check people who completed their education
completed_inds = sc.findinds(education['edu_objective'][f_inds]-education['edu_attainment'][f_inds] <= 0)
# Set attainment to edu_objective, just in case that initial edu_attainment > edu_objective
education['edu_attainment'][f_inds[completed_inds]] = education['edu_objective'][f_inds[completed_inds]]
education['edu_completed'][f_inds[completed_inds]] = True
education['edu_started'][f_inds[started_inds]] = True
return education
[docs]
def initialize_partnered(self, n, ages, sexes):
"""Get initial distribution of age at first partnership"""
partnership_data = self['age_partnership']
partnered = np.zeros(n, dtype=bool)
partnership_age = np.zeros(n, dtype=float)
if partnership_data is not None:
# Find only female agents
f_inds = sc.findinds(sexes == 0)
# Get ages from women
f_ages = ages[f_inds]
# Select age at first partnership
partnership_age[f_inds] = np.random.choice(partnership_data['age'], size=len(f_inds), p=partnership_data['partnership_probs'])
# Check if age at first partnership => than current age to set partnered
p_inds = sc.findinds((f_ages >= partnership_age[f_inds]))
partnered[f_inds[p_inds]] = True
return partnered, partnership_age
[docs]
def make_people(self, n=1, age=None, sex=None, method=None, debut_age=None):
''' Set up each person '''
_age, _sex = self.get_age_sex(n)
if age is None: age = _age
if sex is None: sex = _sex
if method is None: method = np.zeros(n, dtype=np.int64)
barrier = fpu.n_multinomial(self['barriers'][:], n)
debut_age = self['debut_age']['ages'][fpu.n_multinomial(self['debut_age']['probs'], n)]
fertile = fpu.n_binomial(1 - self['primary_infertility'], n)
urban = self.initialize_urban(n, self['urban_prop'])
partnered, partnership_age = self.initialize_partnered(n, age, sex)
empowerment = self.initialize_empowerment(n, age, sex)
education = self.initialize_education(n, age, sex, urban)
data = dict(
age=age,
sex=sex,
method=method,
barrier=barrier,
debut_age=debut_age,
fertile=fertile,
urban=urban,
partnered=partnered,
partnership_age=partnership_age,
**sc.mergedicts(empowerment, education)
)
return data
[docs]
def init_people(self, output=False, **kwargs):
''' Create the people '''
p = sc.objdict(self.make_people(n=int(self['n_agents'])))
self.people = People(
pars=self.pars,
age=p.age,
sex=p.sex,
method=p.method,
barrier=p.barrier,
debut_age=p.debut_age,
fertile=p.fertile,
urban=p.urban,
partnered=p.partnered,
partnership_age=p.partnership_age,
paid_employment=p.paid_employment,
sexual_autonomy=p.sexual_autonomy,
control_over_wages=p.control_over_wages,
edu_objective=p.edu_objective,
edu_attainment=p.edu_attainment,
edu_completed=p.edu_completed,
edu_started=p.edu_started,
)
return
[docs]
def update_methods(self):
'''
Update all contraceptive method matrices to have probabilities that follow a trend closest to the
year the sim is on based on mCPR in that year
'''
methods = self['methods'] # Shorten methods
methods['adjusted'] = sc.dcp(methods['raw']) # Avoids needing to copy this within loops later
# Compute the trend in MCPR
trend_years = methods['mcpr_years']
trend_vals = methods['mcpr_rates']
ind = sc.findnearest(trend_years, self.y) # The year of data closest to the sim year
norm_ind = sc.findnearest(trend_years, self['mcpr_norm_year']) # The year we're using to normalize
nearest_val = trend_vals[ind] # Nearest MCPR value from the data
norm_val = trend_vals[norm_ind] # Normalization value
if self.y > max(trend_years): # We're after the last year of data: extrapolate
eps = 1e-3 # Epsilon for lowest allowed MCPR value (to avoid divide by zero errors)
nearest_year = trend_years[ind]
year_diff = self.y - nearest_year
correction = self['mcpr_growth_rate']*year_diff # Project the change in MCPR
extrapolated_val = nearest_val*(1 + correction) # Multiply the current value by the projection
trend_val = np.clip(extrapolated_val, eps, self['mcpr_max']) # Ensure it stays within bounds
else: # Otherwise, just use the nearest data point
trend_val = nearest_val
norm_trend_val = trend_val/norm_val # Normalize so the correction factor is 1 at the normalization year
# Update annual (non-postpartum) population and postpartum switching matrices for current year mCPR - stratified by age
for switchkey in ['annual', 'pp1to6']:
for matrix in methods['adjusted'][switchkey].values():
matrix[0, 0] /= norm_trend_val # Takes into account mCPR during year of sim
for i in range(len(matrix)):
denom = matrix[i,:].sum()
if denom > 0:
matrix[i,:] = matrix[i, :] / denom # Normalize so probabilities add to 1
# Update postpartum initiation matrices for current year mCPR - stratified by age
for matrix in methods['adjusted']['pp0to1'].values():
matrix[0] /= norm_trend_val # Takes into account mCPR during year of sim
matrix /= matrix.sum()
return
[docs]
def update_mortality(self):
''' Update infant and maternal mortality for the sim's current year. Update general mortality trend
as this uses a spline interpolation instead of an array'''
mapping = {
'age_mortality': 'gen_trend',
'infant_mortality': 'infant',
'maternal_mortality': 'maternal',
'stillbirth_rate': 'stillbirth',
}
self['mortality_probs'] = {}
for key1,key2 in mapping.items():
ind = sc.findnearest(self[key1]['year'], self.y)
val = self[key1]['probs'][ind]
self['mortality_probs'][key2] = val
return
[docs]
def update_mothers(self):
'''Add link between newly added individuals and their mothers'''
all_ppl = self.people.unfilter()
for mother_index, postpartum in enumerate(all_ppl.postpartum):
if postpartum and all_ppl.postpartum_dur[mother_index] < 2:
for child in all_ppl.children[mother_index]:
all_ppl.mothers[child] = mother_index
return
[docs]
def apply_interventions(self):
''' Apply each intervention in the model '''
from . import interventions as fpi # To avoid circular import
for i,intervention in enumerate(sc.tolist(self['interventions'])):
if isinstance(intervention, fpi.Intervention):
if not intervention.initialized: # pragma: no cover
intervention.initialize(self)
intervention.apply(self) # If it's an intervention, call the apply() method
elif callable(intervention):
intervention(self) # If it's a function, call it directly
else: # pragma: no cover
errormsg = f'Intervention {i} ({intervention}) is neither callable nor an Intervention object: it is {type(intervention)}'
raise TypeError(errormsg)
return
[docs]
def apply_analyzers(self):
''' Apply each analyzer in the model '''
from . import analyzers as fpa # To avoid circular import
for i,analyzer in enumerate(sc.tolist(self['analyzers'])):
if isinstance(analyzer, fpa.Analyzer):
if not analyzer.initialized: # pragma: no cover
analyzer.initialize(self)
analyzer.apply(self) # If it's an intervention, call the apply() method
elif callable(analyzer):
analyzer(self) # If it's a function, call it directly
else: # pragma: no cover
errormsg = f'Analyzer {i} ({analyzer}) is neither callable nor an Analyzer object: it is {type(analyzer)}'
raise TypeError(errormsg)
return
[docs]
def finalize_interventions(self):
''' Make any final updates to interventions (e.g. to shrink) '''
from . import interventions as fpi # To avoid circular import
for intervention in sc.tolist(self['interventions']):
if isinstance(intervention, fpi.Intervention):
intervention.finalize(self)
[docs]
def finalize_analyzers(self):
''' Make any final updates to analyzers (e.g. to shrink) '''
from . import analyzers as fpa # To avoid circular import
for analyzer in sc.tolist(self['analyzers']):
if isinstance(analyzer, fpa.Analyzer):
analyzer.finalize(self)
[docs]
def run(self, verbose=None):
''' Run the simulation '''
# Initialize -- reset settings and results
T = sc.timer()
if verbose is None:
verbose = self['verbose']
self.initialize()
if self.already_run:
errormsg = 'Cannot re-run an already run sim; please recreate or copy prior to a run'
raise RuntimeError(errormsg)
# Main simulation loop
for i in range(self.npts): # Range over number of timesteps in simulation (ie, 0 to 261 steps)
self.i = i # Timestep
self.t = self.ind2year(i) # t is time elapsed in years given how many timesteps have passed (ie, 25.75 years)
self.y = self.ind2calendar(i) # y is calendar year of timestep (ie, 1975.75)
# Print progress
elapsed = T.toc(output=True)
if verbose:
simlabel = f'"{self.label}": ' if self.label else ''
string = f' Running {simlabel}{self.y:0.0f} of {self["end_year"]} ({i:2.0f}/{self.npts}) ({elapsed:0.2f} s) '
if verbose >= 2:
sc.heading(string)
elif verbose>0:
if not (self.t % int(1.0/verbose)):
sc.progressbar(self.i+1, self.npts, label=string, length=20, newline=True)
# Update method matrices for year of sim to trend over years
self.update_methods()
# Update mortality probabilities for year of sim
self.update_mortality()
# Apply interventions and analyzers
self.apply_interventions()
self.apply_analyzers()
# Update the people
self.people.i = self.i
self.people.t = self.t
step_results = self.people.update()
r = sc.dictobj(**step_results)
# Start calculating results
new_people = r.births - r.infant_deaths # Do not add agents who died before age 1 to population
# Births
data = self.make_people(n=new_people, age=np.zeros(new_people))
people = People(pars=self.pars, n=new_people, **data)
self.people += people
# Update mothers
if self.track_children:
self.update_mothers()
# Results
percent0to5 = (r.pp0to5 / r.total_women_fecund) * 100
percent6to11 = (r.pp6to11 / r.total_women_fecund) * 100
percent12to23 = (r.pp12to23 / r.total_women_fecund) * 100
nonpostpartum = ((r.total_women_fecund - r.pp0to5 - r.pp6to11 - r.pp12to23)/r.total_women_fecund) * 100
# Store results
if self['scaled_pop']:
scale = self['scaled_pop']/self['n_agents']
else:
scale = 1
self.results['t'][i] = self.tvec[i]
self.results['pop_size_months'][i] = self.n*scale
self.results['births'][i] = r.births*scale
self.results['deaths'][i] = r.deaths*scale
self.results['stillbirths'][i] = r.stillbirths*scale
self.results['miscarriages'][i] = r.miscarriages*scale
self.results['abortions'][i] = r.abortions*scale
self.results['short_intervals'][i] = r.short_intervals*scale
self.results['secondary_births'][i] = r.secondary_births*scale
self.results['pregnancies'][i] = r.pregnancies*scale
self.results['total_births'][i] = r.total_births*scale
self.results['maternal_deaths'][i] = r.maternal_deaths*scale
self.results['infant_deaths'][i] = r.infant_deaths*scale
self.results['on_methods_mcpr'][i] = r.on_methods_mcpr
self.results['no_methods_mcpr'][i] = r.no_methods_mcpr
self.results['on_methods_cpr'][i] = r.on_methods_cpr
self.results['no_methods_cpr'][i] = r.no_methods_cpr
self.results['on_methods_acpr'][i] = r.on_methods_acpr
self.results['no_methods_acpr'][i] = r.no_methods_acpr
self.results['mcpr'][i] = r.on_methods_mcpr/(r.no_methods_mcpr + r.on_methods_mcpr)
self.results['cpr'][i] = r.on_methods_cpr/(r.no_methods_cpr + r.on_methods_cpr)
self.results['acpr'][i] = r.on_methods_acpr/(r.no_methods_acpr + r.on_methods_acpr)
self.results['pp0to5'][i] = percent0to5
self.results['pp6to11'][i] = percent6to11
self.results['pp12to23'][i] = percent12to23
self.results['nonpostpartum'][i] = nonpostpartum
self.results['total_women_fecund'][i] = r.total_women_fecund*scale
self.results['unintended_pregs'][i] = r.unintended_pregs*scale
if self.pars['track_as']:
for age_specific_channel in ['imr_numerator', 'imr_denominator', 'mmr_numerator', 'mmr_denominator', 'as_stillbirths', 'imr_age_by_group', 'mmr_age_by_group', 'stillbirth_ages']:
self.results[f"{age_specific_channel}"].append(getattr(r, f"{age_specific_channel}"))
if len(self.results[f"{age_specific_channel}"]) > 12:
self.results[f"{age_specific_channel}"] = self.results[f"{age_specific_channel}"][1:]
for age_specific_channel in ['acpr', 'cpr', 'mcpr', 'pregnancies', 'births']:
for method_agekey in fpd.age_specific_channel_bins:
self.results[f"{age_specific_channel}_{method_agekey}"].append(getattr(r, f"{age_specific_channel}_{method_agekey}"))
for agekey in fpd.age_bin_map.keys():
births_key = f'total_births_{agekey}'
women_key = f'total_women_{agekey}'
self.results[births_key][i] = r.birth_bins[agekey]*scale # Store results of total births per age bin for ASFR
self.results[women_key][i] = r.age_bin_totals[agekey]*scale # Store results of total fecund women per age bin for ASFR
# Store results of number of switching events in each age group
if self['track_switching']:
switch_events = step_results.pop('switching')
self.results['switching_events_<18'][i] = scale**scale*r.switching_annual['<18']
self.results['switching_events_18-20'][i] = scale*r.switching_annual['18-20']
self.results['switching_events_21-25'][i] = scale*r.switching_annual['21-25']
self.results['switching_events_26-35'][i] = scale*r.switching_annual['26-35']
self.results['switching_events_>35'][i] = scale*r.switching_annual['>35']
self.results['switching_events_pp_<18'][i] = scale*r.switching_postpartum['<18']
self.results['switching_events_pp_18-20'][i] = scale*r.switching_postpartum['18-20']
self.results['switching_events_pp_21-25'][i] = scale*r.switching_postpartum['21-25']
self.results['switching_events_pp_26-35'][i] = scale*r.switching_postpartum['26-35']
self.results['switching_events_pp_>35'][i] = scale*r.switching_postpartum['>35']
self.results['switching_events_annual'][i] = scale*switch_events['annual']
self.results['switching_events_postpartum'][i] = scale*switch_events['postpartum']
# Calculate metrics over the last year in the model and save whole years and stats to an array
if i % fpd.mpy == 0:
self.results['tfr_years'].append(self.y)
start_index = (int(self.t)-1)*fpd.mpy
stop_index = int(self.t)*fpd.mpy
unintended_pregs_over_year = scale*np.sum(self.results['unintended_pregs'][start_index:stop_index]) # Grabs sum of unintended pregnancies due to method failures over the last 12 months of calendar year
infant_deaths_over_year = scale*np.sum(self.results['infant_deaths'][start_index:stop_index])
total_births_over_year = scale*np.sum(self.results['total_births'][start_index:stop_index])
live_births_over_year = scale*np.sum(self.results['births'][start_index:stop_index])
stillbirths_over_year = scale*np.sum(self.results['stillbirths'][start_index:stop_index])
miscarriages_over_year = scale*np.sum(self.results['miscarriages'][start_index:stop_index])
abortions_over_year = scale*np.sum(self.results['abortions'][start_index:stop_index])
short_intervals_over_year = scale*np.sum(self.results['short_intervals'][start_index:stop_index])
secondary_births_over_year = scale*np.sum(self.results['secondary_births'][start_index:stop_index])
maternal_deaths_over_year = scale*np.sum(self.results['maternal_deaths'][start_index:stop_index])
pregnancies_over_year = scale*np.sum(self.results['pregnancies'][start_index:stop_index])
self.results['method_usage'].append(self.compute_method_usage()) # only want this per year
self.results['pop_size'].append(scale*self.n) # CK: TODO: replace with arrays
self.results['mcpr_by_year'].append(self.results['mcpr'][i])
self.results['cpr_by_year'].append(self.results['cpr'][i])
self.results['method_failures_over_year'].append(unintended_pregs_over_year)
self.results['infant_deaths_over_year'].append(infant_deaths_over_year)
self.results['total_births_over_year'].append(total_births_over_year)
self.results['live_births_over_year'].append(live_births_over_year)
self.results['stillbirths_over_year'].append(stillbirths_over_year)
self.results['miscarriages_over_year'].append(miscarriages_over_year)
self.results['abortions_over_year'].append(abortions_over_year)
self.results['short_intervals_over_year'].append(short_intervals_over_year)
self.results['secondary_births_over_year'].append(secondary_births_over_year)
self.results['maternal_deaths_over_year'].append(maternal_deaths_over_year)
self.results['pregnancies_over_year'].append(pregnancies_over_year)
if self.pars['track_as']:
imr_results_dict = self.people.log_age_split(binned_ages_t=self.results['imr_age_by_group'], channel='imr',
numerators=self.results['imr_numerator'], denominators=self.results['imr_denominator'])
mmr_results_dict = self.people.log_age_split(binned_ages_t=self.results['mmr_age_by_group'], channel='mmr',
numerators=self.results['mmr_numerator'], denominators=self.results['mmr_denominator'])
stillbirths_results_dict = self.people.log_age_split(binned_ages_t=self.results['stillbirth_ages'], channel='stillbirths',
numerators=self.results['as_stillbirths'], denominators=None)
for age_key in fpd.age_specific_channel_bins:
self.results[f"imr_{age_key}"].append(imr_results_dict[f"imr_{age_key}"])
self.results[f"mmr_{age_key}"].append(mmr_results_dict[f"mmr_{age_key}"])
self.results[f"stillbirths_{age_key}"].append(stillbirths_results_dict[f"stillbirths_{age_key}"])
if maternal_deaths_over_year == 0:
self.results['mmr'].append(0)
else:
maternal_mortality_ratio = maternal_deaths_over_year / live_births_over_year * 100000
self.results['mmr'].append(maternal_mortality_ratio)
if infant_deaths_over_year == 0:
self.results['imr'].append(infant_deaths_over_year)
else:
infant_mortality_rate = infant_deaths_over_year / live_births_over_year * 1000
self.results['imr'].append(infant_mortality_rate)
if secondary_births_over_year == 0:
self.results['proportion_short_interval_by_year'].append(secondary_births_over_year)
else:
short_interval_proportion = (short_intervals_over_year / secondary_births_over_year)
self.results['proportion_short_interval_by_year'].append(short_interval_proportion)
tfr = 0
for key in fpd.age_bin_map.keys():
age_bin_births_year = np.sum(self.results['total_births_'+key][start_index:stop_index])
age_bin_total_women_year = self.results['total_women_'+key][stop_index]
age_bin_births_per_woman = sc.safedivide(age_bin_births_year, age_bin_total_women_year)
self.results['asfr'][key].append(age_bin_births_per_woman*1000)
self.results[f'tfr_{key}'].append(age_bin_births_per_woman * 1000)
tfr += age_bin_births_per_woman # CK: TODO: check if this is right
self.results['tfr_rates'].append(tfr*5) # CK: TODO: why *5? # SB: I think this corresponds to size of age bins?
if self.test_mode:
self.log_daily_totals()
if self.test_mode:
self.save_daily_totals()
if not self.track_children:
delattr(self.people, "mothers")
# Convert all results to Numpy arrays
for key,arr in self.results.items():
if isinstance(arr, list):
self.results[key] = np.array(arr) # Convert any lists to arrays
# Calculate cumulative totals
self.results['cum_maternal_deaths_by_year'] = np.cumsum(self.results['maternal_deaths_over_year'])
self.results['cum_infant_deaths_by_year'] = np.cumsum(self.results['infant_deaths_over_year'])
self.results['cum_live_births_by_year'] = np.cumsum(self.results['live_births_over_year'])
self.results['cum_stillbirths_by_year'] = np.cumsum(self.results['stillbirths_over_year'])
self.results['cum_miscarriages_by_year'] = np.cumsum(self.results['miscarriages_over_year'])
self.results['cum_abortions_by_year'] = np.cumsum(self.results['abortions_over_year'])
self.results['cum_short_intervals_by_year'] = np.cumsum(self.results['short_intervals_over_year'])
self.results['cum_secondary_births_by_year'] = np.cumsum(self.results['secondary_births_over_year'])
self.results['cum_pregnancies_by_year'] = np.cumsum(self.results['pregnancies_over_year'])
# Convert to an objdict for easier access
self.results = sc.objdict(self.results)
# Finalize interventions and analyzers
self.finalize_interventions()
self.finalize_analyzers()
if verbose:
print(f'Final population size: {self.n}.')
elapsed = T.toc(output=True)
print(f'Run finished for "{self.label}" after {elapsed:0.1f} s')
self.summary = sc.objdict()
self.summary.births = np.sum(self.results['births'])
self.summary.deaths = np.sum(self.results['deaths'])
self.summary.final = self.results['pop_size'][-1]
self.already_run = True
return self
[docs]
def store_postpartum(self):
'''Stores snapshot of who is currently pregnant, their parity, and various
postpartum states in final step of model for use in calibration'''
min_age = 12.5
max_age = self['age_limit_fecundity']
ppl = self.people
rows = []
for i in range(len(ppl)):
if ppl.alive[i] and ppl.sex[i] == 0 and min_age <= ppl.age[i] < max_age:
row = {'Age': None, 'PP0to5': None, 'PP6to11': None, 'PP12to23': None, 'NonPP': None, 'Pregnant': None, 'Parity': None}
row['Age'] = int(round(ppl.age[i]))
row['NonPP'] = 1 if not ppl.postpartum[i] else 0
if ppl.postpartum[i]:
pp_dur = ppl.postpartum_dur[i]
row['PP0to5'] = 1 if 0 <= pp_dur < 6 else 0
row['PP6to11'] = 1 if 6 <= pp_dur < 12 else 0
row['PP12to23'] = 1 if 12 <= pp_dur <= 24 else 0
row['Pregnant'] = 1 if ppl.pregnant[i] else 0
row['Parity'] = ppl.parity[i]
rows.append(row)
pp = pd.DataFrame(rows, index = None, columns = ['Age', 'PP0to5', 'PP6to11', 'PP12to23', 'NonPP', 'Pregnant', 'Parity'])
pp.fillna(0, inplace=True)
return pp
[docs]
def to_df(self, include_range=False):
'''
Export all sim results to a dataframe
Args:
include_range (bool): if True, and if the sim results have best, high, and low, then export all of them; else just best
'''
raw_res = sc.odict(defaultdict=list)
for reskey in self.results.keys():
res = self.results[reskey]
if isinstance(res, dict):
for blh,blhres in res.items(): # Best, low, high
if len(blhres) == self.npts:
if not include_range and blh != 'best':
continue
if include_range:
blhkey = f'{reskey}_{blh}'
else:
blhkey = reskey
raw_res[blhkey] += blhres.tolist()
elif sc.isarray(res) and len(res) == self.npts:
raw_res[reskey] += res.tolist()
df = pd.DataFrame(raw_res)
self.df = df
return df
# Function to scale all y-axes in fig based on input channel
[docs]
def plot(self, to_plot=None, xlims=None, ylims=None, do_save=None, do_show=True, filename='fpsim.png', style=None, fig_args=None,
plot_args=None, axis_args=None, fill_args=None, label=None, new_fig=True, colors=None):
'''
Plot the results -- can supply arguments for both the figure and the plots.
Args:
to_plot (str/dict): What to plot (e.g. 'default' or 'cpr'), or a dictionary of result:label pairs
xlims (list/dict): passed to pl.xlim() (use ``[None, None]`` for default)
ylims (list/dict): passed to pl.ylim()
do_save (bool): Whether or not to save the figure. If a string, save to that filename.
do_show (bool): Whether to show the plots at the end
filename (str): If a figure is saved, use this filename
style (bool): Custom style arguments
fig_args (dict): Passed to pl.figure() (plus ``nrows`` and ``ncols`` for overriding defaults)
plot_args (dict): Passed to pl.plot()
axis_args (dict): Passed to pl.subplots_adjust()
fill_args (dict): Passed to pl.fill_between())
label (str): Label to override default
new_fig (bool): Whether to create a new figure (true unless part of a multisim)
colors (list/dict): Colors for plots with multiple lines
'''
if to_plot is None: to_plot = 'default'
fig_args = sc.mergedicts(dict(figsize=(16,10), nrows=None, ncols=None), fig_args)
plot_args = sc.mergedicts(dict(lw=2, alpha=0.7), plot_args)
axis_args = sc.mergedicts(dict(left=0.1, bottom=0.05, right=0.9, top=0.97, wspace=0.2, hspace=0.25), axis_args)
fill_args = sc.mergedicts(dict(alpha=0.2), fill_args)
with fpo.with_style(style):
nrows,ncols = fig_args.pop('nrows'), fig_args.pop('ncols')
fig = pl.figure(**fig_args) if new_fig else pl.gcf()
pl.subplots_adjust(**axis_args)
if to_plot is not None and 'as_' in to_plot:
nrows,ncols = 2, 3
res = self.results # Shorten since heavily used
method_age_groups = list(fpd.age_specific_channel_bins.keys())
if self.pars['track_as']:
no_plot_age = method_age_groups[-1]
method_age_groups.remove(no_plot_age)
delete_keys = [] # to avoid mutating dict during iteration
for key in res:
if no_plot_age in key:
delete_keys.append(key)
for bad_key in delete_keys:
res.remove(bad_key)
agelim = ('-'.join([str(self.pars['low_age_short_int']),str(self.pars['high_age_short_int'])])) ## age limit to be added to the title of short birth interval plot
# Plot everything
if ('as_' in to_plot and not self.pars['track_as']):
raise ValueError(f"Age specific plot selected but sim.pars['track_as'] is False")
if to_plot == 'default':
to_plot = {
'mcpr_by_year': 'Modern contraceptive prevalence rate (%)',
'cum_live_births_by_year': 'Live births',
'cum_stillbirths_by_year': 'Stillbirths',
'cum_maternal_deaths_by_year': 'Maternal deaths',
'cum_infant_deaths_by_year': 'Infant deaths',
'imr': 'Infant mortality rate',
}
elif to_plot == 'cpr':
to_plot = {
'mcpr': 'MCPR (modern contraceptive prevalence rate)',
'cpr': 'CPR (contraceptive prevalence rate)',
'acpr': 'ACPR (alternative contraceptive prevalence rate',
}
elif to_plot == 'mortality':
to_plot = {
'mmr': 'Maternal mortality ratio',
'cum_maternal_deaths_by_year': 'Maternal deaths',
'cum_infant_deaths_by_year': 'Infant deaths',
'imr': 'Infant mortality rate',
}
elif to_plot == 'apo': #adverse pregnancy outcomes
to_plot = {
'cum_pregnancies_by_year': 'Pregnancies',
'cum_stillbirths_by_year': 'Stillbirths',
'cum_miscarriages_by_year': 'Miscarriages',
'cum_abortions_by_year': 'Abortions',
}
elif to_plot == 'method':
to_plot = {
'method_usage': 'Method usage'
}
elif to_plot == 'short-interval':
to_plot = {
# 'proportion_short_interval_by_year': 'Proportion of short birth interval'
'proportion_short_interval_by_year': f"Proportion of short birth interval [{age_group})" for age_group in agelim.split()
}
elif to_plot == 'as_cpr':
to_plot = {f"cpr_{age_group}": f"Contraceptive Prevalence Rate ({age_group})" for age_group in method_age_groups}
elif to_plot == 'as_acpr':
to_plot = {f"acpr_{age_group}": f"Alternative Contraceptive Prevalence Rate ({age_group})" for age_group in method_age_groups}
elif to_plot == 'as_mcpr':
to_plot = {f"mcpr_{age_group}": f"Modern Contraceptive Prevalence Rate ({age_group})" for age_group in method_age_groups}
elif to_plot == 'as_pregnancies':
to_plot = {f"pregnancies_{age_group}": f"Number of Pregnancies for ({age_group})" for age_group in method_age_groups}
elif to_plot == 'as_tfr':
to_plot = {f"tfr_{age_group}": f"Fertility Rate for ({age_group})" for age_group in fpd.age_bin_map}
elif to_plot == 'as_imr':
to_plot = {f"imr_{age_group}": f"Infant Mortality Rate for ({age_group})" for age_group in method_age_groups}
elif to_plot == 'as_mmr':
to_plot = {f"mmr_{age_group}": f"Maternal Mortality Rate for ({age_group})" for age_group in method_age_groups}
elif to_plot == 'as_stillbirths':
to_plot = {f"stillbirths_{age_group}": f"Stillbirths for ({age_group})" for age_group in method_age_groups}
elif to_plot == 'as_births':
to_plot = {f"births_{age_group}": f"Live births for ({age_group})" for age_group in method_age_groups}
elif to_plot is not None:
errormsg = f"Your to_plot value: {to_plot} is not a valid option"
raise ValueError(errormsg)
rows,cols = sc.getrowscols(len(to_plot), nrows=nrows, ncols=ncols)
if to_plot == 'cpr':
rows,cols = 1,3
for p,key,reslabel in sc.odict(to_plot).enumitems():
ax = pl.subplot(rows, cols, p+1)
this_res = res[key]
is_dist = hasattr(this_res, 'best')
if is_dist:
y, low, high = this_res.best, this_res.low, this_res.high
else:
y, low, high = this_res, None, None
years = res['tfr_years']
# Figure out x axis
years = res['tfr_years']
timepoints = res['t'] # Likewise
x = None
for x_opt in [years, timepoints]:
if len(y) == len(x_opt):
x = x_opt
break
if x is None:
errormsg = f'Could not figure out how to plot {key}: result of length {len(y)} does not match a known x-axis'
raise RuntimeError(errormsg)
percent_keys = ['mcpr_by_year', 'mcpr', 'cpr', 'acpr', 'method_usage','proportion_short_interval_by_year']
if ('cpr_' in key or 'acpr_' in key or 'mcpr_' in key or 'proportion_short_interval_' in key) and 'by_year' not in key:
percent_keys = percent_keys + list(to_plot.keys())
if key in percent_keys and key != 'method_usage':
y *= 100
if is_dist:
low *= 100
high *= 100
# Handle label
if label is not None:
plotlabel = label
else:
if new_fig: # It's a new figure, use the result label
plotlabel = reslabel
else: # Replace with sim label to avoid duplicate labels
plotlabel = self.label
# Actually plot
if key == "method_usage":
data = self.format_method_df(timeseries=True)
method_names = data['Method'].unique()
flipped_data = {method: [percentage for percentage in data[data['Method'] == method]['Percentage']] for method in method_names}
colors = [colors[method] for method in method_names] if isinstance(colors, dict) else colors
ax.stackplot(data["Year"].unique(), list(flipped_data.values()), labels=method_names, colors=colors)
else:
ax.plot(x, y, label=plotlabel, **plot_args)
if is_dist:
if 'c' in plot_args:
fill_args['facecolor'] = plot_args['c']
ax.fill_between(x, low, high, **fill_args)
# Plot interventions, if present
# for intv in sc.tolist(self['interventions']):
# if hasattr(intv, 'plot_intervention'): # Don't plot e.g. functions
# intv.plot_intervention(self, ax)
# Handle annotations
as_plot = ('cpr_' in key or 'acpr_' in key or 'mcpr_' in key or 'pregnancies_' in key or 'stillbirths' in key or 'tfr_' in key or 'imr_' in key or 'mmr_' in key or 'births_' in key or 'proportion_short_interval_' in key) and 'by_year' not in key
fixaxis(useSI=fpd.useSI, set_lim=new_fig) # If it's not a new fig, don't set the lim
if key in percent_keys:
pl.ylabel('Percentage')
elif 'mmr' in key:
pl.ylabel('Deaths per 100,000 live births')
elif 'imr' in key:
pl.ylabel('Deaths per 1,000 live births')
elif 'tfr_' in key:
pl.ylabel('Fertility rate per 1,000 women')
elif 'mmr_' in key:
pl.ylabel('Maternal deaths per 10,000 births')
elif 'stillbirths_' in key:
pl.ylabel('Number of stillbirths')
else:
pl.ylabel('Count')
pl.xlabel('Year')
pl.title(reslabel, fontweight='bold')
if xlims is not None:
pl.xlim(xlims)
if ylims is not None:
pl.ylim(ylims)
if (key == "method_usage") or as_plot: # need to overwrite legend for some plots
ax.legend(loc='upper left', frameon=True)
if 'cpr' in to_plot and '_' not in to_plot:
if is_dist:
top = int(np.ceil(max(self.results['acpr'].high) / 10.0)) * 10 # rounding up to nearest 10
else:
top = int(np.ceil(max(self.results['acpr']) / 10.0)) * 10
self.conform_y_axes(figure=fig, top=top)
if as_plot: # this condition is impossible if self.pars['track_as']
channel_type = key.split("_")[0]
tfr_scaling = 'tfr_' in key
age_bins = fpd.age_bin_map if tfr_scaling else fpd.age_specific_channel_bins
age_bins = {bin: interval for bin, interval in age_bins.items() if no_plot_age not in bin}
if is_dist:
top = max([max(group_result) for group_result in [res[f'{channel_type}_{age_group}'].high for age_group in age_bins]])
else:
top = max([max(group_result) for group_result in [res[f'{channel_type}_{age_group}'] for age_group in age_bins]])
tidy_top = int(np.ceil(top / 10.0)) * 10
tidy_top = tidy_top + 20 if tfr_scaling or 'imr_' in key else tidy_top
tidy_top = tidy_top + 50 if 'mmr_' in key else tidy_top
self.conform_y_axes(figure=fig, top=tidy_top)
return tidy_up(fig=fig, do_show=do_show, do_save=do_save, filename=filename)
[docs]
def plot_age_first_birth(self, do_show=None, do_save=None, fig_args=None, filename="first_birth_age.png"):
'''
Plot age at first birth
Args:
fig_args (dict): arguments to pass to ``pl.figure()``
do_show (bool): whether or not the user wants to show the output plot (default: true)
do_save (bool): whether or not the user wants to save the plot to filepath (default: false)
filename (str): the name of the path to output the plot
'''
birth_age = self.people.first_birth_age
data = birth_age[birth_age>0]
fig = pl.figure(**sc.mergedicts(dict(figsize=(7,5)), fig_args))
pl.title("Age at first birth")
sns.boxplot(x=data, orient='v', notch=True)
pl.xlabel('Age (years')
return tidy_up(fig=fig, do_show=do_show, do_save=do_save, filename=filename)
[docs]
def compute_method_usage(self):
'''
Computes method mix proportions from a sim object
Returns:
list of lists where list[years_after_start][method_index] == proportion of
fecundity aged women using that method on that year
'''
ppl = self.people
min_age = 15
max_age = self['age_limit_fecundity']
# filtering for women with appropriate characteristics
bool_list = ppl.alive * [sex == 0 for sex in ppl.sex] * [min_age <= age for age in ppl.age] * [age < max_age for age in ppl.age]
filtered_methods = [method for index, method in enumerate(ppl.method) if bool_list[index]]
unique, counts = np.unique(filtered_methods, return_counts=True)
count_dict = dict(zip(unique, counts))
result = [0] * (len(self.pars['methods']['eff']))
for method in count_dict:
result[method] = count_dict[method] / len(filtered_methods)
return result
#%% Multisim and running
[docs]
class MultiSim(sc.prettyobj):
'''
The MultiSim class handles the running of multiple simulations
'''
def __init__(self, sims=None, base_sim=None, label=None, n=None, **kwargs):
# Handle inputs
if base_sim is None:
if isinstance(sims, Sim):
base_sim = sims
sims = None
elif isinstance(sims, list):
base_sim = sims[0]
else:
errormsg = f'If base_sim is not supplied, sims must be either a single sim (treated as base_sim) or a list of sims, not {type(sims)}'
raise TypeError(errormsg)
# Set properties
self.sims = sims
self.base_sim = base_sim
self.label = base_sim.label if (label is None and base_sim is not None) else label
self.run_args = sc.mergedicts(kwargs)
self.results = None
self.which = None # Whether the multisim is to be reduced, combined, etc.
self.already_run = False
fpu.set_metadata(self) # Set version, date, and git info
return
def __len__(self):
if isinstance(self.sims, list):
return len(self.sims)
elif isinstance(self.sims, Sim):
return 1
else:
return 0
[docs]
def run(self, compute_stats=True, **kwargs):
''' Run all simulations in the MultiSim '''
# Handle missing labels
for s,sim in enumerate(sc.tolist(self.sims)):
if sim.label is None:
sim.label = f'Sim {s}'
# Run
if self.already_run:
errormsg = 'Cannot re-run an already run MultiSim'
raise RuntimeError(errormsg)
self.sims = multi_run(self.sims, **kwargs)
# Recompute stats
if compute_stats:
self.compute_stats()
self.already_run = True
return self
[docs]
def compute_stats(self, return_raw=False, quantiles=None, use_mean=False, bounds=None):
''' Compute statistics across multiple sims '''
if use_mean:
if bounds is None:
bounds = 1
else:
if quantiles is None:
quantiles = {'low':0.1, 'high':0.9}
if not isinstance(quantiles, dict):
try:
quantiles = {'low':float(quantiles[0]), 'high':float(quantiles[1])}
except Exception as E:
errormsg = f'Could not figure out how to convert {quantiles} into a quantiles object: must be a dict with keys low, high or a 2-element array ({str(E)})'
raise ValueError(errormsg)
base_sim = sc.dcp(self.sims[0])
raw = sc.objdict()
results = sc.objdict()
axis = 1
start_end = np.array([sim.tvec[[0, -1]] for sim in self.sims])
if len(np.unique(start_end)) != 2:
errormsg = f'Cannot compute stats for sims: start and end values do not match:\n{start_end}'
raise ValueError(errormsg)
reskeys = list(base_sim.results.keys())
if self.sims[0].pars['track_as']:
for bad_key in ['imr_numerator', 'imr_denominator', 'mmr_numerator', 'mmr_denominator', 'imr_age_by_group', 'mmr_age_by_group', 'as_stillbirths', 'stillbirth_ages']:
reskeys.remove(bad_key) # these keys are intermediate results so we don't really want to save them
bad_keys = ['t', 'tfr_years', 'method_usage']
for key in bad_keys: # Don't compute high/low for these
results[key] = base_sim.results[key]
reskeys.remove(key)
for reskey in reskeys:
if isinstance(base_sim.results[reskey], dict):
if return_raw:
for s, sim in enumerate(self.sims):
raw[reskey][s] = base_sim.results[reskey]
else:
results[reskey] = sc.objdict()
npts = len(base_sim.results[reskey])
raw[reskey] = np.zeros((npts, len(self.sims)))
for s,sim in enumerate(self.sims):
raw[reskey][:, s] = sim.results[reskey] # Stack into an array for processing
if use_mean:
r_mean = np.mean(raw[reskey], axis=axis)
r_std = np.std(raw[reskey], axis=axis)
results[reskey].best = r_mean
results[reskey].low = r_mean - bounds * r_std
results[reskey].high = r_mean + bounds * r_std
else:
results[reskey].best = np.quantile(raw[reskey], q=0.5, axis=axis)
results[reskey].low = np.quantile(raw[reskey], q=quantiles['low'], axis=axis)
results[reskey].high = np.quantile(raw[reskey], q=quantiles['high'], axis=axis)
self.results = results
self.base_sim.results = results # Store here too, to enable plotting
if return_raw:
return raw
else:
return
[docs]
@staticmethod
def merge(*args, base=False):
'''
Convenience method for merging two MultiSim objects.
Args:
args (MultiSim): the MultiSims to merge (either a list, or separate)
base (bool): if True, make a new list of sims from the multisim's two base sims; otherwise, merge the multisim's lists of sims
Returns:
msim (MultiSim): a new MultiSim object
**Examples**:
mm1 = fp.MultiSim.merge(msim1, msim2, base=True)
mm2 = fp.MultiSim.merge([m1, m2, m3, m4], base=False)
'''
# Handle arguments
if len(args) == 1 and isinstance(args[0], list):
args = args[0] # A single list of MultiSims has been provided
# Create the multisim from the base sim of the first argument
msim = MultiSim(base_sim=sc.dcp(args[0].base_sim), sims=[], label=args[0].label)
msim.sims = []
msim.chunks = [] # This is used to enable automatic splitting later
# Handle different options for combining
if base: # Only keep the base sims
for i,ms in enumerate(args):
sim = sc.dcp(ms.base_sim)
sim.label = ms.label
msim.sims.append(sim)
msim.chunks.append([[i]])
else: # Keep all the sims
for ms in args:
len_before = len(msim.sims)
msim.sims += list(sc.dcp(ms.sims))
len_after= len(msim.sims)
msim.chunks.append(list(range(len_before, len_after)))
return msim
[docs]
def split(self, inds=None, chunks=None):
'''
Convenience method for splitting one MultiSim into several. You can specify
either individual indices of simulations to extract, via inds, or consecutive
chunks of indices, via chunks. If this function is called on a merged MultiSim,
the chunks can be retrieved automatically and no arguments are necessary.
Args:
inds (list): a list of lists of indices, with each list turned into a MultiSim
chunks (int or list): if an int, split the MultiSim into that many chunks; if a list return chunks of that many sims
Returns:
A list of MultiSim objects
**Examples**::
m1 = fp.MultiSim(fp.Sim(label='sim1'))
m2 = fp.MultiSim(fp.Sim(label='sim2'))
m3 = fp.MultiSim.merge(m1, m2)
m3.run()
m1b, m2b = m3.split()
msim = fp.MultiSim(fp.Sim(), n_runs=6)
msim.run()
m1, m2 = msim.split(inds=[[0,2,4], [1,3,5]])
mlist1 = msim.split(chunks=[2,4]) # Equivalent to inds=[[0,1], [2,3,4,5]]
mlist2 = msim.split(chunks=2) # Equivalent to inds=[[0,1,2], [3,4,5]]
'''
# Process indices and chunks
if inds is None: # Indices not supplied
if chunks is None: # Chunks not supplied
if hasattr(self, 'chunks'): # Created from a merged MultiSim
inds = self.chunks
else: # No indices or chunks and not created from a merge
errormsg = 'If a MultiSim has not been created via merge(), you must supply either inds or chunks to split it'
raise ValueError(errormsg)
else: # Chunks supplied, but not inds
inds = [] # Initialize
sim_inds = np.arange(len(self)) # Indices for the simulations
if sc.isiterable(chunks): # e.g. chunks = [2,4]
chunk_inds = np.cumsum(chunks)[:-1]
inds = np.split(sim_inds, chunk_inds)
else: # e.g. chunks = 3
inds = np.split(sim_inds, chunks) # This will fail if the length is wrong
# Do the conversion
mlist = []
for indlist in inds:
sims = sc.dcp([self.sims[i] for i in indlist])
msim = MultiSim(sims=sims)
mlist.append(msim)
return mlist
[docs]
def remerge(self, base=True, recompute=True, **kwargs):
'''
Split a sim, compute stats, and re-merge.
Args:
base (bool): whether to use the base sim (otherwise, has no effect)
kwargs (dict): passed to msim.split()
Note: returns a new MultiSim object (if that concerns you).
'''
ms = self.split(**kwargs)
if recompute:
for m in ms:
m.compute_stats() # Recompute the statistics on each separate MultiSim
out = MultiSim.merge(*ms, base=base) # Now re-merge, this time using the base_sim
return out
[docs]
def to_df(self, yearly=False, mean=False):
'''
Export all individual sim results to a dataframe
'''
if mean:
df = self.base_sim.to_df()
else:
raw_res = sc.odict(defaultdict=list)
for s,sim in enumerate(self.sims):
for reskey in sim.results.keys():
res = sim.results[reskey]
if sc.isarray(res):
if len(res) == sim.npts and not yearly:
raw_res[reskey] += res.tolist()
elif len(res) == len(sim.results['tfr_years']) and yearly:
raw_res[reskey] += res.tolist()
scale = len(sim.results['tfr_years']) if yearly else sim.npts
raw_res['sim'] += [s]*scale
raw_res['sim_label'] += [sim.label]*scale
df = pd.DataFrame(raw_res)
self.df = df
return df
[docs]
def plot(self, to_plot=None, plot_sims=True, do_show=None, do_save=None, filename='fp_multisim.png',
fig_args=None, axis_args=None, plot_args=None, style=None, colors=None, **kwargs):
'''
Plot the MultiSim
Args:
plot_sims (bool): whether to plot individual sims (else, plot with uncertainty bands)
See ``sim.plot()`` for additional args.
'''
fig_args = sc.mergedicts(dict(figsize=(16,10)), fig_args)
no_plot_age = list(fpd.age_specific_channel_bins.keys())[-1]
fig = pl.figure(**fig_args)
do_show = kwargs.pop('do_show', True)
labels = sc.autolist()
labellist = sc.autolist() # TODO: shouldn't need this
for sim in self.sims: # Loop over and find unique labels
if sim.label not in labels:
labels += sim.label
labellist += sim.label
label = sim.label
else:
labellist += ''
n_unique = len(np.unique(labels)) # How many unique sims there are
def get_scale_ceil(channel):
is_dist = hasattr(self.sims[0].results['acpr'], 'best') # picking a random channel
if is_dist:
maximum_value = max([max(sim.results[channel].high) for sim in self.sims if no_plot_age not in channel])
else:
maximum_value = max([max(sim.results[channel]) for sim in self.sims if no_plot_age not in channel])
top = int(np.ceil(maximum_value / 10.0)) * 10 # rounding up to nearest 10
return top
if to_plot == 'method':
axis_args_method = sc.mergedicts(dict(left=0.1, bottom=0.05, right=0.9, top=0.97, wspace=0.2, hspace=0.30), axis_args)
with fpo.with_style(style):
pl.subplots_adjust(**axis_args_method)
for axis_index, label in enumerate(np.unique(labels)):
total_df = pd.DataFrame()
return_default = lambda name: fig_args[name] if name in fig_args else None
rows,cols = sc.getrowscols(n_unique, nrows=return_default('nrows'), ncols=return_default('ncols'))
ax = pl.subplot(rows, cols, axis_index+1)
for sim in self.sims:
if sim.label == label:
total_df = pd.concat([total_df, sim.format_method_df(timeseries=True)], ignore_index=True)
method_names = total_df['Method'].unique()
# Getting the mean of each seed as a list of lists, could add conditional here if different method plots are added
percentage_by_method=[]
for method in method_names:
method_df = total_df[(total_df['Method'] == method) & (total_df['Sim'] == label)]
seed_split = [method_df[method_df['Seed'] == seed]['Percentage'].values for seed in method_df['Seed'].unique()]
percentage_by_method.append([np.mean([seed[i] for seed in seed_split]) for i in range(len(seed_split[0]))])
legend = axis_index + 1 == cols # True for last plot in first row
colors = [colors[method] for method in method_names] if isinstance(colors, dict) else colors
ax.stackplot(total_df["Year"].unique(), percentage_by_method, labels=method_names, colors=colors)
ax.set_title(label.capitalize())
ax.legend().set_visible(legend)
ax.set_xlabel('Year')
ax.set_ylabel('Percentage')
if legend:
ax.legend(loc='lower left', bbox_to_anchor=(1, -0.05), frameon=True) if len(labels) > 1 else ax.legend(loc='upper left', frameon=True)
pl.ylim(0, max(max([sum(proportion[1:]*100) for proportion in results['method_usage']]) for results in [sim.results for sim in self.sims]) + 1)
return tidy_up(fig=fig, do_show=do_show, do_save=do_save, filename=filename)
elif plot_sims:
colors = sc.gridcolors(n_unique)
colors = {k:c for k,c in zip(labels, colors)}
for s,sim in enumerate(self.sims): # Note: produces duplicate legend entries
label = labellist[s]
color = colors[sim.label]
alpha = max(0.2, 1/np.sqrt(n_unique))
sim_plot_args = sc.mergedicts(dict(alpha=alpha, c=color), plot_args)
kw = dict(new_fig=False, do_show=False, label=label, plot_args=sim_plot_args)
sim.plot(to_plot=to_plot, **kw, **kwargs)
if to_plot is not None:
# Scale axes
if to_plot == 'cpr':
fig = self.base_sim.conform_y_axes(figure=fig, top=get_scale_ceil('acpr'))
if 'as_' in to_plot:
channel_type = to_plot.split("_")[1]
is_tfr = "tfr" in to_plot
age_bins = list(fpd.age_specific_channel_bins)[:-1]
if is_tfr:
age_bins = fpd.age_bin_map
if hasattr(sim.results[f'cpr_{list(fpd.age_specific_channel_bins.keys())[0]}'], 'best'): # if compute_stats has been applied
top = max([max([max(group_result) for group_result in [sim.results[f'{channel_type}_{age_group}'].high for age_group in age_bins]]) for sim in self.sims])
else:
top = max([max([max(group_result) for group_result in [sim.results[f'{channel_type}_{age_group}'] for age_group in age_bins]]) for sim in self.sims])
tidy_top = int(np.ceil(top / 10.0)) * 10 # rounds top of y axis up to the nearest ten
tidy_top = tidy_top + 20 if is_tfr or 'imr' in to_plot else tidy_top # some custom axis adjustments for neatness
tidy_top = tidy_top + 50 if 'mmr' in to_plot else tidy_top
self.base_sim.conform_y_axes(figure=fig, top=tidy_top)
return tidy_up(fig=fig, do_show=do_show, do_save=do_save, filename=filename)
else:
return self.base_sim.plot(to_plot=to_plot, do_show=do_show, fig_args=fig_args, plot_args=plot_args, **kwargs)
[docs]
def plot_age_first_birth(self, do_show=False, do_save=True, output_file='age_first_birth_multi.png'):
length = sum([len([num for num in sim.people.first_birth_age if num is not None]) for sim in self.sims])
data_dict = {"age": [0] * length, "sim": [0] * length}
i = 0
for sim in self.sims:
for value in [num for num in sim.people.first_birth_age if num is not None]:
data_dict['age'][i] = value
data_dict['sim'][i] = sim.label
i = i + 1
data = pd.DataFrame(data_dict)
pl.title("Age at first birth")
sns.boxplot(data=data, y='age', x='sim', orient='v', notch=True)
if do_show:
pl.show()
if do_save:
print(f"Saved age at first birth plot at {output_file}")
pl.savefig(output_file)
def single_run(sim):
''' Helper function for multi_run(); rarely used on its own '''
sim.run()
return sim
def multi_run(sims, **kwargs):
''' Run multiple sims in parallel; usually used via the MultiSim class, not directly '''
sims = sc.parallelize(single_run, iterarg=sims, **kwargs)
return sims
[docs]
def parallel(*args, **kwargs):
'''
A shortcut to ``fp.MultiSim()``, allowing the quick running of multiple simulations
at once.
Args:
args (list): The simulations to run
kwargs (dict): passed to multi_run()
Returns:
A run MultiSim object.
**Examples**::
s1 = fp.Sim(exposure_factor=0.5, label='Low')
s2 = fp.Sim(exposure_factor=2.0, label='High')
fp.parallel(s1, s2).plot()
msim = fp.parallel(s1, s2)
'''
sims = sc.mergelists(*args)
return MultiSim(sims=sims).run(**kwargs)