# Source code for idmtools_platform_slurm.platform_operations.experiment_operations
"""
Here we implement the SlurmPlatform experiment operations.
Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import os
import shutil
from pathlib import Path
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, List, Type, Dict, Optional, Any
from idmtools.assets import Asset, AssetCollection
from idmtools.core import EntityStatus
from idmtools.core import ItemType
from idmtools.entities import Suite
from idmtools.entities.experiment import Experiment
from idmtools.entities.iplatform_ops.iplatform_experiment_operations import IPlatformExperimentOperations
from idmtools_platform_slurm.platform_operations.utils import SlurmExperiment, SlurmSimulation, SlurmSuite, \
add_dummy_suite
from logging import getLogger
logger = getLogger(__name__)
user_logger = getLogger('user')
if TYPE_CHECKING:
from idmtools_platform_slurm.slurm_platform import SlurmPlatform
[docs]@dataclass
class SlurmPlatformExperimentOperations(IPlatformExperimentOperations):
"""
Experiment operations for the SlurmPlatform.
"""
# Platform instance these operations act on; the annotation is a string because
# SlurmPlatform is imported only under TYPE_CHECKING.
platform: 'SlurmPlatform' # noqa: F821
# Platform-side experiment type; defaults to SlurmExperiment.
platform_type: Type = field(default=SlurmExperiment)
def get(self, experiment_id: str, **kwargs) -> Dict:
    """
    Look up a single experiment in the platform metadata store.
    Args:
        experiment_id: id of the experiment to fetch (compared as a string)
        kwargs: accepted for interface compatibility; not used by this method
    Returns:
        SlurmExperiment built from the first matching metadata record.
        NOTE(review): the return annotation says Dict, but a SlurmExperiment is returned.
    Raises:
        RuntimeError: when no experiment metadata matches the given id
    """
    id_filter = {'id': str(experiment_id)}
    matches = self.platform._metas.filter(item_type=ItemType.EXPERIMENT, property_filter=id_filter)
    if len(matches) == 0:
        raise RuntimeError(f"Not found Experiment with id '{experiment_id}'")
    # Only the first match is used, even if the filter returns several records
    return SlurmExperiment(matches[0])
def platform_create(self, experiment: Experiment, **kwargs) -> SlurmExperiment:
    """
    Create an experiment on the Slurm platform.
    The experiment is attached to a suite (a dummy one is created when it has none), its
    directory is created, metadata and assets are dumped, the batch file is generated and
    run_simulation.sh is copied into the experiment directory.
    Args:
        experiment: idmtools experiment
        kwargs: keyword arguments forwarded to the batch file creation
    Returns:
        Slurm Experiment object built from the dumped metadata
    """
    platform = self.platform

    # Every experiment needs a parent: fall back to suite_id, then to a dummy suite
    experiment.parent_id = experiment.parent_id or experiment.suite_id
    if experiment.parent_id is None:
        dummy_suite = add_dummy_suite(experiment)
        platform._suites.platform_create(dummy_suite)
        experiment.parent = dummy_suite

    # Build the Suite/Experiment/Simulation folder structure and persist the experiment
    op_client = platform._op_client
    op_client.mk_directory(experiment, exist_ok=False)
    experiment_meta = platform._metas.dump(experiment)
    platform._assets.dump_assets(experiment)
    op_client.create_batch_file(experiment, **kwargs)

    # Ship run_simulation.sh from the package assets into the experiment directory
    packaged_script = Path(__file__).parent.parent.joinpath('assets/run_simulation.sh')
    experiment_dir = Path(op_client.get_directory(experiment))
    installed_script = experiment_dir.joinpath('run_simulation.sh')
    shutil.copy(str(packaged_script), str(installed_script))
    # Make the copied script executable
    op_client.update_script_mode(installed_script)

    return SlurmExperiment(experiment_meta)
def get_children(self, experiment: SlurmExperiment, parent: Experiment = None, raw=True, **kwargs) -> List[Any]:
    """
    Collect the simulations that belong to a Slurm experiment.
    Each simulation's status is refreshed from the op client before it is returned.
    Args:
        experiment: Slurm experiment
        parent: the parent assigned to converted simulations (used only when raw is False)
        raw: True to return SlurmSimulation objects, False to convert them to idmtools entities
        kwargs: accepted for interface compatibility; not used by this method
    Returns:
        List of simulations, in the order the metadata store returns them
    """
    children = []
    for sim_meta in self.platform._metas.get_children(experiment):
        slurm_sim = SlurmSimulation(sim_meta)
        slurm_sim.status = self.platform._op_client.get_simulation_status(slurm_sim.id)
        child = slurm_sim if raw else self.platform._simulations.to_entity(slurm_sim, parent=parent)
        children.append(child)
    return children
def get_parent(self, experiment: SlurmExperiment, **kwargs) -> SlurmSuite:
    """
    Fetch the suite an experiment belongs to.
    Args:
        experiment: Slurm experiment
        kwargs: keyword arguments forwarded to the suite lookup
    Returns:
        The raw suite that is the parent of this experiment, or None when the
        experiment has no parent_id.
    """
    parent_id = experiment.parent_id
    if parent_id is not None:
        return self.platform._suites.get(parent_id, raw=True, **kwargs)
    return None
[docs] def platform_run_item(self, experiment: Experiment, dry_run: bool = False, **kwargs):
"""
Run experiment.
Registers the experiment with its parent, dumps the parent and experiment metadata,
submits the job unless dry_run is set, and logs where the experiment lives.
Args:
experiment: idmtools Experiment
dry_run: True/False; when True the job submission call is skipped
kwargs: keyword arguments forwarded to the job submission call
Returns:
None
"""
# Ensure parent
# NOTE(review): experiment.parent is dereferenced without a None check; presumably
# platform_create has already attached a suite - confirm.
experiment.parent.add_experiment(experiment)
self.platform._metas.dump(experiment.parent)
# Generate/update metadata
self.platform._metas.dump(experiment)
# Commission
if not dry_run:
self.platform._op_client.submit_job(experiment, **kwargs)
# NOTE(review): indentation was lost in this copy of the file, so it cannot be told
# from here whether the statements below belong to the `if not dry_run:` branch or
# run unconditionally - check the original source before restructuring.
suite_id = experiment.parent_id or experiment.suite_id
# user_logger.info(f'job_id: {slurm_job_id}')
user_logger.info(f'job_directory: {Path(self.platform.job_directory).resolve()}')
user_logger.info(f'suite: {str(suite_id)}')
user_logger.info(f'experiment: {experiment.id}')
user_logger.info(f"\nExperiment Directory: \n{self.platform.get_directory(experiment)}")
def send_assets(self, experiment: Experiment, **kwargs):
    """
    Copy our experiment assets (intentionally a no-op).
    Asset transfer is replaced by self.platform._assets.dump_assets(experiment),
    so this method does nothing.
    Args:
        experiment: idmtools Experiment
        kwargs: keyword arguments used to expand functionality
    Returns:
        None
    """
    return None
def list_assets(self, experiment: Experiment, **kwargs) -> List[Asset]:
    """
    List assets for an experiment.
    Thin delegation to the platform's asset operations.
    Args:
        experiment: Experiment to get assets for
        kwargs: keyword arguments forwarded to the asset operations
    Returns:
        List[Asset]
    """
    return self.platform._assets.list_assets(experiment, **kwargs)
def get_assets_from_slurm_experiment(self, experiment: SlurmExperiment) -> AssetCollection:
    """
    Build an asset collection from a Slurm experiment's 'Assets' directory.
    Args:
        experiment: Experiment to get asset collection for.
    Returns:
        AssetCollection holding every asset found (recursively) under the experiment's
        'Assets' directory; an empty collection when that directory does not exist.
    """
    collected = AssetCollection()
    experiment_dir = self.platform._op_client.get_directory_by_id(experiment.id, ItemType.EXPERIMENT)
    assets_dir = Path(experiment_dir, 'Assets')
    if not assets_dir.exists():
        return collected
    for asset in AssetCollection.assets_from_directory(assets_dir, recursive=True):
        collected.add_asset(asset)
    return collected
def to_entity(self, slurm_exp: SlurmExperiment, parent: Optional[Suite] = None, children: bool = True,
              **kwargs) -> Experiment:
    """
    Convert a SlurmExperiment to idmtools Experiment.
    Args:
        slurm_exp: Slurm experiment to convert
        parent: optional suite to use as the experiment's parent; looked up on the
            platform from slurm_exp.parent_id when omitted
        children: when True, the experiment's simulations are loaded as well
        kwargs: accepted for interface compatibility; not used by this method
    Returns:
        Experiment object
    """
    suite = parent
    if suite is None:
        suite = self.platform.get_item(slurm_exp.parent_id, ItemType.SUITE, force=True)

    # Attribute assignments are kept in their original order
    entity = Experiment()
    entity.platform = self.platform
    entity.uid = slurm_exp.uid
    entity.name = slurm_exp.name
    entity.parent_id = suite.id
    entity.parent = suite
    entity.tags = slurm_exp.tags
    entity._platform_object = slurm_exp
    entity.simulations = []

    entity.assets = self.get_assets_from_slurm_experiment(slurm_exp)
    if entity.assets is None:
        entity.assets = AssetCollection()

    if children:
        entity.simulations = self.get_children(slurm_exp, parent=entity, raw=False)
    return entity
def refresh_status(self, experiment: Experiment, **kwargs):
    """
    Refresh status of experiment.
    Simulation statuses are updated in place, and only when the file job_id.txt
    exists in the experiment directory.
    Args:
        experiment: idmtools Experiment
        kwargs: keyword arguments forwarded to the simulation status lookup
    Returns:
        None
    """
    op_client = self.platform._op_client
    job_id_file = op_client.get_directory(experiment).joinpath('job_id.txt')
    if job_id_file.exists():
        for simulation in experiment.simulations:
            simulation.status = op_client.get_simulation_status(simulation.id, **kwargs)
    else:
        # Without job_id.txt there is nothing to refresh from
        logger.debug(f'job_id is not available for experiment: {experiment.id}')
def create_sim_directory_map(self, experiment_id: str) -> Dict:
    """
    Build simulation working directory mapping.
    Args:
        experiment_id: experiment id
    Returns:
        Dict of simulation id as key and working dir (as a string) as value
    """
    experiment = self.platform.get_item(experiment_id, ItemType.EXPERIMENT, raw=False)
    directory_map = {}
    for simulation in experiment.simulations:
        sim_id = simulation.id
        directory_map[sim_id] = str(self.platform._op_client.get_directory(simulation))
    return directory_map
def platform_delete(self, experiment_id: str) -> None:
    """
    Delete platform experiment.
    Removes the experiment's directory tree. Deletion is best-effort: a failure is
    logged and the method returns normally.
    Args:
        experiment_id: platform experiment id
    Returns:
        None
    """
    exp = self.platform.get_item(experiment_id, ItemType.EXPERIMENT, raw=False)
    try:
        shutil.rmtree(self.platform._op_client.get_directory(exp))
    except (RuntimeError, OSError):
        # shutil.rmtree reports failures (missing directory, permissions, ...) as OSError
        # subclasses, so catching only RuntimeError never covered the deletion itself.
        # RuntimeError stays in the tuple for callers relying on the previous handling.
        logger.info("Could not delete the associated experiment...", exc_info=True)
def platform_cancel(self, experiment_id: str, force: bool = True) -> None:
    """
    Cancel platform experiment's slurm job.
    Args:
        experiment_id: experiment id
        force: when True the cancel is attempted regardless of the experiment status;
            when False it is attempted only if the experiment is RUNNING
    Returns:
        None
    """
    experiment = self.platform.get_item(experiment_id, ItemType.EXPERIMENT, raw=False)

    # Nothing to do for a non-running experiment unless the caller forces the cancel
    if not force and experiment.status != EntityStatus.RUNNING:
        user_logger.info(f"Experiment {experiment_id} is not running, no cancel needed...")
        return

    logger.debug(f"cancel slurm job for experiment: {experiment_id}...")
    job_id = self.platform._op_client.get_job_id(experiment_id, ItemType.EXPERIMENT)
    if job_id is None:
        logger.debug(f"Slurm job for experiment: {experiment_id} is not available!")
        return

    outcome = self.platform._op_client.cancel_job(job_id)
    user_logger.info(f"Cancel Experiment {experiment_id}: {outcome}")
[docs] def post_run_item(self, experiment: Experiment, **kwargs):
"""
Trigger right after commissioning experiment on platform.
Runs the base-class hook, then reports the experiment's Slurm job ids to the user
and prints a command that can be used to check simulation status.
Args:
experiment: Experiment just commissioned
kwargs: keyword arguments forwarded to the base-class hook
Returns:
None
"""
super().post_run_item(experiment, **kwargs)
job_ids = self.platform._op_client.get_job_id(experiment.id, ItemType.EXPERIMENT)
if job_ids is None:
logger.debug(f"Slurm job for experiment: {experiment.id} is not available!")
user_logger.info("Slurm Job Ids: None")
else:
# Prefix each job id with three spaces (" ".ljust(3)) so the list prints indented.
# NOTE(review): the comprehension variable `id` shadows the builtin of the same name.
job_ids = [f'{" ".ljust(3)}{id}' for id in job_ids]
user_logger.info(f"Slurm Job Ids ({len(job_ids)}):")
user_logger.info('\n'.join(job_ids))
# NOTE(review): indentation was lost in this copy of the file, so it cannot be told
# from here whether the hint below is part of the `else` branch or is always logged -
# check the original source before restructuring.
user_logger.info(
f'\nYou may try the following command to check simulations running status: \n idmtools slurm {os.path.abspath(self.platform.job_directory)} status --exp-id {experiment.id}')