# Source code for idmtools_platform_slurm.platform_operations.experiment_operations
"""
Here we implement the SlurmPlatform experiment operations.
Copyright 2025, Gates Foundation. All rights reserved.
"""
import os
from pathlib import Path
from dataclasses import dataclass
from typing import TYPE_CHECKING
from idmtools.core import EntityStatus
from idmtools.core import ItemType
from idmtools.entities.experiment import Experiment
from idmtools_platform_file.platform_operations.experiment_operations import FilePlatformExperimentOperations
from logging import getLogger
# Module-level logger for debug diagnostics, named after this module.
logger = getLogger(__name__)
# Separate 'user' logger used for messages addressed to the person running the experiment.
user_logger = getLogger('user')
# Imported for type annotations only; the guard keeps it out of the runtime import graph.
if TYPE_CHECKING:
    from idmtools_platform_slurm.slurm_platform import SlurmPlatform
@dataclass
class SlurmPlatformExperimentOperations(FilePlatformExperimentOperations):
    """
    Experiment operations for the Slurm platform.

    Builds on FilePlatformExperimentOperations and adds the Slurm-specific steps
    visible in this class: submitting an experiment as a job, refreshing
    simulation statuses, cancelling the job, and reporting job ids after a run.
    """

    platform: 'SlurmPlatform'  # noqa: F821
    # 'assets/run_simulation.sh', resolved from the directory one level above this module's directory.
    RUN_SIMULATION_SCRIPT_PATH = Path(__file__).parent.parent.joinpath('assets/run_simulation.sh')

    def platform_run_item(self, experiment: Experiment, dry_run: bool = False, **kwargs) -> None:
        """
        Run experiment.

        Calls the parent implementation first, then submits the experiment
        through the platform unless ``dry_run`` is set.

        Args:
            experiment: idmtools Experiment
            dry_run: when True, the job submission step is skipped
            kwargs: keyword arguments used to expand functionality; forwarded to both
                the parent implementation and ``platform.submit_job``
        Returns:
            None
        """
        # Ensure parent
        super().platform_run_item(experiment, **kwargs)

        # Commission
        if not dry_run:
            self.platform.submit_job(experiment, **kwargs)

    def refresh_status(self, experiment: Experiment, **kwargs) -> None:
        """
        Refresh status of experiment.

        Each simulation of the experiment gets its ``status`` attribute updated in
        place. If the experiment directory has no job_id.txt file, nothing is
        refreshed.

        Args:
            experiment: idmtools Experiment
            kwargs: keyword arguments used to expand functionality; forwarded to
                ``platform.get_simulation_status``
        Returns:
            None
        """
        # Check if file job_id.txt exists
        job_id_path = self.platform.get_directory(experiment).joinpath('job_id.txt')
        if not job_id_path.exists():
            logger.debug(f'job_id is not available for experiment: {experiment.id}')
            return

        # Refresh status for each simulation
        for sim in experiment.simulations:
            sim.status = self.platform.get_simulation_status(sim.id, **kwargs)

    def platform_cancel(self, experiment_id: str, force: bool = True) -> None:
        """
        Cancel platform experiment's slurm job.

        The cancel request is issued when ``force`` is True or the experiment's
        status is RUNNING; otherwise only an informational message is logged.

        Args:
            experiment_id: experiment id
            force: bool, when True the cancel is attempted regardless of experiment status
        Returns:
            None
        """
        experiment = self.platform.get_item(experiment_id, ItemType.EXPERIMENT, raw=False)
        if force or experiment.status == EntityStatus.RUNNING:
            logger.debug(f"cancel slurm job for experiment: {experiment_id}...")
            job_id = self.platform.get_job_id(experiment_id, ItemType.EXPERIMENT)
            if job_id is None:
                # Nothing to cancel when no job id could be looked up.
                logger.debug(f"Slurm job for experiment: {experiment_id} is not available!")
            else:
                result = self.platform._op_client.cancel_job(job_id)
                user_logger.info(f"Cancel Experiment {experiment_id}: {result}")
        else:
            user_logger.info(f"Experiment {experiment_id} is not running, no cancel needed...")

    def post_run_item(self, experiment: Experiment, **kwargs) -> None:
        """
        Trigger right after commissioning experiment on platform.

        After the parent hook runs, the experiment's Slurm job ids are reported on
        the user logger, followed by a hint showing the CLI command for checking
        simulation status.

        Args:
            experiment: Experiment just commissioned
            kwargs: keyword arguments used to expand functionality; forwarded to the parent hook
        Returns:
            None
        """
        super().post_run_item(experiment, **kwargs)

        job_ids = self.platform.get_job_id(experiment.id, ItemType.EXPERIMENT)
        if job_ids is None:
            logger.debug(f"Slurm job for experiment: {experiment.id} is not available!")
            user_logger.info("Slurm Job Ids: None")
        else:
            # Indent every job id by three spaces so the list reads as a block under the header.
            indent = ' ' * 3
            job_lines = [f'{indent}{job_id}' for job_id in job_ids]
            user_logger.info(f"Slurm Job Ids ({len(job_lines)}):")
            user_logger.info('\n'.join(job_lines))

        # NOTE(review): this hint is assumed to be logged on both branches above;
        # confirm its placement against the upstream file.
        user_logger.info(
            f'\nYou may try the following command to check simulations running status: \n idmtools slurm {os.path.abspath(self.platform.job_directory)} status --exp-id {experiment.id}')