Source code for idmtools_platform_comps.comps_operations.experiment_operations

"""idmtools comps experiment operations.

Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import copy
import os
from dataclasses import dataclass, field
from itertools import tee
from logging import getLogger, DEBUG
from typing import List, Dict, Type, Generator, NoReturn, Optional, TYPE_CHECKING
from uuid import UUID
from COMPS.Data import Experiment as COMPSExperiment, QueryCriteria, Configuration, Suite as COMPSSuite, \
    Simulation as COMPSSimulation
from COMPS.Data.Simulation import SimulationState
from idmtools import IdmConfigParser
from idmtools.assets import AssetCollection, Asset
from idmtools.core import ItemType, EntityStatus
from idmtools.core.experiment_factory import experiment_factory
from idmtools.core.logging import SUCCESS
from idmtools.entities import CommandLine
from idmtools.entities.experiment import Experiment
from idmtools.entities.iplatform_ops.iplatform_experiment_operations import IPlatformExperimentOperations
from idmtools.entities.templated_simulation import TemplatedSimulations
from idmtools.utils.collections import ExperimentParentIterator
from idmtools.utils.info import get_doc_base_url
from idmtools.utils.time import timestamp
from idmtools_platform_comps.utils.general import clean_experiment_name, convert_comps_status

if TYPE_CHECKING:  # pragma: no cover
    from idmtools_platform_comps.comps_platform import COMPSPlatform

logger = getLogger(__name__)
user_logger = getLogger('user')


[docs]@dataclass class CompsPlatformExperimentOperations(IPlatformExperimentOperations): """ Provides Experiment operations to the COMPSPlatform. """ platform: 'COMPSPlatform' # noqa F821 platform_type: Type = field(default=COMPSExperiment)
[docs] def get(self, experiment_id: UUID, columns: Optional[List[str]] = None, load_children: Optional[List[str]] = None, query_criteria: Optional[QueryCriteria] = None, **kwargs) -> COMPSExperiment: """ Fetch experiments from COMPS. Args: experiment_id: Experiment ID columns: Optional Columns. If not provided, id, name, and suite_id are fetched load_children: Optional Children. If not provided, tags and configuration are specified query_criteria: Optional QueryCriteria **kwargs: Returns: COMPSExperiment with items """ columns = columns or ["id", "name", "suite_id"] comps_children = load_children if load_children is not None else ["tags", "configuration"] query_criteria = query_criteria or QueryCriteria().select(columns).select_children(comps_children) try: result = COMPSExperiment.get( id=experiment_id, query_criteria=query_criteria ) except AttributeError as e: user_logger.error(f"The id {experiment_id} could not be converted to an UUID. Please verify your id") raise e return result
[docs] def pre_create(self, experiment: Experiment, **kwargs) -> NoReturn: """ Pre-create for Experiment. At moment, validation related to COMPS is all that is done. Args: experiment: Experiment to run pre-create for **kwargs: Returns: None """ if experiment.name is None: raise ValueError("Experiment name is required on COMPS") super().pre_create(experiment, **kwargs)
[docs] def platform_create(self, experiment: Experiment, num_cores: Optional[int] = None, executable_path: Optional[str] = None, command_arg: Optional[str] = None, priority: Optional[str] = None, check_command: bool = True, use_short_path: bool = False, **kwargs) -> COMPSExperiment: """ Create Experiment on the COMPS Platform. Args: experiment: IDMTools Experiment to create num_cores: Optional num of cores to allocate using MPI executable_path: Executable path command_arg: Command Argument priority: Priority of command check_command: Run task hooks on item use_short_path: When set to true, simulation roots will be set to "$COMPS_PATH(USER) **kwargs: Keyword arguments used to expand functionality. At moment these are usually not used Returns: COMPSExperiment that was created """ # TODO check experiment task supported # Cleanup the name experiment.name = clean_experiment_name(experiment.name) # Define the subdirectory subdirectory = experiment.name[0:self.platform.MAX_SUBDIRECTORY_LENGTH] + '_' + timestamp() if use_short_path: logger.debug("Setting Simulation Root to $COMPS_PATH(USER)") simulation_root = "$COMPS_PATH(USER)" subdirectory = 'rac' + '_' + timestamp() # also shorten subdirectory else: simulation_root = self.platform.simulation_root # Get the experiment command line exp_command: CommandLine = self._get_experiment_command_line(check_command, experiment) if command_arg is None and exp_command is not None: command_arg = exp_command.arguments + " " + exp_command.options if executable_path is None and exp_command is not None: executable_path = exp_command.executable # create initial configuration object comps_config = dict( environment_name=self.platform.environment, simulation_input_args=command_arg.strip() if command_arg is not None else None, working_directory_root=os.path.join(simulation_root, subdirectory).replace('\\', '/'), executable_path=executable_path, node_group_name=self.platform.node_group, maximum_number_of_retries=self.platform.num_retries, priority=self.platform.priority if priority is None else priority, min_cores=self.platform.num_cores if num_cores is None else num_cores, max_cores=self.platform.num_cores if num_cores is None else num_cores, exclusive=self.platform.exclusive ) if isinstance(experiment.simulations.items, TemplatedSimulations): scheduling = getattr(experiment.simulations.items.base_simulation, 'scheduling', False) elif isinstance(experiment.simulations.items, List): scheduling = getattr(experiment.simulations.items[0], 'scheduling', False) else: scheduling = False # kwargs.get("scheduling", False) case is for adding work_order file during sweeping call back which is too late # to detect the scheduling flag from the simulation if scheduling or kwargs.get("scheduling", False): import copy # save a copy of default config setattr(self.platform, 'comps_config', copy.deepcopy(comps_config)) # clear some not-supported parameters comps_config.update(executable_path=None, node_group_name=None, min_cores=None, max_cores=None, exclusive=None, simulation_input_args=None) if logger.isEnabledFor(DEBUG): logger.debug(f'COMPS Experiment Configs: {str(comps_config)}') config = Configuration(**comps_config) e = COMPSExperiment(name=experiment.name, configuration=config, suite_id=experiment.parent_id) # Add tags if present if experiment.tags: e.set_tags(experiment.tags) # Save the experiment e.save() # Set the ID back in the object experiment.uid = e.id # Send the assets for the experiment self.send_assets(experiment) return e
[docs] def platform_modify_experiment(self, experiment: Experiment, regather_common_assets: bool = False, **kwargs) -> Experiment: """ Executed when an Experiment is being ran that is already in Created, Done, In Progress, or Failed State. Args: experiment: Experiment to modify regather_common_assets: Triggers a new AC to be associated with experiment. It is important to note that when using this feature, ensure the previous simulations have finished provisioning. Failure to do so can lead to unexpected behaviour. Returns: Modified experiment. """ if logger.isEnabledFor(DEBUG): logger.debug( f"Experiment Status: {experiment.status}. " f"Modifying experiment: {experiment.id}. " f"Asset Editable: {experiment.assets.is_editable()}. " f"Regather assets: {regather_common_assets}." ) if experiment.status is not None and experiment.assets.is_editable() and regather_common_assets: experiment.pre_creation(self.platform, gather_assets=regather_common_assets) self.send_assets(experiment) else: user_logger.warning( f"Not gathering common assets again since experiment exists on platform. " f"If you need to add additional common assets, see " f"{get_doc_base_url()}cookbook/asset_collections.html#modifying-asset-collection" ) return experiment
def _get_experiment_command_line(self, check_command: bool, experiment: Experiment) -> CommandLine: """ Get the command line for COMPS. Args: check_command: Should we run the platform task hooks on comps? experiment: Experiment to get command line for Returns: Command line for Experiment """ from idmtools_platform_comps.utils.python_version import platform_task_hooks if isinstance(experiment.simulations, Generator): if logger.isEnabledFor(DEBUG): logger.debug("Simulations generator detected. Copying generator and using first task as command") sim_gen1, sim_gen2 = tee(experiment.simulations) experiment.simulations = sim_gen2 sim = next(sim_gen1) if check_command: task = platform_task_hooks(sim.task, self.platform) # run pre-creation in case task use it to produce the command line dynamically task.pre_creation(sim, self.platform) exp_command = task.command elif isinstance(experiment.simulations, ExperimentParentIterator) and isinstance(experiment.simulations.items, TemplatedSimulations): if logger.isEnabledFor(DEBUG): logger.debug("ParentIterator/TemplatedSimulations detected. Using base_task for command") from idmtools.entities.simulation import Simulation task = experiment.simulations.items.base_task if check_command: task = platform_task_hooks(task, self.platform) # run pre-creation in case task use it to produce the command line dynamically task.pre_creation(Simulation(task=task), self.platform) exp_command = task.command else: if logger.isEnabledFor(DEBUG): logger.debug("List of simulations detected. Using base_task for command") task = experiment.simulations[0].task if check_command: task = platform_task_hooks(task, self.platform) # run pre-creation in case task use it to produce the command line dynamically task.pre_creation(experiment.simulations[0], self.platform) exp_command = task.command return exp_command
[docs] def post_create(self, experiment: Experiment, **kwargs) -> NoReturn: """ Post create of experiment. The default behaviour is to display the experiment url if output is enabled. """ if IdmConfigParser.is_output_enabled(): user_logger.log(SUCCESS, f"\nThe created experiment can be viewed at {self.platform.endpoint}/#explore/" f"Simulations?filters=ExperimentId={experiment.uid}\nSimulations are still being created\n" ) super().post_create(experiment, **kwargs)
[docs] def post_run_item(self, experiment: Experiment, **kwargs): """ Ran after experiment. Nothing is done on comps other that alerting the user to the item. Args: experiment: Experiment to run post run item **kwargs: Returns: None """ super().post_run_item(experiment, **kwargs)
[docs] def get_children(self, experiment: COMPSExperiment, columns: Optional[List[str]] = None, children: Optional[List[str]] = None, **kwargs) -> List[COMPSSimulation]: """ Get children for a COMPSExperiment. Args: experiment: Experiment to get children of Comps Experiment columns: Columns to fetch. If not provided, id, name, experiment_id, and state will be loaded children: Children to load. If not provided, Tags will be loaded **kwargs: Returns: Simulations belonging to the Experiment """ columns = columns or ["id", "name", "experiment_id", "state"] children = children if children is not None else ["tags", "configuration", "files"] children = experiment.get_simulations(query_criteria=QueryCriteria().select(columns).select_children(children)) return children
[docs] def get_parent(self, experiment: COMPSExperiment, **kwargs) -> COMPSSuite: """ Get Parent of experiment. Args: experiment: Experiment to get parent of **kwargs: Returns: Suite of the experiment """ if experiment.suite_id is None: return None return self.platform._suites.get(experiment.suite_id, **kwargs)
[docs] def platform_run_item(self, experiment: Experiment, **kwargs): """ Run experiment on COMPS. Here we commission the experiment. Args: experiment: Experiment to run **kwargs: Returns: None """ if logger.isEnabledFor(DEBUG): logger.debug(f'Commissioning experiment: {experiment.uid}') # commission only if rules we have items in created or none. # TODO add new status to entity status to track commissioned as well instead of raw comps if any([s.status in [None, EntityStatus.CREATED] for s in experiment.simulations]) and any( [s.get_platform_object().state in [SimulationState.Created] for s in experiment.simulations]): po = experiment.get_platform_object() po.commission() # for now, we update here in the comps objects to reflect the new state for sim in experiment.simulations: spo = sim.get_platform_object() spo._state = SimulationState.CommissionRequested
[docs] def send_assets(self, experiment: Experiment, **kwargs): """ Send assets related to the experiment. Args: experiment: Experiment to send assets for **kwargs: Returns: None """ if experiment.assets.count == 0: logger.warning('Experiment has no assets to send') return ac = self.platform._assets.create(experiment.assets) if logger.isEnabledFor(DEBUG): logger.debug(f'Asset collection for experiment: {experiment.id} is: {ac.id}') # associate the assets with the experiment in COMPS e = COMPSExperiment.get(id=experiment.uid) e.configuration = Configuration(asset_collection_id=ac.id) e.save()
[docs] def refresh_status(self, experiment: Experiment, **kwargs): """ Reload status for experiment(load simulations). Args: experiment: Experiment to load status for **kwargs: Returns: None """ simulations = self.get_children(experiment.get_platform_object(), force=True, columns=["id", "state"], load_children=[]) for s in simulations: experiment.simulations.set_status_for_item(s.id, convert_comps_status(s.state))
[docs] def to_entity(self, experiment: COMPSExperiment, parent: Optional[COMPSSuite] = None, children: bool = True, **kwargs) -> Experiment: """ Converts a COMPSExperiment to an idmtools Experiment. Args: experiment: COMPS Experiment objet to convert parent: Optional suite parent children: Should we load children objects? **kwargs: Returns: Experiment """ # Recreate the suite if needed if experiment.suite_id is None: suite = kwargs.get('suite') else: if parent: suite = parent else: suite = kwargs.get('suite') or self.platform.get_item(experiment.suite_id, item_type=ItemType.SUITE) # Create an experiment experiment_type = experiment.tags.get("type") if experiment.tags is not None else "" obj = experiment_factory.create(experiment_type, tags=experiment.tags, name=experiment.name, fallback=Experiment) obj.platform = self.platform obj._platform_object = experiment # Set parent obj.parent = suite # Set the correct attributes obj.uid = experiment.id obj.comps_experiment = experiment # load assets first so children can access during their load obj.assets = self.get_assets_from_comps_experiment(experiment) if obj.assets is None: obj.assets = AssetCollection() # if we are loading the children, convert them if children: # Convert all simulations comps_sims = experiment.get_simulations( QueryCriteria().select( ["id", "name", "experiment_id", "state"] ).select_children( ["tags", "files", "configuration"] ) ) obj.simulations = [] for s in comps_sims: obj.simulations.append( self.platform._simulations.to_entity(s, parent=obj, **kwargs) ) return obj
[docs] def get_assets_from_comps_experiment(self, experiment: COMPSExperiment) -> Optional[AssetCollection]: """ Get assets for a comps experiment. Args: experiment: Experiment to get asset collection for. Returns: AssetCollection if configuration is set and configuration.asset_collection_id is set. """ if experiment.configuration and experiment.configuration.asset_collection_id: return self.platform.get_item(experiment.configuration.asset_collection_id, ItemType.ASSETCOLLECTION) return None
[docs] def platform_list_asset(self, experiment: Experiment, **kwargs) -> List[Asset]: """ List assets for an experiment. Args: experiment: Experiment to list assets for. **kwargs: Returns: List of assets """ assets = [] if experiment.assets is None: po: COMPSExperiment = experiment.get_platform_object() ac = self.get_assets_from_comps_experiment(po) if ac: assets = ac.assets else: assets = copy.deepcopy(experiment.assets.assets) return assets
[docs] def create_sim_directory_map(self, experiment_id: str) -> Dict: """ Build simulation working directory mapping. Args: experiment_id: experiment id Returns: Dict of simulation id as key and working dir as value """ from idmtools_platform_comps.utils.linux_mounts import set_linux_mounts, clear_linux_mounts set_linux_mounts(self.platform) comps_exp = self.platform.get_item(experiment_id, ItemType.EXPERIMENT, raw=True, force=True) comps_sims = comps_exp.get_simulations(QueryCriteria().select(['id', 'state']).select_children('hpc_jobs')) sim_map = {str(sim.id): sim.hpc_jobs[-1].working_directory for sim in comps_sims if sim.hpc_jobs} clear_linux_mounts(self.platform) return sim_map
[docs] def platform_delete(self, experiment_id: str) -> None: """ Delete platform experiment. Args: experiment_id: experiment id Returns: None """ comps_exp = self.platform.get_item(experiment_id, ItemType.EXPERIMENT, raw=True) try: comps_exp.delete() except RuntimeError: logger.info(f"Could not delete the experiment ({comps_exp.id})...") return
[docs] def platform_cancel(self, experiment_id: str) -> None: """ Cancel platform experiment. Args: experiment_id: experiment id Returns: None """ def experiment_is_running(comps_exp): from COMPS.Data.Simulation import SimulationState for sim in comps_exp.get_simulations(): if sim.state not in (SimulationState.Succeeded, SimulationState.Failed, SimulationState.Canceled, SimulationState.Created, SimulationState.CancelRequested): return True return False comps_experiment = self.platform.get_item(experiment_id, ItemType.EXPERIMENT, raw=True) if comps_experiment and experiment_is_running(comps_experiment): comps_experiment.cancel()