"""
Here we implement the ContainerPlatform docker operations.
Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import docker
import platform as sys_platform
import subprocess
from dataclasses import dataclass
from typing import List, Dict, NoReturn, Any, Union
from idmtools.core import ItemType
from idmtools_platform_container.utils.general import normalize_path, parse_iso8601
from idmtools_platform_container.utils.job_history import JobHistory
from docker.models.containers import Container
from docker.errors import NotFound as ErrorNotFound
from docker.errors import APIError as DockerAPIError
from logging import getLogger, DEBUG
# Module-level logger used for diagnostic (debug/warning/error) messages.
logger = getLogger(__name__)
# Logger named 'user'; this module uses it for messages addressed to the end user.
user_logger = getLogger('user')
# Only consider the containers that can be restarted
CONTAINER_STATUS = ['exited', 'running', 'paused']
def validate_container_running(platform, **kwargs) -> str:
    """
    Make sure a usable container exists for the platform and return its id.

    The docker image is pulled first when it is not available locally. Matching
    containers reported by the platform are then reused when allowed by the
    platform configuration; otherwise a new container is started.

    Args:
        platform: Platform
        kwargs: keyword arguments forwarded to platform.start_container

    Returns:
        container id

    Raises:
        SystemExit: via exit(-1) when the docker image cannot be pulled
    """
    # Check image exists
    if not check_local_image(platform.docker_image):
        user_logger.info(f"Image {platform.docker_image} does not exist, pull the image first.")
        succeeded = pull_docker_image(platform.docker_image)
        if not succeeded:
            user_logger.error(f"/!\\ ERROR: Failed to pull image {platform.docker_image}.")
            exit(-1)
    # User configuration
    if logger.isEnabledFor(DEBUG):
        logger.debug(f"User config: force_start={platform.force_start}")
        logger.debug(f"User config: new_container={platform.new_container}")
        logger.debug(f"User config: include_stopped={platform.include_stopped}")
    # Check containers
    container_id = None
    # retrieve_match_containers() is iterated as (status, container) pairs
    container_match = platform.retrieve_match_containers()
    container_running = [container for status, container in container_match if status == 'running']
    container_stopped = [container for status, container in container_match if status != 'running']
    if logger.isEnabledFor(DEBUG):
        logger.debug(f"Found running matched containers: {container_running}")
        if platform.include_stopped:
            logger.debug(f"Found stopped matched containers: {container_stopped}")
    # force_start: stop every matched container so that none of them can be reused below
    if platform.force_start:
        if logger.isEnabledFor(DEBUG) and len(container_running) > 0:
            logger.debug(f"Stop all running containers {container_running}")
        stop_all_containers(container_running, keep_running=False)
        container_running = []
        if logger.isEnabledFor(DEBUG) and len(container_stopped) > 0 and platform.include_stopped:
            logger.debug(f"Stop all stopped containers {container_stopped}")
        stop_all_containers(container_stopped, keep_running=False)
        container_stopped = []
    # Reuse is only attempted when no new container is requested and no prefix is set
    if not platform.new_container and platform.container_prefix is None:
        if len(container_running) > 0:
            # Pick up the first running container
            container_running = sort_containers_by_start(container_running)
            container_id = container_running[0].short_id
            container = get_container(container_id)
            if sys_platform.system() not in ["Windows"]:
                # Probe the data mount from inside the container: fewer than 3 lines of
                # `ls -lart` output is reported as "not_exists".
                # NOTE(review): presumably this detects a mount that is no longer visible
                # inside the container - confirm.
                command = f"bash -c '[ \"$(ls -lart {platform.data_mount} | wc -l)\" -ge 3 ] && echo exists || echo not_exists'"
                result = container.exec_run(command)
                output = result.output.decode().strip()
                if output == "not_exists":
                    stop_container(container_id, remove=True)
                    if logger.isEnabledFor(DEBUG):
                        logger.debug(f"Existing container {container_id} is not usable")
                    container_id = None
            if container_id is not None:
                if logger.isEnabledFor(DEBUG):
                    logger.debug(f"Pick running container {container_id}.")
        # NOTE(review): the nesting of this `elif` was reconstructed; it is paired here with
        # `if len(container_running) > 0` - confirm against the original file.
        elif len(container_stopped) > 0:
            # Pick up the first stopped container and then restart it
            container_stopped = sort_containers_by_start(container_stopped)
            container = container_stopped[0]
            container.restart()
            container_id = container.short_id
            if logger.isEnabledFor(DEBUG):
                logger.debug(f"Pick and restart the stopped container {container.short_id}.")
    # Start the container
    if container_id is None:
        container_id = platform.start_container(**kwargs)
        if logger.isEnabledFor(DEBUG):
            logger.debug(f"Start container: {platform.docker_image}")
            logger.debug(f"New container ID: {container_id}")
    return container_id
#############################
# Check containers
#############################
def get_container(container_id) -> Any:
    """
    Look up a container object by its ID.

    Args:
        container_id: container id

    Returns:
        container object, or None when the container is not found or the
        Docker API reports an error
    """
    docker_client = docker.from_env()
    found = None
    try:
        # Ask the docker daemon for the container
        found = docker_client.containers.get(container_id)
    except ErrorNotFound:
        logger.debug(f"Container with ID {container_id} not found.")
    except DockerAPIError as e:
        logger.debug(f"Error retrieving container with ID {container_id}: {str(e)}")
    return found
def find_container_by_image(image: str, include_stopped: bool = False) -> Dict:
    """
    Collect, per status group, the containers created from the given image.

    Args:
        image: docker image
        include_stopped: bool, if consider the stopped containers or not

    Returns:
        dict mapping each status group to the list of containers using the image
    """
    def _uses_image(candidate) -> bool:
        # The image a container was created from is recorded in its config
        return image == candidate.attrs['Config']['Image']

    return {
        status: [candidate for candidate in group if _uses_image(candidate)]
        for status, group in get_containers(include_stopped).items()
    }
def stop_container(container: Union[str, Container], remove: bool = True) -> None:
    """
    Stop a container and optionally remove it.

    Args:
        container: container id or container object to be stopped
        remove: bool, if remove the container or not

    Returns:
        None

    Raises:
        TypeError: if container is neither a str nor a Container
        SystemExit: via exit(-1) when the container cannot be found or the Docker API fails
    """
    given_as_id = isinstance(container, str)
    if not given_as_id and not isinstance(container, Container):
        raise TypeError("Invalid container object.")
    # Remember how the caller referred to the container; `container` is rebound below
    label = container if given_as_id else container.short_id
    try:
        if given_as_id:
            container = get_container(container)
            if container is None:
                # get_container() reports lookup failures by returning None, not by raising
                logger.debug(f"Container with ID {label} not found.")
                exit(-1)
        # Stop the container
        if container.status == 'running':
            container.stop()
            if logger.isEnabledFor(DEBUG):
                logger.debug(f"Container {str(container)} has been stopped.")
        if remove:
            container.remove()
            if logger.isEnabledFor(DEBUG):
                logger.debug(f"Container {str(container)} has been removed.")
    except ErrorNotFound:
        # The container disappeared between lookup and stop/remove
        if given_as_id:
            logger.debug(f"Container with ID {label} not found.")
        else:
            logger.debug(f"Container {label} not found.")
        exit(-1)
    except DockerAPIError as e:
        logger.debug(f"Error stopping container {str(container)}: {str(e)}")
        exit(-1)
def stop_all_containers(containers: List[Union[str, Container]], keep_running: bool = True,
                        remove: bool = True) -> None:
    """
    Stop all given containers.

    Args:
        containers: list of container id or containers to be stopped
        keep_running: bool, if True a running container that still has running jobs is left alone
        remove: bool, if remove the container or not

    Returns:
        None
    """
    for container in containers:
        if isinstance(container, str):
            # Ids are accepted by the signature: resolve them before reading status/short_id
            resolved = get_container(container)
            if resolved is None:
                logger.debug(f"Container with ID {container} not found, skip stopping it.")
                continue
            container = resolved
        # Leave a busy running container untouched when asked to keep running ones
        if keep_running and container.status == 'running' and list_running_jobs(container.short_id):
            continue
        stop_container(container, remove=remove)
def restart_container(container: Union[str, Container]) -> None:
    """
    Restart a container.

    Args:
        container: container id or container object to be restarted

    Returns:
        None

    Raises:
        SystemExit: via exit(-1) when the container is not found, is not a valid
            container object, or the restart fails
    """
    # Keep a printable identifier that stays valid even when `container` is rebound to
    # None or is not a Container at all (so the error handlers cannot fail themselves).
    label = container if isinstance(container, str) else getattr(container, 'short_id', repr(container))
    try:
        if isinstance(container, str):
            container = get_container(container)
        elif not isinstance(container, Container):
            raise TypeError("Invalid container object.")
        if container is None:
            user_logger.error(f"Container {label} not found.")
            exit(-1)
        label = container.short_id
        # Restart the container
        container.restart()
        if logger.isEnabledFor(DEBUG):
            logger.debug(f"Container {container.short_id} has been restarted.")
    except DockerAPIError as e:
        user_logger.error(f"Error restarting container {label}: {str(e)}")
        exit(-1)
    except Exception as e:
        user_logger.error(f"Restarting container {label} encounters an unexpected error: {e}")
        exit(-1)
def sort_containers_by_start(containers: List[Container], reverse: bool = True) -> List[Container]:
    """
    Order containers by their start time.

    Args:
        containers: list of containers
        reverse: bool, if True (default) the most recently started container comes first

    Returns:
        new sorted list of containers
    """
    def _started_at(item):
        # 'StartedAt' is an ISO 8601 timestamp string in the container state
        return parse_iso8601(item.attrs['State']['StartedAt'])

    ordered = list(containers)
    ordered.sort(key=_started_at, reverse=reverse)
    return ordered
def get_containers(include_stopped: bool = False) -> Dict:
    """
    List the containers accepted by JobHistory.verify_container, grouped by state.

    Only containers whose status is listed in CONTAINER_STATUS are considered.

    Args:
        include_stopped: bool, if consider the stopped containers or not

    Returns:
        dict with the keys 'running' and 'stopped', each holding a list of containers
    """
    docker_client = docker.from_env()
    running, stopped = [], []
    for ct in docker_client.containers.list(all=include_stopped):
        # Skip containers in an unsupported state or rejected by the job history
        if ct.status not in CONTAINER_STATUS or not JobHistory.verify_container(ct.short_id):
            continue
        bucket = running if ct.status == 'running' else stopped
        bucket.append(ct)
    return {'running': running, 'stopped': stopped}
def get_working_containers(container_id: str = None, entity: bool = False) -> List[Any]:
    """
    Return the running containers, optionally restricted to one container id.

    Args:
        container_id: Container ID; when None all running containers are returned
        entity: bool, if return the container object or container id

    Returns:
        list of working containers or IDs (empty when the requested container is
        not in the history, does not exist, or is not running)
    """
    if container_id is None:
        running = get_containers().get('running', [])
        return running if entity else [c.short_id for c in running]

    # The container must be known to the job history
    if not JobHistory.verify_container(container_id):
        logger.error(f"Container {container_id} not found in History.")
        return []

    # It is in the history; verify that it still exists
    container = get_container(container_id)
    if not container:
        logger.warning(f"Container {container_id} not found.")
        return []

    # Only a running container counts as working
    if container.status != 'running':
        logger.warning(f"Container {container_id} is not running.")
        return []

    return [container] if entity else [container.short_id]
#############################
# Check docker
#############################
def is_docker_installed() -> bool:
    """
    Tell whether the docker command line client can be executed.

    Returns:
        True when 'docker --version' exits with code 0, False otherwise
    """
    try:
        result = subprocess.run(['docker', '--version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    except FileNotFoundError:
        # No docker executable could be found
        if logger.isEnabledFor(DEBUG):
            logger.debug("Docker is not installed or not found in PATH.")
        return False

    installed = result.returncode == 0
    if logger.isEnabledFor(DEBUG):
        if installed:
            logger.debug(f"Docker is installed: {result.stdout.strip()}")
        else:
            logger.debug(f"Docker is not installed. Error: {result.stderr.strip()}")
    return installed
def is_docker_daemon_running() -> bool:
    """
    Tell whether the Docker daemon answers a ping.

    Returns:
        True when the ping succeeds, False on any error
    """
    try:
        docker.from_env().ping()
    except DockerAPIError as e:
        if logger.isEnabledFor(DEBUG):
            logger.debug(f"Docker daemon is not running: {e}")
        return False
    except Exception as ex:
        if logger.isEnabledFor(DEBUG):
            logger.debug(f"Error checking Docker daemon: {ex}")
        return False
    else:
        if logger.isEnabledFor(DEBUG):
            logger.debug("Docker daemon is running.")
        return True
#############################
# Check images
#############################
def check_local_image(image_name: str) -> bool:
    """
    Check if the image exists locally.

    A name without an explicit tag is also matched against '<name>:latest', which is
    the tag pull_docker_image uses by default for untagged names.

    Args:
        image_name: image name, with or without a tag

    Returns:
        True/False
    """
    wanted = {image_name}
    # Only the part after the last '/' can carry a tag; a ':' before it is a registry port
    if ':' not in image_name.rsplit('/', 1)[-1]:
        wanted.add(f'{image_name}:latest')
    client = docker.from_env()
    return any(wanted.intersection(image.tags) for image in client.images.list())
def pull_docker_image(image_name, tag='latest') -> bool:
    """
    Pull a docker image.

    Args:
        image_name: image name, optionally already containing a tag
        tag: image tag used when image_name does not carry one

    Returns:
        True when the pull succeeded, False when the Docker API reported an error
    """
    # Check if the image name contains the tag. Only the last path component is
    # inspected so that a registry port ("host:5000/image") is not mistaken for a tag.
    if ':' in image_name.rsplit('/', 1)[-1]:
        full_image_name = image_name
    else:
        full_image_name = f'{image_name}:{tag}'
    # Pull the image
    user_logger.info(f'Pulling image {full_image_name} ...')
    try:
        client = docker.from_env()
        client.images.pull(f'{full_image_name}')
        if logger.isEnabledFor(DEBUG):
            logger.debug(f'Successfully pulled {full_image_name}')
        return True
    except DockerAPIError as e:
        if logger.isEnabledFor(DEBUG):
            logger.debug(f'Error pulling {full_image_name}: {e}')
        return False
#############################
# Check binding/mounting
#############################
def compare_mounts(mounts1: List[Dict], mounts2: List[Dict]) -> bool:
    """
    Tell whether two mount configurations describe the same set of mounts.

    Order and duplicates are ignored; source and destination paths are normalized
    before comparison.

    Args:
        mounts1: container mounting configurations
        mounts2: container mounting configurations

    Returns:
        True/False
    """
    def _signature(mount: Dict) -> tuple:
        # Hashable summary of the fields that identify a mount
        return (mount['Type'], mount['Mode'], normalize_path(mount['Source']), normalize_path(mount['Destination']))

    first = {_signature(mount) for mount in mounts1}
    second = {_signature(mount) for mount in mounts2}
    return first == second
def compare_container_mount(container1: Union[str, Container], container2: Union[str, Container]) -> bool:
    """
    Tell whether two containers have the same mount configuration.

    Args:
        container1: container object or id
        container2: container object or id

    Returns:
        True/False
    """
    # Resolve ids to container objects
    first = get_container(container1) if isinstance(container1, str) else container1
    second = get_container(container2) if isinstance(container2, str) else container2
    # Compare their mount configurations
    return compare_mounts(first.attrs['Mounts'], second.attrs['Mounts'])
#############################
# Check jobs
#############################
# Shell pipeline executed inside a container: prints the `ps` header line, then the process
# rows (pid, ppid, pgid, etime, cmd) that mention EXPERIMENT or SIMULATION, excluding the
# grep processes themselves.
PS_QUERY = 'ps xao pid,ppid,pgid,etime,cmd | head -n 1 && ps xao pid,ppid,pgid,etime,cmd | grep -e EXPERIMENT -e SIMULATION | grep -v grep'
@dataclass(repr=False)
class Job:
    """Running Job, built from one process line reported for a container."""
    item_id: str = None
    item_type: ItemType = None
    job_id: int = None
    group_pid: int = None
    container_id: str = None
    elapsed: str = None

    def __init__(self, container_id: str, process_line: str):
        """
        Initialize Job from a whitespace separated process line.

        The line is read as: pid, ppid, pgid, elapsed time, command; the command is
        expected to start with '<KIND>:<item id>'.

        Args:
            container_id: Container ID
            process_line: Process Input Line
        """
        columns = process_line.split()
        marker = columns[4].split(':')
        kind = marker[0]
        self.item_id = marker[1]
        self.group_pid = int(columns[2])
        is_experiment = kind == 'EXPERIMENT'
        # Anything that is not an experiment is classified as a simulation
        self.item_type = ItemType.EXPERIMENT if is_experiment else ItemType.SIMULATION
        if is_experiment:
            # Experiments are identified by their process group id
            self.job_id = self.group_pid
        elif kind == 'SIMULATION':
            # Simulations are identified by their own process id
            self.job_id = int(columns[0])
        self.container_id = container_id
        self.elapsed = columns[3]

    def display(self):
        """Display Job for debugging usage."""
        user_logger.info(f"Item ID: {self.item_id:15}")
        user_logger.info(f"Item Type: {self.item_type:15}")
        user_logger.info(f"Job ID: {self.job_id:15}")
        user_logger.info(f"Group PID: {self.group_pid:15}")
        user_logger.info(f"Container ID: {self.container_id:15}")
        user_logger.info(f"Elapsed: {self.elapsed:15}")
def list_running_jobs(container_id: str, limit: int = None) -> List[Job]:
    """
    List all running jobs on the container.

    Args:
        container_id: Container ID
        limit: number of jobs to view; None returns all jobs

    Returns:
        list of running jobs

    Raises:
        SystemExit: via exit(-1) when the docker command cannot be run or fails with a
            return code other than 0 or 1
    """
    # Pass an argument list instead of a shell string so that container_id is never
    # interpreted by a local shell; only PS_QUERY is run by bash inside the container.
    command = ['docker', 'exec', container_id, 'bash', '-c', f'({PS_QUERY})']
    try:
        result = subprocess.run(command, check=False, capture_output=True, text=True)
    except FileNotFoundError as e:
        # Without a shell, a missing docker executable raises instead of returning a code
        logger.error(str(e))
        user_logger.error("Command failed: docker executable not found.")
        exit(-1)

    running_jobs = []
    if result.returncode == 0:
        # Skip the first header line
        running_jobs = [Job(container_id, line) for line in result.stdout.splitlines()[1:]
                        if 'EXPERIMENT' in line or 'SIMULATION' in line]
    elif result.returncode != 1:
        # Return code 1 is treated as "no job found" (presumably grep matching nothing);
        # any other code is a real failure.
        logger.error(result.stderr)
        user_logger.error(f"Command failed with return code {result.returncode}")
        exit(-1)
    # Slicing with None keeps every job
    return running_jobs[:limit]
def find_running_job(item_id: Union[int, str], container_id: str = None) -> Job:
    """
    Find the running job that matches an item on the container(s).

    Args:
        item_id: Experiment/Simulation ID or Running Job ID
        container_id: Container ID; when omitted the container is taken from the job
            history, or all working containers are searched

    Returns:
        running Job, or None when no job matches
    """
    if container_id:
        containers = [container_id]
    else:
        # A history entry means item_id is an Experiment ID; otherwise it is a
        # Simulation ID or Job ID and all working containers must be checked
        his_job = JobHistory.get_job(item_id)
        containers = [his_job['CONTAINER']] if his_job else get_working_containers()

    match_jobs = []
    for cid in containers:
        # One running container can't have multiple matches: keep the first one only
        hit = next(
            (job for job in list_running_jobs(cid)
             if job.item_id == item_id or str(job.job_id) == str(item_id)),
            None
        )
        if hit is not None:
            match_jobs.append(hit)

    if len(match_jobs) > 1:
        # item_id must be a Job ID in this case and container_id must be None!
        user_logger.error(
            f"Multiple jobs found for Job ID {item_id}, please provide the Container ID or use Entity ID instead.")
        exit(-1)
    return match_jobs[0] if match_jobs else None