Source code for idmtools_platform_container.cli.container
"""
idmtools ContainerPlatform CLI commands.

Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import json
import shutil
import subprocess
from logging import getLogger
from pathlib import Path
from typing import Union, List, Tuple

import click
from rich.console import Console
from rich.table import Table

from idmtools.core import ItemType
from idmtools_platform_container.container_operations.docker_operations import (
    list_running_jobs,
    find_running_job,
    is_docker_installed,
    is_docker_daemon_running,
    get_working_containers,
    get_containers,
    get_container,
)
from idmtools_platform_container.utils.job_history import JobHistory
from idmtools_platform_container.utils.status import summarize_status_files, get_simulation_status
from idmtools_platform_container.utils.general import convert_byte_size, format_timestamp

# Module logger plus the logger used for user-facing messages.
logger = getLogger(__name__)
user_logger = getLogger('user')

# Generated files removed by the clear-results command at each level.
EXPERIMENT_FILES = ['stdout.txt', 'stderr.txt']
SIMULATION_FILES = ['stdout.txt', 'stderr.txt', 'job_status.txt', 'status.txt', 'output']

##########################
# Container Commands
##########################
# Commands listed by default; the remaining ones are listed only with --all.
IMPORTANT_COMMANDS = ['status', 'cancel', 'jobs', 'history']
class CustomGroup(click.Group):
    """
    Custom Group class for Container Platform CLI commands.

    The group can be given an ``allowed_commands`` list; when present, only
    those commands are listed unless the ``--all`` flag was passed.
    """

    def __init__(self, *args, **kwargs):
        """
        Initialize CustomGroup.

        Args:
            args: Positional arguments
            kwargs: User defined arguments; ``allowed_commands`` is removed
                from them before the rest is handed to ``click.Group``
        """
        self.allowed_commands = kwargs.pop('allowed_commands', None)
        super().__init__(*args, **kwargs)

    def parse_args(self, ctx, args):
        """
        Parse arguments.

        Args:
            ctx: click context
            args: user arguments

        Returns:
            The value returned by the base class ``parse_args``
        """
        # Intercept and process --all flag early
        # NOTE(review): presumably needed so the command listing printed by
        # --help already sees the flag - confirm against click's parsing order.
        if '--all' in args:
            ctx.params['all'] = True
            self.allowed_commands = None
        # Hand the base-class result back instead of dropping it.
        return super().parse_args(ctx, args)

    def list_commands(self, ctx) -> List[str]:
        """
        List commands.

        Args:
            ctx: click context

        Returns:
            list of commands, restricted to ``allowed_commands`` unless
            ``--all`` was given or no restriction is configured
        """
        commands = super().list_commands(ctx)
        if ctx.params.get('all') or not self.allowed_commands:
            return commands
        return [cmd for cmd in commands if cmd in self.allowed_commands]
@click.group(cls=CustomGroup, allowed_commands=IMPORTANT_COMMANDS, short_help="Container platform related commands.")
@click.option('--all', is_flag=True, help="Show all commands")
def container(all):
    """
    Container Platform CLI commands.

    Args:
        all: Bool, show all commands

    Returns:
        None
    """
    pass


def _exec_and_print(container_id: str, shell_command: str) -> None:
    """
    Run ``bash -c <shell_command>`` in a container and report the outcome.

    The docker command is passed as an argument list, so no shell is involved
    on the host side. Standard output is printed on success; standard error is
    logged when the command exits with a non-zero status.

    Args:
        container_id: Container ID
        shell_command: command line handed to ``bash -c`` inside the container

    Returns:
        None
    """
    argv = ['docker', 'exec', str(container_id), 'bash', '-c', shell_command]
    try:
        result = subprocess.run(argv, check=True, capture_output=True, text=True)
    except FileNotFoundError:
        # Without a shell, a missing docker executable raises instead of
        # producing a non-zero exit status.
        user_logger.error("Docker is not installed.")
    except subprocess.CalledProcessError as e:
        user_logger.error(e.stderr)
    else:
        Console().print(result.stdout)


def _remove_entries(directory, names) -> None:
    """
    Delete the named files/folders found directly under a directory.

    Args:
        directory: directory to clean
        names: file or folder names relative to ``directory``; names that do
            not exist are skipped

    Returns:
        None
    """
    base = Path(directory)
    for name in names:
        target = base.joinpath(name)
        if not target.exists():
            continue
        if target.is_dir():
            shutil.rmtree(target)
        else:
            target.unlink(missing_ok=True)


@container.command(help="Verify the Docker environment.")
def verify_docker():
    """Check docker environment and print the docker version."""
    if not is_docker_installed():
        user_logger.error("Docker is not installed.")
        raise SystemExit(-1)
    if not is_docker_daemon_running():
        user_logger.warning("Docker daemon is not running.")
        raise SystemExit(-1)

    # Check docker version
    result = subprocess.run(['docker', '--version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    console = Console()
    console.print(f"{result.stdout.strip()}.")


@container.command(help="Cancel an Experiment/Simulation job.\n\n"
                        "Arguments:\n\n"
                        " ITEM_ID: Experiment/Simulation ID or Job ID")
@click.argument('item-id', required=True)
@click.option('-c', '--container_id', help="Container Id")
def cancel(item_id: Union[int, str], container_id: str = None):
    """
    Cancel Experiment/Simulation job.

    An Experiment job is terminated with ``pkill -TERM -g <job id>``; any
    other job is killed with ``kill -9 <job id>``. Both run inside the job's
    container through ``docker exec``.

    Args:
        item_id: Experiment/Simulation ID or Job ID
        container_id: Container ID

    Returns:
        None
    """
    console = Console()
    job = find_running_job(item_id, container_id)
    if not job:
        user_logger.warning(f"Not found job {item_id}.")
        return

    if job.item_type == ItemType.EXPERIMENT:
        kill_args = ['pkill', '-TERM', '-g', str(job.job_id)]
    else:
        kill_args = ['kill', '-9', str(job.job_id)]

    try:
        # check=False (default): the return code is inspected below
        result = subprocess.run(['docker', 'exec', str(job.container_id), *kill_args],
                                stderr=subprocess.PIPE, text=True)
    except FileNotFoundError:
        user_logger.error("Docker is not installed.")
        return

    if result.returncode == 0:
        console.print(f"Successfully killed {job.item_type.name} {job.job_id}")
    else:
        console.print(f"Error killing {job.item_type.name} {job.item_id}: {result.stderr}")


@container.command(help="Check the status of an Experiment/Simulation.\n\n"
                        "Arguments:\n\n"
                        " ITEM_ID: Experiment/Simulation ID or Job ID")
@click.argument('item-id', required=True)
@click.option('-c', '--container_id', help="Container Id")
@click.option('-l', '--limit', default=10, help="Max number of simulations to show")
@click.option('--verbose/--no-verbose', default=False, help="Display with working directory or not")
def status(item_id: Union[int, str], container_id: str = None, limit: int = 10, verbose: bool = False):
    """
    Check Experiment/Simulation status.

    The ID is first looked up in the job history as an Experiment/Simulation
    ID; if nothing is found it is treated as the Job ID of a running job.

    Args:
        item_id: Experiment/Simulation ID or Job ID
        container_id: Container ID
        limit: number of simulations to display
        verbose: display simulation details or not

    Returns:
        None
    """
    console = Console()
    item_dir = JobHistory.get_item_path(item_id)
    if item_dir is not None:
        # Experiment/Simulation case
        item_type = item_dir[1]
        if item_type == ItemType.SIMULATION:
            st = get_simulation_status(item_dir[0])
            console.print(f"{item_type.name} {item_id} is {st}.")
        elif item_type == ItemType.EXPERIMENT:
            summarize_status_files(item_dir[0], max_display=limit, verbose=verbose)
        else:
            user_logger.warning(f"{item_type.name} {item_id} status is not defined.")
        return

    # Job ID case
    job = find_running_job(item_id, container_id)
    if not job:
        user_logger.warning(f"Job {item_id} not found.")
        return

    if job.item_type == ItemType.EXPERIMENT:
        job_cache = JobHistory.get_job(job.item_id)
        if not job_cache:
            # The job is running but has no history record to read the
            # experiment directory from.
            user_logger.warning(f"Experiment {job.item_id} not found in job history.")
            return
        summarize_status_files(job_cache['EXPERIMENT_DIR'], max_display=limit, verbose=verbose)
    elif job.item_type == ItemType.SIMULATION:
        console.print(f"Simulation {job.item_id} is RUNNING.")


@container.command(help="List running Experiment/Simulation jobs.\n\n"
                        "Arguments:\n\n"
                        " CONTAINER_ID: Container ID (optional)")
@click.argument('container-id', required=False)
@click.option('-l', '--limit', default=10, help="Max number of simulations to show")
@click.option('-n', '--next', default=0, type=int, help="Next number of jobs to show")
def jobs(container_id: str = None, limit: int = 10, next: int = 0):
    """
    List running Experiment/Simulation jobs in Container(s).

    Jobs are grouped by ``group_pid``. The first job of each group is taken
    as the experiment job and is always shown; the remaining jobs are paged
    with ``limit`` and ``next``.

    Args:
        container_id: Container ID
        limit: number of simulations to display
        next: next number of jobs to show

    Returns:
        None
    """
    containers = get_working_containers(container_id)
    if len(containers) == 0:
        if container_id:
            user_logger.warning(f"Container {container_id} not found.")
        else:
            user_logger.warning("No containers found.")
        return

    console = Console()
    start = next * limit
    end = start + limit
    for cid in containers:
        running_jobs = list_running_jobs(cid)
        if not running_jobs:
            continue

        # Separate jobs by group_pid
        groups = {}
        for job in running_jobs:
            groups.setdefault(job.group_pid, []).append(job)

        for group_jobs in groups.values():
            # The first job is the experiment, the rest are its simulations
            exp_job, *sim_jobs = group_jobs
            # Requested page of simulations, with the experiment job in front
            page = [exp_job] + sim_jobs[start:end]

            console.print(
                f"[bold][cyan]Experiment[/][/] {exp_job.item_id} on [bold][cyan]Container[/][/] [red]{cid}[/] has {len(sim_jobs)} running [bold][cyan]simulations[/][/].")
            table = Table()
            table.add_column("Entity Type", justify="right", style="cyan", no_wrap=True)
            table.add_column("Entity ID", style="yellow")
            table.add_column("Job ID", justify="right", style="green")
            table.add_column("Container", justify="right", style="plum2")
            table.add_column("Status", justify="right", style="red")
            table.add_column("Elapsed", justify="right", style="blue")
            for job in page:
                table.add_row(job.item_type.name, str(job.item_id), str(job.job_id), job.container_id,
                              'running', job.elapsed)
            console.print(table)


@container.command(help="Retrieve Experiment history.\n\n"
                        "Arguments:\n\n"
                        " EXP_ID: Experiment ID")
@click.argument('exp-id', type=str, required=True)
def get_detail(exp_id: str):
    """
    Get Experiment job history.

    Prints the history record as JSON; prints nothing when no record exists.

    Args:
        exp_id: Experiment ID

    Returns:
        None
    """
    item = JobHistory.get_job(exp_id)
    if item:
        console = Console()
        console.print_json(json.dumps(item, indent=2))


@container.command(help="View the job history.\n\n"
                        "Arguments:\n\n"
                        " CONTAINER_ID: Container ID")
@click.argument('container-id', required=False)
@click.option('-l', '--limit', default=10, type=int, help="Max number of jobs to show")
@click.option('-n', '--next', default=0, type=int, help="Next number of jobs to show")
def history(container_id: str = None, limit: int = 10, next: int = 0):
    """
    View job history.

    Args:
        container_id: Container ID
        limit: number of jobs to show
        next: next number of jobs to show

    Returns:
        None
    """
    data = JobHistory.view_history(container_id)
    start = next * limit
    page = data[start:start + limit]

    console = Console()
    console.print(f"There are {len(data)} Experiment cache in history.")
    for job in page:
        console.print(f"{'':-^100}")
        for k, v in job.items():
            # Skip some keys
            if k in ('EXPERIMENT_DIR', 'SUITE_ID'):
                continue
            console.print(f"[bold][cyan]{k:16}[/][/]: {v}")


@container.command(help="Locate Suite/Experiment/Simulation file directory.\n\n"
                        "Arguments:\n\n"
                        " ITEM_ID: Suite/Experiment/Simulation ID")
@click.argument('item-id', type=str, required=True)
def path(item_id: str):
    """
    Find Suite/Experiment/Simulation file directory.

    Prints nothing when the ID is not found in the job history.

    Args:
        item_id: Suite/Experiment/Simulation ID

    Returns:
        None
    """
    item = JobHistory.get_item_path(item_id)
    if item:
        console = Console()
        console.print(f"{item[1].name}: {item[0]}")


@container.command(help="Check if an Experiment/Simulation is running.\n\n"
                        "Arguments:\n\n"
                        " ITEM_ID: Experiment/Simulation ID")
@click.argument('item-id', type=str, required=True)
def is_running(item_id: str):
    """
    Check if Experiment/Simulation is running.

    Args:
        item_id: Experiment/Simulation ID

    Returns:
        None
    """
    console = Console()
    job = find_running_job(item_id)
    if job:
        console.print(f"{job.item_type.name} {job.item_id} is running on container {job.container_id}.")
        return

    # Check if it is a valid Experiment/Simulation ID
    his_path = JobHistory.get_item_path(item_id)
    if not his_path:
        console.print(f"Job {item_id} is not found.")
        return

    # Check item type
    item_type = his_path[1]
    if item_type == ItemType.SUITE:
        console.print(f"{item_id} is not a valid Experiment/Simulation ID.")
    else:
        console.print(f"{item_type.name} {item_id} is not running.")


@container.command(help="Check the history volume.")
def volume():
    """Get job history volume."""
    v = JobHistory.volume()
    mv = convert_byte_size(v)
    console = Console()
    console.print(f"Job history volume: {mv}")


@container.command(help="Clear the job history.\n\n"
                        "Arguments:\n\n"
                        " CONTAINER_ID: Container ID (optional)")
@click.argument('container-id', required=False)
def clear_history(container_id: str = None):
    """
    Clear Job History.

    Args:
        container_id: Container ID

    Returns:
        None
    """
    JobHistory.clear(container_id)


@container.command(help="Sync the file system with job history.")
def sync_history():
    """Sync file system with job history."""
    JobHistory.sync()


@container.command(help="Get the count of count histories.\n\n"
                        "Arguments:\n\n"
                        " CONTAINER_ID: Container ID (optional)")
@click.argument('container-id', required=False)
def history_count(container_id: str = None):
    """
    Get History Count.

    Args:
        container_id: Container ID

    Returns:
        None
    """
    console = Console()
    console.print(JobHistory.count(container_id))


@container.command(help="Clear job results files and folders.\n\n"
                        "Arguments:\n\n"
                        " ITEM_ID: Experiment/Simulation ID")
@click.argument('item-id', type=str, required=True)
@click.option('-r', '--remove', multiple=True, help="Extra files/folders to be removed from simulation")
def clear_results(item_id: str, remove: Tuple = None):
    """
    Clear the generated output files for a job.

    For a Simulation, SIMULATION_FILES plus the extra ``remove`` entries are
    deleted from its directory. For an Experiment, EXPERIMENT_FILES are
    deleted from the experiment directory and every sub-directory holding a
    ``metadata.json`` is cleaned like a simulation.

    Args:
        item_id: Experiment/Simulation ID
        remove: list of files/folders

    Returns:
        None

    Raises:
        SystemExit: if the ID is not found or refers to a Suite
    """
    # Get item path
    item = JobHistory.get_item_path(item_id)
    if not item:
        # An unknown ID yields no path; report it instead of failing on the
        # subscript below.
        user_logger.warning(f"Item {item_id} not found.")
        raise SystemExit(-1)

    sim_entries = SIMULATION_FILES + list(remove or ())

    # Check item type
    item_type = item[1]
    if item_type == ItemType.SIMULATION:
        _remove_entries(item[0], sim_entries)
    elif item_type == ItemType.EXPERIMENT:
        exp_dir = item[0]
        # Delete generated files from experiment past run
        _remove_entries(exp_dir, EXPERIMENT_FILES)
        # Delete generated files for each of simulations
        for meta_file in Path(exp_dir).glob('*/metadata.json'):
            _remove_entries(meta_file.parent, sim_entries)
    else:
        user_logger.warning("Suite level not supported, must provide Experiment/Simulation ID!")
        raise SystemExit(-1)


@container.command(help="Inspect a container.\n\n"
                        "Arguments:\n\n"
                        " CONTAINER_ID: Container ID")
@click.argument('container-id', required=True)
def inspect(container_id: str):
    """
    Check container information.

    Args:
        container_id: Container ID

    Returns:
        None
    """
    console = Console()
    ctr = get_container(container_id)
    if ctr is None:
        console.print(f"Container {container_id} not found.")
        return

    attrs = ctr.attrs
    console.print('-' * 100)
    console.print(f"[bold][cyan]Container ID[/][/]: {ctr.short_id}")
    console.print(f"[bold][cyan]Container Name[/][/]: {ctr.name}")
    console.print(f"[bold][cyan]Status[/][/]: {ctr.status}")
    console.print(f"[bold][cyan]Created[/][/]: {format_timestamp(attrs['Created'])}")
    console.print(f"[bold][cyan]StartedAt[/][/]: {format_timestamp(attrs['State']['StartedAt'])}")
    console.print("[bold][cyan]Image[/][/]:")
    console.print_json(json.dumps(attrs['Config']['Image']))
    console.print("[bold][cyan]Image Tags[/][/]:")
    console.print_json(json.dumps(ctr.image.tags))
    console.print("[bold][cyan]State[/][/]:")
    console.print_json(json.dumps(attrs['State']))
    console.print("[bold][cyan]Mounts[/][/]:")
    # Only mounts whose Type is 'bind' are shown
    mounts = [m for m in attrs['Mounts'] if m['Type'] == 'bind']
    console.print_json(json.dumps(mounts))


@container.command(help="Stop running container(s).\n\n"
                        "Arguments:\n\n"
                        " CONTAINER_ID: Container ID (optional)")
@click.argument('container-id', required=False)
@click.option('--remove/--no-remove', default=False, help="Remove the container or not")
def stop_container(container_id: str = None, remove: bool = False):
    """
    Stop running container(s).

    Args:
        container_id: container id
        remove: remove container or not

    Returns:
        None
    """
    console = Console()
    # Get working containers
    containers = get_working_containers(container_id, entity=True)
    if len(containers) == 0:
        if container_id:
            user_logger.warning(f"Not found running Container {container_id}.")
        else:
            user_logger.warning("No running containers found.")
        return

    for ctr in containers:
        ctr.stop()
        if remove:
            ctr.remove()
            console.print(f"Container {ctr.short_id} is stopped and removed.")
        else:
            console.print(f"Container {ctr.short_id} is stopped.")


@container.command(help="Remove stopped containers.\n\n"
                        "Arguments:\n\n"
                        " CONTAINER_ID: Container ID (optional)")
@click.argument('container-id', required=False)
def remove_container(container_id: str = None):
    """
    Remove stopped containers.

    With a container ID only that container is removed, and only if it is
    not running. Without one, every stopped container is removed.

    Args:
        container_id: container id

    Returns:
        None
    """
    console = Console()
    if container_id:
        ctr = get_container(container_id)
        if not ctr:
            user_logger.warning(f"Container {container_id} not found.")
        elif ctr.status != 'running':
            ctr.remove()
            console.print(f"Container {container_id} is removed.")
        else:
            user_logger.warning(f"Container {container_id} is running, need to stop first.")
        return

    # Remove all stopped containers
    removed_ids = []
    for ctr in get_containers(include_stopped=True)['stopped']:
        ctr.remove()
        removed_ids.append(ctr.short_id)

    if removed_ids:
        console.print(f"{len(removed_ids)} container(s) removed.")
    else:
        user_logger.warning("No container removed.")


@container.command(help="pip install a package on a container.\n\n"
                        "Arguments:\n\n"
                        " PACKAGE: package to be installed")
@click.argument('package', required=True)
@click.option('-c', '--container-id', type=str, help="Container ID")
@click.option('-i', '--index-url', type=str, help="index-url for pip install")
@click.option('-e', '--extra-index-url', type=str, help="extra-index-url for pip install")
def install(package: str, container_id: str, index_url: str = None, extra_index_url: str = None):
    """
    Pip install package on container.

    Args:
        package: package name
        container_id: Container ID
        index_url: index-url for pip install
        extra_index_url: extra-index-url for pip install

    Returns:
        None
    """
    if not container_id:
        # The option is not required by click, so it may be missing here.
        user_logger.error("Container ID is required (use -c/--container-id).")
        return

    # Both index options are forwarded when both are given.
    pip_parts = ["pip3 install"]
    if index_url:
        pip_parts.append(f"--index-url {index_url}")
    if extra_index_url:
        pip_parts.append(f"--extra-index-url {extra_index_url}")
    pip_parts.append(package)

    _exec_and_print(container_id, " ".join(pip_parts))


@container.command(help="List packages installed on a container.\n\n"
                        "Arguments:\n\n"
                        " CONTAINER_ID: Container ID")
@click.argument('container-id', required=True)
def packages(container_id: str):
    """
    List packages installed on container.

    Args:
        container_id: Container ID

    Returns:
        None
    """
    if not JobHistory.verify_container(container_id):
        user_logger.error(f"Container {container_id} not found.")
        return

    _exec_and_print(container_id, "pip list")


@container.command(help="List running processes in a container.\n\n"
                        "Arguments:\n\n"
                        " CONTAINER_ID: Container ID")
@click.argument('container-id', required=True)
def ps(container_id: str):
    """
    List running processes in container.

    Args:
        container_id: Container ID

    Returns:
        None
    """
    if not JobHistory.verify_container(container_id):
        user_logger.error(f"Container {container_id} not found.")
        return

    _exec_and_print(container_id, "ps -efj")


@container.command(help="List all available containers.")
@click.option('--all/--no-all', default=False, help="Include stopped containers or not")
def list_containers(all: bool = False):
    """
    List available containers.

    Args:
        all: bool, include stopped containers or not

    Returns:
        None
    """
    containers = get_containers(include_stopped=all)

    table = Table()
    table.add_column("Container ID", justify="right", style="cyan", no_wrap=True)
    table.add_column("Image", style="bright_magenta")
    table.add_column("Status", style="red")
    table.add_column("Created", style="yellow")
    table.add_column("Started", style="orange1")
    table.add_column("Name", style="wheat4")

    # Only the container lists are needed; the status shown comes from each
    # container itself.
    for container_list in containers.values():
        for ctr in container_list:
            color = "green" if ctr.status == 'running' else "red"
            table.add_row(ctr.short_id,
                          ctr.attrs['Config']['Image'],
                          f"[{color}]{ctr.status}[/]",
                          format_timestamp(ctr.attrs['Created']),
                          format_timestamp(ctr.attrs['State']['StartedAt']),
                          ctr.name)

    console = Console()
    console.print(f"There are {table.row_count} container(s).")
    console.print(table)