Source code for poliosim.utils

'''
Numerical utilities for running Poliosim
'''

#%% Housekeeping

import numba  as nb # For faster computations
import numpy  as np # For numerics
import random # Used only for resetting the seed
import pandas as pd # For dataframe operations
import sciris as sc # For scientific utilties
import pylab as pl # For plotting
from . import version as psver


# What functions are externally visible -- note, this gets populated in each section below
__all__ = []

# Set dtypes -- note, these cannot be changed after import since Numba functions are precompiled
nbbool  = nb.bool_
nbint   = nb.int32
nbfloat = nb.float32
default_float = np.float32
default_int   = np.int32
result_float  = np.float64


#%% Sampling and seed methods

__all__ += ['sample', 'set_seed']


[docs]def sample(dist=None, par1=None, par2=None, size=None, **kwargs): ''' Draw a sample from the distribution specified by the input. Args: dist (str): the distribution to sample from par1 (float): the "main" distribution parameter (e.g. mean) par2 (float): the "secondary" distribution parameter (e.g. std) size (int): the number of samples (default=1) kwargs (dict): passed to individual sampling functions Returns: A length N array of samples **Examples**:: sample() # returns Unif(0,1) sample(dist='normal', par1=3, par2=0.5) # returns Normal(μ=3, σ=0.5) Notes: Lognormal distributions are parameterized with reference to the underlying normal distribution (see: https://docs.scipy.org/doc/numpy-1.14.0/reference/generated/numpy.random.lognormal.html), but this function assumes the user wants to specify the mean and variance of the lognormal distribution. ''' choices = [ 'uniform', 'normal', 'lognormal', 'normal_pos', 'normal_int', 'lognormal_int', 'poisson', 'neg_binomial', ] # Compute distribution parameters and draw samples # NB, if adding a new distribution, also add to choices above if dist == 'uniform': samples = np.random.uniform(low=par1, high=par2, size=size, **kwargs) elif dist == 'normal': samples = np.random.normal(loc=par1, scale=par2, size=size, **kwargs) elif dist == 'normal_pos': samples = np.abs(np.random.normal(loc=par1, scale=par2, size=size, **kwargs)) elif dist == 'normal_int': samples = np.round(np.abs(np.random.normal(loc=par1, scale=par2, size=size, **kwargs))) elif dist == 'poisson': samples = n_poisson(rate=par1, n=size, **kwargs) # Use Numba version below for speed elif dist == 'neg_binomial': samples = n_neg_binomial(rate=par1, dispersion=par2, n=size, **kwargs) # Use custom version below elif dist in ['lognormal', 'lognormal_int']: if par1>0: mean = np.log(par1**2 / np.sqrt(par2**2 + par1**2)) # Computes the mean of the underlying normal distribution sigma = np.sqrt(np.log(par2**2/par1**2 + 1)) # Computes sigma for the underlying normal distribution samples = np.random.lognormal(mean=mean, sigma=sigma, size=size, **kwargs) else: samples = np.zeros(size) if dist == 'lognormal_int': samples = np.round(samples) else: choicestr = '\n'.join(choices) errormsg = f'The selected distribution "{dist}" is not implemented; choices are: {choicestr}' raise NotImplementedError(errormsg) return samples
[docs]def set_seed(seed=None): ''' Reset the random seed -- complicated because of Numba, which requires special syntax to reset the seed. This function also resets Python's built-in random number generated. Args: seed (int): the random seed ''' @nb.njit((nbint,), cache=True) def set_seed_numba(seed): return np.random.seed(seed) def set_seed_regular(seed): return np.random.seed(seed) # Dies if a float is given if seed is not None: seed = int(seed) set_seed_regular(seed) # If None, reinitializes it if seed is None: # Numba can't accept a None seed, so use our just-reinitialized Numpy stream to generate one seed = np.random.randint(1e9) set_seed_numba(seed) random.seed(seed) # Finally, reset Python's built-in random number generator, just in case (used by SynthPops) return
#%% Probabilities -- mostly not jitted since performance gain is minimal __all__ += ['n_binomial', 'binomial_filter', 'binomial_arr', 'n_multinomial', 'poisson', 'n_poisson', 'n_neg_binomial', 'choose', 'choose_r', 'choose_w']
[docs]def n_binomial(prob, n): ''' Perform multiple binomial (Bernolli) trials Args: prob (float): probability of each trial succeeding n (int): number of trials (size of array) Returns: Boolean array of which trials succeeded **Example**:: outcomes = ps.n_binomial(0.5, 100) # Perform 100 coin-flips ''' return np.random.random(n) < prob
[docs]def binomial_filter(prob, arr): # No speed gain from Numba ''' Binomial "filter" -- the same as n_binomial, except return the elements of arr that succeeded. Args: prob (float): probability of each trial succeeding arr (array): the array to be filtered Returns: Subset of array for which trials succeeded **Example**:: inds = ps.binomial_filter(0.5, np.arange(20)**2) # Return which values out of the (arbitrary) array passed the coin flip ''' return arr[(np.random.random(len(arr)) < prob).nonzero()[0]]
[docs]def binomial_arr(prob_arr): ''' Binomial (Bernoulli) trials each with different probabilities. Args: prob_arr (array): array of probabilities Returns: Boolean array of which trials on the input array succeeded **Example**:: outcomes = ps.binomial_arr([0.1, 0.1, 0.2, 0.2, 0.8, 0.8]) # Perform 6 trials with different probabilities ''' return np.random.random(len(prob_arr)) < prob_arr
[docs]def n_multinomial(probs, n): # No speed gain from Numba ''' An array of multinomial trials. Args: probs (array): probability of each outcome, which usually should sum to 1 n (int): number of trials Returns: Array of integer outcomes **Example**:: outcomes = ps.n_multinomial(np.ones(6)/6.0, 50)+1 # Return 50 die-rolls ''' return np.searchsorted(np.cumsum(probs), np.random.random(n))
[docs]@nb.njit((nbfloat,), cache=True) # This hugely increases performance def poisson(rate): ''' A Poisson trial. Args: rate (float): the rate of the Poisson process **Example**:: outcome = ps.poisson(100) # Single Poisson trial with mean 100 ''' return np.random.poisson(rate, 1)[0]
[docs]@nb.njit((nbfloat, nbint), cache=True) # Numba hugely increases performance def n_poisson(rate, n): ''' An array of Poisson trials. Args: rate (float): the rate of the Poisson process (mean) n (int): number of trials **Example**:: outcomes = ps.n_poisson(100, 20) # 20 poisson trials with mean 100 ''' return np.random.poisson(rate, n)
[docs]def n_neg_binomial(rate, dispersion, n, step=1): # Numba not used due to incompatible implementation ''' An array of negative binomial trials; with dispersion = ∞, converges to Poisson. Args: rate (float): the rate of the process (mean, same as Poisson) dispersion (float): amount of dispersion: 0 = infinite, 1 = std is equal to mean, ∞ = Poisson n (int): number of trials step (float): the step size to use if non-integer outputs are desired **Example**:: outcomes = ps.n_neg_binomial(100, 1, 20) # 20 negative binomial trials with mean 100 and dispersion equal to mean ''' nbn_n = dispersion nbn_p = dispersion/(rate/step + dispersion) samples = np.random.negative_binomial(n=nbn_n, p=nbn_p, size=n)*step return samples
[docs]@nb.njit((nbint, nbint), cache=True) # This hugely increases performance def choose(max_n, n): ''' Choose a subset of items (e.g., people) without replacement. Args: max_n (int): the total number of items n (int): the number of items to choose **Example**:: choices = ps.choose(5, 2) # choose 2 out of 5 people with equal probability (without repeats) ''' return np.random.choice(max_n, n, replace=False)
[docs]@nb.njit((nbint, nbint), cache=True) # This hugely increases performance def choose_r(max_n, n): ''' Choose a subset of items (e.g., people), with replacement. Args: max_n (int): the total number of items n (int): the number of items to choose **Example**:: choices = ps.choose_r(5, 10) # choose 10 out of 5 people with equal probability (with repeats) ''' return np.random.choice(max_n, n, replace=True)
[docs]def choose_w(probs, n, unique=True): ''' Choose n items (e.g. people), each with a probability from the distribution probs. Args: probs (array): list of probabilities, should sum to 1 n (int): number of samples to choose unique (bool): whether or not to ensure unique indices **Example**:: choices = ps.choose_w([0.2, 0.5, 0.1, 0.1, 0.1], 2) # choose 2 out of 5 people with nonequal probability. ''' probs = np.array(probs) n_choices = len(probs) n_samples = int(n) probs_sum = probs.sum() if probs_sum: # Weight is nonzero, rescale probs = probs/probs_sum else: # Weights are all zero, choose uniformly probs = np.ones(n_choices)/n_choices return np.random.choice(n_choices, n_samples, p=probs, replace=not(unique))
#%% Simple array operations __all__ += ['true', 'false', 'defined', 'undefined', 'itrue', 'ifalse', 'idefined', 'itruei', 'ifalsei', 'idefinedi']
[docs]def true(arr): ''' Returns the indices of the values of the array that are true: just an alias for arr.nonzero()[0]. Args: arr (array): any array **Example**:: inds = ps.true(np.array([1,0,0,1,1,0,1])) ''' return arr.nonzero()[0]
[docs]def false(arr): ''' Returns the indices of the values of the array that are false. Args: arr (array): any array **Example**:: inds = ps.false(np.array([1,0,0,1,1,0,1])) ''' return (~arr).nonzero()[0]
[docs]def defined(arr): ''' Returns the indices of the values of the array that are not-nan. Args: arr (array): any array **Example**:: inds = ps.defined(np.array([1,np.nan,0,np.nan,1,0,1])) ''' return (~np.isnan(arr)).nonzero()[0]
[docs]def undefined(arr): ''' Returns the indices of the values of the array that are not-nan. Args: arr (array): any array **Example**:: inds = ps.defined(np.array([1,np.nan,0,np.nan,1,0,1])) ''' return np.isnan(arr).nonzero()[0]
[docs]def itrue(arr, inds): ''' Returns the indices that are true in the array -- name is short for indices[true] Args: arr (array): a Boolean array, used as a filter inds (array): any other array (usually, an array of indices) of the same size **Example**:: inds = ps.itrue(np.array([True,False,True,True]), inds=np.array([5,22,47,93])) ''' return inds[arr]
[docs]def ifalse(arr, inds): ''' Returns the indices that are true in the array -- name is short for indices[false] Args: arr (array): a Boolean array, used as a filter inds (array): any other array (usually, an array of indices) of the same size **Example**:: inds = ps.ifalse(np.array([True,False,True,True]), inds=np.array([5,22,47,93])) ''' return inds[~arr]
[docs]def idefined(arr, inds): ''' Returns the indices that are true in the array -- name is short for indices[defined] Args: arr (array): any array, used as a filter inds (array): any other array (usually, an array of indices) of the same size **Example**:: inds = ps.idefined(np.array([3,np.nan,np.nan,4]), inds=np.array([5,22,47,93])) ''' return inds[~np.isnan(arr)]
[docs]def itruei(arr, inds): ''' Returns the indices that are true in the array -- name is short for indices[true[indices]] Args: arr (array): a Boolean array, used as a filter inds (array): an array of indices for the original array **Example**:: inds = ps.itruei(np.array([True,False,True,True,False,False,True,False]), inds=np.array([0,1,3,5])) ''' return inds[arr[inds]]
[docs]def ifalsei(arr, inds): ''' Returns the indices that are false in the array -- name is short for indices[false[indices]] Args: arr (array): a Boolean array, used as a filter inds (array): an array of indices for the original array **Example**:: inds = ps.ifalsei(np.array([True,False,True,True,False,False,True,False]), inds=np.array([0,1,3,5])) ''' return inds[~arr[inds]]
[docs]def idefinedi(arr, inds): ''' Returns the indices that are defined in the array -- name is short for indices[defined[indices]] Args: arr (array): any array, used as a filter inds (array): an array of indices for the original array **Example**:: inds = ps.idefinedi(np.array([4,np.nan,0,np.nan,np.nan,4,7,4,np.nan]), inds=np.array([0,1,3,5])) ''' return inds[~np.isnan(arr[inds])]
#%% Convenience imports from Sciris __all__ += ['load', 'save', 'date', 'day', 'daydiff', 'date_range'] load = sc.load save = sc.save date = sc.date day = sc.day daydiff = sc.daydiff date_range = sc.daterange #%% Custom Poliosim functions __all__ += ['load_data', 'savefig', 'get_png_metadata', 'git_info', 'check_version', 'check_save_version', 'compute_gof', 'diff_sims']
[docs]def load_data(datafile, columns=None, calculate=True, check_date=True, verbose=True, **kwargs): ''' Load data for comparing to the model output, either from file or from a dataframe. Args: datafile (str or df): if a string, the name of the file to load (either Excel or CSV); if a dataframe, use directly columns (list): list of column names (otherwise, load all) calculate (bool): whether to calculate cumulative values from daily counts check_date (bool): whether to check that a 'date' column is present kwargs (dict): passed to pd.read_excel() Returns: data (dataframe): pandas dataframe of the loaded data ''' # Load data if isinstance(datafile, str): if datafile.lower().endswith('csv'): raw_data = pd.read_csv(datafile, **kwargs) elif datafile.lower().endswith('xlsx'): raw_data = pd.read_excel(datafile, **kwargs) else: errormsg = f'Currently loading is only supported from .csv and .xlsx files, not {datafile}' raise NotImplementedError(errormsg) elif isinstance(datafile, pd.DataFrame): raw_data = datafile else: errormsg = f'Could not interpret data {type(datafile)}: must be a string or a dataframe' raise TypeError(errormsg) # Confirm data integrity and simplify if columns is not None: for col in columns: if col not in raw_data.columns: errormsg = f'Column "{col}" is missing from the loaded data' raise ValueError(errormsg) data = raw_data[columns] else: data = raw_data # Calculate any cumulative columns that are missing if calculate: columns = data.columns for col in columns: if col.startswith('new'): cum_col = col.replace('new_', 'cum_') if cum_col not in columns: data[cum_col] = np.cumsum(data[col]) if verbose: print(f' Automatically adding cumulative column {cum_col} from {col}') # Ensure required columns are present and reset the index if check_date: if 'date' not in data.columns: errormsg = f'Required column "date" not found; columns are {data.columns}' raise ValueError(errormsg) else: data['date'] = pd.to_datetime(data['date']).dt.date data.set_index('date', inplace=True, drop=False) # Don't drop so sim.data['date'] can still be accessed return data
def get_caller(frame=2, tostring=True): ''' Try to get information on the calling function, but fail gracefully. Frame 1 is the current file (this one), so not very useful. Frame 2 is the default assuming it is being called directly. Frame 3 is used if another function is calling this function internally. Args: frame (int): how many frames to descend (e.g. the caller of the caller of the...) tostring (bool): whether to return a string instead of a dict Returns: output (str/dict): the filename and line number of the calling function, either as a string or dict ''' try: import inspect result = inspect.getouterframes(inspect.currentframe(), 2) fname = str(result[frame][1]) lineno = str(result[frame][2]) if tostring: output = f'{fname}, line {lineno}' else: output = {'filename':fname, 'lineno':lineno} except Exception as E: if tostring: output = f'Calling function information not available ({str(E)})' else: output = {'filename':'N/A', 'lineno':'N/A'} return output
[docs]def savefig(filename=None, comments=None, **kwargs): ''' Wrapper for Matplotlib's savefig() function which automatically stores poliosim metadata in the figure. By default, saves Args: filename (str): name of the file to save to (default, timestamp) comments (str): additional metadata to save to the figure kwargs (dict): passed to savefig() **Example**:: ps.Sim().run(do_plot=True) filename = ps.savefig() ''' # Handle inputs dpi = kwargs.pop('dpi', 150) metadata = kwargs.pop('metadata', {}) if filename is None: now = sc.getdate(dateformat='%Y-%b-%d_%H.%M.%S') filename = f'poliosim_{now}.png' metadata = {} metadata['Poliosim version'] = psver.__version__ gitinfo = git_info() for key,value in gitinfo['poliosim'].items(): metadata[f'Poliosim {key}'] = value for key,value in gitinfo['called_by'].items(): metadata[f'Poliosim caller {key}'] = value metadata['Poliosim current time'] = sc.getdate() metadata['Poliosim calling file'] = get_caller() if comments: metadata['Poliosim comments'] = comments # Save the figure pl.savefig(filename, dpi=dpi, metadata=metadata, **kwargs) return filename
[docs]def get_png_metadata(filename, output=False): ''' Read metadata from a PNG file. For use with images saved with ps.savefig(). Requires pillow, an optional dependency. Args: filename (str): the name of the file to load the data from **Example**:: ps.Sim().run(do_plot=True) ps.savefig('poliosim.png') ps.get_png_metadata('poliosim.png') ''' try: import PIL except ImportError as E: errormsg = f'Pillow import failed ({str(E)}), please install first (pip install pillow)' raise ImportError(errormsg) im = PIL.Image.open(filename) metadata = {} for key,value in im.info.items(): if key.startswith('Poliosim'): metadata[key] = value if not output: print(f'{key}: {value}') if output: return metadata else: return
[docs]def git_info(filename=None, check=False, comments=None, old_info=None, die=False, indent=2, verbose=True, **kwargs): ''' Get current git information and optionally write it to disk. Simplest usage is ps.git_info(__file__) Args: filename (str): name of the file to write to or read from check (bool): whether or not to compare two git versions comments (str/dict): additional comments to include in the file old_info (dict): dictionary of information to check against die (bool): whether or not to raise an exception if the check fails indent (int): how many indents to use when writing the file to disk verbose (bool): detail to print kwargs (dict): passed to loadjson (if check=True) or loadjson (if check=False) **Examples**:: ps.git_info() # Return information ps.git_info(__file__) # Writes to disk ps.git_info('poliosim_version.gitinfo') # Writes to disk ps.git_info('poliosim_version.gitinfo', check=True) # Checks that current version matches saved file ''' # Handle the case where __file__ is supplied as the argument if isinstance(filename, str) and filename.endswith('.py'): filename = filename.replace('.py', '.gitinfo') # Get git info calling_file = sc.makefilepath(get_caller(frame=3, tostring=False)['filename']) ps_info = {'version':psver.__version__} ps_info.update(sc.gitinfo(__file__, verbose=False)) caller_info = sc.gitinfo(calling_file, verbose=False) caller_info['filename'] = calling_file info = {'poliosim':ps_info, 'called_by':caller_info} if comments: info['comments'] = comments # Just get information and optionally write to disk if not check: if filename is not None: output = sc.savejson(filename, info, indent=indent, **kwargs) else: output = info return output # Check if versions match, and optionally raise an error else: if filename is not None: old_info = sc.loadjson(filename, **kwargs) string = '' old_ps_info = old_info['poliosim'] if 'poliosim' in old_info else old_info if ps_info != old_ps_info: string = f'Git information differs: {ps_info} vs. {old_ps_info}' if die: raise ValueError(string) elif verbose: print(string) return
[docs]def check_version(expected, die=False, verbose=True, **kwargs): ''' Get current git information and optionally write it to disk. Args: expected (str): expected version information die (bool): whether or not to raise an exception if the check fails ''' version = psver.__version__ compare = sc.compareversions(version, expected) # Returns -1, 0, or 1 relation = ['older', '', 'newer'][compare+1] # Picks the right string if relation: # Not empty, print warning string = f'Note: Poliosim is {relation} than expected ({version} vs. {expected})' if die: raise ValueError(string) elif verbose: print(string) return compare
[docs]def check_save_version(expected=None, filename=None, die=False, verbose=True, **kwargs): ''' A convenience function that bundles check_version with git_info and saves automatically to disk from the calling file. The idea is to put this at the top of an analysis script, and commit the resulting file, to keep track of which version of poliosim was used. Args: expected (str): expected version information filename (str): file to save to; if None, guess based on current file name kwargs (dict): passed to git_info() **Examples**:: ps.check_save_version() ps.check_save_version('1.3.2', filename='script.gitinfo', comments='This is the main analysis script') ''' # First, check the version if supplied if expected: check_version(expected, die=die, verbose=verbose) # Now, check and save the git info if filename is None: filename = get_caller(tostring=False)['filename'] git_info(filename=filename, **kwargs) return
[docs]def compute_gof(actual, predicted, normalize=True, use_frac=False, use_squared=False, as_scalar='none', eps=1e-9, skestimator=None, **kwargs): ''' Calculate the goodness of fit. By default use normalized absolute error, but highly customizable. For example, mean squared error is equivalent to setting normalize=False, use_squared=True, as_scalar='mean'. Args: actual (arr): array of actual (data) points predicted (arr): corresponding array of predicted (model) points normalize (bool): whether to divide the values by the largest value in either series use_frac (bool): convert to fractional mismatches rather than absolute use_squared (bool): square the mismatches as_scalar (str): return as a scalar instead of a time series: choices are sum, mean, median eps (float): to avoid divide-by-zero skestimator (str): if provided, use this scikit-learn estimator instead kwargs (dict): passed to the scikit-learn estimator Returns: gofs (arr): array of goodness-of-fit values, or a single value if as_scalar is True **Examples**:: x1 = np.cumsum(np.random.random(100)) x2 = np.cumsum(np.random.random(100)) e1 = compute_gof(x1, x2) # Default, normalized absolute error e2 = compute_gof(x1, x2, normalize=False, use_frac=False) # Fractional error e3 = compute_gof(x1, x2, normalize=False, use_squared=True, as_scalar='mean') # Mean squared error e4 = compute_gof(x1, x2, estimator='mean_squared_error') # Scikit-learn's MSE method e5 = compute_gof(x1, x2, as_scalar='median') # Normalized median absolute error -- highly robust ''' # Handle inputs actual = np.array(sc.dcp(actual), dtype=float) predicted = np.array(sc.dcp(predicted), dtype=float) # Custom estimator is supplied: use that if skestimator is not None: try: import sklearn.metrics as sm sklearn_gof = getattr(sm, skestimator) # Shortcut to e.g. sklearn.metrics.max_error except ImportError as E: raise ImportError(f'You must have scikit-learn >=0.22.2 installed: {str(E)}') except AttributeError: raise AttributeError(f'Estimator {skestimator} is not available; see https://scikit-learn.org/stable/modules/model_evaluation.html#scoring-parameter for options') gof = sklearn_gof(actual, predicted, **kwargs) return gof # Default case: calculate it manually else: # Key step -- calculate the mismatch! gofs = abs(np.array(actual) - np.array(predicted)) if normalize and not use_frac: actual_max = abs(actual).max() if actual_max>0: gofs /= actual_max if use_frac: if (actual<0).any() or (predicted<0).any(): print('Warning: Calculating fractional errors for non-positive quantities is ill-advised!') else: maxvals = np.maximum(actual, predicted) + eps gofs /= maxvals if use_squared: gofs = gofs**2 if as_scalar == 'sum': gofs = np.sum(gofs) elif as_scalar == 'mean': gofs = np.mean(gofs) elif as_scalar == 'median': gofs = np.median(gofs) return gofs
[docs]def diff_sims(sim1, sim2, skip_key_diffs=False, output=False, die=False): ''' Compute the difference of the summaries of two simulations, and print any values which differ. Args: sim1 (sim/dict): either a simulation object or the sim.summary dictionary sim2 (sim/dict): ditto skip_key_diffs (bool): whether to skip keys that don't match between sims output (bool): whether to return the output as a string (otherwise print) die (bool): whether to raise an exception if the sims don't match require_run (bool): require that the simulations have been run **Example**:: s1 = cv.Sim(beta=0.01) s2 = cv.Sim(beta=0.02) s1.run() s2.run() cv.diff_sims(s1, s2) ''' from . import model as psm # To avoid circular import if isinstance(sim1, psm.Sim): sim1 = sim1.compute_summary() if isinstance(sim2, psm.Sim): sim2 = sim2.compute_summary() for sim in [sim1, sim2]: if not isinstance(sim, dict): # pragma: no cover errormsg = f'Cannot compare object of type {type(sim)}, must be a sim or a sim.summary dict' raise TypeError(errormsg) # Compare keys keymatchmsg = '' sim1_keys = set(sim1.keys()) sim2_keys = set(sim2.keys()) if sim1_keys != sim2_keys and not skip_key_diffs: # pragma: no cover keymatchmsg = "Keys don't match!\n" missing = list(sim1_keys - sim2_keys) extra = list(sim2_keys - sim1_keys) if missing: keymatchmsg += f' Missing sim1 keys: {missing}\n' if extra: keymatchmsg += f' Extra sim2 keys: {extra}\n' # Compare values valmatchmsg = '' mismatches = {} for key in sim2.keys(): # To ensure order if key in sim1_keys: # If a key is missing, don't count it as a mismatch sim1_val = sim1[key] if key in sim1 else 'not present' sim2_val = sim2[key] if key in sim2 else 'not present' both_nan = sc.isnumber(sim1_val, isnan=True) and sc.isnumber(sim2_val, isnan=True) if sim1_val != sim2_val and not both_nan: mismatches[key] = {'sim1': sim1_val, 'sim2': sim2_val} if len(mismatches): valmatchmsg = '\nThe following values differ between the two simulations:\n' df = pd.DataFrame.from_dict(mismatches).transpose() diff = [] ratio = [] change = [] small_change = 1e-3 # Define a small change, e.g. a rounding error for mdict in mismatches.values(): old = mdict['sim1'] new = mdict['sim2'] numeric = sc.isnumber(sim1_val) and sc.isnumber(sim2_val) if numeric and old>0: this_diff = new - old this_ratio = new/old abs_ratio = max(this_ratio, 1.0/this_ratio) # Set the character to use if abs_ratio<small_change: change_char = '≈' elif new > old: change_char = '↑' elif new < old: change_char = '↓' else: errormsg = f'Could not determine relationship between sim1={old} and sim2={new}' raise ValueError(errormsg) # Set how many repeats it should have repeats = 1 if abs_ratio >= 1.1: repeats = 2 if abs_ratio >= 2: repeats = 3 if abs_ratio >= 10: repeats = 4 this_change = change_char*repeats else: # pragma: no cover this_diff = np.nan this_ratio = np.nan this_change = 'N/A' diff.append(this_diff) ratio.append(this_ratio) change.append(this_change) df['diff'] = diff df['ratio'] = ratio for col in ['sim1', 'sim2', 'diff', 'ratio']: df[col] = df[col].round(decimals=3) df['change'] = change valmatchmsg += str(df) # Raise an error if mismatches were found mismatchmsg = keymatchmsg + valmatchmsg if mismatchmsg: # pragma: no cover if die: raise ValueError(mismatchmsg) elif output: return mismatchmsg else: print(mismatchmsg) else: if not output: print('Sims match') return
@nb.njit((nbint[:], nbint[:], nb.int64[:]), cache=True) def find_contacts(p1, p2, inds): # pragma: no cover """ Numba for Layer.find_contacts() A set is returned here rather than a sorted array so that custom tracing interventions can efficiently add extra people. For a version with sorting by default, see Layer.find_contacts(). Indices must be an int64 array since this is what's returned by true() etc. functions by default. """ pairing_partners = set() inds = set(inds) for i in range(len(p1)): if p1[i] in inds: pairing_partners.add(p2[i]) if p2[i] in inds: pairing_partners.add(p1[i]) return pairing_partners