from __future__ import print_function
from future.utils import raise_from
import os
import io
import sys
import json
from datetime import date, datetime
import logging
from enum import Enum
import uuid
from hashlib import md5
import threading
import multiprocessing
import re
import copy
import inspect
from functools import reduce
from collections import namedtuple
from COMPS import Client, default_callback_print_args
from COMPS.Data import WorkItemFile, AssetFile, AssetManager, Priority, Simulation, Experiment, Suite, AssetCollection
from COMPS.Data.SerializableEntity import SerializableEntity, json_property, json_entity, parse_ISO8601_date, parse_namedtuple_from_dict, convert_if_string
from COMPS.Data.AssetManager import EntityType
from COMPS.Data.RelatableEntity import RelatableEntity
from COMPS.Data.TaggableEntity import TaggableEntity
from COMPS.Data.CommissionableEntity import CommissionableEntity
logger = logging.getLogger(__name__)
@json_entity()
class WorkItem(TaggableEntity, CommissionableEntity, RelatableEntity, SerializableEntity):
"""
Represents a single work-item.
Contains various basic properties accessible by getters (and, in some cases, +setters):
* id
* +name
* +description
* owner
* date_created
* last_modified
* state
* error_message
* worker
* environment_name
* host_name
* worker_instance_id
* priority
* working_directory
* working_directory_size
* asset_collection_id
Also contains "child objects" (which must be specifically requested for retrieval using the
QueryCriteria.select_children() method of QueryCriteria):
* tags
* files
* plugins
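A minimal retrieval sketch (illustrative only: the host URL and id are placeholders, and
QueryCriteria is assumed to be importable from COMPS.Data):

    from COMPS import Client
    from COMPS.Data import QueryCriteria
    from COMPS.Data.WorkItem import WorkItem

    Client.login('https://comps.example.com')
    wi = WorkItem.get('11111111-2222-3333-4444-555555555555',
                      query_criteria=QueryCriteria().select_children(['tags', 'files']))
    print(wi.name, wi.state, wi.tags)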
"""
__max_wi_batch_count = 100
__max_wi_batch_request_size_kb = 38912 # 38 MiB
__max_entity_retrieval_count = 100000
try:
__save_semaphore = multiprocessing.Semaphore(4) # no. of concurrent threads that can save work-items
except (ModuleNotFoundError, ImportError):
logger.warning('Unable to create process-local semaphore; proceeding, but without work-item save constraints!')
import dummy_threading
__save_semaphore = dummy_threading.Semaphore()
__tls = threading.local()
def __init__(self, name, worker, environment_name, description=None, asset_collection_id=None, priority=None):
if not name:
raise RuntimeError('WorkItem has not been initialized properly; non-null name required.')
if not environment_name:
raise RuntimeError('WorkItem has not been initialized properly; non-null environment_name required.')
if not worker or not worker.name or not worker.version:
raise RuntimeError('WorkItem has not been initialized properly; valid \'worker\' required.')
self._id = None
self._name = name
self._worker = worker
self._environment_name = environment_name
self._description = description
self._owner = Client.auth_manager().username
self._date_created = None
self._last_modified = None
self._state = None
self._error_message = None
self._host_name = None
self._worker_instance_id = None
self._priority = priority
self._working_directory = None
self._working_directory_size = None
self._asset_collection_id = asset_collection_id
self._tags = None
self._files = ()
self._plugins = None
self._is_dirty = None # this will be set in _register_change() below
self._tmp_file_parts = []
self._register_change()
@classmethod
def __internal_factory__(cls, id=None, name=None, worker=None, environment_name=None, description=None,
owner=None, date_created=None, last_modified=None, state=None, error_message=None,
host_name=None, worker_instance_id=None, priority=None, working_directory=None,
working_directory_size=None, asset_collection_id=None, tags=None, files=None,
plugins=None):
wi = cls.__new__(cls)
wi._id = convert_if_string(id, uuid.UUID)
wi._name = name
wi._worker = WorkerOrPluginKey(**parse_namedtuple_from_dict(worker)) if worker else None
wi._environment_name = environment_name
wi._description = description
wi._owner = owner
wi._date_created = convert_if_string(date_created, parse_ISO8601_date)
wi._last_modified = convert_if_string(last_modified, parse_ISO8601_date)
wi._state = convert_if_string(state, lambda x: WorkItemState[x])
wi._error_message = error_message
wi._host_name = host_name
wi._worker_instance_id = convert_if_string(worker_instance_id, uuid.UUID)
wi._priority = convert_if_string(priority, lambda x: Priority[x])
wi._working_directory = working_directory
wi._working_directory_size = working_directory_size
wi._asset_collection_id = convert_if_string(asset_collection_id, uuid.UUID)
wi._tags = tags
if files:
wi._files = tuple(WorkItemFile.__internal_factory__(**(WorkItemFile.rest2py(f))) for f in files)
# wi._files = [ WorkItemFile.__internal_factory__(**(WorkItemFile.rest2py(f))) for f in files ]
else:
wi._files = None
if plugins:
wi._plugins = tuple(WorkerOrPluginKey(**parse_namedtuple_from_dict(p)) for p in plugins)
# wi._plugins = [ WorkerOrPluginKey(p) for p in plugins ]
else:
wi._plugins = None
wi._is_dirty = False
wi._tmp_file_parts = []
return wi
@json_property()
def id(self):
return self._id
@json_property()
def name(self):
return self._name
@name.setter
def name(self, name):
self._name = name
self._register_change()
@json_property()
def worker(self):
return self._worker
@json_property()
def environment_name(self):
return self._environment_name
@json_property()
def description(self):
return self._description
@description.setter
def description(self, description):
self._description = description
self._register_change()
@json_property()
def owner(self):
return self._owner
@json_property()
def date_created(self):
return self._date_created
@json_property()
def last_modified(self):
return self._last_modified
@json_property()
def state(self):
return self._state
@json_property()
def error_message(self):
return self._error_message
@json_property()
def host_name(self):
return self._host_name
@json_property()
def worker_instance_id(self):
return self._worker_instance_id
@json_property()
def priority(self):
return self._priority
@json_property()
def working_directory(self):
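# COMPS_DATA_MAPPING is expected to hold a pair of the form '<local-prefix>;<server-side-prefix>';
# the server-side prefix of the working directory is replaced with the local prefix and
# backslashes are normalized to forward slashes.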
if 'COMPS_DATA_MAPPING' in os.environ:
mapping = os.environ.get('COMPS_DATA_MAPPING').split(';')
return re.sub(mapping[1].replace('\\', '\\\\'), mapping[0], self._working_directory, flags=re.IGNORECASE).replace('\\', '/')
else:
return self._working_directory
@json_property()
def working_directory_size(self):
return self._working_directory_size
@json_property()
def asset_collection_id(self):
return self._asset_collection_id
@json_property()
def tags(self):
return self._tags # todo: immutable dict?
@json_property()
def files(self):
return self._files
@json_property()
def plugins(self):
return self._plugins
########################
@classmethod
def get(cls, id=None, query_criteria=None):
"""
Retrieve one or more WorkItems.
:param id: The id (str or UUID) of the WorkItem to retrieve
:param query_criteria: A QueryCriteria object specifying basic property filters and tag-filters \
to apply to the set of WorkItems returned, as well as which properties and child-objects to \
fill for the returned WorkItems
:return: A WorkItem or list of WorkItems (depending on whether 'id' was specified) with \
basic properties and child-objects assigned as specified by 'query_criteria'
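Example (a sketch; the id is a placeholder and 'crit' is any QueryCriteria you have built):

    wi  = WorkItem.get('11111111-2222-3333-4444-555555555555')   # a single WorkItem, by id
    wis = WorkItem.get(query_criteria=crit)                       # a list of matching WorkItems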
"""
if id and not isinstance(id, uuid.UUID):
try:
id = uuid.UUID(id)
except ValueError:
raise ValueError('Invalid id: {0}'.format(id))
qc_params = query_criteria.to_param_dict(WorkItem) if query_criteria else {}
user_handling_paging = any(f in qc_params for f in ['count','offset'])
qc_params['count'] = min(WorkItem.__max_entity_retrieval_count, qc_params.get('count', WorkItem.__max_entity_retrieval_count))
path = '/WorkItems{0}'.format('/' + str(id) if id else '')
resp = Client.get(path
, params = qc_params)
cr = resp.headers.get('Content-Range')
# If we got a Content-Range header in the response (meaning we didn't get the entire dataset back) and the user
# isn't handling paging (as inferred by not having a 'count' or 'offset' argument), double-check to see if
# we got the whole data-set (presumably not) and raise an error so the user knows.
if cr and not user_handling_paging:
try:
toks = cr.replace('-','/').split('/')
# from_val = int(toks[0])
# to_val = int(toks[1])
total_val = int(toks[2])
if total_val > WorkItem.__max_entity_retrieval_count:
raise RuntimeError('Unable to retrieve entire data-set (try paging); the maximum number of work-items currently retrievable is ' +
str(WorkItem.__max_entity_retrieval_count))
except (IndexError, ValueError) as e:
logger.debug(str(e))
raise RuntimeError('Invalid Content-Range response header: ' + str(cr))
json_resp = resp.json()
# if logger.isEnabledFor(logging.DEBUG):
# logger.debug('WorkItem Response:')
# logger.debug(json.dumps(json_resp, indent=4))
if 'WorkItems' not in json_resp or \
( id is not None and len(json_resp['WorkItems']) != 1 ):
logger.debug(json_resp)
raise RuntimeError('Malformed WorkItems retrieve response!')
wis = []
for wi_json in json_resp['WorkItems']:
wi_json = cls.rest2py(wi_json)
# if logger.isEnabledFor(logging.DEBUG):
# logger.debug('WorkItem:')
# logger.debug(json.dumps(wi_json, indent=4))
wi = WorkItem.__internal_factory__(**wi_json)
wis.append(wi)
if id is not None:
return wis[0]
else:
return wis
def refresh(self, query_criteria=None):
"""
Update properties of an existing WorkItem from the server.
:param query_criteria: A QueryCriteria object specifying which properties and child-objects \
to refresh on the WorkItem
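Example (a polling sketch; the sleep interval and terminal-state set are illustrative):

    import time
    while wi.state not in (WorkItemState.Succeeded, WorkItemState.Failed, WorkItemState.Canceled):
        time.sleep(10)
        wi.refresh()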
"""
if not self._id:
raise RuntimeError('Can\'t refresh a WorkItem that hasn\'t been saved!')
wi = self.get(id=self.id, query_criteria=query_criteria)
# if wi.id: self._id = wi.id
if wi.name is not None: self._name = wi.name
if wi.worker is not None: self._worker = wi.worker
if wi.environment_name is not None: self._environment_name = wi.environment_name
if wi.description is not None: self._description = wi.description
if wi.owner is not None: self._owner = wi.owner
if wi.date_created is not None: self._date_created = wi.date_created
if wi.last_modified is not None: self._last_modified = wi.last_modified
if wi.state is not None: self._state = wi.state
if wi.error_message is not None: self._error_message = wi.error_message
if wi.host_name is not None: self._host_name = wi.host_name
if wi.worker_instance_id is not None: self._worker_instance_id = wi.worker_instance_id
if wi.priority is not None: self._priority = wi.priority
if wi.working_directory is not None: self._working_directory = wi.working_directory
if wi.working_directory_size is not None: self._working_directory_size = wi.working_directory_size
if wi.asset_collection_id is not None: self._asset_collection_id = wi.asset_collection_id
if wi.tags is not None: self._tags = wi.tags
if wi.files is not None: self._files = wi.files
if wi.plugins is not None: self._plugins = wi.plugins
def _get_related_entities(self, cls=None, relation_type=None):
query_params = {}
if cls:
query_params['RelatedObject'] = cls.__name__
if relation_type:
query_params['RelationType'] = relation_type.name
path = '/WorkItems/{0}/Related'.format(str(self._id))
resp = Client.get(path
, params = query_params)
json_resp = resp.json()
ent_map = { 'Simulation': Simulation,
'Experiment': Experiment,
'Suite': Suite,
'WorkItem': WorkItem,
'AssetCollection': AssetCollection }
# if logger.isEnabledFor(logging.DEBUG):
# logger.debug('WorkItem Related Response:')
# logger.debug(json.dumps(json_resp, indent=4))
related_arr = json_resp.get('Related')
related_map = {}
if related_arr:
for rel_entity in related_arr:
enttype = rel_entity['ObjectType']
if enttype not in related_map:
related_map[enttype] = []
related_map[enttype].append(ent_map[enttype].get(id=rel_entity['Id']))
return related_map
def _add_related_entity(self, cls, related_id, relation_type):
if not self._id:
raise RuntimeError('Can\'t add related entity to WorkItem that hasn\'t yet been saved!')
if not isinstance(related_id, uuid.UUID):
try:
related_id = uuid.UUID(related_id)
except (AttributeError, ValueError):
raise_from(ValueError('Invalid related_id: {0}. Must pass a single (str or UUID) id'.format(str(related_id))), None)
# /WorkItems/{Id}/Related/{RelationType}/{RelatedObject}/{RelatedObjectId}
path = '/WorkItems/{0}/Related/{1}/{2}/{3}'.format(str(self._id),
relation_type.name,
cls.__name__,
str(related_id))
resp = Client.post(path)
# if logger.isEnabledFor(logging.DEBUG):
# logger.debug('Save WorkItem Related Response:')
# logger.debug(json.dumps(resp.json(), indent=4))
def save(self, return_missing_files=False, save_semaphore=None):
"""
Save a single WorkItem. If it's a new WorkItem, an id is automatically assigned.
:param return_missing_files: A boolean that determines the behavior when the WorkItem \
being saved contains a WorkItemFile to be saved by md5 checksum (i.e. without \
uploading the data) that is not yet in COMPS. If true, when there are such files, \
return an array of UUIDs representing the md5 checksums of the missing files. If \
false, raise an error when there are any such files.
:param save_semaphore: An optional semaphore to hold while the save request is made; if \
not provided, a process-local semaphore is used.
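Example (a minimal creation-and-save sketch; the worker key, environment name, and
work-order file are placeholders rather than values prescribed by COMPS):

    from COMPS.Data.WorkItem import WorkItem, WorkerOrPluginKey

    worker = WorkerOrPluginKey('SomeWorker', '1.0.0.0_RELEASE')
    wi = WorkItem('my work-item', worker, 'SomeEnvironment')
    wi.add_work_order(file_path='./WorkOrder.json')
    wi.save()
    print(wi.id, wi.state)    # id is assigned by the server; state becomes WorkItemState.Created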
"""
if not self._is_dirty:
logger.info('WorkItem has not been altered... no point in saving it!')
return
prepped_self = WorkItem.__prep_wi(self)
estimated_wi_size = WorkItem.__estimate_workitem_size(prepped_self)
# Check if wi exceeds the request-size limit
if estimated_wi_size + 4096 >= WorkItem.__max_wi_batch_request_size_kb * 1024:
logger.debug('wi: {0}'.format(str(self)))
logger.debug('estimated_wi_size: {0}'.format(estimated_wi_size))
raise RuntimeError('WorkItem size exceeds single-workitem limit!')
untracked_ids = WorkItem.__save_batch([prepped_self], return_missing_files, save_semaphore)
if untracked_ids:
return untracked_ids
WorkItem._get_dirty_list().remove(self)
@classmethod
def get_save_semaphore(cls):
return cls.__save_semaphore
@staticmethod
def save_all(save_batch_callback=lambda: print('.', **default_callback_print_args), return_missing_files=False, save_semaphore=None):
"""
Batch-save all unsaved WorkItems.
WorkItems are saved in batches of at most '__max_wi_batch_count' and with a maximum request
size of '__max_wi_batch_request_size_kb'.
:param save_batch_callback: Callback to call whenever a request to save a batch of WorkItems completes. \
Default behavior is to print a single '.' to the console. If the callback supplied takes 1 argument, the \
number of WorkItems saved so far will be passed when it is called.
:param return_missing_files: A boolean that determines the behavior when any of the WorkItems \
being saved contains a WorkItemFile to be saved by md5 checksum (i.e. without uploading the data) \
that is not yet in COMPS. If true, when there are such files, return an array of UUIDs representing \
the md5 checksums of the missing files. If false, raise an error when there are any such files.
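Example (a sketch of a progress callback that accepts the running count):

    def print_progress(num_saved):
        print('{0} work-items saved so far'.format(num_saved))

    WorkItem.save_all(save_batch_callback=print_progress)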
"""
if len(WorkItem._get_dirty_list()) == 0:
logger.info('No pending new work-items to batch-save!')
return
dirty_list = WorkItem._get_dirty_list()
num_wis_processed = 0
estimated_wi_size = 0
estimated_request_size = 4096 # generous overhead for HTTP headers, multipart headers and '[' + ']' for the base-entity
# multipart section, and final multipart ending "footer"
max_batch_count = min(len(dirty_list), WorkItem.__max_wi_batch_count)
prepped_wi = None
prepped_wis = []
if save_batch_callback:
num_callback_args = len(inspect.getfullargspec(save_batch_callback).args)
logger.info('Saving WorkItems')
while num_wis_processed < len(dirty_list):
wi = dirty_list[num_wis_processed]
if not wi._is_dirty:
logger.info('Skipping save for work-item {0} (already up-to-date).'.format(wi._id))
num_wis_processed += 1
continue
if not prepped_wi:
prepped_wi = WorkItem.__prep_wi(wi)
estimated_wi_size = WorkItem.__estimate_workitem_size(prepped_wi)
# add 2 because of ', ' between wis in the base-entity section
if estimated_wi_size + estimated_request_size + 2 < WorkItem.__max_wi_batch_request_size_kb * 1024:
prepped_wis.append(prepped_wi)
num_wis_processed += 1
estimated_request_size += estimated_wi_size
prepped_wi = None
estimated_wi_size = 0
# We want to try to save the batch now if 1 of the following 3 conditions is met:
# - we reached maximum batch count
# - we reached maximum batch size
# - this is the last wi
if len(prepped_wis) == max_batch_count or \
estimated_wi_size != 0 or \
num_wis_processed == len(dirty_list):
if len(prepped_wis) == 0:
# one wi already exceeds the limit. Raise an error and bail...
logger.debug('wi: {0}'.format(str(wi)))
logger.debug('estimated_wi_size: {0}'.format(estimated_wi_size))
raise RuntimeError('WorkItem size exceeds single-workitem limit!')
# ready to send this batch!
logger.debug("Ready to send single batch of {0} work-items".format(len(prepped_wis)))
untracked_ids = WorkItem.__save_batch(prepped_wis, return_missing_files, save_semaphore)
if untracked_ids:
del WorkItem._get_dirty_list()[:num_wis_processed-len(prepped_wis)]
return untracked_ids
if save_batch_callback:
if num_callback_args == 0:
save_batch_callback()
elif num_callback_args == 1:
save_batch_callback(num_wis_processed)
prepped_wis = []
estimated_request_size = 4096 # set back to initial value (w/ overhead)
del WorkItem._get_dirty_list()[:]
return
@staticmethod
def __prep_wi(wi):
if not wi._id:
tmp_wi = copy.copy(wi)
else:
tmp_wi = WorkItem.__internal_factory__(id=wi._id,
name=wi._name,
description=wi._description)
if len(wi._tmp_file_parts) > 0:
tmp_wi._files = tuple( fi[0] for fi in wi._tmp_file_parts )
save_wi = SerializableEntity.convertToDict(tmp_wi, include_hidden_props=True)
# indentval = 4 if logger.isEnabledFor(logging.DEBUG) else None
json_str = json.dumps(save_wi,
# indent=indentval,
default=lambda obj:
obj.isoformat() + '0Z' if isinstance(obj, (date, datetime))
else str(obj) if isinstance(obj, uuid.UUID)
else obj.name if isinstance(obj, Enum)
else obj)
return (wi, json_str)
@staticmethod
def __estimate_workitem_size(prepped_wi):
estimated_size = len(prepped_wi[1]) # Length contributed by this workitem in the base-entity section
for fp in filter(lambda x: x[1] is not None, prepped_wi[0]._tmp_file_parts):
estimated_size += 135 # Length of multipart headers for a file, minus the actual value for 'Content-Type'
estimated_size += len(fp[1][1][2]) # The value for 'Content-Type'
estimated_size += len(fp[1][1][1]) # Length of the data for this file
return estimated_size
@staticmethod
def __save_batch(prepped_wis, return_missing_files=False, save_semaphore=None):
if not save_semaphore:
logger.debug('No save_semaphore passed in; using process-local semaphore')
save_semaphore = WorkItem.__save_semaphore
joinstr = ', '  # ', {0}'.format('\n' if logger.isEnabledFor(logging.DEBUG) else '')
base_entity_str = joinstr.join(prepped_wi[1] for prepped_wi in prepped_wis)
files_to_send = [ ('not_a_file', ('WorkItems', '[' + base_entity_str + ']', 'application/json')) ]
files_to_send.extend(reduce(lambda x, y: x + y, [ [ fp[1] for fp in prepped_wi[0]._tmp_file_parts if fp[1] is not None ] for prepped_wi in prepped_wis ]))
with save_semaphore:
resp = Client.post('/WorkItems'
, files=files_to_send
, http_err_handle_exceptions=[400])
if resp.status_code == 400:
untracked_ids = None
try:
json_resp = resp.json()
untracked_ids = json_resp.get('UntrackedIds')
except:
pass
if untracked_ids and len(untracked_ids) > 0 and return_missing_files:
return [ uuid.UUID(x) for x in untracked_ids ]
else:
Client.raise_err_from_resp(resp)
json_resp = resp.json()
ids = json_resp.get('Ids')
if not ids or len(ids) != len(prepped_wis):
logger.debug(json_resp)
raise RuntimeError('Malformed WorkItems save response!')
for i in range(len(prepped_wis)):
wi = prepped_wis[i][0]
wi._is_dirty = False
wi._tmp_file_parts = []
if not wi._id:
wi._id = uuid.UUID(ids[i])
wi._state = WorkItemState.Created
elif wi._id != uuid.UUID(ids[i]):
raise RuntimeError('Response WorkItem Id doesn\'t match expected value!!! {0} != {1}'.format(wi._id, ids[i]))
def add_work_order(self, file_path=None, data=None):
"""
Add the WorkOrder for a WorkItem.
The contents of the WorkOrder file to add can be specified either by providing a path to the file
or by providing the actual data as a string.
:param file_path: The path to the work-order file to add.
:param data: The actual bytes of work-order data to add.
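Example (a sketch; the work-order contents are placeholders -- the actual schema is
defined by the worker that will process this WorkItem):

    wi.add_work_order(file_path='./WorkOrder.json')
    # or pass the bytes directly (note: bytes, not a unicode string):
    wi.add_work_order(data=json.dumps({'WorkItem_Type': 'SomeWorker'}).encode('utf-8'))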
"""
fn = 'WorkOrder.json' if not file_path else os.path.basename(file_path)
self.add_file(WorkItemFile(fn, 'WorkOrder', ''), file_path, data)
def add_file(self, workitemfile, file_path=None, data=None, upload_callback=lambda: print('.', **default_callback_print_args)):
"""
Add a WorkItemFile to a WorkItem.
The contents of the file to add can be specified either by providing a path to the file
or by providing the actual data as a byte-array. Alternately, if the file/data is already in
COMPS, you can skip uploading it again and just provide a WorkItemFile that contains
the md5 checksum of the data.
If the file exceeds AssetManager.large_asset_upload_threshold bytes in size, the file will be
uploaded immediately, separately from the saving of the main WorkItem. This allows saving
of arbitrarily-large files while avoiding potential timeouts or having to start from scratch in
case the upload is interrupted by network issues.
NOTE: providing both file/data and an md5 is considered invalid, as providing the md5 implies
the caller knows the file/data is already in COMPS and doesn't need to be uploaded again.
:param workitemfile: A WorkItemFile containing the metadata for the file to add.
:param file_path: The path to the file to add.
:param data: The actual bytes of data to add.
:param upload_callback: Callback to call whenever a large file upload completes saving of a \
chunk of the file. Default behavior is to print a single '.' to the console. If the callback \
supplied takes 1 argument, the number of bytes saved so far will be passed when it is called.
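Example (a sketch; the file names and the 'input' file-type string are illustrative):

    wi.add_file(WorkItemFile('params.json', 'input', ''), data=b'{ "setting": 1 }')
    wi.add_file(WorkItemFile('data.csv', 'input', 'my input data'), file_path='/path/to/data.csv')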
"""
provided_md5 = workitemfile.md5_checksum is not None
# Check only one of these three values is provided...
if bool(provided_md5) + bool(file_path) + bool(data is not None) != 1:
raise ValueError('Invalid argument(s): must provide (only) one of workitemfile.md5_checksum, file_path, or data')
tmp_datastream = None
try:
if file_path:
tmp_datastream = open(file_path, 'rb')
elif data is not None:
if sys.version_info[0] >= 3 and isinstance(data, str):
raise ValueError('Argument \'data\' must be passed in as bytes (not a unicode string)')
tmp_datastream = io.BytesIO(data)
else:
tmp_datastream = None
if tmp_datastream is not None:
md5calc = md5()
while True:
datachunk = tmp_datastream.read(8192)
if not datachunk:
break
md5calc.update(datachunk)
md5_checksum_str = md5calc.hexdigest()
workitemfile._md5_checksum = uuid.UUID(md5_checksum_str)
datasize = tmp_datastream.seek(0, os.SEEK_END)
tmp_datastream.seek(0)
if datasize > AssetManager.large_asset_upload_threshold:
AssetManager.upload_large_asset(workitemfile._md5_checksum, tmp_datastream, upload_callback)
provided_md5 = True # we've uploaded it, no need to do so as part of the main entity save
logger.debug('md5 checksum for file {0} is {1}'.format(workitemfile.file_name, str(workitemfile.md5_checksum)))
self._files += (workitemfile,)
if not provided_md5:
tmp_file_tuple = (str(workitemfile.md5_checksum), (workitemfile.file_name, tmp_datastream.read(), AssetFile.get_media_type_from_filename(workitemfile.file_name)))
self._tmp_file_parts.append((workitemfile, tmp_file_tuple))
else:
self._tmp_file_parts.append((workitemfile, None))
finally:
if tmp_datastream:
tmp_datastream.close()
self._register_change()
def retrieve_output_files(self, paths, as_zip=False):
"""
Retrieve output files associated with this WorkItem.
This essentially combines the functionality of retrieve_output_file_info() and
retrieve_output_files_from_info(), and can be used if the user doesn't care about
specific metadata related to the files being retrieved.
:param paths: Partial paths (relative to the working directory) of the output files to retrieve. If \
'as_zip' is true, this can be None/empty or not specified, and all output files will be included in \
the zip returned.
:param as_zip: A boolean controlling whether the output files are returned individually or as \
a single zip-file (useful for attaching to an e-mail, etc).
:return: If 'as_zip' is true, returns a single byte-array of a zip-file; otherwise, returns a \
list of byte-arrays of the output files retrieved, in the same order as the 'paths' parameter.
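Example (a sketch; the output file names are placeholders):

    stdout_bytes, stderr_bytes = wi.retrieve_output_files(['stdout.txt', 'stderr.txt'])
    print(stdout_bytes.decode('utf-8'))

    zip_bytes = wi.retrieve_output_files(None, as_zip=True)      # all output files, as one zip
    with open('workitem_output.zip', 'wb') as f:
        f.write(zip_bytes)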
"""
if (paths is None or len(paths) == 0) and not as_zip:
raise RuntimeError('Can\'t specify empty/None \'paths\' argument unless \'as_zip\' is True.')
metadata = self.retrieve_output_file_info(paths)
byte_arrs = self.retrieve_output_files_from_info(metadata, as_zip)
return byte_arrs
def retrieve_output_file_info(self, paths):
"""
Retrieve OutputFileMetadata about output files associated with this WorkItem.
:param paths: Partial paths (relative to the working directory) of the output files to retrieve. If \
None/empty or not specified, will default to return all output files.
:return: A list of OutputFileMetadata objects for the output files to retrieve, in the same order \
as the 'paths' parameter.
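Example (a sketch; 'length' is assumed to be a property of the returned OutputFileMetadata
objects, and the size threshold is arbitrary):

    metadata = wi.retrieve_output_file_info(None)                  # metadata for all output files
    small = [ m for m in metadata if m.length < 1024 * 1024 ]      # keep files under 1 MiB
    contents = wi.retrieve_output_files_from_info(small)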
"""
return AssetManager.retrieve_output_file_info(entity_type=EntityType.WorkItems,
entity_id=self._id,
paths=paths)
def retrieve_output_files_from_info(self, metadata, as_zip=False):
"""
Actually retrieve the output files associated with this WorkItem.
:param metadata: A list of OutputFileMetadata objects representing the output files to retrieve \
associated with this WorkItem.
:param as_zip: A boolean controlling whether the output files are returned individually or as \
a single zip-file (useful for attaching to an e-mail, etc).
:return: If 'as_zip' is true, returns a single byte-array of a zip-file; otherwise, returns a \
list of byte-arrays of the output files retrieved, in the same order as the 'metadata' parameter.
"""
return AssetManager.retrieve_output_files_from_info(entity_type=EntityType.WorkItems,
entity_id=self._id,
metadata=metadata,
as_zip=as_zip)
@staticmethod
def static_retrieve_output_files(workitem_id, paths, as_zip=False):
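# Convenience wrapper: retrieves output files for a work-item given only its id, without
# first fetching the full WorkItem via WorkItem.get().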
wi = WorkItem.__internal_factory__(id=workitem_id)
return wi.retrieve_output_files(paths, as_zip)
def _register_change(self):
if not self._is_dirty:
self._is_dirty = True
WorkItem._get_dirty_list().append(self)
@staticmethod
def _get_dirty_list():
dl = getattr(WorkItem.__tls, 'dirty_list', None)
if not dl:
WorkItem.__tls.dirty_list = []
return WorkItem.__tls.dirty_list
WorkerOrPluginKey = namedtuple('WorkerOrPluginKey', ['name', 'version'])
class WorkItemState(Enum):
"""
An enumeration representing the current state of a WorkItem
"""
Created = 0 # WorkItem has been saved to the database
CommissionRequested = 5 # WorkItem is ready to be processed by the next available worker of the correct type
Commissioned = 10 # WorkItem has been commissioned to a worker of the correct type and is beginning execution
Validating = 30 # WorkItem is being validated
Running = 40 # WorkItem is currently running
Waiting = 50 # WorkItem is waiting for dependent items to complete
ResumeRequested = 60 # Dependent items have completed and WorkItem is ready to be processed by the next available worker of the correct type
CancelRequested = 80 # WorkItem cancellation was requested
Canceled = 90 # WorkItem was successfully canceled
Resumed = 100 # WorkItem has been claimed by a worker of the correct type and is resuming
Canceling = 120 # WorkItem is in the process of being canceled by the worker
Succeeded = 130 # WorkItem completed successfully
Failed = 140 # WorkItem failed
class RelationType(Enum):
"""
An enumeration representing the type of relationship for related entities
"""
DependsOn = 0
Created = 1