Source code for idmtools_platform_slurm.platform_operations.simulation_operations
"""
Here we implement the SlurmPlatform simulation operations.
Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, List, Dict, Type, Optional, Union, Any
import shutil
from idmtools.assets import Asset
from idmtools.core import ItemType, EntityStatus
from idmtools.entities.experiment import Experiment
from idmtools.entities.simulation import Simulation
from idmtools.entities.iplatform_ops.iplatform_simulation_operations import IPlatformSimulationOperations
from idmtools_platform_slurm.platform_operations.utils import SlurmSimulation, SlurmExperiment, clean_experiment_name
from logging import getLogger
logger = getLogger(__name__)
user_logger = getLogger('user')
if TYPE_CHECKING:
from idmtools_platform_slurm.slurm_platform import SlurmPlatform
logger = getLogger(__name__)
[docs]@dataclass
class SlurmPlatformSimulationOperations(IPlatformSimulationOperations):
platform: 'SlurmPlatform' # noqa: F821
platform_type: Type = field(default=SlurmSimulation)
[docs] def get(self, simulation_id: str, **kwargs) -> Dict:
"""
Gets a simulation from the Slurm platform.
Args:
simulation_id: Simulation id
kwargs: keyword arguments used to expand functionality
Returns:
Slurm Simulation object
"""
metas = self.platform._metas.filter(item_type=ItemType.SIMULATION, property_filter={'id': str(simulation_id)})
if len(metas) > 0:
# update status - data analysis may need this
slurm_sim = SlurmSimulation(metas[0])
slurm_sim.status = self.platform._op_client.get_simulation_status(slurm_sim.id)
return slurm_sim
else:
raise RuntimeError(f"Not found Simulation with id '{simulation_id}'")
[docs] def platform_create(self, simulation: Simulation, **kwargs) -> SlurmSimulation:
"""
Create the simulation on Slurm Platform.
Args:
simulation: Simulation
kwargs: keyword arguments used to expand functionality
Returns:
Slurm Simulation object created.
"""
simulation.name = clean_experiment_name(simulation.experiment.name if not simulation.name else simulation.name)
# Generate Simulation folder structure
self.platform._op_client.mk_directory(simulation, exist_ok=False)
meta = self.platform._metas.dump(simulation)
self.platform._assets.link_common_assets(simulation)
self.platform._assets.dump_assets(simulation)
self.platform._op_client.create_batch_file(simulation, **kwargs)
# Make command executable
self.platform._op_client.make_command_executable(simulation)
# Return Slurm Simulation
slurm_sim = SlurmSimulation(meta)
return slurm_sim
[docs] def get_parent(self, simulation: SlurmSimulation, **kwargs) -> SlurmExperiment:
"""
Fetches the parent of a simulation.
Args:
simulation: Slurm Simulation
kwargs: keyword arguments used to expand functionality
Returns:
The Experiment being the parent of this simulation.
"""
if simulation.parent_id is None:
return None
else:
return self.platform._experiments.get(simulation.parent_id, raw=True, **kwargs)
[docs] def platform_run_item(self, simulation: Simulation, **kwargs):
"""
For simulations on slurm, we let the experiment execute with sbatch
Args:
simulation: idmtools Simulation
kwargs: keyword arguments used to expand functionality
Returns:
None
"""
pass
[docs] def send_assets(self, simulation: Simulation, **kwargs):
"""
Send assets.
Replaced by self.platform._metas.dump(simulation)
Args:
simulation: idmtools Simulation
kwargs: keyword arguments used to expand functionality
Returns:
None
"""
pass
[docs] def get_assets(self, simulation: Simulation, files: List[str], **kwargs) -> Dict[str, bytearray]:
"""
Get assets for simulation.
Args:
simulation: idmtools Simulation
files: files to be retrieved
kwargs: keyword arguments used to expand functionality
Returns:
Dict[str, bytearray]
"""
ret = self.platform._assets.get_assets(simulation, files, **kwargs)
return ret
[docs] def list_assets(self, simulation: Simulation, **kwargs) -> List[Asset]:
"""
List assets for simulation.
Args:
simulation: idmtools Simulation
kwargs: keyword arguments used to expand functionality
Returns:
List[Asset]
"""
ret = self.platform._assets.list_assets(simulation, **kwargs)
return ret
[docs] def to_entity(self, slurm_sim: SlurmSimulation, parent: Optional[Experiment] = None, **kwargs) -> Simulation:
"""
Convert a SlurmSimulation object to idmtools Simulation.
Args:
slurm_sim: simulation to convert
parent: optional experiment object
kwargs: keyword arguments used to expand functionality
Returns:
Simulation object
"""
if parent is None:
parent = self.platform.get_item(slurm_sim.parent_id, ItemType.EXPERIMENT, force=True)
sim = Simulation(task=None)
sim.platform = self.platform
sim.uid = slurm_sim.uid
sim.name = slurm_sim.name
sim.parent_id = parent.id
sim.parent = parent
sim.tags = slurm_sim.tags
sim._platform_object = slurm_sim
# Convert status
sim.status = slurm_sim.status
return sim
[docs] def refresh_status(self, simulation: Simulation, **kwargs):
"""
Refresh simulation status: we actually don't really refresh simulation' status directly.
Args:
simulation: idmtools Simulation
kwargs: keyword arguments used to expand functionality
Returns:
None
"""
raise NotImplementedError("Refresh simulation status is not called directly on the Slurm Platform")
[docs] def create_sim_directory_map(self, simulation_id: str) -> Dict:
"""
Build simulation working directory mapping.
Args:
simulation_id: simulation id
Returns:
Dict of simulation id as key and working dir as value
"""
sim = self.platform.get_item(simulation_id, ItemType.SIMULATION, raw=False)
return {sim.id: str(self.platform._op_client.get_directory_by_id(simulation_id, ItemType.SIMULATION))}
[docs] def platform_delete(self, sim_id: str) -> None:
"""
Delete platform simulation.
Args:
sim_id: platform simulation id
Returns:
None
"""
sim = self.platform.get_item(sim_id, ItemType.SIMULATION, raw=False)
try:
shutil.rmtree(self.platform._op_client.get_directory(sim))
except RuntimeError:
logger.info(f"Could not delete the simulation: {sim_id}..")
return
[docs] def platform_cancel(self, sim_id: str, force: bool = False) -> Any:
"""
Cancel platform simulation's slurm job.
Args:
sim_id: simulation id
force: bool, True/False
Returns:
Any
"""
sim = self.platform.get_item(sim_id, ItemType.SIMULATION, raw=False)
if force or sim.status == EntityStatus.RUNNING:
logger.debug(f"cancel slurm job for simulation: {sim_id}...")
job_id = self.platform._op_client.get_job_id(sim_id, ItemType.SIMULATION)
if job_id is None:
logger.debug(f"Slurm job for simulation: {sim_id} is not available!")
return
else:
result = self.platform._op_client.cancel_job(job_id)
user_logger.info(f"Cancel Simulation: {sim_id}: {result}")
return result
else:
user_logger.info(f"Simulation {sim_id} is not running, no cancel needed...")