Source code for idmtools.entities.iplatform_ops.iplatform_workflowitem_operations
"""
IPlatformWorkflowItemOperations defines workflow item operations interface.
Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from logging import DEBUG
from typing import Type, Any, List, Tuple, Dict, NoReturn, TYPE_CHECKING
from idmtools.assets import Asset
from idmtools.core import CacheEnabled, getLogger
from idmtools.entities.iplatform_ops.utils import batch_create_items
from idmtools.entities.iworkflow_item import IWorkflowItem
from idmtools.registry.functions import FunctionPluginManager
if TYPE_CHECKING: # pragma: no cover
from idmtools.entities.iplatform import IPlatform
logger = getLogger(__name__)
[docs]@dataclass
class IPlatformWorkflowItemOperations(CacheEnabled, ABC):
"""
IPlatformWorkflowItemOperations defines workflow item operations interface.
"""
platform: 'IPlatform' # noqa: F821
platform_type: Type
[docs] @abstractmethod
def get(self, workflow_item_id: str, **kwargs) -> Any:
"""
Returns the platform representation of an WorkflowItem.
Args:
workflow_item_id: Item id of WorkflowItems
**kwargs:
Returns:
Platform Representation of an workflow_item
"""
pass
[docs] def batch_create(self, workflow_items: List[IWorkflowItem], display_progress: bool = True, **kwargs) -> List[Any]:
"""
Provides a method to batch create workflow items.
Args:
workflow_items: List of worfklow items to create
display_progress: Whether to display progress bar
**kwargs:
Returns:
List of tuples containing the create object and id of item that was created.
"""
return batch_create_items(workflow_items, create_func=self.create, display_progress=display_progress,
progress_description="Creating WorkItems", unit="workitem", **kwargs)
[docs] def pre_create(self, workflow_item: IWorkflowItem, **kwargs) -> NoReturn:
"""
Run the platform/workflow item post creation events.
Args:
workflow_item: IWorkflowItem to run post-creation events
**kwargs: Optional arguments mainly for extensibility
Returns:
NoReturn
"""
if logger.isEnabledFor(DEBUG):
logger.debug("Calling idmtools_platform_pre_create_item")
FunctionPluginManager.instance().hook.idmtools_platform_pre_create_item(item=workflow_item, kwargs=kwargs)
if logger.isEnabledFor(DEBUG):
logger.debug("Calling pre_creation")
workflow_item.pre_creation(self.platform)
[docs] def post_create(self, workflow_item: IWorkflowItem, **kwargs) -> NoReturn:
"""
Run the platform/workflow item post creation events.
Args:
workflow_item: IWorkflowItem to run post-creation events
**kwargs: Optional arguments mainly for extensibility
Returns:
NoReturn
"""
if logger.isEnabledFor(DEBUG):
logger.debug("Calling idmtools_platform_post_create_item hooks")
FunctionPluginManager.instance().hook.idmtools_platform_post_create_item(item=workflow_item, kwargs=kwargs)
if logger.isEnabledFor(DEBUG):
logger.debug("Calling post_creation")
workflow_item.post_creation(self.platform)
[docs] def create(self, workflow_item: IWorkflowItem, do_pre: bool = True, do_post: bool = True, **kwargs) -> Any:
"""
Creates an workflow item from an IDMTools IWorkflowItem object.
Also performs pre-creation and post-creation locally and on platform.
Args:
workflow_item: Suite to create
do_pre: Perform Pre creation events for item
do_post: Perform Post creation events for item
**kwargs: Optional arguments mainly for extensibility
Returns:
Created platform item and the UUID of said item
"""
if workflow_item.status is not None:
return workflow_item._platform_object, workflow_item.uid
if do_pre:
if logger.isEnabledFor(DEBUG):
logger.debug("Calling pre_create")
self.pre_create(workflow_item, **kwargs)
if logger.isEnabledFor(DEBUG):
logger.debug("Calling platform_create")
ret = self.platform_create(workflow_item, **kwargs)
workflow_item.platform = self.platform
if do_post:
if logger.isEnabledFor(DEBUG):
logger.debug("Calling post_create")
self.post_create(workflow_item, **kwargs)
return ret
[docs] @abstractmethod
def platform_create(self, workflow_item: IWorkflowItem, **kwargs) -> Tuple[Any, str]:
"""
Creates an workflow_item from an IDMTools workflow_item object.
Args:
workflow_item: WorkflowItem to create
**kwargs: Optional arguments mainly for extensibility
Returns:
Created platform item and the id of said item
"""
pass
[docs] def pre_run_item(self, workflow_item: IWorkflowItem, **kwargs):
"""
Trigger right before commissioning experiment on platform.
This ensures that the item is created. It also ensures that the children(simulations) have also been created.
Args:
workflow_item: Experiment to commission
Returns:
None
"""
# ensure the item is created before running
# TODO what status are valid here? Create only?
if workflow_item.status is None:
if logger.isEnabledFor(DEBUG):
logger.debug("Calling create")
self.create(workflow_item)
[docs] def post_run_item(self, workflow_item: IWorkflowItem, **kwargs):
"""
Trigger right after commissioning workflow item on platform.
Args:
workflow_item: Experiment just commissioned
Returns:
None
"""
if logger.isEnabledFor(DEBUG):
logger.debug("Calling idmtools_platform_post_run hooks")
FunctionPluginManager.instance().hook.idmtools_platform_post_run(item=workflow_item, kwargs=kwargs)
[docs] def run_item(self, workflow_item: IWorkflowItem, **kwargs):
"""
Called during commissioning of an item. This should create the remote resource.
Args:
workflow_item:
Returns:
None
"""
if logger.isEnabledFor(DEBUG):
logger.debug("Calling pre_run_item")
self.pre_run_item(workflow_item, **kwargs)
if logger.isEnabledFor(DEBUG):
logger.debug("Calling platform_run_item")
self.platform_run_item(workflow_item, **kwargs)
if logger.isEnabledFor(DEBUG):
logger.debug("Calling post_run_item")
self.post_run_item(workflow_item, **kwargs)
[docs] @abstractmethod
def platform_run_item(self, workflow_item: IWorkflowItem, **kwargs):
"""
Called during commissioning of an item. This should perform what is needed to commission job on platform.
Args:
workflow_item:
Returns:
None
"""
pass
[docs] @abstractmethod
def get_parent(self, workflow_item: Any, **kwargs) -> Any:
"""
Returns the parent of item. If the platform doesn't support parents, you should throw a TopLevelItem error.
Args:
workflow_item: Workflow item to get parent of
**kwargs:
Returns:
Parent of Worktflow item
Raise:
TopLevelItem
"""
pass
[docs] @abstractmethod
def get_children(self, workflow_item: Any, **kwargs) -> List[Any]:
"""
Returns the children of an workflow_item object.
Args:
workflow_item: WorkflowItem object
**kwargs: Optional arguments mainly for extensibility
Returns:
Children of workflow_item object
"""
pass
[docs] def to_entity(self, workflow_item: Any, **kwargs) -> IWorkflowItem:
"""
Converts the platform representation of workflow_item to idmtools representation.
Args:
workflow_item:Platform workflow_item object
Returns:
IDMTools workflow_item object
"""
return workflow_item
[docs] @abstractmethod
def refresh_status(self, workflow_item: IWorkflowItem, **kwargs):
"""
Refresh status for workflow item.
Args:
workflow_item: Item to refresh status for
Returns:
None
"""
pass
[docs] @abstractmethod
def send_assets(self, workflow_item: Any, **kwargs):
"""
Send assets for workflow item to platform.
Args:
workflow_item: Item to send assets for
Returns:
None
"""
pass
[docs] @abstractmethod
def get_assets(self, workflow_item: IWorkflowItem, files: List[str], **kwargs) -> Dict[str, bytearray]:
"""
Load assets for workflow item.
Args:
workflow_item: Item
files: List of files to load
**kwargs:
Returns:
Dictionary with filename as key and value as binary content
"""
pass
[docs] @abstractmethod
def list_assets(self, workflow_item: IWorkflowItem, **kwargs) -> List[Asset]:
"""
List available assets for a workflow item.
Args:
workflow_item: workflow item to list files for
Returns:
List of filenames
"""
pass