'''
Functions and classes for running multiple Poliosim runs.
'''

# %% Imports
import numpy as np
import pandas as pd
import sciris as sc
from collections import defaultdict
from . import utils as psu
from . import parameters as pspar
from . import base as psb
from . import model as psm
from . import plotting as psplt
from . import interventions as psi
from . import analysis as psa

# Specify all externally visible functions this file defines (the module's public API).
# NOTE(review): 'run_configs' is exported here but not defined in this chunk -- presumably
# defined elsewhere in the file; confirm it exists before release.
__all__ = ['MultiSim', 'single_run', 'multi_run', 'run_configs', 'run_sims', 'create_sim']

class MultiSim(sc.prettyobj):
    '''
    Class for running multiple copies of a simulation. The parameter n_runs
    controls how many copies of the simulation there will be, if a list of
    sims is not provided.

    Args:
        sims       (Sim/list) : a single sim or a list of sims
        base_sim   (Sim)      : the sim used for shared properties; if not supplied, the first of the sims provided
        quantiles  (dict)     : the quantiles to use with reduce(), e.g. [0.1, 0.9] or {'low': 0.1, 'high': 0.9}
        initialize (bool)     : whether or not to initialize the sims (otherwise, initialize them during run)
        kwargs     (dict)     : stored in run_args and passed to run()

    Returns:
        msim: a MultiSim object

    **Examples**::

        sim = ps.Sim()                 # Create the sim
        msim = ps.MultiSim(sim, n_runs=5)  # Create the multisim                     # Run them in parallel
        msim.combine()                 # Combine into one sim
        msim.plot()                    # Plot results

        sim = ps.Sim()                 # Create the sim
        msim = ps.MultiSim(sim, n_runs=11, noise=0.1, keep_people=True)  # Set up a multisim with noise                     # Run
        msim.reduce()                  # Compute statistics
        msim.plot()                    # Plot

        sims = [ps.Sim(beta=0.015*(1+0.02*i)) for i in range(5)]  # Create sims
        for sim in sims:               # Run sims in serial
        msim = ps.MultiSim(sims)       # Convert to multisim
        msim.plot()                    # Plot as single sim
    '''

    def __init__(self, sims=None, base_sim=None, quantiles=None, initialize=False, label=None, **kwargs):

        # Handle inputs
        if base_sim is None:
            if isinstance(sims, psm.Sim):
                base_sim = sims
                sims = None
            elif isinstance(sims, list):
                base_sim = sims[0]
            else:
                # Fixed grammar: "a sims" -> "a sim"
                errormsg = f'If base_sim is not supplied, sims must be either a sim or a list of sims, not {type(sims)}'
                raise TypeError(errormsg)

        if quantiles is None:
            quantiles = [0.1, 0.9]

        # Set properties
        self.label    = base_sim.label if (label is None and base_sim is not None) else label
        self.sims     = sims
        self.base_sim = base_sim
        self.quantiles = quantiles
        self.run_args = sc.mergedicts(kwargs)
        self.results  = None
        self.which    = None  # Whether the multisim is to be reduced, combined, etc.
        psb.set_metadata(self)  # Set version, date, and git info

        # Optionally initialize
        if initialize:
            self.init_sims()

        return

    def __len__(self):
        # Report the number of sims; 0 if sims is None or not yet set.
        try:
            return len(self.sims)
        except Exception:  # Was a bare except; narrowed so Ctrl-C isn't swallowed
            return 0
[docs] def result_keys(self): ''' Attempt to retrieve the results keys from the base sim ''' try: keys = self.base_sim.result_keys() except Exception as E: errormsg = f'Could not retrieve result keys since base sim not accessible: {str(E)}' raise ValueError(errormsg) return keys
[docs] def init_sims(self, **kwargs): ''' Initialize the sims, but don't actually run them. Syntax is the same as Note: in most cases you can just call run() directly, there is no need to call this separately. Args: kwargs (dict): passed to multi_run() ''' # Handle which sims to use if self.sims is None: sims = self.base_sim else: sims = self.sims # Initialize the sims but don't run them kwargs = sc.mergedicts(self.run_args, kwargs, {'do_run': False}) # Never run, that's the point! self.sims = multi_run(sims, **kwargs) return
[docs] def run(self, reduce=False, combine=False, **kwargs): ''' Run the actual sims Args: reduce (bool): whether or not to reduce after running (see reduce()) combine (bool): whether or not to combine after running (see combine(), not compatible with reduce) kwargs (dict): passed to multi_run(); use run_args to pass arguments to Returns: None (modifies MultiSim object in place) **Examples**::'2020-0601', restore_pars=False)) ''' # Handle which sims to use -- same as init_sims() if self.sims is None: sims = self.base_sim else: sims = self.sims # Run kwargs = sc.mergedicts(self.run_args, kwargs) self.sims = multi_run(sims, **kwargs) # Reduce or combine if reduce: self.reduce() elif combine: self.combine() return self
def _has_orig_sim(self): ''' Helper method for determining if an original base sim is present ''' return hasattr(self, 'orig_base_sim') def _rm_orig_sim(self, reset=False): ''' Helper method for removing the original base sim, if present ''' if self._has_orig_sim(): if reset: self.base_sim = self.orig_base_sim delattr(self, 'orig_base_sim') return
[docs] def shrink(self, **kwargs): ''' Not to be confused with reduce(), this shrinks each sim in the msim; see sim.shrink() for more information. Args: kwargs (dict): passed to sim.shrink() for each sim ''' self.base_sim.shrink(**kwargs) self._rm_orig_sim() for sim in self.sims: sim.shrink(**kwargs) return
[docs] def reset(self): ''' Undo a combine() or reduce() by resetting the base sim, which, and results ''' self._rm_orig_sim(reset=True) self.which = None self.results = None return
[docs] def reduce(self, quantiles=None, output=False): ''' Combine multiple sims into a single sim with scaled results ''' if quantiles is None: quantiles = self.quantiles if not isinstance(quantiles, dict): try: quantiles = {'low': float(quantiles[0]), 'high': float(quantiles[1])} except Exception as E: errormsg = f'Could not figure out how to convert {quantiles} into a quantiles object: must be a dict with keys low, high or a 2-element array ({str(E)})' raise ValueError(errormsg) # Store information on the sims n_runs = len(self) reduced_sim = sc.dcp(self.sims[0]) reduced_sim.parallelized = {'parallelized': True, 'combined': False, 'n_runs': n_runs} # Store how this was parallelized # Perform the statistics raw = {} reskeys = reduced_sim.result_keys() for reskey in reskeys: raw[reskey] = np.zeros((reduced_sim.npts, len(self.sims))) for s, sim in enumerate(self.sims): vals = sim.results[reskey].values if len(vals) != reduced_sim.npts: errormsg = f'Cannot reduce sims with inconsistent numbers of days: {reduced_sim.npts} vs. {len(vals)}' raise ValueError(errormsg) raw[reskey][:, s] = vals for reskey in reskeys: reduced_sim.results[reskey].values[:] = np.quantile(raw[reskey], q=0.5, axis=1) # Changed from median to mean for smoother plots reduced_sim.results[reskey].low = np.quantile(raw[reskey], q=quantiles['low'], axis=1) reduced_sim.results[reskey].high = np.quantile(raw[reskey], q=quantiles['high'], axis=1) # Compute and store final results reduced_sim.compute_summary(verbose=False) self.orig_base_sim = self.base_sim self.base_sim = reduced_sim self.results = reduced_sim.results self.summary = reduced_sim.summary self.which = 'reduced' if output: return self.base_sim else: return
[docs] def mean(self, bounds=None, **kwargs): ''' Alias for reduce(use_mean=True). See reduce() for full description. Args: bounds (float): multiplier on the standard deviation for the upper and lower bounds (default, 2) kwargs (dict): passed to reduce() ''' return self.reduce(use_mean=True, bounds=bounds, **kwargs)
[docs] def median(self, quantiles=None, **kwargs): ''' Alias for reduce(use_mean=False). See reduce() for full description. Args: quantiles (list or dict): upper and lower quantiles (default, 0.1 and 0.9) kwargs (dict): passed to reduce() ''' return self.reduce(use_mean=False, quantiles=quantiles, **kwargs)
[docs] def combine(self, output=False): ''' Combine multiple sims into a single sim with scaled results. **Example**:: msim = cv.MultiSim(cv.Sim()) msim.combine() msim.summarize() ''' n_runs = len(self) combined_sim = sc.dcp(self.sims[0]) combined_sim.parallelized = dict(parallelized=True, combined=True, n_runs=n_runs) # Store how this was parallelized for s,sim in enumerate(self.sims[1:]): # Skip the first one if combined_sim.people: # If the people are there, add them and increment the population size accordingly combined_sim.people += sim.people combined_sim['pop_size'] =['pop_size'] else: # If not, manually update population size combined_sim['pop_size'] += sim['pop_size'] # Record the number of people for key in sim.result_keys(): vals = sim.results[key].values if len(vals) != combined_sim.npts: errormsg = f'Cannot combine sims with inconsistent numbers of days: {combined_sim.npts} vs. {len(vals)}' raise ValueError(errormsg) combined_sim.results[key].values += vals # For non-count results (scale=False), rescale them for key in combined_sim.result_keys(): if not combined_sim.results[key].scale: combined_sim.results[key].values /= n_runs # Compute and store final results combined_sim.compute_summary() self.orig_base_sim = self.base_sim self.base_sim = combined_sim self.results = combined_sim.results self.summary = combined_sim.summary self.which = 'combined' if output: return self.base_sim else: return
[docs] def compare(self, t=None, sim_inds=None, output=False, do_plot=False, **kwargs): ''' Create a dataframe compare sims at a single point in time. Args: t (int/str) : the day (or date) to do the comparison; default, the end sim_inds (list) : list of integers of which sims to include (default: all) output (bool) : whether or not to return the comparison as a dataframe do_plot (bool) : whether or not to plot the comparison (see also plot_compare()) kwargs (dict) : passed to plot_compare() Returns: df (dataframe): a dataframe comparison ''' # Handle time if t is None: t = -1 daystr = 'the last day' else: daystr = f'day {t}' # Handle the indices if sim_inds is None: sim_inds = list(range(len(self.sims))) # Move the results to a dictionary resdict = defaultdict(dict) for i,s in enumerate(sim_inds): sim = self.sims[s] day = # Unlikely, but different sims might have different start days label = sim.label if not label: # Give it a label if it doesn't have one label = f'Sim {i}' if label in resdict: # Avoid duplicates label += f' ({i})' for reskey in sim.result_keys(): res = sim.results[reskey] val = res.values[day] if res.scale: # Results that are scaled by population are ints val = int(val) resdict[label][reskey] = val if do_plot: self.plot_compare(**kwargs) df = pd.DataFrame.from_dict(resdict).astype(object) # astype is necessary to prevent type coercion if not output: print(f'Results for {daystr} in each sim:') print(df) else: return df
    def plot(self, to_plot=None, inds=None, plot_sims=False, color_by_sim=None, max_sims=5, colors=None, labels=None, alpha_range=None, plot_args=None, show_args=None, **kwargs):
        '''
        Plot all the sims -- arguments passed to Sim.plot(). The behavior depends
        on whether or not combine() or reduce() has been called. If so, this
        function by default plots only the combined/reduced sim (which you can
        override with plot_sims=True). Otherwise, it plots a separate line for
        each sim.

        Note that this function is complex because it aims to capture the
        flexibility of both sim.plot() and scens.plot(). By default, if combine()
        or reduce() has been used, it will resemble sim.plot(); otherwise, it
        will resemble scens.plot(). This can be changed via color_by_sim,
        together with the other options.

        Args:
            to_plot      (list) : list or dict of which results to plot; see cv.get_default_plots() for structure
            inds         (list) : if not combined or reduced, the indices of the simulations to plot (if None, plot all)
            plot_sims    (bool) : whether to plot individual sims, even if combine() or reduce() has been used
            color_by_sim (bool) : if True, set colors based on the simulation type; otherwise, color by result type; True implies a scenario-style plotting, False implies sim-style plotting
            max_sims     (int)  : maximum number of sims to use with color-by-sim; can be overridden by other options
            colors       (list) : if supplied, override default colors for color_by_sim
            labels       (list) : if supplied, override default labels for color_by_sim
            alpha_range  (list) : a 2-element list/tuple/array providing the range of alpha values to use to distinguish the lines
            plot_args    (dict) : passed to sim.plot()
            show_args    (dict) : passed to sim.plot()
            kwargs       (dict) : passed to sim.plot()

        Returns:
            fig: Figure handle

        **Examples**::

            sim = ps.Sim()
            msim = ps.MultiSim(sim)
            msim.plot()  # Plots individual sims
            msim.reduce()
            msim.plot()  # Plots the combined sim
        '''

        # Plot a single curve, possibly with a range
        if not plot_sims and self.which in ['combined', 'reduced']:
            fig = self.base_sim.plot(to_plot=to_plot, colors=colors, **kwargs)

        # Plot individual sims on top of each other
        else:

            # Initialize
            fig = None
            orig_show = kwargs.get('do_show', True)
            orig_setylim = kwargs.get('setylim', True)
            kwargs['legend_args'] = sc.mergedicts({'show_legend': True}, kwargs.get('legend_args'))  # Only plot the legend the first time

            # Handle indices
            if inds is None:
                inds = np.arange(len(self.sims))
            n_sims = len(inds)

            # Handle what style of plotting to use: scenario-style (color by sim) for few sims, sim-style otherwise
            if color_by_sim is None:
                if n_sims <= max_sims:
                    color_by_sim = True
                else:
                    color_by_sim = False

            # Handle what to plot
            if to_plot is None:
                if color_by_sim:
                    to_plot = psplt.get_scen_plots()
                else:
                    to_plot = psplt.get_sim_plots()

            # Handle colors
            if colors is None:
                if color_by_sim:
                    colors = sc.gridcolors(ncolors=n_sims)
                else:
                    colors = [None] * n_sims  # So we can iterate over it
            else:
                colors = [colors] * n_sims  # Again, for iteration

            # Handle alpha if not using colors
            if alpha_range is None:
                if color_by_sim:
                    alpha_range = [0.8, 0.8]  # We're using color to distinguish sims, so don't need alpha
                else:
                    alpha_range = [0.8, 0.3]  # We're using alpha to distinguish sims
            alphas = np.linspace(alpha_range[0], alpha_range[1], n_sims)

            # Plot
            for s, ind in enumerate(inds):
                sim = self.sims[ind]
                final_plot = (s == n_sims - 1)  # Check if this is the final plot

                # Handle the legend and labels: only the last overlay shows data,
                # restores do_show/setylim, and draws the legend
                if final_plot:
                    merged_show_args = show_args
                    kwargs['do_show'] = orig_show
                    kwargs['setylim'] = orig_setylim
                else:
                    merged_show_args = False  # Only show things like data the last time it's plotting
                    kwargs['do_show'] = False  # On top of that, don't show the plot at all unless it's the last time
                    kwargs['setylim'] = False

                # Optionally set the label for the first max_sims sims
                if labels is None and color_by_sim is True and s < max_sims:
                    merged_labels = sim.label
                elif final_plot:
                    merged_labels = labels
                else:
                    merged_labels = ''

                # Actually plot
                merged_plot_args = sc.mergedicts({'alpha': alphas[s]}, plot_args)  # Need a new variable to avoid overwriting
                fig = sim.plot(fig=fig, to_plot=to_plot, colors=colors[s], labels=merged_labels, plot_args=merged_plot_args, show_args=merged_show_args, **kwargs)

        return fig
[docs] def plot_result(self, key, colors=None, labels=None, *args, **kwargs): ''' Convenience method for plotting -- arguments passed to Sim.plot_result() ''' if self.which in ['combined', 'reduced']: fig = self.base_sim.plot_result(key, *args, **kwargs) else: fig = None if colors is None: colors = sc.gridcolors(len(self)) if labels is None: labels = [sim.label for sim in self.sims] orig_setylim = kwargs.get('setylim', True) for s, sim in enumerate(self.sims): if s == len(self.sims) - 1: kwargs['setylim'] = orig_setylim else: kwargs['setylim'] = False fig = sim.plot_result(key=key, fig=fig, color=colors[s], label=labels[s], *args, **kwargs) return fig
[docs] def plot_compare(self, t=-1, sim_inds=None, log_scale=True, **kwargs): ''' Plot a comparison between sims, using bars to show different values for each result. Args: t (int) : index of results, passed to compare() sim_inds (list) : which sims to include, passed to compare() log_scale (bool) : whether to plot with a logarithmic x-axis kwargs (dict) : standard plotting arguments, see Sim.plot() for explanation Returns: fig (figure): the figure handle ''' df =, sim_inds=sim_inds, output=True) psplt.plot_compare(df, log_scale=log_scale, **kwargs)
[docs] def save(self, filename=None, keep_people=False, **kwargs): ''' Save to disk as a gzipped pickle. Load with ps.load(filename) or ps.MultiSim.load(filename). Args: filename (str) : the name or path of the file to save to; if None, uses default keep_people (bool) : whether or not to store the population in the Sim objects (NB, very large) kwargs (dict) : passed to makefilepath() Returns: scenfile (str): the validated absolute path to the saved file **Example**:: # Saves to an .msim file ''' if filename is None: filename = 'poliosim.msim' msimfile = sc.makefilepath(filename=filename, **kwargs) self.filename = filename # Store the actual saved filename # Store sims seperately sims = self.sims self.sims = None # Remove for now obj = sc.dcp(self) # This should be quick once we've removed the sims if keep_people: obj.sims = sims # Just restore the object in full print('Note: saving people, which may produce a large file!') else: obj.base_sim.shrink(in_place=True) obj.sims = [] for sim in sims: obj.sims.append(sim.shrink(in_place=False)), obj=obj) # Actually save self.sims = sims # Restore return msimfile
[docs] @staticmethod def load(msimfile, *args, **kwargs): ''' Load from disk from a gzipped pickle. Args: msimfile (str): the name or path of the file to load from kwargs: passed to ps.load() Returns: msim (MultiSim): the loaded MultiSim object **Example**:: msim = ps.MultiSim.load('my-multisim.msim') ''' msim = psu.load(msimfile, *args, **kwargs) if not isinstance(msim, MultiSim): errormsg = f'Cannot load object of {type(msim)} as a MultiSim object' raise TypeError(errormsg) return msim
[docs] @staticmethod def merge(*args, base=False): ''' Convenience method for merging two MultiSim objects. Args: args (MultiSim): the MultiSims to merge (either a list, or separate) base (bool): if True, make a new list of sims from the multisim's two base sims; otherwise, merge the multisim's lists of sims Returns: msim (MultiSim): a new MultiSim object **Examples**: mm1 = ps.MultiSim.merge(msim1, msim2, base=True) mm2 = ps.MultiSim.merge([m1, m2, m3, m4], base=False) ''' # Handle arguments if len(args) == 1 and isinstance(args[0], list): args = args[0] # A single list of MultiSims has been provided # Create the multisim from the base sim of the first argument msim = MultiSim(base_sim=sc.dcp(args[0].base_sim)) msim.sims = [] msim.chunks = [] # This is used to enable automatic splitting later # Handle different options for combining if base: # Only keep the base sims for i, ms in enumerate(args): msim.sims.append(sc.dcp(ms.base_sim)) msim.chunks.append([[i]]) else: # Keep all the sims for ms in args: len_before = len(msim.sims) msim.sims += sc.dcp(ms.sims) len_after = len(msim.sims) msim.chunks.append(list(range(len_before, len_after))) return msim
[docs] def split(self, inds=None, chunks=None): ''' Convenience method for splitting one MultiSim into several. You can specify either individual indices of simulations to extract, via inds, or consecutive chunks of indices, via chunks. If this function is called on a merged MultiSim, the chunks can be retrieved automatically and no arguments are necessary. Args: inds (list): a list of lists of indices, with each list turned into a MultiSim chunks (int or list): if an int, split the MultiSim into chunks of that length; if a list return chunks of that many sims Returns: A list of MultiSim objects **Examples**:: m1 = ps.MultiSim(ps.Sim(label='sim1'), initialize=True) m2 = ps.MultiSim(ps.Sim(label='sim2'), initialize=True) m3 = ps.MultiSim.merge(m1, m2) m1b, m2b = m3.split() msim = ps.MultiSim(ps.Sim(), n_runs=6) m1, m2 = msim.split(inds=[[0,2,4], [1,3,5]]) mlist1 = msim.split(chunks=[2,4]) # Equivalent to inds=[[0,1], [2,3,4,5]] mlist2 = msim.split(chunks=3) # Equivalent to inds=[[0,1,2], [3,4,5]] ''' # Process indices and chunks if inds is None: # Indices not supplied if chunks is None: # Chunks not supplied if hasattr(self, 'chunks'): # Created from a merged MultiSim inds = self.chunks else: # No indices or chunks and not created from a merge errormsg = 'If a MultiSim has not been created via merge(), you must supply either inds or chunks to split it' raise ValueError(errormsg) else: # Chunks supplied, but not inds inds = [] # Initialize sim_inds = np.arange(len(self)) # Indices for the simulations if sc.isiterable(chunks): # e.g. chunks = [2,4] chunk_inds = np.cumsum(chunks)[:-1] inds = np.split(sim_inds, chunk_inds) else: # e.g. chunks = 3 inds = np.split(sim_inds, chunks) # This will fail if the length is wrong # Do the conversion mlist = [] for indlist in inds: sims = sc.dcp([self.sims[i] for i in indlist]) msim = MultiSim(sims=sims) mlist.append(msim) return mlist
[docs] def summarize(self, output=False): ''' Print a brief summary of the MultiSim ''' string = 'MultiSim summary:\n' string += f' Number of sims: {len(self.sims)}\n' string += f' Reduced/combined: {self.which}\n' string += f' Base: {self.base_sim.brief(output=True)}\n' string += ' Sims:\n' for s, sim in enumerate(self.sims): string += f' {s}: {sim.brief(output=True)}\n' if not output: print(string) else: return string
[docs]def single_run(sim, ind=0, reseed=True, noise=0.0, noisepar=None, keep_people=False, run_args=None, sim_args=None, verbose=None, do_run=True, **kwargs): ''' Convenience function to perform a single simulation run. Mostly used for parallelization, but can also be used directly. Args: sim (Sim) : the sim instance to be run ind (int) : the index of this sim reseed (bool) : whether or not to generate a fresh seed for each run noise (float) : the amount of noise to add to each run noisepar (str) : the name of the parameter to add noise to keep_people (bool) : whether to keep the people after the sim run run_args (dict) : arguments passed to sim_args (dict) : extra parameters to pass to the sim, e.g. 'n_infected' verbose (int) : detail to print do_run (bool) : whether to actually run the sim (if not, just initialize it) kwargs (dict) : also passed to the sim Returns: sim (Sim): a single sim object with results **Example**:: import poliosim as ps sim = ps.Sim() # Create a default simulation sim = ps.single_run(sim) # Run it, equivalent(ish) to ''' # Set sim and run arguments sim_args = sc.mergedicts(sim_args, kwargs) run_args = sc.mergedicts({'verbose': verbose}, run_args) if verbose is None: verbose = sim['verbose'] if not sim.label: sim.label = f'Sim {ind:d}' if reseed: sim['rand_seed'] += ind # Reset the seed, otherwise no point of parallel runs sim.set_seed() # If the noise parameter is not found, guess what it should be if noisepar is None: noisepar = 'beta' if noisepar not in raise sc.KeyNotFoundError(f'Noise parameter {noisepar} was not found in sim parameters') # Handle noise -- normally distributed fractional error noiseval = noise * np.random.normal() if noiseval > 0: noisefactor = 1 + noiseval else: noisefactor = 1 / (1 - noiseval) sim[noisepar] *= noisefactor if verbose >= 1: verb = 'Running' if do_run else 'Creating' print(f'{verb} a simulation using seed={sim["rand_seed"]} and noise={noiseval}') # Handle additional arguments for key, val in 
sim_args.items(): print(f'Processing {key}:{val}') if key in if verbose >= 1: print(f'Setting key {key} from {sim[key]} to {val}') sim[key] = val else: raise sc.KeyNotFoundError(f'Could not set key {key}: not a valid parameter name') # Run if do_run:**run_args) # Shrink the sim to save memory if not keep_people: sim.shrink() return sim
def multi_run(sim, n_runs=4, reseed=True, noise=0.0, noisepar=None, iterpars=None, combine=False, keep_people=None, run_args=None, sim_args=None, par_args=None, do_run=True, parallel=True, n_cpus=None, verbose=None, **kwargs):
    '''
    For running multiple runs in parallel. If the first argument is a list of
    sims, exactly these will be run and most other arguments will be ignored.

    Args:
        sim         (Sim)   : the sim instance to be run, or a list of sims.
        n_runs      (int)   : the number of parallel runs
        reseed      (bool)  : whether or not to generate a fresh seed for each run
        noise       (float) : the amount of noise to add to each run
        noisepar    (str)   : the name of the parameter to add noise to
        iterpars    (dict)  : any other parameters to iterate over the runs; see sc.parallelize() for syntax
        combine     (bool)  : whether or not to combine all results into one sim, rather than return multiple sim objects
                              NOTE(review): this argument is not used anywhere in the visible body -- confirm intent
        keep_people (bool)  : whether to keep the people after the sim run (default false)
        run_args    (dict)  : arguments passed to
        sim_args    (dict)  : extra parameters to pass to the sim
        par_args    (dict)  : arguments passed to sc.parallelize()
        do_run      (bool)  : whether to actually run the sim (if not, just initialize it)
        parallel    (bool)  : whether to run in parallel using multiprocessing (else, just run in a loop)
        n_cpus      (int)   : the number of CPUs to run on (if blank, set automatically; otherwise, passed to par_args)
        verbose     (int)   : detail to print
        kwargs      (dict)  : also passed to the sim

    Returns:
        If combine is True, a single sim object with the combined results from
        each sim. Otherwise, a list of sim objects (default).

    **Example**::

        import poliosim as ps
        sim = ps.Sim()
        sims = ps.multi_run(sim, n_runs=6, noise=0.2)
    '''

    # Handle inputs
    sim_args = sc.mergedicts(sim_args, kwargs)  # Handle blank
    par_args = sc.mergedicts({'ncpus': n_cpus}, par_args)  # Handle blank

    # Handle iterpars: when supplied, n_runs is derived from the entries' (shared) length
    if iterpars is None:
        iterpars = {}
    else:
        n_runs = None  # Reset and get from length of dict instead
        for key, val in iterpars.items():
            new_n = len(val)
            if n_runs is not None and new_n != n_runs:
                raise ValueError(f'Each entry in iterpars must have the same length, not {n_runs} and {len(val)}')
            else:
                n_runs = new_n

    # Run the sims: build the per-run keyword sets for single_run()
    if isinstance(sim, psm.Sim):  # Normal case: one sim, repeated n_runs times with different indices
        iterkwargs = {'ind': np.arange(n_runs)}
        iterkwargs.update(iterpars)
        kwargs = dict(sim=sim, reseed=reseed, noise=noise, noisepar=noisepar, verbose=verbose, keep_people=keep_people, sim_args=sim_args, run_args=run_args, do_run=do_run)
    elif isinstance(sim, list):  # List of sims: run exactly these
        iterkwargs = {'sim': sim}
        kwargs = dict(verbose=verbose, keep_people=keep_people, sim_args=sim_args, run_args=run_args, do_run=do_run)
    else:
        errormsg = f'Must be Sim object or list, not {type(sim)}'
        raise TypeError(errormsg)

    # Actually run!
    if parallel:
        try:
            sims = sc.parallelize(single_run, iterkwargs=iterkwargs, kwargs=kwargs, **par_args)  # Run in parallel
        except RuntimeError as E:  # Handle if run outside of __main__ on Windows
            if 'freeze_support' in E.args[0]:  # For this error, add additional information
                errormsg = '''
 Uh oh! It appears you are trying to run with multiprocessing on Windows outside
 of the __main__ block; please see
 for more information. The correct syntax to use is e.g.

     import poliosim as ps
     sim = ps.Sim()
     msim = ps.MultiSim(sim)

     if __name__ == '__main__':

 Alternatively, to run without multiprocessing, set parallel=False.
 '''
                raise RuntimeError(errormsg) from E
            else:  # For all other runtime errors, raise the original exception
                raise E
    else:  # Run in serial, not in parallel
        sims = []
        n_sims = len(list(iterkwargs.values())[0])  # Must have length >=1 and all entries must be the same length
        for s in range(n_sims):
            this_iter = {k: v[s] for k, v in iterkwargs.items()}  # Pull out items specific to this iteration
            this_iter.update(kwargs)  # Merge with the kwargs
            this_iter['sim'] = this_iter['sim'].copy()  # Ensure we have a fresh sim; this happens implicitly on pickling with multiprocessing
            sim = single_run(**this_iter)  # Run in series
            sims.append(sim)

    return sims
# %% Run functions -- # TODO: refactor all of these def process_pars(config, kwargs, verbose=True): ''' Handle sim pars and run pars ''' # Merge arguments -- # TODO: refactor custom = sc.mergedicts(config, kwargs) if 'seed' in custom: # Rename seed to rand_seed custom['rand_seed'] = custom.pop('seed') # Ensure the same seed is being used if custom.get('verbose', verbose) >= 1: print('Configuration:') sc.pp(custom) # Split into the two dicts of pars -- run pars and sim pars pars = sc.objdict(pspar.make_pars()) # Create run parameters matches = [k for k in custom.keys() if k in pars] # Find matching parameters for k in matches: pars[k] = custom.pop(k) # Move custom keys into run parameters, leaving sim parameters -- run parameters takes precedence for duplicates assert len(custom) == 0, f'The following parameter keys are invalid: {custom.keys()}' pars = sc.objdict(pars) return pars
def run_sims(config=None, keep_people=False, **kwargs):
    '''
    Main API for running sims.

    Args:
        config      (dict): configuration dict, passed to process_pars()
        keep_people (bool): whether to keep the people in each sim after running
        kwargs      (dict): additional parameter overrides, passed to process_pars()

    Returns:
        msim (MultiSim): the MultiSim from the final repetition
    '''
    sc.heading('Running sims...')
    pars = process_pars(config, kwargs)

    # Optionally construct the age mixing matrix from a comma-separated string
    age_mix_matrix = None
    if pars.age_mix_matrix is not None:
        age_mix_matrix = np.fromstring(pars.age_mix_matrix, sep=',')
        N = len(pars.age_mix_breakpoints) + 1
        age_mix_matrix = np.reshape(age_mix_matrix, newshape=[N, N])
        print(f'age_mix_matrix: {age_mix_matrix}')

    # NOTE(review): this is always None in the source, so the selector-building block
    # below is dead code; it looks like it was meant to read pars.age_mix_breakpoints
    # -- confirm against the original repository before changing behavior.
    age_mix_breakpoints = None
    age_mix_selectors = []
    if age_mix_breakpoints is not None:
        age_mix_breakpoints = np.fromstring(age_mix_breakpoints, sep=',')
        print(f'age_mix_breakpoints: {age_mix_breakpoints}')
        last_age_mix_breakpoint = 0.0
        k = 0
        for age_mix_breakpoint in age_mix_breakpoints:
            # Evaling this because the msim pickling framework doesn't like it if we have
            # multiple functions with the same name, so we generate a unique name for each.
            # SECURITY/HACK: exec on generated source; CK: this needs to be refactored ASAP
            s = f"""def this_selector_{k}(ages, lower={last_age_mix_breakpoint}, upper={age_mix_breakpoint}):
    return np.nonzero((ages >= lower) & (ages < upper))[0]
age_mix_selectors.append(this_selector_{k})
"""
            exec(s)
            k = k + 1
            last_age_mix_breakpoint = age_mix_breakpoint

        def last_selector(ages, lower=last_age_mix_breakpoint):
            # Final open-ended age bucket
            return np.nonzero(ages >= lower)[0]
        age_mix_selectors.append(last_selector)

    enable_layers = 'h,s,w,c'
    if enable_layers is not None:  # Always true given the hard-coded value above
        enable_layers = enable_layers.split(',')
        print(f'Enabling layers {enable_layers}')

    # print(f'Running scenarios with {args}...')
    sc.timedsleep(1, verbose=False)
    sc.tic()

    for rep in range(pars.reps):
        this_batch_uuid_hex = sc.uuid().hex
        print(f'Batch uuid is {this_batch_uuid_hex}')
        sims = []
        start = pars.pickup + rep * pars.n_sims
        stop = pars.pickup + pars.n_sims * (1 + rep)
        for seed in range(start, stop):
            this_seed_uuid_hex = sc.uuid().hex
            print(f'Seed {seed} uuid: {this_seed_uuid_hex}')
            cs_pars = dict(
                seed=seed,
                n_days=pars.n_days,
                uuid=this_seed_uuid_hex,
                mix_matrix=age_mix_matrix,
                selectors=age_mix_selectors,
            )
            cs_pars.update(pars)  # Include custom parameters
            sim = create_sim(**cs_pars)

            # Add analyzers
            sim['analyzers'].append(psa.calculate_contacts_infected())
            if pars.save_infection_report > 0:
                infection_report_age_breakpoints = pars.infection_report_age_breakpoints.split(",")
                infection_report_age_breakpoints = [int(breakpoint) for breakpoint in infection_report_age_breakpoints]
                sim['analyzers'].append(psa.infection_report(age_breakpoints=infection_report_age_breakpoints))

            sim['user_measurements']['age_mixed_community_layer_enabled'] = pars.age_mixed_community_layer
            if pars.age_mixed_community_layer > 0:
                # NOTE(review): fails if age_mix_breakpoints is still None (see above)
                age_mix_breakpoints = np.fromstring(age_mix_breakpoints, sep=',')
                sim['user_measurements']['age_mix_breakpoints'] = age_mix_breakpoints
                sim['user_measurements']['age_mix_matrix'] = age_mix_matrix

            sim.age_paralyzed = sim.people.age[sim.people.symptomatic]
            sim.update_pars(dict(vx_coverage=pars.vx_coverage, scale_beta=pars.scale_beta))

            # Remove any contact layers that are not enabled
            layers_changed = False
            for layer in list(sim.people.contacts.keys()):  # Fixed: copy keys; popping while iterating mutates the dict mid-loop
                if layer not in enable_layers:
                    sim.people.contacts.pop(layer)
                    layers_changed = True
            if layers_changed:
                # Fixed: message was missing the f prefix, so {enable_layers} printed literally
                raise Exception(f'Layers changed, and do not match {enable_layers}')

            sim.init_analyzers()  # Ensure they're initialized
            sims.append(sim)

        msim = MultiSim(sims, label=pars.label)
        par_args = None  # dict(ncpus=pars.ncpus) if pars.ncpus > 0 else None # TODO: refactor using direct ncpus argument
        # Fixed: run call was truncated in the source ("") -- reconstructed, par_args=par_args)

        for sim in msim.sims:
            sim.age_paralyzed = sim.people.age[sim.people.symptomatic]
            if not keep_people:
                sim.shrink()

    return msim
def create_sim(config=None, **kwargs):
    '''
    Function for creating a single sim -- should be merged with run_sims.

    Builds a ``psm.Sim`` from the processed parameters, initializes the
    population and its immunity, records a battery of initialization
    measurements (immunity by age group, seeded infections, community-layer
    age-mixing statistics) into ``sim['user_measurements']``, and attaches the
    standard testing/tracing interventions.

    Args:
        config (dict): configuration parameters, merged with kwargs by process_pars()
        kwargs (dict): additional parameters, override config

    Returns:
        sim: an initialized psm.Sim (interventions initialized, not yet run)
    '''
    # Handle the parameters
    pars = process_pars(config, kwargs)

    # Initialize sim -- construct a default label from the key epi parameters if none given
    if pars.label is None:
        label = f'quar_age={pars.quar_age}, quar_period={pars.quar_period}, TTQ={pars.trace_prob}, test_delay={pars.test_delay}, seed={pars.rand_seed}'
    else:
        label = pars.label
    if 'label' not in pars:
        pars['label'] = label
    sim = psm.Sim(**pars)

    # Setup sim
    sim.initialize()
    people = sim.people  # Get the actual people
    verbose = sim['verbose']
    if pars.immunity_mode == 1:
        psm.initialize_immunity(people, sia_coverage=pars.vx_coverage, ri_coverage=pars.ri_coverage,
                                ri_ages_years=pars.ri_ages_years)
    else:
        people.current_immunity.fill(1)  # Everyone starts with baseline immunity of 1

    # Boolean arrays of flags indicating people under / over 5 years of age
    u5 = people.age < 5.0
    o5 = ~u5  # Logical complement (was a Python list comprehension; only ever summed)

    # Check on and store pop initialization & immunity -- TODO: remove hard-coding
    initialization_measurements = dict()
    if verbose >= 1:
        sc.heading('Population immunity & initialization checks')

    # "Naive" is defined throughout as neutralizing antibody titer (Nab) < 8
    naive_o5 = np.logical_and(people.age >= 5, people.current_immunity < 8.0)
    initialization_measurements['Number of >5'] = sum(o5)
    initialization_measurements['Number of >5 naive (Nab < 8)'] = sum(naive_o5)
    initialization_measurements['Proportion of >5 naive (Nab < 8)'] = initialization_measurements['Number of >5 naive (Nab < 8)'] / initialization_measurements['Number of >5']

    naive_u5 = np.logical_and(people.age < 5, people.current_immunity < 8.0)
    titer_under_536_u5 = np.logical_and(people.age < 5, people.current_immunity < 536.0)
    initialization_measurements['Number of u5'] = sum(u5)
    initialization_measurements['Number of u5 naive (Nab < 8)'] = sum(naive_u5)
    initialization_measurements['Number of u5 with (Nab < 536)'] = sum(titer_under_536_u5)
    initialization_measurements['Proportion of u5 naive (Nab < 8)'] = initialization_measurements['Number of u5 naive (Nab < 8)'] / initialization_measurements['Number of u5']
    initialization_measurements['Proportion of u5 with (Nab < 536)'] = initialization_measurements['Number of u5 with (Nab < 536)'] / initialization_measurements['Number of u5']

    # Yearly age bins up to 5, then 5-15 and 15+ (10000 as an effective upper bound)
    p01   = np.logical_and(people.age > 0,   people.age < 1)
    p12   = np.logical_and(people.age >= 1,  people.age < 2)
    p23   = np.logical_and(people.age >= 2,  people.age < 3)
    p34   = np.logical_and(people.age >= 3,  people.age < 4)
    p45   = np.logical_and(people.age >= 4,  people.age < 5)
    p5_15 = np.logical_and(people.age >= 5,  people.age < 15)
    po15  = np.logical_and(people.age >= 15, people.age < 10000)
    initialization_measurements['Proportion of 0-1yo naive (Nab < 8)']  = sum(np.logical_and(people.current_immunity < 8.0, p01))   / sum(p01)
    initialization_measurements['Proportion of 1-2yo naive (Nab < 8)']  = sum(np.logical_and(people.current_immunity < 8.0, p12))   / sum(p12)
    initialization_measurements['Proportion of 2-3yo naive (Nab < 8)']  = sum(np.logical_and(people.current_immunity < 8.0, p23))   / sum(p23)
    initialization_measurements['Proportion of 3-4yo naive (Nab < 8)']  = sum(np.logical_and(people.current_immunity < 8.0, p34))   / sum(p34)
    initialization_measurements['Proportion of 4-5yo naive (Nab < 8)']  = sum(np.logical_and(people.current_immunity < 8.0, p45))   / sum(p45)
    initialization_measurements['Proportion of 5-15yo naive (Nab < 8)'] = sum(np.logical_and(people.current_immunity < 8.0, p5_15)) / sum(p5_15)
    initialization_measurements['Proportion of >15yo naive (Nab < 8)']  = sum(np.logical_and(people.current_immunity < 8.0, po15))  / sum(po15)

    # Seed infections in naive u5
    inds_naive = sc.findinds(naive_u5)  # Store indices for naives
    inds_naive_to_infect = np.random.choice(inds_naive, pars['pop_infected'], replace=False)
    people.seed_infections(target_inds=inds_naive_to_infect)
    initialization_measurements['num seed infections'] = len(inds_naive_to_infect)

    # Double check infections & shedding are correct
    # NOTE(review): the arguments to findinds() and the first sum() below were
    # lost in the source; reconstructed as the exposure flags -- TODO confirm
    inds_exp = sc.findinds(people.exposed)
    shed_duration = [people.shed_duration[i] for i in inds_exp]
    initialization_measurements['Mean shedding duration for seeded infections'] = np.mean(shed_duration)  # 47.9 days
    initialization_measurements['N people exposed'] = sum(people.exposed)  # 100 exposed
    initialization_measurements['N people shedding'] = sum([people.is_shed[i] for i in range(len(people))])  # 100 shedding

    # We're gonna look at each community layer contact in each direction
    all_contacts_1 = np.append(people.contacts['c']['p1'], people.contacts['c']['p2'])
    all_contacts_2 = np.append(people.contacts['c']['p2'], people.contacts['c']['p1'])
    ages_1 = people.age[all_contacts_1]
    ages_2 = people.age[all_contacts_2]
    if pars.age_mixed_community_layer > 0:
        # BUGFIX: the original parsed the (empty) local list rather than the
        # configured comma-separated string -- presumably pars.age_mix_breakpoints
        age_mix_breakpoints = np.fromstring(pars.age_mix_breakpoints, sep=',')
        # Bound ages of last bucket in for loops below
        age_mix_breakpoints = np.append(age_mix_breakpoints, 1000)
    else:
        age_mix_breakpoints = np.asarray([2.0, 5.0, 1000])
    N = len(age_mix_breakpoints)
    age_mix_counts = np.zeros([N, N])  # Raw contact counts per (source, target) age bin
    age_mix_frac   = np.zeros([N, N])  # Fraction of a source bin's contacts landing in each target bin
    age_mix_means  = np.zeros([N, N])  # Mean contacts per person per (source, target) bin  # TODO: does this do anything?
    age_counts = np.zeros(N)

    m = 0
    last_age_mix_breakpoint_source = 0
    for age_mix_breakpoint_source in age_mix_breakpoints:
        age_counts[m] = len(np.nonzero(
            (people.age >= last_age_mix_breakpoint_source) & (people.age < age_mix_breakpoint_source))[0])
        last_age_mix_breakpoint_target = 0
        n = 0
        for age_mix_breakpoint_target in age_mix_breakpoints:
            # Directed edges whose source falls in bin m and target in bin n
            contact_inds_m_n = np.nonzero(
                (ages_1 >= last_age_mix_breakpoint_source) & (ages_1 < age_mix_breakpoint_source) &
                (ages_2 >= last_age_mix_breakpoint_target) & (ages_2 < age_mix_breakpoint_target))[0]
            num_m_n = len(contact_inds_m_n)
            age_mix_counts[m][n] = num_m_n
            num_sources = len(np.nonzero(
                (ages_1 >= last_age_mix_breakpoint_source) & (ages_1 < age_mix_breakpoint_source))[0])
            age_mix_frac[m][n] = np.divide(num_m_n, num_sources)
            indices, counts = np.unique(all_contacts_1[contact_inds_m_n], return_counts=True)
            age_mix_means[m][n] = np.mean(counts)
            last_age_mix_breakpoint_target = age_mix_breakpoint_target
            n = n + 1
        last_age_mix_breakpoint_source = age_mix_breakpoint_source
        m = m + 1

    age_group_sums = np.zeros(N)
    for m in np.arange(0, N):
        age_group_sums[m] = np.sum(age_mix_means[m])

    measure_contact_counts = dict()
    measure_contact_counts['age_mix_contact_counts'] = age_mix_counts
    measure_contact_counts['age_mix_contact_frac']   = age_mix_frac
    measure_contact_counts['age_mix_contact_means']  = age_mix_means
    measure_contact_counts['age_group_sums']         = age_group_sums
    measure_contact_counts['num_edges'] = len(people.contacts['c']['p1'])
    indices, counts = np.unique(all_contacts_1, return_counts=True)
    measure_contact_counts['mean_counts'] = np.mean(counts)
    measure_contact_counts['age_counts'] = age_counts
    initialization_measurements['measure_contacts'] = measure_contact_counts

    sim['user_measurements'] = initialization_measurements
    if verbose >= 1:
        print('Sim user measurements:')
        sc.pp(sim['user_measurements'])

    # Setup interventions - Test 100% of symptomatics and 0% of asymptomatics
    # Test all individuals in quarantine
    # Trace out from index case only if trace_prob > 0
    sim['interventions'] += [
        psi.test_prob(symp_prob=1.00, asymp_prob=0.0, test_delay=pars.test_delay,
                      symp_quar_prob=1.0, asymp_quar_prob=1.0),
        psi.contact_tracing(trace_probs=pars.trace_prob, trace_time=0),
    ]
    if pars.symptomatic_triggered_surveillance_mu > 0.0:
        symptomatic_triggered_surveillance_intervention = psi.symptomatic_triggered_surveillance(
            num_sample_mu=pars.symptomatic_triggered_surveillance_mu,
            num_sample_sigma=pars.symptomatic_triggered_surveillance_sigma,
            test_sensitivity=pars.symptomatic_triggered_surveillance_test_sensitivity,
            test_delay=pars.symptomatic_triggered_surveillance_test_delay,
        )
        sim['interventions'].append(symptomatic_triggered_surveillance_intervention)

    # Ensure they're initialized
    sim.init_interventions()

    return sim
def run_configs(configs, do_run=True, do_save=None, save_summary=False, **kwargs):
    '''
    Helper function to run a list of dictionaries with configuration settings.

    Args:
        configs (list/dict): configuration dicts; if a dict, its keys become sim labels
        do_run (bool): if False, return the unrun MultiSim immediately
        do_save (bool/str): if truthy, save the MultiSim (filename if a string)
        save_summary (bool/str): if truthy, save the summary dataframe to CSV (filename if a string)
        kwargs (dict): passed to MultiSim()

    Returns:
        msim: a MultiSim object (run, with a ``summary`` dataframe, unless do_run=False)

    **Example**::

        TBC
    '''
    # Normalize configs into parallel lists of parameter dicts and labels
    if isinstance(configs, dict):
        config_pars = list(configs.values())
        config_labels = list(configs.keys())
    else:
        config_pars = sc.tolist(configs)
        config_labels = [str(p) for p in config_pars]

    # Handle labels if supplied -- BUGFIX: iterate the parameter dicts, not
    # `configs`, which yields bare keys (strings) when a dict is passed
    for c, config in enumerate(config_pars):
        if 'label' in config:
            config_labels[c] = config.pop('label')  # Pull out label

    # Create one sim per configuration, remembering its pars for the summary
    sims = []
    for label, pars in zip(config_labels, config_pars):
        sim = create_sim(pars, label=label)
        sim.configpars = sc.objdict(label=label, **pars)
        sims.append(sim)
    msim = MultiSim(sims, **kwargs)

    if not do_run:
        return msim
    else:
        # NOTE(review): the run call appears to have been lost from the source;
        # the summary below reads sim.summary, which only exists after running
        msim.run()

        # Compute summary: one row per sim, merging results with its config
        reslist = []
        for sim in msim.sims:
            resdict = sc.mergedicts(sim.summary, sim.configpars, copy=True)
            reslist.append(resdict)
        msim.summary = pd.DataFrame(reslist)

        if do_save:
            if not isinstance(do_save, str):
                do_save = 'poliosim_batch_run.msim'
            # NOTE(review): the original computed the filename but never saved;
            # the save call was presumably lost in extraction -- TODO confirm
            msim.save(do_save)
        if save_summary:
            if not isinstance(save_summary, str):
                save_summary = 'poliosim_batch_run.csv'
            msim.summary.to_csv(save_summary)

        return msim