"""
idmtools ContainerPlatform CLI commands.
Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import json
import click
import shutil
import subprocess
from typing import Union, List, Tuple
from pathlib import Path
from rich.console import Console
from rich.table import Table
from idmtools.core import ItemType
from idmtools_platform_container.container_operations.docker_operations import list_running_jobs, find_running_job, \
is_docker_installed, is_docker_daemon_running, get_working_containers, get_containers, get_container
from idmtools_platform_container.utils.job_history import JobHistory
from idmtools_platform_container.utils.status import summarize_status_files, get_simulation_status
from idmtools_platform_container.utils.general import convert_byte_size, format_timestamp
from logging import getLogger
logger = getLogger(__name__)
user_logger = getLogger('user')
EXPERIMENT_FILES = ['stdout.txt', 'stderr.txt']
SIMULATION_FILES = ['stdout.txt', 'stderr.txt', 'job_status.txt', 'status.txt', 'output']
##########################
# Container Commands
#########################
IMPORTANT_COMMANDS = ['status', 'cancel', 'jobs', 'history']
[docs]class CustomGroup(click.Group):
"""Custom Group class for Container Platform CLI commands."""
[docs] def __init__(self, *args, **kwargs):
"""
Initialize CustomGroup.
Args:
args: Positional arguments
kwargs: USer defined arguments
"""
self.allowed_commands = kwargs.pop('allowed_commands', None)
super().__init__(*args, **kwargs)
[docs] def parse_args(self, ctx, args):
"""
Parse arguments.
Args:
ctx: click context
args: user arguments
Returns:
None
"""
# Intercept and process --all flag early
if '--all' in args:
ctx.params['all'] = True
self.allowed_commands = None
super().parse_args(ctx, args)
[docs] def list_commands(self, ctx) -> List[str]:
"""
List commands.
Args:
ctx: click context
Returns:
list of commands
"""
commands = super().list_commands(ctx)
if not ctx.params.get('all') and self.allowed_commands:
commands = [cmd for cmd in commands if cmd in self.allowed_commands]
return commands
@click.group(cls=CustomGroup, allowed_commands=IMPORTANT_COMMANDS, short_help="Container platform related commands.")
@click.option('--all', is_flag=True, help="Show all commands")
def container(all):
"""
Container Platform CLI commands.
Args:
all: Bool, show all commands
Returns:
None
"""
pass
@container.command(help="Verify the Docker environment.")
def verify_docker():
"""Check docker environment."""
if not is_docker_installed():
user_logger.error("Docker is not installed.")
exit(-1)
if not is_docker_daemon_running():
user_logger.warning("Docker daemon is not running.")
exit(-1)
# Check docker version
result = subprocess.run(['docker', '--version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
console = Console()
console.print(f"{result.stdout.strip()}.")
@container.command(help="Cancel an Experiment/Simulation job.\n\n"
"Arguments:\n\n"
" ITEM_ID: Experiment/Simulation ID or Job ID")
@click.argument('item-id', required=True)
@click.option('-c', '--container_id', help="Container Id")
def cancel(item_id: Union[int, str], container_id: str = None):
"""
Cancel Experiment/Simulation job.
Args:
item_id: Experiment/Simulation ID or Job ID
container_id: Container ID
Returns:
None
"""
console = Console()
job = find_running_job(item_id, container_id)
if job:
if job.item_type == ItemType.EXPERIMENT:
kill_cmd = f"docker exec {job.container_id} pkill -TERM -g {job.job_id}"
else:
kill_cmd = f"docker exec {job.container_id} kill -9 {job.job_id}"
result = subprocess.run(kill_cmd, shell=True, stderr=subprocess.PIPE, text=True) # default: check=False
if result.returncode == 0:
console.print(f"Successfully killed {job.item_type.name} {job.job_id}")
else:
console.print(f"Error killing {job.item_type.name} {job.item_id}: {result.stderr}")
else:
user_logger.warning(f"Not found job {item_id}.")
@container.command(help="Check the status of an Experiment/Simulation.\n\n"
"Arguments:\n\n"
" ITEM_ID: Experiment/Simulation ID or Job ID")
@click.argument('item-id', required=True)
@click.option('-c', '--container_id', help="Container Id")
@click.option('-l', '--limit', default=10, help="Max number of simulations to show")
@click.option('--verbose/--no-verbose', default=False, help="Display with working directory or not")
def status(item_id: Union[int, str], container_id: str = None, limit: int = 10, verbose: bool = False):
"""
Check Experiment/Simulation status.
Args:
item_id: Experiment/Simulation ID or Job ID
container_id: Container ID
limit: number of simulations to display
verbose: display simulation details or not
Returns:
None
"""
console = Console()
item_dir = JobHistory.get_item_path(item_id)
if item_dir is not None:
# Experiment/Simulation case
item_type = item_dir[1]
if item_type == ItemType.SIMULATION:
st = get_simulation_status(item_dir[0])
console.print(f"{item_type.name} {item_id} is {st}.")
elif item_type == ItemType.EXPERIMENT:
exp_dir = item_dir[0]
summarize_status_files(exp_dir, max_display=limit, verbose=verbose)
else:
user_logger.warning(f"{item_type.name} {item_id} status id not defined.")
else:
# Job ID case
job = find_running_job(item_id, container_id)
if job:
if job.item_type == ItemType.EXPERIMENT:
job_cache = JobHistory.get_job(job.item_id)
exp_dir = job_cache['EXPERIMENT_DIR']
summarize_status_files(exp_dir, max_display=limit, verbose=verbose)
elif job.item_type == ItemType.SIMULATION:
console.print(f"Simulation {job.item_id} is RUNNING.")
else:
user_logger.warning(f"Job {item_id} not found.")
@container.command(help="List running Experiment/Simulation jobs.\n\n"
"Arguments:\n\n"
" CONTAINER_ID: Container ID (optional)")
@click.argument('container-id', required=False)
@click.option('-l', '--limit', default=10, help="Max number of simulations to show")
@click.option('-n', '--next', default=0, type=int, help="Next number of jobs to show")
def jobs(container_id: str = None, limit: int = 10, next: int = 0):
"""
List running Experiment/Simulation jobs in Container(s).
Args:
container_id: Container ID
limit: number of simulations to display
next: next number of jobs to show
Returns:
None
"""
containers = get_working_containers(container_id)
if len(containers) == 0:
if container_id:
user_logger.warning(f"Container {container_id} not found.")
else:
user_logger.warning("No containers found.")
return
for container_id in containers:
running_jobs = list_running_jobs(container_id)
if not running_jobs:
continue
# Separate jobs by group_pid
group = {}
for job in running_jobs:
if job.group_pid not in group:
group[job.group_pid] = []
group[job.group_pid].append(job)
console = Console()
for g in group:
_jobs = group[g]
# Get total number of running simulations
total_jobs = len(_jobs)
# Take the first job which is the experiment
exp_job = _jobs[0]
# Skip the first job which is the experiment
sim_jobs = _jobs[1:]
start = next * limit
end = start + limit
sim_next = sim_jobs[start:end]
# Include the experiment job
sim_next.insert(0, exp_job)
# Skip the first job which is the experiment
console.print(
f"[bold][cyan]Experiment[/][/] {exp_job.item_id} on [bold][cyan]Container[/][/] [red]{container_id}[/] has {total_jobs - 1} running [bold][cyan]simulations[/][/].")
table = Table()
table.add_column("Entity Type", justify="right", style="cyan", no_wrap=True)
table.add_column("Entity ID", style="yellow")
table.add_column("Job ID", justify="right", style="green")
table.add_column("Container", justify="right", style="plum2")
table.add_column("Status", justify="right", style="red")
table.add_column("Elapsed", justify="right", style="blue")
for job in sim_next:
table.add_row(job.item_type.name, str(job.item_id), str(job.job_id), job.container_id, 'running',
job.elapsed)
console.print(table)
@container.command(help="Retrieve Experiment history.\n\n"
"Arguments:\n\n"
" EXP_ID: Experiment ID")
@click.argument('exp-id', type=str, required=True)
def get_detail(exp_id: str):
"""
Get Experiment job history.
Args:
exp_id: Experiment ID
Returns:
None
"""
item = JobHistory.get_job(exp_id)
if item:
console = Console()
console.print_json(json.dumps(item, indent=2))
@container.command(help="View the job history.\n\n"
"Arguments:\n\n"
" CONTAINER_ID: Container ID")
@click.argument('container-id', required=False)
@click.option('-l', '--limit', default=10, type=int, help="Max number of jobs to show")
@click.option('-n', '--next', default=0, type=int, help="Next number of jobs to show")
def history(container_id: str = None, limit: int = 10, next: int = 0):
"""
View job history.
Args:
container_id: Container ID
limit: number of jobs to show
next: next number of jobs to show
Returns:
None
"""
data = JobHistory.view_history(container_id)
start = next * limit
end = start + limit
data_next = data[start:end]
console = Console()
console.print(f"There are {len(data)} Experiment cache in history.")
for job in data_next:
console.print(f"{'':-^100}")
for k, v in job.items():
# Skip some keys
if k in ('EXPERIMENT_DIR', 'SUITE_ID'):
continue
console.print(f"[bold][cyan]{k:16}[/][/]: {v}")
@container.command(help="Locate Suite/Experiment/Simulation file directory.\n\n"
"Arguments:\n\n"
" ITEM_ID: Suite/Experiment/Simulation ID")
@click.argument('item-id', type=str, required=True)
def path(item_id: str):
"""
Find Suite/Experiment/Simulation file directory.
Args:
item_id: Suite/Experiment/Simulation ID
Returns:
None
"""
item = JobHistory.get_item_path(item_id)
if item:
console = Console()
console.print(f"{item[1].name}: {item[0]}")
@container.command(help="Check if an Experiment/Simulation is running.\n\n"
"Arguments:\n\n"
" ITEM_ID: Experiment/Simulation ID")
@click.argument('item-id', type=str, required=True)
def is_running(item_id: str):
"""
Check if Experiment/Simulation is running.
Args:
item_id: Experiment/Simulation ID
Returns:
None
"""
console = Console()
job = find_running_job(item_id)
if job:
console.print(f"{job.item_type.name} {job.item_id} is running on container {job.container_id}.")
else:
# Check if it is a valid Experiment/Simulation ID
his_path = JobHistory.get_item_path(item_id)
if his_path:
# Check item type
item_type = his_path[1]
if item_type == ItemType.SUITE:
console.print(f"{item_id} is not a valid Experiment/Simulation ID.")
else:
console.print(f"{item_type.name} {item_id} is not running.")
else:
console.print(f"Job {item_id} is not found.")
@container.command(help="Check the history volume.")
def volume():
"""Get job history volume."""
v = JobHistory.volume()
mv = convert_byte_size(v)
console = Console()
console.print(f"Job history volume: {mv}")
@container.command(help="Clear the job history.\n\n"
"Arguments:\n\n"
" CONTAINER_ID: Container ID (optional)")
@click.argument('container-id', required=False)
def clear_history(container_id: str = None):
"""
Clear Job History.
Args:
container_id: Container ID
Returns:
None
"""
JobHistory.clear(container_id)
@container.command(help="Sync the file system with job history.")
def sync_history():
"""Sync file system with job history."""
JobHistory.sync()
@container.command(help="Get the count of count histories.\n\n"
"Arguments:\n\n"
" CONTAINER_ID: Container ID (optional)")
@click.argument('container-id', required=False)
def history_count(container_id: str = None):
"""
Get History Count.
Args:
container_id: Container ID
Returns:
None
"""
console = Console()
console.print(JobHistory.count(container_id))
@container.command(help="Clear job results files and folders.\n\n"
"Arguments:\n\n"
" ITEM_ID: Experiment/Simulation ID")
@click.argument('item-id', type=str, required=True)
@click.option('-r', '--remove', multiple=True, help="Extra files/folders to be removed from simulation")
def clear_results(item_id: str, remove: Tuple = None):
"""
Clear the generated output files for a job.
Args:
item_id: Experiment/Simulation ID
remove: list of files/folders
Returns:
None
"""
def _clear_simulation(sim_dir, remove_list):
"""
Delete generated output files for simulation.
Args:
sim_dir: simulation directory
remove_list: extra files to be deleted
Returns:
None
"""
for f in SIMULATION_FILES + list(remove_list):
if sim_dir.joinpath(f).exists():
if sim_dir.joinpath(f).is_dir():
shutil.rmtree(sim_dir.joinpath(f))
else:
sim_dir.joinpath(f).unlink(missing_ok=True)
# Get item path
item = JobHistory.get_item_path(item_id)
# Check item type
item_type = item[1]
if item_type == ItemType.SIMULATION:
sim_dir = item[0]
_clear_simulation(sim_dir, remove)
elif item_type == ItemType.EXPERIMENT:
exp_dir = item[0]
# Delete generated files from experiment past run
for f in EXPERIMENT_FILES:
if exp_dir.joinpath(f).exists():
if exp_dir.joinpath(f).is_dir():
shutil.rmtree(exp_dir.joinpath(f))
else:
exp_dir.joinpath(f).unlink(missing_ok=True)
# Delete generated files for each of simulations
pattern = '*/metadata.json'
for meta_file in Path(exp_dir).glob(pattern=pattern):
sim_dir = meta_file.parent
_clear_simulation(sim_dir, remove)
else:
user_logger.warning("Suite level not supported, must provide Experiment/Simulation ID!")
exit(-1)
@container.command(help="Inspect a container.\n\n"
"Arguments:\n\n"
" CONTAINER_ID: Container ID")
@click.argument('container-id', required=True)
def inspect(container_id: str):
"""
Check container information.
Args:
container_id: Container ID
Returns:
None
"""
console = Console()
container = get_container(container_id)
if container is None:
console.print(f"Container {container_id} not found.")
return
console.print('-' * 100)
console.print(f"[bold][cyan]Container ID[/][/]: {container.short_id}")
console.print(f"[bold][cyan]Container Name[/][/]: {container.name}")
console.print(f"[bold][cyan]Status[/][/]: {container.status}")
console.print(f"[bold][cyan]Created[/][/]: {format_timestamp(container.attrs['Created'])}")
console.print(f"[bold][cyan]StartedAt[/][/]: {format_timestamp(container.attrs['State']['StartedAt'])}")
console.print("[bold][cyan]Image[/][/]:")
console.print_json(json.dumps(container.attrs['Config']['Image']))
console.print("[bold][cyan]Image Tags[/][/]:")
console.print_json(json.dumps(container.image.tags))
console.print("[bold][cyan]State[/][/]:")
console.print_json(json.dumps(container.attrs['State']))
console.print("[bold][cyan]Mounts[/][/]:")
mounts = [m for m in container.attrs['Mounts'] if m['Type'] == 'bind']
console.print_json(json.dumps(mounts))
@container.command(help="Stop running container(s).\n\n"
"Arguments:\n\n"
" CONTAINER_ID: Container ID (optional)")
@click.argument('container-id', required=False)
@click.option('--remove/--no-remove', default=False, help="Remove the container or not")
def stop_container(container_id: str = None, remove: bool = False):
"""
Sopp running container(s).
Args:
container_id: container id
remove: remove container or not
Returns:
None
"""
console = Console()
# Get working containers
containers = get_working_containers(container_id, entity=True)
if len(containers) == 0:
if container_id:
user_logger.warning(f"Not found running Container {container_id}.")
else:
user_logger.warning("No running containers found.")
return
for container in containers:
container.stop()
if remove:
container.remove()
console.print(f"Container {container.short_id} is stopped and removed.")
else:
console.print(f"Container {container.short_id} is stopped.")
@container.command(help="Remove stopped containers.\n\n"
"Arguments:\n\n"
" CONTAINER_ID: Container ID (optional)")
@click.argument('container-id', required=False)
def remove_container(container_id: str = None):
"""
Remove stopped containers.
Args:
container_id: container id
Returns:
None
"""
console = Console()
if container_id:
container = get_container(container_id)
if container:
if container.status != 'running':
container.remove()
console.print(f"Container {container_id} is removed.")
else:
user_logger.warning(f"Container {container_id} is running, need to stop first.")
else:
user_logger.warning(f"Container {container_id} not found.")
return
# Remove all stopped containers
container_list = get_containers(include_stopped=True)['stopped']
container_removed = []
for container in container_list:
container.remove()
container_removed.append(container.short_id)
if len(container_removed) > 0:
console.print(f"{len(container_removed)} container(s) removed.")
else:
user_logger.warning("No container removed.")
@container.command(help="pip install a package on a container.\n\n"
"Arguments:\n\n"
" PACKAGE: package to be installed")
@click.argument('package', required=True)
@click.option('-c', '--container-id', type=str, help="Container ID")
@click.option('-i', '--index-url', type=str, help="index-url for pip install")
@click.option('-e', '--extra-index-url', type=str, help="extra-index-url for pip install")
def install(package: str, container_id: str, index_url: str = None, extra_index_url: str = None):
"""
Pip install package on container.
Args:
package: package name
container_id: Container ID
index_url: index-url for pip install
extra_index_url: extra-index-url for pip install
Returns:
None
"""
console = Console()
if index_url:
package = f"--index-url {index_url} {package}"
elif extra_index_url:
package = f"--extra-index-url {extra_index_url} {package}"
else:
package = f"{package}"
command = f'docker exec {container_id} bash -c "pip3 install {package}"'
try:
result = subprocess.run(command, shell=True, check=True, capture_output=True, text=True)
console.print(result.stdout)
except subprocess.CalledProcessError as e:
user_logger.error(e.stderr)
@container.command(help="List packages installed on a container.\n\n"
"Arguments:\n\n"
" CONTAINER_ID: Container ID")
@click.argument('container-id', required=True)
def packages(container_id: str):
"""
List packages installed on container.
Args:
container_id: Container ID
Returns:
None
"""
console = Console()
if not JobHistory.verify_container(container_id):
user_logger.error(f"Container {container_id} not found.")
return
command = f'docker exec {container_id} bash -c "pip list"'
try:
result = subprocess.run(command, shell=True, check=True, capture_output=True, text=True)
console.print(result.stdout)
except subprocess.CalledProcessError as e:
user_logger.error(e.stderr)
@container.command(help="List running processes in a container.\n\n"
"Arguments:\n\n"
" CONTAINER_ID: Container ID")
@click.argument('container-id', required=True)
def ps(container_id: str):
"""
List running processes in container.
Args:
container_id: Container ID
Returns:
None
"""
if not JobHistory.verify_container(container_id):
user_logger.error(f"Container {container_id} not found.")
return
command = f'docker exec {container_id} bash -c "ps -efj"'
try:
result = subprocess.run(command, shell=True, check=True, capture_output=True, text=True)
console = Console()
console.print(result.stdout)
except subprocess.CalledProcessError as e:
user_logger.error(e.stderr)
@container.command(help="List all available containers.")
@click.option('--all/--no-all', default=False, help="Include stopped containers or not")
def list_containers(all: bool = False):
"""
List available containers.
Args:
all: bool, include stopped containers or not
Returns:
None
"""
containers = get_containers(include_stopped=all)
table = Table()
table.add_column("Container ID", justify="right", style="cyan", no_wrap=True)
table.add_column("Image", style="bright_magenta")
table.add_column("Status", style="red")
table.add_column("Created", style="yellow")
table.add_column("Started", style="orange1")
table.add_column("Name", style="wheat4")
for status, container_list in containers.items():
for container in container_list:
if container.status == 'running':
status = f"[green]{container.status}[/]"
else:
status = f"[red]{container.status}[/]"
table.add_row(container.short_id, container.attrs['Config']['Image'], status,
format_timestamp(container.attrs['Created']),
format_timestamp(container.attrs['State']['StartedAt']), container.name)
console = Console()
console.print(f"There are {table.row_count} container(s).")
console.print(table)