'''
Specify the core interventions available in FPsim. Other interventions can be
defined by the user by inheriting from these classes.
'''
import pylab as pl
import sciris as sc
import inspect
#%% Generic intervention classes
__all__ = ['Intervention', 'change_par', 'update_methods']
[docs]
class Intervention:
'''
Base class for interventions. By default, interventions are printed using a
dict format, which they can be recreated from. To display all the attributes
of the intervention, use disp() instead.
To retrieve a particular intervention from a sim, use sim.get_intervention().
Args:
label (str): a label for the intervention (used for plotting, and for ease of identification)
show_label (bool): whether or not to include the label in the legend
do_plot (bool): whether or not to plot the intervention
line_args (dict): arguments passed to pl.axvline() when plotting
'''
def __init__(self, label=None, show_label=False, do_plot=None, line_args=None):
self._store_args() # Store the input arguments so the intervention can be recreated
if label is None: label = self.__class__.__name__ # Use the class name if no label is supplied
self.label = label # e.g. "Close schools"
self.show_label = show_label # Do not show the label by default
self.do_plot = do_plot if do_plot is not None else True # Plot the intervention, including if None
self.line_args = sc.mergedicts(dict(linestyle='--', c='#aaa', lw=1.0), line_args) # Do not set alpha by default due to the issue of overlapping interventions
self.years = [] # The start and end years of the intervention
self.initialized = False # Whether or not it has been initialized
self.finalized = False # Whether or not it has been initialized
return
def __repr__(self, jsonify=False):
''' Return a JSON-friendly output if possible, else revert to short repr '''
if self.__class__.__name__ in __all__ or jsonify:
try:
json = self.to_json()
which = json['which']
pars = json['pars']
parstr = ', '.join([f'{k}={v}' for k,v in pars.items()])
output = f"cv.{which}({parstr})"
except Exception as E:
output = type(self) + f' (error: {str(E)})' # If that fails, print why
return output
else:
return f'{self.__module__}.{self.__class__.__name__}()'
[docs]
def disp(self):
''' Print a detailed representation of the intervention '''
return sc.pr(self)
def _store_args(self):
''' Store the user-supplied arguments for later use in to_json '''
f0 = inspect.currentframe() # This "frame", i.e. Intervention.__init__()
f1 = inspect.getouterframes(f0) # The list of outer frames
parent = f1[2].frame # The parent frame, e.g. change_beta.__init__()
_,_,_,values = inspect.getargvalues(parent) # Get the values of the arguments
if values:
self.input_args = {}
for key,value in values.items():
if key == 'kwargs': # Store additional kwargs directly
for k2,v2 in value.items():
self.input_args[k2] = v2 # These are already a dict
elif key not in ['self', '__class__']: # Everything else, but skip these
self.input_args[key] = value
return
[docs]
def initialize(self, sim=None):
'''
Initialize intervention -- this is used to make modifications to the intervention
that can't be done until after the sim is created.
'''
self.initialized = True
self.finalized = False
return
[docs]
def finalize(self, sim=None):
'''
Finalize intervention
This method is run once as part of `sim.finalize()` enabling the intervention to perform any
final operations after the simulation is complete (e.g. rescaling)
'''
if self.finalized:
raise RuntimeError('Intervention already finalized') # Raise an error because finalizing multiple times has a high probability of producing incorrect results e.g. applying rescale factors twice
self.finalized = True
return
[docs]
def apply(self, sim):
'''
Apply the intervention. This is the core method which each derived intervention
class must implement. This method gets called at each timestep and can make
arbitrary changes to the Sim object, as well as storing or modifying the
state of the intervention.
Args:
sim: the Sim instance
Returns:
None
'''
raise NotImplementedError
[docs]
def plot_intervention(self, sim, ax=None, **kwargs):
'''
Plot the intervention
This can be used to do things like add vertical lines on days when
interventions take place. Can be disabled by setting self.do_plot=False.
Note 1: you can modify the plotting style via the ``line_args`` argument when
creating the intervention.
Note 2: By default, the intervention is plotted at the days stored in self.days.
However, if there is a self.plot_days attribute, this will be used instead.
Args:
sim: the Sim instance
ax: the axis instance
kwargs: passed to ax.axvline()
Returns:
None
'''
line_args = sc.mergedicts(self.line_args, kwargs)
if self.do_plot or self.do_plot is None:
if ax is None:
ax = pl.gca()
if hasattr(self, 'plot_years'):
years = self.plot_years
elif not self.years and hasattr(self, 'year'):
years = sc.toarray(self.year)
else:
years = self.years
if sc.isiterable(years):
label_shown = False # Don't show the label more than once
for y in years:
if sc.isnumber(y):
if self.show_label and not label_shown: # Choose whether to include the label in the legend
label = self.label
label_shown = True
else:
label = None
ax.axvline(y, label=label, **line_args)
return
[docs]
def to_json(self):
'''
Return JSON-compatible representation
Custom classes can't be directly represented in JSON. This method is a
one-way export to produce a JSON-compatible representation of the
intervention. In the first instance, the object dict will be returned.
However, if an intervention itself contains non-standard variables as
attributes, then its `to_json` method will need to handle those.
Note that simply printing an intervention will usually return a representation
that can be used to recreate it.
Returns:
JSON-serializable representation (typically a dict, but could be anything else)
'''
which = self.__class__.__name__
pars = sc.jsonify(self.input_args)
output = dict(which=which, pars=pars)
return output
[docs]
class change_par(Intervention):
'''
Change a parameter at a specified point in time.
Args:
par (str): the parameter to change
years (float/arr): the year(s) at which to apply the change
vals (any): a value or list of values to change to (if a list, must have the same length as years); or a dict of year:value entries
If any value is ``'reset'``, reset to the original value.
**Example**::
ec0 = fp.change_par(par='exposure_factor', years=[2000, 2010], vals=[0.0, 2.0]) # Reduce exposure factor
ec0 = fp.change_par(par='exposure_factor', vals={2000:0.0, 2010:2.0}) # Equivalent way of writing
sim = fp.Sim(interventions=ec0).run()
'''
def __init__(self, par, years=None, vals=None, verbose=False):
super().__init__()
self.par = par
self.verbose = verbose
if isinstance(years, dict): # Swap if years is supplied as a dict, so can be supplied first
vals = years
if vals is None:
errormsg = 'Values must be supplied'
raise ValueError(errormsg)
if isinstance(vals, dict):
years = sc.dcp(list(vals.keys()))
vals = sc.dcp(list(vals.values()))
else:
if years is None:
errormsg = 'If vals is not supplied as a dict, then year(s) must be supplied'
raise ValueError(errormsg)
else:
years = sc.toarray(sc.dcp(years))
vals = sc.dcp(vals)
if sc.isnumber(vals):
vals = sc.tolist(vals) # We want to be careful not to take something that might already be an array and interpret different values as years
n_years = len(years)
n_vals = len(vals)
if n_years != n_vals:
errormsg = f'Number of years ({n_years}) does not match number of values ({n_vals})'
raise ValueError(errormsg)
self.years = years
self.vals = vals
return
[docs]
def initialize(self, sim):
super().initialize()
# Validate parameter name
if self.par not in sim.pars:
errormsg = f'Parameter "{self.par}" is not a valid sim parameter'
raise ValueError(errormsg)
# Validate years and values
years = self.years
min_year = min(years)
max_year = max(years)
if min_year < sim['start_year']:
errormsg = f'Intervention start {min_year} is before the start of the simulation'
raise ValueError(errormsg)
if max_year > sim['end_year']:
errormsg = f'Intervention end {max_year} is after the end of the simulation'
raise ValueError(errormsg)
if years != sorted(years):
errormsg = f'Years {years} should be monotonic increasing'
raise ValueError(errormsg)
# Convert intervention years to sim timesteps
self.counter = 0
self.inds = sc.autolist()
for y in years:
self.inds += sc.findnearest(sim.tvec, y)
# Store original value
self.orig_val = sc.dcp(sim[self.par])
return
[docs]
def apply(self, sim):
if len(self.inds) > self.counter:
ind = self.inds[self.counter] # Find the current index
if sim.i == ind: # Check if the current timestep matches
curr_val = sc.dcp(sim[self.par])
val = self.vals[self.counter]
if val == 'reset':
val = self.orig_val
sim[self.par] = val # Update the parameter value -- that's it!
if self.verbose:
label = f'Sim "{sim.label}": ' if sim.label else ''
print(f'{label}On {sim.y}, change {self.counter+1}/{len(self.inds)} applied: "{self.par}" from {curr_val} to {sim[self.par]}')
self.counter += 1
return
[docs]
def finalize(self, sim=None):
# Check that all changes were applied
n_counter = self.counter
n_vals = len(self.vals)
if n_counter != n_vals:
errormsg = f'Not all values were applied ({n_vals} ≠ {n_counter})'
raise RuntimeError(errormsg)
return
[docs]
class update_methods(Intervention):
"""
Intervention to modify method efficacy and/or switching matrix.
Args:
year (float): The year we want to change the method.
eff (dict):
An optional key for changing efficacy; its value is a dictionary with the following schema:
{method: efficacy}
Where method is the method to be changed, and efficacy is the new efficacy (can include multiple keys).
probs (list): A list of dictionaries where each dictionary has the following keys:
source (str): the source method to be changed.
dest (str): the destination method to be changed.
factor (float): the factor by which to multiply existing probability; OR
value (float): the value to replace the switching probability value.
keys (list): a list of strings representing age groups to affect.
matrix (str): one of ['probs', 'probs1', 'probs1to6'] where:
probs: Changes the specified uptake at the corresponding year regardless of state.
probs1: Changes the specified uptake for all individuals in their first month postpartum.
probs1to6: Changes the specified uptake for all individuals that are in the first 6 months postpartum.
"""
def __init__(self, year, eff=None, probs=None, matrix=None, verbose=False):
super().__init__()
self.year = year
self.eff = eff
self.probs = probs
self.matrix = matrix
self.verbose = verbose
# Validation
if self.year is None:
errormsg = 'A year must be supplied'
raise ValueError(errormsg)
if self.eff is None and self.probs is None:
errormsg = 'Either efficacy or probabilities must be supplied'
raise ValueError(errormsg)
self.applied = False
return
[docs]
def apply(self, sim):
"""
Applies the efficacy or contraceptive uptake changes if it is the specified year
based on scenario specifications.
"""
if not self.applied and sim.y >= self.year:
self.applied = True # Ensure we don't apply this more than once
# Implement efficacy
if self.eff is not None:
for k,rawval in self.eff.items():
sim.pars.update_method_eff(method=k, eff=rawval)
# Implement method mix shift
if self.probs is not None:
probs = sc.tolist(self.probs)
for entry in probs:
entry = sc.dcp(entry)
matrix = entry.pop('matrix', self.matrix) # Switching matrix
ages = entry.pop('ages', None)
source = entry.pop('source', None)
dest = entry.pop('dest', None)
method = entry.pop('method', None)
factor = entry.pop('factor', None)
value = entry.pop('value', None)
i_factor = entry.pop('init_factor', None)
d_factor = entry.pop('discont_factor', None)
i_value = entry.pop('init_value', None)
d_value = entry.pop('discont_value', None)
copy_from = entry.pop('copy_from', None)
# Supply default matrix
if matrix is None:
matrix = 'annual'
# Validation # CK: TODO: move validation to initialization
if len(entry) != 0:
errormsg = f'Keys "{sc.strjoin(entry.keys())}" not valid entries; see fp.make_scen() for valid args'
raise ValueError(errormsg)
# Validate method/source/dest
if method is not None:
if (source is not None or dest is not None):
errormsg = 'You can supply "method" as an alternative to "source" and "dest", but not both'
raise ValueError(errormsg)
else:
source = method
dest = method
# Ensure correct number of inputs are given
n_vals = len(sc.mergelists(copy_from, factor, value, i_factor, d_factor, i_value, d_value))
if n_vals != 1:
errormsg = f'Must supply one and only one of copy_from, factor, value, or initiation/discontinuation factors/values; you supplied {n_vals}'
raise ValueError(errormsg)
# Check nothing strange has happened
is_switch = len(sc.mergelists(factor, value))
is_init = len(sc.mergelists(i_value, i_factor))
is_discont = len(sc.mergelists(d_value, d_factor))
is_copy = (copy_from is not None)
if is_switch + is_init + is_discont + is_copy != 1:
errormsg = f'Could not figure out what to do: switching={is_switch}, initiation={is_init}, discontinuation={is_discont}, but only one should happen'
raise ValueError(errormsg)
if is_init: # It's initiation
source = 'None'
dest = method
elif is_discont: # It's discontinuation
source = method
dest = 'None'
elif not is_copy and (source is None) and (dest is None):
errormsg = 'Must supply a source or a destination'
raise ValueError(errormsg)
# Decide if it's a factor or a value modification
factor = sc.mergelists(factor, i_factor, d_factor)
value = sc.mergelists(value, i_value, d_value)
factor = factor[0] if factor else None
value = value[0] if value else None
# Actually update the values and check the matrix is valid
kw = dict(source=source, dest=dest, factor=factor, value=value, ages=ages, matrix=matrix, copy_from=copy_from)
sim.pars.update_method_prob(**kw)
return