Source code for idmtools_platform_comps.comps_operations.simulation_operations

"""idmtools comps simulation operations.

Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
from concurrent.futures.thread import ThreadPoolExecutor
from contextlib import suppress
from threading import Lock
import time
import json
import uuid
from dataclasses import dataclass, field
from COMPS.Data.Simulation import SimulationState
from functools import partial
from logging import getLogger, DEBUG
from typing import Any, List, Dict, Type, Optional, TYPE_CHECKING
from uuid import UUID
from COMPS.Data import Simulation as COMPSSimulation, QueryCriteria, Experiment as COMPSExperiment, SimulationFile, \
    Configuration
from idmtools.assets import AssetCollection, Asset
from idmtools.core import ItemType, EntityStatus
from idmtools.core.task_factory import TaskFactory
from idmtools.entities import CommandLine
from idmtools.entities.command_task import CommandTask
from idmtools.entities.experiment import Experiment
from idmtools.entities.iplatform_ops.iplatform_simulation_operations import IPlatformSimulationOperations
from idmtools.entities.iplatform_ops.utils import batch_create_items
from idmtools.entities.simulation import Simulation
from idmtools.utils.json import IDMJSONEncoder
from idmtools_platform_comps.utils.general import convert_comps_status, get_asset_for_comps_item, clean_experiment_name
from idmtools_platform_comps.utils.scheduling import scheduled

if TYPE_CHECKING:  # pragma: no cover
    from idmtools_platform_comps.comps_platform import COMPSPlatform

logger = getLogger(__name__)
user_logger = getLogger('user')

# Track commissioning in batches
COMPS_EXPERIMENT_BATCH_COMMISSION_LOCK = Lock()
COMPS_EXPERIMENT_BATCH_COMMISSION_TIMESTAMP = 0


[docs]def comps_batch_worker(simulations: List[Simulation], interface: 'CompsPlatformSimulationOperations', executor, num_cores: Optional[int] = None, priority: Optional[str] = None, asset_collection_id: str = None, min_time_between_commissions: int = 10, **kwargs) -> List[COMPSSimulation]: """ Run batch worker. Args: simulations: Batch of simulation to process interface: SimulationOperation Interface executor: Thread/process executor num_cores: Optional Number of core to allocate for MPI priority: Optional Priority to set to asset_collection_id: Override asset collection id min_time_between_commissions: Minimum amount of time(in seconds) between calls to commission on an experiment extra info for Returns: List of Comps Simulations """ global COMPS_EXPERIMENT_BATCH_COMMISSION_LOCK, COMPS_EXPERIMENT_BATCH_COMMISSION_TIMESTAMP if logger.isEnabledFor(DEBUG): logger.debug(f'Converting {len(simulations)} to COMPS') created_simulations = [] new_sims = 0 for simulation in simulations: if simulation.status is None: interface.pre_create(simulation) new_sims += 1 simulation.platform = interface.platform simulation._platform_object = interface.to_comps_sim(simulation, num_cores=num_cores, priority=priority, asset_collection_id=asset_collection_id, **kwargs) created_simulations.append(simulation) if logger.isEnabledFor(DEBUG): logger.debug(f'Finished converting to COMPS. Starting saving of {len(simulations)}') COMPSSimulation.save_all(None, save_semaphore=COMPSSimulation.get_save_semaphore()) if logger.isEnabledFor(DEBUG): logger.debug(f'Finished saving of {len(simulations)}. Starting post_create') for simulation in simulations: simulation.uid = simulation.get_platform_object().id simulation.status = convert_comps_status(simulation.get_platform_object().state) interface.post_create(simulation) if logger.isEnabledFor(DEBUG): logger.debug(f'Finished post-create of {len(simulations)}') current_time = time.time() # check current commission queue and last commission call if new_sims > 0 and current_time - COMPS_EXPERIMENT_BATCH_COMMISSION_TIMESTAMP > min_time_between_commissions: # be aggressive in waiting on lock. Worse case, another thread triggers this near same time locked = COMPS_EXPERIMENT_BATCH_COMMISSION_LOCK.acquire(timeout=0.015) if locked: # do commission asynchronously. If it fine if we happen to miss a commission def do_commission(): with suppress(RuntimeError): simulations[0].experiment.get_platform_object().commission() executor.submit(do_commission) COMPS_EXPERIMENT_BATCH_COMMISSION_TIMESTAMP = current_time COMPS_EXPERIMENT_BATCH_COMMISSION_LOCK.release() for simulation in simulations: simulation.status = EntityStatus.RUNNING return simulations
[docs]@dataclass class CompsPlatformSimulationOperations(IPlatformSimulationOperations): """Provides simuilation operations to COMPSPlatform.""" platform: 'COMPSPlatform' # noqa F821 platform_type: Type = field(default=COMPSSimulation)
[docs] def get(self, simulation_id: UUID, columns: Optional[List[str]] = None, load_children: Optional[List[str]] = None, query_criteria: Optional[QueryCriteria] = None, **kwargs) -> COMPSSimulation: """ Get Simulation from Comps. Args: simulation_id: ID columns: Optional list of columns to load. Defaults to "id", "name", "experiment_id", "state" load_children: Optional children to load. Defaults to "tags", "configuration" query_criteria: Optional query_criteria object to use your own custom criteria object **kwargs: Returns: COMPSSimulation """ columns = columns or ["id", "name", "experiment_id", "state"] children = load_children if load_children is not None else ["tags", "configuration", "files"] query_criteria = query_criteria or QueryCriteria().select(columns).select_children(children) return COMPSSimulation.get( id=simulation_id, query_criteria=query_criteria )
[docs] def platform_create(self, simulation: Simulation, num_cores: int = None, priority: str = None, enable_platform_task_hooks: bool = True, asset_collection_id: str = None, **kwargs) -> COMPSSimulation: """ Create Simulation on COMPS. Args: simulation: Simulation to create num_cores: Optional number of MPI Cores to allocate priority: Priority to load enable_platform_task_hooks: Should platform task hoooks be ran asset_collection_id: Override for asset collection id on sim **kwargs: Expansion fields Returns: COMPS Simulation """ from idmtools_platform_comps.utils.python_version import platform_task_hooks if enable_platform_task_hooks: simulation.task = platform_task_hooks(simulation.task, self.platform) s = self.to_comps_sim(simulation, num_cores=num_cores, priority=priority, asset_collection_id=asset_collection_id, **kwargs) COMPSSimulation.save(s, save_semaphore=COMPSSimulation.get_save_semaphore()) return s
[docs] def to_comps_sim(self, simulation: Simulation, num_cores: int = None, priority: str = None, config: Configuration = None, asset_collection_id: str = None, **kwargs): """ Covert IDMTools object to COMPS Object. Args: simulation: Simulation object to convert num_cores: Optional Num of MPI Cores to allocate priority: Optional Priority config: Optional Configuration object asset_collection_id: **kwargs additional option for comps Returns: COMPS Simulation """ if config is None: if asset_collection_id and isinstance(asset_collection_id, str): asset_collection_id = uuid.UUID(asset_collection_id) kwargs['num_cores'] = num_cores kwargs['priority'] = priority kwargs['asset_collection_id'] = asset_collection_id kwargs.update(simulation._platform_kwargs) config = self.get_simulation_config_from_simulation(simulation, **kwargs) if simulation.name: simulation.name = clean_experiment_name(simulation.name) s = COMPSSimulation( name=clean_experiment_name(simulation.experiment.name if not simulation.name else simulation.name), experiment_id=simulation.parent_id, configuration=config ) self.send_assets(simulation, s, **kwargs) s.set_tags(simulation.tags) simulation._platform_object = s return s
[docs] def get_simulation_config_from_simulation(self, simulation: Simulation, num_cores: int = None, priority: str = None, asset_collection_id: str = None, **kwargs) -> \ Configuration: """ Get the comps configuration for a Simulation Object. Args: simulation: Simulation num_cores: Optional Num of core for MPI priority: Optional Priority asset_collection_id: Override simulation asset_collection_id **kwargs additional option for comps Returns: Configuration """ global_scheduling = kwargs.get("scheduling", False) sim_scheduling = scheduled(simulation) scheduling = global_scheduling or sim_scheduling comps_configuration = dict() if global_scheduling: config = getattr(self.platform, 'comps_config', {}) comps_exp_config = Configuration(**config) else: comps_exp: COMPSExperiment = simulation.parent.get_platform_object() comps_exp_config: Configuration = comps_exp.configuration if asset_collection_id: comps_configuration['asset_collection_id'] = asset_collection_id if num_cores is not None and num_cores != comps_exp_config.max_cores: logger.info(f'Overriding cores for sim to {num_cores}') comps_configuration['max_cores'] = num_cores comps_configuration['min_cores'] = num_cores if priority is not None and priority != comps_exp_config.priority: logger.info(f'Overriding priority for sim to {priority}') comps_configuration['priority'] = priority if comps_exp_config.executable_path != simulation.task.command.executable: logger.info(f'Overriding executable_path for sim to {simulation.task.command.executable}') from idmtools_platform_comps.utils.python_version import platform_task_hooks platform_task_hooks(simulation.task, self.platform) comps_configuration['executable_path'] = simulation.task.command.executable sim_task = simulation.task.command.arguments + " " + simulation.task.command.options sim_task = sim_task.strip() if comps_exp_config.simulation_input_args != sim_task: logger.info(f'Overriding simulation_input_args for sim to {sim_task}') comps_configuration['simulation_input_args'] = sim_task if logger.isEnabledFor(DEBUG): logger.debug(f'Simulation config: {str(comps_configuration)}') if scheduling: comps_configuration.update(executable_path=None, node_group_name=None, min_cores=None, max_cores=None, exclusive=None, simulation_input_args=None) return Configuration(**comps_configuration)
[docs] def batch_create(self, simulations: List[Simulation], num_cores: int = None, priority: str = None, asset_collection_id: str = None, **kwargs) -> List[COMPSSimulation]: """ Perform batch creation of Simulations. Args: simulations: Simulation to create num_cores: Optional MPI Cores to allocate per simulation priority: Optional Priority asset_collection_id: Asset collection id for sim(overide experiment) **kwargs: Future expansion Returns: List of COMPSSimulations that were created """ global COMPS_EXPERIMENT_BATCH_COMMISSION_TIMESTAMP executor = ThreadPoolExecutor() thread_func = partial(comps_batch_worker, interface=self, num_cores=num_cores, priority=priority, asset_collection_id=asset_collection_id, min_time_between_commissions=self.platform.min_time_between_commissions, executor=executor, **kwargs) results = batch_create_items( simulations, batch_worker_thread_func=thread_func, progress_description="Creating Simulations on Comps", unit="simulation" ) # Always commission again try: results[0].parent.get_platform_object().commission() except RuntimeError as ex: # occasionally we hit this because double commissioning. Its ok to ignore though because that means we have already commissioned this experiment if logger.isEnabledFor(DEBUG): logger.debug(f"COMMISSION Response: {ex.args}") COMPS_EXPERIMENT_BATCH_COMMISSION_TIMESTAMP = 0 # set commission here in comps objects to prevent commission in Experiment when batching for sim in results: sim.status = EntityStatus.RUNNING sim.get_platform_object()._state = SimulationState.CommissionRequested return results
[docs] def get_parent(self, simulation: Any, **kwargs) -> COMPSExperiment: """ Get the parent of the simulation. Args: simulation: Simulation to load parent for **kwargs: Returns: COMPSExperiment """ return self.platform._experiments.get(simulation.experiment_id, **kwargs) if simulation.experiment_id else None
[docs] def platform_run_item(self, simulation: Simulation, **kwargs): """For simulations, there is no running for COMPS.""" pass
[docs] def send_assets(self, simulation: Simulation, comps_sim: Optional[COMPSSimulation] = None, add_metadata: bool = False, **kwargs): """ Send assets to Simulation. Args: simulation: Simulation to send asset for comps_sim: Optional COMPSSimulation object to prevent reloading it add_metadata: Add idmtools metadata object **kwargs: Returns: None """ if comps_sim is None: comps_sim = simulation.get_platform_object() for asset in simulation.assets: if asset.filename.lower() == 'workorder.json' and scheduled(simulation): comps_sim.add_file(simulationfile=SimulationFile(asset.filename, 'WorkOrder'), data=asset.bytes) else: comps_sim.add_file(simulationfile=SimulationFile(asset.filename, 'input'), data=asset.bytes) # add metadata if add_metadata: if logger.isEnabledFor(DEBUG): logger.debug("Creating idmtools metadata for simulation and task on COMPS") # later we should add some filtering for passwords and such here in case anything weird happens metadata = json.dumps(simulation.task.to_dict(), cls=IDMJSONEncoder) from idmtools import __version__ comps_sim.add_file( SimulationFile("idmtools_metadata.json", 'input', description=f'IDMTools {__version__}'), data=metadata.encode() )
[docs] def refresh_status(self, simulation: Simulation, additional_columns: Optional[List[str]] = None, **kwargs): """ Refresh status of a simulation. Args: simulation: Simulation to refresh additional_columns: Optional additional columns to load from COMPS **kwargs: Returns: None """ cols = ['state'] if additional_columns: cols.extend(additional_columns) s = COMPSSimulation.get(id=simulation.uid, query_criteria=QueryCriteria().select(cols)) simulation.status = convert_comps_status(s.state)
[docs] def to_entity(self, simulation: COMPSSimulation, load_task: bool = False, parent: Optional[Experiment] = None, load_parent: bool = False, load_metadata: bool = False, load_cli_from_workorder: bool = False, **kwargs) -> Simulation: """ Convert COMPS simulation object to IDM Tools simulation object. Args: simulation: Simulation object load_task: Should we load tasks. Defaults to No. This can increase the load items on fetchs parent: Optional parent object to prevent reloads load_parent: Force load of parent(Beware, This could cause loading loops) load_metadata: Should we load metadata by default. If load task is enabled, this is also enabled load_cli_from_workorder: Used with COMPS scheduling where the CLI is defined in our workorder **kwargs: Returns: Simulation object """ # Recreate the experiment if needed if parent is None or load_parent: if logger.isEnabledFor(DEBUG): logger.debug(f'Loading parent {simulation.experiment_id}') parent = self.platform.get_item(simulation.experiment_id, item_type=ItemType.EXPERIMENT) # Get a simulation obj = Simulation() obj.platform = self.platform obj._platform_object = simulation obj.parent = parent obj.experiment = parent # Set its correct attributes obj.uid = simulation.id obj.tags = simulation.tags obj.status = convert_comps_status(simulation.state) if simulation.files: obj.assets = self.platform._assets.to_entity(simulation.files) # should we load metadata metadata = self.__load_metadata_from_simulation(obj) if load_metadata else None if load_task: self._load_task_from_simulation(obj, simulation, metadata) else: obj.task = None self.__extract_cli(simulation, parent, obj, load_cli_from_workorder) # call task load options(load configs from files, etc) obj.task.reload_from_simulation(obj) return obj
[docs] def get_asset_collection_from_comps_simulation(self, simulation: COMPSSimulation) -> Optional[AssetCollection]: """ Get assets from COMPS Simulation. Args: simulation: Simulation to get assets from Returns: Simulation Asset Collection, if any. """ if simulation.configuration and simulation.configuration.asset_collection_id: return self.platform.get_item(simulation.configuration.asset_collection_id, ItemType.ASSETCOLLECTION) return None
def _load_task_from_simulation(self, simulation: Simulation, comps_sim: COMPSSimulation, metadata: Dict = None): """ Load task from the simulation object. Args: simulation: Simulation to populate with task comps_sim: Experiment object metadata: Metadata loaded to be used in the task object Returns: None """ simulation.task = None if comps_sim.tags and 'task_type' in comps_sim.tags: # check for metadata if not metadata: metadata = self.__load_metadata_from_simulation(simulation) try: if logger.isEnabledFor(DEBUG): logger.debug(f"Metadata: {metadata}") simulation.task = TaskFactory().create(comps_sim.tags['task_type'], **metadata) except Exception as e: user_logger.warning(f"Could not load task of type {comps_sim.tags['task_type']}. " f"Received error {str(e)}") logger.exception(e) # ensure we have loaded the configuration if comps_sim.configuration is None: comps_sim.refresh(QueryCriteria().select_children('configuration')) def __extract_cli(self, comps_sim, parent, simulation, load_cli_from_workorder): cli = self._detect_command_line_from_simulation(parent, comps_sim, simulation, load_cli_from_workorder) # if we could not find task, set it now, otherwise rebuild the cli if simulation.task is None: simulation.task = CommandTask(CommandLine.from_string(cli)) else: simulation.task.command = CommandLine.from_string(cli) @staticmethod def __load_metadata_from_simulation(simulation) -> Dict[str, Any]: """ Load IDMTools metadata from a simulation. Args: simulation: Returns: Metadata if found Raise: FileNotFoundError error if metadata not found """ metadata = None for file in simulation.assets: if file.filename == "idmtools_metadata.json": # load the asset metadata = json.loads(file.content.decode('utf-8')) return metadata raise FileNotFoundError(f"Cannot find idmtools_metadata.json on the simulation {simulation.uid}") @staticmethod def _detect_command_line_from_simulation(experiment, comps_sim, simulation, load_cli_from_workorder=False): """ Detect Command Line from the Experiment/Simulation objects. Args: experiment: Experiment object comps_sim: Comps sim object simulation: Simulation(idmtools) object load_cli_from_workorder: Should we load metadata. we use this to determine if we should load our workorder.json Returns: CommandLine Raise: ValueError when command cannot be detected """ cli = None # do we have a configuration? po: COMPSExperiment = experiment.get_platform_object() if po.configuration is None: po.refresh(QueryCriteria().select_children('configuration')) # simulation configuration for executable? if comps_sim.configuration and comps_sim.configuration.executable_path: cli = f'{comps_sim.configuration.executable_path}' if comps_sim.configuration.simulation_input_args: cli += " " + comps_sim.configuration.simulation_input_args.strip() elif po.configuration and po.configuration.executable_path: cli = f'{po.configuration.executable_path} {po.configuration.simulation_input_args.strip()}' if cli is None: # check if we should try to load our workorder if load_cli_from_workorder: # filter for workorder workorder_obj = None for a in simulation.assets: if getattr(a, '_platform_object', None) and isinstance(a._platform_object, SimulationFile) and a._platform_object.file_type == "WorkOrder": workorder_obj = a break # if assets if workorder_obj: asset: Asset = workorder_obj wo = json.loads(asset.content.decode('utf-8')) cli = wo['Command'] else: raise ValueError("Could not detect cli") else: # if user doesn't care oabout metadata don't error logger.debug( "Could not load the cli from simulation. This is usually due to the use of scheduling config.") cli = "WARNING_CouldNotLoadCLI" elif logger.isEnabledFor(DEBUG): logger.debug(f"Detected task CLI {cli}") return cli
[docs] def get_assets(self, simulation: Simulation, files: List[str], include_experiment_assets: bool = True, **kwargs) -> \ Dict[str, bytearray]: """ Fetch the files associated with a simulation. Args: simulation: Simulation files: List of files to download include_experiment_assets: Should we also load experiment assets? **kwargs: Returns: Dictionary of filename -> ByteArray """ # since assets could be in the common assets, we should check that firs # load comps config first comps_sim: COMPSSimulation = simulation.get_platform_object(load_children=["files", "configuration"]) if include_experiment_assets and ( comps_sim.configuration is None or comps_sim.configuration.asset_collection_id is None): if logger.isEnabledFor(DEBUG): logger.debug("Gathering assets from experiment first") exp_assets = get_asset_for_comps_item(self.platform, simulation.experiment, files, self.cache, load_children=["configuration"]) if exp_assets is None: exp_assets = dict() else: exp_assets = dict() exp_assets.update(get_asset_for_comps_item(self.platform, simulation, files, self.cache, comps_item=comps_sim)) return exp_assets
[docs] def list_assets(self, simulation: Simulation, common_assets: bool = False, **kwargs) -> List[Asset]: """ List assets for a simulation. Args: simulation: Simulation to load data for common_assets: Should we load asset files **kwargs: Returns: AssetCollection """ comps_sim: COMPSSimulation = simulation.get_platform_object(load_children=["files", "configuration"]) assets = [] # load non comps objects if comps_sim.files: assets = self.platform._assets.to_entity(comps_sim.files).assets if common_assets: # here we are loading the simulation assets sa = self.get_asset_collection_from_comps_simulation(comps_sim) if sa: assets.extend(sa.assets) return assets
[docs] def retrieve_output_files(self, simulation: Simulation): """ Retrieve the output files for a simulation. Args: simulation: Simulation to fetch files for Returns: List of output files for simulation """ metadata = simulation.get_platform_object().retrieve_output_file_info([]) assets = self.platform._assets.to_entity(metadata).assets return assets
[docs] def all_files(self, simulation: Simulation, common_assets: bool = False, outfiles: bool = True, **kwargs) -> List[Asset]: """ Returns all files for a specific simulation including experiments or non-assets. Args: simulation: Simulation all files common_assets: Include experiment assets outfiles: Include output files **kwargs: Returns: AssetCollection """ ac = AssetCollection() ac.add_assets(self.list_assets(simulation, **kwargs)) if common_assets: ac.add_assets(simulation.parent.assets) if outfiles: ac.add_assets(self.retrieve_output_files(simulation)) return ac.assets
[docs] def create_sim_directory_map(self, simulation_id: str) -> Dict: """ Build simulation working directory mapping. Args: simulation_id: simulation id Returns: Dict of simulation id as key and working dir as value """ from idmtools_platform_comps.utils.linux_mounts import set_linux_mounts, clear_linux_mounts set_linux_mounts(self.platform) comps_sim = self.platform.get_item(simulation_id, ItemType.SIMULATION, query_criteria=QueryCriteria().select(['id', 'state']).select_children( 'hpc_jobs'), raw=True) sim_map = {str(comps_sim.id): comps_sim.hpc_jobs[-1].working_directory} clear_linux_mounts(self.platform) return sim_map