Source code for idmtools_platform_comps.ssmt_operations.workflow_item_operations

"""idmtools workflow item operations for ssmt.

Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import os
from dataclasses import dataclass
from uuid import UUID
from typing import List, Dict, Optional
from idmtools.entities.iworkflow_item import IWorkflowItem
from idmtools_platform_comps.comps_operations.workflow_item_operations import CompsPlatformWorkflowItemOperations
from COMPS.Data.WorkItem import WorkItem as COMPSWorkItem
from COMPS.Data import QueryCriteria
from logging import getLogger, DEBUG

logger = getLogger(__name__)


[docs]@dataclass class SSMTPlatformWorkflowItemOperations(CompsPlatformWorkflowItemOperations): """SSMTPlatformWorkflowItemOperations provides IWorkflowItem actions for SSMT Platform. In IWorkflowItem's case, we just need to change how get_assets works. """
[docs] def get(self, workflow_item_id: UUID, columns: Optional[List[str]] = None, load_children: Optional[List[str]] = None, query_criteria: Optional[QueryCriteria] = None, **kwargs) -> \ COMPSWorkItem: """ Get COMPSWorkItem. Args: workflow_item_id: Item id columns: Optional columns to load. Defaults to "id", "name", "state", "environment_name", "working_directory" load_children: Optional list of COMPS Children objects to load. Defaults to "Tags" query_criteria: Optional QueryCriteria **kwargs: Returns: COMPSWorkItem """ columns = columns or ["id", "name", "state", "environment_name", "working_directory"] if "working_directory" not in columns: columns.append("working_directory") return super().get(workflow_item_id, columns=columns, load_children=load_children, query_criteria=query_criteria)
[docs] def get_assets(self, workflow_item: IWorkflowItem, files: List[str], **kwargs) -> Dict[str, bytearray]: """ Get Assets for workflow_item. Args: workflow_item: WorkflowItem files: Files to get **kwargs: Returns: Files requested """ files = [f.replace("\\", '/') for f in files] po: COMPSWorkItem = workflow_item.get_platform_object() working_directory = po.working_directory results = dict() for file in files: full_path = os.path.join(working_directory, file) full_path = full_path.replace("\\", '/') if not os.path.exists(full_path): msg = f"Cannot find the file {file} at {full_path}" logger.error(msg) raise FileNotFoundError(msg) if logger.isEnabledFor(DEBUG): logger.debug(full_path) with open(full_path, 'rb') as fin: results[file] = fin.read() return results