Source code for idmtools_calibra.utilities.prior

import logging
from functools import reduce
from operator import mul
import numpy as np
import pandas as pd
import scipy.stats

logger = logging.getLogger(__name__)


[docs]class SampleRange(object): """ Container for min, max of a range and a type of sampling to use :param range_type: Type of sampling within range. Supported values: linear, log, linear_int :param range_min: Minimum of sampling range. :param range_max: Maximum of sampling range. """ def __init__(self, range_type, range_min, range_max): if range_max <= range_min: raise Exception('range_max (%f) must be greater than range_min (%f).' % (range_max, range_min)) self.range_type = range_type self.range_min = range_min self.range_max = range_max self.function = self.get_sample_function()
[docs] def get_sample_function(self): """ Converts sample-range variables into scipy.stats frozen function :return: Frozen scipy.stats function """ if self.range_type == 'linear': return scipy.stats.uniform(loc=self.range_min, scale=(self.range_max - self.range_min)) elif self.range_type == 'log': if self.range_min <= 0: raise Exception('range_min (%f) must be greater than zero for range_type=log.' % self.range_min) return scipy.stats.reciprocal(a=self.range_min, b=self.range_max) elif self.range_type == 'linear_int': xx = range(int(self.range_min), int(self.range_max) + 1) ww = [1.0 / len(xx)] * len(xx) return scipy.stats.rv_discrete(name='linear_int', values=(xx, ww)) else: raise Exception('Unknown range_type (%s). Supported types: linear, log, linear_int.' % self.range_type)
[docs] def get_bins(self, n): if self.is_log(): return np.logspace(np.log10(self.range_min), np.log10(self.range_max), n) else: return np.linspace(self.range_min, self.range_max, n)
[docs] def get_xlim(self): return self.range_min, self.range_max
[docs] def is_log(self): return 'log' in self.range_type
[docs] def is_int(self): return 'int' in self.range_type
[docs]class SampleFunctionContainer(object): """ Container for a frozen function and optionally its associated sample-range properties """ def __init__(self, function, sample_range=None): self.function = function self.sample_range = sample_range
[docs] @classmethod def from_range(cls, sample_range): return cls(sample_range.function, sample_range)
[docs] @classmethod def from_tuple(cls, *args): return cls.from_range(SampleRange(*args))
[docs] def get_even_spaced_samples(self, n): """ Returns an evenly spaced sampling of the percent-point function (inverse CDF) :param n: number of evenly spaced samples """ return self.function.ppf(np.linspace(0.001, 0.999, n))
[docs] def pdf(self, X): """ Wrapper for contained function pdf with check for discrete distributions :return: """ if isinstance(self.function, scipy.stats._distn_infrastructure.rv_frozen): return self.function.pdf(X) elif isinstance(self.function, scipy.stats._distn_infrastructure.rv_sample): if self.sample_range is not None and 'int' in self.sample_range.range_type: return self.function.pmf(np.round(X)) # round to NEAREST integer else: X_cdfs = self.function.cdf(X) X_rounded = self.function.ppf(X_cdfs) # this will round down to value of non-zero PMF return self.function.pmf(X_rounded) else: raise Exception("Expecting a frozen scipy.stats function.")
[docs]class MultiVariatePrior(object): """ Multi-variate wrapper exposing same interfaces as scipy.stats functions, i.e. pdf and rvs Different dimensions are drawn independently from the univariate distributions. Args: sample_functions : list of scipy.stats frozen functions params : list of parameter names associated with functions (optional) ranges : list of SampleRange objects associated with functions (optional) name : name of MultiVariatePrior object (optional) """ def __init__(self, functions, params=[], ranges=[], name=None): self.name = name if not params: params = range(len(functions)) elif len(functions) != len(params): raise Exception("params and functions lists must have same length") if not ranges: ranges = [None] * len(functions) elif len(functions) != len(ranges): raise Exception("ranges and functions lists must have same length") self.sample_functions = {p: SampleFunctionContainer(f, r) for (p, f, r) in zip(params, functions, ranges)} @property def functions(self): return [sfc.function for sfc in self.sample_functions.values()] @property def params(self): return self.sample_functions.keys() @property def ndim(self): return len(self.functions)
[docs] @classmethod def by_range(cls, **param_sample_ranges): """ Builds multi-variate wrapper from keyword arguments of parameter names to SampleRange (min, max, type) Args: param_sample_ranges: keyword arguments of parameter names to SampleRange tuple Returns: MultiVariatePrior instance An example usage:: > prior = MultiVariatePrior.by_range( MSP1_Merozoite_Kill_Fraction=('linear', 0.4, 0.7), Max_Individual_Infections=('linear_int', 3, 8), Base_Gametocyte_Production_Rate=('log', 0.001, 0.5)) """ ranges = [SampleRange(*sample_range_tuple) for sample_range_tuple in param_sample_ranges.values()] return cls(functions=[r.function for r in ranges], ranges=ranges, params=param_sample_ranges.keys())
[docs] @classmethod def by_param(cls, **param_sample_functions): """ Builds multi-variate wrapper from keyword arguments of parameter names to univariate frozen functions Args: param_sample_functions: keyword arguments of parameter names to univariate object supporting pdf and rvs interfaces Returns: MultiVariatePrior instance An example usage:: > from scipy.stats import uniform > prior = MultiVariatePrior.by_param( MSP1_Merozoite_Kill_Fraction=uniform(loc=0.4, scale=0.3), # from 0.4 to 0.7 Nonspecific_Antigenicity_Factor=uniform(loc=0.1, scale=0.8)) # from 0.1 to 0.9 """ return cls(functions=param_sample_functions.values(), params=param_sample_functions.keys())
[docs] def pdf(self, X): """ Returns product of individual component function PDFs at each input point. Args: X: array of points, where each point is an array of correct dimension. """ if isinstance(X, list): X = np.array(X) # allow equivalent python list or np.ndarray inputs if X.ndim == 2: npts, ndim = X.shape # the default case elif X.ndim == 1: if len(self.functions) == 1: npts, ndim = X.size, 1 # multiple 1-d measurements: [1, 2, 3] --> [[1], [2], [3]] else: npts, ndim = 1, X.size # a single multi-dimensional measurement: [1, 2, 3] --> [[1, 2, 3]] X = np.reshape(X, (npts, ndim)) else: raise Exception('Expecting 1- or 2-dimensional array input.') if self.ndim == ndim: logger.debug('Input dimension = %d', ndim) else: raise Exception('Dimensionality of sample points (%d) does not match function (%d)' % (ndim, self.ndim)) pdfs = [] for params in X: pdfs.append(reduce(mul, [f.pdf(x) for f, x in zip(self.sample_functions.values(), params)], 1)) return np.array(pdfs)
[docs] def rvs(self, size=1): """ Returns an array of random points, where each point is sampled randomly in its component dimensions. Args: size : the number of random points to sample. """ values = np.array([[f.rvs() for f in self.functions] for _ in range(size)]).squeeze() return values
[docs] def lhs(self, size=1): """ Returns a Latin Hypercube sample, drawn from the sampling ranges of the component dimensions :param size: the number of random points to sample """ samples = np.zeros((size, len(self.functions))) for i, sample_function in enumerate(self.sample_functions.values()): sample_bins = sample_function.get_even_spaced_samples(size) np.random.shuffle(sample_bins) samples[:, i] = sample_bins return samples
[docs] def to_dataframe(self, param_value_array): """ Transforms an array of parameter-value arrays to a pandas.DataFrame with appropriate column names """ return pd.DataFrame(data=param_value_array, columns=self.params)
[docs] def to_dict(self, param_point): """ Transforms an individual point in parameter space to a dictionary of parameter names to values. Also will round parameters where the range_type requires integer-only values. """ ret = param_point for param, value in ret.items(): if param not in self.sample_functions: continue sfc = self.sample_functions[param] if sfc.sample_range and sfc.sample_range.is_int(): ret[param] = int(np.round(value)) return ret
if __name__ == '__main__': import matplotlib.pyplot as plt prior = MultiVariatePrior.by_range( MSP1_Merozoite_Kill_Fraction=('linear', 0.4, 0.7), Max_Individual_Infections=('linear_int', 3, 8), Base_Gametocyte_Production_Rate=('log', 0.001, 0.5)) n_random = 5000 n_bins = 50 # Latin Hypercube sample and independent random variable samples dfs = [prior.to_dataframe(prior.lhs(size=n_random)), prior.to_dataframe(prior.rvs(size=n_random))] f, axs = plt.subplots(2, len(prior.params), figsize=(15, 8)) for j, df in enumerate(dfs): for i, p in enumerate(prior.params): ax = axs[j][i] sample_range = prior.sample_functions[p].sample_range xscale = 'linear' if sample_range is None: stats = df[p].describe() vmin, vmax = stats.loc['min'], stats.loc['max'] bins = np.linspace(stats.loc['min'], stats.loc['max'], n_bins) else: vmin, vmax = sample_range.range_min, sample_range.range_max bins = sample_range.get_bins(n_bins) if sample_range.is_log(): xscale = 'log' df[p].plot(kind='hist', ax=ax, bins=bins, alpha=0.5) ax.set(xscale=xscale, xlim=(vmin, vmax), title=p) f.set_tight_layout(True) plt.show()