Source code for idmtools_calibra.algorithms.optim_tool
import logging
import math
import time
import numpy as np
import pandas as pd
import statsmodels.api as sm
from scipy.special import gammaln # for calculation of mu_r
from scipy.stats import norm
from idmtools_calibra.algorithms.next_point_algorithm import NextPointAlgorithm
logger = logging.getLogger(__name__)
user_logger = logging.getLogger('user')
[docs]class OptimTool(NextPointAlgorithm):
"""
OptimTool
The basic idea of OptimTool is
"""
def __init__(self, params, constrain_sample_fn=lambda s: s, mu_r=0.1, sigma_r=0.02, center_repeats=2,
samples_per_iteration=1e2, rsquared_thresh=0.5):
super(OptimTool, self).__init__()
self.args = locals() # Store inputs in case set_state is called later and we want to override with new (user) args
del self.args['self']
self.need_resolve = False
self.constrain_sample_fn = constrain_sample_fn
self.regression = pd.DataFrame(
columns=['Iteration', 'Parameter', 'Value']) # Parameters: Rsquared, Regression_Parameters
self.regression['Iteration'] = self.regression['Iteration'].astype(int)
self.state = pd.DataFrame(columns=['Iteration', 'Parameter', 'Center', 'Min', 'Max', 'Dynamic'])
self.state['Iteration'] = self.state['Iteration'].astype(int)
if center_repeats >= samples_per_iteration:
raise Exception('OptimTool requires samples_per_itertion > center_repeats.')
self.params = params # TODO: Check min <= center <= max
self.mu_r = mu_r
self.sigma_r = sigma_r
self.center_repeats = center_repeats
self.rsquared_thresh = rsquared_thresh
self.samples_per_iteration = int(samples_per_iteration)
self.Xmin = {p['Name']: p['Min'] for p in self.params}
self.Xmax = {p['Name']: p['Max'] for p in self.params}
self.Dynamic = {p['Name']: p['Dynamic'] for p in self.params}
self.n_dimensions = 0
self.data = pd.DataFrame()
self.verify_param()
[docs] def verify_param(self):
# Checking Dynamic
if len([p for p in self.params if p['Dynamic']]) == 0:
warning_note = \
"""
/!\\ WARNING /!\\ the OptimTool requires at least one of params with Dynamic set to True. Exiting...
"""
user_logger.warning(warning_note)
exit()
[docs] def resolve_args(self, iteration):
# Have args from user and from set_state.
# Note this is called only right before commissioning a new iteration, likely from 'resume'
# TODO: be more sensitive with params, user could have added or removed variables, need to adjust
# TODO: Check min <= center <= max for params
# TODO: could clean this up with a helper function
self.params = self.args['params'] if 'params' in self.args else self.params # Guess may move, but should be ignored
self.mu_r = self.args['mu_r'] if 'mu_r' in self.args else self.mu_r
self.sigma_r = self.args['sigma_r'] if 'sigma_r' in self.args else self.sigma_r
self.center_repeats = self.args['center_repeats'] if 'center_repeats' in self.args else self.center_repeats
self.rsquared_thresh = self.args['rsquared_thresh'] if 'rsquared_thresh' in self.args else self.rsquared_thresh
self.samples_per_iteration = self.args[
'samples_per_iteration'] if 'samples_per_iteration' in self.args else self.samples_per_iteration
self.n_dimensions = len(self.params)
self.Xmin = {p['Name']: p['Min'] for p in self.params}
self.Xmax = {p['Name']: p['Max'] for p in self.params}
self.Dynamic = {p['Name']: p['Dynamic'] for p in self.params}
self.need_resolve = False
"""
# TODO: Allow user to override current 'Center' value
state_by_iter = self.state.set_index('Iteration')
if iteration == 0:
# Not sure this would happen, but just in case
cur_val = {p['Name']:p['Guess'] for p in self.params}
else:
cur_val = {s['Parameter']:s['Center'] for (i,s) in state_by_iter.loc[iteration-1].iterrows()}
iter_state = pd.DataFrame(columns=['Iteration', 'Parameter', 'Center', 'Min', 'Max', 'Dynamic'])
for param in self.params:
iter_state.loc[len(iter_state)] = [iteration, param['Name'], cur_val[param['Name']], param['Min'], param['Max'], param['Dynamic']]
self.state = self.state.loc[:iteration-1]
self.state.reset_index(inplace=True)
self.state = pd.concat([self.state, iter_state], ignore_index=True)
self.state['Iteration'] = self.state['Iteration'].astype(int)
with pd.option_context("display.max_rows", 500, "display.max_columns", 500):
logger.info(self.state)
raw_input('resolve_args')
"""
def _get_X_center(self, iteration):
state_by_iter = self.state.reset_index(drop=True).set_index(['Iteration', 'Parameter'])
# assert( iteration in state_by_iter.index.get_level_values('Iteration') )
return state_by_iter.loc[iteration]['Center']
[docs] def add_samples(self, samples, iteration):
samples_cpy = samples.copy()
samples_cpy.index.name = '__sample_index__'
samples_cpy['Iteration'] = iteration
samples_cpy.reset_index(inplace=True)
self.data = pd.concat([self.data, samples_cpy], ignore_index=True)
self.data['__sample_index__'] = self.data['__sample_index__'].astype(int)
[docs] def get_samples_for_iteration(self, iteration):
# Update args
if self.need_resolve:
self.resolve_args(iteration)
if iteration == 0:
# Choose initial samples
samples = self.choose_initial_samples()
else:
# Regress inputs and results from previous iteration
# Move X_center, choose hypersphere, save in dataframe
samples = self.choose_samples_via_gradient_ascent(iteration)
samples.reset_index(drop=True, inplace=True)
return self.generate_samples_from_df(samples)
[docs] def clamp(self, X):
# logger.info('X.before:\n', X)
# X should be a data frame
for pname in X.columns:
X[pname] = np.minimum(self.Xmax[pname], np.maximum(self.Xmin[pname], X[pname]))
return X
[docs] def set_results_for_iteration(self, iteration, results):
results = results.total.tolist()
logger.info('%s: Choosing samples at iteration %d:', self.__class__.__name__, iteration)
logger.debug('Results:\n%s', results)
data_by_iter = self.data.set_index('Iteration')
if iteration + 1 in data_by_iter.index.unique():
# Been here before, reset
data_by_iter = data_by_iter.loc[:iteration]
regression_by_iter = self.regression.set_index('Iteration')
self.regression = regression_by_iter.loc[:iteration - 1].reset_index()
state_by_iter = self.state.set_index('Iteration')
self.state = state_by_iter.loc[:iteration].reset_index()
# Store results ... even if changed
data_by_iter.loc[iteration, 'Results'] = results
self.data = data_by_iter.reset_index()
[docs] def choose_initial_samples(self):
self.data = pd.DataFrame(
columns=['Iteration', '__sample_index__', 'Results', 'Fitted', *self.get_param_names()])
self.data['Iteration'] = self.data['Iteration'].astype(int)
self.data['__sample_index__'] = self.data['__sample_index__'].astype(int)
self.n_dimensions = len(self.params)
iteration = 0
# Clear self.state in case of resuming iteration 0 from commission
self.state = pd.DataFrame(columns=['Iteration', 'Parameter', 'Center', 'Min', 'Max', 'Dynamic'])
self.state['Iteration'] = self.state['Iteration'].astype(int)
for param in self.params:
user_logger.info((iteration, param['Name'], param['Guess'], param['Min'], param['Max'], param['Dynamic']))
self.state.loc[len(self.state)] = [iteration, param['Name'], param['Guess'], param['Min'], param['Max'],
param['Dynamic']]
initial_samples = self.choose_and_clamp_hypersphere_samples_for_iteration(iteration)
self.add_samples(initial_samples, iteration)
return initial_samples
[docs] def choose_samples_via_gradient_ascent(self, iteration):
# assert(iteration >= 1)
# DYNAMIC ON PREVIOUS ITERATION WHEN COMMISSIONED ...
state_prev_iter = self.state.set_index('Iteration').loc[[iteration - 1]]
dynamic_params = [r['Parameter'] for idx, r in state_prev_iter.iterrows() if r['Dynamic']]
self.data.set_index('Iteration', inplace=True)
latest_dynamic_samples = self.data.loc[iteration - 1, dynamic_params].values
latest_results = self.data.loc[iteration - 1, 'Results'].values
# Make sure both have 'float' type
latest_dynamic_samples = latest_dynamic_samples.astype(np.float64)
latest_results = latest_results.astype(np.float64)
mod = sm.OLS(latest_results, sm.add_constant(latest_dynamic_samples))
retry = 0
while retry < 5:
try:
mod_fit = mod.fit()
break
except Exception as ex:
time.sleep(0.5)
retry += 1
if retry >= 5:
raise ex
# user_logger.info(mod_fit.summary())
# Regression parameters for plotting / analysis
self.regression = self.regression.query('Iteration < @iteration')
r2_df = pd.DataFrame([[iteration - 1, 'Rsquared', mod_fit.rsquared]],
columns=['Iteration', 'Parameter', 'Value'])
thresh_df = pd.DataFrame([[iteration - 1, 'Rsquared_Threshold', self.rsquared_thresh]],
columns=['Iteration', 'Parameter', 'Value'])
repeats_df = pd.DataFrame([[iteration - 1, 'Center_Repeats', self.center_repeats]],
columns=['Iteration', 'Parameter', 'Value'])
self.regression = pd.concat([self.regression, r2_df, thresh_df, repeats_df])
for (p, v) in zip(['Constant'] + dynamic_params, mod_fit.params): # mod.endog_names
regression_param_df = pd.DataFrame([[iteration - 1, p, v]], columns=['Iteration', 'Parameter', 'Value'])
self.regression = pd.concat([self.regression, regression_param_df])
for (p, v) in zip(['P_Constant'] + ['P_' + s for s in dynamic_params], mod_fit.pvalues): # mod.endog_names
regression_param_df = pd.DataFrame([[iteration - 1, p, v]], columns=['Iteration', 'Parameter', 'Value'])
self.regression = pd.concat([self.regression, regression_param_df])
"""
#L1_wt : scalar : The fraction of the penalty given to the L1 penalty term. Must be between 0 and 1 (inclusive). If 0, the fit is ridge regression. If 1, the fit is the lasso.
mod_fit = mod.fit_regularized(method='coord_descent', maxiter=10000, alpha=1.0, L1_wt=1.0, start_params=None, cnvrg_tol=1e-08, zero_tol=1e-08)
user_logger.info(mod_fit.summary())
from sklearn import linear_model
clf = linear_model.Lasso(alpha=1.0, fit_intercept=True, normalize=True, precompute=False, copy_X=True, max_iter=1000, tol=0.0001, warm_start=False, positive=False, random_state=None, selection='cyclic')
clf.fit(latest_dynamic_samples, latest_results)
user_logger.info(clf.coef_)
user_logger.info(clf.intercept_)
y = clf.predict(latest_dynamic_samples)
user_logger.info(zip(latest_results, y))
user_logger.info('R2:', clf.score(latest_dynamic_samples, latest_results))
user_logger.info('LassoCV')
cvf = linear_model.LassoCV(eps=0.001, n_alphas=100, alphas=None, fit_intercept=True, normalize=True, precompute='auto', max_iter=1000, tol=0.0001, copy_X=True, cv=None, verbose=False, n_jobs=-1, positive=False, random_state=None, selection='cyclic')
cvf.fit(latest_dynamic_samples, latest_results)
user_logger.info(cvf.coef_)
user_logger.info(cvf.intercept_)
y = cvf.predict(latest_dynamic_samples)
user_logger.info(zip(latest_results, y))
user_logger.info('R2:', cvf.score(latest_dynamic_samples, latest_results))
"""
self.fit_summary = mod_fit.summary().as_csv()
self.data.loc[iteration - 1, 'Fitted'] = mod_fit.fittedvalues
self.data.reset_index(inplace=True)
# Choose X_center for this iteration based on previous
old_center = self._get_X_center(iteration - 1)
if mod_fit.rsquared > self.rsquared_thresh:
# user_logger.info('Good R^2 (%f), using params: '%mod_fit.rsquared, mod_fit.params)
coef = mod_fit.params[1:] # Drop constant
den = np.sqrt(
sum([(self.Xmax[pname] - self.Xmin[pname]) ** 2 * c ** 2 for c, pname in zip(coef, dynamic_params)]))
old_center_of_dynamic_params = old_center[dynamic_params].values
new_dynamic_center = [x + (self.Xmax[pname] - self.Xmin[pname]) ** 2 * c * self.mu_r / den for x, c, pname
in zip(old_center_of_dynamic_params, coef, dynamic_params)]
else:
# user_logger.info('Bad R^2 (%f)'%mod_fit.rsquared)
max_idx = np.argmax(latest_results)
logger.info("Stepping to argmax of {} at: {}".format(latest_results[max_idx], latest_dynamic_samples[max_idx]))
new_dynamic_center = latest_dynamic_samples[max_idx].tolist()
new_center_dict = old_center.to_dict() # {k:v for k,v in zip(self.get_param_names(), old_center)}
new_center_dict.update({k: v for k, v in zip(dynamic_params, new_dynamic_center)})
# User may have added or removed params
param_names = [p['Name'] for p in self.params]
# Remove -
new_center_df = {k: v for k, v in new_center_dict.items() if k in param_names}
# Add -
new_params = {p['Name']: p['Guess'] for p in self.params if p['Name'] not in new_center_dict}
new_center_dict.update(new_params)
# CLAMP
new_center_df = pd.Series(new_center_dict, name=0).to_frame().transpose()
new_center_df = self.clamp(new_center_df)
# USER CONSTRAINT FN
new_center_df = new_center_df.apply(self.constrain_sample_fn, axis=1)
new_state = pd.DataFrame({
'Iteration': [iteration] * self.n_dimensions,
'Parameter': new_center_df.columns.values,
# 'Center': new_center_df.as_matrix()[0],
'Center': new_center_df.to_numpy()[0],
'Min': [self.Xmin[pname] for pname in new_center_df.columns.values],
'Max': [self.Xmax[pname] for pname in new_center_df.columns.values],
'Dynamic': [self.Dynamic[pname] for pname in new_center_df.columns.values]
})
self.state = self.state.query('Iteration < @iteration')
self.state = pd.concat([self.state, new_state], ignore_index=True)
samples = self.choose_and_clamp_hypersphere_samples_for_iteration(iteration)
self.add_samples(samples, iteration)
return samples
[docs] def choose_and_clamp_hypersphere_samples_for_iteration(self, iteration):
n = self.samples_per_iteration
state_by_iter = self.state.set_index('Iteration')
# Will vary parameters that are 'Dyanmic' on this iteration
samples = self.sample_hypersphere(n, state_by_iter.loc[[iteration]])
# Clamp and constrain
samples = self.clamp(samples)
samples = samples.apply(self.constrain_sample_fn, axis=1)
return samples # Order shouldn't matter now, so dropped [self.get_param_names()]
[docs] def sample_hypersphere(self, N, state):
# Pick samples on hypersphere - TODO: clean
# assert(N > self.center_repeats)
dynamic_state = state.query('Dynamic == True')
deviations = []
standard_normal = norm(loc=0, scale=1)
radius_normal = norm(loc=self.mu_r, scale=self.sigma_r)
for i in range(N - self.center_repeats):
sn_rvs = standard_normal.rvs(size=len(dynamic_state))
sn_nrm = np.linalg.norm(sn_rvs)
radius = radius_normal.rvs()
deviations.append([radius / sn_nrm * sn for sn in sn_rvs])
x_center = state.reset_index(drop=True).set_index(['Parameter'])[['Center']]
xc = x_center.transpose().reset_index(drop=True)
xc.columns.name = ""
samples = pd.concat([xc] * N).reset_index(drop=True)
dt = np.transpose(deviations)
dynamic_state_by_param = dynamic_state.set_index('Parameter')
for i, pname in enumerate(dynamic_state['Parameter']):
x_cen = dynamic_state_by_param.loc[pname, 'Center']
x_range = dynamic_state_by_param.loc[pname, 'Max'] - dynamic_state_by_param.loc[pname, 'Min']
samples.loc[self.center_repeats:N, pname] = x_cen + dt[i] * x_range
return samples
[docs] def end_condition(self):
# user_logger.info("end_condition")
# Stopping Criterion: good rsqared with small norm?
# Return True to stop, False to continue
logger.info('Continuing iterations ...')
return False
[docs] def get_final_samples(self):
"""
Resample Stage:
"""
state_by_iteration = self.state.set_index('Iteration')
last_iter = sorted(state_by_iteration.index.unique())[-1]
x_center = self._get_X_center(last_iter)
xc = x_center.to_frame().transpose().reset_index(drop=True)
xc.columns.name = ""
dtypes = {name: str(data.dtype) for name, data in xc.items()}
final_samples_NaN_to_Null = xc.where(~xc.isnull(), other=None)
return {'final_samples': final_samples_NaN_to_Null.to_dict(orient='list'), 'final_samples_dtypes': dtypes}
[docs] def prep_for_dict(self, df):
# Needed for Windows compatibility
# nulls = df.isnull()
# if nulls.values.any():
# df[nulls] = None
# return df.to_dict(orient='list')
return df.where(~df.isnull(), other=None).to_dict(orient='list')
[docs] def get_state(self):
optimtool_state = dict(
mu_r=self.mu_r,
sigma_r=self.sigma_r,
center_repeats=self.center_repeats,
rsquared_thresh=self.rsquared_thresh,
n_dimensions=self.n_dimensions,
params=self.params,
samples_per_iteration=self.samples_per_iteration,
data=self.prep_for_dict(self.data),
data_dtypes={name: str(data.dtype) for name, data in self.data.items()},
regression=self.prep_for_dict(self.regression),
regression_dtypes={name: str(data.dtype) for name, data in self.regression.items()},
state=self.prep_for_dict(self.state),
state_dtypes={name: str(data.dtype) for name, data in self.state.items()}
)
return optimtool_state
[docs] def set_state(self, state, iteration):
self.mu_r = state['mu_r']
self.sigma_r = state['sigma_r']
self.center_repeats = state['center_repeats']
self.rsquared_thresh = state['rsquared_thresh']
self.n_dimensions = state['n_dimensions']
self.params = state['params'] # NOTE: This line will override any updated user params passed to __init__
self.samples_per_iteration = state['samples_per_iteration']
data_dtypes = state['data_dtypes']
self.data = pd.DataFrame.from_dict(state['data'], orient='columns')
for c in self.data.columns: # Argh
self.data[c] = self.data[c].astype(data_dtypes[c])
regression_dtypes = state['regression_dtypes']
self.regression = pd.DataFrame.from_dict(state['regression'], orient='columns')
for c in self.regression.columns: # Argh
self.regression[c] = self.regression[c].astype(regression_dtypes[c])
state_dtypes = state['state_dtypes']
self.state = pd.DataFrame.from_dict(state['state'], orient='columns')
for c in self.state.columns: # Argh
self.state[c] = self.state[c].astype(state_dtypes[c])
self.need_resolve = True
[docs] @staticmethod
def get_r(num_params, volume_fraction):
r = math.exp(
1 / float(num_params) * (math.log(volume_fraction) - gammaln(num_params / 2. + 1) + num_params / 2. * math.log(math.pi)))
return r