"""Provides the TemplatedScriptTask.
Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import copy
import inspect
import os
from contextlib import suppress
from dataclasses import dataclass, field
from functools import partial
from logging import getLogger, DEBUG
from typing import List, Callable, Type, Dict, Any, Union, TYPE_CHECKING
from jinja2 import Environment
from idmtools.assets import AssetCollection, Asset
from idmtools.entities import CommandLine
from idmtools.entities.itask import ITask
from idmtools.entities.iworkflow_item import IWorkflowItem
from idmtools.entities.simulation import Simulation
from idmtools.registry.task_specification import TaskSpecification
if TYPE_CHECKING: # pragma: no cover
from idmtools.entities.iplatform import IPlatform
logger = getLogger(__name__)
# Common wrapper tails: echo the wrapped command line, then execute it.
# Batch expands %* to every argument passed to the script.
WINDOWS_BASE_WRAPPER = "echo Running %*\n%*"
# POSIX shells expand "$@" to every argument, preserving each one's quoting.
LINUX_BASE_WRAPPER = 'echo Running $@\n"$@"\n'

# Wrappers that put the Assets folders at the front of PYTHONPATH before running the wrapped command
WINDOWS_PYTHON_PATH_WRAPPER = (
    "set PYTHONPATH=%cd%\\Assets\\site-packages\\;%cd%\\Assets\\;%PYTHONPATH%\n"
    + WINDOWS_BASE_WRAPPER
)
LINUX_PYTHON_PATH_WRAPPER = (
    "export PYTHONPATH=$(pwd)/Assets/site-packages:$(pwd)/Assets/:$PYTHONPATH\n"
    + LINUX_BASE_WRAPPER
)

# Jinja2 templates that emit one environment assignment per entry of the ``vars`` mapping
# and then run the wrapped command.
LINUX_DICT_TO_ENVIRONMENT = (
    "{% for key, value in vars.items() %}\n"
    'export {{key}}="{{value}}"\n'
    "{% endfor %}\n"
    + LINUX_BASE_WRAPPER
)
# NOTE(review): in cmd.exe, `set KEY="value"` stores the double quotes as part of the value;
# confirm that is intended before relying on it for path-like variables.
WINDOWS_DICT_TO_ENVIRONMENT = (
    "{% for key, value in vars.items() %}\n"
    'set {{key}}="{{value}}"\n'
    "{% endfor %}\n"
    + WINDOWS_BASE_WRAPPER
)
@dataclass()
class TemplatedScriptTask(ITask):
    """
    Defines a task to run a script using a template. Best suited to shell scripts.

    The template is supplied either inline (``template``) or as a path (``template_file``). It is rendered with
    Jinja2 and added as an asset named ``script_path`` to either the common or the transient assets, depending
    on ``template_is_common``.

    Examples:
        In this example, we modify the Python Path using TemplatedScriptTask and LINUX_PYTHON_PATH_WRAPPER

        .. literalinclude:: ../../examples/cookbook/python/python-path/python-path.py

        In this example, we modify environment variable using TemplatedScriptTask and LINUX_DICT_TO_ENVIRONMENT

        .. literalinclude:: ../../examples/cookbook/environment/variables/environment-vars.py
    """
    #: Name of script
    script_path: str = field(default=None, metadata={"md": True})
    #: If platform requires path to script executing binary(ie /bin/bash)
    script_binary: str = field(default=None, metadata={"md": True})
    #: The template contents
    template: str = field(default=None, metadata={"md": True})
    #: The template file. You can only use either template or template_file at once
    template_file: str = field(default=None, metadata={"md": True})
    #: Controls whether a template should be an experiment or a simulation level asset
    template_is_common: bool = field(default=True, metadata={"md": True})
    #: Template variables used for rendering the template
    # Note: large amounts of variables will increase metadata size
    variables: Dict[str, Any] = field(default_factory=dict, metadata={"md": True})
    #: Platform Path Separator. For Windows execution platforms, use \, otherwise use the default of /
    path_sep: str = field(default='/', metadata={"md": True})
    #: Extra arguments to add to the command line
    extra_command_arguments: str = field(default='', metadata={"md": True})

    #: Hooks to gather common assets
    gather_common_asset_hooks: List[Callable[[ITask], AssetCollection]] = field(default_factory=list)
    #: Hooks to gather transient assets
    gather_transient_asset_hooks: List[Callable[[ITask], AssetCollection]] = field(default_factory=list)

    def __post_init__(self):
        """
        Validate the script/template options and register the hook that renders the template.

        Raises:
            ValueError: If neither script_path nor template_file is set, or neither template nor template_file
                is set
            FileNotFoundError: If template_file is set but does not exist
        """
        super().__post_init__()
        if self.script_path is None and self.template_file is None:
            raise ValueError("Either script name or template file is required")
        elif self.script_path is None:  # Get name from the file
            self.script_path = os.path.basename(self.template_file)

        if self.template is None and self.template_file is None:
            raise ValueError("Either template or template_file is required")

        if self.template_file and not os.path.exists(self.template_file):
            raise FileNotFoundError(f"Could not find the file the template file {self.template_file}")

        # is the template a common(experiment) asset?
        if self.template_is_common:
            if logger.isEnabledFor(DEBUG):
                logger.debug("Adding common asset hook")
            # add hook to render it to common asset hooks
            hook = partial(TemplatedScriptTask._add_template_to_asset_collection, asset_collection=self.common_assets)
            self.gather_common_asset_hooks.append(hook)
        else:
            # it must be a simulation level asset
            if logger.isEnabledFor(DEBUG):
                logger.debug("Adding transient asset hook")
            # create hook to render it to our transient assets
            hook = partial(
                TemplatedScriptTask._add_template_to_asset_collection,
                asset_collection=self.transient_assets
            )
            # add the hook
            self.gather_transient_asset_hooks.append(hook)

    @staticmethod
    def _add_template_to_asset_collection(task: 'TemplatedScriptTask', asset_collection: AssetCollection) -> \
            AssetCollection:
        """
        Add our rendered template to the asset collection.

        The template is rendered with ``vars`` bound to the task's variables and ``env`` bound to ``os.environ``.

        Args:
            task: Task whose template should be rendered
            asset_collection: Asset collection that receives the rendered script

        Returns:
            Asset Collection with template added
        """
        # setup our jinja environment
        env = Environment()
        # try to load template from string or file
        if task.template:
            template = env.from_string(task.template)
        else:
            # This Environment has no loader configured, so env.get_template() cannot resolve a path.
            # Read the file ourselves and compile its contents instead.
            with open(task.template_file, 'r', encoding='utf-8') as template_in:
                template = env.from_string(template_in.read())
        if logger.isEnabledFor(DEBUG):
            logger.debug(f"Rendering Script template: {template}")
        # render the template
        result = template.render(vars=task.variables, env=os.environ)
        if logger.isEnabledFor(DEBUG):
            logger.debug(f"Rendered Template: {result}")

        # create an asset
        asset = Asset(filename=task.script_path, content=result)
        asset_collection.add_asset(asset)
        return asset_collection

    def gather_common_assets(self) -> AssetCollection:
        """
        Gather common(experiment-level) assets for task.

        Returns:
            AssetCollection containing common assets
        """
        # TODO validate hooks have expected return type
        ac = AssetCollection()
        for x in self.gather_common_asset_hooks:
            ac += x(self)
        return ac

    def gather_transient_assets(self) -> AssetCollection:
        """
        Gather transient(simulation-level) assets for task.

        The hook results are combined with the task's current transient assets. When the combined collection is
        not empty, it also replaces ``self.transient_assets``.

        Returns:
            AssetCollection containing transient assets
        """
        ac = AssetCollection()
        for x in self.gather_transient_asset_hooks:
            ac += x(self)
        ac += self.transient_assets
        if len(ac.assets) != 0:
            self.transient_assets = ac
        return ac

    def reload_from_simulation(self, simulation: Simulation):
        """
        Reload a templated script task. When reloading, you will only have the rendered template available.

        Any asset on the simulation's parent whose filename or absolute path equals ``script_path`` is removed
        from the parent's assets.

        Args:
            simulation: Simulation the task is being reloaded from

        Returns:
            None
        """
        # check experiment level assets for our script
        if simulation.parent.assets:
            # prep new asset collection in case we have to remove our asset from the experiment level
            new_assets = AssetCollection()
            for asset in simulation.parent.assets.assets:
                # is it our script?
                if asset.filename != self.script_path and asset.absolute_path != self.script_path:
                    # nope keep it
                    new_assets.add_asset(asset)
            # set filtered assets back to parent
            simulation.parent.assets = new_assets

    def pre_creation(self, parent: Union[Simulation, IWorkflowItem], platform: 'IPlatform'):
        """
        Before creating simulation, we need to set our command line.

        The command is ``[script_binary ]<script>`` followed by ``extra_command_arguments``. For a common
        template the script is addressed through the platform's common asset path.

        Args:
            parent: Parent object
            platform: Platform item is being ran on

        Returns:
            None
        """
        # optional interpreter prefix (ie /bin/bash)
        if self.script_binary:
            sn = self.script_binary + ' '
        else:
            sn = ''
        # are we experiment or simulation level asset?
        if self.template_is_common:
            sn += platform.join_path(platform.common_asset_path, self.script_path)
        else:
            sn += self.script_path
        # set the command line to the rendered script
        self.command = CommandLine.from_string(sn)
        # a non-default separator switches the executable path to that separator and marks the command as windows
        if self.path_sep != "/":
            self.command.executable = self.command.executable.replace("/", self.path_sep)
            self.command.is_windows = True
        # set any extra arguments
        if self.extra_command_arguments:
            other_command = CommandLine.from_string(self.extra_command_arguments)
            # the first token is parsed as the "executable"; for us it is just the first extra argument
            self.command._args.append(other_command.executable)
            if other_command._options:
                self.command._args += other_command._options
            if other_command._args:
                self.command._args += other_command._args
        super().pre_creation(parent, platform)
@dataclass()
class ScriptWrapperTask(ITask):
    """
    Allows you to wrap a script with another script.

    The effective command is the template script task's command with the wrapped task's command appended as a
    raw argument.

    See Also:
        :py:class:`idmtools_models.templated_script_task.TemplatedScriptTask`

    Raises:
        ValueError if the template Script Task or the wrapped Task is not defined
    """
    #: Templated script task that renders and runs the wrapper script
    template_script_task: TemplatedScriptTask = field(default=None)
    #: Task being wrapped
    task: ITask = field(default=None)

    def __post_init__(self):
        """
        Validate the required tasks and build them from dictionaries when needed.

        Raises:
            ValueError: If template_script_task or task is None
        """
        if self.template_script_task is None:
            raise ValueError("Template Script Task is required")

        if self.task is None:
            raise ValueError("Task is required")

        # either task may be supplied as a dictionary description
        if isinstance(self.task, dict):
            self.task = self.from_dict(self.task)
        if isinstance(self.template_script_task, dict):
            self.template_script_task = self.from_dict(self.template_script_task)

    @staticmethod
    def from_dict(task_dictionary: Dict[str, Any]):
        """
        Load the task from a dictionary.

        Args:
            task_dictionary: Dictionary with a ``task_type`` key; every other key is passed to the factory as a
                keyword argument

        Returns:
            The task created by the TaskFactory
        """
        from idmtools.core.task_factory import TaskFactory
        task_args = {k: v for k, v in task_dictionary.items() if k not in ['task_type']}
        return TaskFactory().create(task_dictionary['task_type'], **task_args)

    @property
    def command(self):
        """Our task property. Again, we have to overload this because of wrapping a task."""
        # copy so the wrapper script's own command is not modified by appending the wrapped command
        cmd = copy.deepcopy(self.template_script_task.command)
        cmd.add_raw_argument(str(self.task.command))
        return cmd

    @command.setter
    def command(self, value: Union[str, CommandLine]):
        """
        Set our command. because we are wrapping a task, we have to overload this.

        Assignments made from this file (or with no caller frames available) update our own ``_command``. Any
        other assignment is forwarded to the wrapped task.
        """
        callers = []
        with suppress(KeyError, IndexError):
            callers = inspect.stack()[1:3]
        # Inspect up to two caller frames. any() copes with a single available frame, where indexing the
        # second frame directly would raise IndexError.
        if not callers or any(frame.filename == __file__ for frame in callers):
            if isinstance(value, property):
                # a property object is not a real command; store None
                self._command = None
            elif isinstance(value, str):
                self._command = CommandLine.from_string(value)
            else:
                self._command = value
        else:
            self.task.command = value

    @property
    def wrapped_task(self):
        """
        Our task we are wrapping with a shell script.

        Returns:
            Our wrapped task
        """
        return self.task

    @wrapped_task.setter
    def wrapped_task(self, value: ITask):
        """
        Set our wrapped task.

        Args:
            value: Task to wrap. A property object is ignored and leaves the current task untouched.
        """
        # A setter's return value is discarded, so the value must actually be stored on the instance.
        if not isinstance(value, property):
            self.task = value

    def gather_common_assets(self):
        """
        Gather all the common assets.

        Adds the common assets of both the template script task and the wrapped task to our own.

        Returns:
            Common assets(Experiment Assets)
        """
        self.common_assets.add_assets(self.template_script_task.gather_common_assets())
        self.common_assets.add_assets(self.task.gather_common_assets())
        return self.common_assets

    def gather_transient_assets(self) -> AssetCollection:
        """
        Gather all the transient assets.

        Adds the transient assets of both the template script task and the wrapped task to our own.

        Returns:
            Transient Assets(Simulation level assets)
        """
        self.transient_assets.add_assets(self.template_script_task.gather_transient_assets())
        self.transient_assets.add_assets(self.task.gather_transient_assets())
        return self.transient_assets

    def reload_from_simulation(self, simulation: Simulation):
        """
        Reload from simulation. Currently does nothing.

        Args:
            simulation: simulation

        Returns:
            None
        """
        pass

    def pre_creation(self, parent: Union[Simulation, IWorkflowItem], platform: 'IPlatform'):
        """
        Before creation, create the true command by adding the wrapper name.

        Here we call both our wrapped task and our template_script_task pre_creation

        Args:
            parent: Parent Task
            platform: Platform Templated Task is executing on

        Returns:
            None
        """
        self.task.pre_creation(parent, platform)
        self.template_script_task.pre_creation(parent, platform)

    def post_creation(self, parent: Union[Simulation, IWorkflowItem], platform: 'IPlatform'):
        """
        Post creation of task.

        Here we call both our wrapped task and our template_script_task post_creation

        Args:
            parent: Parent of task
            platform: Platform we are running on

        Returns:
            None
        """
        self.task.post_creation(parent, platform)
        self.template_script_task.post_creation(parent, platform)

    def __getattr__(self, item):
        """
        Proxy get attr to child except for task item and items not in our object.

        Python only calls this after normal attribute lookup has failed, so in practice unknown attributes are
        looked up on the wrapped task.
        """
        if item not in self.__dict__:
            return getattr(self.task, item)
        else:
            return super(ScriptWrapperTask, self).__getattr__(item)
def get_script_wrapper_task(task: ITask, wrapper_script_name: str, template_content: str = None,
                            template_file: str = None, template_is_common: bool = True,
                            variables: Dict[str, Any] = None, path_sep: str = '/') -> ScriptWrapperTask:
    """
    Wrap a task in a templated wrapper script using sensible defaults.

    A TemplatedScriptTask is built from the template options and paired with ``task`` in a ScriptWrapperTask.

    Args:
        task: Task to wrap
        wrapper_script_name: Name of the rendered wrapper script
        template_content: Inline template contents
        template_file: Path to a template file
        template_is_common: Whether the rendered script is an experiment level asset
        variables: Variables made available to the template. None is treated as an empty dictionary
        path_sep: Path separator of the execution platform(Windows or Linux)

    Returns:
        ScriptWrapperTask wrapping the task

    See Also:
        :class:`idmtools_models.templated_script_task.TemplatedScriptTask`
        :func:`idmtools_models.templated_script_task.get_script_wrapper_windows_task`
        :func:`idmtools_models.templated_script_task.get_script_wrapper_unix_task`
    """
    # give every call its own dictionary when the caller supplied none
    template_variables = dict() if variables is None else variables
    wrapper_script = TemplatedScriptTask(
        script_path=wrapper_script_name,
        template_file=template_file,
        template=template_content,
        template_is_common=template_is_common,
        variables=template_variables,
        path_sep=path_sep
    )
    return ScriptWrapperTask(task=task, template_script_task=wrapper_script)
def get_script_wrapper_windows_task(task: ITask, wrapper_script_name: str = 'wrapper.bat',
                                    template_content: str = WINDOWS_DICT_TO_ENVIRONMENT,
                                    template_file: str = None, template_is_common: bool = True,
                                    variables: Dict[str, Any] = None) -> ScriptWrapperTask:
    """
    Build a wrapper script task for Windows platforms.

    The default template wraps another task in a batch script that sets every entry of ``variables`` in the run
    environment before running the wrapped command. To modify the python path instead, pass
    WINDOWS_PYTHON_PATH_WRAPPER as the template content. You can adapt the script to run any pre-scripts you
    need or to call other scripts in succession.

    Args:
        task: Task to wrap
        wrapper_script_name: Wrapper script name(defaults to wrapper.bat)
        template_content: Template Content
        template_file: Template File
        template_is_common: Is the template experiment level
        variables: Variables for template

    Returns:
        ScriptWrapperTask

    See Also:
        :class:`idmtools_models.templated_script_task.TemplatedScriptTask`
        :func:`idmtools_models.templated_script_task.get_script_wrapper_task`
        :func:`idmtools_models.templated_script_task.get_script_wrapper_unix_task`
    """
    # same as the generic helper, with the path separator pinned to a backslash
    return get_script_wrapper_task(
        task,
        wrapper_script_name,
        template_content=template_content,
        template_file=template_file,
        template_is_common=template_is_common,
        variables=variables,
        path_sep="\\",
    )
def get_script_wrapper_unix_task(task: ITask, wrapper_script_name: str = 'wrapper.sh',
                                 template_content: str = LINUX_DICT_TO_ENVIRONMENT,
                                 template_file: str = None, template_is_common: bool = True,
                                 variables: Dict[str, Any] = None) -> ScriptWrapperTask:
    """
    Get wrapper script task for unix platforms.

    The default content wraps another task with a bash script that exports the variables defined in
    ``variables`` to the run environment. To modify python path, you can use LINUX_PYTHON_PATH_WRAPPER.
    You can adapt this script to modify any pre-scripts you need or call other scripts in succession.

    Args:
        task: Task to wrap
        wrapper_script_name: Wrapper script name(defaults to wrapper.sh)
        template_content: Template Content
        template_file: Template File
        template_is_common: Is the template experiment level
        variables: Variables for template

    Returns:
        ScriptWrapperTask

    See Also:
        :class:`idmtools_models.templated_script_task.TemplatedScriptTask`
        :func:`idmtools_models.templated_script_task.get_script_wrapper_task`
        :func:`idmtools_models.templated_script_task.get_script_wrapper_windows_task`
    """
    # same as the generic helper, with the path separator pinned to a forward slash
    return get_script_wrapper_task(task, wrapper_script_name, template_content, template_file, template_is_common,
                                   variables, "/")
class TemplatedScriptTaskSpecification(TaskSpecification):
    """
    Plugin specification that exposes TemplatedScriptTask.
    """

    def get(self, configuration: dict) -> TemplatedScriptTask:
        """
        Build a TemplatedScriptTask from a configuration.

        Args:
            configuration: Keyword arguments passed to the TemplatedScriptTask constructor

        Returns:
            TemplatedScriptTask created from the configuration
        """
        templated_task = TemplatedScriptTask(**configuration)
        return templated_task

    def get_description(self) -> str:
        """
        Describe the plugin.

        Returns:
            Plugin description
        """
        description = "Defines a general command that provides user hooks. Intended for use in advanced scenarios"
        return description

    def get_example_urls(self) -> List[str]:
        """
        List example urls related to TemplatedScriptTask.

        Returns:
            List of example urls. Currently always empty
        """
        return list()

    def get_type(self) -> Type[TemplatedScriptTask]:
        """
        Report the task type this plugin provides.

        Returns:
            The TemplatedScriptTask class
        """
        return TemplatedScriptTask

    def get_version(self) -> str:
        """
        Report the version of the plugin.

        Returns:
            Plugin Version
        """
        # imported here rather than at module level, as in the rest of this file's specifications
        from idmtools_models import __version__ as plugin_version
        return plugin_version
class ScriptWrapperTaskSpecification(TaskSpecification):
    """
    Plugin specification that exposes ScriptWrapperTask.
    """

    def get(self, configuration: dict) -> ScriptWrapperTask:
        """
        Build a ScriptWrapperTask from a configuration.

        Args:
            configuration: Keyword arguments passed to the ScriptWrapperTask constructor

        Returns:
            ScriptWrapperTask created from the configuration
        """
        wrapper_task = ScriptWrapperTask(**configuration)
        return wrapper_task

    def get_description(self) -> str:
        """
        Describe the plugin.

        Returns:
            Plugin description
        """
        description = "Defines a general command that provides user hooks. Intended for use in advanced scenarios"
        return description

    def get_example_urls(self) -> List[str]:
        """
        List example urls related to ScriptWrapperTask.

        Returns:
            List of example urls. Currently always empty
        """
        return list()

    def get_type(self) -> Type[ScriptWrapperTask]:
        """
        Report the task type this plugin provides.

        Returns:
            The ScriptWrapperTask class
        """
        return ScriptWrapperTask

    def get_version(self) -> str:
        """
        Report the version of the plugin.

        Returns:
            Plugin Version
        """
        # imported here rather than at module level, as in the rest of this file's specifications
        from idmtools_models import __version__ as plugin_version
        return plugin_version