Source code for idmtools_platform_container.platform_operations.experiment_operations

"""
Here we implement the ContainerPlatform experiment operations.

Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import shutil
import subprocess
from dataclasses import dataclass
from typing import NoReturn, Dict
from idmtools.core import ItemType
from idmtools.entities.experiment import Experiment
from idmtools_platform_file.platform_operations.experiment_operations import FilePlatformExperimentOperations
from idmtools_platform_container.container_operations.docker_operations import find_running_job
from logging import getLogger

# Module-level logger; this module uses it for debug-level diagnostics.
logger = getLogger(__name__)
# Logger registered under the name 'user'; this module uses it for the
# informational messages printed after an experiment is commissioned.
user_logger = getLogger('user')


@dataclass
class ContainerPlatformExperimentOperations(FilePlatformExperimentOperations):
    """
    Experiment Operations for Container Platform.
    """

    def platform_run_item(self, experiment: Experiment, **kwargs):
        """
        Run experiment.

        Delegates to the parent implementation first and then submits the
        experiment as a job through the platform.

        Args:
            experiment: idmtools Experiment
            kwargs: keyword arguments used to expand functionality

        Returns:
            None
        """
        super().platform_run_item(experiment, **kwargs)
        # Commission
        self.platform.submit_job(experiment, **kwargs)

    def post_run_item(self, experiment: Experiment, **kwargs):
        """
        Trigger right after commissioning experiment on platform.

        Unless ``dry_run`` is set in kwargs, reports the container id and a
        command the user can run to check the status of the simulations.

        Args:
            experiment: Experiment just commissioned
            kwargs: keyword arguments used to expand functionality

        Returns:
            None
        """
        super().post_run_item(experiment, **kwargs)
        if kwargs.get('dry_run', False):
            # Nothing to report for a dry run.
            return
        user_logger.info(f"\nContainer ID: {self.platform.container_id}")
        user_logger.info(
            f'\nYou may try the following command to check simulations running status: \n idmtools container status {experiment.id}')

    def platform_cancel(self, experiment_id: str) -> NoReturn:
        """
        Cancel platform experiment's container job.

        Looks up the running job for the experiment and, if one exists, runs
        ``pkill -TERM -g <job_id>`` inside the job's container via
        ``docker exec``. Failures are logged at debug level, never raised.

        Args:
            experiment_id: Experiment ID

        Returns:
            No Return
        """
        job = find_running_job(experiment_id)
        if not job:
            logger.debug(f"Experiment {experiment_id} is not running, no cancel needed...")
            return

        logger.debug(
            f"{job.item_type.name} {experiment_id} is running on Container {job.container_id}.")
        # Pass an argv list instead of a shell string so the ids are never
        # interpreted by a shell.
        kill_cmd = ["docker", "exec", str(job.container_id), "pkill", "-TERM", "-g", str(job.job_id)]
        try:
            result = subprocess.run(kill_cmd, stderr=subprocess.PIPE, text=True)
        except OSError as err:
            # Without a shell, a missing/non-executable docker binary raises
            # instead of yielding a non-zero return code; keep the log-only contract.
            logger.debug(f"Error killing {job.item_type.name} {experiment_id}: {err}")
            return

        if result.returncode == 0:
            logger.debug(f"Successfully killed {job.item_type.name} {experiment_id}")
        else:
            logger.debug(f"Error killing {job.item_type.name} {experiment_id}: {result.stderr}")

    def platform_delete(self, experiment_id: str) -> NoReturn:
        """
        Delete platform experiment.

        Removes the experiment directory recorded in the job history and then
        the job history entry itself. Failures are logged at debug level.

        Args:
            experiment_id: Experiment ID

        Returns:
            No Return
        """
        # Imported locally, as in the original implementation.
        from idmtools_platform_container.utils.job_history import JobHistory

        job = JobHistory.get_job(experiment_id)
        if not job:
            # NOTE(review): assumes get_job returns a falsy value for an unknown
            # id - confirm against JobHistory. Avoids subscripting None below.
            logger.debug(f"No job history found for experiment {experiment_id}, nothing to delete.")
            return

        exp_dir = job['EXPERIMENT_DIR']
        try:
            logger.debug(f"Deleting experiment {experiment_id}")
            shutil.rmtree(exp_dir)
            # Delete the job history
            logger.debug(f"Deleting job history {experiment_id}")
            JobHistory.delete(experiment_id)
        except (OSError, RuntimeError):
            # shutil.rmtree reports failures (missing path, permissions, ...) as
            # OSError; RuntimeError is still caught for backward compatibility.
            logger.debug(f"Could not delete the associated experiment {experiment_id}", exc_info=True)

    def create_sim_directory_map(self, experiment_id: str) -> Dict:
        """
        Build simulation working directory mapping.

        Args:
            experiment_id: experiment id

        Returns:
            Dict of simulation id as key and working dir as value
        """
        exp = self.platform.get_item(experiment_id, ItemType.EXPERIMENT, raw=False)
        return {
            sim.id: str(self.platform.get_container_directory(sim))
            for sim in exp.simulations
        }