Source code for idmtools_platform_slurm.slurm_operations.bridged_operations

"""
Here we implement the SlurmPlatform bridged operations.

Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import json
import os
import time
from dataclasses import dataclass
from logging import getLogger, INFO, DEBUG
from pathlib import Path
from typing import Union, Any, List
from uuid import uuid4
from idmtools.entities.experiment import Experiment
from idmtools.entities.simulation import Simulation
from idmtools_platform_slurm.slurm_operations.local_operations import LocalSlurmOperations

logger = getLogger(__name__)


def create_bridged_job(working_directory, bridged_jobs_directory, results_directory,
                       cleanup_results: bool = True, *, max_tries: int = 15,
                       poll_interval: float = 1) -> None:
    """
    Creates a bridged job.

    A request file named ``<uuid>.json`` is written into ``bridged_jobs_directory``. The function then polls
    ``results_directory`` for the matching ``<uuid>.json.result`` file, sleeping ``poll_interval`` seconds
    before each check.

    Args:
        working_directory: Work Directory
        bridged_jobs_directory: Jobs Directory
        results_directory: Results directory
        cleanup_results: Should we clean up results file
        max_tries: Maximum number of times to poll for the result file
        poll_interval: Seconds to sleep before each poll

    Returns:
        None

    Raises:
        ValueError: If the result file did not appear within ``max_tries`` polls.
    """
    bridged_id = str(uuid4())
    request_file = Path(bridged_jobs_directory).joinpath(f'{bridged_id}.json')
    result_file = Path(results_directory).joinpath(f'{bridged_id}.json.result')

    # The request is a small JSON document; whatever consumes it (presumably an external bridge
    # process - not visible in this module) is expected to answer with the result file.
    with open(request_file, "w") as jout:
        info = dict(command='bash', working_directory=str(working_directory))
        if logger.isEnabledFor(DEBUG):
            logger.debug(f"Requesting job: {request_file} in {working_directory}")
        json.dump(info, jout)

    for _ in range(max_tries):
        time.sleep(poll_interval)
        if not result_file.exists():
            continue

        if logger.isEnabledFor(DEBUG):
            logger.debug(f"Found result job: {result_file}")
        # The payload is not used here, but parsing it keeps the original behaviour of failing
        # loudly when the result file does not contain valid JSON.
        # NOTE(review): if the result file is not written atomically, a partially written file
        # could raise json.JSONDecodeError here - confirm against the bridge implementation.
        with open(result_file, 'r') as rin:
            json.load(rin)

        if cleanup_results:
            try:
                if logger.isEnabledFor(DEBUG):
                    logger.debug(f"Removing result: {result_file}")
                os.unlink(result_file)
            except OSError as ex:
                # Cleanup is best effort: a leftover result file must not fail the submission.
                if logger.isEnabledFor(DEBUG):
                    logger.debug(f"Unable to remove result {result_file}: {ex}")
        return

    if logger.isEnabledFor(DEBUG):
        logger.debug("Failed to get result from bridge")
    raise ValueError("FAILED: Bridge never reported result")
def cancel_bridged_job(job_ids: Union[str, List[str]], bridged_jobs_directory, results_directory,
                       cleanup_results: bool = True, *, max_tries: int = 15,
                       poll_interval: float = 1) -> Any:
    """
    Cancel a bridged job.

    A request file named ``<uuid>.json`` with the ``scancel`` command and the job ids is written into
    ``bridged_jobs_directory``. The function then polls ``results_directory`` for the matching
    ``<uuid>.json.result`` file, sleeping ``poll_interval`` seconds before each check.

    Args:
        job_ids: slurm job list
        bridged_jobs_directory: Work Directory
        results_directory: Results directory
        cleanup_results: Should we clean up results file
        max_tries: Maximum number of times to poll for the result file
        poll_interval: Seconds to sleep before each poll

    Returns:
        The ``output`` entry of the result file, or the string
        ``"FAILED: Bridge never reported result"`` when no result file appeared in time.
    """
    # A single id is accepted for convenience; the request always carries a list.
    if isinstance(job_ids, str):
        job_ids = [job_ids]

    bridged_id = str(uuid4())
    request_file = Path(bridged_jobs_directory).joinpath(f'{bridged_id}.json')
    result_file = Path(results_directory).joinpath(f'{bridged_id}.json.result')

    with open(request_file, "w") as jout:
        info = dict(command='scancel', job_ids=job_ids)
        if logger.isEnabledFor(DEBUG):
            logger.debug(f"Cancel job: {request_file}")
        json.dump(info, jout)

    for _ in range(max_tries):
        time.sleep(poll_interval)
        if not result_file.exists():
            continue

        if logger.isEnabledFor(DEBUG):
            logger.debug(f"Found result job: {result_file}")
        # NOTE(review): if the result file is not written atomically, a partially written file
        # could raise json.JSONDecodeError here - confirm against the bridge implementation.
        with open(result_file, 'r') as rin:
            result = json.load(rin)

        if cleanup_results:
            try:
                if logger.isEnabledFor(DEBUG):
                    logger.debug(f"Removing result: {result_file}")
                os.unlink(result_file)
            except OSError as ex:
                # Cleanup is best effort: a leftover result file must not hide the cancel output.
                if logger.isEnabledFor(DEBUG):
                    logger.debug(f"Unable to remove result {result_file}: {ex}")
        return result['output']

    if logger.isEnabledFor(DEBUG):
        logger.debug("Failed to get result from bridge")
    # Unlike job creation, a timeout here is reported through the return value, not an exception.
    return "FAILED: Bridge never reported result"
@dataclass
class BridgedLocalSlurmOperations(LocalSlurmOperations):
    """
    Slurm operations that submit and cancel jobs through request files.

    Instead of acting directly, submission and cancellation are delegated to ``create_bridged_job`` and
    ``cancel_bridged_job``, using the platform's ``bridged_jobs_directory`` and ``bridged_results_directory``.
    """

    def __post_init__(self):
        """
        Normalize the platform's bridged jobs directory to a ``Path`` and make sure it exists.

        Returns:
            None
        """
        platform = self.platform
        if not isinstance(platform.bridged_jobs_directory, Path):
            platform.bridged_jobs_directory = Path(platform.bridged_jobs_directory)

        # Re-read the attribute so the checks below use whatever the platform now holds.
        jobs_directory = platform.bridged_jobs_directory
        if jobs_directory.exists():
            return

        if logger.isEnabledFor(INFO):
            logger.info(f'Creating directory {jobs_directory}')
        jobs_directory.mkdir(parents=True, exist_ok=True)

    def submit_job(self, item: Union[Experiment, Simulation], **kwargs) -> None:
        """
        Submit a Slurm job.

        Only experiments result in a bridged job request; simulations are accepted but nothing is
        submitted for them.

        Args:
            item: idmtools Experiment or Simulation
            kwargs: keyword arguments used to expand functionality

        Returns:
            None

        Raises:
            NotImplementedError: If ``item`` is neither an Experiment nor a Simulation.
        """
        if isinstance(item, Experiment):
            experiment_directory = self.get_directory(item)
            create_bridged_job(
                experiment_directory,
                self.platform.bridged_jobs_directory,
                self.platform.bridged_results_directory,
            )
            return

        if isinstance(item, Simulation):
            # Intentionally a no-op: no request is written for an individual simulation.
            return

        raise NotImplementedError("Submit job is not implemented on SlurmPlatform.")

    def cancel_job(self, job_ids: Union[str, List[str]], **kwargs) -> Any:
        """
        Cancel slurm job generated from the item.

        Args:
            job_ids: Slurm job id
            kwargs: keyword arguments used to expand functionality

        Returns:
            Whatever ``cancel_bridged_job`` returns for the given job ids.
        """
        jobs_directory = self.platform.bridged_jobs_directory
        results_directory = self.platform.bridged_results_directory
        return cancel_bridged_job(job_ids, jobs_directory, results_directory)