'''
Specify the core analyzers available in FPsim. Other analyzers can be
defined by the user by inheriting from these classes.
'''
import numpy as np
import sciris as sc
import matplotlib.pyplot as pl
from . import defaults as fpd
from . import utils as fpu
import fpsim as fp
import fpsim.arrays as fpa
from .settings import options as fpo
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import starsim as ss
#%% Generic analyzer classes
__all__ = ['snapshot', 'cpr_by_age', 'method_mix_by_age', 'age_pyramids', 'lifeof_recorder', 'track_as', 'longitudinal_history']
# Specific analyzers
__all__ += ['education_recorder']
# Analyzers for debugging
__all__ += ['state_tracker', 'method_mix_over_time']
[docs]
class snapshot(ss.Analyzer):
'''
Analyzer that takes a "snapshot" of the sim.people array at specified points
in time, and saves them to itself.
Args:
timesteps (list): list of timesteps on which to take the snapshot
args (list): additional timestep(s)
die (bool): whether or not to raise an exception if a date is not found (default true)
kwargs (dict): passed to Analyzer()
**Example**::
sim = fp.Sim(analyzers=fps.snapshot('2020-04-04', '2020-04-14'))
sim.run()
snapshot = sim.pars['analyzers'][0]
people = snapshot.snapshots[0]
'''
def __init__(self, timesteps, *args, die=True, **kwargs):
super().__init__(**kwargs) # Initialize the Analyzer object
timesteps = sc.promotetolist(timesteps) # Combine multiple days
timesteps.extend(args) # Include additional arguments, if present
self.die = die # Whether or not to raise an exception
self.timesteps = timesteps # String representations
self.snapshots = sc.odict() # Store the actual snapshots
return
[docs]
def step(self):
"""
Apply snapshot at each timestep listed in timesteps and
save result at snapshot[str(timestep)]
"""
sim = self.sim
for t in self.timesteps:
if np.isclose(sim.ti, t):
self.snapshots[str(sim.ti)] = sc.dcp(sim.people) # Take snapshot!
return
[docs]
class cpr_by_age(ss.Analyzer):
'''
Analyzer that records the contraceptive prevalence rate (CPR) by age at each timestep.
Args:
kwargs (dict): passed to Analyzer()
**Example**::
sim = fp.Sim(analyzers=fps.cpr_by_age())
sim.run()
final_cpr = sim.analyzers.cpr_by_age.results['total'][-1]
'''
def __init__(self, **kwargs):
super().__init__(**kwargs) # Initialize the Analyzer object
self.age_bins = [v[1] for v in fpd.method_age_map.values()]
return
def init_results(self):
super().init_results()
# Define results for each age group based on the method_age_map
for k in fpd.method_age_map.keys():
self.define_results(ss.Result(name=k,dtype=float, scale=False))
self.define_results(ss.Result(name='total',dtype=float, scale=False))
return
def step(self):
sim = self.sim
ppl = sim.people
for key, (age_low, age_high) in fpd.method_age_map.items():
match_low_high = (ppl.age >= age_low) & (ppl.age < age_high)
denom_conds = match_low_high * (ppl.female) * ppl.alive
num_conds = denom_conds * (ppl.method != 0)
self.results[key][sim.ti] = sc.safedivide(np.count_nonzero(num_conds), np.count_nonzero(denom_conds))
total_denom_conds = (ppl.female) * ppl.alive
total_num_conds = total_denom_conds * (ppl.method != 0)
self.results['total'][sim.ti] = sc.safedivide(np.count_nonzero(total_num_conds), np.count_nonzero(total_denom_conds))
return
[docs]
class method_mix_by_age(ss.Analyzer):
'''
Analyzer that records the method mix by age at the end of the simulation.
'''
def __init__(self, **kwargs):
super().__init__(**kwargs) # Initialize the Analyzer object
self.age_bins = [v[1] for v in fpd.method_age_map.values()]
self.mmba_results = None
self.n_methods = None
return
def step(self):
pass
def finalize(self):
sim = self.sim
ppl = sim.people
n_methods = len(sim.people.contraception_module.methods)
self.mmba_results = {k: np.zeros(n_methods) for k in fpd.method_age_map.keys()}
for key, (age_low, age_high) in fpd.method_age_map.items():
match_low_high = (ppl.age >= age_low) & (ppl.age < age_high)
denom_conds = match_low_high * (ppl.female == True) * ppl.alive
for mn in range(n_methods):
num_conds = denom_conds * (ppl.method == mn)
self.mmba_results[key][mn] = sc.safedivide(np.count_nonzero(num_conds), np.count_nonzero(denom_conds))
return
[docs]
class education_recorder(ss.Analyzer):
'''
Analyzer records all education attributes of females + pregnancy + living status
for all timesteps. Made for debugging purposes.
Args:
args (list): additional timestep(s)
kwargs (dict): passed to Analyzer()
'''
def __init__(self, **kwargs):
super().__init__(**kwargs) # Initialize the Analyzer object
self.snapshots = sc.odict() # Store the actual snapshots
self.keys = ['edu_objective', 'edu_attainment', 'edu_completed',
'edu_dropout', 'edu_interrupted',
'pregnant', 'alive', 'age']
self.max_agents = 0 # maximum number of agents this analyzer tracks
self.time = []
self.trajectories = {} # Store education trajectories
return
[docs]
def step(self):
"""
Apply snapshot at each timestep listed in timesteps and
save result at snapshot[str(timestep)]
"""
sim = self.sim
females = sim.people.female.uids
self.snapshots[str(sim.ti)] = {}
for key in self.keys:
self.snapshots[str(sim.ti)][key] = sc.dcp(sim.people[key][females]) # Take snapshot!
self.max_agents = max(self.max_agents, len(females))
return
[docs]
def finalize(self, sim=None):
"""
Process data in snapshots so we can plot it easily
"""
if self.finalized:
raise RuntimeError('Analyzer already finalized')
self.finalized = True
# Process data so we can plot it easily
self.time = np.array([key for key in self.snapshots.keys()], dtype=int)
for state in self.keys:
self.trajectories[state] = np.full((len(self.time), self.max_agents), np.nan)
for ti, t in enumerate(self.time):
stop_idx = len(self.snapshots[t][state])
self.trajectories[state][ti, 0:stop_idx] = self.snapshots[t][state]
return
[docs]
def plot(self, index=0, fig_args=None, pl_args=None):
"""
Plots time series of each state as a line graph
Args:
index: index of the female individual, must be less the analyzer's max_pop_size
"""
fig_args = sc.mergedicts(fig_args, {'figsize': (5, 7)})
pl_args = sc.mergedicts(pl_args)
rows, cols = sc.get_rows_cols(2)
fig = pl.figure(**fig_args)
keys2 = ['edu_completed', 'edu_interrupted', 'edu_dropout']
keys3 = ['pregnant', 'alive']
k = 0
pl.subplot(rows, cols, k + 1)
age_data = self.trajectories["age"]
state = "edu_attainment"
data = self.trajectories[state]
pl.step(self.time, data[:, index], color="black", label=f"{state}", where='mid', **pl_args)
state = "edu_objective"
data = self.trajectories[state]
pl.step(self.time, data[:, index], color="red", ls="--", label=f"{state}", where='mid', **pl_args)
pl.ylim([0, 24])
pl.title('Education')
pl.ylabel('Education (years)')
pl.xlabel('Timesteps')
pl.legend()
k += 1
for state in sc.mergelists(keys2, keys3):
pl.subplot(rows, cols, k + 1)
data = self.trajectories[state]
if state in keys2:
if state == 'edu_interrupted':
pl.step(self.time, 3*data[:, index], color=[0.7, 0.7, 0.7], label=f"{state}", ls=":", where='mid', **pl_args)
elif state == "edu_dropout":
pl.step(self.time, 3*data[:, index], color="black", label=f"{state}", ls=":", where='mid', **pl_args)
else:
pl.step(self.time, 3*data[:, index], color="#2ca25f", label=f"{state}", where='mid', **pl_args)
elif state == 'pregnant':
pl.step(self.time, data[:, index], color="#dd1c77", label=f"{state}", where='mid', **pl_args)
elif state == 'alive':
plt.step(self.time, 4*data[:, index], color="black", ls="--", label=f"{state}", where='mid', **pl_args)
pl.title(f"Education trajectories - Start age: {int(age_data[0, index])}; final age {int(age_data[-1, index])}.")
pl.ylabel('State')
pl.xlabel('Timesteps')
pl.legend()
return fig
[docs]
def plot_waterfall(self, max_timepoints=30, min_age=18, max_age=40, fig_args=None, pl_args=None):
"""
Plot a waterfall plot showing the evolution of education objective and attainment over time
for a specified age group.
Args:
max_timepoints (int, optional): The maximum number of timepoints to plot, defaults to 30.
min_age (int, optional): The minimum age for the age group, defaults to 18.
max_age (int, optional): The maximum age for the age group, defaults to 40.
Returns:
figure handle
The function generates uses kernel density estimation to visualize the data. If there's not data for the
min max age specified, for a specific time step (ie, there are no agents in that age group), it adds a
textbox. This is an edge case that can happen for a simulation with very few agents, and a very narrow
age group.
"""
from scipy.stats import gaussian_kde
data_att = self.trajectories["edu_attainment"]
data_obj = self.trajectories["edu_objective"]
data_age = self.trajectories["age"]
mask = (data_age < min_age) | (data_age > max_age) | np.isnan(data_age)
data_att = np.ma.array(data_att, mask=mask)
data_obj = np.ma.array(data_obj, mask=mask)
n_tpts = data_att.shape[0]
if n_tpts <= max_timepoints:
tpts_to_plot = np.arange(n_tpts)
else:
tpts_to_plot = np.linspace(0, n_tpts - 1, max_timepoints, dtype=int)
fig_args = sc.mergedicts(fig_args, {'figsize': (3, 10)})
pl_args = sc.mergedicts(pl_args, {'y_scaling': 0.9})
fig = plt.figure(**fig_args)
ax = fig.add_subplot(111)
edu_min, edu_max = 0, 25
edu_mid = (edu_max-edu_min)/2 + edu_min
edu_years = np.linspace(edu_min, edu_max, 50)
y_scaling = pl_args['y_scaling']
# Set the y-axis (time) labels
ax.set_yticks(y_scaling*np.arange(len(tpts_to_plot)))
ax.set_yticklabels(tpts_to_plot)
# Initialize legend labels
edu_att_label = None
edu_obj_label = None
# Loop through the selected time points and create kernel density estimates
for idx, ti in enumerate(tpts_to_plot):
data_att_ti = np.sort(data_att[ti, :][~data_att[ti, :].mask].data)
data_obj_ti = np.sort(data_obj[ti, :][~data_obj[ti, :].mask].data)
try:
kde_att = gaussian_kde(data_att_ti)
kde_obj = gaussian_kde(data_obj_ti)
y_att = kde_att(edu_years)
y_obj = kde_obj(edu_years)
if idx == len(tpts_to_plot) - 1:
edu_obj_label = 'Distribution of education objectives'
edu_att_label = 'Current distribution of education attainment'
ax.fill_between(edu_years, y_scaling*idx, y_obj / y_obj.max() + y_scaling*idx,
color='#2f72de', alpha=0.3, label=edu_obj_label)
ax.plot(edu_years, y_att / y_att.max() + y_scaling*idx,
color='black', alpha=0.7, label=edu_att_label)
except:
# No data available for this age group or age range,
ax.plot(edu_years, (y_scaling * idx) * np.ones_like(edu_years),
color='black', alpha=0.2, label=edu_att_label)
ax.annotate('No data available ', xy=(edu_mid, y_scaling*idx), xycoords='data', fontsize=8,
ha='center', va='center', bbox=dict(boxstyle='round,pad=0.4', fc='none', ec="none"))
# Labels and annotations
ax.set_xlim([edu_min, edu_max])
ax.set_xlabel('Education years')
ax.set_ylabel('Timesteps')
ax.legend()
ax.set_title(f"Evolution of education \n objective and attainment for age group:\n{min_age}-{max_age}.")
[docs]
class lifeof_recorder(ss.Analyzer):
'''
Analyzer records sexual and reproductive history, and contraceptions
females, plus age and living status for all timesteps.
Made for debugging purposes.
Args:
args (list): additional timestep(s)
kwargs (dict): passed to Analyzer()
'''
def __init__(self, **kwargs):
super().__init__(**kwargs) # Initialize the Analyzer object
self.snapshots = sc.odict() # Store the actual snapshots
self.keys = ['method', 'pregnant', 'lam', 'postpartum', 'sexually_active',
'abortion', 'stillbirth', 'parity', 'method',
'miscarriage', 'age', 'on_contra', 'alive']
self.max_agents = 0 # maximum number of agents this analyzer tracks
self.time = []
self.trajectories = {} # Store education trajectories
Methods = fp.make_methods().Methods
self.method_map = {idx: method.label for idx, method in enumerate(Methods.values())}
self.m2y = 1.0/fpd.mpy # Transform timesteps in months to years
return
[docs]
def step(self):
"""
Apply snapshot at each timestep listed in timesteps and
save result at snapshot[str(timestep)]
"""
sim = self.sim
females = sim.people.female.uids
self.snapshots[str(sim.ti)] = {}
for key in self.keys:
self.snapshots[str(sim.ti)][key] = sc.dcp(
sim.people[key][females]) # Take snapshot!
self.max_agents = max(self.max_agents, len(females))
return
[docs]
def finalize(self, sim=None):
"""
Process data in snapshots so we can plot it easily
"""
if self.finalized:
raise RuntimeError('Analyzer already finalized')
self.finalized = True
# Process data so we can plot it easily
self.time = np.array([key for key in self.snapshots.keys()],
dtype=int)
for state in self.keys:
self.trajectories[state] = np.full(
(len(self.time), self.max_agents), np.nan)
for ti, t in enumerate(self.time):
stop_idx = len(self.snapshots[t][state])
self.trajectories[state][ti, 0:stop_idx] = \
self.snapshots[t][state]
return
[docs]
def plot(self, index=0, fig_args=None, pl_args=None):
"""
Plots time series of each state as a line graph
Args:
index: index of the female individual, must be less the analyzer's max_pop_size
"""
fig_args = sc.mergedicts(fig_args, {'figsize': (15, 6)})
pl_args = sc.mergedicts(pl_args, {'markersize': 12,
'markeredgecolor': 'black',
'markeredgewidth': 1.5,
'ls': 'none'})
ymin, ymax = 0, 4
rows, cols = sc.get_rows_cols(1)
fig = pl.figure(**fig_args)
pl.subplot(rows, cols,1)
preg_outcome_state = ["parity", "stillbirth", "miscarriage", "abortion"]
preg_outcome_symbl = {"stillbirth": u"\u29BB",
"parity": u"\u29C2", # use to determine live births
"miscarriage" : u"\u29B0",
"abortion": u"\u2A09"}
preg_outcome_lbl = {"stillbirth": "Stillbirth",
"parity": "Live birth", # use to determine live births
"miscarriage": "Miscarriage",
"abortion": "Abortion"}
# Alive
state = "alive"
temp = self._state_intervals(state, index)
temp_age = self._transform_to_age(temp, index)
al_bar = pl.broken_barh(temp_age,
(3.5, 0.25),
facecolors="lemonchiffon", label=f"{state}",
hatch="..")
# Add vertical lines indicating age at start of simulation
yoffset = 0.25
yp = [ymin-yoffset, ymin+yoffset, (ymax-ymin)/2, ymax-yoffset, ymax+yoffset],
xp = temp_age[0, 0]*np.ones(len(yp))
pl.plot(xp, yp, color='k', ls=':', marker=">")
# Mark the age at the end of the simulation (if they died before the yellowish "alive" bar will stop befor this line)
sim_len = len(self.time) * self.m2y
xp = (temp_age[0, 0] + sim_len) *len(yp)
pl.plot(xp, yp, color='k', ls=':', marker="<")
# Sexually active
state = "sexually_active"
temp = self._state_intervals(state, index)
sa_bar = pl.broken_barh(self._transform_to_age(temp, index),
(1, 1), facecolors="#9bf1fe", label=f"{state}")
# Is she on contraception?
with plt.rc_context({"hatch.linewidth": 3}):
# NOTE: the context manager does not work (known matplotlib bug)
# but if we set the a thicker hatch linewidth as common value for all bars
# then some hatch patters look ugly ¯\_(ツ)_/¯ ...
state = "on_contra"
temp = self._state_intervals(state, index)
temp_age = self._transform_to_age(temp, index)
on_contra_yo, on_contra_height = 1, 1
oc_bar = plt.broken_barh(self._transform_to_age(temp, index),
(on_contra_yo, on_contra_height),
facecolors="none", label=f"{state}", hatch="//")
# What contraceptive method is she on?
state = "method"
contramethod = self.trajectories[state][:, index]
bbox = dict(boxstyle="round", fc="w", ec="none", alpha=0.8)
for start, age in zip(temp[:, 0], temp_age[:, 0]):
pl.annotate(
f"{self.method_map[contramethod[start]]}",
(age, on_contra_yo + on_contra_height/2), xycoords='data',
xytext=(age, on_contra_yo + on_contra_height/2), textcoords='data',
size=10, va="center", ha="left",
bbox=bbox)
# Pregnant
state = "pregnant"
temp = self._state_intervals(state, index)
pr_bar = pl.broken_barh(self._transform_to_age(temp, index),
(2, 1), facecolors="deeppink", label=f"{state}")
# What happened with the pregnancy?
po_plots = []
for state in preg_outcome_state:
if state not in preg_outcome_state:
break
else:
temp = self._preg_outcome_instants(state, index)
lbl = f"{preg_outcome_lbl[state]}"
marker = f"${preg_outcome_symbl[state]}$"
age = self.trajectories["age"][:, index]
po = pl.plot(age, 2.5*temp, marker=marker, label=f"{lbl}", **pl_args)
po_plots.append(po[0])
# Postpartum
state = "postpartum"
temp = self._state_intervals(state, index)
pp_bar = pl.broken_barh(self._transform_to_age(temp, index),
(0.7, 2.5), facecolors="oldlace", label=f"{state}")
# Lam period
state = "lam"
temp = self._state_intervals(state, index)
lam_bar = pl.broken_barh(self._transform_to_age(temp, index),
(0.7, 2.5), facecolors="none", edgecolors="#a5a5a5",
label=f"{state}",
hatch="\\\\\\"
)
# Labels and annotations
pl.xlim(-0.5, 95)
pl.ylim(ymin, ymax)
pl.xlabel('Age (years)')
pl.ylabel('')
pl.title(f"Life course of a woman")
# Hierarchical legend
# TODO: figure out a better/systematic way to do the bbox anchoring of multiple legends
lgn_loc = 'center right'
lvl_1_fnt_sze = 12
lvl_2_fnt_sze = 9
nonpreg_lgnd = plt.legend([al_bar, sa_bar, oc_bar],
["Alive", "Sexually active", "On contraception"],
fontsize=lvl_1_fnt_sze, loc=lgn_loc,
bbox_to_anchor=(0.965, 0.5),
frameon=False)
preg_lgnd = plt.legend([pr_bar], ["Pregnant"],
fontsize=lvl_1_fnt_sze, loc=lgn_loc,
bbox_to_anchor=(0.91, 0.35),
frameon=False)
lbls = [po._label for po in po_plots]
po_lgnd = plt.legend(po_plots,
lbls,
fontsize=lvl_2_fnt_sze,
loc=lgn_loc, bbox_to_anchor=(0.93, 0.25),
frameon=False)
pp_lgnd = plt.legend([lam_bar, pp_bar], ["LAM", "Pospartum"],
fontsize=lvl_1_fnt_sze, loc=lgn_loc,
bbox_to_anchor=(0.92, 0.1),
frameon=False)
plt.gca().add_artist(nonpreg_lgnd)
plt.gca().add_artist(preg_lgnd)
plt.gca().add_artist(po_lgnd)
plt.gca().add_artist(pp_lgnd)
ax = plt.gca()
ax.xaxis.set_major_locator(ticker.MultipleLocator(10))
# Set the minor ticks at every year
ax.xaxis.set_minor_locator(ticker.MultipleLocator(1))
return fig
def _state_intervals(self, state, index):
"""
Extract information about start and length of an interval where the
state is true. Works only for boolean States. Needed for broken_barh()
plots.
"""
# Find where we go from nonzero to zero in our data (mostly boolean states)
data_padded = np.pad(self.trajectories[state][:, index],
(1, 1), mode='constant')
crossings = np.where(np.diff(data_padded != 0))[0]
starts = crossings[:-1:2]
ends = crossings[1::2]
lengths = ends - starts
intervals = np.column_stack((starts, lengths))
return intervals
def _transform_to_age(self, intervals, index):
age = self.trajectories["age"][:, index]
intervals_age = np.full(shape=intervals.shape, fill_value=0.0, dtype=np.float64)
intervals_age[:, 1] = self.m2y*intervals[:, 1].astype(np.float64)
intervals_age[:, 0] = age[intervals[:, 0]]
return intervals_age
def _preg_outcome_instants(self, state, index):
"""
Use the States that count how many instances of each type of pregnancy
outcome a woman has over the course of her life, to extract the
event instants.
"""
data = self.trajectories[state][:, index]
temp = np.full(shape=data.shape,
fill_value=np.nan)
instants = sc.findinds(np.diff(data)) + 1
temp[instants] = 1.0
return temp
[docs]
class age_pyramids(ss.Analyzer):
'''
Records age pyramids for each timestep.
Attributes:
self.bins: A list of ages, default is a sequence from 0 to max_age + 1.
self.data: A matrix of shape (number of timesteps, number of bins - 1) containing age pyramid data.
'''
def __init__(self, bins=None):
"""
Initializes age bins and data variables
"""
super().__init__()
self.bins = bins
self.data = None
return
[docs]
def init_pre(self, sim, force=False):
"""
Initializes bins and data with proper shapes.
"""
super().init_pre(sim, force)
if self.bins is None:
# If no bins are provided, use default bins which exceed the maximum allowed age to ensure all agent ages are captured
self.bins = np.arange(0, sim.fp_pars['max_age']+2)
nbins = len(self.bins)-1
# self.data will contain the proportions of individuals in each age bin at each timestep
# self._raw will contain the raw counts of individuals in each age bin at each timestep
self.data = np.full((sim.t.npts, nbins), np.nan)
self._raw = sc.dcp(self.data)
return
[docs]
def step(self):
"""
Records histogram of ages of all alive individuals at a timestep such that
self.data[timestep] = list of proportions where index signifies age
"""
sim = self.sim
ages = sim.people.age.values
self._raw[sim.ti, :] = np.histogram(ages, self.bins)[0]
self.data[sim.ti, :] = self._raw[sim.ti, :]/self._raw[sim.ti, :].sum()
[docs]
def plot(self):
"""
Plots self.data as 2D pyramid plot
"""
fig = pl.figure()
pl.pcolormesh(self.data.T)
pl.xlabel('Timestep')
pl.ylabel('Age (years)')
return fig
[docs]
def plot3d(self):
"""
Plots self.data as 3D pyramid plot
"""
print('Warning, very slow...')
fig = pl.figure()
sc.bar3d(self.data.T)
pl.xlabel('Timestep')
pl.ylabel('Age (years)')
return fig
[docs]
class method_mix_over_time(ss.Analyzer):
"""
Tracks the number of women on each method available
for each time step
"""
def __init__(self, **kwargs):
super().__init__(**kwargs) # Initialize the Analyzer object
self.results = None
self.n_methods = None
self.tvec = None
return
def init_post(self):
super().initialize()
self.methods = self.sim.contraception_module.methods.keys()
self.n_methods = len(self.methods)
self.results = {k: np.zeros(self.sim.t.npts) for k in self.methods}
self.tvec = self.sim.tvec
return
def step(self):
sim = self.sim
ppl = sim.people
for m_idx, method in enumerate(self.methods):
eligible = ppl.female & ppl.alive & (ppl.method == m_idx)
self.results[method][sim.ti] = np.count_nonzero(eligible)
return
def plot(self, style=None):
with fpo.with_style(style):
fig, ax = plt.subplots(figsize=(10, 5))
for method in self.methods:
ax.plot(self.tvec, self.results[method][:], label=method)
ax.set_xlabel("Year")
ax.set_ylabel(f"Number of living women on method 'x'")
ax.legend()
fig.tight_layout()
return fig
[docs]
class state_tracker(ss.Analyzer):
'''
Records the number of living women on a specific boolean state (eg, numbe of
living women who live in rural settings)
'''
def __init__(self, state_name=None, min_age=fpd.min_age, max_age=fpd.max_age):
"""
Initializes bins and data variables
"""
super().__init__()
self.state_name = state_name
self.data_num = None
self.data_perc = None
self.tvec = None
self.min_age = min_age
self.max_age = max_age
return
[docs]
def init_post(self):
"""
Initializes bins and data with proper shapes
"""
sim = self.sim
super().init_post()
self.data_num = np.full((sim.t.npts,), np.nan)
self.data_perc = np.full((sim.t.npts,), np.nan)
self.data_n_female = np.full((sim.t.npts,), np.nan)
self.tvec = np.full((sim.t.npts,), np.nan)
return
[docs]
def step(self):
"""
Records histogram of ages of all alive individuals at a timestep such that
self.data[timestep] = list of proportions where index signifies age
"""
sim = self.sim
ppl = sim.people
living_women = (ppl.alive & ppl.female & (ppl.age >= self.min_age) & (ppl.age < self.max_age)).uids
self.data_num[sim.ti] = ppl[self.state_name][living_women].sum()
self.data_n_female[sim.ti] = len(living_women)
self.data_perc[sim.ti] = (self.data_num[sim.ti] / self.data_n_female[sim.ti])*100.0
self.tvec[sim.ti] = sim.y
[docs]
def plot(self, style=None):
"""
Plots self.data as a line
"""
colors = ["steelblue", "slategray", "black"]
with fpo.with_style(style):
fig, ax1 = plt.subplots(figsize=(10, 5))
ax2 = ax1.twinx()
ax3 = ax1.twinx()
ax1.spines["left"].set_color(colors[0])
ax1.tick_params(axis="y", labelcolor=colors[0])
ax2.spines["right"].set_color(colors[1])
ax2.yaxis.tick_right()
ax2.yaxis.set_label_position("right")
ax2.tick_params(axis="y", labelcolor=colors[1])
ax3.yaxis.tick_left()
ax3.spines["left"].set_position(('outward', 70))
ax3.spines["left"].set_color(colors[2])
ax3.yaxis.set_label_position("left")
ax3.tick_params(axis="y", labelcolor=colors[2])
ax1.plot(self.tvec, self.data_num, color=colors[0])
ax2.plot(self.tvec, self.data_perc, color=colors[1])
ax3.plot(self.tvec, self.data_n_female, color=colors[2])
ax1.set_xlabel('Year')
ax1.set_ylabel(f'Number of women who are {self.state_name}', color=colors[0])
ax2.set_ylabel(f'percentage (%) of women who are {self.state_name} \n (denominator=num living women {self.min_age}-{self.max_age})', color=colors[1])
ax3.set_ylabel(f'Number of women alive, aged {self.min_age}-{self.max_age}', color=colors[2])
fig.tight_layout()
return fig
[docs]
class track_as(ss.Analyzer):
"""
Analyzer for tracking age-specific results
"""
def __init__(self):
# Check versioning
if sc.compareversions(fp, '<3.0'):
errormsg = (f'Your current version of FPsim is {fp.__version__}, but this analyzer is slated for release'
f' with v3.0 of FPsim and is not currently functional. If you require age-specific results and'
f' FPsim v3.0 is not released, please contact us at [email protected] for assistance.')
raise ValueError(errormsg)
# Initialize
self.results = dict()
self.init_results()
self.reskeys = [
'imr_numerator',
'imr_denominator',
'mmr_numerator',
'mmr_denominator',
'as_stillbirths',
'imr_age_by_group',
'mmr_age_by_group',
'stillbirth_ages',
'acpr',
'cpr',
'mcpr',
'pregnancies',
'births'
]
self.age_bins = {
'<16': [10, 16],
'16-17': [16, 18],
'18-19': [18, 20],
'20-22': [20, 23],
'23-25': [23, 26],
'>25': [26, 100]
}
return
def init_results(self):
self.results['imr_age_by_group'] = []
self.results['mmr_age_by_group'] = []
self.results['stillbirth_ages'] = []
for rk in self.reskeys:
for ab in self.age_bins:
if 'numerator' in rk or 'denominator' in rk or 'as_' in rk:
self.results[rk] = []
else:
self.results[f"{rk}_{ab}"] = []
return
def age_by_group(self, ppl):
# Storing ages by method age group
age_bins = [0] + [max(self.age_bins[key]) for key in self.age_bins]
return np.digitize(ppl.age, age_bins) - 1
[docs]
def log_age_split(self, binned_ages_t, channel, numerators, denominators=None):
"""
Method called if age-specific results are being tracked. Separates results by age.
"""
counts_dict = {}
results_dict = {}
if denominators is not None: # true when we are calculating rates (like cpr)
for timestep_index in range(len(binned_ages_t)):
if len(denominators[timestep_index]) == 0:
counts_dict[f"age_true_counts_{timestep_index}"] = {}
counts_dict[f"age_false_counts_{timestep_index}"] = {}
else:
binned_ages = binned_ages_t[timestep_index]
binned_ages_true = binned_ages[
np.logical_and(numerators[timestep_index], denominators[timestep_index])]
if len(numerators[timestep_index]) == 0:
binned_ages_false = [] # ~[] doesnt make sense
else:
binned_ages_false = binned_ages[
np.logical_and(~numerators[timestep_index], denominators[timestep_index])]
counts_dict[f"age_true_counts_{timestep_index}"] = dict(
zip(*np.unique(binned_ages_true, return_counts=True)))
counts_dict[f"age_false_counts_{timestep_index}"] = dict(
zip(*np.unique(binned_ages_false, return_counts=True)))
age_true_counts = {}
age_false_counts = {}
for age_counts_dict_key in counts_dict:
for index in counts_dict[age_counts_dict_key]:
age_true_counts[index] = 0 if index not in age_true_counts else age_true_counts[index]
age_false_counts[index] = 0 if index not in age_false_counts else age_false_counts[index]
if 'false' in age_counts_dict_key:
age_false_counts[index] += counts_dict[age_counts_dict_key][index]
else:
age_true_counts[index] += counts_dict[age_counts_dict_key][index]
for index, age_str in enumerate(self.reskeys):
scale = 1
if channel == "imr":
scale = 1000
elif channel == "mmr":
scale = 100000
if index not in age_true_counts:
results_dict[f"{channel}_{age_str}"] = 0
elif index in age_true_counts and index not in age_false_counts:
results_dict[f"{channel}_{age_str}"] = 1.0 * scale
else:
results_dict[f"{channel}_{age_str}"] = (age_true_counts[index] / (
age_true_counts[index] + age_false_counts[index])) * scale
else: # true when we are calculating counts (like pregnancies)
for timestep_index in range(len(binned_ages_t)):
if len(numerators[timestep_index]) == 0:
counts_dict[f"age_counts_{timestep_index}"] = {}
else:
binned_ages = binned_ages_t[timestep_index]
binned_ages_true = binned_ages[numerators[timestep_index]]
counts_dict[f"age_counts_{timestep_index}"] = dict(
zip(*np.unique(binned_ages_true, return_counts=True)))
age_true_counts = {}
for age_counts_dict_key in counts_dict:
for index in counts_dict[age_counts_dict_key]:
age_true_counts[index] = 0 if index not in age_true_counts else age_true_counts[index]
age_true_counts[index] += counts_dict[age_counts_dict_key][index]
for index, age_str in enumerate(self.reskeys):
if index not in age_true_counts:
results_dict[f"{channel}_{age_str}"] = 0
else:
results_dict[f"{channel}_{age_str}"] = age_true_counts[index]
return results_dict
[docs]
def step(self):
"""
Apply the analyzer
Note: much of the logic won't work because the sim doesn't record the time at which events
occur (!), so attributes like ppl.ti_pregnant won't exist. These are all slated to be added
as part of the V3 refactor. For now, this is a placeholder.
"""
sim = self.sim
ppl = sim.people
ppl_uids = ppl.alive.uids
# Pregnancies
preg_uids = (ppl.ti_pregnant == self.sim.ti).uids
pregnant_boolean = np.full(len(ppl), False)
pregnant_boolean[np.searchsorted(ppl_uids, preg_uids)] = True
pregnant_age_split = self.log_age_split(binned_ages_t=[self.age_by_group], channel='pregnancies',
numerators=[pregnant_boolean], denominators=None)
for key in pregnant_age_split:
self.results[key] = pregnant_age_split[key]
# Stillborns
stillborn_uids = (ppl.ti_stillbirth == self.sim.ti).uids
stillbirth_boolean = np.full(len(ppl), False)
stillbirth_boolean[np.searchsorted(ppl_uids, stillborn_uids)] = True
self.results['stillbirth_ages'] = self.age_by_group
self.results['as_stillbirths'] = stillbirth_boolean
# Live births
live_uids = (ppl.ti_live_birth == self.sim.ti).uids
total_women_delivering = np.full(len(ppl), False)
total_women_delivering[np.searchsorted(ppl_uids, live_uids)] = True
self.results['mmr_age_by_group'] = self.age_by_group
live_births_age_split = self.log_age_split(binned_ages_t=[self.age_by_group], channel='births',
numerators=[total_women_delivering], denominators=None)
for key in live_births_age_split:
self.results[key] = live_births_age_split[key]
# MCPR
modern_methods_num = [idx for idx, m in enumerate(ppl.contraception_module.methods.values()) if m.modern]
method_age = (sim.fp_pars['method_age'] <= ppl.age)
fecund_age = ppl.age < sim.fp_pars['age_limit_fecundity']
denominator = method_age * fecund_age * ppl.female * ppl.alive
numerator = np.isin(ppl.method, modern_methods_num)
as_result_dict = self.log_age_split(binned_ages_t=[self.age_by_group], channel='mcpr',
numerators=[numerator], denominators=[denominator])
for key in as_result_dict:
self.results[key] = as_result_dict[key]
# CPR
denominator = ((sim.fp_pars['method_age'] <= ppl.age) * (ppl.age < sim.fp_pars['age_limit_fecundity']) * (
ppl.female * ppl.alive))
numerator = ppl.method != 0
as_result_dict = self.log_age_split(binned_ages_t=[self.age_by_group], channel='cpr',
numerators=[numerator], denominators=[denominator])
for key in as_result_dict:
self.results[key] = as_result_dict[key]
# ACPR
denominator = ((sim.fp_pars['method_age'] <= ppl.age) * (ppl.age < sim.fp_pars['age_limit_fecundity']) * (
ppl.female) * (ppl.pregnant == 0) * (ppl.sexually_active == 1) * ppl.alive)
numerator = ppl.method != 0
as_result_dict = self.log_age_split(binned_ages_t=[self.age_by_group], channel='acpr',
numerators=[numerator], denominators=[denominator])
for key in as_result_dict:
self.results[key] = as_result_dict[key]
# Additional stillbirth results
stillbirths_results_dict = self.log_age_split(binned_ages_t=self.results['stillbirth_ages'],
channel='stillbirths',
numerators=self.results['as_stillbirths'],
denominators=None)
for age_key in self.age_bins:
self.results[f"stillbirths_{age_key}"].append(
stillbirths_results_dict[f"stillbirths_{age_key}"])
return
[docs]
class longitudinal_history(ss.Analyzer):
"""
Analyzer for tracking longitudinal history of individuals. The longitude object acts as a circular buffer,
tracking the most recent 1 year of values for each key specified in longitude_keys.
"""
def __init__(self, longitude_keys, tiperyear=12):
super().__init__()
self.longitude_keys = longitude_keys
self.longitude = sc.objdict()
states = []
for key in self.longitude_keys:
self.longitude[key] = np.empty( shape=(tiperyear), dtype=list) # Initialize with empty lists
states.append(
fpa.TwoDimensionalArr(name=key, ncols=tiperyear)
)
self.define_states(*states)
[docs]
def step(self):
"""
Updates longitudinal params in people object
"""
ppl = self.sim.people
# Calculate column index in which to store current vals
index = int(self.sim.ti % self.sim.fp_pars['tiperyear'])
# Store the current params in people.longitude object
for key in self.longitude_keys:
attr = getattr(self, key)
attr[:, index] = getattr(ppl, key).values
return