Source code for idmtools_platform_local.infrastructure.service_manager

"""idmtools service manager. Manages all our local platform services.

Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
from dataclasses import dataclass, field, fields
import logging
import time
from concurrent.futures import as_completed
from concurrent.futures.thread import ThreadPoolExecutor
from json import JSONDecodeError
from logging import getLogger
from typing import NoReturn, Dict, Optional, List, Union
from docker import DockerClient
from docker.models.containers import Container
from docker.models.networks import Network
from idmtools.core.system_information import get_data_directory, get_system_information
from idmtools.utils.decorators import optional_yaspin_load
from idmtools_platform_local.infrastructure.base_service_container import BaseServiceContainer
from idmtools_platform_local.infrastructure.postgres import PostgresContainer
from idmtools_platform_local.infrastructure.redis import RedisContainer
from idmtools_platform_local.infrastructure.workers import WorkersContainer

# Service container classes, in the order DockerServiceManager.create_services() brings them up.
SERVICES = [PostgresContainer, RedisContainer, WorkersContainer]

# Service short name -> DockerServiceManager port attribute names that must accept
# connections before that service is created (workers depend on postgres and redis).
CONTAINER_WAIT: Dict[str, List[str]] = {
    'workers': ['postgres_port', 'redis_port'],
}


logger = getLogger(__name__)


@dataclass
class DockerServiceManager:
    """
    Provides single interface to manage all the local platform services.

    The dataclass fields hold the configuration for every service. They are mapped onto each
    service container by :meth:`get_container_config`, which removes the container's
    ``config_prefix`` (when the container defines one) from the front of each field name
    before matching it against the container's own fields.
    """

    client: DockerClient
    # NOTE: evaluated once, when the class is defined, not per instance.
    host_data_directory: str = get_data_directory()
    network: str = 'idmtools'
    redis_image: str = 'redis:5.0.4-alpine'
    heartbeat_timeout: int = 15
    redis_port: int = 6379
    runtime: Optional[str] = 'runc'
    redis_mem_limit: str = '256m'
    redis_mem_reservation: str = '32m'
    postgres_image: str = 'postgres:11.4'
    postgres_mem_limit: str = '128m'
    postgres_mem_reservation: str = '32m'
    # Must be an int: is_port_open() reports any non-int port as closed.
    postgres_port: Optional[int] = 5432
    workers_image: Optional[str] = None
    workers_ui_port: int = 5000
    workers_mem_limit: Optional[str] = None
    workers_mem_reservation: str = '64m'
    # User/group the containers run as; defaults to the current user's value (see __post_init__).
    run_as: Optional[str] = field(default=None)
    enable_singularity_support: bool = False
    # Service objects keyed by short name ("postgres", "redis", "workers"); built by init_services().
    _services: Optional[Dict[str, BaseServiceContainer]] = None

    def __post_init__(self):
        """
        Validate ``run_as`` and build the service objects.

        Raises:
            ValueError: If the containers would run as root (``run_as == "0:0"``).
        """
        self.system_info = get_system_information()
        if self.run_as is None:
            # Fall back to the user/group string reported for the current process.
            self.run_as = self.system_info.user_group_str
        if self.run_as == "0:0":
            message = "You cannot run the containers as the root user!. Please select another value for 'run_as'. "
            if self.system_info.user_group_str == "0:0":
                # The default came from the current user, so explain how to override it.
                message += (
                    " It appears you executed a script/command as the local root user or use sudo to start the"
                    " Local Platform. If that is the case, use the 'run_as' configuration option in your "
                    "idmtools.ini Local Platform configuration block or initializing the Local Platform object"
                )
            raise ValueError(message)
        self.init_services()

    @staticmethod
    def _service_name(service) -> str:
        """Return the short key for a service class, e.g. ``PostgresContainer`` -> ``postgres``."""
        return service.__name__.replace("Container", "").lower()

    def init_services(self):
        """
        Build one service object per entry in ``SERVICES``, keyed by its short name.

        Containers are fetched or created later by :meth:`create_services` (via ``get_or_create``).
        """
        self._services = dict()
        for service in SERVICES:
            name = self._service_name(service)
            if name not in self._services:
                self._services[name] = service(**self.get_container_config(service))

    def cleanup(self, delete_data: bool = False, tear_down_brokers: bool = False) -> NoReturn:
        """
        Stops the containers and removes the network.

        Optionally the postgres data volume can be deleted, and the Redis brokers closed.

        Args:
            delete_data: Delete the ``idmtools_local_postgres`` volume
            tear_down_brokers: True to close redis brokers, false otherwise

        Returns:
            NoReturn
        """
        self.stop_services()
        if tear_down_brokers:
            from idmtools_platform_local.internals.workers.brokers import close_brokers
            logger.debug("Closing brokers down")
            close_brokers()
        if delete_data:
            volume_name = 'idmtools_local_postgres'
            volumes = self.client.volumes.list(filters=dict(name=volume_name))
            # Docker's name filter also matches partial names, so pick the exact volume
            # (same approach as get_network).
            postgres_volume = [v for v in volumes if v.name == volume_name]
            if postgres_volume:
                postgres_volume[0].remove(True)
        network = self.get_network()
        if network:
            logger.debug(f'Removing docker network: {self.network}')
            network.remove()

    @staticmethod
    def setup_broker(heartbeat_timeout):
        """Start the broker to send data to workers."""
        from idmtools_platform_local.internals.workers.brokers import setup_broker
        setup_broker(heartbeat_timeout)

    @staticmethod
    def restart_brokers(heartbeat_timeout):
        """Restart brokers talking to workers (close them, then set them up again)."""
        from idmtools_platform_local.internals.workers.brokers import setup_broker, close_brokers
        close_brokers()
        setup_broker(heartbeat_timeout)

    @optional_yaspin_load(text="Ensure IDM Tools Local Platform services are loaded")
    def create_services(self, spinner=None) -> NoReturn:
        """
        Create all the components of local platform.

        Our architecture is as depicted in the UML diagram below

        .. uml::

            @startuml
            database "Postgres Container" as db
            node "Redis Container" as redis
            node "Workers Container" {
              [UI] -> db
              rectangle "Python Workers" {
                rectangle "Worker ..." as w2
                rectangle "Worker 1" as w1
                w1 <---> db : Get/Update Status
                w1 <---> redis : Get task/Add Task
                w2 <---> db
                w2 <---> redis
              }
            }
            file "User Python Script" as u
            u ...> redis : Submit task
            u <... redis : Get result
            @enduml

        Raises:
            JSONDecodeError: If a service still fails after all retry attempts.

        Returns:
            (NoReturn)
        """
        try:
            # Ensures the docker network exists (it is created if missing).
            self.get_network()
            for service in SERVICES:
                name = self._service_name(service)
                # wait on services to be ready for workers
                if name in CONTAINER_WAIT:
                    self.wait_on_ports_to_open(CONTAINER_WAIT[name])
                self._get_or_create_with_retry(name, spinner)
            self.wait_on_ports_to_open(['workers_ui_port'])
            self.setup_broker(self.heartbeat_timeout)
        except Exception as e:
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception(e)
            raise

    def _get_or_create_with_retry(self, name: str, spinner=None, max_attempts: int = 3):
        """Call ``get_or_create`` on a service, retrying on JSONDecodeError and re-raising on the last attempt."""
        for attempt in range(1, max_attempts + 1):
            try:
                self._services[name].get_or_create(spinner)
                return
            except JSONDecodeError:
                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug(f"Failure to get service {name}")
                if attempt >= max_attempts:
                    logger.debug(f"Ran out of attempts to create {name}")
                    raise
                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug(f"Retrying {name}")

    def wait_on_ports_to_open(self, ports: List[str], wait_between_tries: Union[int, float] = 0.2, max_retries: int = 5, sleep_after: Union[int, float] = 0.5) -> bool:
        """
        Poll port attributes (e.g. ``postgres_port``, ``redis_port``) until they are all open on 127.0.0.1.

        We use this to verify postgres/redis are ready for our workers.

        Args:
            ports: Names of attributes on this object that hold the port numbers
            wait_between_tries: Seconds to sleep before each check
            max_retries: Retries after the first check (up to ``max_retries + 1`` checks in total)
            sleep_after: Extra sleep once all ports are open (Postgres starts accepting connections before actually ready)

        Returns:
            True if ports are ready, False if they never all opened
        """
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f"Waiting on the following ports to become available: {str(ports)}")
        for _attempt in range(max_retries + 1):
            time.sleep(wait_between_tries)
            # Generator so we stop probing at the first closed port.
            if all(self.is_port_open('127.0.0.1', getattr(self, port)) for port in ports):
                if sleep_after > 0:
                    time.sleep(sleep_after)
                return True
        logger.warning("Possible issue platform. Could not connect to server ports")
        return False

    @optional_yaspin_load(text="Stopping IDM Tools Local Platform services")
    def stop_services(self, spinner=None) -> NoReturn:
        """
        Stops all running IDM Tools services, in parallel.

        Failures stopping an individual service are logged rather than silently dropped.

        Returns:
            (NoReturn)
        """
        with ThreadPoolExecutor() as pool:
            futures = {}
            for service in self._services.values():
                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug(f'Stopping {service.__class__.__name__}')
                futures[pool.submit(self.stop_service_and_wait, service)] = service
            for done in as_completed(futures):
                error = done.exception()
                if error is not None:
                    logger.error(f'Failed to stop {futures[done].__class__.__name__}: {error}')

    def get(self, container_name: str, create=True) -> Container:
        """
        Get the container for the service with the specified name.

        Args:
            container_name: Short service name (case-insensitive), e.g. "postgres"
            create: Create the container if it doesn't exist

        Returns:
            Container

        Raises:
            KeyError: If there is no service with that name.
        """
        service = self._services[container_name.lower()]
        return service.get_or_create() if create else service.get()

    def get_container_config(self, service: BaseServiceContainer, opts=None):
        """
        Get the container config for the service.

        Args:
            service: Service class (a dataclass) to build the config for
            opts: Optional mapping of option name -> value. When empty/None, this manager's own
                dataclass fields are used instead. ``None`` values are skipped.

        Returns:
            Dict of keyword arguments for the service constructor (always includes ``client``)
        """
        dest_fields = {f.name: f for f in fields(service)}
        prefix = dest_fields['config_prefix'].default if 'config_prefix' in dest_fields else ''

        def strip_prefix(name: str) -> str:
            # Only remove the prefix from the start of the name, never from the middle.
            return name[len(prefix):] if prefix and name.startswith(prefix) else name

        ret = dict(client=self.client)
        if opts:
            candidates = list(opts.items())
        else:
            # we are dealing with the local data class fields
            candidates = [(f.name, getattr(self, f.name, None)) for f in fields(self)]
        for opt_name, value in candidates:
            dest_name = strip_prefix(opt_name)
            if dest_name in dest_fields and value is not None:
                ret[dest_name] = value
        return ret

    @optional_yaspin_load(text="Restarting IDM-Tools services")
    def restart_all(self, spinner=None) -> NoReturn:  # noqa: F811
        """
        Restart all the services IDM-Tools services.

        Returns:
            (NoReturn)
        """
        for service in self._services.values():
            service.restart()

    @staticmethod
    def is_port_open(host: str, port: int) -> bool:
        """
        Check if a TCP port accepts connections.

        Args:
            host: Host to check
            port: Port to check; any non-int value is reported as closed

        Returns:
            True if port is open, False otherwise
        """
        import socket
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f"Checking if port {port} is open on host {host}")
        if not isinstance(port, int):
            return False
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            return sock.connect_ex((host, port)) == 0

    @staticmethod
    def stop_service_and_wait(service) -> bool:
        """
        Stop a service and block until ``service.get()`` no longer returns a container.

        Note: there is no timeout; this polls every 0.1s for as long as the container exists.

        Args:
            service: Service to stop

        Returns:
            True
        """
        service.stop(True)
        container = service.get()
        while container:
            time.sleep(0.1)
            container = service.get()
        return True

    def get_network(self) -> Network:
        """
        Fetches the IDM Tools network, creating it (bridge driver) if it does not exist.

        Returns:
            (Network) Return Docker network object
        """
        # check that the network exists
        network = self.client.networks.list(filters=dict(name=self.network))
        # the name filter can match partially, so check the name specifically
        network = [x for x in network if x.name == self.network]
        if not network:
            logger.debug(f'Creating network {self.network}')
            network = self.client.networks.create(self.network, driver='bridge', internal=False, attachable=False, ingress=False, scope='local')
        else:
            network = network[0]
        return network