Source code for idmtools_platform_local.infrastructure.base_service_container

"""idmtools base service container.

This defines the base Docker container. Each service uses it to build its own container as needed.

Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import difflib
import io
import json
import tarfile
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from getpass import getpass
from logging import getLogger, DEBUG
from typing import Optional, Union, Dict, List
from docker.errors import APIError
from idmtools_platform_local import __version__
from docker import DockerClient
from docker.models.containers import Container

# Module-level logger named after this module; used for debug/diagnostic output.
logger = getLogger(__name__)
# Logger named 'user'; used in this file to report configuration differences.
user_logger = getLogger('user')


@dataclass
class BaseServiceContainer(ABC):
    """Provides the base abstract class for local platform container objects/managers.

    Sub-classes supply the docker ``run`` configuration through :meth:`get_configuration`.
    This class uses that configuration to look up, create, start, stop, restart and
    inspect the container named ``container_name`` through ``client``.
    """

    # Name of the docker container; used to look the container up and in log messages.
    container_name: str = None
    # Image reference. The part before the first "/" is treated as the registry on login.
    image: str = None
    # Docker client used for every container operation in this class.
    client: DockerClient = None
    # Not read by this class; presumably consumed by sub-classes/config loading (confirm).
    config_prefix: str = None
    # Not read by this class; presumably passed by sub-classes to get_common_config (confirm).
    network: str = None

    @staticmethod
    def get_common_config(container_name: str, image: str, network: str, port_bindings: Optional[Dict] = None,
                          volumes: Optional[Dict] = None,
                          mem_limit: Optional[str] = None, mem_reservation: Optional[str] = None,
                          environment: Optional[List[str]] = None,
                          extra_labels: Optional[Dict] = None, **extras) -> dict:
        """
        Returns portions of docker container configs that are common between all the different containers used within our platform.

        Args:
            container_name: Container name. Also used as the container hostname.
            image: Image to use
            network: Network to use
            port_bindings: Port bindings. Only added to the config when truthy.
            volumes: Volume definitions. Only added to the config when truthy.
            mem_limit: Memory limit. Only added to the config when truthy.
            mem_reservation: Memory reservation. Only added to the config when truthy.
            environment: Environment vars. Only added to the config when truthy.
            extra_labels: Extra labels merged on top of the default ``idmtools_version`` label.
            **extras: Any additional options. Applied last, so they override the keys built here.

        Returns:
            Common configuration object to use when creating a container.

        Notes:
            Memory strings should match those used by docker. See --memory details at
            https://docs.docker.com/engine/reference/run/#runtime-constraints-on-resources
        """
        config = dict(
            name=container_name, image=image, hostname=container_name, network=network,
            restart_policy=dict(MaximumRetryCount=15, name='on-failure'), detach=True,
            labels=dict(idmtools_version=__version__)
        )
        if port_bindings:
            config['ports'] = port_bindings
        if volumes:
            config['volumes'] = volumes
        if environment:
            config['environment'] = environment
        if extra_labels:
            config['labels'].update(extra_labels)
        if mem_limit:
            config['mem_limit'] = mem_limit
        if mem_reservation:
            config['mem_reservation'] = mem_reservation
        if extras:
            config.update(extras)
        return config

    @staticmethod
    def _get_optional_port_bindings(src_port: Optional[Union[str, int]],
                                    dest_port: Optional[Union[str, int]]) -> Optional[dict]:
        """
        Used to generate port bindings configurations if the inputs are not set to none.

        Args:
            src_port: Host Port
            dest_port: Container Port

        Returns:
            (Optional[dict]) Dictionary mapping the container port to the host port when both
            inputs have values, otherwise None.
        """
        if src_port is None or dest_port is None:
            return None
        return {dest_port: src_port}

    def get(self) -> Union[Container, None]:
        """
        Get the container.

        The lookup uses ``all=True``, so a container that exists but is not running
        (for example exited or created) is returned as well.

        Returns:
            The container whose name equals ``container_name``, or None when there is none.
        """
        candidates = self.client.containers.list(filters=dict(name=self.container_name), all=True)
        # Keep only exact name matches; the list filter alone is not relied on for that.
        matches = [x for x in candidates if x.name == self.container_name]
        if not matches:
            return None
        container = matches[0]
        if logger.isEnabledFor(DEBUG):
            logger.debug(f"Found {container.name}")
        return container

    def get_or_create(self, spinner=None) -> Container:
        """
        Get or Create a container.

        Args:
            spinner: Optional spinner to display

        Returns:
            Docker container object representing service container
        """
        container = self.get()
        if container is None:
            if logger.isEnabledFor(DEBUG):
                logger.debug(f"Creating {self.__class__.__name__}")
            return self.create(spinner)
        # ensure_container_is_running may remove this container and create a replacement when
        # the configuration changed, so return what it hands back rather than the stale object.
        return self.ensure_container_is_running(container, spinner)

    def ensure_container_is_running(self, container: Container, spinner=None) -> Container:
        """
        Ensures the container is running.

        A container in the ``exited`` or ``created`` state is started. Any other container has
        its stored configuration compared with the current one and is stopped, removed and
        re-created when they differ.

        Args:
            container: Container to check if it is running
            spinner: Optional spinner to show we are busy while checking.

        Returns:
            The container that is now in use. This is a new object when it was re-created.
        """
        if container.status in ['exited', 'created']:
            logger.debug(f"Restarting container: {container.name}")
            container.start()
            container.reload()
        elif self.has_different_config(container):
            # The config no longer matches what we expect, so replace the container.
            self.stop(remove=True, container=container)
            container = self.create(spinner)
        return container

    def has_different_config(self, container, show_diff: bool = True):
        """
        Detect if the current config is different from that of the running container.

        Args:
            container: Container
            show_diff: Should we display diff

        Returns:
            True if there is differing configuration
        """
        running_config = self.get_running_config(container)
        # Serialize exactly as copy_config_to_container does so the two texts are comparable.
        current_config = json.dumps(self.get_configuration(), indent=4, sort_keys=True)
        if current_config == running_config:
            return False
        if show_diff:
            lines = difflib.unified_diff(running_config.split("\n"), current_config.split("\n"))
            user_logger.info(f"Configuration changed for {self.container_name}\n")
            user_logger.info("\n".join(lines))
        return True

    def get_running_config(self, container):
        """
        Fetches the config used to start a container.

        The config is read from ``/local_config.json`` inside the container, which is where
        :meth:`copy_config_to_container` stores it.

        Args:
            container: Container to use

        Returns:
            The config from the running container, as text.
        """
        # get_archive yields the tar archive in chunks; gather them in memory.
        stream, stat = container.get_archive("/local_config.json")
        file_obj = io.BytesIO(b"".join(stream))
        with tarfile.open(mode='r', fileobj=file_obj) as tar:
            running_config = tar.extractfile('local_config.json').read().decode("utf-8")
        if logger.isEnabledFor(DEBUG):
            logger.debug(f"Running Config for {self.container_name}.\n{running_config}")
        return running_config

    @staticmethod
    def copy_config_to_container(container: Container, config: dict):
        """
        Copy the configuration used to create container to a container for future reference.

        The config is serialized as sorted, indented JSON and written to ``/local_config.json``
        in the container.

        Args:
            container: Target container
            config: Config to copy

        Returns:
            None
        """
        if logger.isEnabledFor(DEBUG):
            logger.debug("Copying config used to create container to container")
        payload = json.dumps(config, indent=4, sort_keys=True).encode('utf-8')
        tar_memory = io.BytesIO()
        # put_archive expects a tar archive, so wrap the single JSON file in one in memory.
        with tarfile.open(fileobj=tar_memory, mode='w') as tar:
            ti = tarfile.TarInfo('local_config.json')
            ti.size = len(payload)
            tar.addfile(ti, io.BytesIO(payload))
        container.put_archive("/", tar_memory.getvalue())

    def create(self, spinner=None) -> Container:
        """
        Create our container.

        Up to three attempts are made. Between attempts, a name conflict (409) removes the
        existing container and an ``unauthorized`` 500 response prompts for registry
        credentials.

        Args:
            spinner: Optional spinner. When provided, we will use it to indicate we are busy during long-running tasks.

        Returns:
            Created Container.

        Raises:
            EnvironmentError - If a container tries to start with a port in use, or ends up in the failed state
            ValueError - Unknown cause, raised once all attempts are used up
            APIError - The image could not be found (404), or any other unhandled docker API error

        Notes:
            - TODO - Add Exception for environment error with doc_link
        """
        retries = 0
        while retries < 3:
            try:
                container_config = self.get_configuration()
                if logger.isEnabledFor(DEBUG):
                    logger.debug(f'Container Config {str(container_config)}')
                container = self.client.containers.run(**container_config)
                # give some start time to containers
                time.sleep(0.25)
                # check status of container until it is no longer starting/created
                self.wait_on_status(container)
                logger.debug(f'{self.container_name}: {container.status}: {container.id}')
                if container.status in ['failed']:
                    raise EnvironmentError(f"Could not start {self.__class__.__name__}")
                self.copy_config_to_container(container, container_config)
                return container
            except APIError as e:
                retries += 1
                if logger.isEnabledFor(DEBUG):
                    logger.exception(e)
                if e.status_code == 409:
                    # Conflict: remove the existing container, then retry.
                    self.stop(True)
                elif e.status_code in [500]:
                    content = e.response.json()
                    message = content['message'] if 'message' in content else None
                    if message is not None and 'unauthorized' in message:
                        if spinner:
                            spinner.hide()
                        registry = self.image.split("/")[0]
                        print(f"\nAuthentication needed for {registry}.\nIt is best to login manually outside of "
                              f"idmtools using\n docker login {registry}\nas this will save your password\n"
                              f"Prompting for credentials for one time user:\n")
                        username = input(f'{registry} Username:')
                        password = getpass('Password:')
                        self.client.login(username, password, registry=registry)
                        if spinner:
                            spinner.show()
                    elif message is not None and 'address already in use' in message:
                        raise EnvironmentError(f"Could not start docker service {self.container_name} due to a port"
                                               f" being already in use. \nSee Full error: {content['message']}")
                    # Any other 500 response falls through and is retried.
                elif e.status_code == 404:
                    print(f'\n\nCould not locate a docker image with the tag: {self.image}\n'
                          f'Please check the name of the image or ensure you have built that image locally.'
                          f'You can test a manual pull using \n'
                          f'docker pull {self.image}')
                    raise
                else:
                    raise
                if retries > 2:
                    raise ValueError("Could not run workers image. Likely causes are:\n\t- A used port"
                                     "\n\t-A service being down such as redis or postgres"
                                     "\n\t-Authentication issues with the docker registry")

    @staticmethod
    def wait_on_status(container, sleep_interval: float = 0.2, max_time: float = 2,
                       statutes_to_wait_for: List[str] = None):
        """
        Wait while a container remains in one of the given statuses.

        The container is reloaded every ``sleep_interval`` seconds until its status is no longer
        in ``statutes_to_wait_for`` or ``max_time`` seconds have elapsed, whichever comes first.

        Args:
            container: Target container
            sleep_interval: How long to wait between checks
            max_time: Max time to wait (defaults to 2 seconds)
            statutes_to_wait_for: List of statuses to wait out. When not set, we use starting and created.

        Returns:
            None
        """
        if statutes_to_wait_for is None:
            statutes_to_wait_for = ['starting', 'created']
        start = time.time()
        if logger.isEnabledFor(DEBUG):
            logger.debug(f'Waiting on {container.name} to become in {statutes_to_wait_for}')
        while container.status in statutes_to_wait_for and (time.time() - start) < max_time:
            time.sleep(sleep_interval)
            container.reload()

    def stop(self, remove=False, container: Container = None):
        """
        Stop a container.

        Does nothing when no container is given and none can be found by name.

        Args:
            remove: When true, the container will be removed after being stopped
            container: Container to stop. When not set, the container is looked up by name.

        Returns:
            None
        """
        if container is None:
            container = self.get()
        if not container:
            return
        if logger.isEnabledFor(DEBUG):
            logger.debug(f'Stopping container: {container.name}')
        container.stop()
        if remove:
            if logger.isEnabledFor(DEBUG):
                logger.debug(f'Removing container: {container.name}')
            container.remove()

    def restart(self, container: Container = None):
        """
        Restart a container.

        Does nothing when no container is given and none can be found by name.

        Args:
            container: Container to restart. When not set, the container is looked up by name.

        Returns:
            None
        """
        if container is None:
            container = self.get()
        if not container:
            return
        if logger.isEnabledFor(DEBUG):
            logger.debug(f'Restarting container: {container.name}')
        container.restart()
        if logger.isEnabledFor(DEBUG):
            logger.debug(f'Container restarted: {container.name}')

    def get_logs(self):
        """
        Get logs from container.

        Returns:
            Container logs as a string. An empty string is returned when the container does not
            exist or has produced no logs.
        """
        container = self.get()
        if not container:
            return ''
        if logger.isEnabledFor(DEBUG):
            logger.debug(f'Get container logs: {container.name}')
        logs = container.logs()
        # Always hand back str, including when there is no log output.
        return logs.decode('utf-8') if logs else ''

    @abstractmethod
    def get_configuration(self) -> Dict:
        """
        Get configuration.

        Each sub-class should define this to provide their own specific configuration. The
        result is passed as keyword arguments to ``client.containers.run`` and is also the
        config stored in, and compared against, the container.

        Returns:
            Configuration dict
        """
        pass