'''
Numerical utilities for running Poliosim
'''
#%% Housekeeping
import numba as nb # For faster computations
import numpy as np # For numerics
import random # Used only for resetting the seed
import pandas as pd # For dataframe operations
import sciris as sc # For scientific utilties
import pylab as pl # For plotting
from . import version as psver
# What functions are externally visible -- note, this gets populated in each section below
__all__ = []
# Set dtypes -- note, these cannot be changed after import since Numba functions are precompiled
nbbool = nb.bool_
nbint = nb.int32
nbfloat = nb.float32
default_float = np.float32
default_int = np.int32
result_float = np.float64
#%% Sampling and seed methods
__all__ += ['sample', 'set_seed']
[docs]def sample(dist=None, par1=None, par2=None, size=None, **kwargs):
'''
Draw a sample from the distribution specified by the input.
Args:
dist (str): the distribution to sample from
par1 (float): the "main" distribution parameter (e.g. mean)
par2 (float): the "secondary" distribution parameter (e.g. std)
size (int): the number of samples (default=1)
kwargs (dict): passed to individual sampling functions
Returns:
A length N array of samples
**Examples**::
sample() # returns Unif(0,1)
sample(dist='normal', par1=3, par2=0.5) # returns Normal(μ=3, σ=0.5)
Notes:
Lognormal distributions are parameterized with reference to the underlying normal distribution (see:
https://docs.scipy.org/doc/numpy-1.14.0/reference/generated/numpy.random.lognormal.html), but this
function assumes the user wants to specify the mean and variance of the lognormal distribution.
'''
choices = [
'uniform',
'normal',
'lognormal',
'normal_pos',
'normal_int',
'lognormal_int',
'poisson',
'neg_binomial',
]
# Compute distribution parameters and draw samples
# NB, if adding a new distribution, also add to choices above
if dist == 'uniform': samples = np.random.uniform(low=par1, high=par2, size=size, **kwargs)
elif dist == 'normal': samples = np.random.normal(loc=par1, scale=par2, size=size, **kwargs)
elif dist == 'normal_pos': samples = np.abs(np.random.normal(loc=par1, scale=par2, size=size, **kwargs))
elif dist == 'normal_int': samples = np.round(np.abs(np.random.normal(loc=par1, scale=par2, size=size, **kwargs)))
elif dist == 'poisson': samples = n_poisson(rate=par1, n=size, **kwargs) # Use Numba version below for speed
elif dist == 'neg_binomial': samples = n_neg_binomial(rate=par1, dispersion=par2, n=size, **kwargs) # Use custom version below
elif dist in ['lognormal', 'lognormal_int']:
if par1>0:
mean = np.log(par1**2 / np.sqrt(par2**2 + par1**2)) # Computes the mean of the underlying normal distribution
sigma = np.sqrt(np.log(par2**2/par1**2 + 1)) # Computes sigma for the underlying normal distribution
samples = np.random.lognormal(mean=mean, sigma=sigma, size=size, **kwargs)
else:
samples = np.zeros(size)
if dist == 'lognormal_int':
samples = np.round(samples)
else:
choicestr = '\n'.join(choices)
errormsg = f'The selected distribution "{dist}" is not implemented; choices are: {choicestr}'
raise NotImplementedError(errormsg)
return samples
[docs]def set_seed(seed=None):
'''
Reset the random seed -- complicated because of Numba, which requires special
syntax to reset the seed. This function also resets Python's built-in random
number generated.
Args:
seed (int): the random seed
'''
@nb.njit((nbint,), cache=True)
def set_seed_numba(seed):
return np.random.seed(seed)
def set_seed_regular(seed):
return np.random.seed(seed)
# Dies if a float is given
if seed is not None:
seed = int(seed)
set_seed_regular(seed) # If None, reinitializes it
if seed is None: # Numba can't accept a None seed, so use our just-reinitialized Numpy stream to generate one
seed = np.random.randint(1e9)
set_seed_numba(seed)
random.seed(seed) # Finally, reset Python's built-in random number generator, just in case (used by SynthPops)
return
#%% Probabilities -- mostly not jitted since performance gain is minimal
__all__ += ['n_binomial', 'binomial_filter', 'binomial_arr', 'n_multinomial',
'poisson', 'n_poisson', 'n_neg_binomial', 'choose', 'choose_r', 'choose_w']
[docs]def n_binomial(prob, n):
'''
Perform multiple binomial (Bernolli) trials
Args:
prob (float): probability of each trial succeeding
n (int): number of trials (size of array)
Returns:
Boolean array of which trials succeeded
**Example**::
outcomes = ps.n_binomial(0.5, 100) # Perform 100 coin-flips
'''
return np.random.random(n) < prob
[docs]def binomial_filter(prob, arr): # No speed gain from Numba
'''
Binomial "filter" -- the same as n_binomial, except return
the elements of arr that succeeded.
Args:
prob (float): probability of each trial succeeding
arr (array): the array to be filtered
Returns:
Subset of array for which trials succeeded
**Example**::
inds = ps.binomial_filter(0.5, np.arange(20)**2) # Return which values out of the (arbitrary) array passed the coin flip
'''
return arr[(np.random.random(len(arr)) < prob).nonzero()[0]]
[docs]def binomial_arr(prob_arr):
'''
Binomial (Bernoulli) trials each with different probabilities.
Args:
prob_arr (array): array of probabilities
Returns:
Boolean array of which trials on the input array succeeded
**Example**::
outcomes = ps.binomial_arr([0.1, 0.1, 0.2, 0.2, 0.8, 0.8]) # Perform 6 trials with different probabilities
'''
return np.random.random(len(prob_arr)) < prob_arr
[docs]def n_multinomial(probs, n): # No speed gain from Numba
'''
An array of multinomial trials.
Args:
probs (array): probability of each outcome, which usually should sum to 1
n (int): number of trials
Returns:
Array of integer outcomes
**Example**::
outcomes = ps.n_multinomial(np.ones(6)/6.0, 50)+1 # Return 50 die-rolls
'''
return np.searchsorted(np.cumsum(probs), np.random.random(n))
[docs]@nb.njit((nbfloat,), cache=True) # This hugely increases performance
def poisson(rate):
'''
A Poisson trial.
Args:
rate (float): the rate of the Poisson process
**Example**::
outcome = ps.poisson(100) # Single Poisson trial with mean 100
'''
return np.random.poisson(rate, 1)[0]
[docs]@nb.njit((nbfloat, nbint), cache=True) # Numba hugely increases performance
def n_poisson(rate, n):
'''
An array of Poisson trials.
Args:
rate (float): the rate of the Poisson process (mean)
n (int): number of trials
**Example**::
outcomes = ps.n_poisson(100, 20) # 20 poisson trials with mean 100
'''
return np.random.poisson(rate, n)
[docs]def n_neg_binomial(rate, dispersion, n, step=1): # Numba not used due to incompatible implementation
'''
An array of negative binomial trials; with dispersion = ∞, converges to Poisson.
Args:
rate (float): the rate of the process (mean, same as Poisson)
dispersion (float): amount of dispersion: 0 = infinite, 1 = std is equal to mean, ∞ = Poisson
n (int): number of trials
step (float): the step size to use if non-integer outputs are desired
**Example**::
outcomes = ps.n_neg_binomial(100, 1, 20) # 20 negative binomial trials with mean 100 and dispersion equal to mean
'''
nbn_n = dispersion
nbn_p = dispersion/(rate/step + dispersion)
samples = np.random.negative_binomial(n=nbn_n, p=nbn_p, size=n)*step
return samples
[docs]@nb.njit((nbint, nbint), cache=True) # This hugely increases performance
def choose(max_n, n):
'''
Choose a subset of items (e.g., people) without replacement.
Args:
max_n (int): the total number of items
n (int): the number of items to choose
**Example**::
choices = ps.choose(5, 2) # choose 2 out of 5 people with equal probability (without repeats)
'''
return np.random.choice(max_n, n, replace=False)
[docs]@nb.njit((nbint, nbint), cache=True) # This hugely increases performance
def choose_r(max_n, n):
'''
Choose a subset of items (e.g., people), with replacement.
Args:
max_n (int): the total number of items
n (int): the number of items to choose
**Example**::
choices = ps.choose_r(5, 10) # choose 10 out of 5 people with equal probability (with repeats)
'''
return np.random.choice(max_n, n, replace=True)
[docs]def choose_w(probs, n, unique=True):
'''
Choose n items (e.g. people), each with a probability from the distribution probs.
Args:
probs (array): list of probabilities, should sum to 1
n (int): number of samples to choose
unique (bool): whether or not to ensure unique indices
**Example**::
choices = ps.choose_w([0.2, 0.5, 0.1, 0.1, 0.1], 2) # choose 2 out of 5 people with nonequal probability.
'''
probs = np.array(probs)
n_choices = len(probs)
n_samples = int(n)
probs_sum = probs.sum()
if probs_sum: # Weight is nonzero, rescale
probs = probs/probs_sum
else: # Weights are all zero, choose uniformly
probs = np.ones(n_choices)/n_choices
return np.random.choice(n_choices, n_samples, p=probs, replace=not(unique))
#%% Simple array operations
__all__ += ['true', 'false', 'defined', 'undefined',
'itrue', 'ifalse', 'idefined',
'itruei', 'ifalsei', 'idefinedi']
[docs]def true(arr):
'''
Returns the indices of the values of the array that are true: just an alias
for arr.nonzero()[0].
Args:
arr (array): any array
**Example**::
inds = ps.true(np.array([1,0,0,1,1,0,1]))
'''
return arr.nonzero()[0]
[docs]def false(arr):
'''
Returns the indices of the values of the array that are false.
Args:
arr (array): any array
**Example**::
inds = ps.false(np.array([1,0,0,1,1,0,1]))
'''
return (~arr).nonzero()[0]
[docs]def defined(arr):
'''
Returns the indices of the values of the array that are not-nan.
Args:
arr (array): any array
**Example**::
inds = ps.defined(np.array([1,np.nan,0,np.nan,1,0,1]))
'''
return (~np.isnan(arr)).nonzero()[0]
[docs]def undefined(arr):
'''
Returns the indices of the values of the array that are not-nan.
Args:
arr (array): any array
**Example**::
inds = ps.defined(np.array([1,np.nan,0,np.nan,1,0,1]))
'''
return np.isnan(arr).nonzero()[0]
[docs]def itrue(arr, inds):
'''
Returns the indices that are true in the array -- name is short for indices[true]
Args:
arr (array): a Boolean array, used as a filter
inds (array): any other array (usually, an array of indices) of the same size
**Example**::
inds = ps.itrue(np.array([True,False,True,True]), inds=np.array([5,22,47,93]))
'''
return inds[arr]
[docs]def ifalse(arr, inds):
'''
Returns the indices that are true in the array -- name is short for indices[false]
Args:
arr (array): a Boolean array, used as a filter
inds (array): any other array (usually, an array of indices) of the same size
**Example**::
inds = ps.ifalse(np.array([True,False,True,True]), inds=np.array([5,22,47,93]))
'''
return inds[~arr]
[docs]def idefined(arr, inds):
'''
Returns the indices that are true in the array -- name is short for indices[defined]
Args:
arr (array): any array, used as a filter
inds (array): any other array (usually, an array of indices) of the same size
**Example**::
inds = ps.idefined(np.array([3,np.nan,np.nan,4]), inds=np.array([5,22,47,93]))
'''
return inds[~np.isnan(arr)]
[docs]def itruei(arr, inds):
'''
Returns the indices that are true in the array -- name is short for indices[true[indices]]
Args:
arr (array): a Boolean array, used as a filter
inds (array): an array of indices for the original array
**Example**::
inds = ps.itruei(np.array([True,False,True,True,False,False,True,False]), inds=np.array([0,1,3,5]))
'''
return inds[arr[inds]]
[docs]def ifalsei(arr, inds):
'''
Returns the indices that are false in the array -- name is short for indices[false[indices]]
Args:
arr (array): a Boolean array, used as a filter
inds (array): an array of indices for the original array
**Example**::
inds = ps.ifalsei(np.array([True,False,True,True,False,False,True,False]), inds=np.array([0,1,3,5]))
'''
return inds[~arr[inds]]
[docs]def idefinedi(arr, inds):
'''
Returns the indices that are defined in the array -- name is short for indices[defined[indices]]
Args:
arr (array): any array, used as a filter
inds (array): an array of indices for the original array
**Example**::
inds = ps.idefinedi(np.array([4,np.nan,0,np.nan,np.nan,4,7,4,np.nan]), inds=np.array([0,1,3,5]))
'''
return inds[~np.isnan(arr[inds])]
#%% Convenience imports from Sciris
__all__ += ['load', 'save', 'date', 'day', 'daydiff', 'date_range']
load = sc.load
save = sc.save
date = sc.date
day = sc.day
daydiff = sc.daydiff
date_range = sc.daterange
#%% Custom Poliosim functions
__all__ += ['load_data', 'savefig', 'get_png_metadata', 'git_info', 'check_version', 'check_save_version', 'compute_gof', 'diff_sims']
[docs]def load_data(datafile, columns=None, calculate=True, check_date=True, verbose=True, **kwargs):
'''
Load data for comparing to the model output, either from file or from a dataframe.
Args:
datafile (str or df): if a string, the name of the file to load (either Excel or CSV); if a dataframe, use directly
columns (list): list of column names (otherwise, load all)
calculate (bool): whether to calculate cumulative values from daily counts
check_date (bool): whether to check that a 'date' column is present
kwargs (dict): passed to pd.read_excel()
Returns:
data (dataframe): pandas dataframe of the loaded data
'''
# Load data
if isinstance(datafile, str):
if datafile.lower().endswith('csv'):
raw_data = pd.read_csv(datafile, **kwargs)
elif datafile.lower().endswith('xlsx'):
raw_data = pd.read_excel(datafile, **kwargs)
else:
errormsg = f'Currently loading is only supported from .csv and .xlsx files, not {datafile}'
raise NotImplementedError(errormsg)
elif isinstance(datafile, pd.DataFrame):
raw_data = datafile
else:
errormsg = f'Could not interpret data {type(datafile)}: must be a string or a dataframe'
raise TypeError(errormsg)
# Confirm data integrity and simplify
if columns is not None:
for col in columns:
if col not in raw_data.columns:
errormsg = f'Column "{col}" is missing from the loaded data'
raise ValueError(errormsg)
data = raw_data[columns]
else:
data = raw_data
# Calculate any cumulative columns that are missing
if calculate:
columns = data.columns
for col in columns:
if col.startswith('new'):
cum_col = col.replace('new_', 'cum_')
if cum_col not in columns:
data[cum_col] = np.cumsum(data[col])
if verbose:
print(f' Automatically adding cumulative column {cum_col} from {col}')
# Ensure required columns are present and reset the index
if check_date:
if 'date' not in data.columns:
errormsg = f'Required column "date" not found; columns are {data.columns}'
raise ValueError(errormsg)
else:
data['date'] = pd.to_datetime(data['date']).dt.date
data.set_index('date', inplace=True, drop=False) # Don't drop so sim.data['date'] can still be accessed
return data
def get_caller(frame=2, tostring=True):
'''
Try to get information on the calling function, but fail gracefully.
Frame 1 is the current file (this one), so not very useful. Frame 2 is
the default assuming it is being called directly. Frame 3 is used if
another function is calling this function internally.
Args:
frame (int): how many frames to descend (e.g. the caller of the caller of the...)
tostring (bool): whether to return a string instead of a dict
Returns:
output (str/dict): the filename and line number of the calling function, either as a string or dict
'''
try:
import inspect
result = inspect.getouterframes(inspect.currentframe(), 2)
fname = str(result[frame][1])
lineno = str(result[frame][2])
if tostring:
output = f'{fname}, line {lineno}'
else:
output = {'filename':fname, 'lineno':lineno}
except Exception as E:
if tostring:
output = f'Calling function information not available ({str(E)})'
else:
output = {'filename':'N/A', 'lineno':'N/A'}
return output
[docs]def savefig(filename=None, comments=None, **kwargs):
'''
Wrapper for Matplotlib's savefig() function which automatically stores poliosim
metadata in the figure. By default, saves
Args:
filename (str): name of the file to save to (default, timestamp)
comments (str): additional metadata to save to the figure
kwargs (dict): passed to savefig()
**Example**::
ps.Sim().run(do_plot=True)
filename = ps.savefig()
'''
# Handle inputs
dpi = kwargs.pop('dpi', 150)
metadata = kwargs.pop('metadata', {})
if filename is None:
now = sc.getdate(dateformat='%Y-%b-%d_%H.%M.%S')
filename = f'poliosim_{now}.png'
metadata = {}
metadata['Poliosim version'] = psver.__version__
gitinfo = git_info()
for key,value in gitinfo['poliosim'].items():
metadata[f'Poliosim {key}'] = value
for key,value in gitinfo['called_by'].items():
metadata[f'Poliosim caller {key}'] = value
metadata['Poliosim current time'] = sc.getdate()
metadata['Poliosim calling file'] = get_caller()
if comments:
metadata['Poliosim comments'] = comments
# Save the figure
pl.savefig(filename, dpi=dpi, metadata=metadata, **kwargs)
return filename
[docs]def git_info(filename=None, check=False, comments=None, old_info=None, die=False, indent=2, verbose=True, **kwargs):
'''
Get current git information and optionally write it to disk. Simplest usage
is ps.git_info(__file__)
Args:
filename (str): name of the file to write to or read from
check (bool): whether or not to compare two git versions
comments (str/dict): additional comments to include in the file
old_info (dict): dictionary of information to check against
die (bool): whether or not to raise an exception if the check fails
indent (int): how many indents to use when writing the file to disk
verbose (bool): detail to print
kwargs (dict): passed to loadjson (if check=True) or loadjson (if check=False)
**Examples**::
ps.git_info() # Return information
ps.git_info(__file__) # Writes to disk
ps.git_info('poliosim_version.gitinfo') # Writes to disk
ps.git_info('poliosim_version.gitinfo', check=True) # Checks that current version matches saved file
'''
# Handle the case where __file__ is supplied as the argument
if isinstance(filename, str) and filename.endswith('.py'):
filename = filename.replace('.py', '.gitinfo')
# Get git info
calling_file = sc.makefilepath(get_caller(frame=3, tostring=False)['filename'])
ps_info = {'version':psver.__version__}
ps_info.update(sc.gitinfo(__file__, verbose=False))
caller_info = sc.gitinfo(calling_file, verbose=False)
caller_info['filename'] = calling_file
info = {'poliosim':ps_info, 'called_by':caller_info}
if comments:
info['comments'] = comments
# Just get information and optionally write to disk
if not check:
if filename is not None:
output = sc.savejson(filename, info, indent=indent, **kwargs)
else:
output = info
return output
# Check if versions match, and optionally raise an error
else:
if filename is not None:
old_info = sc.loadjson(filename, **kwargs)
string = ''
old_ps_info = old_info['poliosim'] if 'poliosim' in old_info else old_info
if ps_info != old_ps_info:
string = f'Git information differs: {ps_info} vs. {old_ps_info}'
if die:
raise ValueError(string)
elif verbose:
print(string)
return
[docs]def check_version(expected, die=False, verbose=True, **kwargs):
'''
Get current git information and optionally write it to disk.
Args:
expected (str): expected version information
die (bool): whether or not to raise an exception if the check fails
'''
version = psver.__version__
compare = sc.compareversions(version, expected) # Returns -1, 0, or 1
relation = ['older', '', 'newer'][compare+1] # Picks the right string
if relation: # Not empty, print warning
string = f'Note: Poliosim is {relation} than expected ({version} vs. {expected})'
if die:
raise ValueError(string)
elif verbose:
print(string)
return compare
[docs]def check_save_version(expected=None, filename=None, die=False, verbose=True, **kwargs):
'''
A convenience function that bundles check_version with git_info and saves
automatically to disk from the calling file. The idea is to put this at the
top of an analysis script, and commit the resulting file, to keep track of
which version of poliosim was used.
Args:
expected (str): expected version information
filename (str): file to save to; if None, guess based on current file name
kwargs (dict): passed to git_info()
**Examples**::
ps.check_save_version()
ps.check_save_version('1.3.2', filename='script.gitinfo', comments='This is the main analysis script')
'''
# First, check the version if supplied
if expected:
check_version(expected, die=die, verbose=verbose)
# Now, check and save the git info
if filename is None:
filename = get_caller(tostring=False)['filename']
git_info(filename=filename, **kwargs)
return
[docs]def compute_gof(actual, predicted, normalize=True, use_frac=False, use_squared=False, as_scalar='none', eps=1e-9, skestimator=None, **kwargs):
'''
Calculate the goodness of fit. By default use normalized absolute error, but
highly customizable. For example, mean squared error is equivalent to
setting normalize=False, use_squared=True, as_scalar='mean'.
Args:
actual (arr): array of actual (data) points
predicted (arr): corresponding array of predicted (model) points
normalize (bool): whether to divide the values by the largest value in either series
use_frac (bool): convert to fractional mismatches rather than absolute
use_squared (bool): square the mismatches
as_scalar (str): return as a scalar instead of a time series: choices are sum, mean, median
eps (float): to avoid divide-by-zero
skestimator (str): if provided, use this scikit-learn estimator instead
kwargs (dict): passed to the scikit-learn estimator
Returns:
gofs (arr): array of goodness-of-fit values, or a single value if as_scalar is True
**Examples**::
x1 = np.cumsum(np.random.random(100))
x2 = np.cumsum(np.random.random(100))
e1 = compute_gof(x1, x2) # Default, normalized absolute error
e2 = compute_gof(x1, x2, normalize=False, use_frac=False) # Fractional error
e3 = compute_gof(x1, x2, normalize=False, use_squared=True, as_scalar='mean') # Mean squared error
e4 = compute_gof(x1, x2, estimator='mean_squared_error') # Scikit-learn's MSE method
e5 = compute_gof(x1, x2, as_scalar='median') # Normalized median absolute error -- highly robust
'''
# Handle inputs
actual = np.array(sc.dcp(actual), dtype=float)
predicted = np.array(sc.dcp(predicted), dtype=float)
# Custom estimator is supplied: use that
if skestimator is not None:
try:
import sklearn.metrics as sm
sklearn_gof = getattr(sm, skestimator) # Shortcut to e.g. sklearn.metrics.max_error
except ImportError as E:
raise ImportError(f'You must have scikit-learn >=0.22.2 installed: {str(E)}')
except AttributeError:
raise AttributeError(f'Estimator {skestimator} is not available; see https://scikit-learn.org/stable/modules/model_evaluation.html#scoring-parameter for options')
gof = sklearn_gof(actual, predicted, **kwargs)
return gof
# Default case: calculate it manually
else:
# Key step -- calculate the mismatch!
gofs = abs(np.array(actual) - np.array(predicted))
if normalize and not use_frac:
actual_max = abs(actual).max()
if actual_max>0:
gofs /= actual_max
if use_frac:
if (actual<0).any() or (predicted<0).any():
print('Warning: Calculating fractional errors for non-positive quantities is ill-advised!')
else:
maxvals = np.maximum(actual, predicted) + eps
gofs /= maxvals
if use_squared:
gofs = gofs**2
if as_scalar == 'sum':
gofs = np.sum(gofs)
elif as_scalar == 'mean':
gofs = np.mean(gofs)
elif as_scalar == 'median':
gofs = np.median(gofs)
return gofs
[docs]def diff_sims(sim1, sim2, skip_key_diffs=False, output=False, die=False):
'''
Compute the difference of the summaries of two simulations, and print any
values which differ.
Args:
sim1 (sim/dict): either a simulation object or the sim.summary dictionary
sim2 (sim/dict): ditto
skip_key_diffs (bool): whether to skip keys that don't match between sims
output (bool): whether to return the output as a string (otherwise print)
die (bool): whether to raise an exception if the sims don't match
require_run (bool): require that the simulations have been run
**Example**::
s1 = cv.Sim(beta=0.01)
s2 = cv.Sim(beta=0.02)
s1.run()
s2.run()
cv.diff_sims(s1, s2)
'''
from . import model as psm # To avoid circular import
if isinstance(sim1, psm.Sim):
sim1 = sim1.compute_summary()
if isinstance(sim2, psm.Sim):
sim2 = sim2.compute_summary()
for sim in [sim1, sim2]:
if not isinstance(sim, dict): # pragma: no cover
errormsg = f'Cannot compare object of type {type(sim)}, must be a sim or a sim.summary dict'
raise TypeError(errormsg)
# Compare keys
keymatchmsg = ''
sim1_keys = set(sim1.keys())
sim2_keys = set(sim2.keys())
if sim1_keys != sim2_keys and not skip_key_diffs: # pragma: no cover
keymatchmsg = "Keys don't match!\n"
missing = list(sim1_keys - sim2_keys)
extra = list(sim2_keys - sim1_keys)
if missing:
keymatchmsg += f' Missing sim1 keys: {missing}\n'
if extra:
keymatchmsg += f' Extra sim2 keys: {extra}\n'
# Compare values
valmatchmsg = ''
mismatches = {}
for key in sim2.keys(): # To ensure order
if key in sim1_keys: # If a key is missing, don't count it as a mismatch
sim1_val = sim1[key] if key in sim1 else 'not present'
sim2_val = sim2[key] if key in sim2 else 'not present'
both_nan = sc.isnumber(sim1_val, isnan=True) and sc.isnumber(sim2_val, isnan=True)
if sim1_val != sim2_val and not both_nan:
mismatches[key] = {'sim1': sim1_val, 'sim2': sim2_val}
if len(mismatches):
valmatchmsg = '\nThe following values differ between the two simulations:\n'
df = pd.DataFrame.from_dict(mismatches).transpose()
diff = []
ratio = []
change = []
small_change = 1e-3 # Define a small change, e.g. a rounding error
for mdict in mismatches.values():
old = mdict['sim1']
new = mdict['sim2']
numeric = sc.isnumber(sim1_val) and sc.isnumber(sim2_val)
if numeric and old>0:
this_diff = new - old
this_ratio = new/old
abs_ratio = max(this_ratio, 1.0/this_ratio)
# Set the character to use
if abs_ratio<small_change:
change_char = '≈'
elif new > old:
change_char = '↑'
elif new < old:
change_char = '↓'
else:
errormsg = f'Could not determine relationship between sim1={old} and sim2={new}'
raise ValueError(errormsg)
# Set how many repeats it should have
repeats = 1
if abs_ratio >= 1.1:
repeats = 2
if abs_ratio >= 2:
repeats = 3
if abs_ratio >= 10:
repeats = 4
this_change = change_char*repeats
else: # pragma: no cover
this_diff = np.nan
this_ratio = np.nan
this_change = 'N/A'
diff.append(this_diff)
ratio.append(this_ratio)
change.append(this_change)
df['diff'] = diff
df['ratio'] = ratio
for col in ['sim1', 'sim2', 'diff', 'ratio']:
df[col] = df[col].round(decimals=3)
df['change'] = change
valmatchmsg += str(df)
# Raise an error if mismatches were found
mismatchmsg = keymatchmsg + valmatchmsg
if mismatchmsg: # pragma: no cover
if die:
raise ValueError(mismatchmsg)
elif output:
return mismatchmsg
else:
print(mismatchmsg)
else:
if not output:
print('Sims match')
return
@nb.njit((nbint[:], nbint[:], nb.int64[:]), cache=True)
def find_contacts(p1, p2, inds): # pragma: no cover
"""
Numba for Layer.find_contacts()
A set is returned here rather than a sorted array so that custom tracing interventions can efficiently
add extra people. For a version with sorting by default, see Layer.find_contacts(). Indices must be
an int64 array since this is what's returned by true() etc. functions by default.
"""
pairing_partners = set()
inds = set(inds)
for i in range(len(p1)):
if p1[i] in inds:
pairing_partners.add(p2[i])
if p2[i] in inds:
pairing_partners.add(p1[i])
return pairing_partners