Source code for idmtools.entities.iplatform_ops.iplatform_experiment_operations
"""
IPlatformExperimentOperations defines experiment item operations interface.
Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
from abc import ABC, abstractmethod
from concurrent.futures import as_completed
from concurrent.futures.thread import ThreadPoolExecutor
from dataclasses import dataclass
from logging import getLogger, DEBUG
from types import GeneratorType
from typing import Type, Any, NoReturn, Tuple, List, Dict, Iterator, Union, TYPE_CHECKING
from idmtools.assets import Asset
from idmtools.core.enums import EntityStatus, ItemType
from idmtools.entities.experiment import Experiment
from idmtools.entities.iplatform_ops.utils import batch_create_items
from idmtools.registry.functions import FunctionPluginManager
logger = getLogger(__name__)
if TYPE_CHECKING: # pragma: no cover
from idmtools.entities.iplatform import IPlatform
[docs]@dataclass
class IPlatformExperimentOperations(ABC):
"""
IPlatformExperimentOperations defines experiments item operations interface.
"""
platform: 'IPlatform' # noqa: F821
platform_type: Type
[docs] @abstractmethod
def get(self, experiment_id: str, **kwargs) -> Any:
"""
Returns the platform representation of an Experiment.
Args:
experiment_id: Item id of Experiments
**kwargs:
Returns:
Platform Representation of an experiment
"""
pass
[docs] def pre_create(self, experiment: Experiment, **kwargs) -> NoReturn:
"""
Run the platform/experiment post creation events.
Args:
experiment: Experiment to run post-creation events
**kwargs: Optional arguments mainly for extensibility
Returns:
NoReturn
"""
if logger.isEnabledFor(DEBUG):
logger.debug("Calling idmtools_platform_pre_create_item hooks")
FunctionPluginManager.instance().hook.idmtools_platform_pre_create_item(item=experiment, kwargs=kwargs)
if logger.isEnabledFor(DEBUG):
logger.debug("Calling experiment pre_creation")
experiment.pre_creation(self.platform)
[docs] def post_create(self, experiment: Experiment, **kwargs) -> NoReturn:
"""
Run the platform/experiment post creation events.
Args:
experiment: Experiment to run post-creation events
**kwargs: Optional arguments mainly for extensibility
Returns:
NoReturn
"""
if logger.isEnabledFor(DEBUG):
logger.debug("Calling idmtools_platform_post_create_item hooks")
FunctionPluginManager.instance().hook.idmtools_platform_post_create_item(item=experiment, kwargs=kwargs)
if logger.isEnabledFor(DEBUG):
logger.debug("Calling experiment post_creation")
experiment.post_creation(self.platform)
[docs] def create(self, experiment: Experiment, do_pre: bool = True, do_post: bool = True, **kwargs) -> Union[Experiment]:
"""
Creates an experiment from an IDMTools simulation object.
Also performs local/platform pre and post creation events.
Args:
experiment: Experiment to create
do_pre: Perform Pre creation events for item
do_post: Perform Post creation events for item
**kwargs: Optional arguments mainly for extensibility
Returns:
Created platform item and the id of said item
"""
if experiment.status is not None:
if logger.isEnabledFor(DEBUG):
logger.debug("Calling experiment platform_modify_experiment")
experiment = self.platform_modify_experiment(experiment, **kwargs)
if logger.isEnabledFor(DEBUG):
logger.debug("Finished platform_modify_experiment")
return experiment
if do_pre:
self.pre_create(experiment, **kwargs)
if logger.isEnabledFor(DEBUG):
logger.debug("Finished pre_create")
if logger.isEnabledFor(DEBUG):
logger.debug("Calling platform_create")
experiment._platform_object = self.platform_create(experiment, **kwargs)
if logger.isEnabledFor(DEBUG):
logger.debug("Finished platform_create")
experiment.platform = self.platform
if do_post:
self.post_create(experiment, **kwargs)
if logger.isEnabledFor(DEBUG):
logger.debug("Finished post_create")
return experiment
[docs] @abstractmethod
def platform_create(self, experiment: Experiment, **kwargs) -> Any:
"""
Creates an experiment from an IDMTools experiment object.
Args:
experiment: Experiment to create
**kwargs: Optional arguments mainly for extensibility
Returns:
Created platform item and the id of said item
"""
pass
[docs] def batch_create(self, experiments: List[Experiment], display_progress: bool = True, **kwargs) -> List[Tuple[Experiment]]:
"""
Provides a method to batch create experiments.
Args:
experiments: List of experiments to create
display_progress: Show progress bar
**kwargs: Keyword arguments to pass to the batch
Returns:
List of tuples containing the create object and id of item that was created
"""
return batch_create_items(experiments, create_func=self.create, display_progress=display_progress,
progress_description="Creating Experiments", unit="experiment",
**kwargs)
[docs] @abstractmethod
def get_children(self, experiment: Any, **kwargs) -> List[Any]:
"""
Returns the children of an experiment object.
Args:
experiment: Experiment object
**kwargs: Optional arguments mainly for extensibility
Returns:
Children of experiment object
"""
pass
[docs] @abstractmethod
def get_parent(self, experiment: Any, **kwargs) -> Any:
"""
Returns the parent of item. If the platform doesn't support parents, you should throw a TopLevelItem error.
Args:
experiment: Experiment to get parent from
**kwargs:
Returns:
Parent of Experiment(Suite)
Raise:
TopLevelItem
"""
pass
[docs] def to_entity(self, experiment: Any, **kwargs) -> Experiment:
"""
Converts the platform representation of experiment to idmtools representation.
Args:
experiment:Platform experiment object
Returns:
IDMTools experiment object
"""
return experiment
[docs] def pre_run_item(self, experiment: Experiment, **kwargs):
"""
Trigger right before commissioning experiment on platform.
This ensures that the item is created. It also ensures that the children(simulations) have also been created.
Args:
experiment: Experiment to commission
Returns:
None
Raises:
ValueError - If there are no simulations
"""
if logger.isEnabledFor(DEBUG):
logger.debug("Calling pre_run")
experiment.pre_run(self.platform)
# ensure the item is created before running
if experiment.status is None:
if logger.isEnabledFor(DEBUG):
logger.debug("Calling create")
self.create(experiment, **kwargs)
else:
if logger.isEnabledFor(DEBUG):
logger.debug("Calling platform_modify_experiment")
experiment = self.platform_modify_experiment(experiment, **kwargs)
# check sims
if logger.isEnabledFor(DEBUG):
logger.debug("Ensuring simulations exist")
if isinstance(experiment.simulations, (GeneratorType, Iterator)):
if logger.isEnabledFor(DEBUG):
logger.debug("Calling _create_items_of_type for sims")
experiment.simulations = self.platform._create_items_of_type(experiment.simulations, ItemType.SIMULATION,
**kwargs)
elif len(experiment.simulations) == 0:
raise ValueError("You cannot have an experiment with no simulations")
else:
if logger.isEnabledFor(DEBUG):
logger.debug("Calling _create_items_of_type for sims")
experiment.simulations = self.platform._create_items_of_type(experiment.simulations, ItemType.SIMULATION,
**kwargs)
if logger.isEnabledFor(DEBUG):
logger.debug("Finished checking simulations")
[docs] def post_run_item(self, experiment: Experiment, **kwargs):
"""
Trigger right after commissioning experiment on platform.
Args:
experiment: Experiment just commissioned
Returns:
None
"""
if logger.isEnabledFor(DEBUG):
logger.debug("Calling idmtools_platform_post_run hooks")
FunctionPluginManager.instance().hook.idmtools_platform_post_run(item=experiment, kwargs=kwargs)
experiment.post_run(self.platform)
[docs] def run_item(self, experiment: Experiment, **kwargs):
"""
Called during commissioning of an item. This should create the remote resource.
Args:
experiment:Experiment
**kwargs: Keyword arguments to pass to pre_run_item, platform_run_item, post_run_item
Returns:
None
"""
if logger.isEnabledFor(DEBUG):
logger.debug("Calling pre_run_item")
self.pre_run_item(experiment, **kwargs)
if experiment.status not in [EntityStatus.FAILED, EntityStatus.SUCCEEDED]:
if logger.isEnabledFor(DEBUG):
logger.debug("Calling platform_run_item")
self.platform_run_item(experiment, **kwargs)
if logger.isEnabledFor(DEBUG):
logger.debug("Calling post_run_item")
self.post_run_item(experiment, **kwargs)
[docs] @abstractmethod
def platform_run_item(self, experiment: Experiment, **kwargs):
"""
Called during commissioning of an item. This should perform what is needed to commission job on platform.
Args:
experiment:
Returns:
None
"""
pass
[docs] @abstractmethod
def send_assets(self, experiment: Any, **kwargs):
"""
Transfer Experiment assets to the platform.
Args:
experiment: Experiment to send assets for
Returns:
None
"""
pass
[docs] @abstractmethod
def refresh_status(self, experiment: Experiment, **kwargs):
"""
Refresh status for experiment object.
This should update the object directly. For experiments it is best if all simulation states are updated as well.
Args:
experiment: Experiment to get status for
Returns:
None
"""
pass
[docs] def get_assets(self, experiment: Experiment, files: List[str], **kwargs) -> Dict[str, Dict[str, bytearray]]:
"""
Get files from experiment.
Args:
experiment: Experiment to get files from
files: List files
**kwargs:
Returns:
Dict with each sim id and the files contents matching specified list
"""
ret = dict()
for sim in experiment.simulations:
ret[sim.uid] = self.platform._simulations.get_assets(sim, files, **kwargs)
return ret
[docs] def list_assets(self, experiment: Experiment, children: bool = False, **kwargs) -> List[Asset]:
"""
List available assets for a experiment.
Args:
experiment: Experiment to list files for
children: Should we load assets from children as well?
Returns:
List of Assets
"""
ret = self.platform_list_asset(experiment, **kwargs)
if children:
with ThreadPoolExecutor() as pool:
futures = dict()
for sim in experiment.simulations:
future = pool.submit(self.platform._simulations.list_assets, sim, **kwargs)
futures[future] = sim
for future in as_completed(futures):
result = future.result()
ret.extend(result)
return ret
[docs] def platform_list_asset(self, experiment: Experiment, **kwargs) -> List[Asset]:
"""
List the assets on an experiment.
Args:
experiment: Experiment to list.
**kwargs: Extra Arguments
Returns:
List of Assets
"""
return []
[docs] def platform_modify_experiment(self, experiment: Experiment, regather_common_assets: bool = False,
**kwargs) -> Experiment:
"""
API to allow detection of experiments already created.
Args:
experiment:
regather_common_assets: When modifying, should we gather assets from template/simulations. It is important to note that when using this feature, ensure the previous simulations have finished provisioning. Failure to do so can lead to unexpected behaviour
Returns:
Experiment updated
"""
return experiment
[docs] def create_sim_directory_map(self, experiment_id: str) -> Dict:
"""
Build simulation working directory mapping.
Args:
experiment_id: experiment id
Returns:
Dict
"""
return {}
[docs] def platform_delete(self, experiment_id: str) -> None:
"""
Delete platform experiment.
Args:
experiment_id: experiment id
Returns:
None
"""
pass
[docs] def platform_cancel(self, experiment_id: str) -> None:
"""
Cancel platform experiment.
Args:
experiment_id: experiment id
Returns:
None
"""
pass