'''
Functions and classes for running multiple HPVsim runs.
'''
#%% Imports
import numpy as np
import pandas as pd
import sciris as sc
from collections import defaultdict
from . import defaults as hpd
from . import misc as hpm
from . import base as hpb
from . import sim as hps
from . import utils as hpu
from . import plotting as hppl
from . import interventions as hpi
from .settings import options as hpo
# Specify all externally visible functions this file defines
__all__ = ['make_metapars', 'MultiSim', 'Scenarios', 'Sweep', 'single_run', 'multi_run', 'parallel']
[docs]
class MultiSim(hpb.FlexPretty):
'''
Class for running multiple copies of a simulation. The parameter n_runs
controls how many copies of the simulation there will be, if a list of sims
is not provided. This is the main class that's used to run multiple versions
of a simulation (e.g., with different random seeds).
Args:
sims (Sim/list) : a single sim or a list of sims
base_sim (Sim) : the sim used for shared properties; if not supplied, the first of the sims provided
label (str) : the name of the multisim
initialize (bool) : whether or not to initialize the sims (otherwise, initialize them during run)
kwargs (dict) : stored in run_args and passed to run()
Returns:
msim: a MultiSim object
**Examples**::
sim = hpv.Sim() # Create the sim
msim = hpv.MultiSim(sim, n_runs=5) # Create the multisim
msim.run() # Run them in parallel
msim.combine() # Combine into one sim
msim.plot() # Plot results
sim = hpv.Sim() # Create the sim
msim = hpv.MultiSim(sim, n_runs=11, noise=0.1, keep_people=True) # Set up a multisim with noise
msim.run() # Run
msim.reduce() # Compute statistics
msim.plot() # Plot
sims = [hpv.Sim(beta=0.015*(1+0.02*i)) for i in range(5)] # Create sims
for sim in sims: sim.run() # Run sims in serial
msim = hpv.MultiSim(sims) # Convert to multisim
msim.plot() # Plot as single sim
'''
def __init__(self, sims=None, base_sim=None, label=None, initialize=False, **kwargs):
# Handle inputs
if base_sim is None:
if isinstance(sims, hps.Sim):
base_sim = sims
sims = None
elif isinstance(sims, list):
base_sim = sims[0]
else:
errormsg = f'If base_sim is not supplied, sims must be either a single sim (treated as base_sim) or a list of sims, not {type(sims)}'
raise TypeError(errormsg)
# Set properties
self.sims = sims
self.base_sim = base_sim
self.label = base_sim.label if (label is None and base_sim is not None) else label
self.run_args = sc.mergedicts(kwargs)
self.results = None
self.which = None # Whether the multisim is to be reduced, combined, etc.
hpb.set_metadata(self) # Set version, date, and git info
# Optionally initialize
if initialize:
self.init_sims()
return
def __len__(self):
try:
return len(self.sims)
except:
return 0
[docs]
def result_keys(self):
''' Attempt to retrieve the results keys from the base sim '''
try:
keys = self.base_sim.result_keys()
except Exception as E:
errormsg = f'Could not retrieve result keys since base sim not accessible: {str(E)}'
raise ValueError(errormsg)
return keys
[docs]
def init_sims(self, **kwargs):
'''
Initialize the sims, but don't actually run them. Syntax is the same
as MultiSim.run(). Note: in most cases you can just call run() directly,
there is no need to call this separately.
Args:
kwargs (dict): passed to multi_run()
'''
# Handle which sims to use
if self.sims is None:
sims = self.base_sim
else:
sims = self.sims
# Initialize the sims but don't run them
kwargs = sc.mergedicts(self.run_args, kwargs, {'do_run':False}) # Never run, that's the point!
self.sims = multi_run(sims, **kwargs)
return
[docs]
def run(self, reduce=False, combine=False, **kwargs):
'''
Run the actual sims
Args:
reduce (bool): whether or not to reduce after running (see reduce())
combine (bool): whether or not to combine after running (see combine(), not compatible with reduce)
kwargs (dict): passed to multi_run(); use run_args to pass arguments to sim.run()
Returns:
None (modifies MultiSim object in place)
**Examples**::
msim.run()
msim.run(run_args=dict(until='2020-0601', restore_pars=False))
'''
# Handle which sims to use -- same as init_sims()
if self.sims is None:
sims = self.base_sim
else:
sims = self.sims
# Handle missing labels
for s,sim in enumerate(sims):
if sim.label is None:
sim.label = f'Sim {s}'
# Run
kwargs = sc.mergedicts(self.run_args, kwargs)
self.sims = multi_run(sims, **kwargs)
# Reduce or combine
if reduce:
self.reduce()
elif combine:
self.combine()
return self
def _has_orig_sim(self):
''' Helper method for determining if an original base sim is present '''
return hasattr(self, 'orig_base_sim')
def _rm_orig_sim(self, reset=False):
''' Helper method for removing the original base sim, if present '''
if self._has_orig_sim():
if reset:
self.base_sim = self.orig_base_sim
delattr(self, 'orig_base_sim')
return
[docs]
def shrink(self, **kwargs):
'''
Not to be confused with reduce(), this shrinks each sim in the msim;
see sim.shrink() for more information.
Args:
kwargs (dict): passed to sim.shrink() for each sim
'''
self.base_sim.shrink(**kwargs)
self._rm_orig_sim()
for sim in self.sims:
sim.shrink(**kwargs)
return
[docs]
def reset(self):
''' Undo a combine() or reduce() by resetting the base sim, which, and results '''
self._rm_orig_sim(reset=True)
self.which = None
self.results = None
return
[docs]
def reduce(self, quantiles=None, use_mean=False, bounds=None, output=False):
'''
Combine multiple sims into a single sim statistically: by default, use
the median value and the 10th and 90th percentiles for the lower and upper
bounds. If use_mean=True, then use the mean and ±2 standard deviations
for lower and upper bounds.
Args:
quantiles (dict): the quantiles to use, e.g. [0.1, 0.9] or {'low : '0.1, 'high' : 0.9}
use_mean (bool): whether to use the mean instead of the median
bounds (float): if use_mean=True, the multiplier on the standard deviation for upper and lower bounds (default 2)
output (bool): whether to return the "reduced" sim (in any case, modify the multisim in-place)
**Example**::
msim = hpv.MultiSim(hpv.Sim())
msim.run()
msim.reduce()
msim.summarize()
'''
if use_mean:
if bounds is None:
bounds = 2
else:
if quantiles is None:
quantiles = make_metapars()['quantiles']
if not isinstance(quantiles, dict):
try:
quantiles = {'low':float(quantiles[0]), 'high':float(quantiles[1])}
except Exception as E:
errormsg = f'Could not figure out how to convert {quantiles} into a quantiles object: must be a dict with keys low, high or a 2-element array ({str(E)})'
raise ValueError(errormsg)
# Store information on the sims
n_runs = len(self)
reduced_sim = sc.dcp(self.sims[0])
reduced_sim.metadata = dict(parallelized=True, combined=False, n_runs=n_runs, quantiles=quantiles, use_mean=use_mean, bounds=bounds) # Store how this was parallelized
# Perform the statistics
raw = {}
totalkeys = reduced_sim.result_keys('total')
genotypekeys = reduced_sim.result_keys('genotype')
sexkeys = reduced_sim.result_keys('sex')
agekeys = reduced_sim.result_keys('age')
type_dist_keys = reduced_sim.result_keys('type_dist')
n_age_bins = len(reduced_sim['age_bin_edges'])-1
for reskey in totalkeys:
raw[reskey] = np.zeros((reduced_sim.res_npts, len(self.sims)))
for s,sim in enumerate(self.sims):
vals = sim.results[reskey].values
raw[reskey][:, s] = vals
for reskey in genotypekeys:
raw[reskey] = np.zeros((reduced_sim['n_genotypes'], reduced_sim.res_npts, len(self.sims)))
for s,sim in enumerate(self.sims):
vals = sim.results[reskey].values
raw[reskey][:, :, s] = vals
for reskey in sexkeys:
raw[reskey] = np.zeros((2, reduced_sim.res_npts, len(self.sims)))
for s,sim in enumerate(self.sims):
vals = sim.results[reskey].values
raw[reskey][:, :, s] = vals
for reskey in agekeys:
raw[reskey] = np.zeros((n_age_bins, reduced_sim.res_npts, len(self.sims)))
for s,sim in enumerate(self.sims):
vals = sim.results[reskey].values
raw[reskey][:, :, s] = vals
for reskey in type_dist_keys:
raw[reskey] = np.zeros((reduced_sim['n_genotypes'], reduced_sim.res_npts, len(self.sims)))
for s,sim in enumerate(self.sims):
vals = sim.results[reskey].values
raw[reskey][:, :, s] = vals
for reskey in totalkeys + genotypekeys + sexkeys + agekeys + type_dist_keys:
if reskey in totalkeys:
axis = 1
results = reduced_sim.results
else:
axis = 2
results = reduced_sim.results
if use_mean:
r_mean = np.mean(raw[reskey], axis=axis)
r_std = np.std(raw[reskey], axis=axis)
results[reskey].values[:] = r_mean
results[reskey].low = r_mean - bounds * r_std
results[reskey].high = r_mean + bounds * r_std
else:
results[reskey].values[:] = np.quantile(raw[reskey], q=0.5, axis=axis)
results[reskey].low = np.quantile(raw[reskey], q=quantiles['low'], axis=axis)
results[reskey].high = np.quantile(raw[reskey], q=quantiles['high'], axis=axis)
# Deal with analyzers
reduced_analyzers = []
for ai,analyzer in enumerate(reduced_sim['analyzers']): # Loop over each type of analyzer
analyzer_list = [sim['analyzers'][ai] for sim in self.sims]
reduced_analyzers += [analyzer.reduce(analyzer_list)]
# Compute and store final results
reduced_sim.compute_summary()
self.orig_base_sim = self.base_sim
self.base_sim = reduced_sim
self.results = reduced_sim.results
self.summary = reduced_sim.summary
self.which = 'reduced'
self.analyzers = reduced_analyzers
if output:
return self.base_sim
else:
return
[docs]
def mean(self, bounds=None, **kwargs):
'''
Alias for reduce(use_mean=True). See reduce() for full description.
Args:
bounds (float): multiplier on the standard deviation for the upper and lower bounds (default, 2)
kwargs (dict): passed to reduce()
'''
return self.reduce(use_mean=True, bounds=bounds, **kwargs)
[docs]
def combine(self, output=False):
'''
Combine multiple sims into a single sim with scaled results.
**Example**::
msim = hpv.MultiSim(hpv.Sim())
msim.run()
msim.combine()
msim.summarize()
'''
n_runs = len(self)
combined_sim = sc.dcp(self.sims[0])
combined_sim.parallelized = dict(parallelized=True, combined=True, n_runs=n_runs) # Store how this was parallelized
for s,sim in enumerate(self.sims[1:]): # Skip the first one
if combined_sim.people: # If the people are there, add them and increment the population size accordingly
combined_sim.people += sim.people
combined_sim['n_agents'] = combined_sim.people.pars['n_agents']
else: # If not, manually update population size
combined_sim['n_agents'] += sim['n_agents'] # Record the number of agents
for key in sim.result_keys():
vals = sim.results[key].values
if vals.shape[-1] != combined_sim.res_npts:
errormsg = f'Cannot combine sims with inconsistent numbers of timepoints: {combined_sim.res_npts} vs. {len(vals)}'
raise ValueError(errormsg)
combined_sim.results[key].values += vals
# For non-count results (scale=False), rescale them
for key in combined_sim.result_keys():
if not combined_sim.results[key].scale:
combined_sim.results[key].values /= n_runs
# Compute and store final results
combined_sim.compute_summary()
self.orig_base_sim = self.base_sim
self.base_sim = combined_sim
self.results = combined_sim.results
self.summary = combined_sim.summary
self.which = 'combined'
if output:
return self.base_sim
else:
return
[docs]
def compare(self, t=None, sim_inds=None, output=False, do_plot=False, show_match=False, **kwargs):
'''
Create a dataframe compare sims at a single point in time.
Args:
t (int/str) : the day (or date) to do the comparison; default, the end
sim_inds (list) : list of integers of which sims to include (default: all)
output (bool) : whether or not to return the comparison as a dataframe
do_plot (bool) : whether or not to plot the comparison (see also plot_compare())
show_match (bool) : whether to include a column for whether all sims match
kwargs (dict) : passed to plot_compare()
Returns:
df (dataframe): a dataframe comparison
'''
# Handle time
if t is None:
t = -1
daystr = 'the last day'
else:
daystr = f'day {t}'
# Handle the indices
if sim_inds is None:
sim_inds = list(range(len(self.sims)))
# Move the results to a dictionary
resdict = defaultdict(dict)
for i,s in enumerate(sim_inds):
sim = self.sims[s]
label = sim.label
if not label: # Give it a label if it doesn't have one
label = f'Sim {i}'
if label in resdict: # Avoid duplicates
label += f' ({i})'
for reskey in sim.result_keys():
res = sim.results[reskey]
if res.values.ndim == 1:
val = res.values[t]
elif res.values.ndim == 2:
val = res.values[:,t].sum()
if res.scale: # Results that are scaled by population are ints
val = int(val)
resdict[label][reskey] = val
if do_plot:
self.plot_compare(**kwargs)
df = sc.dataframe(pd.DataFrame.from_dict(resdict).astype(object)) # astype is necessary to prevent type coercion
if show_match: # From https://stackoverflow.com/questions/22701799/pandas-dataframe-find-rows-where-all-columns-equal
data = df.values
df['all_match'] = (data == data[:, [0]]).all(axis=1)
if not output:
print(f'Results for {daystr} in each sim:')
print(df)
else:
return df
[docs]
def plot(self, to_plot=None, inds=None, plot_sims=False, color_by_sim=None, max_sims=5, colors=None, labels=None, alpha_range=None, plot_args=None, show_args=None, **kwargs):
'''
Plot all the sims -- arguments passed to Sim.plot(). The
behavior depends on whether or not combine() or reduce() has been called.
If so, this function by default plots only the combined/reduced sim (which
you can override with plot_sims=True). Otherwise, it plots a separate line
for each sim.
Note that this function is complex because it aims to capture the flexibility
of both sim.plot() and scens.plot(). By default, if combine() or reduce()
has been used, it will resemble sim.plot(); otherwise, it will resemble
scens.plot(). This can be changed via color_by_sim, together with the
other options.
Args:
to_plot (list) : list or dict of which results to plot; see hpv.get_default_plots() for structure
inds (list) : if not combined or reduced, the indices of the simulations to plot (if None, plot all)
plot_sims (bool) : whether to plot individual sims, even if combine() or reduce() has been used
color_by_sim (bool) : if True, set colors based on the simulation type; otherwise, color by result type; True implies a scenario-style plotting, False implies sim-style plotting
max_sims (int) : maximum number of sims to use with color-by-sim; can be overridden by other options
colors (list) : if supplied, override default colors for color_by_sim
labels (list) : if supplied, override default labels for color_by_sim
alpha_range (list) : a 2-element list/tuple/array providing the range of alpha values to use to distinguish the lines
plot_args (dict) : passed to sim.plot()
show_args (dict) : passed to sim.plot()
kwargs (dict) : passed to sim.plot()
Returns:
fig: Figure handle
**Examples**::
sim = hpv.Sim()
msim = hpv.MultiSim(sim)
msim.run()
msim.plot() # Plots individual sims
msim.reduce()
msim.plot() # Plots the combined sim
'''
# Plot a single curve, possibly with a range
if not plot_sims and self.which in ['combined', 'reduced']:
fig = self.base_sim.plot(to_plot=to_plot, colors=colors, **kwargs)
# PLot individual sims on top of each other
else:
# Initialize
fig = kwargs.pop('fig', None)
orig_show = kwargs.get('do_show', None)
orig_close = hpo.close
orig_setylim = kwargs.get('setylim', True)
kwargs['legend_args'] = sc.mergedicts({'show_legend':True}, kwargs.get('legend_args')) # Only plot the legend the first time
# Handle indices
if inds is None:
inds = np.arange(len(self.sims))
n_sims = len(inds)
# Handle what style of plotting to use:
if color_by_sim is None:
if n_sims <= max_sims:
color_by_sim = True
else:
color_by_sim = False
# Handle what to plot
if to_plot is None:
kind = 'scens' if color_by_sim else 'sim'
to_plot = hpd.get_default_plots(kind=kind)
# Handle colors
if colors is None:
if color_by_sim:
colors = sc.gridcolors(ncolors=n_sims)
else:
colors = [None]*n_sims # So we can iterate over it
else:
colors = [colors]*n_sims # Again, for iteration
# Handle alpha if not using colors
if alpha_range is None:
if color_by_sim:
alpha_range = [0.8, 0.8] # We're using color to distinguish sims, so don't need alpha
else:
alpha_range = [0.8, 0.3] # We're using alpha to distinguish sims
alphas = np.linspace(alpha_range[0], alpha_range[1], n_sims)
# Plot
for s,ind in enumerate(inds):
sim = self.sims[ind]
final_plot = (s == n_sims-1) # Check if this is the final plot
# Handle the legend and labels
if final_plot:
merged_show_args = show_args
kwargs['do_show'] = orig_show
kwargs['setylim'] = orig_setylim
hpo.set(close=orig_close) # Reset original closing settings
else:
merged_show_args = False # Only show things like data the last time it's plotting
kwargs['do_show'] = False # On top of that, don't show the plot at all unless it's the last time
kwargs['setylim'] = False # Don't set the y limits until we have all the data
hpo.set(close=False) # Do not close figures if we're in the middle of plotting
# Optionally set the label for the first max_sims sims
if color_by_sim is True and s<max_sims:
if labels is None:
merged_labels = sim.label
else:
merged_labels = labels[s]
elif final_plot and not color_by_sim:
merged_labels = labels
else:
merged_labels = ''
# Actually plot
merged_plot_args = sc.mergedicts({'alpha':alphas[s]}, plot_args) # Need a new variable to avoid overwriting
fig = sim.plot(fig=fig, to_plot=('scens', to_plot), colors=colors[s], labels=merged_labels, plot_args=merged_plot_args, show_args=merged_show_args, **kwargs)
return hppl.handle_show_return(fig=fig)
[docs]
def plot_result(self, key, colors=None, labels=None, *args, **kwargs):
''' Convenience method for plotting -- arguments passed to sim.plot_result() '''
if self.which in ['combined', 'reduced']:
fig = self.base_sim.plot_result(key, *args, **kwargs)
else:
fig = None
if colors is None:
colors = sc.gridcolors(len(self))
if labels is None:
labels = [sim.label for sim in self.sims]
orig_setylim = kwargs.get('setylim', True)
for s,sim in enumerate(self.sims):
if s == len(self.sims)-1:
kwargs['setylim'] = orig_setylim
else:
kwargs['setylim'] = False
fig = sim.plot_result(key=key, fig=fig, color=colors[s], label=labels[s], *args, **kwargs)
return hppl.handle_show_return(fig=fig)
[docs]
def plot_compare(self, t=-1, sim_inds=None, log_scale=True, **kwargs):
'''
Plot a comparison between sims, using bars to show different values for
each result. For an explanation of other available arguments, see Sim.plot().
Args:
t (int) : index of results, passed to compare()
sim_inds (list) : which sims to include, passed to compare()
log_scale (bool) : whether to plot with a logarithmic x-axis
kwargs (dict) : standard plotting arguments, see Sim.plot() for explanation
Returns:
fig: Figure handle
'''
df = self.compare(t=t, sim_inds=sim_inds, output=True)
return hppl.plot_compare(df, log_scale=log_scale, **kwargs)
[docs]
def save(self, filename=None, keep_people=False, **kwargs):
'''
Save to disk as a gzipped pickle. Load with hpv.load(filename) or
hpv.MultiSim.load(filename).
Args:
filename (str) : the name or path of the file to save to; if None, uses default
keep_people (bool) : whether or not to store the population in the Sim objects (NB, very large)
kwargs (dict) : passed to ``sc.makefilepath()``
Returns:
scenfile (str): the validated absolute path to the saved file
**Example**::
msim.save() # Saves to an .msim file
'''
if filename is None:
filename = 'hpvsim.msim'
msimfile = sc.makefilepath(filename=filename, **kwargs)
self.filename = filename # Store the actual saved filename
# Store sims separately
sims = self.sims
self.sims = None # Remove for now
obj = sc.dcp(self) # This should be quick once we've removed the sims
if keep_people:
obj.sims = sims # Just restore the object in full
print('Note: saving people, which may produce a large file!')
else:
obj.base_sim.shrink(in_place=True)
obj.sims = []
for sim in sims:
obj.sims.append(sim.shrink(in_place=False))
hpm.save(filename=msimfile, obj=obj) # Actually save
self.sims = sims # Restore
return msimfile
[docs]
@staticmethod
def load(msimfile, *args, **kwargs):
'''
Load from disk from a gzipped pickle.
Args:
msimfile (str): the name or path of the file to load from
kwargs: passed to hpv.load()
Returns:
msim (MultiSim): the loaded MultiSim object
**Example**::
msim = hpv.MultiSim.load('my-multisim.msim')
'''
msim = hpm.load(msimfile, *args, **kwargs)
if not isinstance(msim, MultiSim):
errormsg = f'Cannot load object of {type(msim)} as a MultiSim object'
raise TypeError(errormsg)
return msim
[docs]
@staticmethod
def merge(*args, base=False):
'''
Convenience method for merging two MultiSim objects.
Args:
args (MultiSim): the MultiSims to merge (either a list, or separate)
base (bool): if True, make a new list of sims from the multisim's two base sims; otherwise, merge the multisim's lists of sims
Returns:
msim (MultiSim): a new MultiSim object
**Examples**:
mm1 = hpv.MultiSim.merge(msim1, msim2, base=True)
mm2 = hpv.MultiSim.merge([m1, m2, m3, m4], base=False)
'''
# Handle arguments
if len(args) == 1 and isinstance(args[0], (list, tuple)):
args = args[0] # A single list of MultiSims has been provided
# Create the multisim from the base sim of the first argument
msim = MultiSim(base_sim=sc.dcp(args[0].base_sim), label=args[0].label)
msim.sims = []
msim.chunks = [] # This is used to enable automatic splitting later
# Handle different options for combining
if base: # Only keep the base sims
for i,ms in enumerate(args):
sim = sc.dcp(ms.base_sim)
sim.label = ms.label
msim.sims.append(sim)
msim.chunks.append([[i]])
else: # Keep all the sims
for ms in args:
len_before = len(msim.sims)
msim.sims += sc.dcp(ms.sims)
len_after= len(msim.sims)
msim.chunks.append(list(range(len_before, len_after)))
return msim
[docs]
def split(self, inds=None, chunks=None):
'''
Convenience method for splitting one MultiSim into several. You can specify
either individual indices of simulations to extract, via inds, or consecutive
chunks of indices, via chunks. If this function is called on a merged MultiSim,
the chunks can be retrieved automatically and no arguments are necessary.
Args:
inds (list): a list of lists of indices, with each list turned into a MultiSim
chunks (int or list): if an int, split the MultiSim into that many chunks; if a list return chunks of that many sims
Returns:
A list of MultiSim objects
**Examples**::
m1 = hpv.MultiSim(hpv.Sim(label='sim1'), initialize=True)
m2 = hpv.MultiSim(hpv.Sim(label='sim2'), initialize=True)
m3 = hpv.MultiSim.merge(m1, m2)
m3.run()
m1b, m2b = m3.split()
msim = hpv.MultiSim(hpv.Sim(), n_runs=6)
msim.run()
m1, m2 = msim.split(inds=[[0,2,4], [1,3,5]])
mlist1 = msim.split(chunks=[2,4]) # Equivalent to inds=[[0,1], [2,3,4,5]]
mlist2 = msim.split(chunks=2) # Equivalent to inds=[[0,1,2], [3,4,5]]
'''
# Process indices and chunks
if inds is None: # Indices not supplied
if chunks is None: # Chunks not supplied
if hasattr(self, 'chunks'): # Created from a merged MultiSim
inds = self.chunks
else: # No indices or chunks and not created from a merge
errormsg = 'If a MultiSim has not been created via merge(), you must supply either inds or chunks to split it'
raise ValueError(errormsg)
else: # Chunks supplied, but not inds
inds = [] # Initialize
sim_inds = np.arange(len(self)) # Indices for the simulations
if sc.isiterable(chunks): # e.g. chunks = [2,4]
chunk_inds = np.cumsum(chunks)[:-1]
inds = np.split(sim_inds, chunk_inds)
else: # e.g. chunks = 3
inds = np.split(sim_inds, chunks) # This will fail if the length is wrong
# Do the conversion
mlist = []
for indlist in inds:
sims = sc.dcp([self.sims[i] for i in indlist])
msim = MultiSim(sims=sims)
mlist.append(msim)
return mlist
[docs]
def disp(self, output=False):
'''
Display a verbose description of a multisim. See also multisim.summarize()
(medium length output) and multisim.brief() (short output).
Args:
output (bool): if true, return a string instead of printing output
**Example**::
msim = hpv.MultiSim(hpv.Sim(verbose=0), label='Example multisim')
msim.run()
msim.disp() # Displays detailed output
'''
string = self._disp()
if not output:
print(string)
else:
return string
[docs]
def summarize(self, output=False):
'''
Print a moderate length summary of the MultiSim. See also multisim.disp()
(detailed output) and multisim.brief() (short output).
Args:
output (bool): if true, return a string instead of printing output
**Example**::
msim = hpv.MultiSim(hpv.Sim(verbose=0), label='Example multisim')
msim.run()
msim.summarize() # Prints moderate length output
'''
labelstr = f' "{self.label}"' if self.label else ''
simlenstr = f'{len(self.sims)}' if self.sims else '0'
string = f'MultiSim{labelstr} summary:\n'
string += f' Number of sims: {simlenstr}\n'
string += f' Reduced/combined: {self.which}\n'
string += f' Base: {self.base_sim.brief(output=True)}\n'
if self.sims:
string += ' Sims:\n'
for s,sim in enumerate(self.sims):
string += f' {s}: {sim.brief(output=True)}\n'
if not output:
print(string)
else:
return string
def _brief(self):
'''
Return a brief description of a multisim -- used internally and by repr();
see multisim.brief() for the user version.
'''
try:
labelstr = f'"{self.label}"; ' if self.label else ''
n_sims = 0 if not self.sims else len(self.sims)
string = f'MultiSim({labelstr}n_sims: {n_sims}; base: {self.base_sim.brief(output=True)})'
except Exception as E:
string = sc.objectid(self)
string += f'Warning, multisim appears to be malformed:\n{str(E)}'
return string
[docs]
def brief(self, output=False):
'''
Print a compact representation of the multisim. See also multisim.disp()
(detailed output) and multisim.summarize() (medium length output).
Args:
output (bool): if true, return a string instead of printing output
**Example**::
msim = hpv.MultiSim(hpv.Sim(verbose=0), label='Example multisim')
msim.run()
msim.brief() # Prints one-line output
'''
string = self._brief()
if not output:
print(string)
else:
return string
[docs]
def to_json(self, *args, **kwargs):
''' Shortcut for base_sim.to_json() '''
if not self.base_sim.results_ready: # pragma: no cover
errormsg = 'JSON export only available for reduced sim; please run msim.mean() or msim.median() first'
raise RuntimeError(errormsg)
return self.base_sim.to_json(*args, **kwargs)
[docs]
def to_excel(self, *args, **kwargs):
''' Shortcut for base_sim.to_excel() '''
if not self.base_sim.results_ready: # pragma: no cover
errormsg = 'Excel export only available for reduced sim; please run msim.mean() or msim.median() first'
raise RuntimeError(errormsg)
return self.base_sim.to_excel(*args, **kwargs)
[docs]
class Scenarios(hpb.ParsObj):
'''
Class for running multiple sets of multiple simulations -- e.g., scenarios.
Note that most users are recommended to use MultiSim rather than Scenarios,
as it gives more control over run options. Scenarios should be used primarily
for quick investigations. See the examples folder for example usage.
Args:
sim (Sim) : if supplied, use a pre-created simulation as the basis for the scenarios
metapars (dict) : meta-parameters for the run, e.g. number of runs; see make_metapars() for structure
scenarios (dict) : a dictionary defining the scenarios; see examples folder for examples; see below for baseline
basepars (dict) : a dictionary of sim parameters to be used for the basis of the scenarios (not required if sim is provided)
scenfile (str) : a filename for saving (defaults to the creation date)
label (str) : the name of the scenarios
**Example**::
scens = hpv.Scenarios()
Returns:
scens: a Scenarios object
'''
def __init__(self, sim=None, metapars=None, scenarios=None, basepars=None, scenfile=None, label=None):
# For this object, metapars are the foundation
default_pars = make_metapars() # Start with default pars
super().__init__(default_pars) # Initialize and set the parameters as attributes
hpb.set_metadata(self) # Set version, date, and git info
# Handle filename
if scenfile is None:
datestr = sc.getdate(obj=self.created, dateformat='%Y-%b-%d_%H.%M.%S')
scenfile = f'hpvsim_scenarios_{datestr}.scens'
self.scenfile = scenfile
self.label = label
# Handle scenarios -- by default, create the simplest possible baseline scenario
if scenarios is None:
scenarios = {'baseline':{'name':'Baseline', 'pars':{}}}
self.scenarios = scenarios
# Handle metapars
self.metapars = sc.dcp(sc.mergedicts(metapars))
self.update_pars(self.metapars)
# Create the simulation and handle basepars
if sim is None:
sim = hps.Sim()
self.base_sim = sc.dcp(sim)
self.basepars = sc.dcp(sc.mergedicts(basepars))
self.base_sim.update_pars(self.basepars)
self.base_sim.validate_pars()
if not self.base_sim.initialized:
self.base_sim.initialize(reset=True) # TODO: This breaks things, need to figure out how to avoid!!
# self.base_sim.init_genotypes()
# self.base_sim.init_immunity()
# self.base_sim.init_interventions()
# self.base_sim.init_people()
# self.base_sim.init_results()
# Copy quantities from the base sim to the main object
self.npts = self.base_sim.npts
self.tvec = self.base_sim.tvec
self.yearvec = self.base_sim.yearvec
self['verbose'] = self.base_sim['verbose']
self.res_npts = self.base_sim.res_npts
self.res_tvec = self.base_sim.res_tvec
self.res_yearvec = self.base_sim.res_yearvec
# Create the results object; order is: results key, scenario, best/low/high
self.sims = sc.objdict()
self.results = sc.objdict()
for reskey in self.result_keys():
self.results[reskey] = sc.objdict()
for scenkey in scenarios.keys():
self.results[reskey][scenkey] = sc.objdict()
for nblh in ['name', 'best', 'low', 'high']:
self.results[reskey][scenkey][nblh] = None # This will get populated below
return
[docs]
def result_keys(self, which='all'):
''' Attempt to retrieve the results keys from the base sim '''
try:
keys = self.base_sim.result_keys(which=which)
except Exception as E:
errormsg = f'Could not retrieve result keys since base sim not accessible: {str(E)}'
raise ValueError(errormsg)
return keys
[docs]
def run(self, n_runs=None, debug=False, keep_people=False, verbose=None, **kwargs):
'''
Run the specified scenarios.
Args:
debug (bool) : if True, runs a single run instead of multiple, which makes debugging easier
verbose (int) : level of detail to print, passed to sim.run()
kwargs (dict) : passed to multi_run() and thence to sim.run()
Returns:
None (modifies Scenarios object in place)
'''
if n_runs is None: n_runs = self['n_runs']
if verbose is None: verbose = self['verbose']
def print_heading(string):
''' Choose whether to print a heading, regular text, or nothing '''
if verbose >= 2:
sc.heading(string)
elif verbose == 1:
print(string)
return
totalkeys = self.result_keys('total')
genotypekeys = self.result_keys('genotype')
sexkeys = self.result_keys('sex')
agekeys = self.result_keys('age')
type_dist_keys = self.result_keys('type_dist')
# Loop over scenarios
# Assemble sims
scen_sims_dict = dict()
for scenkey,scen in self.scenarios.items():
scenname = scen['name']
scenpars = scen['pars']
allpars = sc.mergedicts(scenpars, {'location': self.base_sim['location']})
# This is necessary for plotting, and since self.npts is defined prior to run
if 'n_years' in scenpars.keys():
errormsg = 'Scenarios cannot be run with different numbers of years; set via basepars instead'
raise ValueError(errormsg)
# Create and run the simulations
print_heading(f'Multirun for {scenkey}')
scen_sim = sc.dcp(self.base_sim)
scen_sim.scenkey = scenkey
scen_sim.label = scenname
scen_sim.scen = scen
# Update the parameters, if provided, and re-initialize aspects of the simulation
scen_sim.update_pars(allpars)
# Determine whether we need to reinitialize the people and the initial states - by default, don't reset either
reset_people = True #False
reset_init_states = True #False
if 'interventions' in allpars.keys():
# If vaccination is provided, we need to re-initialize the people because the number of immunity sources is different
vaxintvs = [x for x in allpars['interventions'] if isinstance(x, hpi.BaseVaccination)]
if len(vaxintvs)>0:
reset_people = True
reset_init_states = True
# Initialize the sim, including potentially the people and initial states
scen_sim.initialize(reset=reset_people, init_states=reset_init_states)
if 'hiv_pars' in scen.keys():
hiv_scenpars = {'hiv_pars': sc.mergedicts(scen_sim.hivsim.pars['hiv_pars'], scen['hiv_pars'])}
scen_sim.hivsim.update_pars(hiv_scenpars)
scen_sims = []
for i in range(n_runs):
sim = scen_sim.copy()
sim['rand_seed'] += i # Reset the seed
scen_sims.append(sim)
scen_sims_dict[scenkey] = scen_sims
# Run sims
run_args = dict(noise=self['noise'], noisepar=self['noisepar'], keep_people=keep_people, verbose=verbose)
if debug:
print('Running in debug mode (not parallelized)')
for scenkey,scen_sims in scen_sims_dict.items():
for i in range(len(scen_sims)):
scen_sims[i] = single_run(scen_sims[i], **run_args, **kwargs) # Run a single sim
else:
all_sims = []
for sim_list in scen_sims_dict.values():
all_sims.extend(list(sim_list))
all_sims = multi_run(all_sims, **run_args, **kwargs) # This is where the sims actually get run
count = 0
for scenkey,scen_sims in scen_sims_dict.items():
n_sims = len(scen_sims)
scen_sims_dict[scenkey] = all_sims[count:count+n_sims]
count += n_sims
# Process sims
for scenkey,scen_sims in scen_sims_dict.items():
# Process the simulations
print_heading(f'Processing {scenkey}')
ng = scen_sims[0]['n_genotypes'] # Get number of genotypes
na = len(scen_sims[0]['age_bin_edges'])-1 # Get number of age bins
scenraw = {}
for reskey in totalkeys:
scenraw[reskey] = np.zeros((self.res_npts, len(scen_sims)))
for s,sim in enumerate(scen_sims):
scenraw[reskey][:,s] = sim.results[reskey].values
for reskey in genotypekeys+type_dist_keys:
scenraw[reskey] = np.zeros((ng, self.res_npts, len(scen_sims)))
for s,sim in enumerate(scen_sims):
scenraw[reskey][:,:,s] = sim.results[reskey].values
for reskey in sexkeys:
scenraw[reskey] = np.zeros((2, self.res_npts, len(scen_sims)))
for s,sim in enumerate(scen_sims):
scenraw[reskey][:,:,s] = sim.results[reskey].values
for reskey in agekeys:
scenraw[reskey] = np.zeros((na, self.res_npts, len(scen_sims)))
for s,sim in enumerate(scen_sims):
scenraw[reskey][:,:,s] = sim.results[reskey].values
scenres = sc.objdict()
scenres.best = {}
scenres.low = {}
scenres.high = {}
for reskey in totalkeys + genotypekeys + sexkeys:
axis = 1 if reskey in totalkeys else 2
scenres.best[reskey] = np.quantile(scenraw[reskey], q=0.5, axis=axis) # Changed from median to mean for smoother plots
scenres.low[reskey] = np.quantile(scenraw[reskey], q=self['quantiles']['low'], axis=axis)
scenres.high[reskey] = np.quantile(scenraw[reskey], q=self['quantiles']['high'], axis=axis)
for reskey in totalkeys + genotypekeys + sexkeys:
self.results[reskey][scenkey]['name'] = scenname
for blh in ['best', 'low', 'high']:
self.results[reskey][scenkey][blh] = scenres[blh][reskey]
self.sims[scenkey] = scen_sims
#%% Print statistics
if verbose:
self.compare()
# Save details about the run
self._kept_people = keep_people
return self
[docs]
def compare(self, t=None, output=False):
'''
Print out a comparison of each scenario.
Args:
t (int/str) : the day (or date) to do the comparison; default, the end
output (bool) : if true, return the dataframe instead of printing output
**Example**::
scenarios = {'base': {'name':'Base','pars': {}}, 'beta': {'name':'Beta', 'pars': {'beta': 0.020}}}
scens = hpv.Scenarios(scenarios=scenarios, label='Example scenarios')
scens.run()
scens.compare(t=30) # Prints comparison for day 30
'''
# Handle time
if t is None:
t = -1
daystr = 'the last timepoint'
else:
daystr = f'timepoint {t}'
day = t
# Compute dataframe
x = defaultdict(dict)
genotypekeys = self.result_keys('genotype')
sexkeys = self.result_keys('sex')
totalkeys = self.result_keys('total')
for scenkey in self.scenarios.keys():
for reskey in self.result_keys():
if reskey in genotypekeys:
for genotype in range(self.base_sim['n_genotypes']):
val = self.results[reskey][scenkey].best[genotype, day] # Only prints results for infections by first variant
genotypekey = reskey + str(genotype) # Add variant number to the summary output
x[scenkey][genotypekey] = int(val)
elif reskey in sexkeys:
for sex in range(2):
val = self.results[reskey][scenkey].best[sex, day]
sexkey = reskey + str(sex) # Add variant number to the summary output
x[scenkey][sexkey] = int(val)
elif reskey in totalkeys:
val = self.results[reskey][scenkey].best[day]
val = int(val)
x[scenkey][reskey] = val
df = pd.DataFrame.from_dict(x).astype(object)
if not output:
print(f'Results for {daystr} in each scenario:')
print(df)
else:
return df
[docs]
def plot(self, *args, **kwargs):
'''
Plot the results of a scenario. For an explanation of available arguments,
see Sim.plot().
Returns:
fig: Figure handle
**Example**::
scens = hpv.Scenarios()
scens.run()
scens.plot()
'''
return hppl.plot_scens(scens=self, *args, **kwargs)
[docs]
def plot_age_results(self, *args, **kwargs):
'''
Plot all age_results analyzers in a scenario together.
Returns:
fig: Figure handle
**Example**::
az = hpv.age_results(timepoints=['2015', '2020'], results=['hpv_incidence', 'total_cancers'])
base_sim = hpv.Sim(analyzers=az)
scens = hpv.Scenarios(sim=base_sim)
scens.run()
scens.plot_age_results()
'''
return hppl.plot_scen_age_results(scens=self, *args, **kwargs)
[docs]
def to_json(self, filename=None, tostring=True, indent=2, verbose=False, *args, **kwargs):
'''
Export results as JSON.
Args:
filename (str): if None, return string; else, write to file
Returns:
A unicode string containing a JSON representation of the results,
or writes the JSON file to disk
'''
d = {'t':self.tvec,
'results': self.results,
'basepars': self.basepars,
'metapars': self.metapars,
'simpars': self.base_sim.export_pars(),
'scenarios': self.scenarios
}
if filename is None:
output = sc.jsonify(d, tostring=tostring, indent=indent, verbose=verbose, *args, **kwargs)
else:
output = sc.savejson(filename=filename, obj=d, indent=indent, *args, **kwargs)
return output
[docs]
def to_excel(self, filename=None):
'''
Export results as XLSX
Args:
filename (str): if None, return string; else, write to file
Returns:
An sc.Spreadsheet with an Excel file, or writes the file to disk
'''
spreadsheet = sc.Spreadsheet()
spreadsheet.freshbytes()
with pd.ExcelWriter(spreadsheet.bytes, engine='xlsxwriter') as writer:
for key in self.result_keys('total'): # Multidimensional variant keys can't be exported
result_df = pd.DataFrame.from_dict(sc.flattendict(self.results[key], sep='_'))
result_df.to_excel(writer, sheet_name=key)
spreadsheet.load()
if filename is None:
output = spreadsheet
else:
output = spreadsheet.save(filename)
return output
[docs]
def save(self, scenfile=None, keep_sims=True, keep_people=False, **kwargs):
'''
Save to disk as a gzipped pickle.
Args:
scenfile (str) : the name or path of the file to save to; if None, uses stored
keep_sims (bool) : whether or not to store the actual Sim objects in the Scenarios object
keep_people (bool) : whether or not to store the population in the Sim objects (NB, very large)
kwargs (dict) : passed to makefilepath()
Returns:
scenfile (str): the validated absolute path to the saved file
**Example**::
scens.save() # Saves to a .scens file with the date and time of creation by default
'''
if scenfile is None:
scenfile = self.scenfile
scenfile = sc.makefilepath(filename=scenfile, **kwargs)
self.scenfile = scenfile # Store the actual saved filename
# Store sims separately
sims = self.sims
self.sims = None # Remove for now
obj = sc.dcp(self) # This should be quick once we've removed the sims
if not keep_people:
obj.base_sim.shrink(in_place=True)
if keep_sims or keep_people:
if keep_people:
if not obj._kept_people:
warnmsg = 'Warning: there are no people because they were not saved during the run. If you want people, please rerun with keep_people=True.'
hpm.warn(warnmsg)
obj.sims = sims # Just restore the object in full
print('Note: saving people, which may produce a large file!')
else:
obj.sims = sc.objdict()
for key in sims.keys():
obj.sims[key] = []
for sim in sims[key]:
obj.sims[key].append(sim.shrink(in_place=False))
hpm.save(filename=scenfile, obj=obj) # Actually save
self.sims = sims # Restore
return scenfile
[docs]
@staticmethod
def load(scenfile, *args, **kwargs):
'''
Load from disk from a gzipped pickle.
Args:
scenfile (str): the name or path of the file to load from
kwargs: passed to hpv.load()
Returns:
scens (Scenarios): the loaded scenarios object
**Example**::
scens = hpv.Scenarios.load('my-scenarios.scens')
'''
scens = hpm.load(scenfile, *args, **kwargs)
if not isinstance(scens, Scenarios):
errormsg = f'Cannot load object of {type(scens)} as a Scenarios object'
raise TypeError(errormsg)
return scens
[docs]
def disp(self, output=False):
'''
Display a verbose description of the scenarios. See also scenarios.summarize()
(medium length output) and scenarios.brief() (short output).
Args:
output (bool): if true, return a string instead of printing output
**Example**::
scens = hpv.Scenarios(hpv.Sim(), label='Example scenarios')
scens.run(verbose=0) # Run silently
scens.disp() # Displays detailed output
'''
string = self._disp()
if not output:
print(string)
else:
return string
[docs]
def summarize(self, output=False):
'''
Print a moderate length summary of the scenarios. See also scenarios.disp()
(detailed output) and scenarios.brief() (short output).
Args:
output (bool): if true, return a string instead of printing output
**Example**::
scens = hpv.Scenarios(hpv.Sim(), label='Example scenarios')
scens.run(verbose=0) # Run silently
scens.summarize() # Prints moderate length output
'''
labelstr = f' "{self.label}"' if self.label else ''
string = f'Scenarios{labelstr} summary:\n'
string += f' Number of scenarios: {len(self.sims)}\n'
string += f' Base: {self.base_sim.brief(output=True)}\n'
if self.sims:
string += ' Scenarios:\n'
for k,key,simlist in self.sims.enumitems():
keystr = f' {k}: "{key}"\n'
string += keystr
for s,sim in enumerate(simlist):
simstr = f'{sim.brief(output=True)}'
string += ' ' + f'{s}: {simstr}\n'
if not output:
print(string)
else:
return string
def _brief(self):
'''
Return a brief description of the scenarios -- used internally and by repr();
see scenarios.brief() for the user version.
'''
try:
labelstr = f'"{self.label}"; ' if self.label else ''
n_scenarios = 0 if not self.scenarios else len(self.scenarios)
string = f'Scenarios({labelstr}n_scenarios: {n_scenarios}; base: {self.base_sim.brief(output=True)})'
except Exception as E:
string = sc.objectid(self)
string += f'Warning, scenarios appear to be malformed:\n{str(E)}'
return string
[docs]
def brief(self, output=False):
'''
Print a compact representation of the scenarios. See also scenarios.disp()
(detailed output) and scenarios.summarize() (medium length output).
Args:
output (bool): if true, return a string instead of printing output
**Example**::
scens = hpv.Scenarios(label='Example scenarios')
scens.run()
scens.brief() # Prints one-line output
'''
string = self._brief()
if not output:
print(string)
else:
return string
[docs]
@staticmethod
def merge(*args):
'''
Merge two or more scenarios to create a single scenario object
Args:
args (Scenarios): the Scenarios to merge (either a list, or separate)
Returns:
scen (Scenarios): a new Scenario object
'''
# Handle arguments
if len(args) == 1 and isinstance(args[0], (list, tuple)):
args = args[0]
# Create the scenarios from the base sim of the first argument
scens = Scenarios(sim=sc.dcp(args[0].base_sim), scenarios={})
scens.sims = []
base_res = args[0].results
res_keys = base_res.keys()
# Copy over sims and results
for scen in args:
scens.sims += sc.dcp(scen.sims)
scens.scenarios = sc.mergedicts(scens.scenarios, scen.scenarios)
res = scen.results
for rkey in res_keys: # Loop over result keys
for skey in res[rkey].keys():
scens.results[rkey][skey] = res[rkey][skey]
return scens
[docs]
class Sweep(MultiSim):
'''
Class for running parameter sweeps.
Args:
base_sim (Sim) : the sim used for shared properties
label (str) : the name of the multisim
kwargs (dict) : stored in run_args and passed to run()
Returns:
sweep: a Sweep object
**Examples**:
sim = hpv.Sim() # Create the sim
sweep = hpv.Sweep(sim, sweep_pars={'beta':[0.4, 0.8]}, n_runs=5) # Create the sweep
sweep.run() # Run the sims in parallel
sweep.plot() # Plot results
'''
def __init__(self, base_sim=None, sweep_pars=None, sweep_vars=None, n_draws=4, initialize=False, label=None,
**kwargs):
# Set properties
self.base_sim = base_sim
self.sweep_pars = sweep_pars
self.sweep_par_keys = list(sweep_pars.keys())
self.sweep_vars = sweep_vars
self.res_labels = None # Populated after sims are run
self.n_draws = n_draws
self.from_year = None # Populated within reduce()
self.to_year = None # Populated within reduce()
self.label = base_sim.label if (label is None) else label
self.run_args = sc.mergedicts(kwargs)
self.resdf = None
hpb.set_metadata(self) # Set version, date, and git info
# Create parameter draws
self.sweep_draws = sc.objdict()
for pname, prange in self.sweep_pars.items():
self.sweep_draws[pname] = np.random.uniform(prange[0], prange[1], self.n_draws)
# Create sims and optionally initialize
self.create_sims(initialize=initialize)
return
[docs]
def create_sims(self, initialize=False):
self.sims = sc.autolist()
for ipd in range(self.n_draws):
sim = sc.dcp(self.base_sim)
for pname,pdraws in self.sweep_draws.items():
sim.pars[pname] = pdraws[ipd]
self.sims += sim
# Optionally initialize the sims
if initialize:
kwargs = sc.mergedicts(self.run_args, kwargs, {'do_run':False})
self.sims = multi_run(sims, **kwargs)
[docs]
def run(self, shrink=True, reduce=True, sum_totals=True, from_year=None, to_year=None, **kwargs):
'''
Run the sims
Args:
shrink (bool): whether or not to shrink after running
kwargs (dict): passed to multi_run(); use run_args to pass arguments to sim.run()
Returns:
None (modifies MultiSim object in place)
**Examples**::
sweep.run()
'''
# Handle which sims to use
if self.sims is None:
sims = self.base_sim
else:
sims = self.sims
# Handle missing labels
for s,sim in enumerate(sims):
if sim.label is None:
sim.label = f'Sim {s}'
# Run
kwargs = sc.mergedicts(self.run_args, kwargs)
self.sims = multi_run(sims, **kwargs)
# Shrink
if shrink:
self.shrink()
# Create results
if reduce:
self.reduce(sum_totals=sum_totals, from_year=from_year, to_year=to_year)
return self
[docs]
def reduce(self, sum_totals=True, from_year=None, to_year=None):
''' Create a dataframe holding reduced results '''
# Handle from and to years
if from_year is not None:
try:
i0 = sc.findinds(self.sims[0].res_yearvec, from_year)[0]
except:
errormsg = f'Cannot sum from {from_year}, summing from simulation start year instead'
print(errormsg)
from_year = self.sims[0].res_yearvec[0]
i0 = None
self.from_year = from_year
else:
i0 = None
self.from_year = self.sims[0].res_yearvec[0]
if to_year is not None:
try:
i1 = sc.findinds(self.sims[0].res_yearvec, to_year)[0]
except:
errormsg = f'Cannot sum to {to_year}, summing to simulation end year instead'
print(errormsg)
to_year = self.sims[0].res_yearvec[-1]
i1 = None
else:
i1 = None
self.to_year = self.sims[0].res_yearvec[-1]
# Store result labels
self.res_labels = {}
ref_sim = self.sims[0]
for svar in self.sweep_vars:
self.res_labels[svar] = ref_sim.results[svar].name
# Start creating result dataframe
dfts = sc.autolist()
for sim in self.sims:
dft = pd.DataFrame()
for spar in self.sweep_pars:
dft[spar] = [sim[spar]]
for svar in self.sweep_vars:
if sum_totals:
dft[svar] = [sim.results[svar][i0:i1].sum()]
else:
dft[svar] = sim.results[svar][i0:i1]
dfts += dft
self.resdf = pd.concat(dfts, ignore_index=True)
return
[docs]
def combine(self, **kwargs):
raise NotImplementedError
[docs]
def mean(self, **kwargs):
raise NotImplementedError
[docs]
def compare(self, **kwargs):
raise NotImplementedError
[docs]
def plot_heatmap(self, to_plot=None, xpar=None, ypar=None, zscales=1, npts=100, **kwargs):
# Handle plotting inputs
if to_plot is None: to_plot = self.sweep_vars
if type(to_plot)==str: to_plot = [to_plot]
if type(to_plot)==list:
for res_to_plot in to_plot:
if res_to_plot not in self.sweep_vars:
errormsg = f'Cannot plot result {res_to_plot} as it does not appear in the stored results: {self.sweep_vars}.'
raise ValueError(errormsg)
# Handle x and y parameters
if len(self.sweep_par_keys)<2:
raise ValueError(f'Cannot plot a heatmap without 2 or more sweep parameters: sweep_pars={self.sweep_par_keys}.')
else:
if xpar is None: xpar = self.sweep_par_keys[0]
else:
if xpar not in self.sweep_par_keys:
errormsg = f'Cannot plot results for parameter {xpar} as it was not one of the sweep parameters: {self.sweep_par_keys}.'
raise ValueError(errormsg)
if ypar is None: ypar = self.sweep_par_keys[1]
else:
if ypar not in self.sweep_par_keys:
errormsg = f'Cannot plot results for parameter {ypar} as it was not one of the sweep parameters: {self.sweep_par_keys}.'
raise ValueError(errormsg)
# Other validation
if self.resdf is None:
raise ValueError('Sweeps not run yet.')
# Create xy grid
x = np.array(self.resdf[xpar])
y = np.array(self.resdf[ypar])
xi = np.linspace(self.sweep_pars[xpar][0], self.sweep_pars[xpar][1], npts)
yi = np.linspace(self.sweep_pars[ypar][0], self.sweep_pars[ypar][1], npts)
xx, yy = np.meshgrid(xi, yi)
# Call plotting code to create and return figures
return hppl.plot_heatmap(self, xx, yy, x=x, y=y, xi=xi, yi=yi, to_plot=to_plot, xpar=xpar, ypar=ypar, zscales=zscales, npts=npts, **kwargs)
[docs]
def single_run(sim, ind=0, reseed=True, noise=0.0, noisepar=None, keep_people=False, run_args=None, sim_args=None, verbose=None, do_run=True, **kwargs):
'''
Convenience function to perform a single simulation run. Mostly used for
parallelization, but can also be used directly.
Args:
sim (Sim) : the sim instance to be run
ind (int) : the index of this sim
reseed (bool) : whether or not to generate a fresh seed for each run
noise (float) : the amount of noise to add to each run
noisepar (str) : the name of the parameter to add noise to
keep_people (bool) : whether to keep the people after the sim run
run_args (dict) : arguments passed to sim.run()
sim_args (dict) : extra parameters to pass to the sim, e.g. 'n_infected'
verbose (int) : detail to print
do_run (bool) : whether to actually run the sim (if not, just initialize it)
kwargs (dict) : also passed to the sim
Returns:
sim (Sim): a single sim object with results
**Example**::
import hpvsim as hp
sim = hpv.Sim() # Create a default simulation
sim = hpv.single_run(sim) # Run it, equivalent(ish) to sim.run()
'''
# Set sim and run arguments
sim_args = sc.mergedicts(sim_args, kwargs)
run_args = sc.mergedicts({'verbose':verbose}, run_args)
if verbose is None:
verbose = sim['verbose']
if not sim.label:
sim.label = f'Sim {ind}'
if reseed:
sim['rand_seed'] += ind # Reset the seed, otherwise no point of parallel runs
hpu.set_seed()
# If the noise parameter is not found, guess what it should be
if noisepar is None:
noisepar = 'beta'
if noisepar not in sim.pars.keys():
raise sc.KeyNotFoundError(f'Noise parameter {noisepar} was not found in sim parameters')
# Handle noise -- normally distributed fractional error
noiseval = noise*np.random.normal()
if noiseval > 0:
noisefactor = 1 + noiseval
else:
noisefactor = 1/(1-noiseval)
sim[noisepar] *= noisefactor
if verbose>=1:
verb = 'Running' if do_run else 'Creating'
print(f'{verb} a simulation using seed={sim["rand_seed"]} and noise={noiseval}')
# Handle additional arguments
for key,val in sim_args.items():
print(f'Processing {key}:{val}')
if key in sim.pars.keys():
if verbose>=1:
print(f'Setting key {key} from {sim[key]} to {val}')
sim[key] = val
else:
raise sc.KeyNotFoundError(f'Could not set key {key}: not a valid parameter name')
# Run
if do_run:
sim.run(**run_args)
# Shrink the sim to save memory
if not keep_people:
sim.shrink()
return sim
[docs]
def multi_run(sim, n_runs=4, reseed=None, noise=0.0, noisepar=None, iterpars=None, combine=False, keep_people=None, run_args=None, sim_args=None, par_args=None, do_run=True, parallel=True, n_cpus=None, verbose=None, **kwargs):
'''
For running multiple runs in parallel. If the first argument is a list of sims,
exactly these will be run and most other arguments will be ignored.
Args:
sim (Sim) : the sim instance to be run, or a list of sims.
n_runs (int) : the number of parallel runs
reseed (bool) : whether or not to generate a fresh seed for each run (default: true for single, false for list of sims)
noise (float) : the amount of noise to add to each run
noisepar (str) : the name of the parameter to add noise to
iterpars (dict) : any other parameters to iterate over the runs; see sc.parallelize() for syntax
combine (bool) : whether or not to combine all results into one sim, rather than return multiple sim objects
keep_people (bool) : whether to keep the people after the sim run (default false)
run_args (dict) : arguments passed to sim.run()
sim_args (dict) : extra parameters to pass to the sim
par_args (dict) : arguments passed to sc.parallelize()
do_run (bool) : whether to actually run the sim (if not, just initialize it)
parallel (bool) : whether to run in parallel using multiprocessing (else, just run in a loop)
n_cpus (int) : the number of CPUs to run on (if blank, set automatically; otherwise, passed to par_args)
verbose (int) : detail to print
kwargs (dict) : also passed to the sim
Returns:
If combine is True, a single sim object with the combined results from each sim.
Otherwise, a list of sim objects (default).
**Example**::
import hpvsim as hpv
sim = hpv.Sim()
sims = hpv.multi_run(sim, n_runs=6, noise=0.2)
'''
# Handle inputs
sim_args = sc.mergedicts(sim_args, kwargs) # Handle blank
par_args = sc.mergedicts({'ncpus':n_cpus}, par_args) # Handle blank
# Handle iterpars
if iterpars is None:
iterpars = {}
else:
n_runs = None # Reset and get from length of dict instead
for key,val in iterpars.items():
new_n = len(val)
if n_runs is not None and new_n != n_runs:
raise ValueError(f'Each entry in iterpars must have the same length, not {n_runs} and {len(val)}')
else:
n_runs = new_n
# Run the sims
if isinstance(sim, hps.Sim): # One sim
if reseed is None: reseed = True
iterkwargs = dict(ind=np.arange(n_runs))
iterkwargs.update(iterpars)
kwargs = dict(sim=sim, reseed=reseed, noise=noise, noisepar=noisepar, verbose=verbose, keep_people=keep_people, sim_args=sim_args, run_args=run_args, do_run=do_run)
elif isinstance(sim, list): # List of sims
if reseed is None: reseed = False
iterkwargs = dict(sim=sim, ind=np.arange(len(sim)))
kwargs = dict(reseed=reseed, verbose=verbose, keep_people=keep_people, sim_args=sim_args, run_args=run_args, do_run=do_run)
else:
errormsg = f'Must be Sim object or list, not {type(sim)}'
raise TypeError(errormsg)
# Actually run!
if parallel:
try:
sims = sc.parallelize(single_run, iterkwargs=iterkwargs, kwargs=kwargs, **par_args) # Run in parallel
except RuntimeError as E: # Handle if run outside of __main__ on Windows
if 'freeze_support' in E.args[0]: # For this error, add additional information
errormsg = '''
Uh oh! It appears you are trying to run with multiprocessing on Windows outside
of the __main__ block; please see https://docs.python.org/3/library/multiprocessing.html
for more information. The correct syntax to use is e.g.
import hpvsim as hpv
sim = hpv.Sim()
msim = hpv.MultiSim(sim)
if __name__ == '__main__':
msim.run()
Alternatively, to run without multiprocessing, set parallel=False.
'''
raise RuntimeError(errormsg) from E
else: # For all other runtime errors, raise the original exception
raise E
else: # Run in serial, not in parallel
sims = []
n_sims = len(list(iterkwargs.values())[0]) # Must have length >=1 and all entries must be the same length
for s in range(n_sims):
this_iter = {k:v[s] for k,v in iterkwargs.items()} # Pull out items specific to this iteration
this_iter.update(kwargs) # Merge with the kwargs
this_iter['sim'] = this_iter['sim'].copy() # Ensure we have a fresh sim; this happens implicitly on pickling with multiprocessing
sim = single_run(**this_iter) # Run in series
sims.append(sim)
return sims
[docs]
def parallel(*args, **kwargs):
'''
A shortcut to ``hpv.MultiSim()``, allowing the quick running of multiple simulations
at once.
Args:
args (list): The simulations to run
kwargs (dict): passed to multi_run()
Returns:
A run MultiSim object.
**Examples**::
s1 = hpv.Sim(beta=0.01, label='Low')
s2 = hpv.Sim(beta=0.02, label='High')
hpv.parallel(s1, s2).plot()
msim = hpv.parallel([s1, s2], keep_people=True)
'''
sims = sc.mergelists(*args)
return MultiSim(sims=sims).run(**kwargs)