Source code for idmtools_platform_comps.comps_operations.workflow_item_operations
"""idmtools comps workflow item operations.
Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import json
import typing
from dataclasses import dataclass, field
from logging import getLogger, DEBUG
from typing import Any, Dict, List, Tuple, Type, Optional
from uuid import UUID
from COMPS.Data import QueryCriteria, WorkItem as COMPSWorkItem, WorkItemFile
from COMPS.Data.WorkItem import RelationType, WorkerOrPluginKey
from idmtools import IdmConfigParser
from idmtools.assets import AssetCollection
from idmtools.core import ItemType
from idmtools.core.interfaces.ientity import IEntity
from idmtools.entities.generic_workitem import GenericWorkItem
from idmtools.entities.iplatform_ops.iplatform_workflowitem_operations import IPlatformWorkflowItemOperations
from idmtools.entities.iworkflow_item import IWorkflowItem
from idmtools_platform_comps.utils.general import convert_comps_workitem_status
if typing.TYPE_CHECKING:
from idmtools_platform_comps.comps_platform import COMPSPlatform
logger = getLogger(__name__)
user_logger = getLogger('user')
[docs]@dataclass
class CompsPlatformWorkflowItemOperations(IPlatformWorkflowItemOperations):
"""Provides IWorkflowItem COMPSPlatform."""
platform: 'COMPSPlatform' # noqa F821
platform_type: Type = field(default=COMPSWorkItem)
[docs] def get(self, workflow_item_id: UUID, columns: Optional[List[str]] = None, load_children: Optional[List[str]] = None,
query_criteria: Optional[QueryCriteria] = None, **kwargs) -> \
COMPSWorkItem:
"""
Get COMPSWorkItem.
Args:
workflow_item_id: Item id
columns: Optional columns to load. Defaults to "id", "name", "state"
load_children: Optional list of COMPS Children objects to load. Defaults to "Tags"
query_criteria: Optional QueryCriteria
**kwargs:
Returns:
COMPSWorkItem
"""
columns = columns or ["id", "name", "state", "environment_name"]
load_children = load_children if load_children is not None else ["tags"]
query_criteria = query_criteria or QueryCriteria().select(columns).select_children(load_children)
return COMPSWorkItem.get(workflow_item_id, query_criteria=query_criteria)
[docs] def platform_create(self, work_item: IWorkflowItem, **kwargs) -> Tuple[Any]:
"""
Creates an workflow_item from an IDMTools work_item object.
Args:
work_item: WorkflowItem to create
**kwargs: Optional arguments mainly for extensibility
Returns:
Created platform item and the UUID of said item
"""
self.send_assets(work_item)
if logger.isEnabledFor(DEBUG):
logger.debug(f"Creating workitem {work_item.name} of type {work_item.work_item_type}, "
f"{work_item.plugin_key} in {self.platform.environment}")
# Create a WorkItem
wi = COMPSWorkItem(name=work_item.name,
worker=WorkerOrPluginKey(work_item.work_item_type, work_item.plugin_key),
environment_name=self.platform.environment,
asset_collection_id=work_item.assets.id if len(work_item.assets) else None)
# Set tags
wi.set_tags({})
if work_item.tags:
wi.set_tags(work_item.tags)
# Update tags
wi.tags.update({'WorkItem_Type': work_item.work_item_type})
# Add work order file
wo = work_item.get_base_work_order()
wo.update(work_item.work_order)
wi.add_work_order(data=json.dumps(wo).encode('utf-8'))
# Add additional files
for af in work_item.transient_assets:
wi_file = WorkItemFile(af.filename, "input")
# Either the file has an absolute path or content
if af.absolute_path:
wi.add_file(wi_file, af.absolute_path)
else:
wi.add_file(wi_file, data=af.bytes)
if logger.isEnabledFor(DEBUG):
logger.debug("Sending workitem to COMPS")
# Save the work-item to the server
wi.save()
wi.refresh()
if logger.isEnabledFor(DEBUG):
logger.debug("Set the relationships")
# Ensure any items that are objects are converted to ids
for attr_name in ['related_experiments', 'related_simulations', 'related_work_items', 'related_asset_collections']:
if getattr(work_item, attr_name):
for item in getattr(work_item, attr_name):
getattr(wi, f'add_{attr_name[:-1]}')(item.id if isinstance(item, IEntity) else item, RelationType.DependsOn)
wi.save()
# Set the ID back in the object
work_item.uid = wi.id
return work_item
[docs] def platform_run_item(self, work_item: IWorkflowItem, **kwargs):
"""
Start to rum COMPS WorkItem created from work_item.
Args:
work_item: workflow item
Returns: None
"""
work_item.get_platform_object().commission()
if IdmConfigParser.is_output_enabled():
user_logger.info(
f"\nThe running WorkItem can be viewed at {self.platform.get_workitem_link(work_item)}\n"
)
[docs] def get_parent(self, work_item: IWorkflowItem, **kwargs) -> Any:
"""
Returns the parent of item. If the platform doesn't support parents, you should throw a TopLevelItem error.
Args:
work_item: COMPS WorkItem
**kwargs: Optional arguments mainly for extensibility
Returns: item parent
Raise:
TopLevelItem
"""
return None
[docs] def get_children(self, work_item: IWorkflowItem, **kwargs) -> List[Any]:
"""
Returns the children of an workflow_item object.
Args:
work_item: WorkflowItem object
**kwargs: Optional arguments mainly for extensibility
Returns:
Children of work_item object
"""
return None
[docs] def refresh_status(self, workflow_item: IWorkflowItem, **kwargs):
"""
Refresh status for workflow item.
Args:
work_item: Item to refresh status for
Returns:
None
"""
wi = self.get(workflow_item.uid, columns=["id", "state"], load_children=[])
workflow_item.status = convert_comps_workitem_status(wi.state) # convert_COMPS_status(wi.state)
[docs] def send_assets(self, workflow_item: IWorkflowItem, **kwargs):
"""
Add asset as WorkItemFile.
Args:
workflow_item: workflow item
Returns: None
"""
# Collect asset files
if workflow_item.assets and len(workflow_item.assets):
if logger.isEnabledFor(DEBUG):
logger.debug("Uploading assets for Workitem")
self.platform._assets.create(workflow_item.assets)
[docs] def list_assets(self, workflow_item: IWorkflowItem, **kwargs) -> List[str]:
"""
Get list of asset files.
Args:
workflow_item: workflow item
**kwargs: Optional arguments mainly for extensibility
Returns: list of assets associated with WorkItem
"""
wi: COMPSWorkItem = workflow_item.get_platform_object()
return wi.files
[docs] def get_assets(self, workflow_item: IWorkflowItem, files: List[str], **kwargs) -> Dict[str, bytearray]:
"""
Retrieve files association with WorkItem.
Args:
workflow_item: workflow item
files: list of file paths
**kwargs: Optional arguments mainly for extensibility
Returns: dict with key/value: file_path/file_content
"""
wi = self.platform.get_item(workflow_item.uid, ItemType.WORKFLOW_ITEM, raw=True)
byte_arrs = wi.retrieve_output_files(files)
return dict(zip(files, byte_arrs))
[docs] def to_entity(self, work_item: COMPSWorkItem, **kwargs) -> IWorkflowItem:
"""
Converts the platform representation of workflow_item to idmtools representation.
Args:
work_item:Platform workflow_item object
kwargs: optional arguments mainly for extensibility
Returns:
IDMTools workflow item
"""
# Creat a workflow item
# Eventually it would be nice to put the actual command here, but this requires fetching the work-order which is a bit to much overhead
obj = GenericWorkItem(name=work_item.name, command="")
# Set its correct attributes
obj.platform = self.platform
obj._platform_object = work_item
obj.uid = work_item.id
if work_item.asset_collection_id:
obj.assets = AssetCollection.from_id(work_item.asset_collection_id, platform=self.platform)
if work_item.files:
obj.transient_assets = self.platform._assets.to_entity(work_item.files)
obj.tags = work_item.tags
obj.status = convert_comps_workitem_status(work_item.state)
return obj
# def platform_run_item(self, workflow_item: IWorkflowItem, **kwargs):
# raise NotImplementedError("Running workflow items is not supported")