Source code for idmtools.core.docker_task

"""
DockerTask provides a utility to run docker images.

Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import os
import re
import sys
import unicodedata
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from typing import Optional, Type
from idmtools import __version__ as idmtools_version, IdmConfigParser
from idmtools.assets import AssetCollection, json
from idmtools.core.logging import getLogger
from idmtools.entities.itask import ITask
from idmtools.entities.platform_requirements import PlatformRequirements
from idmtools.registry.task_specification import TaskSpecification

# Module-level logger, named after this module via __name__.
logger = getLogger(__name__)
# Logger obtained under the fixed name 'user'.
# NOTE(review): presumably meant for user-facing output; confirm against idmtools.core.logging.
user_logger = getLogger('user')


@dataclass
class DockerTask(ITask):
    """
    Provides a task to run or optionally build a docker container.

    Attributes:
        image_name: Name (tag) of the docker image to run/build. Required by gather_common_assets.
        build: When True, the image is built as soon as the task is created.
        build_path: Build context directory. Defaults to the current working directory at build time.
        Dockerfile: Dockerfile to use. It should be located in the build_path directory.
        pull_before_build: Stored on the task; not consulted by build_image in this class.
        use_nvidia_run: Stored on the task; not consulted anywhere in this class.
    """
    image_name: str = field(default=None, metadata={"md": True})
    # Optional config to build the docker image
    build: bool = field(default=False, metadata={"md": True})
    build_path: Optional[str] = field(default=None, metadata={"md": True})
    # This should be in the build_path directory
    Dockerfile: Optional[str] = field(default=None, metadata={"md": True})
    # NOTE(review): the two flags below are not read by any method of this class - confirm where they are consumed.
    pull_before_build: bool = field(default=True, metadata={"md": True})
    use_nvidia_run: bool = field(default=False, metadata={"md": True})

    # Guards build_image so the image is only built once per task instance
    __image_built: bool = field(default=False)

    def __post_init__(self):
        """
        Set our platform requirements and optionally trigger image build.

        Returns:
            None
        """
        super().__post_init__()
        self.add_platform_requirement(PlatformRequirements.DOCKER)
        if self.build:
            self.build_image()

    def gather_common_assets(self) -> AssetCollection:
        """
        Gather common(experiment-level) assets from task.

        Returns:
            AssetCollection containing all the common assets

        Raises:
            ValueError: When image_name has not been set
        """
        if self.image_name is None:
            raise ValueError("Image Name is required")
        return self.common_assets

    def gather_transient_assets(self) -> AssetCollection:
        """
        Gather transient(simulation-level) assets from task.

        Returns:
            AssetCollection
        """
        return self.transient_assets

    def build_image(self, spinner=None, **extra_build_args):
        """
        Build our docker image.

        The build only runs once per task; later calls return immediately after a successful build.
        When docker reports a build error, the failure is logged and the process exits through
        sys.exit(-1).

        Args:
            spinner: Optional CLI spinner. When provided, its text is updated with the image being built
            **extra_build_args: Extra build arguments to pass to docker. These override the defaults

        Returns:
            None
        """
        if self.__image_built:
            return

        # docker is imported lazily so that it is only needed when an image is actually built
        import docker
        from docker.errors import BuildError

        if spinner:
            spinner.text = f"Building {self.image_name}"
        # if the build_path is none use current working directory
        if self.build_path is None:
            self.build_path = os.getcwd()

        client = docker.client.from_env()
        build_config = dict(
            path=self.build_path,
            dockerfile=self.Dockerfile,
            tag=self.image_name,
            labels=dict(
                # label key was previously misspelled as "uildstamp"
                buildstamp=f'built-by idmtools {idmtools_version}',
                # build date is stamped using a fixed UTC-8 offset
                builddate=str(datetime.now(timezone(timedelta(hours=-8)))),
            ),
        )
        if extra_build_args:
            build_config.update(extra_build_args)
        logger.debug(f"Build configuration used: {str(build_config)}")

        # Always bind prog so the checks below are safe when the progress bar is disabled
        prog = None
        if not IdmConfigParser.is_progress_bar_disabled():
            from tqdm import tqdm
            prog = tqdm(
                desc='Building docker image',
                total=10,
                bar_format='Building Docker Image: |{bar}| {percentage:3.0f}% [{n_fmt}/{total_fmt}] [{elapsed}] {desc}'
            )

        # cleaned build output, attached to the BuildError when docker reports a failure
        build_log = []
        try:
            build_step = None
            # regular expression to grab progress
            progress_grep = re.compile(r'Step ([0-9]+)/([0-9]+) : (.*)')
            # regular expression to filter out ansi codes
            ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
            for raw_entry in client.api.build(**build_config):
                entry = json.loads(raw_entry)
                # The low-level build API reports failures as entries in the stream instead of raising,
                # so surface them as a BuildError for the handler below
                if 'error' in entry:
                    raise BuildError(entry['error'], build_log)
                if 'stream' in entry:
                    text = ansi_escape.sub('', entry['stream']).strip()
                    # strip unicode control characters
                    text = "".join(ch for ch in text if unicodedata.category(ch)[0] != "C")
                    logger.debug('Raw Docker Output: %s', text)
                    if not text:
                        continue
                    build_log.append(text)
                    step_match = progress_grep.match(text)
                    if step_match:
                        try:
                            step = int(step_match.group(1))
                            total_steps = int(step_match.group(2))
                            if prog is not None:
                                prog.n = step
                                prog.total = total_steps
                            text = step_match.group(3)
                        except Exception:
                            # progress reporting is best effort and should never abort the build
                            logger.debug('Unable to update build progress from: %s', text, exc_info=True)
                        if prog is not None:
                            prog.set_description(text)
                        build_step = text
                    # update build step with output
                    elif build_step:
                        if len(text) > 40:
                            text = text[:40]
                        if prog is not None:
                            prog.set_description(f'{build_step}: {text}')
                elif 'status' in entry:
                    status = entry['status'].strip()
                    if prog is not None:
                        prog.set_description(status)
            # only mark the image as built once the whole stream completed without an error
            self.__image_built = True
            logger.info('Build Successful')
        except BuildError as e:
            logger.info(f"Build failed for {self.image_name} with message {e.msg}")
            logger.info(f'Build log: {e.build_log}')
            sys.exit(-1)
        finally:
            if prog is not None:
                prog.close()

    def reload_from_simulation(self, simulation: 'Simulation'):  # noqa E821
        """
        Method to reload task details from simulation object. Currently we do not do this for docker task.

        Args:
            simulation: Simulation to load data from

        Returns:
            None
        """
        pass
class DockerTaskSpecification(TaskSpecification):
    """
    Task plugin specification that makes DockerTask available to idmtools.
    """

    def get(self, configuration: dict) -> DockerTask:
        """
        Build a DockerTask from the supplied configuration.

        Args:
            configuration: Keyword arguments forwarded to the DockerTask constructor

        Returns:
            The newly created DockerTask
        """
        task = DockerTask(**configuration)
        return task

    def get_description(self) -> str:
        """
        Describe this plugin.

        Returns:
            Short plugin description
        """
        description = "Defines a docker command"
        return description

    def get_type(self) -> Type[DockerTask]:
        """
        Report the task class this plugin provides.

        Returns:
            The DockerTask class itself
        """
        task_type = DockerTask
        return task_type