Source code for fpsim.scenarios

'''
Class to define and run scenarios
'''

import numpy as np
import pandas as pd
import sciris as sc
from . import defaults as fpd
from . import parameters as fpp
from . import sim as fps
from . import interventions as fpi

__all__ = ['make_scen', 'Scenario', 'Scenarios']


#%% Validation functions -- for internal use only

def check_not_none(obj, *args):
    ''' Check that all needed inputs are supplied '''
    for key in args:
        if obj[key] is None:
            errormsg = f'Entry "{key}" is not allowed to be None for the following scenario spec:\n{obj}'
            raise ValueError(errormsg)
    return


def check_ages(ages):
    ''' Check that age keys are all valid '''
    valid_keys = list(fpd.method_age_map.keys()) + fpd.none_all_keys
    ages = sc.tolist(ages, keepnone=True)
    for age in ages:
        if age not in valid_keys:
            errormsg = f'Age "{age}" is not valid; choices are:\n{sc.newlinejoin(valid_keys)}'
            raise sc.KeyNotFoundError(errormsg)
    return


def check_method(methods):
    ''' Check that methods are valid '''
    valid_methods = list(fpd.method_map.keys()) + [None]
    for method in methods:
        if method not in valid_methods:
            errormsg = f'Method "{method}" is not valid; choices are:\n{sc.newlinejoin(valid_methods)}'
            raise sc.KeyNotFoundError(errormsg)
    return


#%% Scenario classes

[docs] class Scenario(sc.prettyobj, sc.dictobj): ''' Store the specification for a single scenario (which may consist of multiple interventions). This function is intended to be as flexible as possible; as a result, it may be somewhat confusing. There are five different ways to call it -- method efficacy, method probability, method initiation/discontinuation, parameter, and custom intervention. Args (shared): spec (dict): a pre-made specification of a scenario; see keyword explanations below (optional) args (list): additional specifications (optional) label (str): the sim label to use for this scenario pars (dict): optionally supply additional sim parameters to use with this scenario (that take effect at the beginning of the sim, not at the point of intervention) year (float): the year at which to activate efficacy and probability scenarios matrix (str): which set of probabilities to modify for probability scenarios (e.g. annual or postpartum) ages (str/list): the age groups to modify the probabilities for Args (efficacy): year (float): as above eff (dict): a dictionary of method names and new efficacy values Args (probablity): year (float): as above matrix (str): as above ages (str): as above source (str): the method to switch from dest (str): the method to switch to factor (float): if supplied, multiply the [source, dest] probability by this amount value (float): if supplied, instead of factor, replace the [source, dest] probability by this value copy_from (str): if supplied, copy probabilities from a different method Args (initiation/discontinuation): year (float): as above matrix (str): as above ages (str): as above method (str): the method for initiation/discontinuation init_factor (float): as with "factor" above, for initiation (None → method) discont_factor (float): as with "factor" above, for discontinuation (method → None) init_value (float): as with "value" above, for initiation (None → method) discont_value (float): as with "value" above, for discontinuation (method → None) Args (parameter): par (str): the parameter to modify par_years (float/list): the year(s) at which to apply the modifications par_vals (float/list): the value(s) of the parameter for each year Args (custom): interventions (Intervention/list): any custom intervention(s) to be applied to the scenario Congratulations on making it this far. **Examples**:: # Basic efficacy scenario s1 = fp.make_scen(eff={'Injectables':0.99}, year=2020) # Double rate of injectables initiation s2 = fp.make_scen(source='None', dest='Injectables', factor=2) # Double rate of injectables initiation -- alternate approach s3 = fp.make_scen(method='Injectables', init_factor=2) # More complex example: change condoms to injectables transition probability for 18-25 postpartum women s4 = fp.make_scen(source='Condoms', dest='Injectables', value=0.5, ages='18-25', matrix='pp1to6') # Parameter scenario: halve exposure s5 = fp.make_scen(par='exposure_factor', years=2010, vals=0.5) # Custom scenario def update_sim(sim): sim.updated = True s6 = fp.make_scen(interventions=update_sim) # Combining multiple scenarios: change probabilities and exposure factor s7 = fp.make_scen( dict(method='Injectables', init_value=0.1, discont_value=0.02, create=True), dict(par='exposure_factor', years=2010, vals=0.5) ) # Scenarios can be combined s8 = s1 + s2 ''' def __init__(self, spec=None, label=None, pars=None, year=None, matrix=None, ages=None, # Basic settings eff=None, probs=None, # Option 1 source=None, dest=None, factor=None, value=None, copy_from=None, # Option 2 method=None, init_factor=None, discont_factor=None, init_value=None, discont_value=None, # Option 3 par=None, par_years=None, par_vals=None, # Option 4 interventions=None, # Option 5 ): # Handle input specification if isinstance(spec, str) and not isinstance(label, str): # Swap order if types don't match label,spec = spec,label self.specs = sc.mergelists(*[Scenario(**s).specs for s in sc.tolist(spec)]) # Sorry self.label = label self.pars = sc.mergedicts(pars) if not isinstance(label, (str, type(None))): errormsg = f'Unexpected label type {type(label)}' raise TypeError(errormsg) # Handle other keyword inputs eff_spec = None prob_spec = None par_spec = None intv_specs = None # It's an efficacy scenario if eff is not None: eff_spec = sc.objdict( which = 'eff', eff = eff, year = year, ) check_not_none(eff_spec, 'year') # It's a method switching probability scenario prob_args = [probs, factor, value, init_factor, discont_factor, init_value, discont_value] if len(sc.mergelists(*prob_args)): # Check if any are non-None prob_spec = sc.objdict( which = 'prob', year = year, probs = probs if probs else dict( matrix = matrix, ages = ages, source = source, dest = dest, factor = factor, value = value, method = method, init_factor = init_factor, discont_factor = discont_factor, init_value = init_value, discont_value = discont_value, copy_from = copy_from, ) ) check_not_none(prob_spec, 'year') check_ages(ages) check_method([source, dest, method]) # It's a parameter change scenario if par is not None: par_spec = sc.objdict( which = 'par', par = par, par_years = par_years, par_vals = par_vals, ) check_not_none(par_spec, 'par_years', 'par_vals') # It's a custom scenario(s) if interventions is not None: intv_specs = [] for intv in sc.tolist(interventions): intv_specs.append(sc.objdict( which = 'intv', intervention = intv, )) # Merge these different scenarios into the list, skipping None entries self.specs.extend(sc.mergelists(eff_spec, prob_spec, par_spec, intv_specs)) # Finally, ensure all have a consistent label if supplied self.update_label() return
[docs] def __add__(self, scen2): ''' Combine two scenario spec lists ''' scen3 = sc.dcp(self) scen3.specs.extend(sc.dcp(scen2.specs)) return scen3
def __radd__(self, scen2): ''' Allows sum() to work correctly ''' if not scen2: return self else: return self.__add__(scen2)
[docs] def update_label(self, label=None): ''' Ensure all specs have the correct label ''' # Option 1: update the label based on the spec if label is None: label = self.label for spec in self.specs: if 'label' in spec and spec['label'] is not None: label = spec['label'] # Option 2: update the spec based on the label else: for spec in self.specs: spec['label'] = label self.label = label return
[docs] def run(self, run_args=None, **kwargs): ''' Shortcut for creating and running a Scenarios object based on the current scenario. Args: run_args (dict): passed to scens.run() kwargs (dict): passed to Scenarios() ''' scens = Scenarios(**kwargs) scens.add_scen(self) scens.run(**sc.mergedicts(run_args)) return scens
[docs] def make_scen(*args, **kwargs): ''' Alias for ``fp.Scenario()``. ''' return Scenario(*args, **kwargs)
# Ensure the function ahs the same docstring as the class make_scen.__doc__ += '\n\n' + Scenario.__doc__
[docs] class Scenarios(sc.prettyobj): ''' Run different intervention scenarios. A "scenario" can be thought of as a list of sims, all with the same parameters except for the random seed. Usually, scenarios differ from each other only in terms of the interventions run (to compare other differences between sims, it's preferable to use a MultiSim object). Args: pars (dict): parameters to pass to the sim repeats (int): how many repeats of each scenario to run (default: 1) scens (list): the list of scenarios to run; see also ``fp.make_scen()`` and ``Scenarios.add_scen()`` kwargs (dict): optional additional parameters to pass to the sim **Example**:: scen1 = fp.make_scen(label='Baseline') scen2 = fp.make_scen(year=2002, eff={'Injectables':0.99}) # Basic efficacy scenario scens = fp.Scenarios(location='test', repeats=2, scens=[scen1, scen2]) scens.run() ''' def __init__(self, pars=None, repeats=None, scens=None, **kwargs): self.pars = sc.mergedicts(pars, kwargs) self.repeats = repeats if repeats is not None else 1 self.scens = sc.dcp(sc.tolist(scens)) self.simslist = [] self.msims = [] self.msim = None self.already_run = False return
[docs] def add_scen(self, scen=None, label=None): ''' Add a scenario or scenarios to the Scenarios object ''' if isinstance(scen, Scenario): scen.update_label(label) self.scens.append(scen) else: self.scens.append(Scenario(label=label, spec=scen)) return
[docs] def make_sims(self, scenlabel, **kwargs): ''' Create a list of sims that are all identical except for the random seed ''' if scenlabel is None: errormsg = 'Scenario label must be defined' raise ValueError(errormsg) sims = sc.autolist() for i in range(self.repeats): pars = sc.mergedicts(fpp.pars(self.pars.get('location')), self.pars, _copy=True) pars.update(kwargs) pars['seed'] += i sim = fps.Sim(pars=pars) sim.scenlabel = scenlabel # Special label for scenarios objects if sim.label is None: sim.label = scenlabel # Include here if no other label sims += sim return sims
[docs] def make_scens(self): ''' Convert a scenario specification into a list of sims ''' for i,scen in enumerate(self.scens): simlabel = scen.label interventions = sc.autolist() for spec in sc.tolist(scen.specs): # Figure ou what type of spec this is spec = sc.dcp(spec) if 'which' not in spec: errormsg = f'Invalid scenario spec: key "which" not found among: {sc.strjoin(spec.keys())}' raise ValueError(errormsg) else: which = spec.pop('which') # Handle interventions if which == 'eff': eff = spec.pop('eff') year = spec.pop('year') interventions += fpi.update_methods(eff=eff, year=year) elif which == 'prob': probs = spec.pop('probs') year = spec.pop('year') interventions += fpi.update_methods(probs=probs, year=year) elif which == 'par': par = spec.pop('par') years = spec.pop('par_years') vals = spec.pop('par_vals') interventions += fpi.change_par(par=par, years=years, vals=vals) elif which == 'intv': intv = spec.pop('intervention') interventions += intv else: errormsg = f'Could not understand intervention type "{which}"' raise ValueError(errormsg) # Handle label label = spec.pop('label', scen.label) assert len(spec)==0, f'Unrecognized scenario key(s) {sc.strjoin(spec.keys())}' if simlabel is None: simlabel = label else: if label != simlabel: print('Warning, new sim label {label} does not match existing sim label {simlabel}') if simlabel is None: simlabel = f'Scenario {i}' sims = self.make_sims(scenlabel=simlabel, interventions=interventions, **scen.pars) self.simslist.append(sims) return
[docs] def run(self, recompute=True, *args, **kwargs): ''' Actually run a list of sims ''' # Check that it's set up if not self.scens: errormsg = 'No scenarios are defined' raise ValueError(errormsg) if not self.simslist: self.make_scens() # Create msim msims = sc.autolist() for sims in self.simslist: msims += fps.MultiSim(sims) self.msim = fps.MultiSim.merge(*msims) # Run self.msim.run(**kwargs) self.already_run = True # Process self.msim_merged =self.msim.remerge(recompute=recompute) self.analyze_sims() return
[docs] def check_run(self): ''' Give a meaningful error message if the scenarios haven't been run ''' if not self.already_run: errormsg = 'Scenarios have not yet been run; please run first via scens.run()' raise RuntimeError(errormsg) return
[docs] def plot(self, to_plot=None, plot_sims=True, **kwargs): ''' Plot the scenarios with bands -- see ``sim.plot()`` for args ''' self.check_run() if to_plot == 'method': return self.msim.plot(to_plot=to_plot, plot_sims=plot_sims, **kwargs) else: return self.msim_merged.plot(to_plot=to_plot, plot_sims=plot_sims, **kwargs)
[docs] def plot_sims(self, to_plot=None, plot_sims=True, **kwargs): ''' Plot each sim as a separate line across all senarios -- see ``sim.plot()`` for args ''' self.check_run() return self.msim.plot(to_plot=to_plot, plot_sims=plot_sims, **kwargs)
[docs] def analyze_sims(self, start=None, end=None): ''' Take a list of sims that have different labels and extrapolate statistics from each ''' self.check_run() # Pull out first sim and parameters sim0 = self.msim.sims[0] if start is None: start = sim0.pars['start_year'] if end is None: end = sim0.pars['end_year'] def analyze_sim(sim): def aggregate_channel(channel, is_sum=True, is_t=False): if is_t: year = sim.results['t'] else: year = sim.results['tfr_years'] channel_results = sim.results[channel] inds = sc.findinds((year >= start), year <= end) if is_sum: return channel_results[inds].sum() else: return channel_results[inds].mean() # Defines how we calculate each channel, first number is is_sum: 1 = aggregate as sum, 0 = aggregate as mean # The second parameter defines whether to aggregate by year or by timestep where 1 = use sim.t (timestep), 0 = use sim.tfr_years (years) agg_param_dict = {'method_failures_over_year': (1, 0), 'pop_size': (1, 0), 'tfr_rates': (0, 0), 'maternal_deaths_over_year': (1, 0), 'infant_deaths_over_year': (1, 0), 'mcpr': (0, 1), 'births': (1, 1)} results_dict = {} for channel in agg_param_dict: parameters = agg_param_dict[channel] results_dict[channel] = aggregate_channel(channel, parameters[0], parameters[1]) return results_dict # Split the sims up by scenario results = sc.objdict() results.sims = sc.objdict(defaultdict=sc.autolist) for sim in self.msim.sims: try: label = sim.scenlabel except: errormsg = f'Warning, could not extract scenlabel from sim {sim.label}; using default...' print(errormsg) label = sim.label results.sims[label] += sim # Count the births across the scenarios raw = sc.ddict(list) for key,sims in results.sims.items(): for sim in sims: results_dict = analyze_sim(sim) raw['scenario'] += [key] # Append scenario key raw['births'] += [results_dict['births']] # Append births raw['fails'] += [results_dict['method_failures_over_year']] # Append failures raw['popsize'] += [results_dict['pop_size']] # Append population size raw['tfr'] += [results_dict['tfr_rates']] # Append mean tfr rates raw['infant_deaths'] += [results_dict['infant_deaths_over_year']] raw['maternal_deaths'] += [results_dict['maternal_deaths_over_year']] raw['mcpr'] += [results_dict['mcpr']] # Calculate basic stats results.stats = sc.objdict() for statkey in ['mean', 'median', 'std', 'min', 'max']: results.stats[statkey] = sc.objdict() for k,vals in raw.items(): if k != 'scenario': results.stats[statkey][k] = getattr(np, statkey)(vals) # Also save as pandas results.df = pd.DataFrame(raw) self.results = results return