# Source code for idmtools_platform_slurm.slurm_operations.local_operations

"""
Here we implement the SlurmPlatform local operations.

Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import os
import shlex
import shutil
import subprocess
from dataclasses import dataclass
from logging import getLogger
from pathlib import Path
from idmtools import IdmConfigParser
from typing import Union, Any, List
from idmtools.core import ItemType, EntityStatus
from idmtools.entities import Suite
from idmtools.entities.experiment import Experiment
from idmtools.entities.simulation import Simulation
from idmtools_platform_slurm.assets import generate_batch, generate_script, generate_simulation_script
from idmtools_platform_slurm.slurm_operations.operations_interface import SlurmOperations
from idmtools_platform_slurm.slurm_operations.slurm_constants import SLURM_MAPS

# Module-level logger, named after this module via __name__.
logger = getLogger(__name__)


@dataclass
class LocalSlurmOperations(SlurmOperations):
    """
    SlurmOperations implementation that works against the local file system.

    Item directories are laid out under ``self.platform.job_directory`` as
    ``<suite_id>/<experiment_id>/<simulation_id>``.
    """

    def get_directory(self, item: Union[Suite, Experiment, Simulation]) -> Path:
        """
        Get item's path.

        Args:
            item: Suite, Experiment, Simulation
        Returns:
            item file directory
        Raises:
            RuntimeError: if an Experiment has no suite id, a Simulation has no
                parent, or the item type is not supported
        """
        if isinstance(item, Suite):
            item_dir = Path(self.platform.job_directory, item.id)
        elif isinstance(item, Experiment):
            suite_id = item.parent_id or item.suite_id
            if suite_id is None:
                raise RuntimeError("Experiment missing parent!")
            suite_dir = Path(self.platform.job_directory, str(suite_id))
            item_dir = Path(suite_dir, item.id)
        elif isinstance(item, Simulation):
            exp = item.parent
            if exp is None:
                raise RuntimeError("Simulation missing parent!")
            # A simulation lives inside its experiment's directory.
            exp_dir = self.get_directory(exp)
            item_dir = Path(exp_dir, item.id)
        else:
            raise RuntimeError(f"Get directory is not supported for {type(item)} object on SlurmPlatform")

        return item_dir

    def get_directory_by_id(self, item_id: str, item_type: ItemType) -> Path:
        """
        Get item's path by searching the job directory for its id.

        Args:
            item_id: entity id (Suite, Experiment, Simulation)
            item_type: the type of items (Suite, Experiment, Simulation)
        Returns:
            item file directory (the first match found)
        Raises:
            RuntimeError: if the item type is unknown or no matching path exists
        """
        # The glob depth mirrors the <suite>/<experiment>/<simulation> layout.
        if item_type is ItemType.SIMULATION:
            pattern = f"*/*/{item_id}"
        elif item_type is ItemType.EXPERIMENT:
            pattern = f"*/{item_id}"
        elif item_type is ItemType.SUITE:
            pattern = f"{item_id}"
        else:
            raise RuntimeError(f"Unknown item type: {item_type}")

        root = Path(self.platform.job_directory)
        found = next(root.glob(pattern), None)
        if found is None:
            raise RuntimeError(f"Not found path for item_id: {item_id} with type: {item_type}.")
        return found

    def mk_directory(self, item: Union[Suite, Experiment, Simulation] = None, dest: Union[Path, str] = None,
                     exist_ok: bool = None) -> None:
        """
        Make a new directory.

        Args:
            item: Suite/Experiment/Simulation
            dest: the folder path; takes precedence over item when given
            exist_ok: True/False; when None, the EXIST_ITEM_DIR option is used,
                falling back to the platform's dir_exist_ok
        Returns:
            None
        Raises:
            RuntimeError: if neither a supported item nor dest is given, or the
                directory already exists and exist_ok is false
        """
        if dest is not None:
            target = Path(dest)
        elif isinstance(item, (Suite, Experiment, Simulation)):
            target = self.get_directory(item)
        else:
            raise RuntimeError('Only support Suite/Experiment/Simulation or not None dest.')

        if exist_ok is None:
            # NOTE(review): presumably get_option returns a bool here; verify it
            # does not hand back the raw string from the config file.
            exist_ok = IdmConfigParser.get_option(option="EXIST_ITEM_DIR", fallback=self.platform.dir_exist_ok)

        if not exist_ok and os.path.exists(target):
            raise RuntimeError(
                f'Item directory {target} already exists. Exist_ok flag set to false to avoid data being overwritten.')
        target.mkdir(parents=True, exist_ok=exist_ok)

    @staticmethod
    def update_script_mode(script_path: Union[Path, str], mode: int = 0o777) -> None:
        """
        Change file mode.

        Args:
            script_path: script path
            mode: permission mode
        Returns:
            None
        """
        Path(script_path).chmod(mode)

    def make_command_executable(self, simulation: Simulation) -> None:
        """
        Make simulation command executable.

        The executable is looked up first inside the simulation directory and
        then on PATH. Failures are only logged at debug level.

        Args:
            simulation: idmtools Simulation
        Returns:
            None
        """
        exe = simulation.task.command.executable
        if exe == 'singularity':
            # split the command
            cmd = shlex.split(simulation.task.command.cmd.replace("\\", "/"))
            # get real executable
            # NOTE(review): index 3 presumably assumes a
            # "singularity exec <image> <exe> ..." layout - confirm.
            exe = cmd[3]

        sim_dir = self.get_directory(simulation)
        exe_path = sim_dir.joinpath(exe)

        # see if it is a file
        if exe_path.exists():
            exe = exe_path
        else:
            # Resolve once instead of calling shutil.which twice.
            resolved = shutil.which(exe)
            if resolved is not None:
                exe = Path(resolved)
            else:
                logger.debug(f"Failed to find executable: {exe}")
                exe = None

        try:
            if exe and not os.access(exe, os.X_OK):
                self.update_script_mode(exe)
        except Exception:
            # Best effort only; do not swallow KeyboardInterrupt/SystemExit.
            logger.debug(f"Failed to change file mode for executable: {exe}")

    def create_batch_file(self, item: Union[Experiment, Simulation], max_running_jobs: int = None,
                          retries: int = None, array_batch_size: int = None, dependency: bool = True,
                          **kwargs) -> None:
        """
        Create batch file.

        Args:
            item: the item to build batch file for
            max_running_jobs: forwarded to generate_batch and generate_script (Experiment only)
            retries: forwarded to generate_simulation_script (Simulation only)
            array_batch_size: forwarded to generate_batch (Experiment only)
            dependency: forwarded to generate_batch (Experiment only)
            kwargs: keyword arguments used to expand functionality.
        Returns:
            None
        Raises:
            NotImplementedError: if item is neither an Experiment nor a Simulation
        """
        if isinstance(item, Experiment):
            generate_batch(self.platform, item, max_running_jobs, array_batch_size, dependency)
            generate_script(self.platform, item, max_running_jobs)
        elif isinstance(item, Simulation):
            generate_simulation_script(self.platform, item, retries)
        else:
            raise NotImplementedError(f"{item.__class__.__name__} is not supported for batch creation.")

    def submit_job(self, item: Union[Experiment, Simulation], **kwargs) -> None:
        """
        Submit a Slurm job.

        For an Experiment this runs ``bash batch.sh`` inside the experiment
        directory. For a Simulation nothing is done.

        Args:
            item: idmtools Experiment or Simulation
            kwargs: keyword arguments used to expand functionality
        Returns:
            None
        Raises:
            NotImplementedError: if item is neither an Experiment nor a Simulation
        """
        if isinstance(item, Experiment):
            working_directory = self.get_directory(item)
            subprocess.run(['bash', 'batch.sh'], stdout=subprocess.PIPE, cwd=str(working_directory))
        elif isinstance(item, Simulation):
            pass
        else:
            raise NotImplementedError("Submit job is not implemented on SlurmPlatform.")

    def get_simulation_status(self, sim_id: str, **kwargs) -> EntityStatus:
        """
        Retrieve simulation status.

        The status is read from ``job_status.txt`` in the simulation directory
        and translated through SLURM_MAPS.

        Args:
            sim_id: Simulation ID
            kwargs: keyword arguments used to expand functionality
        Returns:
            EntityStatus
        """
        # Workaround (cancelling job not output -1): check if slurm job finished
        sim_dir = self.get_directory_by_id(sim_id, ItemType.SIMULATION)

        # Check process status
        job_status_path = sim_dir.joinpath('job_status.txt')
        if not job_status_path.exists():
            return SLURM_MAPS['None']

        # read_text closes the file, unlike a bare open(...).read().
        status = job_status_path.read_text().strip()
        if status in ['100', '0', '-1']:
            return SLURM_MAPS[status]
        return SLURM_MAPS['100']  # To be safe

    def create_file(self, file_path: str, content: str) -> None:
        """
        Create a file with given content and file path.

        Args:
            file_path: the full path of the file to be created
            content: file content
        Returns:
            Nothing
        """
        with open(file_path, 'w') as f:
            f.write(content)

    # region: Cancel Slurm Job

    @staticmethod
    def cancel_job(job_ids: Union[str, List[str]]) -> Any:
        """
        Cancel Slurm job for given job ids.

        Args:
            job_ids: slurm jobs id
        Returns:
            "Success" when scancel exits with code 0, otherwise 'Error'
        """
        if isinstance(job_ids, str):
            job_ids = [job_ids]
        logger.debug(f"Submit slurm cancel job: {job_ids}")
        result = subprocess.run(['scancel', *job_ids], stdout=subprocess.PIPE)
        stdout = "Success" if result.returncode == 0 else 'Error'
        return stdout

    def get_job_id(self, item_id: str, item_type: ItemType) -> Union[List[str], None]:
        """
        Retrieve the job id for item that had been run.

        Args:
            item_id: id of experiment/simulation
            item_type: ItemType (Experiment or Simulation)
        Returns:
            List of slurm job ids, or None when job_id.txt does not exist
        Raises:
            RuntimeError: if item_type is not Experiment or Simulation
        """
        if item_type not in (ItemType.EXPERIMENT, ItemType.SIMULATION):
            raise RuntimeError(f"Not support item type: {item_type}")

        item_dir = self.get_directory_by_id(item_id, item_type)
        job_id_file = item_dir.joinpath('job_id.txt')
        if not job_id_file.exists():
            logger.debug(f"{job_id_file} not found.")
            return None

        # read_text closes the file, unlike a bare open(...).read().
        job_id = job_id_file.read_text().strip()
        return job_id.split('\n')

    # endregion