Source code for COMPS.utils.get_output_files_for_experiment

import os
import logging
from functools import partial
from multiprocessing import Pool
from requests.exceptions import HTTPError

from COMPS import Client
from COMPS.Data import Experiment, QueryCriteria
from COMPS.Data.Simulation import SimulationState

logger = logging.getLogger(__name__)

##########################

utility_metadata = {
    'aliases': [ 'getexpout' ],
    'help': 'Download output files from each simulation in an experiment',
    'description': 'This utility downloads one or more output files from each simulation in an experiment.  By ' +
                   'default, files are written relative to the current directory, in the hierarchy:' + os.linesep +
                   '    ./<exp-id>/<sim-id>/<filename>' + os.linesep +
                   'but if calling from script, a custom function can be provided to control location and ' +
                   'name of the files written.',
    'epilog': '''examples:
  %(prog)s 11111111-2222-3333-4444-000000000000 insetchart.json
  %(prog)s 11111111-2222-3333-4444-000000000000 stdout.txt,stderr.txt
'''
}

[docs]def fill_parser(p): p.add_argument('experiment_id', help='Id of the experiment containing the simulations to download files from') p.add_argument('filename', help='Name(s) of the file(s) to download from each simulation. This can be a comma-delimited list, or an actual (python) list if calling from code') p.add_argument('--overwrite', '-ow', action='store_true', help='Overwrite local files if they already exist (default is to skip if a local file with the same subdir/name exists)') p.add_argument('--casesensitive', '-cs', action='store_true', help='Make filename comparisons case-sensitive (default is case-insensitive, i.e. ignoring case)')
########################## # The default path builder
[docs]def path_builder_simple(sim, filename): return os.path.join(str(sim.experiment_id), str(sim.id), filename)
# A sample, custom path builder that puts all the files in a single directory but modifies the output file names # to avoid collissions and distinguish between the output for the simulations.
[docs]def path_builder_single_dir(sim, filename): sp = os.path.splitext(filename) return os.path.join(str(sim.experiment_id), f'{sp[0]}_{str(sim.id)}{sp[1]}')
##########################
[docs]def get_files( experiment_id, files_to_get, overwrite=False, casesensitive=False, output_path_builder=path_builder_simple ): if not isinstance(files_to_get, list): files_to_get = [ files_to_get ] files_to_get_int = files_to_get if casesensitive else [ f.lower() for f in files_to_get ] exp = Experiment.get(experiment_id) logger.info(f'Found experiment {exp.id}') sims = exp.get_simulations() logger.info(f'{len(sims)} child simulations found') # sims that haven't finished Provisioning yet don't have an hpc-job / working-directory, so filter # to only the potentially valid set of sims valid_sims = [ s for s in sims if s.state.value >= SimulationState.Commissioned.value ] if len(valid_sims) < len(sims): logger.warning(f'!!! WARNING !!! Sims cannot have output downloaded prior to completing commissioning') if len(valid_sims) == 0: logger.warning(f'No valid simulations to attempt file download for') else: logger.warning(f'Only attempting file download for {len(valid_sims)} simulations') with Pool() as p: results = p.map(partial(_get_files_internal, Client.auth_manager().hoststring, files_to_get_int, overwrite, casesensitive, output_path_builder), valid_sims) hit_fileexists = any([r[0] for r in results]) missing_files = any([r[1] for r in results]) if missing_files: logger.warning('') logger.warning(f'Couldn\'t find files matching requested for some sims (possible typo, casing issue, or requested an input-asset?)') if hit_fileexists: logger.warning('') logger.warning('Skipped downloading of some files because they already exist locally. Rerun using the overwrite argument if you want these overwritten instead')
def _get_files_internal(hoststring, files_to_get_int, overwrite, casesensitive, output_path_builder, sim): # Python pools get run in subprocesses (not threads), hence we're not logged in from that process and have to # do so again. But these subprocesses also appear to get reused, so after the first time we login from the # subprocess, we actually *will* already be logged in, so we'll get a bunch of "skipping login" messages. # To avoid this spam, let's suppress logger messages from COMPS.Client temporarily. from COMPS.Client import logger as client_logger client_logger.disabled = True Client.login(hoststring) client_logger.disabled = False try: so = sim.retrieve_output_file_info(None) except HTTPError as e: sim.refresh(QueryCriteria().select_children(['hpc_jobs'])) if not sim.hpc_jobs or len(sim.hpc_jobs) == 0: logger.warning(f'No hpc-job found for simulation {sim.id}') return (False, True) raise e logger.debug(f'sim {sim.id} - {len(so)} output files found') found_file_num = 0 hit_fileexists = False missing_files = False for ofmd in so: fn_comp = ofmd.friendly_name if casesensitive else ofmd.friendly_name.lower() if fn_comp in files_to_get_int: found_file_num += 1 filepath = output_path_builder(sim, ofmd.friendly_name) pn = os.path.dirname(filepath) if not os.path.exists(pn): os.makedirs(pn, exist_ok=True) logger.info(filepath) oba = sim.retrieve_output_files_from_info([ofmd]) try: with open(filepath, 'wb' if overwrite else 'xb') as outfile: outfile.write(oba[0]) except FileExistsError as e: logger.error(f'Output file already exists at {filepath}. Skipping...') logger.debug(e, exc_info=True) hit_fileexists = True # if we didn't find enough matching files, spit out a message if found_file_num < len(files_to_get_int): logger.warning(f'Didn\'t find files to match all requested for sim {sim.id}') missing_files = True return (hit_fileexists, missing_files)
[docs]def main(args): Client.login(args.comps_server) get_files(args.experiment_id, args.filename.split(','), args.overwrite, args.casesensitive)