# Source code for idmtools_calibra.plotters.separatrix_bhm_plotter

# flake8: noqa E402
import logging
import os
import matplotlib

matplotlib.use('Agg', force=True)
import matplotlib.pyplot as plt
import matplotlib.tri as mtri
import numpy as np
import pandas as pd
import seaborn as sns
from idmtools_calibra.plotters.base_plotter import BasePlotter
from idmtools_calibra.process_state import StatusPoint

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# GPC is an optional dependency. If the import fails, only warnings are
# logged here and the name ``GPC`` stays undefined, so any later use of it
# in this module raises NameError.
try:
    from history_matching.gpc import GPC
except ImportError:
    logger.warning("Could not load history matching. You must install itertool with history matching")
    logger.warning("pip install itertool[history-matching]")

# Global seaborn style, applied at import time.
sns.set_style('white')
# Default figure size passed as ``figsize`` by the plotting code in this module.
fs = (16, 10)  # Figure size (24,15)


class SeparatrixBHMPlotter(BasePlotter):
    """Write diagnostic PNG figures for a two-parameter Separatrix calibration.

    Every call to :meth:`visualize` reloads the plotting inputs (samples,
    hyperparameters, emulation results, serialized GPC models, parameter
    ranges, target and implausibility threshold) from
    ``iteration_state.next_point_algo.get_state()`` and then writes figures
    into ``iteration_state.iteration_directory``.

    The plots are two-dimensional by construction: the prediction grid can
    only be built when there are exactly two calibration parameters.
    """

    def __init__(self):
        # The meaning of the positional ``False`` is defined by BasePlotter,
        # which is not visible from this module.
        super(SeparatrixBHMPlotter, self).__init__(False)
        # Empty until the first visualize() call builds the grid.
        self.prediction_grid = pd.DataFrame()
        # Number of grid points along each parameter axis.
        self.prediction_resolution = 100

    def make_prediction_grid(self):
        """Build the regular 2-D grid on which the GPC is evaluated for plotting.

        The grid spans [Min, Max] of both parameters with
        ``prediction_resolution`` points per axis and is stored, flattened,
        in ``self.prediction_grid`` together with bookkeeping columns.

        Raises:
            AssertionError: if there are not exactly two parameters.
        """
        assert len(self.param_names) == 2, 'The prediction grid requires exactly 2 parameters'
        self.x_var = self.param_names[0]
        self.y_var = self.param_names[1]

        resolution = self.prediction_resolution
        px = np.linspace(self.param_info.loc[self.x_var]['Min'],
                         self.param_info.loc[self.x_var]['Max'], resolution)
        py = np.linspace(self.param_info.loc[self.y_var]['Min'],
                         self.param_info.loc[self.y_var]['Max'], resolution)
        self.Px, self.Py = np.meshgrid(px, py)

        self.prediction_grid = pd.DataFrame({self.x_var: self.Px.flatten(),
                                             self.y_var: self.Py.flatten()})
        # np.nan rather than np.NaN: the capitalised alias was removed in NumPy 2.0.
        self.prediction_grid['Outcome'] = np.nan
        # Turn the positional index into an explicit 'Sample' column.
        self.prediction_grid.index.name = 'Sample'
        self.prediction_grid.reset_index(inplace=True)
        self.prediction_grid['Implausible'] = False
        self.prediction_grid['Max_Implausibility'] = -1  # For plotting

    def cleanup(self):
        """Do nothing; present as a no-op hook."""
        pass

    def visualize(self, iteration_state):
        """Load the algorithm state and draw the plots for the current status.

        Args:
            iteration_state: object providing ``status``, ``iteration``,
                ``iteration_directory``, ``param_names`` and
                ``next_point_algo.get_state()``.

        Raises:
            Exception: if the status is neither ``StatusPoint.commission``
                nor ``StatusPoint.plot``.
        """
        self.iteration_state = iteration_state
        iteration_status = self.iteration_state.status

        self.directory = self.iteration_state.iteration_directory
        self.param_names = self.iteration_state.param_names
        self.npt = self.iteration_state.next_point_algo.get_state()

        self.data = pd.DataFrame.from_dict(self.npt['data'])
        self.hyperparameters = pd.DataFrame.from_dict(self.npt['hyperparameters'])
        self.emulation = pd.DataFrame.from_dict(self.npt['emulation'])
        # Entries without a config are kept as None so list positions still
        # line up with iteration numbers.
        self.gpc_vec = [GPC.from_dict(config) if config else None for config in self.npt['gpc_vec']]
        self.param_info = pd.DataFrame.from_dict(self.npt['params'], orient='columns').set_index('Name')
        self.target = self.npt['target_success_probability']
        self.implausibility_threshold = self.npt['implausibility_threshold']
        self.for_plotting = pd.DataFrame.from_dict(self.npt['for_plotting'])

        # The grid is built once and reused by later calls.
        if self.prediction_grid.shape[0] == 0:
            self.make_prediction_grid()

        if iteration_status == StatusPoint.commission:
            if self.iteration_state.iteration > 0:
                self.visualize_at_start_of_iteration()
        elif iteration_status == StatusPoint.plot:
            self.visualize_at_end_of_iteration()
        else:
            raise Exception('Unknown stage %s' % iteration_status.name)

    def _apply_param_axes(self, ax):
        """Label x/y of ``ax`` with the parameter names and clamp them to [Min, Max]."""
        ax.set_xlabel(self.x_var)
        ax.set_xlim([self.param_info.loc[self.x_var, 'Min'], self.param_info.loc[self.x_var, 'Max']])
        ax.set_ylabel(self.y_var)
        ax.set_ylim([self.param_info.loc[self.y_var, 'Min'], self.param_info.loc[self.y_var, 'Max']])

    def _scatter_results(self, ax, data, success):
        """Scatter the samples on ``ax``: red where ``success`` is True, blue elsewhere."""
        for mask, colour in ((success, 'r'), (~success, 'b')):
            ax.scatter(data.loc[mask, self.x_var], data.loc[mask, self.y_var],
                       s=50, c=colour, marker='o', edgecolors='k', linewidths=1)

    def _save_hyperparameter_figure(self, filename):
        """Plot every hyperparameter column in its own panel and save it as ``filename``."""
        # squeeze=False keeps the axes in a 2-D array even when there is only
        # one hyperparameter column; the default would return a bare Axes that
        # cannot be iterated.
        fig, ax_grid = plt.subplots(nrows=1, ncols=self.hyperparameters.shape[1], sharex=True,
                                    figsize=(16, 10), squeeze=False)
        for hp_name, ax in zip(self.hyperparameters.columns, ax_grid[0]):
            ax.plot(self.hyperparameters[hp_name], marker='o', ls='-')
            ax.set_title(hp_name)
        fig.savefig(os.path.join(self.directory, filename))
        plt.close(fig)

    def visualize_at_end_of_iteration(self):
        """Plot GPC mean and standard deviation at the previously evaluated samples.

        Writes ``Separatrix_it<iteration>.png``. Does nothing for iteration 0
        or when there are not exactly two parameters.
        """
        if len(self.param_names) != 2:
            return

        iteration = self.iteration_state.iteration
        if iteration == 0:
            return

        prev_data = self.data.loc[self.data['Iteration'] < iteration]
        prev_data_prediction = self.gpc_vec[iteration].evaluate(prev_data)
        prev_data = prev_data.merge(prev_data_prediction, left_index=True, right_index=True)
        success = prev_data['Results'] == 1

        fig, ax = plt.subplots(1, 2, figsize=fs)
        triang = mtri.Triangulation(prev_data[self.x_var], prev_data[self.y_var])

        # Left panel: emulator mean, with the contour at the target level in black.
        ax[0].tripcolor(triang, prev_data['Mean'], cmap='RdBu_r', shading='gouraud')
        ax[0].tricontour(prev_data[self.x_var], prev_data[self.y_var], prev_data['Mean'],
                         levels=[self.target], linewidths=2, colors='k')
        self._apply_param_axes(ax[0])
        ax[0].set_title('Mean')
        self._scatter_results(ax[0], prev_data, success)

        # Right panel: emulator standard deviation.
        ax[1].tripcolor(triang, np.sqrt(prev_data['Var']), cmap='RdBu_r', shading='gouraud')
        self._apply_param_axes(ax[1])
        ax[1].set_title('Stdev')
        self._scatter_results(ax[1], prev_data, success)

        fig.savefig(os.path.join(self.directory, 'Separatrix_it%d.png' % iteration))
        plt.close(fig)

    def visualize_at_start_of_iteration(self):
        """Plot hyperparameters, emulator errors, the GPC surface and implausibility.

        Writes ``Separatrix_Hyperparameters_it<n>.png``,
        ``Separatrix_Hyperparameters_<n>.png``, ``Separatrix_Errors_it<n>.png``
        and ``Separatrix_Implausibility_it<n>.png``. Also adds per-iteration
        implausibility columns to ``self.prediction_grid``. Does nothing for
        iteration 0.
        """
        iteration = self.iteration_state.iteration
        if iteration == 0:
            return

        # ---- Hyperparameters
        self._save_hyperparameter_figure('Separatrix_Hyperparameters_it%d.png' % iteration)
        # NOTE(review): this section was originally headed "Accepted percent"
        # but draws the hyperparameters a second time under a different file
        # name. Kept so the set of output files does not change.
        self._save_hyperparameter_figure('Separatrix_Hyperparameters_%d.png' % iteration)

        # ---- Evaluate the GPC on the prediction grid for plotting
        gpc = self.gpc_vec[iteration]
        prediction = gpc.evaluate(self.prediction_grid)

        next_data = self.data.loc[self.data['Iteration'] == iteration]
        prev_data = self.data.loc[self.data['Iteration'] < iteration]
        emu_iter = self.emulation.set_index('Iteration').loc[iteration - 1]
        prev_data = prev_data.merge(emu_iter, on=['Sample'])

        train = gpc.training_data.reset_index()
        train = train.merge(emu_iter, on=['Sample'])
        test = prev_data.loc[prev_data['Train'] == False]  # noqa: E712 (element-wise comparison)

        # ---- Emulator errors on train / test samples
        fig = gpc.plot_errors(train, test, 'Mean', 'Var')
        # plt.savefig writes the *current* figure, as the original code did.
        plt.savefig(os.path.join(self.directory, 'Separatrix_Errors_it%d.png' % iteration))
        plt.close(fig)

        # ---- GPC surface (3-D, left) and implausibility map (2-D, right)
        fig = plt.figure(figsize=fs)
        fig.suptitle('Iteration %d' % iteration, fontsize=12)
        ax1 = fig.add_subplot(1, 2, 1, projection='3d')
        ax2 = fig.add_subplot(1, 2, 2)

        success = prev_data['Results'] == 1
        for mask, colour in ((success, 'g'), (~success, 'r')):
            ax1.scatter(prev_data.loc[mask, self.x_var], prev_data.loc[mask, self.y_var],
                        0.5 * (prev_data.loc[mask, 'Results'] + 1), s=50, c=colour, marker='*')

        grid_shape = (self.prediction_resolution, self.prediction_resolution)
        x = self.prediction_grid[self.x_var].values.reshape(grid_shape)
        y = self.prediction_grid[self.y_var].values.reshape(grid_shape)
        z = prediction['Mean'].values.reshape(grid_shape)
        std = np.sqrt(prediction['Var']).values.reshape(grid_shape)

        # Mean surface plus faint surfaces at mean +/- 2 standard deviations.
        ax1.plot_surface(x, y, z, cmap="Blues", alpha=0.5, edgecolors='none', linewidth=0, antialiased=True)
        ax1.plot_surface(x, y, z + 2 * std, color='k', alpha=0.1, edgecolors='none', linewidth=0,
                         antialiased=True)
        ax1.plot_surface(x, y, z - 2 * std, color='k', alpha=0.1, edgecolors='none', linewidth=0,
                         antialiased=True)

        self._apply_param_axes(ax1)
        ax1.set_zlabel('Outcome')
        ax1.set_zlim([-1.2, 1.2])
        ax1.set_title('GPC Metamodel')

        # TODO: Only evaluate non-implausible points to save time, although it will degrade plotting
        implausibility_col = 'Implausibility_%d' % iteration
        implausible_col = 'Implausibile_%d' % iteration  # (sic) column name kept unchanged
        self.prediction_grid[implausibility_col] = np.sqrt(
            (prediction['Mean'] - self.target) ** 2 / prediction['Var'])
        self.prediction_grid[implausible_col] = \
            self.prediction_grid[implausibility_col] > self.implausibility_threshold
        self.prediction_grid['Implausible'] = \
            self.prediction_grid['Implausible'] | self.prediction_grid[implausible_col]
        # Running row-wise maximum over the iterations seen so far.
        self.prediction_grid['Max_Implausibility'] = \
            self.prediction_grid[['Max_Implausibility', implausibility_col]].max(axis=1)

        # pd.concat replaces DataFrame.append, which was removed in pandas 2.0.
        self.for_plotting = pd.concat(
            [self.for_plotting, self.prediction_grid[[self.x_var, self.y_var, 'Max_Implausibility']]],
            ignore_index=True)
        # Cap the values so the filled contour levels below cover everything.
        cap = self.implausibility_threshold + 2
        self.for_plotting.loc[self.for_plotting['Max_Implausibility'] > cap, 'Max_Implausibility'] = cap

        ax2.tricontour(self.for_plotting[self.x_var], self.for_plotting[self.y_var],
                       self.for_plotting['Max_Implausibility'],
                       levels=[self.implausibility_threshold], linewidths=2, colors='k')
        # np.arange instead of range so a float threshold does not raise
        # TypeError; for an integer threshold t the levels are still 0..t+3.
        levels = np.arange(np.ceil(self.implausibility_threshold) + 4)
        ax2.tricontourf(self.for_plotting[self.x_var], self.for_plotting[self.y_var],
                        self.for_plotting['Max_Implausibility'], levels=levels, cmap="RdBu_r")

        ax2.plot(next_data[self.x_var], next_data[self.y_var], 'yo')
        ax2.plot(prev_data.loc[success, self.x_var], prev_data.loc[success, self.y_var], 'wo')
        ax2.plot(prev_data.loc[~success, self.x_var], prev_data.loc[~success, self.y_var], 'ko')
        ax2.plot(train[self.x_var], train[self.y_var], 'cx')
        ax2.plot(test[self.x_var], test[self.y_var], 'mx')
        for _, row in test.iterrows():
            # Label passed positionally: the keyword was renamed from ``s`` to
            # ``text`` in Matplotlib, so positional works with old and new versions.
            ax2.annotate(str(row['Sample']), xy=(row[self.x_var], row[self.y_var]))

        self._apply_param_axes(ax2)
        ax2.set_title('Implausibility & Next Samples')

        fig.savefig(os.path.join(self.directory, 'Separatrix_Implausibility_it%d.png' % iteration))
        plt.close(fig)

    def cleanup_plot(self, calib_manager):
        """Do nothing; ``calib_manager`` is accepted but unused."""
        pass