'''
Handle sim parameters
'''
import numpy as np
import sciris as sc
from . import utils as fpu
from . import defaults as fpd
__all__ = ['Pars', 'pars']
#%% Pars (parameters) class
def getval(v):
    '''
    Handle different ways of supplying a value -- number, distribution, function

    Args:
        v (number/dict/callable): a number is returned as-is; a dict is passed
            as keyword arguments to ``fpu.sample()`` and the first sample is
            returned; a callable is called with no arguments and its result
            is returned

    Returns:
        The resolved value

    Raises:
        TypeError: if ``v`` is none of the supported types (previously this
            case silently returned ``None``, which callers would then store)
    '''
    if sc.isnumber(v):
        return v
    if isinstance(v, dict):
        return fpu.sample(**v)[0] # [0] since returns an array
    if callable(v):
        return v()
    errormsg = f'Could not interpret value of type {type(v)}: must be a number, a dict of distribution arguments, or a callable'
    raise TypeError(errormsg)
class Pars(dict):
    '''
    Class to hold a dictionary of parameters, and associated methods.

    Usually not called by the user directly -- use ``fp.pars()`` instead.

    Args:
        pars (dict): dictionary of parameters
        args (list): passed to the ``dict`` constructor
        kwargs (dict): passed to the ``dict`` constructor
    '''

    def __init__(self, pars=None, *args, **kwargs):
        if pars is None:
            pars = {}
        super().__init__(*args, **kwargs)
        self.update(pars) # Applied last, so entries in pars win over args/kwargs
        return


    def __repr__(self, *args, **kwargs):
        ''' Use odict repr, but with a custom class name and no quotes '''
        return sc.odict.__repr__(self, quote='', numsep='.', classname='fp.Parameters()', *args, **kwargs)


    def copy(self):
        ''' Shortcut for deep copying '''
        return sc.dcp(self)


    def to_dict(self):
        ''' Return parameters as a new (plain, shallow-copied) dictionary '''
        return dict(self.items())


    def to_json(self, filename, **kwargs):
        '''
        Export parameters to a JSON file.

        Args:
            filename (str): filename to save to
            kwargs (dict): passed to ``sc.savejson``

        **Example**::

            sim.pars.to_json('my_pars.json')
        '''
        return sc.savejson(filename=filename, obj=self.to_dict(), **kwargs)


    def from_json(self, filename, **kwargs):
        '''
        Import parameters from a JSON file.

        The loaded values are merged into the existing parameters via ``update()``.

        Args:
            filename (str): filename to load from
            kwargs (dict): passed to ``sc.loadjson``

        **Example**::

            sim.pars.from_json('my_pars.json')
        '''
        pars = sc.loadjson(filename=filename, **kwargs)
        self.update(pars)
        return self


    @staticmethod
    def _handle_invalid(errormsg, die):
        ''' Raise a ValueError if die is true; otherwise just print the message '''
        if die:
            raise ValueError(errormsg)
        print(errormsg)
        return


    def validate(self, die=True, update=True):
        '''
        Perform validation on the parameters

        Checks that the keys match those of the default parameters, that the
        method map/modern/efficacy dictionaries have identical keys, and that
        the raw method matrices have consistent age keys and shapes.

        Args:
            die (bool): whether to raise an exception if an error is encountered
            update (bool): whether to update the method and age maps

        Raises:
            ValueError: if validation fails and ``die`` is true
        '''
        # Check that keys are correct
        valid_keys = set(default_pars.keys())
        keys = set(self.keys())
        if keys != valid_keys:
            missing = valid_keys - keys
            unrecognized = keys - valid_keys
            errormsg = ''
            if missing:
                errormsg += 'The parameter set is not valid since the following keys are missing:\n'
                errormsg += f'{sc.strjoin(missing)}\n'
            if unrecognized:
                errormsg += 'The parameter set is not valid since the following keys are not recognized:\n'
                errormsg += f'{sc.strjoin(unrecognized)}\n'
            self._handle_invalid(errormsg, die)

        # Validate method properties
        methods = self['methods']
        new_method_map = methods['map']
        new_method_age_map = methods['age_map']
        n = len(new_method_map)
        method_keys = list(new_method_map.keys())
        modern_keys = list(methods['modern'].keys())
        eff_keys = list(methods['eff'].keys())
        if not (method_keys == eff_keys == modern_keys): # Lists, so order matters too
            errormsg = f'Mismatch between method mapping keys:\n"{method_keys}",\nmodern keys\n"{modern_keys}", and efficacy keys\n"{eff_keys}"'
            self._handle_invalid(errormsg, die)

        # Validate method matrices
        raw = methods['raw']
        age_keys = set(new_method_age_map.keys())
        for mkey in ['annual', 'pp0to1', 'pp1to6']:
            m_age_keys = set(raw[mkey].keys())
            if age_keys != m_age_keys:
                errormsg = f'Matrix "{mkey}" has inconsistent keys: "{sc.strjoin(age_keys)}" ≠ "{sc.strjoin(m_age_keys)}"'
                self._handle_invalid(errormsg, die)
        for k in age_keys:
            shape = raw['pp0to1'][k].shape
            if shape != (n,):
                errormsg = f'Postpartum method initiation matrix for ages {k} has unexpected shape: should be ({n},), not {shape}'
                self._handle_invalid(errormsg, die)
            for mkey in ['annual', 'pp1to6']:
                shape = raw[mkey][k].shape
                if shape != (n,n):
                    errormsg = f'Method matrix {mkey} for ages {k} has unexpected shape: should be ({n},{n}), not {shape}'
                    self._handle_invalid(errormsg, die)

        # Copy to defaults, making use of mutable objects to preserve original object ID
        if update:
            # Snapshot first, so nothing is lost if the parameters share these same objects
            map_items = dict(new_method_map)
            age_map_items = dict(new_method_age_map)
            fpd.method_map.clear()
            fpd.method_map.update(map_items)
            fpd.method_age_map.clear()
            fpd.method_age_map.update(age_map_items)

        return self


    def _as_ind(self, key, allow_none=True):
        '''
        Take a method key and convert to an int, e.g. 'Condoms' → 7.

        If already an int, do validation. Keys listed in ``fpd.none_all_keys``
        are converted to ``slice(None)``, i.e. "all methods".

        Args:
            key (str/int): the method name or index
            allow_none (bool): if false, raise an exception if key is None

        Raises:
            ValueError: if key is None and ``allow_none`` is false
            sc.KeyNotFoundError: if a string key is not a valid method
            IndexError: if an integer key is out of bounds
            TypeError: if key is neither a string nor an integer
        '''
        mapping = self['methods']['map']
        keys = list(mapping.keys())

        # Validation
        if key is None and not allow_none:
            errormsg = "No key supplied; did you mean 'None' instead of None?"
            raise ValueError(errormsg)

        # Handle options
        if key in fpd.none_all_keys:
            ind = slice(None) # This is equivalent to ":" in matrix[:,:]
        elif isinstance(key, str): # Normal case, convert from key to index
            try:
                ind = mapping[key]
            except KeyError as E:
                errormsg = f'Key "{key}" is not a valid method'
                raise sc.KeyNotFoundError(errormsg) from E
        elif isinstance(key, (int, np.integer)): # Already an int, just check bounds
            ind = key
            if not (-len(keys) <= ind < len(keys)): # Valid negative indices count from the end
                errormsg = f'Method index {ind} is out of bounds for methods {sc.strjoin(keys)}'
                raise IndexError(errormsg)
        else:
            errormsg = f'Could not process key of type {type(key)}: must be str or int'
            raise TypeError(errormsg)
        return ind


    def _as_key(self, ind):
        '''
        Convert ind to key, e.g. 7 → 'Condoms'.

        If already a key, do validation.

        Args:
            ind (int/str): the method index or name

        Raises:
            IndexError: if an integer index is out of bounds
            sc.KeyNotFoundError: if a string is not a valid method name
            TypeError: if ind is neither an integer nor a string
        '''
        keys = list(self['methods']['map'].keys())
        if isinstance(ind, (int, np.integer)): # Normal case, convert to string
            if not (-len(keys) <= ind < len(keys)): # Valid negative indices count from the end
                errormsg = f'Method index {ind} is out of bounds for methods {sc.strjoin(keys)}'
                raise IndexError(errormsg)
            key = keys[ind]
        elif isinstance(ind, str): # Already a string, just check it exists
            key = ind
            if key not in keys:
                errormsg = f'Name "{key}" is not a valid method: choices are {sc.strjoin(keys)}'
                raise sc.KeyNotFoundError(errormsg)
        else:
            errormsg = f'Could not process index of type {type(ind)}: must be int or str'
            raise TypeError(errormsg)
        return key


    def update_method_eff(self, method, eff=None, verbose=False):
        '''
        Update efficacy of one or more contraceptive methods.

        Args:
            method (str/dict): method to update, or dict of method:value pairs
            eff (float): new value of contraceptive efficacy (not required if method is a dict)
            verbose (bool): whether to print each change

        **Examples**::

            pars.update_method_eff('Injectables', 0.99)
            pars.update_method_eff({'Injectables':0.99, 'Condoms':0.50})
        '''
        # Validation
        if not isinstance(method, dict):
            if eff is None:
                errormsg = 'Must supply a value to update the contraceptive efficacy to'
                raise ValueError(errormsg)
            method = {method:eff}

        # Perform updates
        effs = self['methods']['eff']
        for k,rawval in method.items():
            k = self._as_key(k)
            v = getval(rawval)
            orig = effs[k]
            effs[k] = v
            if verbose:
                print(f'Efficacy for method {k} was changed from {orig:0.3f} to {v:0.3f}')

        return self


    def update_method_prob(self, source=None, dest=None, factor=None, value=None,
                           ages=None, matrix=None, copy_from=None, verbose=False):
        '''
        Updates the probability matrices with a new value. Usually used via the
        intervention ``fp.update_methods()``.

        Args:
            source (str/int): the method to switch from
            dest (str/int): the method to switch to
            factor (float): if supplied, multiply the probability by this factor
            value (float): if supplied, change the probability to this value
            ages (str/list): the ages to modify (default: all)
            matrix (str): which switching matrix to modify (default: annual)
            copy_from (str): the existing method to copy the probability vectors from (optional)
            verbose (bool): how much detail to print
        '''
        raw = self['methods']['raw'] # We adjust the raw matrices, so the effects are persistent

        # Convert from strings to indices
        if copy_from is not None: # Not a truthiness check, since index 0 is a valid method
            copy_from = self._as_ind(copy_from, allow_none=False)
            if source is None: # We need a source, but it's not always used
                source = copy_from
        source = self._as_ind(source, allow_none=False)
        dest = self._as_ind(dest, allow_none=False)

        # Replace age keys with all ages if so asked
        if ages in fpd.none_all_keys:
            ages = raw['annual'].keys()
        else:
            ages = sc.tolist(ages)

        # Check matrix is valid
        if matrix is None: # Apply the documented default
            matrix = 'annual'
        if matrix not in raw:
            errormsg = f'Invalid matrix "{matrix}"; valid choices are: {sc.strjoin(raw.keys())}'
            raise sc.KeyNotFoundError(errormsg)

        # Actually loop over the matrices and apply the changes
        for k in ages:
            arr = raw[matrix][k]
            if matrix == 'pp0to1': # Handle the postpartum initialization *vector*
                orig = arr[dest] # Pull out before being overwritten

                # Handle copy from
                if copy_from is not None:
                    arr[dest] = arr[copy_from]

                # Handle everything else
                if factor is not None:
                    arr[dest] *= getval(factor)
                elif value is not None:
                    val = getval(value)
                    arr[dest] = 0
                    arr *= (1-val)/arr.sum() # Rescale the other entries so the total is 1 once val is inserted
                    arr[dest] = val
                    assert np.isclose(arr.sum(), 1, atol=1e-3), f'Matrix should sum to 1, not {arr.sum()}'
                if verbose:
                    print(f'Matrix {matrix} for age group {k} was changed from:\n{orig}\nto\n{arr[dest]}')

            else: # Handle annual switching *matrices*
                orig = sc.dcp(arr[source, dest])

                # Handle copy from
                if copy_from is not None:
                    arr[:,dest] = arr[:, copy_from]
                    arr[dest,:] = arr[copy_from, :]
                    median_init = np.median(arr[:, copy_from])
                    median_discont = np.median(arr[copy_from, :])
                    arr[dest,dest] = arr[copy_from, copy_from] # Replace diagonal element with correct version
                    arr[copy_from, dest] = median_init
                    arr[dest,copy_from] = median_discont

                # Handle modifications
                if factor is not None:
                    arr[source, dest] *= getval(factor)
                elif value is not None:
                    val = getval(value)
                    arr[source, dest] = 0
                    arr[source, :] *= (1-val)/arr[source, :].sum() # Rescale the rest of the row
                    arr[source, dest] = val
                    row_sum = arr[source, :].sum()
                    assert np.isclose(row_sum, 1, atol=1e-3), f'Matrix should sum to 1, not {row_sum}'
                if verbose:
                    print(f'Matrix {matrix} for age group {k} was changed from:\n{orig}\nto\n{arr[source, dest]}')

        return self


    def reset_methods_map(self):
        ''' Refresh the methods map to be self-consistent '''
        methods = self['methods']
        methods['map'] = {k:i for i,k in enumerate(methods['map'].keys())} # Reset numbering
        return self


    def add_method(self, name, eff, modern=True):
        '''
        Add a new contraceptive method to the switching matrices.

        A new method should only be added before the sim is run, not during.

        Note: the matrices are stored in ``pars['methods']['raw']``; this method
        is a helper function for modifying those. For more flexibility, modify
        them directly. The ``fp.update_methods()`` intervention can be used to
        modify the switching probabilities later.

        Args:
            name (str): the name of the new method
            eff (float): the efficacy of the new method
            modern (bool): whether it's a modern method (default: yes)

        Raises:
            ValueError: if a method with this name already exists

        **Examples**::

            pars = fp.pars()
            pars.add_method('New method', 0.90)
            pars.add_method(name='Male pill', eff=0.98, modern=True)
        '''
        methods = self['methods']

        # Check before touching anything, so a failure leaves the parameters unmodified
        if name in methods['map']:
            errormsg = f'Method "{name}" already exists; method names must be unique'
            raise ValueError(errormsg)

        # Add to mapping and efficacy
        n = len(methods['map'])
        methods['map'][name] = n # Can't use reset_methods_map since need to define the new entry
        methods['modern'][name] = modern
        methods['eff'][name] = eff

        # Modify method matrices
        raw = methods['raw']
        age_keys = methods['age_map'].keys()
        zeros_row = np.zeros((1,n))
        zeros_col = np.zeros((n+1,1))
        for k in age_keys:

            # Handle the initiation matrix
            pp0to1 = raw['pp0to1']
            pp0to1[k] = np.append(pp0to1[k], 0) # Append a zero to the end

            # Handle the other matrices
            for mkey in ['annual', 'pp1to6']:
                matrix = raw[mkey]
                matrix[k] = np.append(matrix[k], zeros_row, axis=0) # Append row to bottom
                matrix[k] = np.append(matrix[k], zeros_col, axis=1) # Append column to right
                matrix[k][n,n] = 1.0 # Set everything to zero except continuation

        # Validate
        self.validate()

        return self


    def rm_method(self, name):
        '''
        Removes a contraceptive method from the switching matrices.

        A method should only be removed before the sim is run, not during, since
        the method associated with each person in the sim will point to the wrong
        index.

        Args:
            name (str/ind): the name or index of the method to remove

        **Example**::

            pars = fp.pars()
            pars.rm_method('Other modern')
        '''
        # Get index of method to remove
        ind = self._as_ind(name, allow_none=False)
        key = self._as_key(name)

        # Store a copy for debugging
        methods = self['methods']
        methods['map_orig'] = sc.dcp(methods['map'])

        # Remove from mapping and efficacy
        for parkey in ['map', 'modern', 'eff']:
            methods[parkey].pop(key)
        self.reset_methods_map()

        # Modify method matrices
        raw = methods['raw']
        age_keys = methods['age_map'].keys()
        for k in age_keys:

            # Handle the initiation matrix
            pp0to1 = raw['pp0to1']
            pp0to1[k] = np.delete(pp0to1[k], ind)

            # Handle the other matrices
            for mkey in ['annual', 'pp1to6']:
                matrix = raw[mkey]
                for axis in [0,1]: # Remove both the row and the column
                    matrix[k] = np.delete(matrix[k], ind, axis=axis)

        # Validate
        self.validate()

        return self


    def reorder_methods(self, order):
        '''
        Reorder the contraceptive method matrices.

        Method reordering should be done before the sim is created (or at least before it's run).

        Args:
            order (arr): the new order of methods, either ints or strings

        Raises:
            ValueError: if ``order`` is not a permutation of the existing methods

        **Example**::

            pars = fp.pars()
            pars.reorder_methods([2, 6, 4, 7, 0, 8, 5, 1, 3])
        '''
        methods = self['methods']
        orig_keys = list(methods['map'].keys())

        # If strings are supplied, convert to ints
        order = list(order)
        if order and isinstance(order[0], str):
            order = [orig_keys.index(k) for k in order]

        # Validation -- compare the full sorted list rather than a set, so that
        # duplicated or missing indices are caught before anything is modified
        expected = list(range(len(orig_keys)))
        if sorted(order) != expected:
            errormsg = f'Reordering "{order}" does not match indices of methods "{expected}"'
            raise ValueError(errormsg)

        # Store a copy for debugging
        methods['map_orig'] = sc.dcp(methods['map'])

        # Reorder map and efficacy -- TODO: think about how to implement rename as well
        new_keys = [orig_keys[k] for k in order]
        for parkey in ['map', 'modern', 'eff']:
            methods[parkey] = {k:methods[parkey][k] for k in new_keys}
        self.reset_methods_map() # Restore ordering

        # Modify method matrices
        raw = methods['raw']
        age_keys = methods['age_map'].keys()
        for k in age_keys:

            # Handle the initiation matrix
            pp0to1 = raw['pp0to1']
            pp0to1[k] = pp0to1[k][order]

            # Handle the other matrices
            for mkey in ['annual', 'pp1to6']:
                matrix = raw[mkey]
                matrix[k] = matrix[k][:,order][order] # Reorder columns, then rows

        # Validate
        self.validate()

        return self
#%% Parameter creation functions
def sim_pars():
    ''' Build a fresh dictionary of the extra parameters used by the sim '''
    extra = {
        'mortality_probs': {}, # CK: TODO: rethink implementation
        'interventions': [],
        'analyzers': [],
    }
    return extra
def empowerment_pars():
    ''' Build a fresh dictionary of empowerment parameters, each defaulting to None '''
    empowerment_keys = ['urban_prop', 'empowerment', 'education', 'age_partnership']
    return dict.fromkeys(empowerment_keys)
def pars(location=None, validate=True, die=True, update=True, **kwargs):
    '''
    Create a parameter set for a given location.

    Args:
        location (str):  which location's parameters to load; 'test' gives a small, fast set of parameters
        validate (bool): whether to run validation on the resulting parameters
        die (bool):      whether a validation failure raises an exception
        update (bool):   whether validation may update values
        kwargs (dict):   custom parameter values, which override the location's values

    **Example**::

        pars = fp.pars(location='senegal')
    '''
    from . import locations as fplocs # Here to avoid circular import

    # Anything falsy means the default location; names are case-insensitive
    location = (location or 'default').lower()

    # The test set is the default location with small-sim settings, unless overridden
    if location == 'test':
        location = 'default'
        test_settings = dict(n_agents=100, verbose=0, start_year=2000, end_year=2010)
        for setting, setting_val in test_settings.items():
            kwargs.setdefault(setting, setting_val)

    # Map each valid location name onto the module that supplies its parameters
    location_modules = dict(senegal='senegal', default='senegal', kenya='kenya', ethiopia='ethiopia')
    if location not in location_modules:
        errormsg = f'Location "{location}" is not currently supported'
        raise NotImplementedError(errormsg)
    loc_pars = getattr(fplocs, location_modules[location]).make_pars()

    # Add the empowerment keys (all None) for every location except Kenya
    if location != 'kenya':
        loc_pars.update(empowerment_pars())

    # Add the sim parameters, then apply the user's overrides on a copy
    loc_pars.update(sim_pars())
    merged = sc.mergedicts(loc_pars, kwargs, _copy=True)

    # Convert to the class and optionally validate
    output = Pars(merged)
    if validate:
        output.validate(die=die, update=update)

    return output
# Finally, create default parameters to use for accessing keys etc.
# This runs when the module is imported. Validation is skipped here because
# Pars.validate() compares against default_pars, which does not exist yet.
default_pars = pars(validate=False) # Do not validate since default parameters are used for validation
par_keys = default_pars.keys() # Keys of the default parameter set