Source code for poliosim.base

'''
Base classes for Poliosim. These classes handle a lot of the boilerplate of the
People and Sim classes (e.g. loading, saving, key lookups, etc.), so those classes
can be focused on the disease-specific functionality.
'''

import numpy as np
import pandas as pd
import sciris as sc
import datetime as dt
from . import utils as psu
from . import parameters as pspar
from . import version as psv
from . import population as pspop
from . import interventions as psi
from . import analysis as psa
from . import plotting as psplt
obj_get = object.__getattribute__ # Alias the default getattribute method
obj_set = object.__setattr__

# Specify all externally visible classes this file defines
__all__ = ['ParsObj', 'Result', 'BaseSim', 'BasePeople', 'Person', 'FlexDict', 'Contacts', 'Layer']


#%% Define simulation classes

[docs]class ParsObj(sc.prettyobj): ''' A class based around performing operations on a self.pars dict. ''' def __init__(self, pars): self.update_pars(pars, create=True) return def __getitem__(self, key): ''' Allow sim['par_name'] instead of sim.pars['par_name'] ''' try: return self.pars[key] except: all_keys = '\n'.join(list(self.pars.keys())) errormsg = f'Key "{key}" not found; available keys:\n{all_keys}' raise sc.KeyNotFoundError(errormsg) def __setitem__(self, key, value): ''' Ditto ''' if key in self.pars: self.pars[key] = value else: all_keys = '\n'.join(list(self.pars.keys())) errormsg = f'Key "{key}" not found; available keys:\n{all_keys}' raise sc.KeyNotFoundError(errormsg) return
[docs] def update_pars(self, pars=None, create=False): ''' Update internal dict with new pars. Args: pars (dict): the parameters to update (if None, do nothing) create (bool): if create is False, then raise a KeyNotFoundError if the key does not already exist ''' if pars is not None: if not isinstance(pars, dict): raise TypeError(f'The pars object must be a dict; you supplied a {type(pars)}') if not hasattr(self, 'pars'): self.pars = pars if not create: available_keys = list(self.pars.keys()) mismatches = [key for key in pars.keys() if key not in available_keys] if len(mismatches): errormsg = f'Key(s) {mismatches} not found; available keys are {available_keys}' raise sc.KeyNotFoundError(errormsg) self.pars.update(pars) return
[docs]class Result(object): ''' Stores a single result -- by default, acts like an array. Args: name (str): name of this result, e.g. new_infections npts (int): if values is None, precreate it to be of this length scale (str): whether or not the value scales by population size; options are "dynamic", "static", or False color (str/arr): default color for plotting (hex or RGB notation) n_variants (int): the number of variants the result is for (0 for results not by variant) **Example**:: import poliosim as ps r1 = ps.Result(name='test1', npts=10) r1[:5] = 20 print(r1.values) ''' def __init__(self, name=None, npts=None, scale='dynamic', color=None): self.name = name # Name of this result self.scale = scale # Whether or not to scale the result by the scale factor if color is None: color = '#000000' self.color = color # Default color if npts is None: npts = 0 self.values = np.array(np.zeros(int(npts)), dtype=psu.result_float) self.low = None self.high = None return def __repr__(self, *args, **kwargs): ''' Use pretty repr, like sc.prettyobj, but displaying full values ''' output = sc.prepr(self, skip=['values', 'low', 'high']) output += 'values:\n' + repr(self.values) if self.low is not None: output += '\nlow:\n' + repr(self.low) if self.high is not None: output += '\nhigh:\n' + repr(self.high) return output def __getitem__(self, *args, **kwargs): ''' To allow e.g. result[5] instead of result.values[5] ''' return self.values.__getitem__(*args, **kwargs) def __setitem__(self, *args, **kwargs): ''' To allow e.g. result[:] = 1 instead of result.values[:] = 1 ''' return self.values.__setitem__(*args, **kwargs) def __len__(self): ''' To allow len(result) instead of len(result.values) ''' return len(self.values) @property def npts(self): return len(self.values)
def set_metadata(obj): ''' Set standard metadata for an object ''' obj.created = sc.now() obj.version = psv.__version__ obj.git_info = psu.git_info() return
[docs]class BaseSim(ParsObj): ''' The BaseSim class handles the running of the simulation: the number of people, number of time points, and the parameters of the simulation. ''' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Initialize and set the parameters as attributes return
[docs] def set_metadata(self, simfile): ''' Set the metadata for the simulation -- creation time and filename ''' set_metadata(self) if simfile is None: datestr = sc.getdate(obj=self.created, dateformat='%Y-%b-%d_%H.%M.%S') self.simfile = f'poliosim_{datestr}.sim' return
[docs] def set_seed(self, seed=-1): ''' Set the seed for the random number stream from the stored or supplied value Args: seed (None or int): if no argument, use current seed; if None, randomize; otherwise, use and store supplied seed Returns: None ''' # Unless no seed is supplied, reset it if seed != -1: self['rand_seed'] = seed psu.set_seed(self['rand_seed']) return
@property def n(self): ''' Count the number of people -- if it fails, assume none ''' try: # By default, the length of the people dict return len(self.people) except: # If it's None or missing return 0 @property def scaled_pop_size(self): ''' Get the total population size, i.e. the number of agents times the scale factor -- if it fails, assume none ''' try: return self['pop_size']*self['pop_scale'] except: # If it's None or missing return 0 @property def npts(self): ''' Count the number of time points ''' try: return int(self['n_days'] + 1) except: return 0 @property def tvec(self): ''' Create a time vector ''' try: return np.arange(self.npts) except: return np.array([]) @property def datevec(self): ''' Create a vector of dates Returns: Array of `datetime` instances containing the date associated with each simulation time step ''' try: return self['start_day'] + self.tvec * dt.timedelta(days=1) except: return np.array([])
[docs] def day(self, day, *args): ''' Convert a string, date/datetime object, or int to a day (int). Args: day (str, date, int, or list): convert any of these objects to a day relative to the simulation's start day Returns: days (int or str): the day(s) in simulation time **Example**:: sim.day('2020-04-05') # Returns 35 ''' return psu.day(day, *args, start_day=self['start_day'])
[docs] def date(self, ind, *args, dateformat=None, as_date=False): ''' Convert one or more integer days of simulation time to a date/list of dates -- by default returns a string, or returns a datetime Date object if as_date is True. See also ps.date(), which provides a partly overlapping set of date conversion features. Args: ind (int, list, or array): the day(s) in simulation time args (list): additional day(s) dateformat (str): the format to return the date in as_date (bool): whether to return as a datetime date instead of a string Returns: dates (str, Date, or list): the date(s) corresponding to the simulation day(s) **Examples**:: sim.date(34) # Returns '2020-04-04' sim.date([34, 54]) # Returns ['2020-04-04', '2020-04-24'] sim.date(34, 54, as_dt=True) # Returns [datetime.date(2020, 4, 4), datetime.date(2020, 4, 24)] ''' # Handle inputs if not isinstance(ind, list): # If it's a number, string, or dateobj, convert it to a list ind = sc.promotetolist(ind) ind.extend(args) if dateformat is None: dateformat = '%Y-%m-%d' # Do the conversion dates = [] for raw in ind: if sc.isnumber(raw): date_obj = sc.date(self['start_day'], as_date=True) + dt.timedelta(days=int(raw)) else: date_obj = sc.date(raw, as_date=True) if as_date: dates.append(date_obj) else: dates.append(date_obj.strftime(dateformat)) # Return a string rather than a list if only one provided if len(ind)==1: dates = dates[0] return dates
[docs] def result_keys(self): ''' Get the actual results objects, not other things stored in sim.results ''' keys = [key for key in self.results.keys() if isinstance(self.results[key], Result)] return keys
[docs] def copy(self): ''' Returns a deep copy of the sim ''' return sc.dcp(self)
[docs] def export_results(self, for_json=True, filename=None, indent=2, *args, **kwargs): ''' Convert results to dict -- see also to_json(). The results written to Excel must have a regular table shape, whereas for the JSON output, arbitrary data shapes are supported. Args: for_json (bool): if False, only data associated with Result objects will be included in the converted output filename (str): filename to save to; if None, do not save indent (int): indent (int): if writing to file, how many indents to use per nested level args (list): passed to savejson() kwargs (dict): passed to savejson() Returns: resdict (dict): dictionary representation of the results ''' if not self.results_ready: errormsg = 'Please run the sim before exporting the results' raise RuntimeError(errormsg) resdict = {} resdict['t'] = self.results['t'] # Assume that there is a key for time if for_json: resdict['timeseries_keys'] = self.result_keys() for key,res in self.results.items(): if isinstance(res, Result): resdict[key] = res.values if res.low is not None: resdict[key+'_low'] = res.low if res.high is not None: resdict[key+'_high'] = res.high elif for_json: if key == 'date': resdict[key] = [str(d) for d in res] # Convert dates to strings else: resdict[key] = res if filename is not None: sc.savejson(filename=filename, obj=resdict, indent=indent, *args, **kwargs) return resdict
[docs] def export_pars(self, filename=None, indent=2, *args, **kwargs): ''' Return parameters for JSON export -- see also to_json(). This method is required so that interventions can specify their JSON-friendly representation. Args: filename (str): filename to save to; if None, do not save indent (int): indent (int): if writing to file, how many indents to use per nested level args (list): passed to savejson() kwargs (dict): passed to savejson() Returns: pardict (dict): a dictionary containing all the parameter values ''' pardict = {} for key in self.pars.keys(): if key == 'interventions': pardict[key] = [intervention.to_json() for intervention in self.pars[key]] elif key == 'start_day': pardict[key] = str(self.pars[key]) else: pardict[key] = self.pars[key] if filename is not None: sc.savejson(filename=filename, obj=pardict, indent=indent, *args, **kwargs) return pardict
[docs] def to_json(self, filename=None, keys=None, tostring=False, indent=2, verbose=False, *args, **kwargs): ''' Export results and parameters as JSON. Args: filename (str): if None, return string; else, write to file keys (str or list): attributes to write to json (default: results, parameters, and summary) tostring (bool): if not writing to file, whether to write to string (alternative is sanitized dictionary) indent (int): if writing to file, how many indents to use per nested level verbose (bool): detail to print args (list): passed to savejson() kwargs (dict): passed to savejson() Returns: A unicode string containing a JSON representation of the results, or writes the JSON file to disk **Examples**:: json = sim.to_json() sim.to_json('results.json') sim.to_json('summary.json', keys='summary') ''' # Handle keys if keys is None: keys = ['results', 'pars', 'summary'] keys = sc.promotetolist(keys) # Convert to JSON-compatible format d = {} for key in keys: if key == 'results': resdict = self.export_results(for_json=True) d['results'] = resdict elif key in ['pars', 'parameters']: pardict = self.export_pars() d['parameters'] = pardict elif key == 'summary': d['summary'] = dict(sc.dcp(self.summary)) else: # pragma: no cover try: d[key] = sc.sanitizejson(getattr(self, key)) except Exception as E: errormsg = f'Could not convert "{key}" to JSON: {str(E)}; continuing...' print(errormsg) if filename is None: output = sc.jsonify(d, tostring=tostring, indent=indent, verbose=verbose, *args, **kwargs) else: output = sc.savejson(filename=filename, obj=d, indent=indent, *args, **kwargs) return output
[docs] def to_df(self, date_index=False): ''' Export results to a pandas dataframe Args: date_index (bool): if True, use the date as the index ''' resdict = self.export_results(for_json=False) df = pd.DataFrame.from_dict(resdict) df['date'] = self.datevec new_columns = ['t','date'] + df.columns[1:-1].tolist() # Get column order df = df.reindex(columns=new_columns) # Reorder so 't' and 'date' are first if date_index: df = df.set_index('date') return df
[docs] def to_pandas(self, key=None, to_file=False, filename='output.ftr'): ''' Return a part of the sim as a pandas dataframe with optional export Args: key (str): attribute to return as pandas dataframe (options are: contacts, infection_log, or states (from save_states analyzer)) to_file (bool): if True, export as a feather file filename (str): filename for write to file (should end in .ftr) Returns: A pd.dataframe, or writes the file to disk **Examples**:: sim = ps.create_sim(rand_seed=100, pop_size=10e3, n_days=30, pop_infected = 100, vx_coverage=0.05, trace_prob=1.0, test_delay=0, rel_symp_prob=1.0, analyzers=ps.save_states(full=True), ) sim.run() sim.to_pandas('contacts') sim.to_pandas('contacts', to_file=True, filename='df_contacts.ftr') sim.to_pandas('infection_log', to_file=True, filename='df_infection_log.ftr') sim.to_pandas('states', to_file=True, filename='df_states.ftr') ''' def concat_contacts_as_df(contacts, layers=None): if layers is None: layers = contacts.keys() output = dict() for layer in layers: output[layer] = pd.DataFrame(contacts[layer]) output = pd.concat(output) output = output.reset_index() output = output.rename(columns={'level_0': 'layer'}) output = output.drop('level_1', 1) return output # Convert to Pandas dataframe-compatible format if key == 'contacts': d = concat_contacts_as_df(contacts=self.people.contacts) elif key == 'infection_log': d = pd.DataFrame(self.people.infection_log) elif key == 'states': try: an = self.get_analyzer(label='save_states') except ValueError: print('save_states analyzer not found in sim') an_dict = an.to_dfdict() #to_dfdict(an) d = pd.concat(an_dict) d = d.stack() d = d.reset_index() d = d.pivot_table(values=0, index=['level_1', 'level_2'], columns='level_0').reset_index() d = d.rename(columns={'level_1': 'uid', 'level_2': 'day'}) else: # pragma: no cover errormsg = f'Error: "{key}" is not a valid key. Options are "contacts", "infection_log", or "states"' raise sc.KeyNotFoundError(errormsg) # Export if to_file: output = d.to_feather(filename) else: output = d return output
[docs] def to_excel(self, filename=None, skip_pars=None): ''' Export parameters and results as Excel format Args: filename (str): if None, return string; else, write to file skip_pars (list): if provided, a custom list parameters to exclude Returns: An sc.Spreadsheet with an Excel file, or writes the file to disk ''' if skip_pars is None: skip_pars = ['variant_map', 'vaccine_map'] # These include non-string keys so fail at sc.flattendict() # Export results result_df = self.to_df(date_index=True) # Export parameters pars = {k:v for k,v in self.pars.items() if k not in skip_pars} par_df = pd.DataFrame.from_dict(sc.flattendict(pars, sep='_'), orient='index', columns=['Value']) par_df.index.name = 'Parameter' # Convert to spreadsheet spreadsheet = sc.Spreadsheet() spreadsheet.freshbytes() with pd.ExcelWriter(spreadsheet.bytes, engine='xlsxwriter') as writer: result_df.to_excel(writer, sheet_name='Results') par_df.to_excel(writer, sheet_name='Parameters') spreadsheet.load() if filename is None: output = spreadsheet else: output = spreadsheet.save(filename) return output
[docs] def shrink(self, skip_attrs=None, in_place=True): ''' "Shrinks" the simulation by removing the people, and returns a copy of the "shrunken" simulation. Used to reduce the memory required for saved files. Args: skip_attrs (list): a list of attributes to skip in order to perform the shrinking; default "people" Returns: shrunken (Sim): a Sim object with the listed attributes removed ''' # By default, skip people (~90% of memory), the popdict (which is usually empty anyway), and _orig_pars (which is just a backup) if skip_attrs is None: skip_attrs = ['popdict', 'people', '_orig_pars'] # Create the new object, and copy original dict, skipping the skipped attributes if in_place: shrunken = self for attr in skip_attrs: setattr(self, attr, None) return else: shrunken = object.__new__(self.__class__) shrunken.__dict__ = {k:(v if k not in skip_attrs else None) for k,v in self.__dict__.items()} # Shrink interventions and analyzers, with a lot of checking along the way if hasattr(shrunken, 'pars'): # In case the user removes this for key in ['interventions', 'analyzers']: ia_list = self.pars[key] # List of interventions or analyzers self.pars[key] = [ia.shrink(in_place=in_place) for ia in ia_list] # Actually shrink, and re-store # Don't return if in place if in_place: return else: return shrunken
[docs] def save(self, filename=None, keep_people=None, skip_attrs=None, **kwargs): ''' Save to disk as a gzipped pickle. Args: filename (str or None): the name or path of the file to save to; if None, uses stored kwargs: passed to sc.makefilepath() Returns: filename (str): the validated absolute path to the saved file **Example**:: sim.save() # Saves to a .sim file with the date and time of creation by default ''' # Set keep_people based on whether or not we're in the middle of a run if keep_people is None: if self.initialized and not self.results_ready: keep_people = True else: keep_people = False # Handle the filename if filename is None: filename = self.simfile filename = sc.makefilepath(filename=filename, **kwargs) self.filename = filename # Store the actual saved filename # Handle the shrinkage and save if skip_attrs or not keep_people: obj = self.shrink(skip_attrs=skip_attrs, in_place=False) else: obj = self psu.save(filename=filename, obj=obj) return filename
[docs] @staticmethod def load(filename, *args, **kwargs): ''' Load from disk from a gzipped pickle. Args: filename (str): the name or path of the file to load from kwargs: passed to ps.load() Returns: sim (Sim): the loaded simulation object **Example**:: sim = ps.Sim.load('my-simulation.sim') ''' sim = psu.load(filename, *args, **kwargs) if not isinstance(sim, BaseSim): errormsg = f'Cannot load object of {type(sim)} as a Sim object' raise TypeError(errormsg) return sim
# From what used to be in model.py
[docs] def update_pars(self, pars=None, create=False, **kwargs): ''' Ensure that metaparameters get used properly before being updated ''' pars = sc.mergedicts(pars, kwargs) if pars: super().update_pars(pars=pars, create=create) # Call update_pars() for ParsObj return
[docs] def load_data(self, datafile=None, datacols=None, verbose=None, **kwargs): ''' Load the data to calibrate against, if provided ''' if verbose is None: verbose = self['verbose'] self.datafile = datafile # Store this if datafile is not None: # If a data file is provided, load it self.data = psu.load_data(datafile=datafile, columns=datacols, verbose=verbose, **kwargs) return
[docs] def layer_keys(self): ''' Attempt to retrieve the current layer keys, in the following order: from the people object (for an initialized sim), from the popdict (for one in the process of being initialized), from the beta_layer parameter (for an uninitialized sim), or by assuming a default (if none of the above are available). ''' try: keys = list( self['beta_layer'].keys()) # Get keys from beta_layer since the "most required" layer parameter except: keys = [] return keys
[docs] def validate_layer_pars(self): ''' Handle layer parameters, since they need to be validated after the population creation, rather than before. ''' # First, try to figure out what the layer keys should be and perform basic type checking layer_keys = self.layer_keys() layer_pars = pspar.layer_pars # The names of the parameters that are specified by layer for lp in layer_pars: val = self[lp] if sc.isnumber(val): # It's a scalar instead of a dict, assume it's all contacts self[lp] = {k: val for k in layer_keys} # Handle key mismaches for lp in layer_pars: lp_keys = set(self.pars[lp].keys()) if not lp_keys == set(layer_keys): errormsg = f'Layer parameters have inconsistent keys with the layer keys {layer_keys}:' for lp2 in layer_pars: # Fail on first error, but re-loop to list all of them errormsg += f'\n{lp2} = ' + ', '.join(self.pars[lp].keys()) raise sc.KeyNotFoundError(errormsg) # Handle mismatches with the population if self.people is not None: pop_keys = set(self.people.contacts.keys()) if pop_keys != set(layer_keys): errormsg = f'Please update your parameter keys {layer_keys} to match population keys {pop_keys}.' raise sc.KeyNotFoundError(errormsg) return
[docs] def validate_pars(self, validate_layers=True): ''' Some parameters can take multiple types; this makes them consistent. Args: validate_layers (bool): whether to validate layer parameters as well via validate_layer_pars() -- usually yes, except during initialization ''' # Handle types for key in ['pop_size', 'pop_infected', 'pop_size']: try: self[key] = int(self[key]) except Exception as E: errormsg = f'Could not convert {key}={self[key]} of {type(self[key])} to integer' raise ValueError(errormsg) from E # Handle start day start_day = self['start_day'] # Shorten if start_day in [None, 0]: # Use default start day start_day = '2020-03-01' self['start_day'] = psu.date(start_day) # Handle end day and n_days end_day = self['end_day'] n_days = self['n_days'] if end_day: self['end_day'] = psu.date(end_day) n_days = psu.daydiff(self['start_day'], self['end_day']) if n_days <= 0: errormsg = f"Number of days must be >0, but you supplied start={str(self['start_day'])} and end={str(self['end_day'])}, which gives n_days={n_days}" raise ValueError(errormsg) else: self['n_days'] = int(n_days) else: if n_days: self['n_days'] = int(n_days) self['end_day'] = self.date(n_days) # Convert from the number of days to the end day else: errormsg = f'You must supply one of n_days and end_day, not "{n_days}" and "{end_day}"' raise ValueError(errormsg) # Handle population data popdata_choices = ['random', 'hybrid', 'hybrid_mixed_community', 'clustered', 'synthpops'] choice = self['pop_type'] if choice and choice not in popdata_choices: choicestr = ', '.join(popdata_choices) errormsg = f'Population type "{choice}" not available; choices are: {choicestr}' raise ValueError(errormsg) # Handle interventions and analyzers self['interventions'] = sc.promotetolist(self['interventions'], keepnone=False) for i, interv in enumerate(self['interventions']): if isinstance(interv, dict): # It's a dictionary representation of an intervention self['interventions'][i] = psi.InterventionDict(**interv) self['analyzers'] = sc.promotetolist(self['analyzers'], keepnone=False) # Optionally handle layer parameters if validate_layers: self.validate_layer_pars() return
[docs] def load_population(self, popfile=None, **kwargs): ''' Load the population dictionary from file -- typically done automatically as part of sim.initialize(). Supports loading either saved population dictionaries (popdicts, file ending .pop by convention), or ready-to-go People objects (file ending .ppl by convention). Either object an also be supplied directly. Once a population file is loaded, it is removed from the Sim object. Args: popfile (str or obj): if a string, name of the file; otherwise, the popdict or People object to load kwargs (dict): passed to sc.makefilepath() ''' # Set the file path if not is provided if popfile is None and self.popfile is not None: popfile = self.popfile # Handle the population (if it exists) if popfile is not None: # Load from disk or use directly if isinstance(popfile, str): # It's a string, assume it's a filename filepath = sc.makefilepath(filename=popfile, **kwargs) obj = psu.load(filepath) if self['verbose']: print(f'Loading population from {filepath}') else: obj = popfile # Use it directly # Process the input if isinstance(obj, dict): self.popdict = obj n_actual = len(self.popdict['uid']) elif isinstance(obj, BasePeople): self.people = obj self.people.set_pars(self.pars) # Replace the saved parameters with this simulation's n_actual = len(self.people) else: errormsg = f'Cound not interpret input of {type(obj)} as a population file: must be a dict or People object' raise ValueError(errormsg) # Perform validation n_expected = self['pop_size'] if n_actual != n_expected: errormsg = f'Wrong number of people ({n_expected:n} requested, {n_actual:n} actual) -- please change "pop_size" to match or regenerate the file' raise ValueError(errormsg) self.popfile = None # Once loaded, remove to save memory return
[docs] def init_people(self, save_pop=False, load_pop=False, popfile=None, verbose=None, seed_infections=None, **kwargs): ''' Create the people. Args: save_pop (bool): if true, save the population dictionary to popfile load_pop (bool): if true, load the population dictionary from popfile popfile (str): filename to load/save the population verbose (int): detail to print kwargs (dict): passed to ps.make_people() ''' # Handle inputs if verbose is None: verbose = self['verbose'] if verbose: print(f'Initializing sim with {self["pop_size"]:0n} people for {self["n_days"]} days') if load_pop and self.popdict is None: self.load_population(popfile=popfile) # Actually make the people self.people = pspop.make_people(self, save_pop=save_pop, popfile=popfile, verbose=verbose, **kwargs) self.people.initialize(seed_infections=seed_infections) # Fully initialize the people return
[docs] def init_interventions(self): ''' Initialize and validate the interventions ''' # Initialization if self._orig_pars and 'interventions' in self._orig_pars: self['interventions'] = self._orig_pars.pop('interventions') # Restore for intervention in self['interventions']: if isinstance(intervention, psi.Intervention): intervention.initialize(self) # Validation trace_ind = np.nan # Index of the tracing intervention(s) test_ind = np.nan # Index of the tracing intervention(s) for i,intervention in enumerate(self['interventions']): if isinstance(intervention, (psi.contact_tracing)): trace_ind = np.fmin(trace_ind, i) # Find the earliest-scheduled tracing intervention elif isinstance(intervention, (psi.test_prob)): test_ind = np.fmax(test_ind, i) # Find the latest-scheduled testing intervention if not np.isnan(trace_ind): # pragma: no cover warningmsg = '' if np.isnan(test_ind): warningmsg = 'Note: you have defined a contact tracing intervention but no testing intervention was found. Unless this is intentional, please define at least one testing intervention.' elif trace_ind < test_ind: warningmsg = f'Note: contact tracing (index {trace_ind:.0f}) is scheduled before testing ({test_ind:.0f}); this creates a 1-day delay. Unless this is intentional, please reorder the interentions.' if self['verbose'] and warningmsg: print(warningmsg) return
[docs] def finalize_interventions(self): for intervention in self['interventions']: if isinstance(intervention, psi.Intervention): intervention.finalize(self)
[docs] def init_analyzers(self): ''' Initialize the analyzers ''' if self._orig_pars and 'analyzers' in self._orig_pars: self['analyzers'] = self._orig_pars.pop('analyzers') # Restore for analyzer in self['analyzers']: if isinstance(analyzer, psa.Analyzer): analyzer.initialize(self) return
[docs] def finalize_analyzers(self): for analyzer in self['analyzers']: if isinstance(analyzer, psa.Analyzer): analyzer.finalize(self)
[docs] def restore_pars(self, orig_pars): ''' Restore the original parameter values, except for the analyzers ''' analyzers = self['analyzers'] # Make a copy so these don't get wiped for key, val in orig_pars.items(): self.pars[key] = val # So pointers, e.g. in sim.people, get updated as well self['analyzers'] = analyzers # Restore the analyzers return
[docs] def compute_summary(self, verbose=None): ''' Compute the summary statistics to display at the end of a run ''' if verbose is None: verbose = self['verbose'] self.summary = sc.objdict() for key in self.result_keys(): self.summary[key] = self.results[key][-1] return self.summary
[docs] def summarize(self, output=False): ''' Print a brief summary of the simulation ''' if self.results_ready: summary_str = 'Simulation summary:\n' for key in self.result_keys(): if key.startswith('cum_'): summary_str += f' {self.summary[key]:5.0f} {self.results[key].name.lower()}\n' if not output: print(summary_str) else: return summary_str else: return self.brief(output=output) # If the simulation hasn't been run, default to the brief summary
[docs] def brief(self, output=False): ''' Return a one-line description of a sim ''' if self.results_ready: infections = self.summary['cum_infections'] results = f'{infections:n}⚙' else: results = 'not run' if self.label: label = f'"{self.label}"' else: label = '<no label>' start = psu.date(self['start_day'], as_date=False) if self['end_day']: end = psu.date(self['end_day'], as_date=False) else: end = psu.date(self['n_days'], start_date=start) pop_size = self['pop_size'] pop_type = self['pop_type'] string = f'Sim({label}; {start}{end}; pop: {pop_size:n} {pop_type}; epi: {results})' if not output: print(string) else: return string
[docs] def compute_fit(self, *args, **kwargs): ''' Compute the fit between the model and the data. See cv.Fit() for more information. Args: args (list): passed to cv.Fit() kwargs (dict): passed to cv.Fit() Returns: A Fit object **Example**:: sim = cv.Sim(datafile='data.csv') sim.run() fit = sim.compute_fit() fit.plot() ''' self.fit = psa.Fit(self, *args, **kwargs) return self.fit
[docs] def calibrate(self, calib_pars, **kwargs): ''' Automatically calibrate the simulation, returning a Calibration object (a type of analyzer). See the documentation on that class for more information. Args: calib_pars (dict): a dictionary of the parameters to calibrate of the format dict(key1=[best, low, high]) kwargs (dict): passed to cv.Calibration() Returns: A Calibration object **Example**:: sim = cv.Sim(datafile='data.csv') calib_pars = dict(beta=[0.015, 0.010, 0.020]) calib = sim.calibrate(calib_pars, n_trials=50) calib.plot() ''' calib = psa.Calibration(sim=self, calib_pars=calib_pars, **kwargs) calib.calibrate() return calib
[docs] def make_age_histogram(self, output=True, *args, **kwargs): ''' Calculate the age histograms of infections, diagnoses, etc. See ps.age_histogram() for more information. This can be used alternatively to supplying the age histogram as an analyzer to the sim. If used this way, it can only record the final time point since the states of each person are not saved during the sim. Args: output (bool): whether or not to return the age histogram; if not, store in sim.results args (list): passed to ps.age_histogram() kwargs (dict): passed to ps.age_histogram() **Example**:: sim = ps.Sim() sim.run() agehist = sim.make_age_histogram() agehist.plot() ''' agehist = psa.age_histogram(sim=self, *args, **kwargs) if output: return agehist else: self.results.agehist = agehist return
[docs] def make_transtree(self, output=True, *args, **kwargs): ''' Create a TransTree (transmission tree) object, for analyzing the pattern of transmissions in the simulation. See ps.TransTree() for more information. Args: output (bool): whether or not to return the TransTree; if not, store in sim.results args (list): passed to ps.TransTree() kwargs (dict): passed to ps.TransTree() **Example**:: sim = ps.Sim() sim.run() tt = sim.make_transtree() ''' tt = psa.TransTree(self, *args, **kwargs) if output: return tt else: self.results.transtree = tt return
[docs] def plot(self, *args, **kwargs): ''' Plot the results of a single simulation. ''' fig = psplt.plot_sim(sim=self, *args, **kwargs) return fig
[docs] def plot_result(self, key, *args, **kwargs): ''' Simple method to plot a single result. Useful for results that aren't standard outputs. See sim.plot() for explanation of other arguments. Args: key (str): the key of the result to plot **Examples**:: sim.plot_result('r_eff') ''' fig = psplt.plot_result(sim=self, key=key, *args, **kwargs) return fig
def _get_ia(self, which, label=None, partial=False, as_list=False, as_inds=False, die=True, first=False): ''' Helper method for get_interventions() and get_analyzers(); see get_interventions() docstring ''' # Handle inputs if which not in ['interventions', 'analyzers']: # pragma: no cover errormsg = f'This method is only defined for interventions and analyzers, not "{which}"' raise ValueError(errormsg) ia_list = self.pars[which] # List of interventions or analyzers n_ia = len(ia_list) # Number of interventions/analyzers if label == 'summary': # Print a summary of the interventions df = pd.DataFrame(columns=['ind', 'label', 'type']) for ind,ia_obj in enumerate(ia_list): df = df.append(dict(ind=ind, label=str(ia_obj.label), type=type(ia_obj)), ignore_index=True) print(f'Summary of {which}:') print(df) return else: # Standard usage case position = 0 if first else -1 # Choose either the first or last element if label is None: # Get all interventions if no label is supplied, e.g. sim.get_interventions() label = np.arange(n_ia) if isinstance(label, np.ndarray): # Allow arrays to be provided label = label.tolist() labels = sc.promotetolist(label) # Calculate the matches matches = [] match_inds = [] for label in labels: if sc.isnumber(label): matches.append(ia_list[label]) # This will raise an exception if an invalid index is given label = n_ia + label if label<0 else label # Convert to a positive number match_inds.append(label) elif sc.isstring(label) or isinstance(label, type): for ind,ia_obj in enumerate(ia_list): if sc.isstring(label) and ia_obj.label == label or (partial and (label in str(ia_obj.label))): matches.append(ia_obj) match_inds.append(ind) elif isinstance(label, type) and isinstance(ia_obj, label): matches.append(ia_obj) match_inds.append(ind) else: # pragma: no cover errormsg = f'Could not interpret label type "{type(label)}": should be str, int, list, or {which} class' raise TypeError(errormsg) # Parse the output options if as_inds: output = match_inds elif as_list: # Used by get_interventions() output = matches else: if len(matches) == 0: # pragma: no cover if die: errormsg = f'No {which} matching "{label}" were found' raise ValueError(errormsg) else: output = None else: output = matches[position] # Return either the first or last match (usually), used by get_intervention() return output
[docs] def get_interventions(self, label=None, partial=False, as_inds=False): ''' Find the matching intervention(s) by label, index, or type. If None, return all interventions. If the label provided is "summary", then print a summary of the interventions (index, label, type). Args: label (str, int, Intervention, list): the label, index, or type of intervention to get; if a list, iterate over one of those types partial (bool): if true, return partial matches (e.g. 'beta' will match all beta interventions) as_inds (bool): if true, return matching indices instead of the actual interventions **Examples**:: tp = ps.test_prob(symp_prob=0.1) cb1 = ps.change_beta(days=5, changes=0.3, label='NPI') cb2 = ps.change_beta(days=10, changes=0.3, label='Masks') sim = ps.Sim(interventions=[tp, cb1, cb2]) cb1, cb2 = sim.get_interventions(ps.change_beta) tp, cb2 = sim.get_interventions([0,2]) ind = sim.get_interventions(ps.change_beta, as_inds=True) # Returns [1,2] sim.get_interventions('summary') # Prints a summary ''' return self._get_ia('interventions', label=label, partial=partial, as_inds=as_inds, as_list=True)
[docs] def get_intervention(self, label=None, partial=False, first=False, die=True): ''' Like get_interventions(), find the matching intervention(s) by label, index, or type. If more than one intervention matches, return the last by default. If no label is provided, return the last intervention in the list. Args: label (str, int, Intervention, list): the label, index, or type of intervention to get; if a list, iterate over one of those types partial (bool): if true, return partial matches (e.g. 'beta' will match all beta interventions) first (bool): if true, return first matching intervention (otherwise, return last) die (bool): whether to raise an exception if no intervention is found **Examples**:: tp = ps.test_prob(symp_prob=0.1) cb = ps.change_beta(days=5, changes=0.3, label='NPI') sim = ps.Sim(interventions=[tp, cb]) cb = sim.get_intervention('NPI') cb = sim.get_intervention('NP', partial=True) cb = sim.get_intervention(ps.change_beta) cb = sim.get_intervention(1) cb = sim.get_intervention() tp = sim.get_intervention(first=True) ''' return self._get_ia('interventions', label=label, partial=partial, first=first, die=die, as_inds=False, as_list=False)
[docs] def get_analyzers(self, label=None, partial=False, as_inds=False): ''' Same as get_interventions(), but for analyzers. ''' return self._get_ia('analyzers', label=label, partial=partial, as_list=True, as_inds=as_inds)
[docs] def get_analyzer(self, label=None, partial=False, first=False, die=True): ''' Same as get_intervention(), but for analyzers. ''' return self._get_ia('analyzers', label=label, partial=partial, first=first, die=die, as_inds=False, as_list=False)
#%% Define people classes
[docs]class BasePeople(sc.prettyobj): ''' A class to handle all the boilerplate for people -- note that everything interesting happens in the People class. Args: pars (dict): a dictionary with, at minimum, keys 'pop_size' and 'n_days' ''' def __init__(self): ''' Initialize essential attributes used for filtering ''' obj_set(self, '_keys', []) # Since getattribute is overwritten obj_set(self, '_inds', None) obj_set(self, '_subinds', None) return def __getitem__(self, key): ''' Allow people['attr'] instead of getattr(people, 'attr') If the key is an integer, alias `people.person()` to return a `Person` instance ''' try: return self.__getattribute__(key) except: # pragma: no cover if isinstance(key, int): return self.person(key) else: errormsg = f'Key "{key}" is not a valid attribute of people' raise AttributeError(errormsg) def __setitem__(self, key, value): ''' Ditto ''' if self._lock and key not in self.__dict__: # pragma: no cover errormsg = f'Key "{key}" is not a current attribute of people, and the people object is locked; see people.unlock()' raise AttributeError(errormsg) self.__setattr__(key, value) return def _is_filtered(self, attr): ''' Determine if a given attribute is filtered (e.g. people.age is, people.inds isn't) ''' is_filtered = (self._inds is not None and attr in self._keys) return is_filtered def __getattribute__(self, attr): ''' For array quantities, handle filtering ''' output = obj_get(self, attr) if attr[0] == '_': # Short-circuit for built-in methods to save time return output else: try: # Unclear why this fails, but sometimes it does during initialization/pickling keys = obj_get(self, '_keys') except: keys = [] if attr not in keys: return output else: if self._is_filtered(attr): output = output[self.inds] # If the People object is filtered, only return those indices return output def __setattr__(self, attr, value): ''' Ditto ''' if attr[0] == '_': # Short-circuit for built-in methods to save time return obj_set(self, attr, value) if self._is_filtered(attr): array = obj_get(self, attr) array[self.inds] = value else: # If not initialized, rely on the default behavior obj_set(self, attr, value) return def __len__(self): try: inds = obj_get(self, '_inds') if inds is None: return len(obj_get(self, 'uid')) else: return len(inds) except Exception as E1: try: return self.pars['pop_size'] except Exception as E2: print(f'Warning: could not get length of People (could not get self.uid: {E1} or pop_size: {E2})') return 0 @property def len_inds(self): ''' Alias (almost) to len(self) ''' if self._inds is not None: return len(obj_get(self, '_inds')) else: return len(self) @property def len_people(self): ''' Full length of People array, ignoring filtering ''' try: return len(obj_get(self, 'uid')) except Exception as E1: try: return self.pars['pop_size'] except Exception as E2: print(f'Warning: could not get length of People (could not get self.uid: {E1} or pop_size: {E2})') return 0 @property def pop_size(self): ''' Alias to len_people ''' return self.len_people @property def inds(self): ''' Alias to self._inds to prevent accidental overwrite & increase speed by allowing "_" shortcircuit ''' if self._inds is not None: return self._inds else: return np.arange(len(self)) @property def subinds(self): ''' Alias to self._subinds ''' return self._subinds def __iter__(self): ''' Iterate over people ''' for i in range(len(self)): yield self[i] def __add__(self, people2): ''' Combine two people arrays ''' newpeople = sc.dcp(self) for key in self.keys(): newpeople.set(key, np.concatenate([newpeople[key], people2[key]]), die=False) # Allow size mismatch # Validate newpeople.pop_size += people2.pop_size newpeople.validate() # Reassign UIDs so they're unique newpeople.set('uid', np.arange(len(newpeople))) return newpeople
[docs] def set(self, key, value, die=True): ''' Ensure sizes and dtypes match ''' current = self[key] value = np.array(value, dtype=self._dtypes[key]) # Ensure it's the right type if die and len(value) != len(current): errormsg = f'Length of new array does not match current ({len(value)} vs. {len(current)})' raise IndexError(errormsg) self[key] = value return
[docs] def get(self, key): ''' Convenience method -- key can be string or list of strings ''' if isinstance(key, str): return self[key] elif isinstance(key, list): arr = np.zeros((len(self), len(key))) for k,ky in enumerate(key): arr[:,k] = self[ky] return arr
[docs] def filter(self, criteria=None, inds=None, reset=False): ''' Store indices to allow for easy filtering of the People object. Args: criteria (array): a boolean array for the filtering critria inds (array): alternatively, explicitly filter by these indices reset (bool): reset the indices rather than use existing ones Returns: A filtered People object, which works just like a normal People object except only operates on a subset of indices. ''' # Create a new People object with the same properties as the original filtered = object.__new__(self.__class__) # Create a new People instance BasePeople.__init__(filtered) # Perform essential initialization filtered.__dict__ = {k:v for k,v in self.__dict__.items()} # Copy pointers to the arrays in People # Perform the filtering if reset: filtered._inds = None filtered._subinds = None if criteria is None: # No filtering: reset if inds is not None: # Unless indices are supplied directly, in which case use them new_inds = inds else: new_inds = None # Nothing supplied, nothing to do else: # Main use case: perform filtering if len(criteria) == len(self): # Main use case: a new filter applied on an already filtered object, e.g. filtered.filter(filtered.age > 5) new_inds = criteria.nonzero()[0] # Criteria is already filtered, just get the indices elif len(criteria) == self.len_people: # Alternative: a filter on the underlying People object is applied to the filtered object, e.g. filtered.filter(people.age > 5) new_inds = criteria[filtered.inds].nonzero()[0] # Apply filtering before getting the new indices else: errormsg = f'"criteria" must be boolean array matching either current filter length ({self.len_inds}) or else the total number of people ({self.len_people}), not {len(criteria)}' raise ValueError(errormsg) if new_inds is not None: if filtered._inds is None: # Not yet filtered: use the indices directly filtered._inds = new_inds else: # Already filtered: map them back onto the original People indices filtered._inds = filtered._inds[new_inds] filtered._subinds = new_inds # if list(filtered._inds) != list(sorted(filtered._inds)): # import traceback; traceback.print_exc(); import pdb; pdb.set_trace() return filtered
[docs] def unfilter(self): ''' An easy way of unfiltering the People object, returning the original. ''' unfiltered = self.filter(reset=True) return unfiltered
[docs] def binomial(self, prob, as_inds=False, as_filter=True): ''' Return indices either by a single probability or by an array of probabilities. By default just return the boolean array, but can also return the indices, or the filtered People object. Args: prob (float/array): either a scalar probability, or an array of probabilities of the same length as People as_inds (bool): return as list of indices instead of a boolean array as_filter (bool): return as filter instead than boolean array ''' if sc.isnumber(prob): arr = psu.n_binomial(prob, len(self)) elif sc.isarray(prob): arr = psu.binomial_arr(prob) else: errormsg = f'Could not recognize {type(prob)} as a scalar or array' raise TypeError(errormsg) if as_inds: output = np.nonzero(arr)[0] elif as_filter: output = self.filter(arr) else: output = arr return output
[docs] def filter_tf(self, arr): ''' Split the People object into True and False sets ''' bool_arr = np.array(arr, dtype=bool) filtered_true = self.filter(bool_arr) filtered_false = self.filter(~bool_arr) return filtered_true, filtered_false
[docs] def true(self, key): ''' Return indices matching the condition ''' return self[key].nonzero()[0]
[docs] def false(self, key): ''' Return indices not matching the condition ''' return (~self[key]).nonzero()[0]
[docs] def defined(self, key): ''' Return indices of people who are not-nan ''' return (~np.isnan(self[key])).nonzero()[0]
[docs] def not_defined(self, key): ''' Return indices of people who are nan ''' return np.isnan(self[key]).nonzero()[0]
[docs] def count(self, key): ''' Count the number of people for a given key ''' return np.count_nonzero(self[key])
[docs] def count_not(self, key): ''' Count the number of people who do not have a property for a given key ''' return len(self[key]) - self.count(key)
[docs] def set_pars(self, pars=None): ''' Re-link the parameters stored in the people object to the sim containing it, and perform some basic validation. ''' if pars is None: pars = {} elif sc.isnumber(pars): # Interpret as a population size pars = {'pop_size':pars} # Ensure it's a dictionary orig_pars = self.__dict__.get('pars') # Get the current parameters using dict's get method pars = sc.mergedicts(orig_pars, pars) if 'pop_size' not in pars: errormsg = f'The parameter "pop_size" must be included in a population; keys supplied were:\n{sc.newlinejoin(pars.keys())}' raise sc.KeyNotFoundError(errormsg) pars['pop_size'] = int(pars['pop_size']) pars.setdefault('n_variants', 1) pars.setdefault('location', None) self.pars = pars # Actually store the pars return
[docs] def keys(self): ''' Returns keys for all properties of the people object ''' return self.meta.all_states[:]
[docs] def person_keys(self): ''' Returns keys specific to a person (e.g., their age) ''' return self.meta.person[:]
[docs] def state_keys(self): ''' Returns keys for different states of a person (e.g., symptomatic) ''' return self.meta.states[:]
[docs] def date_keys(self): ''' Returns keys for different event dates (e.g., date a person became symptomatic) ''' return self.meta.dates[:]
[docs] def dur_keys(self): ''' Returns keys for different durations (e.g., the duration from exposed to infectious) ''' return self.meta.durs[:]
[docs] def layer_keys(self): ''' Get the available contact keys -- try contacts first, then beta_layer ''' try: keys = list(self.contacts.keys()) except: # If not fully initialized try: keys = list(self.pars['beta_layer'].keys()) except: # If not even partially initialized keys = [] return keys
[docs] def indices(self): ''' The indices of each people array ''' return np.arange(len(self))
[docs] def validate(self, die=True, verbose=False): # Check that the keys match contact_layer_keys = set(self.contacts.keys()) layer_keys = set(self.layer_keys()) if contact_layer_keys != layer_keys: errormsg = f'Parameters layers {layer_keys} are not consistent with contact layers {contact_layer_keys}' raise ValueError(errormsg) # Check that the length of each array is consistent expected_len = len(self) for key in self.keys(): actual_len = len(self[key]) if actual_len != expected_len: if die: errormsg = f'Length of key "{key}" did not match population size ({actual_len} vs. {expected_len})' raise IndexError(errormsg) else: if verbose: print(f'Resizing "{key}" from {actual_len} to {expected_len}') self._resize_arrays(keys=key) return
def _resize_arrays(self, pop_size=None, keys=None): ''' Resize arrays if any mismatches are found ''' if pop_size is None: pop_size = len(self) if keys is None: keys = self.keys() keys = sc.promotetolist(keys) for key in keys: self[key].resize(pop_size, refcheck=False) return
[docs] def to_df(self): ''' Convert to a Pandas dataframe ''' df = pd.DataFrame.from_dict({key:self[key] for key in self.keys()}) return df
[docs] def to_arr(self): ''' Return as numpy array ''' arr = np.empty((len(self), len(self.keys())), dtype=psu.default_float) for k,key in enumerate(self.keys()): if key == 'uid': arr[:,k] = np.arange(len(self)) else: arr[:,k] = self[key] return arr
[docs] def person(self, ind): ''' Method to create person from the people ''' p = Person() for key in self.meta.all_states: data = self[key] if data.ndim == 1: val = data[ind] elif data.ndim == 2: val = data[:,ind] else: errormsg = f'Cannot extract data from {key}: unexpected dimensionality ({data.ndim})' raise ValueError(errormsg) setattr(p, key, val) contacts = {} for lkey, layer in self.contacts.items(): contacts[lkey] = layer.find_contacts(ind) p.contacts = contacts return p
[docs] def to_people(self): ''' Return all people as a list ''' return list(self)
[docs] def from_people(self, people, resize=True): ''' Convert a list of people back into a People object ''' # Handle population size pop_size = len(people) if resize: self._resize_arrays(pop_size=pop_size) # Iterate over people -- slow! for p,person in enumerate(people): for key in self.keys(): self[key][p] = getattr(person, key) return
[docs] def to_graph(self): # pragma: no cover ''' Convert all people to a networkx MultiDiGraph, including all properties of the people (nodes) and contacts (edges). **Example**:: import poliosim as ps import networkx as nx sim = ps.Sim(pop_size=50, pop_type='hybrid', contacts=dict(h=3, s=10, w=10, c=5)).run() G = sim.people.to_graph() nodes = G.nodes(data=True) edges = G.edges(keys=True) node_colors = [n['age'] for i,n in nodes] layer_map = dict(h='#37b', s='#e11', w='#4a4', c='#a49') edge_colors = [layer_map[G[i][j][k]['layer']] for i,j,k in edges] edge_weights = [G[i][j][k]['beta']*5 for i,j,k in edges] nx.draw(G, node_color=node_colors, edge_color=edge_colors, width=edge_weights, alpha=0.5) ''' import networkx as nx # Copy data from people into graph G = self.contacts.to_graph() for key in self.keys(): data = {k:v for k,v in enumerate(self[key])} nx.set_node_attributes(G, data, name=key) # Include global layer weights for u,v,k in G.edges(keys=True): edge = G[u][v][k] edge['beta'] *= self.pars['beta_layer'][edge['layer']] return G
[docs] def init_contacts(self, reset=False): ''' Initialize the contacts dataframe with the correct columns and data types ''' # Create the contacts dictionary contacts = Contacts(layer_keys=self.layer_keys()) if self.contacts is None or reset: # Reset all self.contacts = contacts else: # Only replace specified keys for key,layer in contacts.items(): self.contacts[key] = layer return
[docs] def add_contacts(self, contacts, lkey=None, beta=None): ''' Add new contacts to the array ''' # If no layer key is supplied and it can't be worked out from defaults, use the first layer if lkey is None: lkey = self.layer_keys()[0] # Validate the supplied contacts if isinstance(contacts, Contacts): new_contacts = contacts elif isinstance(contacts, Layer): new_contacts = {} new_contacts[lkey] = contacts elif sc.checktype(contacts, 'array'): new_contacts = {} new_contacts[lkey] = pd.DataFrame(data=contacts) elif isinstance(contacts, dict): new_contacts = {} new_contacts[lkey] = pd.DataFrame.from_dict(contacts) elif isinstance(contacts, list): # Assume it's a list of contacts by person, not an edgelist new_contacts = self.make_edgelist(contacts) # Assume contains key info else: errormsg = f'Cannot understand contacts of type {type(contacts)}; expecting dataframe, array, or dict' raise TypeError(errormsg) # Ensure the columns are right and add values if supplied for lkey, new_layer in new_contacts.items(): n = len(new_layer['p1']) if 'beta' not in new_layer.keys() or len(new_layer['beta']) != n: if beta is None: beta = 1.0 beta = psu.default_float(beta) new_layer['beta'] = np.ones(n, dtype=psu.default_float)*beta # Create the layer if it doesn't yet exist if lkey not in self.contacts: self.contacts[lkey] = Layer(label=lkey) # Actually include them, and update properties if supplied for col in self.contacts[lkey].keys(): # Loop over the supplied columns self.contacts[lkey][col] = np.concatenate([self.contacts[lkey][col], new_layer[col]]) self.contacts[lkey].validate() return
[docs] def update_contacts(self): ''' Refresh dynamic contacts, e.g. community ''' # Figure out if anything needs to be done -- e.g. {'h':False, 'c':True} dynam_keys = [lkey for lkey,is_dynam in self.pars['dynam_layer'].items() if is_dynam] # Loop over dynamic keys for lkey in dynam_keys: # Remove existing contacts self.contacts.pop(lkey) # Choose how many contacts to make pop_size = len(self) n_contacts = self.pars['contacts'][lkey] n_new = int(n_contacts*pop_size/2) # Since these get looped over in both directions later # Create the contacts new_contacts = {} # Initialize new_contacts['p1'] = np.array(psu.choose_r(max_n=pop_size, n=n_new), dtype=psu.default_int) # Choose with replacement new_contacts['p2'] = np.array(psu.choose_r(max_n=pop_size, n=n_new), dtype=psu.default_int) new_contacts['beta'] = np.ones(n_new, dtype=psu.default_float) # Add to contacts self.add_contacts(new_contacts, lkey=lkey) self.contacts[lkey].validate() return self.contacts
[docs] def make_edgelist(self, contacts): ''' Parse a list of people with a list of contacts per person and turn it into an edge list. ''' # Handle layer keys lkeys = self.layer_keys() if len(contacts): contact_keys = contacts[0].keys() # Pull out the keys of this contact list lkeys += [key for key in contact_keys if key not in lkeys] # Extend the layer keys # Initialize the new contacts new_contacts = Contacts(layer_keys=lkeys) for lkey in lkeys: new_contacts[lkey]['p1'] = [] # Person 1 of the contact pair new_contacts[lkey]['p2'] = [] # Person 2 of the contact pair # Populate the new contacts for p,cdict in enumerate(contacts): for lkey,p_contacts in cdict.items(): n = len(p_contacts) # Number of contacts new_contacts[lkey]['p1'].extend([p]*n) # e.g. [4, 4, 4, 4] new_contacts[lkey]['p2'].extend(p_contacts) # e.g. [243, 4538, 7,19] # Turn into a dataframe for lkey in lkeys: new_layer = Layer(label=lkey) for ckey,value in new_contacts[lkey].items(): new_layer[ckey] = np.array(value, dtype=new_layer.meta[ckey]) new_contacts[lkey] = new_layer return new_contacts
[docs] @staticmethod def remove_duplicates(df): ''' Sort the dataframe and remove duplicates -- note, not extensively tested ''' p1 = df[['p1', 'p2']].values.min(1) # Reassign p1 to be the lower-valued of the two contacts p2 = df[['p1', 'p2']].values.max(1) # Reassign p2 to be the higher-valued of the two contacts df['p1'] = p1 df['p2'] = p2 df.sort_values(['p1', 'p2'], inplace=True) # Sort by p1, then by p2 df.drop_duplicates(['p1', 'p2'], inplace=True) # Remove duplicates df = df[df['p1'] != df['p2']] # Remove self connections df.reset_index(inplace=True, drop=True) return df
[docs]class Person(sc.prettyobj): ''' Class for a single person. Note: this is largely deprecated since sim.people is now based on arrays rather than being a list of people. ''' def __init__(self, pars=None, uid=None, age=-1, sex=-1, contacts=None): self.uid = uid # This person's unique identifier self.age = psu.default_float(age) # Age of the person (in years) self.sex = psu.default_int(sex) # Female (0) or male (1) self.contacts = contacts # Contacts # self.infected = [] #: Record the UIDs of all people this person infected # self.infected_by = None #: Store the UID of the person who caused the infection. If None but person is infected, then it was an externally seeded infection return
[docs]class FlexDict(dict): ''' A dict that allows more flexible element access: in addition to obj['a'], also allow obj[0]. Lightweight implementation of the Sciris odict class. ''' def __getitem__(self, key): ''' Lightweight odict -- allow indexing by number, with low performance ''' try: return super().__getitem__(key) except KeyError as KE: try: # Assume it's an integer dictkey = self.keys()[key] return self[dictkey] except: raise sc.KeyNotFoundError(KE) # Raise the original error
[docs] def keys(self): return list(super().keys())
[docs] def values(self): return list(super().values())
[docs] def items(self): return list(super().items())
[docs]class Contacts(FlexDict): ''' A simple (for now) class for storing different contact layers. ''' def __init__(self, layer_keys=None): if layer_keys is not None: for lkey in layer_keys: self[lkey] = Layer(label=lkey) return def __repr__(self): ''' Use slightly customized repr''' keys_str = ', '.join([str(k) for k in self.keys()]) output = f'Contacts({keys_str})\n' for key in self.keys(): output += f'\n"{key}": ' output += self[key].__repr__() + '\n' return output def __len__(self): ''' The length of the contacts is the length of all the layers ''' output = 0 for key in self.keys(): try: output += len(self[key]) except: pass return output
[docs] def add_layer(self, **kwargs): ''' Small method to add one or more layers to the contacts. Layers should be provided as keyword arguments. **Example**:: hospitals_layer = cv.Layer(label='hosp') sim.people.contacts.add_layer(hospitals=hospitals_layer) ''' for lkey,layer in kwargs.items(): layer.validate() self[lkey] = layer return
[docs] def pop_layer(self, *args): ''' Remove the layer(s) from the contacts. **Example**:: sim.people.contacts.pop_layer('hospitals') Note: while included here for convenience, this operation is equivalent to simply popping the key from the contacts dictionary. ''' for lkey in args: self.pop(lkey) return
[docs] def to_graph(self): # pragma: no cover ''' Convert all layers to a networkx MultiDiGraph **Example**:: import networkx as nx sim = cv.Sim(pop_size=50, pop_type='hybrid').run() G = sim.people.contacts.to_graph() nx.draw(G) ''' import networkx as nx H = nx.MultiDiGraph() for lkey,layer in self.items(): G = layer.to_graph() H = nx.compose(H, nx.MultiDiGraph(G)) return H
[docs]class Layer(FlexDict): ''' A small class holding a single layer of contact edges (connections) between people. The input is typically three arrays: person 1 of the connection, person 2 of the connection, and the weight of the connection. Connections are undirected; each person is both a source and sink. This class is usually not invoked directly by the user, but instead is called as part of the population creation. Args: p1 (array): an array of N connections, representing people on one side of the connection p2 (array): an array of people on the other side of the connection beta (array): an array of weights for each connection label (str): the name of the layer (optional) kwargs (dict): other keys copied directly into the layer Note that all arguments (except for label) must be arrays of the same length, although not all have to be supplied at the time of creation (they must all be the same at the time of initialization, though, or else validation will fail). **Examples**:: # Generate an average of 10 contacts for 1000 people n = 10_000 n_people = 1000 p1 = np.random.randint(n_people, size=n) p2 = np.random.randint(n_people, size=n) beta = np.ones(n) layer = cv.Layer(p1=p1, p2=p2, beta=beta, label='rand') # Convert one layer to another with extra columns index = np.arange(n) self_conn = p1 == p2 layer2 = cv.Layer(**layer, index=index, self_conn=self_conn, label=layer.label) ''' def __init__(self, label=None, **kwargs): self.meta = { 'p1': psu.default_int, # Person 1 'p2': psu.default_int, # Person 2 'beta': psu.default_float, # Default transmissibility for this contact type } self.basekey = 'p1' # Assign a base key for calculating lengths and performing other operations self.label = label # Initialize the keys of the layers for key,dtype in self.meta.items(): self[key] = np.empty((0,), dtype=dtype) # Set data, if provided for key,value in kwargs.items(): self[key] = np.array(value, dtype=self.meta.get(key)) return def __len__(self): try: return len(self[self.basekey]) except: # pragma: no cover return 0 def __repr__(self): ''' Convert to a dataframe for printing ''' namestr = self.__class__.__name__ labelstr = f'"{self.label}"' if self.label else '<no label>' keys_str = ', '.join(self.keys()) output = f'{namestr}({labelstr}, {keys_str})\n' # e.g. Layer("h", p1, p2, beta) output += self.to_df().__repr__() return output def __contains__(self, item): """ Check if a person is present in a layer Args: item: Person index Returns: True if person index appears in any interactions """ return (item in self['p1']) or (item in self['p2']) @property def members(self): """ Return sorted array of all members """ return np.unique([self['p1'], self['p2']])
[docs] def meta_keys(self): ''' Return the keys for the layer's meta information -- i.e., p1, p2, beta ''' return self.meta.keys()
[docs] def validate(self): ''' Check the integrity of the layer: right types, right lengths ''' n = len(self[self.basekey]) for key,dtype in self.meta.items(): if dtype: actual = self[key].dtype expected = dtype if actual != expected: errormsg = f'Expecting dtype "{expected}" for layer key "{key}"; got "{actual}"' raise TypeError(errormsg) actual_n = len(self[key]) if n != actual_n: errormsg = f'Expecting length {n} for layer key "{key}"; got {actual_n}' raise TypeError(errormsg) return
[docs] def pop_inds(self, inds): ''' "Pop" the specified indices from the edgelist and return them as a dict. Returns in the right format to be used with layer.append(). Args: inds (int, array, slice): the indices to be removed ''' output = {} for key in self.meta_keys(): output[key] = self[key][inds] # Copy to the output object self[key] = np.delete(self[key], inds) # Remove from the original return output
[docs] def append(self, contacts): ''' Append contacts to the current layer. Args: contacts (dict): a dictionary of arrays with keys p1,p2,beta, as returned from layer.pop_inds() ''' for key in self.keys(): new_arr = contacts[key] n_curr = len(self[key]) # Current number of contacts n_new = len(new_arr) # New contacts to add n_total = n_curr + n_new # New size self[key] = np.resize(self[key], n_total) # Resize to make room, preserving dtype self[key][n_curr:] = new_arr # Copy contacts into the layer return
[docs] def to_df(self): ''' Convert to dataframe ''' df = pd.DataFrame.from_dict(self) return df
[docs] def from_df(self, df, keys=None): ''' Convert from a dataframe ''' if keys is None: keys = self.meta_keys() for key in keys: self[key] = df[key].to_numpy() return self
[docs] def to_graph(self): # pragma: no cover ''' Convert to a networkx DiGraph **Example**:: import networkx as nx sim = ps.Sim(pop_size=20, pop_type='hybrid').run() G = sim.people.contacts['h'].to_graph() nx.draw(G) ''' import networkx as nx data = [np.array(self[k], dtype=dtype).tolist() for k,dtype in [('p1', int), ('p2', int), ('beta', float)]] G = nx.DiGraph() G.add_weighted_edges_from(zip(*data), weight='beta') nx.set_edge_attributes(G, self.label, name='layer') return G
[docs] def find_contacts(self, inds, as_array=True): """ Find all contacts of the specified people For some purposes (e.g. contact tracing) it's necessary to find all of the contacts associated with a subset of the people in this layer. Since contacts are bidirectional it's necessary to check both P1 and P2 for the target indices. The return type is a Set so that there is no duplication of indices (otherwise if the Layer has explicit symmetric interactions, they could appear multiple times). This is also for performance so that the calling code doesn't need to perform its own unique() operation. Note that this cannot be used for cases where multiple connections count differently than a single infection, e.g. exposure risk. Args: inds (array): indices of people whose contacts to return as_array (bool): if true, return as sorted array (otherwise, return as unsorted set) Returns: contact_inds (array): a set of indices for pairing partners Example: If there were a layer with - P1 = [1,2,3,4] - P2 = [2,3,1,4] Then find_contacts([1,3]) would return {1,2,3} """ # Check types if not isinstance(inds, np.ndarray): inds = sc.promotetoarray(inds) if inds.dtype != np.int64: # pragma: no cover # This is int64 since indices often come from cv.true(), which returns int64 inds = np.array(inds, dtype=np.int64) # Find the contacts contact_inds = psu.find_contacts(self['p1'], self['p2'], inds) if as_array: contact_inds = np.fromiter(contact_inds, dtype=psu.default_int) contact_inds.sort() # Sorting ensures that the results are reproducible for a given seed as well as being identical to previous versions of Poliosim return contact_inds
[docs] def update(self, people, frac=1.0): ''' Regenerate contacts on each timestep. This method gets called if the layer appears in ``sim.pars['dynam_layer']``. The Layer implements the update procedure so that derived classes can customize the update e.g. implementing over-dispersion/other distributions, random clusters, etc. Typically, this method also takes in the ``people`` object so that the update can depend on person attributes that may change over time (e.g. changing contacts for people that are severe/critical). Args: people (People): the Poliosim People object, which is usually used to make new contacts frac (float): the fraction of contacts to update on each timestep ''' # Choose how many contacts to make pop_size = len(people) # Total number of people n_contacts = len(self) # Total number of contacts n_new = int(np.round(n_contacts*frac)) # Since these get looped over in both directions later inds = psu.choose(n_contacts, n_new) # Create the contacts, not skipping self-connections self['p1'][inds] = np.array(psu.choose_r(max_n=pop_size, n=n_new), dtype=psu.default_int) # Choose with replacement self['p2'][inds] = np.array(psu.choose_r(max_n=pop_size, n=n_new), dtype=psu.default_int) self['beta'][inds] = np.ones(n_new, dtype=psu.default_float) return
class AlreadyRunError(RuntimeError): ''' This error is raised if a simulation is run in such a way that no timesteps will be taken. This error is a distinct type so that it can be safely caught and ignored if required, but it is anticipated that most of the time, calling sim.run() and not taking any timesteps, would be an inadvertent error. ''' pass