"""idmtools singularity build workitem.
Notes:
- TODO add examples here.
Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import hashlib
import io
import json
import os
import re
import uuid
from dataclasses import dataclass, field, InitVar
from logging import getLogger, DEBUG
from os import PathLike
from pathlib import PurePath
from typing import List, Dict, Union, Optional, TYPE_CHECKING
from urllib.parse import urlparse
from uuid import UUID
from COMPS.Data import QueryCriteria
from jinja2 import Environment
from idmtools import IdmConfigParser
from idmtools.assets import AssetCollection, Asset
from idmtools.assets.file_list import FileList
from idmtools.core import EntityStatus, NoPlatformException
from idmtools.core.logging import SUCCESS
from idmtools.entities.command_task import CommandTask
from idmtools.entities.relation_type import RelationType
from idmtools.utils.hashing import calculate_md5_stream
from idmtools_platform_comps.ssmt_work_items.comps_workitems import InputDataWorkItem
from idmtools_platform_comps.utils.general import save_sif_asset_md5_from_ac_id
from idmtools_platform_comps.utils.package_version import get_docker_manifest, get_digest_from_docker_hub
if TYPE_CHECKING:
from idmtools.entities.iplatform import IPlatform
SB_BASE_WORKER_PATH = os.path.join(os.path.dirname(__file__), 'base_singularity_work_order.json')
logger = getLogger(__name__)
user_logger = getLogger('user')
[docs]@dataclass(repr=False)
class SingularityBuildWorkItem(InputDataWorkItem):
"""
Provides a wrapper to build utilizing the COMPS build server.
Notes:
- TODO add references to examples
"""
#: Path to definition file
definition_file: Union[PathLike, str] = field(default=None)
#: definition content. Alternative to file
definition_content: str = field(default=None)
#: Enables Jinja parsing of the definition file or content
is_template: bool = field(default=False)
#: template_args
template_args: Dict[str, str] = field(default_factory=dict)
#: Image Url
image_url: InitVar[str] = None
#: Destination image name
image_name: str = field(default=None)
#: Name of the workitem
name: str = field(default=None)
#: Tages to add to container asset collection
image_tags: Dict[str, str] = field(default_factory=dict)
#: Allows you to set a different library. (The default library is “https://library.sylabs.io”). See https://sylabs.io/guides/3.5/user-guide/cli/singularity_build.html
library: str = field(default=None)
#: only run specific section(s) of definition file (setup, post, files, environment, test, labels, none) (default [all])
section: List[str] = field(default_factory=lambda: ['all'])
#: build using user namespace to fake root user (requires a privileged installation)
fix_permissions: bool = field(default=False)
# AssetCollection created by build
asset_collection: AssetCollection = field(default=None)
#: Additional Mounts
additional_mounts: List[str] = field(default_factory=list)
#: Environment vars for remote build
environment_variables: Dict[str, str] = field(default_factory=dict)
#: Force build
force: bool = field(default=False)
#: Don't include default tags
disable_default_tags: bool = field(default=None)
# ID that is added to work item and then results collection that can be used to tied the items together
run_id: uuid.UUID = field(default_factory=uuid.uuid4)
#: loaded if url is docker://. Used to determine if we need to re-run a build
__digest: Dict[str, str] = field(default=None)
__image_tag: str = field(default=None)
#: rendered template. We have to store so it is calculated before RUN which means outside our normal pre-create hooks
__rendered_template: str = field(default=None)
def __post_init__(self, item_name: str, asset_collection_id: UUID, asset_files: FileList, user_files: FileList, image_url: str):
"""Constructor."""
self.work_item_type = 'ImageBuilderWorker'
self._image_url = None
# Set this for now. Later it should be replace with some type of Specialized worker identifier
self.task = CommandTask("ImageBuilderWorker")
super().__post_init__(item_name, asset_collection_id, asset_files, user_files)
self.image_url = image_url if isinstance(image_url, str) else None
if isinstance(self.definition_file, PathLike):
self.definition_file = str(self.definition_file)
[docs] def get_container_info(self) -> Dict[str, str]:
"""Get container info.
Notes:
- TODO remove this
"""
pass
@property
def image_url(self): # noqa: F811
"""Get the image url."""
return self._image_url
@image_url.setter
def image_url(self, value: str):
"""
Set the image url.
Args:
value: Value to set value to
Returns:
None
"""
url_info = urlparse(value)
if url_info.scheme == "docker":
if "packages.idmod.org" in value:
full_manifest, self.__image_tag = get_docker_manifest(url_info.path)
self.__digest = full_manifest['config']['digest']
else:
self.__image_tag = url_info.netloc + ":latest" if ":" not in value else url_info.netloc
image, tag = url_info.netloc.split(":")
self.__digest = get_digest_from_docker_hub(image, tag)
if self.fix_permissions:
self.__digest += "--fix-perms"
if self.name is None:
self.name = f"Load Singularity image from Docker {self.__image_tag}"
# TODO how to do this for shub
self._image_url = value
[docs] def context_checksum(self) -> str:
"""
Calculate the context checksum of a singularity build.
The context is the checksum of all the assets defined for input, the singularity definition file, and the environment variables
Returns:
Conext checksum.
"""
file_hash = hashlib.sha256()
# ensure our template is set
self.__add_common_assets()
for asset in sorted(self.assets + self.transient_assets, key=lambda a: a.short_remote_path()):
if asset.absolute_path:
with open(asset.absolute_path, mode='rb') as ain:
calculate_md5_stream(ain, file_hash=file_hash)
else:
self.__add_file_to_context(json.dumps([asset.filename, asset.relative_path, str(asset.checksum)], sort_keys=True) if asset.persisted else asset.bytes, file_hash)
if len(self.environment_variables):
contents = json.dumps(self.environment_variables, sort_keys=True)
self.__add_file_to_context(contents, file_hash)
if logger.isEnabledFor(DEBUG):
logger.debug(f'Context: sha256:{file_hash.hexdigest()}')
return f'sha256:{file_hash.hexdigest()}'
def __add_file_to_context(self, contents: Union[str, bytes], file_hash):
"""
Add a specific file content to context checksum.
Args:
contents: Contents
file_hash: File hash to add to
Returns:
None
"""
item = io.BytesIO()
item.write(contents.encode('utf-8') if isinstance(contents, str) else contents)
item.seek(0)
calculate_md5_stream(item, file_hash=file_hash)
[docs] def render_template(self) -> Optional[str]:
"""
Render template. Only applies when is_template is True. When true, it renders the template using Jinja to a cache value.
Returns:
Rendered Template
"""
if self.is_template:
# We don't allow re-running template rendering
if self.__rendered_template is None:
if logger.isEnabledFor(DEBUG):
logger.debug("Rendering template")
contents = None
# try from file first
if self.definition_file:
with open(self.definition_file, mode='r') as ain:
contents = ain.read()
elif self.definition_content:
contents = self.definition_content
if contents:
env = Environment()
template = env.from_string(contents)
self.__rendered_template = template.render(env=os.environ, sbi=self, **self.template_args)
return self.__rendered_template
return None
[docs] @staticmethod
def find_existing_container(sbi: 'SingularityBuildWorkItem', platform: 'IPlatform' = None) -> Optional[AssetCollection]:
"""
Find existing container.
Args:
sbi: SingularityBuildWorkItem to find existing container matching config
platform: Platform To load the object from
Returns:
Existing Asset Collection
"""
if platform is None:
from idmtools.core.context import CURRENT_PLATFORM
if CURRENT_PLATFORM is None:
raise NoPlatformException("No Platform defined on object, in current context, or passed to run")
platform = CURRENT_PLATFORM
ac = None
if not sbi.force: # don't search if it is going to be forced
qc = QueryCriteria().where_tag(['type=singularity']).select_children(['assets', 'tags']).orderby('date_created desc')
if sbi.__digest:
qc.where_tag([f'digest={sbi.__digest}'])
elif sbi.definition_file or sbi.definition_content:
qc.where_tag([f'build_context={sbi.context_checksum()}'])
if len(qc.tag_filters) > 1:
if logger.isEnabledFor(DEBUG):
logger.debug("Searching for existing containers")
ac = platform._assets.get(None, query_criteria=qc)
if ac:
if logger.isEnabledFor(DEBUG):
logger.debug(f"Found: {len(ac)} previous builds")
ac = platform._assets.to_entity(ac[0])
if logger.isEnabledFor(DEBUG):
logger.debug(f'Found existing container in {ac.id}')
else:
ac = None
return ac
def __add_tags(self):
"""
Add default tags to the asset collection to be created.
The most important part of this logic is the digest/run_id information we add. This is what enables the build/pull-cache through comps.
Returns:
None
"""
self.image_tags['type'] = 'singularity'
# Disable all tags but image name and type
if not self.disable_default_tags:
if self.platform is not None and hasattr(self.platform, 'get_username'):
self.image_tags['created_by'] = self.platform.get_username()
# allow users to override run id using only the tag
if 'run_id' in self.tags:
self.run_id = self.tags['run_id']
else:
# set the run id on the workitem and resulting tags
self.tags['run_id'] = str(self.run_id)
self.image_tags['run_id'] = self.tags['run_id']
# Check for the digest
if self.__digest and isinstance(self.__digest, str):
self.image_tags['digest'] = self.__digest
self.image_tags['image_from'] = self.__image_tag
if self.image_name is None:
self.image_name = self.__image_tag.strip(" /").replace(":", "_").replace("/", "_") + ".sif"
# If we are building from a file, add the build context
elif self.definition_file:
self.image_tags['build_context'] = self.context_checksum()
if self.image_name is None:
bn = PurePath(self.definition_file).name
bn = str(bn).replace(".def", ".sif")
self.image_name = bn
elif self.definition_content:
self.image_tags['build_context'] = self.context_checksum()
if self.image_url:
self.image_tags['image_url'] = self.image_url
# Final fall back for image name
if self.image_name is None:
self.image_name = "image.sif"
if self.image_name and not self.image_name.endswith(".sif"):
self.image_name = f'{self.image_name}.sif'
# Add image name to the tags
self.image_tags['image_name'] = self.image_name
def _prep_work_order_before_create(self) -> Dict[str, str]:
"""
Prep work order before creation.
Returns:
Workorder for singularity build.
"""
self.__add_tags()
self.load_work_order(SB_BASE_WORKER_PATH)
if self.definition_file or self.definition_content:
self.work_order['Build']['Input'] = "Assets/Singularity.def"
else:
self.work_order['Build']['Input'] = self.image_url
if len(self.environment_variables):
self.work_order['Build']['StaticEnvironment'] = self.environment_variables
if len(self.additional_mounts):
self.work_order['Build']['AdditionalMounts'] = self.additional_mounts
self.work_order['Build']['Output'] = self.image_name if self.image_name else "image.sif"
self.work_order['Build']['Tags'] = self.image_tags
self.work_order['Build']['Flags'] = dict()
if self.fix_permissions:
self.work_order['Build']['Flags']['Switches'] = ["--fix-perms"]
if self.library:
self.work_order['Build']['Flags']['--library'] = self.library
if self.section:
self.work_order['Build']['Flags']['--section'] = self.section
return self.work_order
[docs] def pre_creation(self, platform: 'IPlatform') -> None:
"""
Pre-Creation item.
Args:
platform: Platform object
Returns:
None
"""
if self.name is None:
self.name = "Singularity Build"
if self.definition_file:
self.name += f" of {PurePath(self.definition_file).name}"
super(SingularityBuildWorkItem, self).pre_creation(platform)
self.__add_common_assets()
self._prep_work_order_before_create()
def __add_common_assets(self):
"""
Add common assets which in this case is the singularity definition file.
Returns:
None
"""
self.render_template()
if self.definition_file:
opts = dict(content=self.__rendered_template) if self.is_template else dict(absolute_path=self.definition_file)
self.assets.add_or_replace_asset(Asset(filename="Singularity.def", **opts))
elif self.definition_content:
opts = dict(content=self.__rendered_template if self.is_template else self.definition_content)
self.assets.add_or_replace_asset(Asset(filename="Singularity.def", **opts))
def __fetch_finished_asset_collection(self, platform: 'IPlatform') -> Union[AssetCollection, None]:
"""
Fetch the Singularity asset collection we created.
Args:
platform: Platform to fetch from.
Returns:
Asset Collection or None
"""
comps_workitem = self.get_platform_object(force=True)
acs = comps_workitem.get_related_asset_collections(RelationType.Created)
if acs:
self.asset_collection = AssetCollection.from_id(acs[0].id, platform=platform if platform else self.platform)
if IdmConfigParser.is_output_enabled():
user_logger.log(SUCCESS, f"Created Singularity image as Asset Collection: {self.asset_collection.id}")
user_logger.log(SUCCESS, f"View AC at {self.platform.get_asset_collection_link(self.asset_collection)}")
return self.asset_collection
return None
[docs] def run(self, wait_until_done: bool = True, platform: 'IPlatform' = None, wait_on_done_progress: bool = True, **run_opts) -> Optional[AssetCollection]:
"""
Run the build.
Args:
wait_until_done: Wait until build completes
platform: Platform to run on
wait_on_done_progress: Show progress while waiting
**run_opts: Extra run options
Returns:
Asset collection that was created if successful
"""
p = super()._check_for_platform_from_context(platform)
opts = dict(wait_on_done_progress=wait_on_done_progress, wait_until_done=wait_until_done, platform=p, wait_progress_desc=f"Waiting for build of Singularity container: {self.name}")
ac = self.find_existing_container(self, platform=p)
if ac is None or self.force:
super().run(**opts)
ac = self.asset_collection
else:
if IdmConfigParser.is_output_enabled():
user_logger.log(SUCCESS, f"Existing build of image found with Asset Collection ID of {ac.id}")
user_logger.log(SUCCESS, f"View AC at {self.platform.get_asset_collection_link(ac)}")
# Set id to None
self.uid = None
if ac:
self.image_tags = ac.tags
self.asset_collection = ac
# how do we get id for original work item from AC?
self.status = EntityStatus.SUCCEEDED
save_sif_asset_md5_from_ac_id(ac.id)
return self.asset_collection
[docs] def wait(self, wait_on_done_progress: bool = True, timeout: int = None, refresh_interval=None, platform: 'IPlatform' = None, wait_progress_desc: str = None) -> Optional[AssetCollection]:
"""
Waits on Singularity Build Work item to finish and fetches the resulting asset collection.
Args:
wait_on_done_progress: When set to true, a progress bar will be shown from the item
timeout: Timeout for waiting on item. If none, wait will be forever
refresh_interval: How often to refresh progress
platform: Platform
wait_progress_desc: Wait Progress Description Text
Returns:
AssetCollection created if item succeeds
"""
# wait on related items before we wait on our item
p = super()._check_for_platform_from_context(platform)
opts = dict(wait_on_done_progress=wait_on_done_progress, timeout=timeout, refresh_interval=refresh_interval, platform=p, wait_progress_desc=wait_progress_desc)
super().wait(**opts)
if self.status == EntityStatus.SUCCEEDED:
return self.__fetch_finished_asset_collection(p)
return None
[docs] def get_id_filename(self, prefix: str = None) -> str:
"""
Determine the id filename. Mostly used when use does not provide one.
The logic is combine prefix and either
* definition file minus extension
* image url using with parts filtered out of the name.
Args:
prefix: Optional prefix.
Returns:
id file name
Raises:
ValueError - When the filename cannot be calculated
"""
if prefix is None:
prefix = ''
if self.definition_file:
base_name = PurePath(self.definition_file).name.replace(".def", ".id")
if prefix:
base_name = f"{prefix}{base_name}"
filename = str(PurePath(self.definition_file).parent.joinpath(base_name))
elif self.image_url:
filename = re.sub(r"(docker|shub)://", "", self.image_url).replace(":", "_")
if filename:
filename = f"{prefix}{filename}"
else:
raise ValueError("Could not calculate the filename. Please specify one")
if not filename.endswith(".id"):
filename += ".id"
return filename
[docs] def to_id_file(self, filename: Union[str, PathLike] = None, save_platform: bool = False):
"""
Create an ID File.
If the filename is not provided, it will be calculate for definition files or for docker pulls
Args:
filename: Filename
save_platform: Save Platform info to file as well
Returns:
None
"""
if filename is None:
filename = self.get_id_filename(prefix='builder.')
super(SingularityBuildWorkItem, self).to_id_file(filename, save_platform)