Source code for idmtools_calibra.calib_manager

import os
import re
import json
import shutil
import pandas as pd
from datetime import datetime
from logging import getLogger
from functools import partial
from typing import Optional, Any, Dict, List
from idmtools.builders import SimulationBuilder
from idmtools.core.context import get_current_platform
from idmtools.entities.iplatform import IPlatform
from idmtools.entities.itask import ITask
from idmtools.entities.simulation import Simulation
from idmtools.utils.json import IDMJSONEncoder
from idmtools_calibra.algorithms.next_point_algorithm import NextPointAlgorithm
from idmtools_calibra.calib_site import CalibSite
from idmtools_calibra.iteration_state import IterationState
from idmtools_calibra.plotters.base_plotter import BasePlotter
from idmtools_calibra.process_state import StatusPoint
from idmtools_calibra.utilities.mod_fn import ModFn
from idmtools_calibra.utilities.helper import validate_exp_name
from idmtools_calibra.utilities.display import verbose_timedelta

logger = getLogger(__name__)


def set_parameter_sweep_callback(simulation: Simulation, param: str, value: Any) -> Dict[str, Any]:
    """
    Convenience callback for sweeps.

    Args:
        simulation: Simulation we are updating
        param: Parameter
        value: Value

    Returns:
        Tags to set on simulation
    """
    if hasattr(simulation.task, 'config') and hasattr(simulation.task.config, 'parameters'):
        # For example: EMODTask
        simulation.task.config.parameters[param] = value
    elif not hasattr(simulation.task, 'config') and hasattr(simulation.task, 'parameters'):
        # For example: JSONConfiguredTask, JSONConfiguredPythonTask, JSONConfiguredRTask
        simulation.task.parameters[param] = value
    else:
        raise Exception("Support task with 'config' or 'parameters' only.")
    return {param: value}
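
# A minimal usage sketch (not from this module): binding the callback's `param`
# with functools.partial, as CalibManager below does for "Run_Number", and
# handing it to an idmtools SimulationBuilder sweep. `builder` and the sweep
# values are illustrative.
#
#     from functools import partial
#     from idmtools.builders import SimulationBuilder
#
#     builder = SimulationBuilder()
#     builder.add_sweep_definition(partial(set_parameter_sweep_callback, param="Run_Number"),
#                                  range(4))
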
class SampleIndexWrapper(object):
    """
    Wrapper for a SimConfigBuilder-modifying function to add metadata on the sample-point
    index when called in an iteration over sample points.
    """

    __name__ = "SampleIndex"

    def __init__(self, map_sample_to_model_input_fn):
        self.map_sample_to_model_input_fn = map_sample_to_model_input_fn

    def __call__(self, simulation, idx, sample, *args, **kwargs):
        # Because this sample might be reused for replicate sims of this one,
        # we need to make sure not to modify it.
        params_dict = self.map_sample_to_model_input_fn(simulation, sample.copy(), *args, **kwargs)
        params_dict.update({'__sample_index__': idx})
        return params_dict
class CalibManager(object):
    """
    Manages the creation, execution, and resumption of a multi-iteration calibration suite.
    Each iteration spawns a new ExperimentManager to configure and commission either local
    or HPC simulations for a set of random seeds, sample points, and site configurations.
    """

    def __init__(self, task: ITask, map_sample_to_model_input_fn, sites: List[CalibSite],
                 next_point: NextPointAlgorithm, platform: Optional[IPlatform] = None,
                 name: str = 'calib_test', sim_runs_per_param_set: int = 1,
                 max_iterations: int = 5, plotters: List[BasePlotter] = None,
                 map_replicates_callback=None, directory='.'):
        self.name = name
        self.directory = os.path.join(directory, name)  # path to root calib dir
        if platform is None:
            platform = get_current_platform()
        self.platform = platform
        self.task = task
        self.map_sample_to_model_input_fn = SampleIndexWrapper(map_sample_to_model_input_fn)
        self.sites = sites
        self.next_point = next_point
        self.sim_runs_per_param_set = sim_runs_per_param_set
        self.max_iterations = max_iterations
        self.plotters = plotters or []
        self.suites = []
        self.all_results = None
        self.summary_table = None
        self.calibration_start = None
        self.latest_iteration = 0
        self.current_iteration = None
        self.resume = False
        # If not overridden via set_experiment_builder_function, use the internally generated builder
        self.experiment_builder_function = self.default_experiment_builder_function
        self.map_replicates_callback = map_replicates_callback if map_replicates_callback else partial(
            set_parameter_sweep_callback, param="Run_Number")
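
    # A hedged construction sketch: `example_task`, `example_sites`, and
    # `example_next_point` are hypothetical objects built elsewhere (a task such
    # as JSONConfiguredPythonTask, a list of CalibSite subclasses, and a
    # NextPointAlgorithm implementation), and `map_sample` maps a sample point
    # onto the task's parameters.
    #
    #     manager = CalibManager(task=example_task,
    #                            map_sample_to_model_input_fn=map_sample,
    #                            sites=example_sites,
    #                            next_point=example_next_point,
    #                            sim_runs_per_param_set=3,
    #                            max_iterations=10,
    #                            name='my_calibration')
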
    @classmethod
    def open_for_reading(cls, calibration_directory):
        return cls(task=None, map_sample_to_model_input_fn=None, sites=None, next_point=None,
                   name=calibration_directory)
    @property
    def suite_id(self):
        # Generate the suite ID if not present
        if not self.suites:
            from idmtools.entities import Suite
            suite = Suite(name=self.name)
            suites = self.platform.create_items(suite)
            suite_id = suites[0][1]
            self.suites.append({'id': suite_id})
            self.cache_calibration()
        return self.suites[-1]['id']

    @property
    def iteration(self):
        return self.current_iteration.iteration if self.current_iteration else 0
    def run_calibration(self, **kwargs):
        """
        Create and run a complete multi-iteration calibration suite.

        kwargs supports the following optional parameters:

        Args:
            resume: bool, default=False. Flag required to resume a calibration
            iteration: int, default=None. Iteration from which to resume
            iter_step: str, default=None. Calibration step from which to resume.
                Supported values: 'commission', 'analyze', 'plot', and 'next_point'
            loop: bool, default=True. Whether to continue to the next iteration
            max_iterations: int, default=None. Overrides the max_iterations defined on the CalibManager
            backup: bool, default=False. Whether to back up CalibManager.json
            dry_run: bool, default=False. Whether to do a dry run of the resume action without executing it
            directory: str, default=None. Calibration directory

        Returns:
            None
        """
        directory = kwargs.get('directory', None)
        if directory:
            self.directory = os.path.join(directory, self.name)  # path to root calib dir

        ll_all = kwargs.get('ll_all', False)
        if ll_all:
            from idmtools_calibra.utilities.ll_all_generator import generate_ll_all
            iteration = kwargs.get('iteration', None)
            print("Generating ll_all.csv file...")
            generate_ll_all(self, iteration=iteration)
            exit(0)

        resume = kwargs.get('resume', False)
        if resume:
            if not os.path.exists(self.directory):
                print(f"\n/!\\ WARNING /!\\ This is a brand new run for calibration '{self.name}', can't resume.")
                exit()
            self.resume_calibration(**kwargs)
        else:
            # Check experiment name as early as possible
            if not validate_exp_name(self.name):
                exit()
            self.create_calibration()
            self.run_iterations()
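
    # Typical invocations, assuming `manager` is the CalibManager sketched above:
    #
    #     manager.run_calibration()                      # fresh run
    #     manager.run_calibration(resume=True,
    #                             iteration=2,
    #                             iter_step='analyze')   # pick up iteration 2 at the analyze step
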
    def run_iterations(self, iteration: int = 0, max_iterations: int = None, loop: bool = True):
        """
        Run iterations in a loop.

        Args:
            iteration: iteration at which to start
            max_iterations: maximum number of iterations to run
            loop: whether to continue looping over the remaining iterations

        Returns:
            None
        """
        self.calibration_start = datetime.now().replace(microsecond=0)
        if not max_iterations:
            max_iterations = self.max_iterations

        # normal run
        for i in range(iteration, max_iterations):
            self.current_iteration = self.create_iteration_state(i)
            self.current_iteration.run()
            self.post_iteration()
            if not loop:
                break

        self.finalize_calibration()

        # Print the calibration finish time
        current_time = datetime.now()
        calibration_time_elapsed = current_time - self.calibration_start
        logger.info("Calibration done (took %s)" % verbose_timedelta(calibration_time_elapsed))
        print("Calibration done (took %s)" % verbose_timedelta(calibration_time_elapsed))
    def post_iteration(self):
        self.all_results = self.current_iteration.all_results
        self.summary_table = self.current_iteration.summary_table
        self.cache_calibration(iteration=self.iteration + 1)
    def set_experiment_builder_function(self, exp_builder_function):
        """
        Replace the default experiment builder with a user-supplied function, so that it
        is used to build each iteration's experiment.

        Args:
            exp_builder_function: a function that builds and returns a SimulationBuilder

        Returns:
            None
        """
        self.experiment_builder_function = exp_builder_function
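
    # A hedged sketch of a custom builder: the replacement should mirror the
    # signature of default_experiment_builder_function below (next_params, plus
    # an optional replicate count) and return a SimulationBuilder.
    # `my_builder_fn` is hypothetical.
    #
    #     def my_builder_fn(next_params, n_replicates=None):
    #         builder = SimulationBuilder()
    #         ...  # add sweeps over next_params
    #         return builder
    #
    #     manager.set_experiment_builder_function(my_builder_fn)
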
    def set_map_replicates_callback(self, map_replicates_callback):
        """
        Set the map-replicates callback. This callback should take a value parameter that is
        the value of the replicate. Normally you would want to map this to a value in the config.

        Args:
            map_replicates_callback: callback that applies the replicate number to a simulation

        Returns:
            None
        """
        self.map_replicates_callback = map_replicates_callback
    def default_experiment_builder_function(self, next_params, n_replicates: Optional[int] = None) -> SimulationBuilder:
        """
        Defines the function that builds the experiment for each iteration of a calibration run.

        Args:
            next_params: The next parameters to run
            n_replicates: Number of replicates

        Returns:
            SimulationBuilder
        """
        if not n_replicates:
            n_replicates = self.sim_runs_per_param_set
        sweeps = [[ModFn(site.setup_fn) for site in self.sites]]
        sweep = [ModFn(self.map_replicates_callback, value=i + 1) for i in range(n_replicates)]
        if n_replicates > 1 and len(sweep) == 1:
            sweep = sweep[0]
        sweeps.append(sweep)
        sweeps.append(
            [ModFn(self.map_sample_to_model_input_fn, index, samples.copy())
             for index, samples in enumerate(next_params)])

        builder = SimulationBuilder()
        count = None
        for sweep in sweeps:
            builder.sweeps.append(sweep)
            count = len(sweep) if count is None else count * len(sweep)
        builder.count = count
        return builder
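
    # The commissioned experiment size is the cross product of the sweep sizes:
    # for example, 2 sites x 3 replicates x 10 samples in next_params gives
    # builder.count = 2 * 3 * 10 = 60 simulations for the iteration.
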
    def create_iteration_state(self, iteration):
        """
        Create iteration state.

        Args:
            iteration: the iteration number

        Returns:
            created IterationState
        """
        if self.resume:
            self.resume = False
            self.current_iteration.calibration_start = self.calibration_start
            return self.current_iteration

        return IterationState(iteration=iteration,
                              calibration_name=self.name,
                              calibration_directory=self.directory,
                              platform=self.platform,
                              sites=self.sites,
                              suite_id=self.suite_id,
                              next_point_algo=self.next_point,
                              map_sample_to_model_input_fn=self.map_sample_to_model_input_fn,
                              experiment_builder_function=self.experiment_builder_function,
                              sim_runs_per_param_set=self.sim_runs_per_param_set,
                              site_analyzer_names=self.site_analyzer_names(),
                              analyzer_list=self.analyzer_list,
                              task=self.task,
                              plotters=self.plotters,
                              all_results=self.all_results,
                              calibration_start=self.calibration_start)
    def create_calibration(self):
        """
        Create the working directory for a new calibration.
        Cache the relevant suite-level information to allow re-initializing this instance.
        """
        if os.path.exists(self.directory):
            logger.info("Calibration with name %s already exists at directory %s" % (self.name, self.directory))

            var = ""
            while var not in ('R', 'B', 'C', 'A'):
                var = input('Do you want to [R]esume, [B]ackup + run, [C]leanup + run, [A]bort: ')
                var = var.upper()

            # Abort
            if var == 'A':
                exit()
            elif var == 'B':
                tstamp = re.sub('[ :.-]', '_', str(datetime.now()))
                shutil.move(self.directory, "%s_backup_%s" % (self.directory, tstamp))
                self.create_calibration()
            elif var == "C":
                self.cleanup()
                self.create_calibration()
            elif var == "R":
                self.resume_calibration()
                exit()  # avoid calling self.run_iterations(**kwargs)
        else:
            os.makedirs(self.directory)
            self.cache_calibration()
    def finalize_calibration(self):
        """
        Get the final samples from the next point algorithm.
        """
        final_samples = self.next_point.get_final_samples()
        print("\nFinal samples")
        for k, v in final_samples['final_samples'].items():
            print("{}: {}".format(k, v))
        self.cache_calibration(**final_samples)
    def cache_calibration(self, **kwargs):
        """
        Cache information about the CalibManager that is needed to resume after an interruption.

        N.B. This is not currently the complete state, some of which relies on nested and
        frozen functions. As such, the 'resume' logic relies on the existence of the
        original configuration script.

        Args:
            **kwargs: extra info

        Returns:
            None
        """
        it = getattr(self, 'current_iteration', None)
        state = {'name': self.name,
                 'directory': self.directory,
                 'location': self.platform._config_block,
                 'suites': self.suites,
                 'iteration': self.iteration,
                 'param_names': self.param_names(),
                 'sites': self.site_analyzer_names(),
                 'results': self.serialize_results(),
                 'calibration_start': self.calibration_start}
        state.update(kwargs)
        json.dump(state, open(os.path.join(self.directory, 'CalibManager.json'), 'w'), indent=4, cls=IDMJSONEncoder)

        # save a backup copy to the current iteration's directory
        if it:
            iteration = kwargs.get('iteration') - 1 if 'iteration' in kwargs else self.iteration
            state['iteration'] = iteration
            json.dump(state, open(os.path.join(it.iteration_directory, f'CalibManager_{it.iteration}.json'), 'w'),
                      indent=4, cls=IDMJSONEncoder)
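
    # A sketch of the resulting CalibManager.json (keys match the state dict
    # above; all values are illustrative):
    #
    #     {
    #         "name": "my_calibration",
    #         "directory": "./my_calibration",
    #         "location": "COMPS",
    #         "suites": [{"id": "..."}],
    #         "iteration": 2,
    #         "param_names": ["Base_Infectivity"],
    #         "sites": {"example_site": ["analyzer_uid"]},
    #         "results": {"iteration": [0, 0], "sample": [0, 1]},
    #         "calibration_start": "2024-01-01 09:00:00"
    #     }
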
    def backup_calibration(self):
        """
        Backup CalibManager.json for the resume action.
        """
        calibration_path = os.path.join(self.directory, 'CalibManager.json')
        if os.path.exists(calibration_path):
            backup_id = 'backup_' + re.sub('[ :.-]', '_', str(datetime.now().replace(microsecond=0)))
            shutil.copy(calibration_path, os.path.join(self.directory, 'CalibManager_%s.json' % backup_id))
    def serialize_results(self):
        if self.all_results is None:
            return []

        if not isinstance(self.all_results, pd.DataFrame):
            return self.all_results

        # Handle the resume case: a restored self.all_results already has a 'sample' column
        if 'sample' not in self.all_results.columns:
            self.all_results.index.name = 'sample'
            data = self.all_results.reset_index()
        else:
            data = self.all_results

        data.iteration = data.iteration.astype(int)
        data['sample'] = data['sample'].astype(int)

        return data.to_dict(orient='list')
    def resume_calibration(self, **kwargs):
        iteration = kwargs.get('iteration', None)
        iter_step = kwargs.get('iter_step', None)
        loop = kwargs.get('loop', True)
        max_iterations = kwargs.get('max_iterations', None)
        backup = kwargs.get('backup', False)    # back up CalibManager.json
        dry_run = kwargs.get('dry_run', False)  # show final parameters only

        from idmtools_calibra.utilities.resume_manager import ResumeManager
        resume_manager = ResumeManager(self, iteration, iter_step, max_iterations, loop, backup, dry_run)
        resume_manager.resume()
    def delete(self):
        calib_data = self.read_calib_data(force=True)
        if not calib_data:
            return

        suite_list = calib_data.get('suites')
        for suite_dict in suite_list:
            suite_id = suite_dict['id']
            self.platform._suites.platform_delete(suite_id)

        # Print confirmation
        logger.info("Calibration %s successfully cancelled!" % self.name)
    def cleanup(self):
        """
        Cleanup the current calibration:

        - Delete the result directory
        - If LOCAL -> also delete the simulations
        """
        self.delete()

        # Then delete the whole directory
        calib_dir = os.path.abspath(self.directory)
        if os.path.exists(calib_dir):
            try:
                shutil.rmtree(calib_dir)
            except OSError:
                logger.error("Failed to delete %s" % calib_dir)
                logger.error("Try deleting the folder manually before retrying the calibration.")
    def read_calib_data(self, force=False):
        try:
            return json.load(open(os.path.join(self.directory, 'CalibManager.json'), 'rb'))
        except IOError:
            if not force:
                raise Exception('Unable to find metadata in %s/CalibManager.json' % self.directory)
            else:
                return None
    @property
    def calibration_path(self):
        return os.path.join(self.directory, 'CalibManager.json')

    @property
    def analyzer_list(self):
        analyzer_list = []
        for site in self.sites:
            for analyzer in site.analyzers:
                analyzer.result = []
                analyzer_list.append(analyzer)
        return analyzer_list

    @property
    def required_components(self):
        # update required objects for resume, reanalyze and replot
        kwargs = {
            'experiment_builder_function': self.experiment_builder_function,
            'next_point_algo': self.next_point,
            'task': self.task,
            'analyzer_list': self.analyzer_list,
            'plotters': self.plotters,
            'all_results': self.all_results,
            'calibration_start': self.calibration_start,
            'site_analyzer_names': self.site_analyzer_names()
        }
        return kwargs
    def iteration_directory(self):
        return os.path.join(self.directory, 'iter%d' % self.iteration)
    def state_for_iteration(self, iteration):
        iter_directory = os.path.join(self.directory, f'iter{iteration}')
        it = IterationState.from_file(os.path.join(iter_directory, 'IterationState.json'))
        it.platform = self.platform
        return it
    def param_names(self):
        return self.next_point.get_param_names()
    def site_analyzer_names(self):
        return {site.name: [a.uid for a in site.analyzers] for site in self.sites}
    def get_last_iteration(self):
        """
        Determines the last (most recent) completed iteration number.

        Returns:
            the last completed iteration number as an int
        """
        calib_data = self.read_calib_data()
        last_iteration_number = calib_data.get('iteration', None)
        if last_iteration_number is None:
            raise KeyError('Could not determine what the most recent iteration is.')
        last_iteration_number = int(last_iteration_number)

        iteration = self.state_for_iteration(iteration=last_iteration_number)
        # The recorded iteration may still be in progress; if so, the last
        # *completed* iteration is the one before it.
        if iteration.status != StatusPoint.done:
            last_iteration_number -= 1

        return last_iteration_number
    def get_parameter_sets_with_likelihoods(self):
        """
        Returns:
            a list of ParameterSet objects
        """
        last_iteration = self.get_last_iteration()
        parameter_sets = []
        for iteration_number in range(last_iteration + 1):
            iteration = self.state_for_iteration(iteration=iteration_number)
            iteration_param_sets = iteration.get_parameter_sets_with_likelihoods()
            parameter_sets += iteration_param_sets
        return parameter_sets
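
# A read-only usage sketch: pull parameter sets and likelihoods out of a finished
# calibration. 'my_calibration' is a hypothetical calibration directory, and a
# platform context (or an explicit platform assignment) is assumed to be available.
#
#     manager = CalibManager.open_for_reading('my_calibration')
#     for parameter_set in manager.get_parameter_sets_with_likelihoods():
#         print(parameter_set)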