import logging
import numpy as np
import pandas as pd
from scipy.spatial.distance import seuclidean
from scipy.stats import multivariate_normal
from idmtools_calibra.algorithms.next_point_algorithm import NextPointAlgorithm
logger = logging.getLogger(__name__)
[docs]class IMIS(NextPointAlgorithm):
"""
IMIS (Incremental Mixture Importance Sampling)
Algorithm ported from R code:
http://cran.r-project.org/web/packages/IMIS
Full description from Adrian Raftery and Le Bao (2009):
http://www.stat.washington.edu/research/reports/2009/tr560.pdf
The basic idea of IMIS is that points with high importance weights are in areas where
the target density is underrepresented by the importance sampling distribution. At each
iteration, a multivariate normal distribution centered at the point with the highest importance
weight is added to the current importance sampling distribution, which thus becomes
a mixture of such functions and of the prior. In this way underrepresented parts of the
parameter space are successively identified and are given representation, ending up with an
iteratively constructed importance sampling distribution that covers the target distribution
well.
"""
def __init__(self, prior_fn, initial_samples=1e4, samples_per_iteration=1e3, n_resamples=3e3, current_state=None):
super(IMIS, self).__init__()
self.iteration = 0
self.prior_fn = prior_fn
self.samples_per_iteration = int(samples_per_iteration)
self.initial_samples = initial_samples
# we set this later in set_initial_samples
self.n_initial_samples = 0.0
self.prior_covariance: np.ndarray = None
self.priors = []
self.results = []
self.samples = np.array([])
self.latest_samples = np.array([])
self.data = pd.DataFrame(columns=['Iteration', '__sample_index__', 'Prior', 'Result', *self.get_param_names()])
self.set_state(current_state or {}, self.iteration)
if (self.samples.size > 0) ^ (self.latest_samples.size > 0):
raise Exception('Both samples (size=%d) and latest_samples (size=%d) '
'should be empty or already initialized.',
self.samples.size, self.latest_samples.size)
if self.samples.size == 0:
self.set_initial_samples()
self.generate_variables_from_data()
self.max_resampling_attempts = 100
self.n_dimensions = self.samples[0].size
logger.info('%s instance with %d initial %d-dimensional samples and %d per iteration' %
(self.__class__.__name__, self.samples.shape[0],
self.n_dimensions, self.samples_per_iteration))
self.gaussian_probs = {}
self.gaussian_centers = []
self.gaussian_covariances = []
self.n_resamples = n_resamples
self.D = 1 # mixture of D multivariate normal distributions (optimization stage not implemented)
self.weights = []
self.validate_parameters()
[docs] def validate_parameters(self):
"""
Ensure valid parameter ranges: 'samples_per_iteration' is used to select N-closest points
to maximum-weight sample, so it can't exceed 'n_initial_samples'. It is also used to estimate
weighted covariance, so it cannot be smaller than the dimensionality of the samples.
"""
if self.samples_per_iteration > self.n_initial_samples:
raise Exception('samples_per_iteration (%d) cannot exceed n_initial_samples (%d)'
% (self.samples_per_iteration, self.n_initial_samples))
if self.samples_per_iteration <= self.n_dimensions:
raise Exception('n_dimensions (%d) cannot exceed samples_per_iteration (%d)'
% (self.n_dimensions, self.samples_per_iteration))
[docs] def choose_initial_samples(self):
self.set_initial_samples()
return self.get_next_samples_for_iteration(0)
[docs] def set_initial_samples(self):
"""
Set the initial samples points for the algorithm.
If initial_samples parameter is an array, use those values as initial samples.
Otherwise, if initial_samples parameter is a number,
draw the specified number randomly from the prior distribution.
"""
iteration = 0
self.data = pd.DataFrame(columns=['Iteration', '__sample_index__', 'Prior', 'Result', *self.get_param_names()])
self.data['Iteration'] = self.data['Iteration'].astype(int)
self.data['__sample_index__'] = self.data['__sample_index__'].astype(int)
if isinstance(self.initial_samples, (int, float)): # allow float like 1e3
samples = self.sample_from_function(self.prior_fn, int(self.initial_samples))
elif isinstance(self.initial_samples, (list, np.ndarray)):
samples = np.array(self.initial_samples)
else:
raise Exception("The 'initial_samples' parameter must be a number or an array.")
self.add_samples(samples, iteration)
self.generate_variables_from_data()
logger.debug('Initial samples:\n%s' % samples)
self.n_initial_samples = self.samples.shape[0]
self.prior_covariance = np.cov(self.samples.T)
logger.debug('Covariance of prior samples:\n%s' % self.prior_covariance)
[docs] def add_samples(self, samples, iteration):
samples_df = pd.DataFrame(samples, columns=self.get_param_names())
samples_df.index.name = '__sample_index__'
samples_df['Iteration'] = iteration
samples_df.reset_index(inplace=True)
self.data = pd.concat([self.data, samples_df])
logger.debug('__sample_index__:\n%s' % samples_df[self.get_param_names()].values)
[docs] def get_next_samples_for_iteration(self, iteration):
return self.data.query('Iteration == @iteration').sort_values('__sample_index__')[self.get_param_names()]
[docs] def choose_next_point_samples(self, iteration):
# Note: this is called from get_samples_for_iteration which is called only at commission stage
# therefore, this method is called also only at commission stage
# knowing this is very important for resume feature
assert (iteration >= 1)
next_samples = self.sample_from_function(
self.next_point_fn(), self.samples_per_iteration)
latest_samples = self.verify_valid_samples(next_samples)
self.add_samples(latest_samples, iteration)
self.generate_variables_from_data()
logger.debug('Next samples:\n%s', self.latest_samples)
return self.get_next_samples_for_iteration(iteration)
[docs] def generate_variables_from_data(self):
"""
restore some properties from self.data
"""
# keep as numpy.ndarray type
self.samples = self.data[self.get_param_names()].values
if self.samples.size > 0:
data_by_iteration = self.data.set_index('Iteration')
last_iter = sorted(data_by_iteration.index.unique())[-1]
self.latest_samples = self.get_next_samples_for_iteration(last_iter).values
self.n_dimensions = self.samples[0].size
else:
self.latest_samples = np.array([])
# keep as list type
self.priors = list(self.data['Prior'])
self.results = list(self.data['Result'])
[docs] def get_samples_for_iteration(self, iteration):
# Note: this method is called only from commission stage.
# Important to know this for resume feature (need to restore correct next_point including samples)
if iteration == 0:
samples = self.choose_initial_samples()
else:
samples = self.choose_next_point_samples(iteration)
self.update_gaussian_probabilities(iteration - 1)
samples.reset_index(drop=True, inplace=True)
return self.generate_samples_from_df(samples)
    def update_iteration(self, iteration):
        """
        Initial Stage (iteration: k = 0)
        (a) Sample N inputs :math:`\\theta_1, \\theta_2, ... , \\theta_N` from the prior distribution :math:`p(\\theta)`.
        (b) For each :math:`\\theta_i`, calculate the likelihood :math:`L_i`, and form the importance weights:
            :math:`w^{(0)}_i = L_i / sum(L_j)`.
        Importance Sampling Stage (iteration: k > 0; samples_per_iteration: B)
        (c) Calculate the likelihood of the new inputs and combine the new inputs with the
            previous ones. Form the importance weights:
            :math:`w^{(k)_i} = c * L_i * p(\\theta_i) / q^{(k)}(\\theta_i)`,
            where c is chosen so that the weights add to 1, :math:`q^{(k)}` is the mixture sampling distribution:
            :math:`q^{(k)} = (N_0/N_k) * p + (B/N_k) * sum(H_s)`
            where :math:`H_s` is the Sth multivariate normal distribution,
            and :math:`N_k = N_0 + B_k` is the total number of inputs up to iteration k.
        """
        super(IMIS, self).update_iteration(iteration)
        if not self.iteration:
            # Iteration 0: the only sampling distribution so far is the prior itself.
            sampling_envelope = self.priors
        else:
            # Relative weight of the prior component in the mixture: N0 / B.
            w = float(self.n_initial_samples) / self.samples_per_iteration
            # Stack the weighted prior densities on top of the per-component
            # gaussian densities evaluated at every sample.
            stack = np.vstack([[np.multiply(self.priors, w)], self.gaussian_probs])
            logger.debug('Stack weighted prior + gaussian sample prob %s:\n%s', stack.shape, stack)
            # NOTE(review): the normalizer presumably counts the prior weight plus
            # the number of gaussian components accumulated so far -- confirm
            # against the R reference implementation.
            norm = (w + self.D + (self.iteration - 2))
            sampling_envelope = np.sum(stack, 0) / norm
        logger.debug('Sampling envelope:\n%s', sampling_envelope)
        # Importance weights: prior * likelihood / envelope, normalized to sum to 1.
        self.weights = [p * l / e for (p, l, e) in
                        zip(self.priors, self.results, sampling_envelope)]  # TODO: perform in log space
        self.weights /= np.sum(self.weights)  # list becomes a numpy array here
        logger.debug('Weights:\n%s', self.weights)
[docs] def update_state(self, iteration):
"""
Update the next-point algorithm state and select next samples.
"""
self.update_iteration(iteration)
self.update_gaussian()
[docs] def update_gaussian(self):
"""
Importance Sampling Stage (iteration: k > 0; samples_per_iteration: B)
(a) Choose the current maximum weight input as the center :math:`\\theta^{(k)}`. Estimate :math:`\\Sigma^{(k)}` from
the weighted covariance of the B inputs with the smallest Mahalanobis distances
to :math:`\\theta^{(k)}`, where the distances are calculated with respect to the covariance of the
prior distribution and the weights are taken to be proportional to the average of
the importance weights and :math:`1/N_k`.
(b) Sample 'samples_per_iteration' new inputs from a multivariate Gaussian distribution
:math:`H_k` with covariance matrix :math:`\\Sigma^{(k)}`.
"""
self.update_gaussian_center()
distances = self.weighted_distances_from_center()
self.update_gaussian_covariance(distances)
[docs] def update_gaussian_center(self):
"""
Choose the current maximum weight input as the center point
for the next iteration of multivariate-normal sampling.
"""
max_weight_idx = np.argmax(self.weights)
max_weight_sample = self.samples[max_weight_idx]
logger.debug('Centering new points at:\n%s', max_weight_sample)
self.gaussian_centers.append(max_weight_sample)
[docs] def weighted_distances_from_center(self):
"""
Calculate the covariance-weighted distances from the current maximum-weight sample.
N.B. Using normalized Euclid instead of Mahalanobis if we're just going to diagonalize anyways.
"""
max_weight_sample = self.gaussian_centers[-1]
V = self.prior_covariance if self.prior_covariance.size == 1 else np.diag(self.prior_covariance)
distances = [seuclidean(s, max_weight_sample, V=V) for s in self.samples]
logger.debug('Distances:\n%s', distances)
return distances
[docs] def update_gaussian_covariance(self, distances):
"""
Calculate the covariance of the next-iteration of multivariate-normal sampling
from the "samples_per_iteration" closest samples.
"""
n_closest_idxs = np.argsort(distances)[:self.samples_per_iteration]
n_closest_weights = self.weights[n_closest_idxs]
logger.debug('N-closest weights:\n%s', n_closest_weights)
n_closest_samples = self.samples[n_closest_idxs]
logger.debug('N-closest samples:\n%s', n_closest_samples)
weighted_covariance = self.calculate_weighted_covariance(
samples=n_closest_samples,
weights=n_closest_weights + 1. / len(self.weights),
center=self.gaussian_centers[-1])
self.gaussian_covariances.append(weighted_covariance)
    def get_param_names(self):
        """Return the list of model parameter names, as declared by the prior function."""
        return list(self.prior_fn.params)
[docs] def verify_valid_samples(self, next_samples):
"""
Resample from next-point function until all samples have non-zero prior.
"""
attempt = 0
while attempt < self.max_resampling_attempts:
sample_priors = self.prior_fn.pdf(next_samples)
valid_samples = next_samples[sample_priors > 0]
n_invalid = len(next_samples) - len(valid_samples)
if not n_invalid:
break
logger.warning('Attempt %d: resampling to replace %d invalid samples '
'with non-positive prior probability.', attempt, n_invalid)
resamples = self.sample_from_function(self.next_point_fn(), n_invalid)
next_samples = np.concatenate((valid_samples, resamples))
attempt += 1
return next_samples
[docs] def next_point_fn(self):
""" IMIS next-point sampling from multivariate normal centered on the maximum weight. """
return multivariate_normal(mean=self.gaussian_centers[-1], cov=self.gaussian_covariances[-1])
    def update_gaussian_probabilities(self, iteration):
        """
        Calculate the probabilities of all sample points as estimated from the
        multivariate-normal probability distribution function centered on the maximum weight
        and with covariance fitted from the most recent iteration.

        :param iteration: index of the last completed iteration; gaussian_probs is
                          maintained as one row per mixture component, one column per sample.
        """
        if not iteration:
            # First gaussian component: evaluate its pdf over all samples,
            # stored as a (1, n_samples) array.
            self.gaussian_probs = multivariate_normal.pdf(
                self.samples, self.gaussian_centers[-1],
                self.gaussian_covariances[-1]).reshape((1, len(self.samples)))
        else:
            # Grow the matrix by one row (new component) and new columns (new samples).
            updated_gaussian_probs = np.zeros(((self.D + self.iteration), len(self.samples)))
            # Copy previously computed probabilities into the top-left corner.
            updated_gaussian_probs[:self.gaussian_probs.shape[0], :self.gaussian_probs.shape[1]] = self.gaussian_probs
            for j in range(iteration):
                # Existing components: evaluate only the newly added samples (new columns).
                updated_gaussian_probs[j, self.gaussian_probs.shape[1]:] = multivariate_normal.pdf(
                    self.latest_samples, self.gaussian_centers[j], self.gaussian_covariances[j])
            # Newest component (last row): evaluate over all samples.
            updated_gaussian_probs[-1:] = multivariate_normal.pdf(
                self.samples, self.gaussian_centers[-1], self.gaussian_covariances[-1])
            self.gaussian_probs = updated_gaussian_probs
        logger.debug('Gaussian sample probabilities %s:\n%s', self.gaussian_probs.shape, self.gaussian_probs)
[docs] def calculate_weighted_covariance(self, samples, weights, center):
"""
A weighted covariance of sample points.
N.B. The weights are normalized as in the R function "cov.wt"
"""
d = (samples - center)
logger.debug('Directed distance of samples from center:\n%s', d)
# if self.n_dimensions == 1:
# w = weights / sum(weights)
# logger.debug('Normalized weights:\n%s', w)
# logger.debug('Squared-distance of samples:\n%s', np.square(d))
# d2 = np.square(d)
# sig2 = np.dot(d2, w)
# else:
weights = weights / sum(weights)
w = 1. / (1 - sum(np.square(weights)))
wd = np.multiply(d.T, weights).T
sig2 = w * np.dot(wd.T, d)
logger.debug('Weighted covariance:\n%s', sig2)
return sig2
[docs] def end_condition(self):
"""
Stopping Criterion:
The algorithm ends when the importance sampling weights are reasonably uniform.
Specifically, we end the algorithm when the expected fraction of unique points in the resample
is at least (1 - 1/e) = 0.632. This is the expected fraction when the importance
sampling weights are all equal, which is the case when the importance sampling function is
the same as the target distribution.
"""
weight_distribution = np.sum(1 - (1 - np.array(self.weights)) ** self.n_resamples)
end_condition = (1 - np.exp(-1)) * self.n_resamples
if weight_distribution > end_condition:
logger.info('Truncating IMIS iteration: %0.2f > %0.2f', weight_distribution, end_condition)
return True
else:
logger.info('Continuing IMIS iterations: %0.2f < %0.2f', weight_distribution, end_condition)
return False
[docs] def get_final_samples(self):
nonzero_idxs = self.weights > 0
idxs = [i for i, w in enumerate(self.weights[nonzero_idxs])]
try:
# Renormalize (disabled for now)
probs = self.weights[nonzero_idxs]
# probs /= probs.sum()
resample_idxs = np.random.choice(idxs, self.n_resamples, replace=True, p=probs)
except ValueError:
# To isolate dtk-tools issue #96
logger.error(nonzero_idxs)
logger.error(self.weights)
logger.error(idxs)
raise
# Add the parameters
params = self.get_param_names()
ret = dict()
for i in range(len(params)):
ret[params[i]] = [param_values[i] for param_values in self.samples[resample_idxs]]
# And the weights
ret['weights'] = [val for val in self.weights[resample_idxs]]
return {'final_samples': ret}
[docs] def get_state(self):
imis_state = dict(n_initial_samples=self.n_initial_samples,
data=self.prep_for_dict(self.data),
data_dtypes={name: str(data.dtype) for name, data in self.data.items()},
gaussian_probs=self.gaussian_probs,
gaussian_centers=self.gaussian_centers,
gaussian_covariances=self.gaussian_covariances)
return imis_state
[docs] def set_state(self, state, iteration):
data_dtypes = state.get('data_dtypes', [])
if data_dtypes:
self.data = pd.DataFrame.from_dict(state['data'], orient='columns')
self.latest_samples = pd.DataFrame.from_dict(state['data'], orient='columns')
for c in self.data.columns: # Argh
self.data[c] = self.data[c].astype(data_dtypes[c])
self.generate_variables_from_data()
self.n_initial_samples = state.get('n_initial_samples', 0)
self.gaussian_probs = state.get('gaussian_probs', {})
self.gaussian_centers = state.get('gaussian_centers', [])
self.gaussian_covariances = state.get('gaussian_covariances', [])
if state:
self.validate_parameters() # if the current state is being reset from file
    def set_results_for_iteration(self, iteration, results):
        """
        Record likelihood results for an iteration's samples into self.data, then
        update the algorithm state (importance weights and next gaussian component).

        :param iteration: iteration index the results belong to.
        :param results: object exposing a 'total' array of likelihood values,
                        aligned row-for-row with this iteration's samples.
        """
        results = results.total.tolist()
        logger.info('%s: Choosing samples at iteration %d:', self.__class__.__name__, iteration)
        logger.debug('Results:\n%s', results)
        # update self.data first
        data_by_iter = self.data.set_index('Iteration')
        if iteration + 1 in data_by_iter.index.unique():
            # Been here before (e.g. resuming): drop all rows from iterations
            # after this one before re-recording results.
            data_by_iter = data_by_iter.loc[:iteration]
        # Store results ... even if changed
        data_by_iter.loc[iteration, 'Result'] = results
        # NOTE(review): assumes self.latest_samples holds exactly this iteration's
        # samples in row order -- confirm for the resume path.
        data_by_iter.loc[iteration, 'Prior'] = self.prior_fn.pdf(self.latest_samples)
        self.data = data_by_iter.reset_index()
        # make two properties available which will be used in the following steps: self.update_state
        self.priors = list(data_by_iter['Prior'])
        self.results = list(data_by_iter['Result'])
        self.update_state(iteration)
[docs] def cleanup(self):
self.gaussian_probs = {}
self.gaussian_covariances = []
self.gaussian_centers = []
[docs] def restore(self, iteration_state):
self.gaussian_covariances = iteration_state.next_point['gaussian_covariances']
self.gaussian_centers = iteration_state.next_point['gaussian_centers']