Source code for idmtools_calibra.iteration_state

import json
import os
import time
import pandas as pd
from datetime import datetime
from logging import getLogger
from idmtools.analysis.analyze_manager import AnalyzeManager
from idmtools.registry.functions import FunctionPluginManager

from idmtools_calibra.utilities.parameter_set import ParameterSet
from idmtools_calibra.process_state import StatusPoint
from idmtools_calibra.utilities.encoding import NumpyEncoder, json_numpy_obj_hook
from idmtools_calibra.utilities.display import verbose_timedelta

logger = getLogger("Calibration")
user_logger = getLogger('user')


class IterationState:
    """
    Holds the settings, parameters, simulation state, analysis results, etc.
    for one calibtool iteration.

    Allows for the resumption or extension of existing CalibManager instances
    from an arbitrary point in the iterative process.
    """

    def __init__(self, **kwargs):
        self.iteration = 0
        self.calibration_name = None
        self.calibration_directory = None
        self.platform = None
        self.task = None
        self.sites = []
        self.suite_id = {}
        self.samples_for_this_iteration = {}
        self.next_point = {}
        self.simulations = {}
        self.analyzers = {}
        self.results = {}
        self.experiment_id = None
        self.next_point_algo = None
        self.analyzer_list = []
        self.site_analyzer_names = {}
        self.experiment_builder_function = None
        self.map_sample_to_model_input_fn = None
        self.sim_runs_per_param_set = None
        self.plotters = []
        self.all_results = None
        self.summary_table = None
        self.iteration_start = None
        self.calibration_start = None
        self.resume = False

        if 'calibration_start' in kwargs:
            cs = kwargs.pop('calibration_start') or datetime.now().replace(microsecond=0)
            if not isinstance(cs, datetime):
                self.calibration_start = datetime.strptime(cs, '%Y-%m-%d %H:%M:%S')
            else:
                self.calibration_start = cs

        if 'iteration_start' in kwargs:
            cs = kwargs.pop('iteration_start') or datetime.now().replace(microsecond=0)
            if not isinstance(cs, datetime):
                self.iteration_start = datetime.strptime(cs, '%Y-%m-%d %H:%M:%S')
            else:
                self.iteration_start = cs

        self._status = None
        self.update(**kwargs)

        if self._status is not None:
            self._status = StatusPoint[self._status]

        if not os.path.exists(self.iteration_directory):
            os.makedirs(self.iteration_directory)

    @property
    def status(self):
        return self._status

    @status.setter
    def status(self, status):
        self._status = status
        self.save()

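    # Construction sketch (illustrative; the directory and timestamp values are
    # hypothetical). Any keyword argument becomes an attribute via update(), and
    # string timestamps are parsed with the '%Y-%m-%d %H:%M:%S' format used above:
    #
    #   state = IterationState(iteration=0,
    #                          calibration_directory='.',
    #                          calibration_start='2020-01-01 00:00:00')
    #   state.iteration_directory   # e.g. './iter0'
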
    def update(self, **kwargs):
        for k, v in kwargs.items():
            setattr(self, k, v)

    def restore_results(self, iteration):
        """
        Restore summary results from serialized state.

        Args:
            iteration: the iteration number

        Returns:
            None
        """
        if isinstance(self.all_results, pd.DataFrame):
            self.all_results.set_index('sample', inplace=True)
            self.all_results = self.all_results[self.all_results.iteration <= iteration]
        elif isinstance(self.all_results, list):
            self.all_results = self.all_results[iteration]

    def run(self):
        # START STEP
        if not self.status:
            self.starting_step()

        # COMMISSION STEP
        if self.status == StatusPoint.iteration_start:
            self.commission_step()

        # RUNNING
        if self.status == StatusPoint.commission:
            self.status = StatusPoint.running
            self.wait_for_finished()

        # ANALYZE STEP
        if self.status == StatusPoint.running:
            self.analyze_step()

        # PLOTTING STEP
        if self.status == StatusPoint.analyze:
            self.plotting_step()

        # Done with calibration? Exit the loop.
        if self.finished():
            return

        # NEXT STEP
        if self.status == StatusPoint.plot:
            self.next_point_step()

        self.status = StatusPoint.done

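    # Note: run() advances self.status through the StatusPoint sequence
    # iteration_start -> commission -> running -> analyze -> plot -> next_point -> done.
    # Each assignment to self.status persists the state via save(), which is what
    # allows an interrupted iteration to resume from the step where it stopped.
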
    def starting_step(self):
        self.status = StatusPoint.iteration_start

        # Restart the timer for each iteration
        self.iteration_start = datetime.now().replace(microsecond=0)

        logger.info('---- Starting Iteration %d ----' % self.iteration)

    def commission_step(self):
        # Get the params from the next-point algorithm
        next_params = self.next_point_algo.get_samples_for_iteration(self.iteration)
        self.set_samples_for_iteration(next_params, self.next_point_algo)

        # Ready for commissioning
        self.status = StatusPoint.commission

        # Then commission
        self.commission_iteration(next_params)

        # Call the plotters for post-commission plots
        self.plot_iteration()

    def analyze_step(self):
        # Ready for analyzing
        self.status = StatusPoint.analyze

        # Analyze the iteration
        self.analyze_iteration()

    def plotting_step(self):
        # Ready for plotting
        self.status = StatusPoint.plot

        # Plot the iteration
        self.plot_iteration()

    def next_point_step(self):
        # Ready for the next point
        self.status = StatusPoint.next_point
        self.next_point_algo.update_iteration(self.iteration)

    def commission_iteration(self, next_params):
        """
        Commission an experiment of simulations constructed from a list of combinations of
        random seeds, calibration sites, and the next sample points.
        Cache the relevant experiment and simulation information to the IterationState.

        Args:
            next_params: the next sample points

        Returns:
            None
        """
        from idmtools.entities.templated_simulation import TemplatedSimulations
        from idmtools.entities.experiment import Experiment

        ts = TemplatedSimulations(base_task=self.task)
        builder = self.experiment_builder_function(next_params)
        ts.add_builder(builder)

        exp_name = f'{self.calibration_name}_iter{self.iteration}'
        experiment = Experiment(name=exp_name)

        # Build the experiment from the templated simulations
        experiment.simulations = ts
        experiment.parent_id = self.suite_id

        # Run the experiment
        experiment.run()

        # Store the experiment id
        self.experiment_id = experiment.uid

        # Save the simulations' tags
        self.simulations = {sim.id: sim.tags for sim in experiment.simulations}

        logger.debug('Commissioned new simulations for experiment id: %s' % self.experiment_id)
        self.save()

    def plot_iteration(self):
        # Run all the plotters
        for plotter in self.plotters:
            plotter.visualize(self)

    def analyze_iteration(self):
        """
        Analyze the output of completed simulations by using the relevant analyzers by site.
        Cache the results that are returned by those analyzers.
        """
        if self.results:
            logger.info('Reloading results from cached iteration state.')
            return self.results['total']

        from idmtools.core import ItemType
        analyzerManager = AnalyzeManager(ids=[(self.experiment_id, ItemType.EXPERIMENT)],
                                         analyzers=self.analyzer_list,
                                         working_dir=self.iteration_directory,
                                         verbose=False,
                                         platform=self.platform,
                                         force_manager_working_directory=True)

        if not analyzerManager.analyze():
            print("Error encountered during analysis... Exiting")
            exit()

        # Make sure each result's index is sorted in ascending order
        for a in analyzerManager.analyzers:
            ser = a.results
            ser.index = ser.index.astype(int)
            ser = ser.sort_index(ascending=True)
            a.results = ser

        # Ask the analyzers to cache themselves
        cached_analyses = {a.uid: a.cache() if callable(a.cache) else {} for a in analyzerManager.analyzers}
        logger.debug(cached_analyses)

        # Get the results from the analyzers and ask the next-point algorithm how it wants to cache them
        results = pd.DataFrame({a.uid: a.results for a in analyzerManager.analyzers})
        cached_results = self.next_point_algo.get_results_to_cache(results)

        # Store the analyzers and results in the iteration state
        self.analyzers = cached_analyses
        self.results = cached_results

        # Set those results in the next-point algorithm
        self.next_point_algo.set_results_for_iteration(self.iteration, results)

        # Update the summary table and all the results
        self.all_results, self.summary_table = self.next_point_algo.update_summary_table(self, self.all_results)

        # Re-order the columns so that 'iteration' and 'total' come first
        top_columns = ['iteration', 'total']
        self.all_results = self.all_results.reindex(
            columns=(top_columns + [a for a in self.all_results.columns if a not in top_columns]))

        logger.info(self.summary_table)
        print(self.summary_table)

    def wait_for_finished(self, init_sleep=1.0, sleep_time=30):
        from idmtools.core import ItemType
        logger.debug('Waiting for iteration %s simulations to complete' % self.iteration)
        experiment = self.platform.get_item(self.experiment_id, ItemType.EXPERIMENT)
        while True:
            time.sleep(init_sleep)
            self.platform.refresh_status(experiment)

            # Output timing info
            current_time = datetime.now()
            iteration_time_elapsed = current_time - self.iteration_start
            calibration_time_elapsed = current_time - self.calibration_start

            logger.info('\n\nCalibration: %s' % self.calibration_name)
            logger.info('Calibration path: %s' % self.calibration_directory)
            logger.info('Calibration started: %s' % self.calibration_start)
            logger.info('Current iteration: Iteration %s' % self.iteration)
            logger.info('Current Iteration Started: %s' % self.iteration_start)
            logger.info('Time since iteration started: %s' % verbose_timedelta(iteration_time_elapsed))
            logger.info('Time since calibration started: %s\n' % verbose_timedelta(calibration_time_elapsed))

            # If one or more simulations failed while the experiment is still running -> cancel and exit
            if experiment.any_failed and not experiment.done:
                # Kill the remaining simulations
                print("\nOne or more simulations failed. Calibration cannot continue. Exiting...")
                FunctionPluginManager.instance().hook.idmtools_runnable_on_failure(item=experiment)
                self.cancel()
                exit()

            # Test if we are all done
            if experiment.done:
                FunctionPluginManager.instance().hook.idmtools_runnable_on_done(item=experiment)
                break

            time.sleep(sleep_time)

        # Exit if the experiment finished but did not succeed
        if experiment.done and not experiment.succeeded:
            print("\nExperiment failed")
            exit()

        FunctionPluginManager.instance().hook.idmtools_runnable_on_succeeded(item=experiment)

        # Print the status one more time
        iteration_time_elapsed = current_time - self.iteration_start
        logger.info("Iteration %s done (took %s)" % (self.iteration, verbose_timedelta(iteration_time_elapsed)))

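    # Note: wait_for_finished() polls the platform every `sleep_time` seconds
    # (after an initial `init_sleep` pause on each pass), logs elapsed time for
    # the iteration and the overall calibration, cancels remaining work and exits
    # if any simulation fails, and fires the idmtools runnable hooks on failure,
    # completion, and success.
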
    def cancel(self):
        self.platform._experiments.platform_cancel(self.experiment_id)

        # Print confirmation
        user_logger.info("Cancellation submitted for calibration %s" % self.calibration_name)

    @property
    def iteration_directory(self):
        return os.path.join(self.calibration_directory, 'iter%d' % self.iteration)

    @property
    def iteration_file(self):
        return os.path.join(self.iteration_directory, "IterationState.json")

    @property
    def param_names(self):
        return self.next_point_algo.get_param_names()

    def finished(self):
        """
        Return True if the next-point algorithm has reached its termination condition.
        """
        return self.next_point_algo.end_condition()

    @classmethod
    def from_file(cls, filepath):
        with open(filepath, 'r', encoding='utf-8') as f:
            return cls(**json.load(f, object_hook=json_numpy_obj_hook))

    def to_file(self):
        state = dict(status=self.status.name,
                     location=self.platform._config_block,
                     samples_for_this_iteration=self.samples_for_this_iteration,
                     analyzers=self.analyzers,
                     iteration=self.iteration,
                     iteration_start=self.iteration_start,
                     results=self.results,
                     calibration_name=self.calibration_name,
                     experiment_id=self.experiment_id,
                     calibration_directory=self.calibration_directory,
                     simulations=self.simulations,
                     next_point=self.next_point_algo.get_state(),
                     suite_id=self.suite_id)

        with open(self.iteration_file, 'w') as f:
            json.dump(state, f, indent=4, cls=NumpyEncoder)

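    # Serialization round-trip sketch (illustrative): to_file() writes the state
    # above to <iteration_directory>/IterationState.json using NumpyEncoder, and
    # from_file() / restore_state() rebuild an IterationState from that file:
    #
    #   state.to_file()
    #   restored = IterationState.from_file(state.iteration_file)
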
    @classmethod
    def restore_state(cls, iteration):
        """
        Restore the IterationState saved for the given iteration.
        """
        iter_directory = os.path.join(cls.calibration_directory, 'iter%d' % iteration)
        iter_file = os.path.join(iter_directory, 'IterationState.json')
        return cls.from_file(iter_file)

    def set_samples_for_iteration(self, samples, next_point):
        if isinstance(samples, pd.DataFrame):
            dtypes = {name: str(data.dtype) for name, data in samples.items()}
            self.samples_for_this_iteration_dtypes = dtypes

            # Replace NaN with None so the samples serialize cleanly to JSON
            samples_nan_to_none = samples.where(~samples.isnull(), other=None)
            self.samples_for_this_iteration = samples_nan_to_none.to_dict(orient='list')
        else:
            self.samples_for_this_iteration = samples

    def save(self):
        """
        Cache information about the IterationState that is needed to resume after an interruption.
        If resuming from an existing iteration, also copy to backup the initial cached state.
        """
        logger.debug('Saving calibration iteration %s' % self.iteration)
        if not self.calibration_name:
            return
        self.to_file()

    def get_parameter_sets_with_likelihoods(self):
        likelihoods = self.results['total']  # an ordered list of likelihood floats
        all_sublikelihoods = {k: v for k, v in self.results.items() if k != 'total'}
        param_dicts = self.samples_for_this_iteration  # an ordered list of input parameters (user knobs)
        if len(likelihoods) != len(param_dicts):
            raise Exception('Inconsistent iteration data. \'total\' and \'samples_for_this_iteration\' '
                            'are not the same length')

        # Find and attach the sim_id & run number for every parameter set replicate
        parameter_sets = []
        for sample_index in range(len(param_dicts)):
            param_dict = param_dicts[sample_index]
            likelihood = likelihoods[sample_index]
            sublikelihoods = {k: v[sample_index] for k, v in all_sublikelihoods.items()}
            replicates_dict = {sim_id: sim_dict for sim_id, sim_dict in self.simulations.items()
                               if sim_dict['__sample_index__'] == sample_index}
            if len(replicates_dict) == 0:
                raise Exception('There should be at least one simulation associated with sample_index: %s, '
                                'but there are none.' % sample_index)

            # Create a distinct ParameterSet object for each replicate
            for sim_id, replicate_dict in replicates_dict.items():
                run_number = replicate_dict['Run_Number']
                parameter_set = ParameterSet(param_dict=param_dict,
                                             likelihood=likelihood,
                                             sublikelihoods=sublikelihoods,
                                             iteration_number=self.iteration,
                                             sim_id=sim_id,
                                             run_number=run_number)
                parameter_sets.append(parameter_set)

        return parameter_sets

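# Consumption sketch for get_parameter_sets_with_likelihoods() (illustrative; it
# assumes the ParameterSet constructor arguments above are exposed as attributes
# of the returned objects):
#
#   for ps in state.get_parameter_sets_with_likelihoods():
#       print(ps.sim_id, ps.run_number, ps.likelihood)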