Source code for idmtools_slurm_utils.utils

"""
Utils for slurm bridge.

Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import json
import os
from os import PathLike
from pathlib import Path
from typing import Dict
from logging import getLogger, DEBUG
from idmtools_slurm_utils.bash import command_bash
from idmtools_slurm_utils.sbatch import command_sbatch
from idmtools_slurm_utils.scancel import command_scancel
from idmtools_slurm_utils.verify import command_verify

ERROR_INVALID_COMMAND = "No command specified. You must specify either bash, scancel, or verify"

logger = getLogger()

VALID_COMMANDS = ['bash', 'sbatch', 'scancel', 'verify']


[docs]def process_job(job_path, result_dir, cleanup_job: bool = True): """ Process a job. Args: job_path: Path to the job result_dir: Result directory cleanup_job: Cleanup job when done(true), false leave it in place. """ try: result_dir = Path(result_dir) if not result_dir.exists(): result_dir.mkdir(parents=True, exist_ok=True) result_name = result_dir.joinpath(f'{os.path.basename(Path(job_path))}.result') result = get_job_result(job_path) write_result(result, result_name) if cleanup_job: os.unlink(job_path) except Exception as e: logger.exception(e) pass
[docs]def get_job_result(job_path: PathLike) -> Dict: """ Read a job file in from path and return a result. Args: job_path: Path Returns: Result """ with open(job_path, "r") as jin: info = json.load(jin) if "command" not in info or info['command'].lower() not in VALID_COMMANDS: result = dict( status="error", output=ERROR_INVALID_COMMAND ) else: command = info['command'].lower() if command == "sbatch": result = command_sbatch(info) elif command == "bash": result = command_bash(info) elif command == "verify": result = command_verify(info) elif command == "scancel": result = command_scancel(info) else: result = dict( status="error", output=ERROR_INVALID_COMMAND ) if logger.isEnabledFor(DEBUG): logger.debug(f'Result for {job_path}: {json.dumps(result, indent=4, sort_keys=True)}') return result
[docs]def write_result(result: Dict, result_name: Path): """ Write the result of a job to a directory. Args: result: Result to write result_name: Path to write result to. """ with open(result_name, "w") as rout: json.dump(result, rout)