Source code for COMPS.utils.rerun_failed_simulations_for_experiment

import logging
from functools import partial
from multiprocessing import Pool

from COMPS import Client
from COMPS.Data import Experiment, Simulation, QueryCriteria
from COMPS.Data.Simulation import SimulationState

from COMPS.utils.clone_simulation import clone_simulation

logger = logging.getLogger(__name__)

##########################

# Metadata describing this utility: command aliases, one-line help, long
# description and usage examples.  Presumably consumed by the COMPS utility
# launcher when it builds the sub-command parser -- confirm against the launcher.
utility_metadata = {
    'aliases': ['rerunsims'],
    'help': 'Rerun failed simulations for an experiment',
    'description': (
        'This utility creates duplicates of failed simulations in an experiment and reruns them.  By '
        'default, failed simulations are considered to be those that are in state \'Failed\', but if '
        'calling from script, another predicate can be provided (for example, to detect expected output '
        'files that are missing, etc).  If the user running the utility is the owner of the original '
        # The trailing space after "otherwise" matters: without it the joined text reads "otherwisea".
        'experiment, the newly-created simulations will be placed in the original experiment, otherwise '
        'a new experiment will be created to hold the new simulations.'
    ),
    # '%(prog)s' is left unexpanded here; argparse substitutes the program name when printing help.
    'epilog': '''examples:
  %(prog)s 11111111-2222-3333-4444-000000000000
  %(prog)s 11111111-2222-3333-4444-000000000000 --createonly
'''
}

def fill_parser(p):
    """Register this utility's command-line arguments on the parser ``p``.

    Adds the positional ``experiment_id`` and two boolean switches,
    ``--createonly``/``-co`` and ``--deleteold``/``-del``, both of which are
    False unless given on the command line.
    """
    deleteold_help = (
        'Delete the failed simulations after the new ones have been created (default is to not delete them). '
        'If the sims are not deleted, they will be tagged for easier identification for manual deletion later'
    )

    p.add_argument(
        'experiment_id',
        help='Id of the experiment containing the failed simulations to rerun',
    )
    p.add_argument(
        '--createonly', '-co',
        action='store_true',
        help='Only recreate the failed simulations but suppress running (default is to automatically run)',
    )
    p.add_argument(
        '--deleteold', '-del',
        action='store_true',
        help=deleteold_help,
    )
##########################
def is_failed_sim_simple(hoststring, sim):
    """Default 'failed' predicate: report whether ``sim`` is in the Failed state.

    ``hoststring`` is not used here; it is accepted so this function has the
    same ``(hoststring, sim)`` signature that ``rerun_sims`` calls its
    predicate with.
    """
    in_failed_state = sim.state == SimulationState.Failed
    return in_failed_state
# If you want to use a custom predicate to determine what is considered a 'failed' simulation, you can define
# a function something like this and pass it in when calling this utility:
#
# def is_failed_sim(hoststring, sim):
#     if sim.state == SimulationState.Failed:
#         return True
#
#     # Python pools get run in subprocesses (not threads), hence we're not logged in from that process and have to
#     # do so again.  But these subprocesses also appear to get reused, so after the first time we login from the
#     # subprocess, we actually *will* already be logged in, so we'll get a bunch of "skipping login" messages.
#     # To avoid this spam, let's suppress logger messages from COMPS.Client temporarily.
#     from COMPS.Client import logger as client_logger
#
#     client_logger.disabled = True
#     Client.login(hoststring)
#     client_logger.disabled = False
#
#     fi = sim.retrieve_output_file_info(None)
#
#     if not any(filter(lambda x: x.path_from_root == 'output' and x.friendly_name.startswith('RequiredReport_'), fi)):
#         return True
#
#     return False

##########################
def rerun_sims(expid, createonly=False, deleteold=False, predicate=is_failed_sim_simple):
    """Clone the 'failed' simulations of an experiment and optionally rerun them.

    Args:
        expid: Id of the experiment whose simulations are examined.
        createonly: If true, the clones are created but the experiment holding
            them is not commissioned.
        deleteold: If true, the original failed simulations are deleted after
            the clones are created.  Otherwise they are tagged 'ClonedToRerun',
            but only when the clones ended up in the original experiment.
        predicate: Callable invoked as ``predicate(hoststring, sim)`` for every
            simulation; simulations for which the result equals True are
            cloned.  It is run through a ``multiprocessing.Pool``.

    Returns:
        None.  Returns early, after logging, when no simulation is selected.
    """
    source_exp = Experiment.get(expid)
    child_criteria = QueryCriteria().select_children(['files','tags','configuration'])
    sims = source_exp.get_simulations(query_criteria=child_criteria)

    # Depending on what the user does in the predicate and the size of the experiment, this step can
    # take quite a while, so do it in a Pool to speed things up.  This may sometimes be overkill (and
    # slower because of the overhead of cross-process stuff), but that's going to be in the very quick
    # scenarios anyway, and adding more logic seems like unnecessary complexity.
    with Pool() as pool:
        bound_predicate = partial(predicate, Client.auth_manager().hoststring)
        results = pool.map(bound_predicate, sims)

    # Not sure why this is needed, but it seems to flush out some weirdness between multiprocessing + logging
    # that is causing logging to file to be goofed up.  *sigh*
    logger.debug('')

    # Deliberately an equality test against True (not plain truthiness), so only
    # predicate results equal to True select a simulation.
    sims_to_rerun = [sim for sim, verdict in zip(sims, results) if verdict == True]  # noqa: E712

    if not sims_to_rerun:
        logger.info('No sims found to rerun')
        return

    rerun_count = len(sims_to_rerun)
    logger.info(f'Found {rerun_count} sims to rerun')

    logger.info('Recreating simulations')

    # The first clone is requested with no target experiment; whatever experiment
    # id it reports back is then passed to every subsequent clone.
    # NOTE(review): the meaning of the third argument (False) is defined by
    # clone_simulation -- presumably it defers saving until save_all(); confirm.
    new_expid = None
    for failed_sim in sims_to_rerun:
        new_expid = clone_simulation(failed_sim, new_expid, False).experiment_id

    target_exp = source_exp
    if source_exp.id != new_expid:
        logger.warning('Current user is not the owner of the original experiment')
        logger.warning(f'Created new experiment to hold the new simulations: {new_expid}')
        target_exp = Experiment.get(new_expid)

    Simulation.save_all()

    logger.info('')

    if not createonly:
        logger.info('Recommissioning simulations')
        target_exp.commission()

    if deleteold:
        logger.info('Deleting old simulations')
        for failed_sim in sims_to_rerun:
            failed_sim.delete()
    elif str(expid) == str(new_expid):
        # Only tag when the clones share the original experiment with the old sims.
        logger.info("Tagging old simulations with 'ClonedToRerun' tag for easier deletion later")
        for failed_sim in sims_to_rerun:
            failed_sim.merge_tags({'ClonedToRerun': None})

    logger.info('Done')
def main(args):
    """Command-line entry point: log in to COMPS, then rerun the failed simulations.

    ``args`` is the parsed argument namespace.  It must provide
    ``experiment_id``, ``createonly`` and ``deleteold`` (see ``fill_parser``)
    as well as ``comps_server``.
    NOTE(review): ``comps_server`` is not added by ``fill_parser``; presumably
    the calling framework supplies it -- confirm.
    """
    Client.login(args.comps_server)
    rerun_sims(
        args.experiment_id,
        createonly=args.createonly,
        deleteold=args.deleteold,
    )