Source code for hpvsim.sim

'''
Define core Sim classes
'''

# Imports
import numpy as np
import pandas as pd
import sciris as sc
from . import base as hpb
from . import misc as hpm
from . import defaults as hpd
from . import utils as hpu
from . import population as hppop
from . import parameters as hppar
from . import hiv as hphiv
from . import analysis as hpa
from . import plotting as hpplt
from . import immunity as hpimm
from . import interventions as hpi
from .settings import options as hpo


# Define the model
[docs] class Sim(hpb.BaseSim): def __init__(self, pars=None, datafile=None, label=None, popfile=None, popdict=None, people=None, version=None, hiv_datafile=None, art_datafile=None, **kwargs): # Set attributes self.label = label # The label/name of the simulation self.created = None # The datetime the sim was created self.datafile = datafile # The name of the data file self.art_datafile = art_datafile # The name of the ART data file self.hiv_datafile = hiv_datafile # The name of the HIV data file self.popfile = popfile # The population file self.data = None # The data self.popdict = popdict # The population dictionary self.people = people # People object self.t = None # The current time in the simulation (during execution); outside of sim.step(), its value corresponds to next timestep to be computed self.results = {} # For storing results self.summary = None # For storing a summary of the results self.initialized = False # Whether or not initialization is complete self.complete = False # Whether a simulation has completed running self.results_ready = False # Whether or not results are ready self._default_ver = version # Default version of parameters used self._orig_pars = None # Store original parameters to optionally restore at the end of the simulation # Make default parameters (using values from parameters.py) default_pars = hppar.make_pars(version=version) # Start with default pars default_location = sc.dcp(default_pars['location']) # Pull out the default location here default_pars['location'] = None # Don't load the default location here super().__init__(default_pars) # Initialize and set the parameters as attributes # Load data, including datafile that are used to create additional optional parameters self.load_data(datafile) # Load the data, if provided # Update parameters, including demographic data if pars is None: pars = dict(location=default_location) else: if not pars.get('location') or pars['location'] is None: pars['location'] = default_location self.update_pars(pars, **kwargs) # Update the parameters return
[docs] def load_data(self, datafile=None, **kwargs): ''' Load the data to calibrate against, if provided ''' if datafile is not None: # If a data file is provided, load it self.data = hpm.load_data(datafile=datafile, check_date=True, **kwargs) return
[docs] def initialize(self, reset=False, init_states=True, init_analyzers=True, **kwargs): ''' Perform all initializations on the sim. ''' self.t = 0 # The current time index self.validate_pars() # Ensure parameters have valid values self.validate_dt() self.init_time_vecs() # Initialise time vectors hpu.set_seed(self['rand_seed']) # Reset the random seed before the population is created self.init_genotypes() # Initialize the genotypes self.init_results() # After initializing the genotypes and people, create the results structure self.init_interventions() # Initialize the interventions BEFORE the people, because then vaccination interventions get counted in immunity structures self.init_immunity() # Includes immunity matrices and cumulative dysplasia arrays self.init_people(reset=reset, init_states=init_states, **kwargs) # Create all the people (the heaviest step) if init_analyzers: self.init_analyzers() # ...and the analyzers... hpu.set_seed(self['rand_seed']+1) # Reset the random seed to the default run seed, so that if the simulation is run with reset_seed=False right after initialization, it will still produce the same output self.initialized = True self.complete = False self.results_ready = False return self
[docs] def layer_keys(self): ''' Attempt to retrieve the current layer keys. ''' try: keys = list(self['acts'].keys()) # Get keys from acts except: # pragma: no cover keys = [] return keys
[docs] def reset_layer_pars(self, layer_keys=None, force=False): ''' Reset the parameters to match the population. Args: layer_keys (list): override the default layer keys (use stored keys by default) force (bool): reset the parameters even if they already exist ''' if layer_keys is None: if self.people is not None: # If people exist layer_keys = self.people.contacts.keys() elif self.popdict is not None: layer_keys = self.popdict['layer_keys'] hppar.reset_layer_pars(self.pars, layer_keys=layer_keys, force=force) return
[docs] def validate_layer_pars(self): ''' Handle layer parameters, since they need to be validated after the population creation, rather than before. ''' # First, try to figure out what the layer keys should be and perform basic type checking layer_keys = self.layer_keys() layer_pars = hppar.layer_pars # The names of the parameters that are specified by layer for lp in layer_pars: val = self[lp] if sc.isnumber(val): # It's a scalar instead of a dict, assume it's all contacts self[lp] = {k:val for k in layer_keys} # Handle key mismatches for lp in layer_pars: lp_keys = set(self.pars[lp].keys()) if lp != 'layer_probs': if not lp_keys == set(layer_keys): errormsg = 'At least one layer parameter is inconsistent with the layer keys; all parameters must have the same keys:' errormsg += f'\nsim.layer_keys() = {layer_keys}' for lp2 in layer_pars: # Fail on first error, but re-loop to list all of them errormsg += f'\n{lp2} = ' + ', '.join(self.pars[lp2].keys()) raise sc.KeyNotFoundError(errormsg) # TODO: add validation here for layer_probs # Handle mismatches with the population if self.people is not None: pop_keys = set(self.people.contacts.keys()) if pop_keys != set(layer_keys): # pragma: no cover if not len(pop_keys): errormsg = f'Your population does not have any layer keys, but your simulation does {layer_keys}. If you called hpv.People() directly, you probably need hpv.make_people() instead.' raise sc.KeyNotFoundError(errormsg) else: errormsg = f'Please update your parameter keys {layer_keys} to match population keys {pop_keys}. You may find sim.reset_layer_pars() helpful.' raise sc.KeyNotFoundError(errormsg) return
[docs] def validate_dt(self): ''' Check that 1/dt is an integer value, otherwise results and time vectors will have mismatching shapes. init_results explicitly makes this assumption by casting resfrequency = int(1/dt). ''' dt = self['dt'] reciprocal = 1.0 / dt # Compute the reciprocal of dt if not reciprocal.is_integer(): # Check if reciprocal is not a whole (integer) number # Round the reciprocal reciprocal = int(reciprocal) rounded_dt = 1.0 / reciprocal self['dt'] = rounded_dt if self['verbose']: warnmsg = f"Warning: Provided time step dt: {dt} resulted in a non-integer number of steps/year. Rounded to {rounded_dt}." print(warnmsg)
[docs] def validate_pars(self, validate_layers=True): ''' Some parameters can take multiple types; this makes them consistent. Args: validate_layers (bool): whether to validate layer parameters as well via validate_layer_pars() -- usually yes, except during initialization ''' # Handle types for key in ['n_agents']: try: self[key] = int(self[key]) except Exception as E: errormsg = f'Could not convert {key}={self[key]} of {type(self[key])} to integer' raise ValueError(errormsg) from E # Handle start if self['start'] in [None, 0]: # Use default start self['start'] = 2015 # Handle end and n_years if self['end']: self['n_years'] = int(self['end'] - self['start']) if self['n_years'] <= 0: errormsg = f"Number of years must be >0, but you supplied start={str(self['start'])} and end={str(self['end'])}, which gives n_years={self['n_years']}" raise ValueError(errormsg) else: if self['n_years']: self['end'] = self['start'] + self['n_years'] else: errormsg = 'You must supply one of n_years and end."' raise ValueError(errormsg) # Handle population network data network_choices = ['random', 'default'] choice = self['network'] if choice and choice not in network_choices: # pragma: no cover choicestr = ', '.join(network_choices) errormsg = f'Population type "{choice}" not available; choices are: {choicestr}' raise ValueError(errormsg) # Handle analyzers and interventions for key in ['interventions', 'analyzers']: # Ensure all of them are lists self[key] = sc.dcp(sc.tolist(self[key], keepnone=False)) # All of these have initialize functions that run into issues if they're reused for i,interv in enumerate(self['interventions']): if isinstance(interv, dict): # It's a dictionary representation of an intervention self['interventions'][i] = hpi.InterventionDict(**interv) # Optionally handle layer parameters if validate_layers: self.validate_layer_pars() # Handle verbose if self['verbose'] == 'brief': self['verbose'] = -1 if not sc.isnumber(self['verbose']): # pragma: no cover errormsg = f'Verbose argument should be either "brief", -1, or a float, not {type(self["verbose"])} "{self["verbose"]}"' raise ValueError(errormsg) return
[docs] def init_time_vecs(self): ''' Construct vectors things that keep track of time ''' self.years = sc.inclusiverange(self['start'],self['end']) self.yearvec = sc.inclusiverange(start=self['start'], stop=self['end']+1-self['dt'], step=self['dt']) # Includes all the timepoints in the last year self.npts = len(self.yearvec) self.tvec = np.arange(self.npts)
[docs] def validate_init_conditions(self, init_hpv_prev): ''' Initial prevalence values can be supplied with different amounts of detail. Here we flesh out any missing details so that the initial prev values are by age and genotype. We also check the prevalence values are ok. ''' def validate_arrays(vals, n_age_brackets=None): ''' Little helper function to check prevalence values ''' if n_age_brackets is not None: if len(vals) != n_age_brackets: errormsg = f'The initial prevalence values must either be the same length as the age brackets: {len(vals)} vs {n_age_brackets}.' raise ValueError(errormsg) else: if len(vals) != 1: errormsg = f'No age brackets were supplied, but more than one prevalence value was supplied ({len(vals)}). An array of prevalence values can only be supplied along with an array of corresponding age brackets.' raise ValueError(errormsg) if vals.any() < 0 or vals.any() > 1: errormsg = f'The initial prevalence values must either between 0 and 1, not {vals}.' raise ValueError(errormsg) return # If values have been provided, validate them sex_keys = {'m', 'f'} tot_keys = ['all', 'total', 'tot', 'average', 'avg'] n_age_brackets = None if init_hpv_prev is not None: if sc.checktype(init_hpv_prev, dict): # Get age brackets if supplied if 'age_brackets' in init_hpv_prev.keys(): age_brackets = init_hpv_prev.pop('age_brackets') n_age_brackets = len(age_brackets) else: age_brackets = np.array([150]) # Handle the rest of the keys var_keys = list(init_hpv_prev.keys()) if (len(var_keys)==1 and var_keys[0] not in tot_keys) or (len(var_keys)>1 and set(var_keys) != sex_keys): errormsg = f'Could not understand the initial prevalence provided: {init_hpv_prev}. If supplying a dictionary, please use "m" and "f" keys or "tot". ' raise ValueError(errormsg) if len(var_keys) == 1: k = var_keys[0] init_hpv_prev = {sk: sc.promotetoarray(init_hpv_prev[k]) for sk in sex_keys} # Now set the values for k, vals in init_hpv_prev.items(): init_hpv_prev[k] = sc.promotetoarray(vals) elif sc.checktype(init_hpv_prev, 'arraylike') or sc.isnumber(init_hpv_prev): # If it's an array, assume these values apply to males and females init_hpv_prev = {sk: sc.promotetoarray(init_hpv_prev) for sk in sex_keys} age_brackets = np.array([150]) else: errormsg = f'Initial prevalence values of type {type(init_hpv_prev)} not recognized, must be a dict, an array, or a float.' raise ValueError(errormsg) # Now validate the arrays for sk, vals in init_hpv_prev.items(): validate_arrays(vals, n_age_brackets) # If values haven't been supplied, assume zero else: init_hpv_prev = {'f': np.array([0]), 'm': np.array([0])} age_brackets = np.array([150]) return init_hpv_prev, age_brackets
[docs] def init_genotypes(self, upper_dysp_lim=200): ''' Initialize the genotype parameters ''' if self._orig_pars and 'genotypes' in self._orig_pars: self['genotypes'] = self._orig_pars.pop('genotypes') # Restore default_gpars = hppar.get_genotype_pars() user_gpars = sc.dcp(self['genotype_pars']) self['genotype_pars'] = sc.objdict() # Handle special input cases if self['genotypes'] == 'all': self['genotypes'] = default_gpars.keys() if not len(self['genotypes']): print('No genotypes provided: simulating 16, 18, and 5 other pooled HR types (31, 33, 45, 52, 58).') self['genotypes'] = [16,18,'hi5'] # Loop over genotypes for i, g in enumerate(self['genotypes']): # Standardize format of genotype inputs if sc.isnumber(g): g = f'hpv{g}' # Convert e.g. 16 to hpv16 if sc.checktype(g,str): if not g in default_gpars.keys(): errormsg = f'Genotype {i} ({g}) is not one of the inbuilt options.' raise ValueError(errormsg) else: errormsg = f'Format {type(g)} is not understood.' raise ValueError(errormsg) # Add to genotype_par dict self['genotype_pars'][g] = default_gpars[g] self['genotype_map'][i] = g # Loop over user-supplied genotype parameters that can overwrite values if len(user_gpars): for g,gpars in user_gpars.items(): # Standardize format of genotype inputs if sc.isnumber(g): g = f'hpv{g}' # Convert e.g. 16 to hpv16 if sc.checktype(g, str): if not g in self['genotype_pars'].keys(): errormsg = f'Parameters provided for genotype {g}, but it is not in the sim.' raise ValueError(errormsg) else: for gparname,gparval in gpars.items(): if gparname in self['genotype_pars'][g].keys(): printmsg = f"Resetting parameter '{gparname}' from {self['genotype_pars'][g][gparname]} to {gparval} for genotype {g}" sc.printv(printmsg, 1, self['verbose']) self['genotype_pars'][g][gparname] = gparval else: errormsg = f"Parameter {gparname} does not exist for genotype {g}" raise ValueError(errormsg) self['n_genotypes'] = len(self['genotype_pars']) # Each genotype has an entry in genotype_pars # Set the number of immunity sources self['n_imm_sources'] = len(self['genotypes']) return
[docs] def init_results(self, frequency='annual', add_data=True): ''' Create the main results structure. The prefix "n" is used for stock variables, i.e. counting the total number in any given state (sus/inf/etc) on any particular timestep Arguments: sim (hpv.Sim) : a sim frequency (str or float) : the frequency with which to save results: accepts 'annual', 'dt', or a float which is interpreted as a fraction of a year, e.g. 0.2 will save results every 0.2 years add_data (bool) : whether or not to add data to the result structures ''' # Handle frequency if type(frequency) == str: if frequency == 'annual': resfreq = int(1 / self['dt']) elif frequency == 'dt': resfreq = 1 else: errormsg = f'Result frequency not understood: must be "annual", "dt" or a float, but you provided {frequency}.' raise ValueError(errormsg) elif type(frequency) == float: if frequency < self['dt']: errormsg = f'You requested results with frequency {frequency}, but this is smaller than the simulation timestep {self["dt"]}.' raise ValueError(errormsg) else: resfreq = int(frequency / self['dt']) self.resfreq = resfreq if not self.resfreq > 0: errormsg = f'The results frequence should be a positive integer, not {self.resfreq}: dt may be too large' raise ValueError(errormsg) # Construct the tvec that will be used with the results points_to_use = np.arange(0, self.npts, self.resfreq) self.res_yearvec = self.yearvec[points_to_use] self.res_npts = len(self.res_yearvec) self.res_tvec = np.arange(self.res_npts) # Function to create results def init_res(*args, **kwargs): ''' Initialize a single result object ''' output = hpb.Result(*args, **kwargs, npts=self.res_npts) return output # Initialize storage results = sc.objdict() ng = self['n_genotypes'] # Number of genotypes na = len(self['age_bin_edges']) - 1 # Number of age bins # Create flows for flow in hpd.flows: results[flow.name] = init_res(flow.label, color=flow.color) results[flow.name+'_by_genotype'] = init_res(flow.label+' by genotype', n_rows=ng) results[flow.name+'_by_age'] = init_res(flow.label+' by age', n_rows=na, color=flow.color) # Create stocks for stock in hpd.PeopleMeta().stock_states: results[f'n_{stock.name}'] = init_res(stock.label, color=stock.color) results[f'n_{stock.name}_by_genotype'] = init_res(stock.label+' by genotype', n_rows=ng) # Only by-age stock result we will need is number infectious, susceptible, and with cin, for HPV and CIN prevalence/incidence calculations results['n_infectious_by_age'] = init_res('Number infectious by age', n_rows=na, color=stock.color) results['n_females_infectious_by_age'] = init_res('Number of females infectious by age', n_rows=na, color=stock.color) results['n_susceptible_by_age'] = init_res('Number susceptible by age', n_rows=na, color=stock.color) results['n_precin_by_age'] = init_res('Number Pre-CIN by age', n_rows=na, color=stock.color) results['n_cin_by_age'] = init_res('Number CIN by age', n_rows=na, color=stock.color) # Create incidence and prevalence results for var,name,color in zip(hpd.inci_keys, hpd.inci_names, hpd.inci_colors): results[f'{var}_incidence'] = init_res(name+' incidence', color=color) results[f'{var}_incidence_by_genotype'] = init_res(name+' incidence by genotype', n_rows=ng) results[f'{var}_incidence_by_age'] = init_res(name+' incidence by age', n_rows=na, color=color) # Create demographic flows for var, name, color in zip(hpd.dem_keys, hpd.dem_names, hpd.dem_colors): results[var] = init_res(name, color=color) # Create results by sex for var, name, color in zip(hpd.by_sex_keys, hpd.by_sex_colors, hpd.by_sex_colors): results[var] = init_res(name, color=color, n_rows=2) # Create ASR results using standard populations results['asr_cancer_incidence'] = init_res('Age-adjusted cervical cancer incidence', scale=False) results['asr_cancer_mortality'] = init_res('Age-adjusted cervical cancer mortality', scale=False) stock_colors = [i for i in set(hpd.PeopleMeta().stock_colors) if i is not None] # Type distributions by cytology for var, name in zip(hpd.type_dist_keys, hpd.type_dist_names): results[var+'_genotype_dist'] = init_res(name, n_rows=ng, color=stock_colors[0]) # Vaccination results results['new_vaccinated'] = init_res('Newly vaccinated by genotype', n_rows=ng) results['new_total_vaccinated'] = init_res('Newly vaccinated') results['cum_vaccinated'] = init_res('Cumulative number vaccinated by genotype', n_rows=ng) results['cum_total_vaccinated'] = init_res('Cumulative number vaccinated') results['new_doses'] = init_res('New doses') results['cum_doses'] = init_res('Cumulative doses') # Therapeutic vaccine results results['new_txvx_doses'] = init_res('New therapeutic vaccine doses') results['new_tx_vaccinated'] = init_res('Newly received therapeutic vaccine') results['cum_txvx_doses'] = init_res('Cumulative therapeutic vaccine doses') results['cum_tx_vaccinated'] = init_res('Total received therapeutic vaccine') # Screen & treat results results['new_screens'] = init_res('New screens') results['new_screened'] = init_res('Newly screened') results['new_cin_treatments'] = init_res('New CIN treatments') results['new_cin_treated'] = init_res('Newly treated for CINs') results['new_cancer_treatments'] = init_res('New cancer treatments') results['new_cancer_treated'] = init_res('Newly treated for cancer') results['cum_screens'] = init_res('Cumulative screens') results['cum_screened'] = init_res('Cumulative number screened') results['cum_cin_treatments'] = init_res('Cumulative CIN treatments') results['cum_cin_treated'] = init_res('Cumulative number treated for CINs') results['cum_cancer_treatments'] = init_res('Cumulative cancer treatments') results['cum_cancer_treated'] = init_res('Cumulative number treated for cancer') # Additional cancer results results['detected_cancer_incidence'] = init_res('Detected cancer incidence', color='#fcba03') results['cancer_mortality'] = init_res('Cancer mortality') # Other results results['n_alive'] = init_res('Number alive') results['n_alive_by_sex'] = init_res('Number alive by sex', n_rows=2) results['n_alive_by_age'] = init_res('Number alive by age', n_rows=na) results['n_females_alive_by_age'] = init_res('Number females alive by age', n_rows=na) results['cdr'] = init_res('Crude death rate', scale=False) results['cbr'] = init_res('Crude birth rate', scale=False, color='#fcba03') results['hpv_prevalence'] = init_res('HPV prevalence', color=stock_colors[0]) results['hpv_prevalence_by_genotype'] = init_res('HPV prevalence', n_rows=ng, color=stock_colors[0]) results['hpv_prevalence_by_age'] = init_res('HPV prevalence by age', n_rows=na, color=stock_colors[0]) results['precin_prevalence'] = init_res('Pre-CIN prevalence', color=stock_colors[0]) results['precin_prevalence_by_genotype'] = init_res('Pre-CIN prevalence by genotype', n_rows=ng, color=stock_colors[0]) results['precin_prevalence_by_age'] = init_res('Pre-CIN prevalence by age', n_rows=na, color=stock_colors[0]) results['cin_prevalence'] = init_res('CIN prevalence', color=stock_colors[1]) results['cin_prevalence_by_genotype'] = init_res('CIN prevalence by genotype', n_rows=ng, color=stock_colors[1]) results['cin_prevalence_by_age'] = init_res('CIN prevalence by age', n_rows=na, color=stock_colors[1]) results['female_hpv_prevalence_by_age'] = init_res('Female HPV prevalence by age', n_rows=na, color=stock_colors[3]) results['lsil_prevalence'] = init_res('HPV/CIN1 prevalence', color=stock_colors[3]) results['lsil_prevalence_by_age'] = init_res('HPV/CIN1 prevalence by age', n_rows=na, color=stock_colors[3]) # Time vector results['year'] = self.res_yearvec results['t'] = self.res_tvec # Final items self.results = results self.results_ready = False return
[docs] def init_interventions(self): ''' Initialize and validate the interventions ''' # Initialization self.interventions = sc.autolist() # Translate the intervention specs into actual interventions for i,intervention in enumerate(self['interventions']): if isinstance(intervention, type) and issubclass(intervention, hpi.Intervention): intervention = intervention() # Convert from a class to an instance of a class if isinstance(intervention, hpi.Intervention): intervention.initialize(self) self.interventions += intervention elif callable(intervention): self.interventions += intervention else: errormsg = f'Intervention {intervention} does not seem to be a valid intervention: must be a function or hpv.Intervention subclass' raise TypeError(errormsg) return
[docs] def init_people(self, popdict=None, init_states=False, reset=False, verbose=None, **kwargs): ''' Create the people and the network. Use ``init_states=False`` for creating a fresh People object for use in future simulations Args: popdict (any): pre-generated people of various formats. init_states (bool): whether to initialize states (default false when called directly) reset (bool): whether to regenerate the people even if they already exist verbose (int): detail to print kwargs (dict): passed to hpv.make_people() ''' # Handle inputs if verbose is None: verbose = self['verbose'] if popdict is not None: self.popdict = popdict if verbose > 0: resetstr= '' if self.people: resetstr = ' (resetting people)' if reset else ' (warning: not resetting sim.people)' print(f'Initializing sim{resetstr} with {self["n_agents"]:0n} agents') if self.popfile and self.popdict is None: # If there's a popdict, we initialize it self.load_population(init_people=False) #TODO: no method for this # Make the people self.people, total_pop = hppop.make_people(self, reset=reset, verbose=verbose, microstructure=self['network'], **kwargs) # Figure out the scale factors # Case 1: total pop and location both provided if self['total_pop'] is not None and total_pop is not None: # If no pop_scale has been provided, try to get it from the location msg = f"Rescaling the population of the chosen location to {self['total_pop']}" if self['verbose']: print(msg) total_pop = self['total_pop'] # Case 2: no location provided but total pop provided elif total_pop is None and self['total_pop'] is not None: total_pop = self['total_pop'] # Case 3: neither total pop, location, nor pop scale provided if self['pop_scale'] is None: if total_pop is None: self['pop_scale'] = 1.0 # Resolve cases 1 & 2 by creating the pop scal else: self['pop_scale'] = total_pop/self['n_agents'] self['ms_agent_ratio'] = int(self['ms_agent_ratio']) # Deal with HIV self.init_hiv() # Creates the hivsim object, which is stored in the sim self.hivsim.init_states(self.people) # Adds some states to the people # Finish initialization self.people.initialize(sim_pars=self.pars) # Fully initialize the people self.reset_layer_pars(force=False) # Ensure that layer keys match the loaded population if init_states: init_hpv_prev = sc.dcp(self['init_hpv_prev']) init_hpv_prev, age_brackets = self.validate_init_conditions(init_hpv_prev) self.init_states(age_brackets=age_brackets, init_hpv_prev=init_hpv_prev) return self
[docs] def init_analyzers(self): ''' Initialize the analyzers ''' self.analyzers = sc.autolist() def convert_analyzer(analyzer): ''' Helper function to turn strings into analyzers ''' choices = hpa.analyzer_map.keys() if not analyzer in choices: errormsg = f'Analyzer {analyzer} not understood: choices are {choices}.' raise ValueError(errormsg) else: analyzer = hpa.analyzer_map[analyzer] return analyzer # Interpret analyzers for ai, analyzer in enumerate(self['analyzers']): if isinstance(analyzer, str): analyzer_list = sc.tolist( convert_analyzer(analyzer)) # If not a list, turn it into one - for consistency of processing for az in analyzer_list: if isinstance(az, str): az = convert_analyzer(az) # It might still be a string self.analyzers += az() # Unpack list else: if isinstance(analyzer, type) and issubclass(analyzer, hpa.Analyzer): analyzer = analyzer() # Convert from a class to an instance of a class if not (isinstance(analyzer, hpa.Analyzer) or callable(analyzer)): errormsg = f'Analyzer {analyzer} does not seem to be a valid analyzer: must be a function or hpv.Analyzer subclass' raise TypeError(errormsg) self.analyzers += analyzer # Add it in for analyzer in self.analyzers: if isinstance(analyzer, hpa.Analyzer): analyzer.initialize(self) return
[docs] def init_immunity(self, create=True): ''' Initialize immunity matrices and cumulative dysplasia ''' # Initialize immunity arrays hpimm.init_immunity(self, create=create) return
[docs] def init_hiv(self): ''' Initialize states, attributes, and parameters relating to HIV ''' if self.pars['model_hiv']: if self.hiv_datafile is None or self.art_datafile is None: raise ValueError('Must supply HIV and ART datafiles to model HIV.') self.hivsim = hphiv.HIVsim(self, hiv_datafile=self.hiv_datafile, art_datafile=self.art_datafile, hiv_pars=self['hiv_pars']) return
def finalize_analyzers(self): for analyzer in self.analyzers: if isinstance(analyzer, hpa.Analyzer): analyzer.finalize(self)
[docs] def init_states(self, age_brackets=None, init_hpv_prev=None, init_cin_prev=None, init_cancer_prev=None): ''' Initialize prior immunity and seed infections ''' # Shorten key variables ng = self['n_genotypes'] # Assign people to age buckets age_inds = np.digitize(self.people.age, age_brackets) # Assign probabilities of having HPV to each age/sex group hpv_probs = np.full(len(self.people), np.nan, dtype=hpd.default_float) hpv_probs[self.people.f_inds] = init_hpv_prev['f'][age_inds[self.people.f_inds]]*self.pars['rel_init_prev'] hpv_probs[self.people.m_inds] = init_hpv_prev['m'][age_inds[self.people.m_inds]]*self.pars['rel_init_prev'] hpv_probs[~self.people.is_active] = 0 # Blank out people who are not yet sexually active # Get indices of people who have HPV hpv_inds = hpu.true(hpu.binomial_arr(hpv_probs)) # Determine which genotype people are infected with if self['init_hpv_dist'] is None: # No type distribution provided, assume even split genotypes = np.random.randint(0, ng, len(hpv_inds)) else: # Error checking if not sc.checktype(self['init_hpv_dist'], dict): errormsg = f'Please provide initial HPV type distribution as a dictionary keyed by genotype, not {self["init_hpv_dist"]}' raise ValueError(errormsg) if set(self['init_hpv_dist'].keys())!=set(self['genotype_map'].values()): errormsg = f'The HPV types provided in the initial HPV type distribution are not the same as the HPV types being simulated: {self["init_hpv_dist"].keys()} vs {self["genotype_map"].values()}.' raise ValueError(errormsg) type_dist = np.array(list(self['init_hpv_dist'].values())) genotypes = hpu.choose_w(type_dist, len(hpv_inds), unique=False) for g in range(ng): self.people.infect(inds=hpv_inds[genotypes==g], g=g, layer='seed_infection') return
[docs] def step(self): ''' Step through time and update values ''' # Set the time and if we have reached the end of the simulation, then do nothing if self.complete: raise AlreadyRunError('Simulation already complete (call sim.initialize() to re-run)') # Shorten key variables dt = self['dt'] # Timestep t = self.t ng = self['n_genotypes'] condoms = self['condoms'] eff_condoms = self['eff_condoms'] beta = self['beta'] gen_pars = self['genotype_pars'] imm_kin_pars = self['imm_kin'] mixing = self['mixing'] layer_probs = self['layer_probs'] f_cross_layer = self['f_cross_layer'] m_cross_layer = self['m_cross_layer'] acts = self['acts'] dur_pship = self['dur_pship'] age_act_pars = self['age_act_pars'] trans = np.array([self['transf2m'],self['transm2f']]) # F2M first since that's the order things are done later year = self.yearvec[t] # Make HIV-related updates if self.pars['model_hiv']: self.hivsim.step(people=self.people, year=year) # Update demographics, states, and partnerships self.people.update_states_pre(t=t, year=year) # This also ages people, applies deaths, and generates new births people = self.people # Shorten people.dissolve_partnerships(t=t) # Dissolve partnerships tind = self.yearvec[t] - self['start'] people.create_partnerships(tind, mixing, layer_probs, f_cross_layer, m_cross_layer, dur_pship, acts, age_act_pars) # Apply interventions for i,intervention in enumerate(self.interventions): intervention(self) # If it's a function, call it directly # Assign sus_imm values, i.e. the protection against infection based on prior immune history if self['use_waning']: inds = hpu.true(people.peak_imm.sum(axis=0)).astype(hpd.default_int) if len(inds): ss = people.t_imm_event[:, inds].shape t_since_boost = (t - people.t_imm_event[:,inds]).ravel() current_imm = imm_kin_pars[t_since_boost].reshape(ss) # Get people's current level of immunity people.nab_imm[:,inds] = current_imm*people.peak_imm[:,inds] # Set immunity relative to peak else: people.nab_imm[:] = people.peak_imm hpimm.check_immunity(people) # Shorten more variables gen_betas = np.array([g['rel_beta'] * beta for g in gen_pars.values()], dtype=hpd.default_float) sus_imm = people.sus_imm rel_sus = people.rel_sus rel_trans = people.rel_trans inf = people.infectious.copy() # calculate transmission based on infectiousness at start of timestep i.e. someone infected in one layer cannot transmit the infection via a different layer in the same timestep # Loop over layers for lkey, layer in people.contacts.items(): sus = people.susceptible.copy() # for each layer, update who's still susceptible # Shorten variables f = layer['f'] m = layer['m'] acts = layer['acts'] * dt frac_acts, whole_acts = np.modf(acts) whole_acts = whole_acts.astype(hpd.default_int) effective_condoms = hpd.default_float(condoms[lkey] * eff_condoms) # Compute transmissions by genotype for g in range(ng): f_source_inds = (inf[g][f] & sus[g][m]).nonzero()[0] # get female sources where female partner is infectious with genotype and male partner is susceptible to that genotype m_source_inds = (inf[g][m] & sus[g][f]).nonzero()[0] # get male sources where the male partner is infectious with genotype and the female partner is susceptible to that genotype foi_frac = 1 - frac_acts * gen_betas[g] * trans[:, None] * (1 - effective_condoms) # Probability of not getting infected from any fractional acts foi_whole = (1 - gen_betas[g] * trans[:, None] * (1 - effective_condoms)) ** whole_acts # Probability of not getting infected from whole acts foi = (1 - (foi_whole * foi_frac)).astype(hpd.default_float) discordant_pairs = [[f_source_inds, f[f_source_inds], m[f_source_inds], foi[0,:]], [m_source_inds, m[m_source_inds], f[m_source_inds], foi[1,:]]] # Compute transmissibility for each partnership for pship_inds, sources, targets, this_foi in discordant_pairs: betas = this_foi[pship_inds] * (1. - sus_imm[g,targets]) * rel_sus[targets] * rel_trans[sources]# Pull out the transmissibility associated with this partnership transmissions = (np.random.random(len(betas)) < betas).nonzero()[0] # Apply probabilities to determine partnerships in which transmission occurred target_inds = targets[transmissions] # Extract indices of those who got infected target_inds, unique_inds = np.unique(target_inds, return_index=True) # Due to multiple partnerships, some people will be counted twice; remove them people.infect(inds=target_inds, g=g, layer=lkey) # Infect people # Determine if there are any reactivated infections on this timestep for g in range(ng): latent_inds = hpu.true(people.latent[g,:]) if len(latent_inds): sev_imm = people.sev_imm[g, latent_inds] reactivation_probs = np.full_like(latent_inds, self['hpv_reactivation'] * dt, dtype=hpd.default_float) reactivation_probs *= (1 - sev_imm) if self['model_hiv']: # determine if any of these inds have HIV and adjust their probs hiv_latent_inds = latent_inds[hpu.true(people.hiv[latent_inds])] if len(hiv_latent_inds): immune_compromise = 1 - people.art_adherence[hiv_latent_inds] mod = immune_compromise * self.hivsim['hiv_pars']['rel_reactivation_prob'] mod[mod < 1] = 1 reactivation_probs[hpu.true(people.hiv[latent_inds])] *= mod is_reactivated = hpu.binomial_arr(reactivation_probs) reactivated_inds = latent_inds[is_reactivated] people.infect(inds=reactivated_inds, g=g, layer='reactivation') # Updates after infection self.people.update_states_post(t=t, year=year) # Index for results idx = int(t / self.resfreq) # Update counts for this time step: flows for key,count in people.flows.items(): self.results[key][idx] += count for key,count in people.demographic_flows.items(): self.results[key][idx] += count for key,count in people.genotype_flows.items(): flow_ind = [flow.name for flow in hpd.flows].index(key) if hpd.flows[flow_ind].by_genotype: for genotype in range(ng): self.results[key+'_by_genotype'][genotype][idx] += count[genotype] for key,count in people.sex_flows.items(): for sex in range(2): self.results[key][sex][idx] += count[sex] for key,count in people.age_flows.items(): self.results[key+'_by_age'][:,idx] += count # Make stock updates every nth step, where n is the frequency of result output if t % self.resfreq == self.resfreq-1: # Number infectious/susceptible by age, for prevalence calculations f_inds = hpu.true(people['sex']==0) infinds = hpu.true(people['infectious']) f_infinds = np.intersect1d(f_inds, infinds) susinds = hpu.true(people['susceptible']) precininds = hpu.true(people['precin']) cininds = hpu.true(people['cin']) self.results['n_females_infectious_by_age'][:, idx] = np.histogram(people.age[f_infinds], bins=people.age_bin_edges, weights=people.scale[f_infinds])[0] self.results['n_infectious_by_age'][:, idx] = np.histogram(people.age[infinds], bins=people.age_bin_edges, weights=people.scale[infinds])[0] self.results['n_susceptible_by_age'][:, idx] = np.histogram(people.age[susinds], bins=people.age_bin_edges, weights=people.scale[susinds])[0] self.results['n_precin_by_age'][:, idx] = np.histogram(people.age[precininds], bins=people.age_bin_edges, weights=people.scale[precininds])[0] self.results['n_cin_by_age'][:, idx] = np.histogram(people.age[cininds], bins=people.age_bin_edges, weights=people.scale[cininds])[0] # Create total stocks for key in self.people.meta.genotype_stock_keys: # Stocks by genotype for g in range(ng): self.results[f'n_{key}_by_genotype'][g, idx] = people.count_by_genotype(key, g) # Total stocks if key not in ['susceptible']: # For n_infectious etc, we get the total number where this state is true for at least one genotype self.results[f'n_{key}'][idx] = people.count_any(key) elif key == 'susceptible': # For n_total_susceptible, we get the total number of infections that could theoretically happen in the population, which can be greater than the population size self.results[f'n_{key}'][idx] = people.count(key) # Create stocks of interventions for key in self.people.meta.intv_stock_keys: self.results[f'n_{key}'][idx] = people.count(key) # Update cancers and cancers by age cases_by_age = self.results['cancers_by_age'][:, idx] inds = people.alive * (self.people.sex==0) * ~people.cancerous.any(axis=0) vals = self.people.age[inds] bins = self.pars['standard_pop'][0,] weights = people.scale[inds] denom = np.histogram(vals, bins, weights=weights)[0] age_specific_incidence = sc.safedivide(cases_by_age, denom)*100e3 standard_pop = self.pars['standard_pop'][1, :-1] self.results['asr_cancer_incidence'][idx] = np.dot(age_specific_incidence,standard_pop) # Save number alive alive_inds = hpu.true(people.alive) alive_female_inds = hpu.true(people.alive*people.is_female) self.results['n_alive'][idx] = people.scale_flows(alive_inds) self.results['n_alive_by_sex'][0,idx] = people.scale_flows((people.alive*people.is_female).nonzero()[0]) self.results['n_alive_by_sex'][1,idx] = people.scale_flows((people.alive*people.is_male).nonzero()[0]) self.results['n_alive_by_age'][:,idx] = np.histogram(people.age[alive_inds], bins=people.age_bin_edges, weights=people.scale[alive_inds])[0] self.results['n_females_alive_by_age'][:,idx] = np.histogram(people.age[alive_female_inds], bins=people.age_bin_edges, weights=people.scale[alive_female_inds])[0] # Apply analyzers for i,analyzer in enumerate(self.analyzers): analyzer(self) # Tidy up self.t += 1 if self.t == self.npts: self.complete = True return
[docs] def run(self, do_plot=False, until=None, restore_pars=True, reset_seed=True, verbose=None, **kwargs): ''' Run the model once ''' # Initialization steps -- start the timer, initialize the sim and the seed, and check that the sim hasn't been run T = sc.timer() if not self.initialized: self.initialize(**kwargs) self._orig_pars = sc.dcp(self.pars) # Create a copy of the parameters, to restore after the run, in case they are dynamically modified if verbose is None: verbose = self['verbose'] if reset_seed: # Reset the RNG. The primary use case (and why it defaults to True) is to ensure that # # >>> sim0.initialize() # >>> sim0.run() # >>> sim1.initialize() # >>> sim1.run() # # produces the same output as # # >>> sim0.initialize() # >>> sim1.initialize() # >>> sim0.run() # >>> sim1.run() # # The seed is offset by 1 to avoid drawing the same random numbers as those used for population generation, otherwise # the first set of random numbers in the model (e.g., deaths) will be correlated with the first set of random numbers # drawn in population generation (e.g., sex) hpu.set_seed(self['rand_seed']+1) # Check for AlreadyRun errors errormsg = None until = self.npts if until is None else self.get_t(until) if until > self.npts: errormsg = f'Requested to run until t={until} but the simulation end is t={self.npts}' if self.t >= until: # NB. At the start, self.t is None so this check must occur after initialization errormsg = f'Simulation is currently at t={self.t}, requested to run until t={until} which has already been reached' if self.complete: errormsg = 'Simulation is already complete (call sim.initialize() to re-run)' if self.people.t not in [self.t, self.t-1]: # Depending on how the sim stopped, either of these states are possible errormsg = f'The simulation has been run independently from the people (t={self.t}, people.t={self.people.t}): if this is intentional, manually set sim.people.t = sim.t. Remember to save the people object before running the sim.' if errormsg: raise AlreadyRunError(errormsg) # Main simulation loop while self.t < until: # Check if we were asked to stop elapsed = T.toc(output=True) if self['timelimit'] and elapsed > self['timelimit']: sc.printv(f"Time limit ({self['timelimit']} s) exceeded; call sim.finalize() to compute results if desired", 1, verbose) return elif self['stopping_func'] and self['stopping_func'](self): sc.printv("Stopping function terminated the simulation; call sim.finalize() to compute results if desired", 1, verbose) return # Print progress if verbose: simlabel = f'"{self.label}": ' if self.label else '' string = f' Running {simlabel}{self.yearvec[self.t]:0.1f} ({self.t:2.0f}/{self.npts}) ({elapsed:0.2f} s) ' if verbose >= 2: sc.heading(string) elif verbose>0: if not (self.t % int(1.0/verbose)): sc.progressbar(self.t+1, self.npts, label=string, length=20, newline=True) # Do the heavy lifting -- actually run the model! self.step() # If simulation reached the end, finalize the results if self.complete: self.finalize(verbose=verbose, restore_pars=restore_pars) sc.printv(f'Run finished after {elapsed:0.2f} s.\n', 1, verbose) return self
[docs] def finalize(self, verbose=None, restore_pars=True): ''' Compute final results ''' if self.results_ready: # Because the results are rescaled in-place, finalizing the sim cannot be run more than once or # otherwise the scale factor will be applied multiple times raise AlreadyRunError('Simulation has already been finalized') # Finalize analyzers and interventions self.finalize_analyzers() # self.finalize_interventions() #TODO: why is this commented out? if self['model_hiv']: self.hivsim.finalize(self) # Final settings self.results_ready = True # Set this first so self.summary() knows to print the results self.t -= 1 # During the run, this keeps track of the next step; restore this be the final day of the sim # Perform calculations on results self.compute_results(verbose=verbose) # Calculate the rest of the results self.results = sc.objdict(self.results) # Convert results to a odicts/objdict to allow e.g. sim.results.diagnoses # Optionally print summary output if verbose: # Verbose is any non-zero value if verbose>0: # Verbose is any positive number self.summarize() # Print medium-length summary of the sim else: self.brief() # Print brief summary of the sim return
[docs] def compute_results(self, verbose=None): ''' Perform final calculations on the results ''' self.compute_states() self.compute_summary() return
[docs] def compute_states(self): ''' Compute prevalence, incidence, and other states. ''' res = self.results # Compute HPV incidence and prevalence def safedivide(num,denom): ''' Define a variation on sc.safedivide that respects shape of numerator ''' answer = np.zeros_like(num) fill_inds = (denom!=0).nonzero() if len(num.shape)==len(denom.shape): answer[fill_inds] = num[fill_inds] / denom[fill_inds] else: answer[:, fill_inds] = num[:, fill_inds] / denom[fill_inds] return answer ng = self.pars['n_genotypes'] self.results['hpv_incidence'][:] = safedivide(res['infections'][:], ng*res['n_susceptible'][:]) self.results['hpv_incidence_by_genotype'][:] = safedivide(res['infections_by_genotype'][:], res['n_susceptible_by_genotype'][:]) self.results['hpv_incidence_by_age'][:] = safedivide(res['infections_by_age'][:], res['n_susceptible_by_age'][:]) self.results['hpv_prevalence'][:] = safedivide(res['n_infectious'][:], ng*res['n_alive'][:]) self.results['hpv_prevalence_by_genotype'][:] = safedivide(res['n_infectious_by_genotype'][:], res['n_alive'][:]) self.results['hpv_prevalence_by_age'][:] = safedivide(res['n_infectious_by_age'][:], res['n_alive_by_age'][:]) alive_females = res['n_alive_by_sex'][0,:] self.results['female_hpv_prevalence_by_age'][:] = safedivide((res['n_females_infectious_by_age'][:]), res['n_females_alive_by_age'][:]) self.results['precin_prevalence'][:] = safedivide(res['n_precin'][:], ng * alive_females) self.results['precin_prevalence_by_genotype'][:] = safedivide(res['n_precin_by_genotype'][:], alive_females) self.results['precin_prevalence_by_age'][:] = safedivide(res['n_precin_by_age'][:], res['n_females_alive_by_age'][:]) self.results['cin_prevalence'][:] = safedivide(res['n_cin'][:], ng*alive_females) self.results['cin_prevalence_by_genotype'][:] = safedivide(res['n_cin_by_genotype'][:], alive_females) self.results['cin_prevalence_by_age'][:] = safedivide(res['n_cin_by_age'][:], res['n_females_alive_by_age'][:]) # Compute cancer incidence. at_risk_females = alive_females - res['n_cancerous'][:] scale_factor = 1e5 # Cancer incidence are displayed as rates per 100k women demoninator = at_risk_females / scale_factor self.results['cancer_incidence'][:] = res['cancers'][:] / demoninator self.results['cancer_incidence_by_genotype'][:] = res['cancers_by_genotype'][:] / demoninator self.results['cancer_incidence_by_age'][:] = safedivide(res['cancers_by_age'][:], res['n_females_alive_by_age'][:]/scale_factor) # Compute cancer mortality. Denominator is all women alive denominator = alive_females/scale_factor self.results['cancer_mortality'][:] = res['cancer_deaths'][:]/denominator # Compute HPV type distribution by cytology for which in hpd.type_dist_keys: by_type = res[f'n_{which}_by_genotype'][:] totals = by_type.sum(axis=0) inds_to_fill = totals > 0 res[which + '_genotype_dist'][:, inds_to_fill] = by_type[:, inds_to_fill] / totals[inds_to_fill] # Demographic results self.results['cdr'][:] = self.results['other_deaths'][:] / (self.results['n_alive'][:]) self.results['cbr'][:] = self.results['births'][:] / (self.results['n_alive'][:]) # Vaccination results self.results['cum_vaccinated'][:] = np.cumsum(self.results['new_vaccinated'][:], axis=0) self.results['cum_total_vaccinated'][:] = np.cumsum(self.results['new_total_vaccinated'][:]) self.results['cum_doses'][:] = np.cumsum(self.results['new_doses'][:]) # Therapeutic vaccination results self.results['cum_tx_vaccinated'][:] = np.cumsum(self.results['new_tx_vaccinated'][:], axis=0) self.results['cum_txvx_doses'][:] = np.cumsum(self.results['new_txvx_doses'][:]) # Screen & treat results self.results['cum_screens'][:] = np.cumsum(self.results['new_screens'][:], axis=0) self.results['cum_screened'][:] = np.cumsum(self.results['new_screened'][:], axis=0) self.results['cum_cin_treatments'][:] = np.cumsum(self.results['new_cin_treatments'][:], axis=0) self.results['cum_cin_treated'][:] = np.cumsum(self.results['new_cin_treated'][:], axis=0) self.results['cum_cancer_treatments'][:] = np.cumsum(self.results['new_cancer_treatments'][:], axis=0) self.results['cum_cancer_treated'][:] = np.cumsum(self.results['new_cancer_treatments'][:], axis=0) return
def compute_age_mean(self, reskey, t=None): if t is None: t = -1 assert 'by_age' in reskey, 'This method can only be used for age results' res = self.results[reskey][:,t] edges = self['age_bin_edges'] mean_edges = edges[:-1] + np.diff(edges)/2 age_mean = (sc.safedivide(res,res.sum())*mean_edges).sum() return age_mean
[docs] def compute_summary(self, t=None, update=True, output=False, full=False, require_run=False): ''' Compute the summary dict and string for the sim. Used internally; see sim.summarize() for the user version. Args: t (int/str): day or date to compute summary for (by default, the last point) update (bool): whether to update the stored sim.summary output (bool): whether to return the summary require_run (bool): whether to raise an exception if simulations have not been run yet ''' if t is None: t = -1 # Compute the summary if require_run and not self.results_ready: errormsg = 'Simulation not yet run' raise RuntimeError(errormsg) s = sc.odict() s['total HPV infections'] = self.results['infections'].sum() s['total cancers'] = self.results['cancers'].sum() s['total cancer deaths'] = self.results['cancer_deaths'].sum() s['mean HPV prevalence (%)'] = self.results['hpv_prevalence'].mean()*100 s['mean cancer incidence (per 100k)'] = self.results['cancer_incidence'].mean() s['mean age of infection (years)'] = self.compute_age_mean('infections_by_age', t=t) s['mean age of cancer (years)'] = self.compute_age_mean('cancers_by_age', t=t) summary = sc.objdict() for key in self.result_keys('total'): summary[key] = self.results[key][t] # Update the stored state if update: self.short_summary = s self.summary = summary # Optionally return if output: if full: return summary else: return s else: return
[docs] def summarize(self, full=False, t=None, sep=None, output=False): ''' Print a medium-length summary of the simulation, drawing from the last time point in the simulation by default. Called by default at the end of a sim run. point in the simulation by default. Called by default at the end of a sim run. See also sim.disp() (detailed output) and sim.brief() (short output). Args: full (bool): whether or not to print all results (by default, only cumulative) t (int/str): day or date to compute summary for (by default, the last point) sep (str): thousands separator (default ',') output (bool): whether to return the summary instead of printing it **Examples**:: sim = hpv.Sim(label='Example sim', verbose=0) # Set to run silently sim.run() # Run the sim sim.summarize() # Print medium-length summary of the sim sim.summarize(t=24, full=True) # Print a "slice" of all sim results on day 24 ''' # Compute the summary summary = self.compute_summary(t=t, update=False, output=True) # Construct the output string if sep is None: sep = hpo.sep # Default separator labelstr = f' "{self.label}"' if self.label else '' string = f'Simulation{labelstr} summary:\n' for label,val in summary.items(): if 'total' in label: printval = f' {val:13,.0f} ' elif 'mean' in label: printval = f' {val:13,.2f} ' else: raise NotImplementedError string += printval + label + '\n' # Print or return string if not output: print(string) else: return string
[docs] def plot(self, *args, **kwargs): ''' Plot the outputs of the model ''' fig = hpplt.plot_sim(sim=self, *args, **kwargs) return fig
[docs] def compute_fit(self): ''' Compute fit between model and data. ''' return self.fit
[docs] class AlreadyRunError(RuntimeError): ''' This error is raised if a simulation is run in such a way that no timesteps will be taken. This error is a distinct type so that it can be safely caught and ignored if required, but it is anticipated that most of the time, calling :py:func:`Sim.run` and not taking any timesteps, would be an inadvertent error. ''' pass
[docs] def diff_sims(sim1, sim2, skip_key_diffs=False, skip=None, full=False, output=False, die=False): ''' Compute the difference of the summaries of two simulations, and print any values which differ. Args: sim1 (sim/dict): either a simulation object or the sim.summary dictionary sim2 (sim/dict): ditto skip_key_diffs (bool): whether to skip keys that don't match between sims skip (list): a list of values to skip full (bool): whether to use the full summary (else, brief) output (bool): whether to return the output as a string (otherwise print) die (bool): whether to raise an exception if the sims don't match require_run (bool): require that the simulations have been run **Example**:: s1 = hpv.Sim(rand_seed=1).run() s2 = hpv.Sim(rand_seed=2).run() hpv.diff_sims(s1, s2) ''' if isinstance(sim1, Sim): sim1 = sim1.compute_summary(update=False, output=True, require_run=True, full=full) if isinstance(sim2, Sim): sim2 = sim2.compute_summary(update=False, output=True, require_run=True, full=full) for sim in [sim1, sim2]: if not isinstance(sim, dict): # pragma: no cover errormsg = f'Cannot compare object of type {type(sim)}, must be a sim or a sim.summary dict' raise TypeError(errormsg) # Compare keys keymatchmsg = '' sim1_keys = set(sim1.keys()) sim2_keys = set(sim2.keys()) if sim1_keys != sim2_keys and not skip_key_diffs: # pragma: no cover keymatchmsg = "Keys don't match!\n" missing = list(sim1_keys - sim2_keys) extra = list(sim2_keys - sim1_keys) if missing: keymatchmsg += f' Missing sim1 keys: {missing}\ns' if extra: keymatchmsg += f' Extra sim2 keys: {extra}\n' # Compare values valmatchmsg = '' mismatches = {} skip = sc.tolist(skip) for key in sim2.keys(): # To ensure order if key in sim1_keys and key not in skip: # If a key is missing, don't count it as a mismatch sim1_val = sim1[key] if key in sim1 else 'not present' sim2_val = sim2[key] if key in sim2 else 'not present' if not np.isclose(sim1_val, sim2_val, equal_nan=True): mismatches[key] = {'sim1': sim1_val, 'sim2': sim2_val} if len(mismatches): valmatchmsg = '\nThe following values differ between the two simulations:\n' df = pd.DataFrame.from_dict(mismatches).transpose() diff = [] ratio = [] change = [] small_change = 1e-3 # Define a small change, e.g. a rounding error for mdict in mismatches.values(): old = mdict['sim1'] new = mdict['sim2'] numeric = sc.isnumber(sim1_val) and sc.isnumber(sim2_val) if numeric and old>0: this_diff = new - old this_ratio = new/old abs_ratio = max(this_ratio, 1.0/this_ratio) # Set the character to use if abs_ratio<small_change: change_char = '≈' elif new > old: change_char = '↑' elif new < old: change_char = '↓' else: errormsg = f'Could not determine relationship between sim1={old} and sim2={new}' raise ValueError(errormsg) # Set how many repeats it should have repeats = 1 if abs_ratio >= 1.1: repeats = 2 if abs_ratio >= 2: repeats = 3 if abs_ratio >= 10: repeats = 4 this_change = change_char*repeats else: # pragma: no cover this_diff = np.nan this_ratio = np.nan this_change = 'N/A' diff.append(this_diff) ratio.append(this_ratio) change.append(this_change) df['diff'] = diff df['ratio'] = ratio for col in ['sim1', 'sim2', 'diff', 'ratio']: df[col] = df[col].round(decimals=3) df['change'] = change valmatchmsg += str(df) # Raise an error if mismatches were found mismatchmsg = keymatchmsg + valmatchmsg if mismatchmsg: # pragma: no cover if die: raise ValueError(mismatchmsg) elif output: return mismatchmsg else: print(mismatchmsg) else: if not output: print('Sims match') return