"""
Our Experiment class definition.
Experiments can be thought of as a metadata object analogous to a folder on a filesystem. An experiment is a container that
contains one or more simulations. Before creation, *experiment.simulations* can be either a list or a TemplatedSimulations.
TemplatedSimulations are useful for building large numbers of similar simulations such as sweeps.
Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import copy
from dataclasses import dataclass, field, InitVar, fields
from logging import getLogger, DEBUG
from types import GeneratorType
from typing import NoReturn, Set, Union, Iterator, Type, Dict, Any, List, TYPE_CHECKING, Generator
from idmtools import IdmConfigParser
from idmtools.assets import AssetCollection, Asset
from idmtools.builders import SimulationBuilder
from idmtools.core import ItemType, EntityStatus
from idmtools.core.interfaces.entity_container import EntityContainer
from idmtools.core.interfaces.iassets_enabled import IAssetsEnabled
from idmtools.core.interfaces.iitem import IItem
from idmtools.core.interfaces.inamed_entity import INamedEntity
from idmtools.core.interfaces.irunnable_entity import IRunnableEntity
from idmtools.core.logging import SUCCESS, NOTICE
from idmtools.entities.itask import ITask
from idmtools.core.interfaces.ientity import IEntity
from idmtools.entities.platform_requirements import PlatformRequirements
from idmtools.entities.templated_simulation import TemplatedSimulations
from idmtools.registry.experiment_specification import ExperimentPluginSpecification, get_model_impl, \
get_model_type_impl
from idmtools.registry.plugin_specification import get_description_impl
from idmtools.utils.caller import get_caller
from idmtools.utils.collections import ExperimentParentIterator
from idmtools.utils.entities import get_default_tags
if TYPE_CHECKING: # pragma: no cover
from idmtools.entities.iplatform import IPlatform
from idmtools.entities.simulation import Simulation # noqa: F401
# Module-level logger for internal diagnostics.
logger = getLogger(__name__)
# Logger registered under the name 'user'; this module uses it for messages aimed at the end user.
user_logger = getLogger('user')
# Union of the types the simulations setter and internal storage are annotated with.
SUPPORTED_SIM_TYPE = Union[
EntityContainer,
Generator['Simulation', None, None],
TemplatedSimulations,
Iterator['Simulation']
]
@dataclass(repr=False)
class Experiment(IAssetsEnabled, INamedEntity, IRunnableEntity):
    """
    Class that represents a generic experiment.

    This class needs to be implemented for each model type with specifics.

    Args:
        name: The experiment name.
        assets: The asset collection for assets global to this experiment.
    """

    #: Suite ID
    suite_id: str = field(default=None)
    #: Item type (always an experiment)
    item_type: ItemType = field(default=ItemType.EXPERIMENT, compare=False, init=False)
    #: Task Type (defaults to command)
    task_type: str = field(default='idmtools.entities.command_task.CommandTask')
    #: List of Requirements for the task that a platform must meet to be able to run
    platform_requirements: Set[PlatformRequirements] = field(default_factory=set)
    #: Is the Experiment Frozen
    frozen: bool = field(default=False, init=False)
    #: Simulations in this experiment (init-only; the value is routed through the ``simulations`` property)
    simulations: InitVar[SUPPORTED_SIM_TYPE] = None
    #: Internal storage of simulations
    __simulations: Union[SUPPORTED_SIM_TYPE] = field(default_factory=lambda: EntityContainer(), compare=False)
    #: Whether common assets should be gathered from the simulations' tasks. When left as None it is derived in
    # ``__post_init__`` from the type of the simulations storage. Only use when not using TemplatedSimulations
    gather_common_assets_from_task: bool = field(default=None, compare=False)
    #: When True, ``pre_creation`` skips its call to ``gather_assets()``
    disable_default_pre_create: bool = field(default=False, compare=False)
    #: Enable replacing the task with a proxy to reduce the memory footprint. Useful in provisioning large sets of
    # simulations
    __replace_task_with_proxy: bool = field(default=True, init=False, compare=False)
def __post_init__(self, simulations):
    """
    Finish construction of the experiment after the dataclass ``__init__`` has run.

    Args:
        simulations: Value of the init-only ``simulations`` argument

    Returns:
        None
    """
    super().__post_init__()

    # A ``property`` instance is treated like "nothing supplied" — presumably because the class-level
    # default of the init-only field ends up being the ``simulations`` property itself.
    nothing_supplied = simulations is None or isinstance(simulations, property)
    if not nothing_supplied:
        self.simulations = simulations

    # Derive the flag from the storage type only when the caller left it undecided.
    if self.gather_common_assets_from_task is None:
        backing_items = self.simulations.items
        self.gather_common_assets_from_task = isinstance(backing_items, EntityContainer)

    self.__simulations.parent = self
def post_creation(self, platform: 'IPlatform') -> None:
    """
    Experiment post-creation callback.

    Args:
        platform: Platform the experiment was created on

    Returns:
        None
    """
    # Invoke the IItem implementation explicitly rather than going through super().
    IItem.post_creation(self, platform)
@property
def status(self):
    """
    Get status of experiment. Experiment status is derived from the status of its simulations.

    The first rule to be true is used. The rules are:

    * If simulations is a TemplatedSimulations, status is CREATED when ``_platform_object`` is set and None otherwise.
    * If there are no simulations or all simulations have a status of None, experiment status is None.
    * If any simulation has a RUNNING status, experiment is considered RUNNING.
    * If any simulation has a CREATED status and any other simulation has a FAILED or SUCCEEDED status,
      experiment is considered RUNNING.
    * If any simulation has a None status and any other simulation has a FAILED or SUCCEEDED status,
      experiment is considered CREATED.
    * If any simulation has a status of FAILED, experiment is considered FAILED.
    * If every simulation has a status of SUCCEEDED, experiment is considered SUCCEEDED.
    * In every remaining case, experiment is considered CREATED.

    Returns:
        Status
    """
    items = self.simulations.items
    # Still creating sims since we have a template. When adding new simulations, we will pre-create sim objects
    # unless the item is new.
    if isinstance(items, TemplatedSimulations):
        return EntityStatus.CREATED if self._platform_object else None

    sim_statuses = {s.status for s in items}
    any_succeeded_failed = any(s in [EntityStatus.FAILED, EntityStatus.SUCCEEDED] for s in sim_statuses)

    if len(items) == 0 or all(s is None for s in sim_statuses):
        status = None  # this will trigger experiment creation on a platform
    elif any(s == EntityStatus.RUNNING for s in sim_statuses):
        status = EntityStatus.RUNNING
    elif any(s == EntityStatus.CREATED for s in sim_statuses) and any_succeeded_failed:
        status = EntityStatus.RUNNING
    elif any(s is None for s in sim_statuses) and any_succeeded_failed:
        status = EntityStatus.CREATED
    elif any(s == EntityStatus.FAILED for s in sim_statuses):
        status = EntityStatus.FAILED
    elif all(s == EntityStatus.SUCCEEDED for s in sim_statuses):
        status = EntityStatus.SUCCEEDED
    else:
        status = EntityStatus.CREATED
    return status
@status.setter
def status(self, value):
    """
    Setter for the experiment status; it never stores the value.

    The status of an experiment is an aggregate of its children, so it cannot be assigned directly.

    Args:
        value: Value to set (ignored)

    Returns:
        None

    Notes:
        TODO: Deprecate this
    """
    # The setter has to exist because dataclasses will always try to set each field, even if that is
    # not allowed, as is the case for Experiment. Assignments coming from __init__ are ignored quietly;
    # any other caller gets a warning.
    if get_caller() != '__init__':
        logger.warning('Experiment status cannot be directly altered. Status unchanged.')
def __repr__(self):
    """Return a short description with the uid, name and simulation count."""
    uid, name = self.uid, self.name
    # Report zero when the simulations wrapper is falsy instead of asking for its length.
    sim_count = len(self.simulations) if self.simulations else 0
    return f"<Experiment: {uid} - {name} / Sim count {sim_count}>"
@property
def suite(self):
    """
    Suite the experiment belongs to; an alias for ``parent``.

    Returns:
        The value of ``self.parent``
    """
    return self.parent


@suite.setter
def suite(self, suite):
    """
    Set the suite of the experiment by assigning it to ``parent``.

    Args:
        suite: Suite to set

    Returns:
        None
    """
    self.parent = suite
@IEntity.parent.setter
def parent(self, parent: 'IEntity'):
"""
Set the parent object of this experiment and register the experiment on that parent.
When *parent* is truthy, the experiment is added to ``parent.experiments``: a new one-element list
is assigned if that attribute is None, otherwise the experiment is appended to it. The value is
stored through the setter of the ``parent`` property defined on IEntity.
Args:
parent: Parent object
Returns:
None
"""
# NOTE(review): indentation was lost in this copy of the file, so it cannot be determined from here
# whether the final IEntity.parent.__set__ call belongs inside the `if parent:` block - confirm
# against the original source.
if parent:
if parent.experiments is None:
parent.experiments = [self]
else:
parent.experiments.append(self)
# Delegate the actual storage to the property inherited from IEntity.
IEntity.parent.__set__(self, parent)
def display(self):
    """
    Display the experiment using the experiment table layout.

    Returns:
        None
    """
    # Imported inside the method, exactly as the original implementation does.
    from idmtools.utils.display import display as render, experiment_table_display as table_layout
    render(self, table_layout)
[docs] def pre_creation(self, platform: 'IPlatform', gather_assets=True) -> None:
"""
Experiment pre_creation callback.
Runs the IItem pre-creation hook, optionally collects experiment-level assets from the tasks of the
simulations, and records identifying tags on the experiment.
NOTE(review): indentation was lost in this copy of the file, so the nesting of the branches below
cannot be confirmed from here; verify against the original before relying on the branch structure.
Args:
platform: Platform experiment is being created on
gather_assets: Determines if an experiment will try to gather the common assets or defer. In most cases, you want this enabled, but when modifying existing experiments you may want to disable it if there are new assets and the platform has performance hits to determine those assets
Returns:
None
Raises:
ValueError - If simulations length is 0
"""
# Run the IItem hook first, then the experiment's own gather_assets() unless that was disabled
IItem.pre_creation(self, platform)
if not self.disable_default_pre_create:
self.gather_assets()
# to keep experiments clean, let's only do this if we have a special experiment class
if self.__class__ is not Experiment:
# Add a tag to keep the Experiment class name
self.tags["experiment_type"] = f'{self.__class__.__module__}.{self.__class__.__name__}'
# if it is a template, set task type on experiment
if gather_assets:
if isinstance(self.simulations.items, TemplatedSimulations):
if len(self.simulations.items) == 0:
raise ValueError("You cannot run an empty experiment")
if logger.isEnabledFor(DEBUG):
logger.debug("Using Base task from template for experiment level assets")
# Common assets come from the template's base task plus any extra simulations; duplicates are tolerated
self.simulations.items.base_task.gather_common_assets()
self.assets.add_assets(self.simulations.items.base_task.common_assets, fail_on_duplicate=False)
for sim in self.simulations.items.extra_simulations():
self.assets.add_assets(sim.task.gather_common_assets(), fail_on_duplicate=False)
# An existing task_type tag is left untouched
if "task_type" not in self.tags:
task_class = self.simulations.items.base_task.__class__
self.tags["task_type"] = f'{task_class.__module__}.{task_class.__name__}'
elif self.gather_common_assets_from_task and isinstance(self.simulations.items, List):
if len(self.simulations.items) == 0:
raise ValueError("You cannot run an empty experiment")
if logger.isEnabledFor(DEBUG):
logger.debug("Using all tasks to gather assets")
# The task_type tag is taken from the first simulation's task and always overwritten here
task_class = self.__simulations[0].task.__class__
self.tags["task_type"] = f'{task_class.__module__}.{task_class.__name__}'
# Wrap the iteration in a tqdm progress bar unless progress bars are disabled in the configuration
pbar = self.__simulations
if not IdmConfigParser.is_progress_bar_disabled():
from tqdm import tqdm
pbar = tqdm(self.__simulations, desc="Discovering experiment assets from tasks",
unit="simulation")
for sim in pbar:
# don't gather assets from simulations that have been provisioned
if sim.status is None:
assets = sim.task.gather_common_assets()
if assets is not None:
self.assets.add_assets(assets, fail_on_duplicate=True, fail_on_deep_comparison=True)
elif isinstance(self.simulations.items, List) and len(self.simulations.items) == 0:
raise ValueError("You cannot run an empty experiment")
# Merge in the tags returned by get_default_tags()
self.tags.update(get_default_tags())
@property
def done(self):
    """
    Report whether the experiment has finished executing.

    Returns:
        True if every simulation reports ``done`` (also True when there are no simulations), False otherwise
    """
    finished_flags = [simulation.done for simulation in self.simulations]
    return all(finished_flags)
@property
def succeeded(self) -> bool:
    """
    Report whether the experiment has succeeded.

    An experiment has succeeded when all of its simulations have succeeded.

    Returns:
        True if every simulation reports ``succeeded`` (also True when there are no simulations), False otherwise
    """
    success_flags = [simulation.succeeded for simulation in self.simulations]
    return all(success_flags)
@property
def any_failed(self) -> bool:
    """
    Return if an experiment has any simulation in failed state.

    Returns:
        True if at least one simulation has failed, False otherwise (including when there are no simulations)
    """
    return any(s.failed for s in self.simulations)
@property
def simulations(self) -> ExperimentParentIterator:
    """
    Access the simulations of the experiment.

    Returns:
        A new ExperimentParentIterator wrapping the internal simulations storage, with this experiment as parent
    """
    backing_store = self.__simulations
    return ExperimentParentIterator(backing_store, parent=self)
@simulations.setter
def simulations(self, simulations: Union[SUPPORTED_SIM_TYPE]):
    """
    Set the simulations object.

    Generators, TemplatedSimulations and EntityContainers are stored as they are. Lists and sets are
    copied into a new EntityContainer, converting any task into a simulation through
    ``to_simulation()``. ``gather_common_assets_from_task`` is updated to match: True for generators,
    EntityContainers, lists and sets; False for TemplatedSimulations.

    Args:
        simulations: The simulations to store

    Returns:
        None

    Raises:
        ValueError - If simulations is a list/set that has items that are not simulations or tasks
                     If simulations is not a list, set, Generator, TemplatedSimulations or EntityContainer
    """
    if isinstance(simulations, (GeneratorType, TemplatedSimulations, EntityContainer)):
        self.gather_common_assets_from_task = isinstance(simulations, (GeneratorType, EntityContainer))
        self.__simulations = simulations
    elif isinstance(simulations, (list, set)):
        from idmtools.entities.simulation import Simulation  # noqa: F811
        # Build into a local container first so that an invalid item leaves the experiment untouched
        # instead of half-updated.
        container = EntityContainer()
        for sim in simulations:
            if isinstance(sim, ITask):
                container.append(sim.to_simulation())
            elif isinstance(sim, Simulation):
                container.append(sim)
            else:
                raise ValueError("Only list of tasks/simulations can be passed to experiment simulations")
        # Every item was valid: commit the new state.
        self.gather_common_assets_from_task = True
        self.__simulations = container
    else:
        raise ValueError("You can only set simulations to an EntityContainer, a Generator, a TemplatedSimulations "
                         "or a List/Set of Simulations")
@property
def simulation_count(self) -> int:
    """
    Count the simulations of the experiment.

    Returns:
        Length of ``self.simulations``
    """
    total = len(self.simulations)
    return total
def refresh_simulations(self) -> None:
    """
    Refresh the simulations from the platform.

    Replaces ``self.simulations`` with the children the platform returns for this experiment's uid,
    requesting them with ``force=True``.

    Returns:
        None
    """
    # ItemType is already imported at module level, so no local import is needed.
    self.simulations = self.platform.get_children(self.uid, ItemType.EXPERIMENT, force=True)
def refresh_simulations_status(self):
    """
    Ask the platform to refresh the status for this experiment.

    Returns:
        None
    """
    platform = self.platform
    platform.refresh_status(item=self)
def pre_getstate(self):
    """
    Return default values for :meth:`~idmtools.interfaces.ientity.pickle_ignore_fields`.

    Call before pickling.

    Returns:
        Dictionary with a fresh AssetCollection under "assets" and a fresh EntityContainer under "simulations"
    """
    from idmtools.assets import AssetCollection
    defaults = dict(assets=AssetCollection(), simulations=EntityContainer())
    return defaults
def gather_assets(self) -> AssetCollection:
    """
    Gather all our assets for our experiment.

    Returns:
        The experiment's asset collection (``self.assets``)
    """
    return self.assets
@classmethod
def from_task(cls, task, name: str = None, tags: Dict[str, Any] = None, assets: AssetCollection = None,
              gather_common_assets_from_task: bool = True) -> 'Experiment':
    """
    Creates an Experiment with one Simulation from a task.

    Args:
        task: Task to use
        assets: Asset collection to use for common tasks. Defaults to gather assets from task
        name: Name of experiment. Defaults to the class name of the task
        tags: Tags for the items
        gather_common_assets_from_task: Whether we should attempt to gather assets from the Task object for the
            experiment. With large amounts of tasks, this can be expensive as we loop through all

    Returns:
        Experiment containing a single simulation built from the task
    """
    if tags is None:
        tags = dict()
    if name is None:
        name = task.__class__.__name__
    e = Experiment(name=name, tags=tags, assets=AssetCollection() if assets is None else assets,
                   gather_common_assets_from_task=gather_common_assets_from_task)
    e.simulations = [task]
    # Assigning a list makes the simulations setter force gather_common_assets_from_task to True,
    # which would silently discard the caller's choice; re-apply it here.
    if gather_common_assets_from_task is not None:
        e.gather_common_assets_from_task = gather_common_assets_from_task
    return e
@classmethod
def from_builder(cls, builders: Union[SimulationBuilder, List[SimulationBuilder]], base_task: ITask,
                 name: str = None,
                 assets: AssetCollection = None, tags: Dict[str, Any] = None) -> 'Experiment':
    """
    Creates an experiment from a SimulationBuilder object (or a list of builders).

    Args:
        builders: Builder, or list of builders, to create the experiment from
        base_task: Base task to use as template
        name: Experiment name. Defaults to the class name of the base task, followed by the class name of the
            builder when exactly one builder is given
        assets: Experiment level assets
        tags: Experiment tags

    Returns:
        Experiment object from the builders
    """
    template = TemplatedSimulations(base_task=base_task)
    # Accept a single builder as well as a list of them.
    builder_list = builders if isinstance(builders, list) else [builders]
    for current_builder in builder_list:
        template.add_builder(current_builder)

    if name is None:
        name = base_task.__class__.__name__
        if len(builder_list) == 1:
            name += " " + builder_list[0].__class__.__name__
    return cls.from_template(template, name=name, tags=tags, assets=assets)
@classmethod
def from_template(cls, template: TemplatedSimulations, name: str = None, assets: AssetCollection = None,
                  tags: Dict[str, Any] = None) -> 'Experiment':
    """
    Creates an Experiment from a TemplatedSimulations object.

    Args:
        template: TemplatedSimulations object
        name: Experiment name. Defaults to the class name of the template's base task
        assets: Experiment level assets. Defaults to a new, empty AssetCollection
        tags: Tags. Defaults to an empty dictionary

    Returns:
        Experiment object from the TemplatedSimulations object
    """
    experiment_tags = dict() if tags is None else tags
    experiment_name = template.base_task.__class__.__name__ if name is None else name
    experiment_assets = AssetCollection() if assets is None else assets

    experiment = Experiment(name=experiment_name, tags=experiment_tags, assets=experiment_assets)
    experiment.simulations = template
    return experiment
def __deepcopy__(self, memo):
    """
    Deep copy an experiment.

    Generators and templates held as the simulations storage are first realized into lists so that
    they can be copied.

    Args:
        memo: The memo object used for copying

    Returns:
        Copied experiment
    """
    klass = self.__class__
    clone = klass.__new__(klass)
    # Register the clone before copying attributes so references back to this experiment resolve to it.
    memo[id(self)] = clone

    lazy_types = (GeneratorType, TemplatedSimulations)
    for attr_name, attr_value in self.__dict__.items():
        if attr_name == '_Experiment__simulations' and isinstance(attr_value, lazy_types):
            attr_value = list(attr_value)
        setattr(clone, attr_name, copy.deepcopy(attr_value, memo))

    # The clone gets a freshly looked-up logger instead of a copied one.
    clone._task_log = getLogger(__name__)
    return clone
def list_static_assets(self, children: bool = False, platform: 'IPlatform' = None, **kwargs) -> List[Asset]:
    """
    List assets that have been uploaded to a server already.

    Args:
        children: When set to true, simulation assets will be loaded as well
        platform: Optional platform to load assets list from
        **kwargs: Extra arguments passed on to the platform's ``list_assets`` call

    Returns:
        List of assets

    Raises:
        ValueError - If the experiment has no id yet
    """
    if self.id is None:
        raise ValueError("You can only list static assets on an existing experiment")
    resolved_platform = super()._check_for_platform_from_context(platform)
    experiment_operations = resolved_platform._experiments
    return experiment_operations.list_assets(self, children, **kwargs)
def run(self, wait_until_done: bool = False, platform: 'IPlatform' = None, regather_common_assets: bool = None,
        wait_on_done_progress: bool = True, wait_on_done: bool = False,
        **run_opts) -> None:
    """
    Runs an experiment on a platform.

    Args:
        wait_until_done: Whether we should wait on experiment to finish running as well. Defaults to False
        platform: Platform object to use. If not specified, we first check object for platform object then the
            current context
        regather_common_assets: Triggers gathering of assets for *existing* experiments. If not provided, we use
            the platform's default behaviour. See platform details for performance implications of this. For most
            platforms, it should be ok but for others, it could decrease performance when assets are not changing.
            It is important to note that when using this feature, ensure the previous simulations have finished
            provisioning. Failure to do so can lead to unexpected behaviour
        wait_on_done_progress: Should experiment status be shown when waiting
        wait_on_done: extra name for backward compatibility for wait_until_done
        **run_opts: Options to pass to the platform

    Returns:
        None

    Raises:
        ValueError - If assets are to be regathered but the experiment's asset collection is not editable
    """
    p = super()._check_for_platform_from_context(platform)
    if regather_common_assets is None:
        regather_common_assets = p.is_regather_assets_on_modify()

    if regather_common_assets and not self.assets.is_editable():
        message = "To modify an experiment's asset collection, you must make a copy of it first. For example\nexperiment.assets = experiment.assets.copy()"
        user_logger.error(message)  # Show it bold red to user
        raise ValueError(message)

    if not self.assets.is_editable() and isinstance(self.simulations.items,
                                                    TemplatedSimulations) and not regather_common_assets:
        user_logger.warning(
            "You are modifying and existing experiment by using a template without gathering common assets. Ensure your Template configuration is the same as existing experiments or enable gathering of new common assets through regather_common_assets.")

    # The resolved flag is handed to the platform along with the caller's options.
    run_opts['regather_common_assets'] = regather_common_assets
    p.run_items(self, **run_opts)

    # Either spelling of the wait flag triggers waiting.
    if wait_until_done or wait_on_done:
        self.wait(wait_on_done_progress=wait_on_done_progress)
def to_dict(self):
    """
    Convert experiment to dictionary.

    Every dataclass field whose name does not start with an underscore is included, except ``parent``.
    The uid is added under the ``_uid`` key.

    Returns:
        Dictionary of experiment.
    """
    exported = {
        spec.name: getattr(self, spec.name)
        for spec in fields(self)
        if not spec.name.startswith("_") and spec.name != 'parent'
    }
    exported['_uid'] = self.uid
    return exported
# Define this here for better completion in IDEs for end users
@classmethod
def from_id(cls, item_id: str, platform: 'IPlatform' = None, copy_assets: bool = False,
            **kwargs) -> 'Experiment':
    """
    Load an experiment by id; defined on this class to give end users better intellisense.

    Args:
        item_id: Item id to load
        platform: Optional platform. Falls back to context
        copy_assets: Allow copying assets on load. Makes modifying experiments easier when new assets are involved.
        **kwargs: Optional arguments to be passed on to the platform

    Returns:
        Experiment loaded with ID
    """
    experiment = super().from_id(item_id, platform, **kwargs)
    if not copy_assets:
        return experiment
    # Swap the loaded asset collection for a copy of itself.
    experiment.assets = experiment.assets.copy()
    return experiment
def print(self, verbose: bool = False):
    """
    Print a summary of the experiment through the user logger.

    Args:
        verbose: When True, also log the type of the simulations storage and the assets

    Returns:
        None
    """
    emit = user_logger.info
    emit(f"Experiment <{self.id}>")
    emit(f"Total Simulations: {self.simulation_count}")
    emit(f"Tags: {self.tags}")
    emit(f"Platform: {self.platform.__class__.__name__}")

    # Report an outcome only when the experiment has a truthy status.
    if self.status:
        if self.succeeded:
            user_logger.log(SUCCESS, "Succeeded")
        elif not self.done:
            user_logger.log(NOTICE, "RUNNING")
        else:
            user_logger.critical("Experiment failed. Please check output")

    if verbose:
        emit(f"Simulation Type: {type(self.__simulations)}")
        emit(f"Assets: {self.assets}")
class ExperimentSpecification(ExperimentPluginSpecification):
    """
    Plugin specification that exposes the generic Experiment class.
    """

    @get_description_impl
    def get_description(self) -> str:
        """
        Describe the plugin.

        Returns:
            Description
        """
        description = "Provides access to the Local Platform to IDM Tools"
        return description

    @get_model_impl
    def get(self, configuration: dict) -> Experiment:  # noqa: F821
        """
        Build an experiment from a configuration.

        Args:
            configuration: Keyword arguments passed to the Experiment constructor

        Returns:
            Experiment created from the configuration
        """
        experiment = Experiment(**configuration)
        return experiment

    @get_model_type_impl
    def get_type(self) -> Type[Experiment]:
        """
        Return the experiment type.

        Returns:
            The Experiment class
        """
        return Experiment