Source code for idmtools_calibra.algorithms.optim_tools_pspo
from __future__ import division
import logging
import numpy as np
import pandas as pd
from idmtools_calibra.algorithms.next_point_algorithm import NextPointAlgorithm
logger = logging.getLogger(__name__)
user_logger = logging.getLogger('user')
[docs]class OptimToolPSPO(NextPointAlgorithm):
"""
OptimTool
The basic idea of OptimTool is
"""
def __init__(self, params, constrain_sample_fn, comps_per_iteration=10):
super().__init__()
self.args = locals() # Store inputs in case set_state is called later and we want to override with new
# (user) args
del self.args['self']
self.need_resolve = False
self.data = pd.DataFrame()
self.constrain_sample_fn = constrain_sample_fn
self.state = pd.DataFrame(columns=['Iteration', 'Parameter', 'Center', 'Hessian', 'Min', 'Max', 'Dynamic'])
self.state['Iteration'] = self.state['Iteration'].astype(int)
self.params = params # TODO: Check min <= center <= max
self.comps_per_iteration = int(comps_per_iteration)
self.n_dimensions = 0
self.Xmin = {p['Name']: p['Min'] for p in self.params}
self.Xmax = {p['Name']: p['Max'] for p in self.params}
self.Dynamic = {p['Name']: p['Dynamic'] for p in self.params}
[docs] def resolve_args(self, iteration):
# Have args from user and from set_state.
# Note this is called only right before commissioning a new iteration, likely from 'resume'
# TODO: be more sensitive with params, user could have added or removed variables, need to adjust
# TODO: Check min <= center <= max for params
# TODO: could clean this up with a helper function
self.params = self.args[
'params'] if 'params' in self.args else self.params # Guess may move, but should be ignored
self.comps_per_iteration = self.args[
'comps_per_iteration'] if 'comps_per_iteration' in self.args else self.comps_per_iteration
self.n_dimensions = len(self.params)
self.Xmin = {p['Name']: p['Min'] for p in self.params}
self.Xmax = {p['Name']: p['Max'] for p in self.params}
self.Dynamic = {p['Name']: p['Dynamic'] for p in self.params}
self.need_resolve = False
"""
# TODO: Allow user to override current 'Center' value
state_by_iter = self.state.set_index('Iteration')
if iteration == 0:
# Not sure this would happen, but just in case
cur_val = {p['Name']:p['Guess'] for p in self.params}
else:
cur_val = {s['Parameter']:s['Center'] for (i,s) in state_by_iter.loc[iteration-1].iterrows()}
iter_state = pd.DataFrame(columns=['Iteration', 'Parameter', 'Center', 'Min', 'Max', 'Dynamic'])
for param in self.params:
iter_state.loc[len(iter_state)] = [iteration, param['Name'], cur_val[param['Name']], param['Min'], param['Max'], param['Dynamic']]
self.state = self.state.loc[:iteration-1]
self.state.reset_index(inplace=True)
self.state = pd.concat([self.state, iter_state], ignore_index=True)
self.state['Iteration'] = self.state['Iteration'].astype(int)
with pd.option_context("display.max_rows", 500, "display.max_columns", 500):
user_logger.info(self.state)
raw_input('resolve_args')
"""
def _get_X_center(self, iteration):
state_by_iter = self.state.reset_index(drop=True).set_index(['Iteration', 'Parameter'])
assert (iteration in state_by_iter.index.get_level_values('Iteration'))
return state_by_iter.loc[iteration]['Center']
def _get_Hessian(self, iteration):
state_by_iter = self.state.reset_index(drop=True).set_index(['Iteration', 'Parameter'])
assert (iteration in state_by_iter.index.get_level_values('Iteration'))
return state_by_iter.loc[iteration]['Hessian']
[docs] def add_samples(self, samples, iteration):
samples_cpy = samples.copy()
samples_cpy.index.name = '__sample_index__'
samples_cpy['Iteration'] = iteration
samples_cpy.reset_index(inplace=True)
self.data = pd.concat([self.data, samples_cpy], ignore_index=True)
self.data['__sample_index__'] = self.data['__sample_index__'].astype(int)
[docs] def get_samples_for_iteration(self, iteration):
# Update args
if self.need_resolve:
self.resolve_args(iteration)
if iteration == 0:
# Choose initial samples
samples = self.choose_initial_samples()
else:
# Regress inputs and results from previous iteration
# Move X_center, choose samples, save in dataframe
samples = self.stochastic_newton_raphson(iteration)
samples.reset_index(drop=True, inplace=True)
return self.generate_samples_from_df(samples)
[docs] def clamp(self, X):
user_logger.info('X.before:\n', X)
# X should be a data frame
for pname in X.columns:
X[pname] = np.minimum(self.Xmax[pname], np.maximum(self.Xmin[pname], X[pname]))
return X
[docs] def set_results_for_iteration(self, iteration, results):
results = results.total.tolist()
logger.info('%s: Choosing samples at iteration %d:', self.__class__.__name__, iteration)
logger.debug('Results:\n%s', results)
data_by_iter = self.data.set_index('Iteration')
if iteration + 1 in data_by_iter.index.unique():
# Been here before, reset
data_by_iter = data_by_iter.loc[:iteration]
state_by_iter = self.state.set_index('Iteration')
self.state = state_by_iter.loc[:iteration].reset_index()
# Store results ... even if changed
data_by_iter.loc[iteration, 'Results'] = results
self.data = data_by_iter.reset_index()
[docs] def choose_initial_samples(self):
self.data = pd.DataFrame(columns=['Iteration', '__sample_index__', 'Results', *self.get_param_names()])
self.data['Iteration'] = self.data['Iteration'].astype(int)
self.data['__sample_index__'] = self.data['__sample_index__'].astype(int)
self.n_dimensions = len(self.params)
iteration = 0
# Clear self.state in case of resuming iteration 0 from commission
self.state = pd.DataFrame(columns=['Iteration', 'Parameter', 'Center', 'Hessian', 'Min', 'Max', 'Dynamic'])
self.state['Iteration'] = self.state['Iteration'].astype(int)
for param in self.params:
user_logger.info(iteration, param['Name'], param['Guess'], param['Min'], param['Max'], param['Dynamic'])
self.state.loc[len(self.state)] = [iteration, param['Name'], param['Guess'], param['aprioriHessian'],
param['Min'], param['Max'], param['Dynamic']]
# user_logger.info(self.state)
initial_samples = self.choose_and_clamp_samples_for_iteration(iteration)
self.add_samples(initial_samples, iteration)
return initial_samples
[docs] def stochastic_newton_raphson(self, iteration):
assert (iteration >= 1)
def proj_spd(a):
# NOTE: the input matrix is assumed to be symmetric
d, v = np.linalg.eigh(a)
a = (v * np.maximum(abs(d), 1e-5)).dot(v.T)
a = (a + a.T) / 2
return a
# DYNAMIC ON PREVIOUS ITERATION WHEN COMMISSIONED ...
state_prev_iter = self.state.set_index('Iteration').loc[iteration - 1]
dynamic_params = [r['Parameter'] for idx, r in state_prev_iter.iterrows() if r['Dynamic']]
self.data.set_index('Iteration', inplace=True)
latest_dynamic_samples = self.data.loc[iteration - 1, dynamic_params].values
latest_results = self.data.loc[iteration - 1, 'Results'].values
x_min = np.asarray([self.Xmin[pname] for pname in dynamic_params])
x_max = np.asarray([self.Xmax[pname] for pname in dynamic_params])
x_current = latest_dynamic_samples[0]
x_current_scaled = (x_current - x_min) / (x_max - x_min)
# H_sign = -1 # Hessian should be neg. semi definite (maximization)
p = x_current.size
m = self.comps_per_iteration
# hessian_elements = [r['Hessian'] for idx, r in state_prev_iter.iterrows() if r['Dynamic']]
# h_current_est = np.diag(hessian_elements)
# largest_step_size = np.abs(np.array([1] * p) - np.array([0] * p)) / 5
# optimization parameters
a0, step = 3, .001
a0 = step * (1 + a0) ** 0.602
a = a0 * (iteration + 1 + a0) ** (-0.602)
# TODO: assumed here that there is no prior Hessian information
# w = .8 * (iteration + 1) ** (-.501) if iteration > 1 else 1
# G_avg = np.zeros(shape=(p, M))
# s_avg = np.zeros(shape=(p, p, m))
delta_f = np.zeros(shape=(m, 1))
deltas = np.zeros(shape=(p, m))
for k in range(m):
np.random.seed()
theta_plus = (np.array(latest_dynamic_samples[k + 1]) - x_min) / (x_max - x_min)
f_plus = latest_results[k + 1]
delta_f[k] = f_plus - latest_results[0]
deltas[:, k] = theta_plus - x_current_scaled
# thetaPlus = (np.array(latest_dynamic_samples[4*k+1])- X_min)/(X_max - X_min)
# thetaMinus = (np.array(latest_dynamic_samples[4*k+2])- X_min)/(X_max - X_min)
# f_plus = latest_results[4*k+1]
# f_minus = latest_results[4*k+2]
# delta_f[2*k] = f_plus - latest_results[0]
# delta_f[2 * k + 1] = f_minus - latest_results[0]
# Deltas[:, 2*k] = thetaPlus - X_current_scaled
# Deltas[:, 2*k+1] = thetaMinus - X_current_scaled
# thetaPlusPlus = (np.array(latest_dynamic_samples[4*k+3])- X_min)/(X_max - X_min)
# thetaMinusPlus = (np.array(latest_dynamic_samples[4*k+4])- X_min)/(X_max - X_min)
# f_PlusPlus = latest_results[4*k+3]
# f_MinusPlus = latest_results[4*k+4]
#
# G_p = (f_PlusPlus - f_plus) / (thetaPlusPlus-thetaMinusPlus)
# G_m = (f_MinusPlus- f_minus) / (thetaPlusPlus-thetaMinusPlus)
#
# Sk = np.dot( (1/(thetaPlus-thetaMinus))[:,None],(G_p-G_m)[None,:] )
# S_avg[:,:,k] = k/(k+1)*S_avg[:,:,k-1] + 1/(k+1)* Sk
if m >= p:
g = (np.dot(np.linalg.inv(np.dot(deltas, deltas.T)), deltas)).dot(delta_f)
else:
g = (np.dot(deltas, np.linalg.inv(np.dot(deltas.T, deltas)))).dot(delta_f)
# H_hat = .5 * ( S_avg[:,:,M-1]+S_avg[:,:,M-1].T)
# H_bar = (1-w) * H_current_est + w * H_hat
# H_2bar = -1*proj_spd(-H_bar)#H_bar#.5*(H_bar - sqrtm(np.linalg.matrix_power(H_bar,2)+1e-6*np.eye(p)))
# H_2bar_INV = np.linalg.inv(H_2bar)
h_bar = -np.eye(p)
# update x
# update = H_2bar_INV.dot(G)
update = np.transpose(-g)
# if (np.abs(update) > largest_step_size).any():
# update = np.linalg.norm(largest_step_size) * update/np.linalg.norm(update)
x_next_scaled = x_current_scaled - a * update # x_{n+1} = x_n - a_n H_n^{-1} G_n
x_next = x_next_scaled * (x_max - x_min) + x_min
hessian_next = np.diag(h_bar)
self.data.reset_index(inplace=True)
old_center = self._get_X_center(iteration - 1)
# old_center_of_dynamic_params = old_center[dynamic_params].values
new_dynamic_center = x_next.tolist()[0]
new_center_dict = old_center.to_dict() # {k:v for k,v in zip(self.get_param_names(), old_center)}
new_center_dict.update({k: v for k, v in zip(dynamic_params, new_dynamic_center)})
old_hessian = self._get_Hessian(iteration - 1)
# old_hessian_of_dynamic_params = old_hessian[dynamic_params].values
new_dynamic_hessian = hessian_next.tolist()
new_hessian_dict = old_hessian.to_dict()
new_hessian_dict.update({k: v for k, v in zip(dynamic_params, new_dynamic_hessian)})
# User may have added or removed params
param_names = [p['Name'] for p in self.params]
# Remove -
new_center_df = {k: v for k, v in new_center_dict.items() if k in param_names}
new_hessian_df = {k: v for k, v in new_hessian_dict.items() if k in param_names}
# Add -
new_params = {p['Name']: p['Guess'] for p in self.params if p['Name'] not in new_center_dict}
new_center_dict.update(new_params)
new_hessian_dict.update(new_params)
# CLAMP
new_center_df = pd.Series(new_center_dict, name=0).to_frame().transpose()
new_hessian_df = pd.Series(new_hessian_dict, name=0).to_frame().transpose()
new_center_df = self.clamp(new_center_df)
# USER CONSTRAINT FN
new_center_df = new_center_df.apply(self.constrain_sample_fn, axis=1)
new_state = pd.DataFrame({
'Iteration': [iteration] * self.n_dimensions,
'Parameter': new_center_df.columns.values,
'Center': new_center_df.as_matrix()[0],
'Hessian': new_hessian_df.as_matrix()[0],
'Min': [self.Xmin[pname] for pname in new_center_df.columns.values],
'Max': [self.Xmax[pname] for pname in new_center_df.columns.values],
'Dynamic': [self.Dynamic[pname] for pname in new_center_df.columns.values]
})
self.state = self.state.query('Iteration < @iteration')
self.state = pd.concat([self.state, new_state], ignore_index=True)
samples = self.choose_and_clamp_samples_for_iteration(iteration)
self.add_samples(samples, iteration)
return samples
[docs] def choose_and_clamp_samples_for_iteration(self, iteration):
M = self.comps_per_iteration
state_by_iter = self.state.set_index('Iteration')
# Will vary parameters that are 'Dyanmic' on this iteration
samples = self.sample_simultaneous_perturbation(M, iteration, state_by_iter.loc[iteration])
# Clamp and constrain
samples = self.clamp(samples)
samples = samples.apply(self.constrain_sample_fn, axis=1)
return samples # Order shouldn't matter now, so dropped [self.get_param_names()]
[docs] def sample_simultaneous_perturbation(self, M, iteration, state):
# Pick samples x+c\Delta, x-c\Delta, x+c\Delta+c_tilde\Delta_tilde, x-c\Delta+c_tilde\Delta_tilde
assert (M > 0)
dynamic_state = state.query('Dynamic == True')
dynamic_state_by_param = dynamic_state.set_index('Parameter')
x_min = np.array([dynamic_state_by_param.loc[pname, 'Min'] for pname in dynamic_state['Parameter']])
x_max = np.array([dynamic_state_by_param.loc[pname, 'Max'] for pname in dynamic_state['Parameter']])
x_current = np.array([dynamic_state_by_param.loc[pname, 'Center'] for pname in dynamic_state['Parameter']])
c0 = .05 * (x_max - x_min) / M ** 0.5
c = c0 * (iteration + 1) ** (-0.101)
c_tilde = 2 * c
n_dyn = len(c0)
for i in range(n_dyn):
while (x_current[i] - 2 * c[i] < x_min[i]) or (x_current[i] + 2 * c[i] > x_max[i]):
c[i] = .8 * min(x_current[i] - x_min[i], x_max[i] - x_current[i]) / 2
deviations = []
delta0 = np.ones(n_dyn)
for k in range(M):
np.random.seed()
delta = list(delta0)
if n_dyn != 2 or (k % n_dyn) != 0:
delta[k % n_dyn] = -1 * delta0[k % n_dyn]
theta_plus = x_current + (c * delta)
theta_minus = x_current - (c * delta)
while (x_min > theta_plus).any() or (theta_plus > x_max).any() or \
(theta_minus < x_min).any() or (theta_minus > x_max). \
any():
delta = np.round(np.random.uniform(0, 1, n_dyn)) * 2 - 1
theta_plus = x_current + (c * delta)
theta_minus = x_current - (c * delta)
deviations.append((c * delta).tolist())
deviations.append((-c * delta).tolist())
delta_tilde = np.round(np.random.uniform(0, 1, n_dyn)) * 2 - 1
theta_plus_plus = theta_plus + c_tilde * delta_tilde
theta_minus_plus = theta_minus + c_tilde * delta_tilde
while (theta_plus_plus < x_min).any() or (theta_plus_plus > x_max).any() or \
(theta_minus_plus < x_min).any() or (theta_minus_plus > x_max).any():
delta_tilde = np.round(np.random.uniform(0, 1, n_dyn)) * 2 - 1
theta_plus_plus = theta_plus + c_tilde * delta_tilde
theta_minus_plus = theta_minus + c_tilde * delta_tilde
deviations.append((c * delta + c_tilde * delta_tilde).tolist())
deviations.append((-c * delta + c_tilde * delta_tilde).tolist())
x_center = state.reset_index(drop=True).set_index(['Parameter'])[['Center']]
xc = x_center.transpose().reset_index(drop=True)
xc.columns.name = ""
samples = pd.concat([xc] * (M + 1)).reset_index(drop=True)
# samples = pd.concat( [xc]*(4*M+1) ).reset_index(drop=True)
user_logger.info('samples:\n', samples)
user_logger.info('deviations:\n', deviations)
dt = np.transpose(deviations)
user_logger.info('dt:\n', dt)
dynamic_state_by_param = dynamic_state.set_index('Parameter')
for i, pname in enumerate(dynamic_state['Parameter']):
x_cen = dynamic_state_by_param.loc[pname, 'Center']
samples.loc[1:(M + 1), pname] = x_cen + dt[i]
# samples.loc[1:(4*M+1), pname] = Xcen + dt[i]
return samples
[docs] def end_condition(self):
user_logger.info("end_condition")
# Stopping Criterion: good rsqared with small norm?
# Return True to stop, False to continue
logger.info('Continuing iterations ...')
return False
[docs] def get_final_samples(self):
user_logger.info("get_final_samples")
'''
Resample Stage:
'''
state_by_iteration = self.state.set_index('Iteration')
last_iter = sorted(state_by_iteration.index.unique())[-1]
X_Center = self._get_X_center(last_iter)
xc = X_Center.to_frame().transpose().reset_index(drop=True)
xc.columns.name = ""
dtypes = {name: str(data.dtype) for name, data in xc.items()}
final_samples_NaN_to_Null = xc.where(~xc.isnull(), other=None)
return {'final_samples': final_samples_NaN_to_Null.to_dict(orient='list'), 'final_samples_dtypes': dtypes}
[docs] def prep_for_dict(self, df):
# Needed for Windows compatibility
# nulls = df.isnull()
# if nulls.values.any():
# df[nulls] = None
# return df.to_dict(orient='list')
return df.where(~df.isnull(), other=None).to_dict(orient='list')
[docs] def get_state(self):
optimtool_state = dict(
n_dimensions=self.n_dimensions,
params=self.params,
comps_per_iteration=self.comps_per_iteration,
data=self.prep_for_dict(self.data),
data_dtypes={name: str(data.dtype) for name, data in self.data.items()},
state=self.prep_for_dict(self.state),
state_dtypes={name: str(data.dtype) for name, data in self.state.items()}
)
return optimtool_state
[docs] def set_state(self, state, iteration):
self.n_dimensions = state['n_dimensions']
self.params = state['params'] # NOTE: This line will override any updated user params passed to __init__
self.comps_per_iteration = state['comps_per_iteration']
data_dtypes = state['data_dtypes']
self.data = pd.DataFrame.from_dict(state['data'], orient='columns')
for c in self.data.columns: # Argh
self.data[c] = self.data[c].astype(data_dtypes[c])
state_dtypes = state['state_dtypes']
self.state = pd.DataFrame.from_dict(state['state'], orient='columns')
for c in self.state.columns: # Argh
self.state[c] = self.state[c].astype(state_dtypes[c])
self.need_resolve = True