Source code for idmtools_calibra.utilities.resume_manager

import os
import re
import shutil
from datetime import datetime
import pandas as pd
from idmtools_calibra.calib_manager import CalibManager
from idmtools_calibra.iteration_state import IterationState
from idmtools_calibra.process_state import StatusPoint
from idmtools_calibra.cli.utils import read_calib_data
from logging import getLogger

logger = getLogger(__name__)


[docs]def status_to_iter_step(status: StatusPoint): if status is None: return None elif status == StatusPoint.iteration_start: return StatusPoint.commission elif status == StatusPoint.commission: return StatusPoint.analyze elif status == StatusPoint.running: return StatusPoint.analyze elif status == StatusPoint.analyze: return StatusPoint.plot elif status == StatusPoint.plot: return StatusPoint.next_point elif status == StatusPoint.next_point: return StatusPoint.next_point
[docs]class ResumeManager(object): """ Manages the creation, execution, and resumption of multi-iteration a calibration suite. Each iteration spawns a new ExperimentManager to configure and commission either local or HPC simulations for a set of random seeds, sample points, and site configurations. """ def __init__(self, calib_manager: CalibManager, iteration: int = None, iter_step: str = None, max_iterations: int = None, loop: bool = True, backup: bool = False, dry_run: bool = False): self.calib_manager = calib_manager self.iteration = iteration self.iter_step = iter_step self.max_iterations = max_iterations self.loop = loop self.backup = backup self.dry_run = dry_run self.calib_data = None self.location = None self.initialize()
[docs] def initialize(self): """ prepare calib_manager and iteration state for resume - restore calib_manager - restore iteration state - validate iteration - validate iter_step """ self.iter_step = None if self.iter_step is None else StatusPoint[self.iter_step] if self.iter_step: if self.iter_step.name not in ['commission', 'analyze', 'plot', 'next_point']: print(f"Invalid iter_step '{self.iter_step.name}', ignored.") exit() # restore calib_manager self.restore_calib_manager() # validate iteration self.adjust_iteration() # validate iter_step self.adjust_iteration_step() # restore iteration state self.restore_iteration_state() # restore LL_all.csv self.restore_ll_all()
[docs] def resume(self): """ Call calib_manager.run_iterations to start resume action """ self.calib_manager.resume = True if self.backup: self.backup_calibration() it = self.calib_manager.current_iteration print('\nResume will start with:') print(f' - iteration = {self.iteration}') print(f' - iter_step = {status_to_iter_step(it.status).name}') print(f' - loop = {self.loop}') print(f' - max_iterations = {self.max_iterations}') # resume from a given iteration if not self.dry_run: self.calib_manager.run_iterations(self.iteration, self.max_iterations, loop=self.loop)
[docs] def check_location(self): """ - Handle the case: resume on different environments - Handle environment change case: may resume from commission instead """ # restore iteration state it = self.calib_manager.state_for_iteration(iteration=self.iteration) # If location has been changed, will double check user for a special case before proceed... if self.calib_manager.platform._config_block != it.location: var = input( "\n/!\\ WARNING /!\\ Environment has been changed from '%s' to '%s'. Resume will start from 'commission' instead, do you want to continue? [Y/N]: " % ( it.location, self.calib_manager.platform._config_block)) if var.upper() == 'Y': logger.info(f"Answer is '{var.upper()}'. Continue...") self.calib_manager.suites = [] # will re-generate suite_id in commission_iteration step self.iter_step = StatusPoint.commission else: logger.info(f"Answer is '{var.upper()}'. Exiting...") exit()
[docs] def adjust_iteration(self): """ Validate iteration against latest_iteration return adjusted iteration """ # Get latest iteration # latest_iteration = self.calib_data.get('iteration', None) # handle special case if latest_iteration is None: self.iteration = 0 # if no iteration passed in, take latest_iteration as instead if self.iteration is None: self.iteration = latest_iteration # adjust input iteration if latest_iteration < self.iteration: self.iteration = latest_iteration # adjust max_iterations based on input max_iterations if not self.max_iterations: self.max_iterations = self.calib_manager.max_iterations if self.max_iterations <= self.iteration: self.max_iterations = self.iteration + 1 # check environment self.check_location()
[docs] def adjust_iteration_step(self): """ Validate iter_step """ it = self.calib_manager.state_for_iteration(iteration=self.iteration) latest_step = it.status if isinstance(it.status, StatusPoint) else StatusPoint[it.status] if self.iter_step is None: self.iter_step = latest_step if self.iter_step == StatusPoint.running: self.iter_step = StatusPoint.commission elif self.iter_step == StatusPoint.analyze: self.iter_step = StatusPoint.running if self.iter_step.value > latest_step.value: raise Exception(f"The iter_step '{self.iter_step.name}' is beyond the latest step '{latest_step.name}'") # move forward if status is done if self.iter_step == StatusPoint.done: self.iter_step = StatusPoint.next_point
[docs] def restore_calib_manager(self): """ Restore calib_manager """ self.calib_data = read_calib_data(self.calib_manager.calibration_path) self.calib_manager.suites = self.calib_data['suites'] # restore last time location self.location = self.calib_data['location'] # load all_results results = self.calib_data.get('results') if isinstance(results, dict): self.calib_manager.all_results = pd.DataFrame.from_dict(results, orient='columns') elif isinstance(results, list): self.calib_manager.all_results = results
[docs] def restore_iteration_state(self): """ Restore IterationState """ # restore initial iteration state it = self.calib_manager.state_for_iteration(self.iteration) it.platform = self.calib_manager.platform # in case environment has been changed and new suite_id & suites are generated if not self.calib_manager.suites: it.suite_id = self.calib_manager.suite_id it.suites = self.calib_manager.suites # update required objects for resume it.update(**self.calib_manager.required_components) # set calibration_directory IterationState.calibration_directory = self.calib_manager.directory # step 1: restore next_point if self.iter_step not in ( StatusPoint.plot, StatusPoint.next_point, StatusPoint.running) and self.iteration != 0: if self.iter_step == StatusPoint.commission or self.iter_step == StatusPoint.iteration_start: iteration_state = IterationState.restore_state(self.iteration - 1) it.next_point_algo.set_state(iteration_state.next_point, self.iteration - 1) elif self.iter_step == StatusPoint.analyze: iteration_state = IterationState.restore_state(self.iteration) it.next_point_algo.set_state(iteration_state.next_point, self.iteration) # For IMIS ONLY! it.next_point_algo.restore(IterationState.restore_state(self.iteration - 1)) else: it.next_point_algo.set_state(it.next_point, self.iteration) # step 2: restore Calibration results if self.iteration > 0: if self.iter_step.value < StatusPoint.plot.value: # it will combine current results with previous results it.restore_results(self.iteration - 1) else: # it will use the current results and resume from next iteration it.restore_results(self.iteration) else: if self.iter_step.value >= StatusPoint.plot.value: # it will combine current results with previous results it.restore_results(self.iteration) # it.all_results.reset_index(inplace=True) if it.iteration == 0 and self.iter_step.value < StatusPoint.plot.value: it.all_results = None # step 3: prepare resume states if self.iter_step.value <= StatusPoint.commission.value: # need to run simulations it.simulations = {} if self.iter_step.value <= StatusPoint.analyze.value: # just need to calculate the results it.results = {} # finally update current status it._status = StatusPoint(self.iter_step.value - 1) if self.iter_step.value > 0 else StatusPoint.iteration_start it.resume = True self.calib_manager.current_iteration = it
[docs] def restore_ll_all(self): from idmtools_calibra.utilities.ll_all_generator import generate_ll_all ll_all_name = 'LL_all.csv' ll_all_path = os.path.join(self.calib_manager.directory, '_plots', ll_all_name) if os.path.exists(ll_all_path): os.remove(ll_all_path) if self.iteration > 0: if self.iter_step.value <= StatusPoint.plot.value: generate_ll_all(self.calib_manager, iteration=self.iteration - 1, ll_all_name=ll_all_name) else: generate_ll_all(self.calib_manager, iteration=self.iteration, ll_all_name=ll_all_name) else: if self.iter_step.value > StatusPoint.plot.value: generate_ll_all(self.calib_manager, iteration=self.iteration, ll_all_name=ll_all_name)
[docs] def backup_calibration(self): """ Backup CalibManager.json for resume action """ # calibration_path = os.path.join(self.calib_manager.name, 'CalibManager.json') calibration_path = self.calib_manager.calibration_path if os.path.exists(calibration_path): backup_id = 'backup_' + re.sub('[ :.-]', '_', str(datetime.now().replace(microsecond=0))) shutil.copy(calibration_path, os.path.join(self.calib_manager.name, 'CalibManager_%s.json' % backup_id))