Source code for idmtools.entities.simulation

"""
Defines our Simulation object.

The simulation object can be thought of as a metadata object. It represents the configuration metadata for a remote job execution.
All simulations have a task. Optionally, they have assets. All simulations should belong to an Experiment.

Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
from dataclasses import dataclass, field, fields
from logging import getLogger, DEBUG
from typing import List, Union, Mapping, Any, Type, TypeVar, Dict, TYPE_CHECKING
from idmtools.assets import AssetCollection, Asset
from idmtools.core import ItemType, NoTaskFound
from idmtools.core.enums import EntityStatus
from idmtools.core.interfaces.iassets_enabled import IAssetsEnabled
from idmtools.core.interfaces.iitem import IItem
from idmtools.core.interfaces.inamed_entity import INamedEntity
from idmtools.entities.task_proxy import TaskProxy
from idmtools.utils.language import get_qualified_class_name_from_obj

if TYPE_CHECKING:  # pragma: no cover
    from idmtools.entities.itask import ITask
    from idmtools.entities.iplatform import IPlatform
    from idmtools.entities.experiment import Experiment

# Module-level logger, named after this module; used below for DEBUG diagnostics.
logger = getLogger(__name__)
# Logger registered under the name 'user'; used below to report errors such as a missing task.
# NOTE(review): presumably routed to user-facing output by the logging config - confirm.
user_logger = getLogger('user')


@dataclass
class Simulation(IAssetsEnabled, INamedEntity):
    """
    Class that represents a generic simulation.

    This class needs to be implemented for each model type with specifics.
    """

    #: Task representing the configuration of the command to be executed
    task: 'ITask' = field(default=None)  # noqa: F821
    #: Item Type. Should not be changed from Simulation
    item_type: ItemType = field(default=ItemType.SIMULATION, compare=False)
    #: Control whether we should replace the task with a proxy after creation
    __replace_task_with_proxy: bool = field(default=True, init=False, compare=False)
    #: Ensure we don't gather assets twice
    __assets_gathered: bool = field(default=False)
    #: Extra arguments to pass on creation to platform
    _platform_kwargs: dict = field(default_factory=dict)

    @property
    def experiment(self) -> 'Experiment':  # noqa: F821
        """
        Get experiment parent.

        Returns:
            Parent Experiment
        """
        return self.parent

    @experiment.setter
    def experiment(self, experiment: 'Experiment'):  # noqa: F821
        """
        Set the parent experiment.

        Args:
            experiment: Experiment to set as parent.

        Returns:
            None
        """
        self.parent = experiment

    def __repr__(self):
        """
        String representation of simulation.

        Returns:
            String containing the simulation uid and the parent (experiment) id
        """
        return f"<Simulation: {self.uid} - Exp_id: {self.parent_id}>"

    def __hash__(self):
        """
        Hash of simulation, derived from the value of its uid.

        Returns:
            Hash of simulation
        """
        # Hash the uid *value*. ``id(self.uid)`` would hash the identity of the uid object, so two
        # simulations carrying equal uids could land in different buckets of a dict or set.
        return hash(self.uid)

    def pre_creation(self, platform: 'IPlatform'):
        """
        Runs before a simulation is created server side.

        Runs the task's pre-creation hook, gathers the task's assets, and tags the simulation with the
        qualified class names of the simulation (subclasses only) and of its task.

        Args:
            platform: Platform the item is being executed on

        Returns:
            None

        Raises:
            NoTaskFound: If the simulation has no task.
        """
        # Call the IItem implementation directly rather than going through super()
        IItem.pre_creation(self, platform)
        if self.task is None:
            msg = 'Task is required for simulations'
            user_logger.error(msg)
            raise NoTaskFound(msg)

        if logger.isEnabledFor(DEBUG):
            logger.debug('Calling task pre creation')
        self.task.pre_creation(self, platform)
        self.gather_assets()

        if self.__class__ is not Simulation:
            # Add a tag to keep the Simulation class name
            sn = get_qualified_class_name_from_obj(self)
            if logger.isEnabledFor(DEBUG):
                logger.debug(f'Setting Simulation Tag "simulation_type" to "{sn}"')
            self.tags["simulation_type"] = sn

        # Add a tag for the task
        if self.task is not None:
            tn = get_qualified_class_name_from_obj(self.task)
            if logger.isEnabledFor(DEBUG):
                logger.debug(f'Setting Simulation Tag "task_type" to "{tn}"')
            self.tags["task_type"] = tn

    def post_creation(self, platform: 'IPlatform') -> None:
        """
        Called after a simulation is created.

        Runs the task's post-creation hook, optionally swaps the task for a TaskProxy, and marks the
        simulation as CREATED when it has no status yet.

        Args:
            platform: Platform simulation is being executed on

        Returns:
            None
        """
        if logger.isEnabledFor(DEBUG):
            logger.debug('Calling task post creation')
        if self.task is not None and not isinstance(self.task, TaskProxy):
            # Hand the task the platform we were called with, mirroring pre_creation
            self.task.post_creation(self, platform)

        IItem.post_creation(self, platform)

        # The proxy replacement can be requested on the simulation itself or on the parent experiment
        if self.__replace_task_with_proxy or (self.parent and self.parent._Experiment__replace_task_with_proxy):
            if logger.isEnabledFor(DEBUG):
                logger.debug('Replacing task with proxy')
            self.task = TaskProxy.from_task(self.task)

        if self.status is None:
            self.status = EntityStatus.CREATED

    def pre_getstate(self):
        """
        Return default values for :meth:`pickle_ignore_fields`.

        Call before pickling.

        Returns:
            Dict mapping field names to the fresh default objects used in their place
        """
        # Imported locally; NOTE(review): presumably to avoid an import cycle - confirm
        from idmtools.assets import AssetCollection
        from idmtools.core.interfaces.entity_container import EntityContainer
        return {"assets": AssetCollection(), "simulations": EntityContainer()}

    def gather_assets(self):
        """
        Gather all the assets for the simulation.

        The task's transient assets are collected once and merged into the simulation's assets;
        later calls are no-ops.

        Returns:
            None
        """
        if not self.__assets_gathered:
            self.task.gather_transient_assets()
            self.assets.add_assets(self.task.transient_assets, fail_on_duplicate=False)
        self.__assets_gathered = True

    @classmethod
    def from_task(cls, task: 'ITask', tags: Dict[str, Any] = None,  # noqa E821
                  asset_collection: AssetCollection = None) -> 'Simulation':
        """
        Create a simulation from a task.

        Args:
            task: Task to create from
            tags: Tags to create on the simulation
            asset_collection: Simulation Assets

        Returns:
            Simulation (of the class this method is called on) using the parameters provided
        """
        # Build through ``cls`` so subclasses get an instance of their own type.
        # A falsy ``asset_collection`` (e.g. None) is replaced by a fresh AssetCollection.
        return cls(
            task=task,
            tags=dict() if tags is None else tags,
            assets=asset_collection if asset_collection else AssetCollection()
        )

    def list_static_assets(self, platform: 'IPlatform' = None, **kwargs) -> List[Asset]:
        """
        List assets that have been uploaded to a server already.

        Args:
            platform: Optional platform to load assets list from
            **kwargs: Extra arguments forwarded to the platform's asset listing

        Returns:
            List of assets

        Raises:
            ValueError: If you try to list the assets of a simulation that hasn't been created/loaded
                from a remote platform.
        """
        if self.id is None:
            raise ValueError("You can only list static assets on an existing simulation")
        p = super()._check_for_platform_from_context(platform)
        return p._simulations.list_assets(self, **kwargs)

    def to_dict(self) -> Dict:
        """
        Do a lightweight conversion to json.

        Private fields (names starting with an underscore) and the parent are left out; the uid is
        exported under ``_uid`` and the task is converted through its own ``to_dict``.

        Returns:
            Dict representing json of object
        """
        result = dict()
        for f in fields(self):
            if not f.name.startswith("_") and f.name != 'parent':
                result[f.name] = getattr(self, f.name)
        result['_uid'] = self.uid
        result['task'] = self.task.to_dict() if self.task else None
        return result


# TODO: Rename to TSimulation once the old simulation is gone
# Type variable covering Simulation and any of its subclasses.
TTSimulation = TypeVar("TTSimulation", bound=Simulation)
# A list whose entries are either simulation objects or strings.
TTSimulationList = List[Union[TTSimulation, str]]
# A plain list of simulation objects.
TTSimulationBatch = List[TTSimulation]
# The class object of a simulation type (as opposed to an instance).
TTSimulationClass = Type[TTSimulation]
# Mapping keyed by simulation objects.
TTAllSimulationData = Mapping[TTSimulation, Any]