from __future__ import print_function
import os
import io
import sys
import json
from datetime import date, datetime
import logging
from enum import Enum
import uuid
from hashlib import md5
import threading
import multiprocessing
import copy
import inspect
from functools import reduce
from COMPS import Client, default_callback_print_args
from COMPS.Data import Configuration, SimulationFile, HpcJob, AssetFile, AssetManager
from COMPS.Data.SerializableEntity import SerializableEntity, json_property, json_entity, parse_ISO8601_date, convert_if_string
from COMPS.Data.AssetManager import EntityType
from COMPS.Data.RelatableEntity import RelatableEntity
from COMPS.Data.TaggableEntity import TaggableEntity
from COMPS.Data.CommissionableEntity import CommissionableEntity
logger = logging.getLogger(__name__)
[docs]@json_entity()
class Simulation(TaggableEntity, CommissionableEntity, RelatableEntity, SerializableEntity):
"""
Represents a single simulation run.
Contains various basic properties accessible by getters (and, in some cases, +setters):
* id
* +experiment_id
* +name
* +description
* owner
* date_created
* last_modified
* state
* error_message
Also contains "child objects" (which must be specifically requested for retrieval using the
QueryCriteria.select_children() method of QueryCriteria):
* tags
* configuration
* files
* hpc_jobs
"""
__max_sim_batch_count = 100
__max_sim_batch_request_size_kb = 38912 # 38 MiB
__max_entity_retrieval_count = 100000
try:
__save_semaphore = multiprocessing.Semaphore(4) # no. of concurrent threads that can save sims
except (ModuleNotFoundError, ImportError):
logger.warning('Unable to create process-local semaphore; proceeding, but without simulation save constraints!')
import dummy_threading
__save_semaphore = dummy_threading.Semaphore()
__tls = threading.local()
def __init__(self, name, experiment_id=None, description=None, configuration=None):
if not name:
raise RuntimeError('Simulation has not been initialized properly; non-null name required.')
self._id = None
self._name = name
self._experiment_id = experiment_id
self._description = description
self._owner = Client.auth_manager().username
self._date_created = None
self._last_modified = None
self._state = None
self._error_message = None
self._tags = None
self._configuration = configuration
self._files = ()
self._hpc_jobs = None
self._is_dirty = None # these will be set in _register_change() below
self._is_config_dirty = None
self._tmp_file_parts = []
self._register_change(config_changed=(configuration is not None))
@classmethod
def __internal_factory__(cls, id=None, name=None, experiment_id=None, description=None, owner=None,
date_created=None, last_modified=None, state=None, error_message=None,
tags=None, configuration=None, files=None, hpc_jobs=None):
sim = cls.__new__(cls)
sim._id = convert_if_string(id, uuid.UUID)
sim._name = name
sim._experiment_id = convert_if_string(experiment_id, uuid.UUID)
sim._description = description
sim._owner = owner
sim._date_created = convert_if_string(date_created, parse_ISO8601_date)
sim._last_modified = convert_if_string(last_modified, parse_ISO8601_date)
sim._state = convert_if_string(state, lambda x: SimulationState[x])
sim._error_message = error_message
sim._tags = tags
if configuration:
if isinstance(configuration, Configuration):
sim._configuration = configuration
else:
config_json = Configuration.rest2py(configuration)
sim._configuration = Configuration(**config_json)
else:
sim._configuration = None
if files:
sim._files = tuple(SimulationFile.__internal_factory__(**(SimulationFile.rest2py(f))) for f in files)
# sim._files = [ SimulationFile.__internal_factory__(**(SimulationFile.rest2py(f))) for f in files ]
else:
sim._files = None
if hpc_jobs:
sim._hpc_jobs = tuple(HpcJob.__internal_factory__(**(HpcJob.rest2py(j))) for j in hpc_jobs)
# sim._hpc_jobs = [ HpcJob.__internal_factory__(**(HpcJob.rest2py(j))) for j in hpc_jobs ]
else:
sim._hpc_jobs = None
sim._is_dirty = False
sim._is_config_dirty = False
sim._tmp_file_parts = []
return sim
@json_property()
def id(self):
return self._id
@json_property()
def experiment_id(self):
return self._experiment_id
@experiment_id.setter
def experiment_id(self, experiment_id):
self._experiment_id = experiment_id
self._register_change()
@json_property()
def name(self):
return self._name
@name.setter
def name(self, name):
self._name = name
self._register_change()
@json_property()
def description(self):
return self._description
@description.setter
def description(self, description):
self._description = description
self._register_change()
@json_property()
def owner(self):
return self._owner
@json_property()
def date_created(self):
return self._date_created
@json_property()
def last_modified(self):
return self._last_modified
@json_property('SimulationState')
def state(self):
return self._state
@json_property()
def error_message(self):
return self._error_message
@json_property()
def tags(self):
return self._tags # todo: immutable dict?
@json_property()
def configuration(self):
return self._configuration
@configuration.setter
def configuration(self, configuration):
self._configuration = configuration
self._register_change(config_changed=True)
@json_property()
def files(self):
return self._files
@json_property('HPCJobs')
def hpc_jobs(self):
return self._hpc_jobs
########################
[docs] @classmethod
def get(cls, id=None, query_criteria=None):
"""
Retrieve one or more Simulations.
:param id: The id (str or UUID) of the Simulation to retrieve
:param query_criteria: A QueryCriteria object specifying basic property filters and tag-filters \
to apply to the set of Simulations returned, as well as which properties and child-objects to \
fill for the returned Simulations
:return: A Simulation or list of Simulations (depending on whether 'id' was specified) with \
basic properties and child-objects assigned as specified by 'query_criteria'
"""
if id and not isinstance(id, uuid.UUID):
try:
id = uuid.UUID(id)
except ValueError:
raise ValueError('Invalid id: {0}'.format(id))
qc_params = query_criteria.to_param_dict(Simulation) if query_criteria else {}
user_handling_paging = any(f in qc_params for f in ['count', 'offset'])
qc_params['count'] = min(Simulation.__max_entity_retrieval_count, qc_params.get('count', Simulation.__max_entity_retrieval_count))
path = '/Simulations{0}'.format('/' + str(id) if id else '')
resp = Client.get(path
, params = qc_params)
cr = resp.headers.get('Content-Range')
# If we got a Content-Range header in the response (meaning we didn't get the entire dataset back) and the user
# isn't handling paging (as inferred by not having a 'count' or 'offset' argument), double-check to see if
# we got the whole data-set (presumably not) and raise an error so the user knows.
if cr and not user_handling_paging:
try:
toks = cr.replace('-','/').split('/')
# from_val = int(toks[0])
# to_val = int(toks[1])
total_val = int(toks[2])
if total_val > Simulation.__max_entity_retrieval_count:
raise RuntimeError('Unable to retrieve entire data-set (try paging); the maximum simulations currently retrievable is ' +
str(Simulation.__max_entity_retrieval_count))
except (IndexError, ValueError) as e:
logger.debug(e.message)
raise RuntimeError('Invalid Content-Range response header: ' + str(cr))
json_resp = resp.json()
# if logger.isEnabledFor(logging.DEBUG):
# logger.debug('Simulation Response:')
# logger.debug(json.dumps(json_resp, indent=4))
if 'Simulations' not in json_resp or \
( id is not None and len(json_resp['Simulations']) != 1 ):
logger.debug(json_resp)
raise RuntimeError('Malformed Simulations retrieve response!')
sims = []
for sim_json in json_resp['Simulations']:
sim_json = cls.rest2py(sim_json)
# if logger.isEnabledFor(logging.DEBUG):
# logger.debug('Simulation:')
# logger.debug(json.dumps(sim_json, indent=4))
sim = Simulation.__internal_factory__(**sim_json)
sims.append(sim)
if id is not None:
return sims[0]
else:
return sims
[docs] def refresh(self, query_criteria=None):
"""
Update properties of an existing Simulation from the server.
:param query_criteria: A QueryCriteria object specifying which properties and child-objects \
to refresh on the Simulation
"""
if not self._id:
raise RuntimeError('Can\'t refresh a Simulation that hasn\'t been saved!')
sim = self.get(id=self.id, query_criteria=query_criteria)
# if sim.id: self._id = sim.id
if sim.name is not None: self._name = sim.name
if sim.experiment_id is not None: self._experiment_id = sim.experiment_id
if sim.description is not None: self._description = sim.description
if sim.owner is not None: self._owner = sim.owner
if sim.date_created is not None: self._date_created = sim.date_created
if sim.last_modified is not None: self._last_modified = sim.last_modified
if sim.state is not None: self._state = sim.state
if sim.error_message is not None: self._error_message = sim.error_message
if sim.tags is not None: self._tags = sim.tags
if sim.configuration is not None: self._configuration = sim.configuration
if sim.files is not None: self._files = sim.files
if sim.hpc_jobs is not None: self._hpc_jobs = sim.hpc_jobs
[docs] def save(self, return_missing_files=False, save_semaphore=None):
"""
Save a single Simulation. If it's a new Simulation, an id is automatically assigned.
:param return_missing_files: A boolean that determines the behavior when the Simulation \
being saved contains a SimulationFile to be saved by md5 checksum (i.e. without \
uploading the data) that is not yet in COMPS. If true, when there are such files, \
return an array of UUIDs representing the md5 checksums of the missing files. If \
false, raise an error when there are any such files.
"""
if not self._is_dirty:
logger.info('Simulation has not been altered... no point in saving it!')
return
prepped_self = Simulation.__prep_sim(self)
estimated_sim_size = Simulation.__estimate_simulation_size(prepped_self)
# Check if sim exceeds the request-size limit
if False and estimated_sim_size + 4096 >= Simulation.__max_sim_batch_request_size_kb * 1024:
logger.debug('sim: {0}'.format(str(self)))
logger.debug('estimated_sim_size: {0}'.format(estimated_sim_size))
raise RuntimeError('Simulation size exceeds single-sim limit!')
untracked_ids = Simulation.__save_batch([prepped_self], return_missing_files, save_semaphore)
if untracked_ids:
return untracked_ids
Simulation._get_dirty_list().remove(self)
[docs] @classmethod
def get_save_semaphore(cls):
return cls.__save_semaphore
[docs] @staticmethod
def save_all(save_batch_callback=lambda: print('.', **default_callback_print_args), return_missing_files=False, save_semaphore=None):
"""
Batch-save all unsaved Simulations.
Simulations are saved in batches of at most '__max_sim_batch_count' and with a maximum request
size of '__max_sim_batch_request_size_kb'.
:param save_batch_callback: Callback to call whenever a request to save a batch of Simulations completes. \
Default behavior is to print a single '.' to the console. If the callback supplied takes 1 argument, the \
number of Simulations saved so far will be passed when it is called.
:param return_missing_files: A boolean that determines the behavior when any of the Simulations \
being saved contains a SimulationFile to be saved by md5 checksum (i.e. without uploading the data) \
that is not yet in COMPS. If true, when there are such files, return an array of UUIDs representing \
the md5 checksums of the missing files. If false, raise an error when there are any such files.
"""
if len(Simulation._get_dirty_list()) == 0:
logger.info('No pending new simulations to batch-save!')
return
dirty_list = Simulation._get_dirty_list()
num_sims_processed = 0
estimated_sim_size = 0
estimated_request_size = 4096 # generous overhead for HTTP headers, headers and '[' + ']' for base-entity
# multipart section, and final multipart ending "footer"
max_batch_count = min(len(dirty_list), Simulation.__max_sim_batch_count)
prepped_sim = None
prepped_sims = []
if save_batch_callback:
num_callback_args = len(inspect.getfullargspec(save_batch_callback).args)
logger.info('Saving simulations')
while num_sims_processed < len(dirty_list):
sim = dirty_list[num_sims_processed]
if not sim._is_dirty:
logger.info('Skipping save for simulation {0} (already up-to-date).'.format(sim._id))
num_sims_processed += 1
continue
if not prepped_sim:
prepped_sim = Simulation.__prep_sim(sim)
estimated_sim_size = Simulation.__estimate_simulation_size(prepped_sim)
# add 2 because of ', ' between sims in the base-entity section
if estimated_sim_size + estimated_request_size + 2 < Simulation.__max_sim_batch_request_size_kb * 1024:
prepped_sims.append(prepped_sim)
num_sims_processed += 1
estimated_request_size += estimated_sim_size
prepped_sim = None
estimated_sim_size = 0
# We want to try to save the batch now if 1 of the following 3 conditions is met:
# - we reached maximum batch count
# - we reached maximum batch size
# - this is the last sim
if len(prepped_sims) == max_batch_count or \
estimated_sim_size != 0 or \
num_sims_processed == len(dirty_list):
if len(prepped_sims) == 0:
# one sim already exceeds the limit. Raise an error and bail...
logger.debug('sim: {0}'.format(str(sim)))
logger.debug('estimated_sim_size: {0}'.format(estimated_sim_size))
raise RuntimeError('Simulation size exceeds single-sim limit!')
# ready to send this batch!
logger.debug("Ready to send single batch of {0} sims".format(len(prepped_sims)))
untracked_ids = Simulation.__save_batch(prepped_sims, return_missing_files, save_semaphore)
if untracked_ids:
del Simulation._get_dirty_list()[:num_sims_processed-len(prepped_sims)]
return untracked_ids
if save_batch_callback:
if num_callback_args == 0:
save_batch_callback()
elif num_callback_args == 1:
save_batch_callback(num_sims_processed)
prepped_sims = []
estimated_request_size = 4096 # set back to initial value (w/ overhead)
del Simulation._get_dirty_list()[:]
return
@staticmethod
def __prep_sim(sim):
if not sim._id:
tmp_sim = copy.copy(sim)
else:
tmp_sim = Simulation.__internal_factory__(id=sim._id,
name=sim._name,
experiment_id=sim._experiment_id,
description=sim._description,
configuration=sim._configuration if sim._is_config_dirty else None)
if len(sim._tmp_file_parts) > 0:
tmp_sim._files = tuple( fi[0] for fi in sim._tmp_file_parts )
save_sim = SerializableEntity.convertToDict(tmp_sim, include_hidden_props=True)
# indentval = 4 if logger.isEnabledFor(logging.DEBUG) else None
json_str = json.dumps(save_sim,
# indent=indentval,
default=lambda obj:
obj.isoformat() + '0Z' if isinstance(obj, (date, datetime))
else str(obj) if isinstance(obj, uuid.UUID)
else obj.name if isinstance(obj, Enum)
else obj)
return (sim, json_str)
@staticmethod
def __estimate_simulation_size(prepped_sim):
estimated_size = len(prepped_sim[1]) # Length contributed by this sim in the base-entity section
for fp in filter(lambda x: x[1] is not None, prepped_sim[0]._tmp_file_parts):
estimated_size += 135 # Length of multipart headers for a file, minus the actual value for 'Content-Type'
estimated_size += len(fp[1][1][2]) # The value for 'Content-Type'
estimated_size += len(fp[1][1][1]) # Length of the data for this file
return estimated_size
@staticmethod
def __save_batch(prepped_sims, return_missing_files=False, save_semaphore=None):
if not save_semaphore:
logger.debug('No save_semaphore passed in; using process-local semaphore')
save_semaphore = Simulation.__save_semaphore
joinstr = ', ' #', {0}'.format('\n' if logger.isEnabledFor(logging.DEBUG) else '') \
base_entity_str = joinstr.join(prepped_sim[1] for prepped_sim in prepped_sims)
files_to_send = [ ('not_a_file', ('Simulations', '[' + base_entity_str + ']', 'application/json')) ]
files_to_send.extend(reduce(lambda x, y: x + y, [ [ fp[1] for fp in prepped_sim[0]._tmp_file_parts if fp[1] is not None ] for prepped_sim in prepped_sims ]))
with save_semaphore:
resp = Client.post('/Simulations'
, files=files_to_send
, http_err_handle_exceptions=[400])
if resp.status_code == 400:
untracked_ids = None
try:
json_resp = resp.json()
untracked_ids = json_resp.get('UntrackedIds')
except:
pass
if untracked_ids and len(untracked_ids) > 0 and return_missing_files:
return [ uuid.UUID(x) for x in untracked_ids ]
else:
Client.raise_err_from_resp(resp)
json_resp = resp.json()
ids = json_resp.get('Ids')
if not ids or len(ids) != len(prepped_sims):
logger.debug(json_resp)
raise RuntimeError('Malformed Simulations save response!')
for i in range(len(prepped_sims)):
sim = prepped_sims[i][0]
sim._is_dirty = False
sim._is_config_dirty = False
sim._tmp_file_parts = []
if not sim._id:
sim._id = uuid.UUID(ids[i])
sim._state = SimulationState.Created
elif sim._id != uuid.UUID(ids[i]):
raise RuntimeError('Response Simulation Id doesn\'t match expected value!!! {0} != {1}'.format(sim._id, ids[i]))
[docs] def add_file(self, simulationfile, file_path=None, data=None, upload_callback=lambda: print('.', **default_callback_print_args)):
"""
Add a SimulationFile to a Simulation.
The contents of the file to add can be specified either by providing a path to the file
or by providing the actual data as a byte-array. Alternately, if the file/data is already in
COMPS, you can skip uploading it again and just provide a SimulationFile that contains
the md5 checksum of the data.
If the file exceeds AssetManager.large_asset_upload_threshold bytes in size, the file will be
uploaded immediately, separately from the saving of the main Simulation. This allows saving
of arbitrarily-large files while avoiding potential timeouts or having to start from scratch in
case the upload is interrupted by network issues.
NOTE: providing both file/data and an md5 is considered invalid, as providing the md5 implies
the caller knows the file/data is already in COMPS and doesn't need to be uploaded again.
:param simulationfile: A SimulationFile containing the metadata for the file to add.
:param file_path: The path to the file to add.
:param data: The actual bytes of data to add.
:param upload_callback: Callback to call whenever a large file upload completes saving of a \
chunk of the file. Default behavior is to print a single '.' to the console. If the callback \
supplied takes 1 argument, the number of bytes saved so far will be passed when it is called.
"""
provided_md5 = simulationfile.md5_checksum is not None
# Check only one of these three values is provided...
if bool(provided_md5) + bool(file_path) + bool(data is not None) != 1:
raise ValueError('Invalid argument(s): must provide (only) one of simulationfile.md5_checksum, file_path, or data')
tmp_datastream = None
try:
if file_path:
tmp_datastream = open(file_path, 'rb')
elif data is not None:
if sys.version_info[0] >= 3 and isinstance(data, str):
raise ValueError('Argument \'data\' must be passed in as bytes (not a unicode string)')
tmp_datastream = io.BytesIO(data)
else:
tmp_datastream = None
if tmp_datastream is not None:
md5calc = md5()
while True:
datachunk = tmp_datastream.read(8192)
if not datachunk:
break
md5calc.update(datachunk)
md5_checksum_str = md5calc.hexdigest()
simulationfile._md5_checksum = uuid.UUID(md5_checksum_str)
datasize = tmp_datastream.seek(0, os.SEEK_END)
tmp_datastream.seek(0)
if datasize > AssetManager.large_asset_upload_threshold:
AssetManager.upload_large_asset(simulationfile._md5_checksum, tmp_datastream, upload_callback)
provided_md5 = True # we've uploaded it, no need to do so as part of the main entity save
logger.debug('md5 checksum for file {0} is {1}'.format(simulationfile.file_name, str(simulationfile.md5_checksum)))
self._files += (simulationfile,)
if not provided_md5:
tmp_file_tuple = (str(simulationfile.md5_checksum), (simulationfile.file_name, tmp_datastream.read(), AssetFile.get_media_type_from_filename(simulationfile.file_name)))
self._tmp_file_parts.append((simulationfile, tmp_file_tuple))
else:
self._tmp_file_parts.append((simulationfile, None))
finally:
if tmp_datastream:
tmp_datastream.close()
self._register_change()
[docs] def retrieve_output_files(self, paths, job=None, as_zip=False):
"""
Retrieve output files associated with this Simulation.
This essentially combines the functionality of retrieve_output_file_info() and
retrieve_output_filess_from_info(), and can be used if user doesn't care about
specific metadata related to the files being retrieved.
:param paths: Partial paths (relative to the working directory) of the output files to retrieve. If \
'as_zip' is true, this can be None/empty or not specified, and all output files will be included in \
the zip returned.
:param job: The HpcJob associated with the given Simulation to retrieve assets for. If not \
specified, will default to the last HpcJob chronologically.
:param as_zip: A boolean controlling whether the output files are returned individually or as \
a single zip-file (useful for attaching to an e-mail, etc).
:return: If 'as_zip' is true, returns a single byte-array of a zip-file; otherwise, returns a \
list of byte-arrays of the output files retrieved, in the same order as the 'paths' parameter.
"""
if (paths is None or len(paths) == 0) and not as_zip:
raise RuntimeError('Can\'t specify empty/None \'paths\' argument unless \'as_zip\' is True.')
metadata = self.retrieve_output_file_info(paths, job)
byte_arrs = self.retrieve_output_files_from_info(metadata, job, as_zip)
return byte_arrs
[docs] def retrieve_output_file_info(self, paths, job=None):
"""
Retrieve OutputFileMetadata about output files associated with this Simulation.
:param paths: Partial paths (relative to the working directory) of the output files to retrieve. If \
None/empty or not specified, will default to return all output files.
:param job: The HpcJob associated with the given Simulation to retrieve output files for. If not \
specified, will default to the last HpcJob chronologically.
:return: A list of OutputFileMetadata objects for the output files to retrieve, in the same order \
as the 'paths' parameter.
"""
return AssetManager.retrieve_output_file_info(entity_type=EntityType.Simulations,
entity_id=self._id,
paths=paths,
job=job)
[docs] def retrieve_output_files_from_info(self, metadata, job=None, as_zip=False):
"""
Actually retrieve the output files associated with this Simulation.
:param metadata: A list of OutputFileMetadata objects representing the output files to retrieve \
associated with this Simulation.
:param job: The HpcJob associated with the given Simulation to retrieve output files for. This \
should match the 'job' provided to the retrieve_output_file_info() call. If not specified, will \
default to the last HpcJob chronologically.
:param as_zip: A boolean controlling whether the output files are returned individually or as \
a single zip-file (useful for attaching to an e-mail, etc).
:return: If 'as_zip' is true, returns a single byte-array of a zip-file; otherwise, returns a \
list of byte-arrays of the output files retrieved, in the same order as the 'paths' parameter.
"""
return AssetManager.retrieve_output_files_from_info(entity_type=EntityType.Simulations,
entity_id=self._id,
metadata=metadata,
job=job,
as_zip=as_zip)
[docs] @staticmethod
def static_retrieve_output_files(sim_id, paths, job=None, as_zip=False):
s = Simulation.__internal_factory__(id=sim_id)
return s.retrieve_output_files(paths, job, as_zip)
def _register_change(self, config_changed=False):
if not self._is_dirty:
self._is_dirty = True
Simulation._get_dirty_list().append(self)
if config_changed and not self._is_config_dirty:
self._is_config_dirty = True
@staticmethod
def _get_dirty_list():
dl = getattr(Simulation.__tls, 'dirty_list', None)
if not dl:
Simulation.__tls.dirty_list = []
return Simulation.__tls.dirty_list
[docs]class SimulationState(Enum):
"""
An enumeration representing the current state of a Simulation
"""
Created = 0 # Simulation has been saved to the database
CommissionRequested = 1 # Simulation is ready to be processed by the job-service
Provisioning = 2 # Simulation is being provisioned by the job-service (creating the working-directory, copying input files, etc)
Commissioned = 3 # Simulation has been commissioned to be run and is awaiting processing resources
Running = 4 # Simulation is currently running
Retry = 5 # Simulation failed and is going to be retried by the job-service
Succeeded = 6 # Simulation completed successfully
Failed = 7 # Simulation failed and will not go through any (more) retries
CancelRequested = 8 # Simulation cancellation was requested
Canceled = 9 # Simulation was successfully canceled