Source code for idmtools_platform_container.container_operations.docker_operations
"""
Here we implement the ContainerPlatform docker operations.

Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import docker
import platform as sys_platform
import subprocess
from dataclasses import dataclass, field
from typing import List, Dict, NoReturn, Any, Union
from idmtools.core import ItemType
from idmtools_platform_container.utils.general import normalize_path, parse_iso8601
from idmtools_platform_container.utils.job_history import JobHistory
from docker.models.containers import Container
from docker.errors import NotFound as ErrorNotFound
from docker.errors import APIError as DockerAPIError
from logging import getLogger, DEBUG

# NOTE(review): `dataclass`/`field` are not used by any function visible in this view, and
# `Job` / `PS_QUERY` (used by list_running_jobs) are not defined in it either — presumably
# they are defined elsewhere in this module; confirm.

# Module logger for diagnostic (mostly DEBUG-level) messages.
logger = getLogger(__name__)
# Logger named 'user' — presumably the user-facing console logger (used for info/error messages).
user_logger = getLogger('user')

# Only consider the containers that can be restarted
# (get_containers() drops every container whose status is not in this list).
CONTAINER_STATUS = ['exited', 'running', 'paused']
def validate_container_running(platform, **kwargs) -> str:
    """
    Make sure a usable container exists for the platform and return its id.

    Steps, in order:
      1. Ensure ``platform.docker_image`` is available locally, pulling it if needed
         (the process exits with -1 if the pull fails).
      2. Fetch the platform's matching containers and split them into running / stopped.
      3. If ``platform.force_start`` is set, stop (and, by stop_all_containers' default,
         remove) all of them.
      4. Unless ``platform.new_container`` is set or ``platform.container_prefix`` is given,
         reuse the most recently started running container (on non-Windows hosts it is first
         checked that ``platform.data_mount`` is visible inside it; if not, it is stopped and
         removed), otherwise restart the most recently started stopped container.
      5. If nothing was reused, start a new container via ``platform.start_container``.

    Args:
        platform: Platform
        kwargs: keyword arguments used to expand functionality; forwarded to
            ``platform.start_container`` when a new container has to be started

    Returns:
        container id
    """
    # Check image exists
    if not check_local_image(platform.docker_image):
        user_logger.info(f"Image {platform.docker_image} does not exist, pull the image first.")
        succeeded = pull_docker_image(platform.docker_image)
        if not succeeded:
            user_logger.error(f"/!\\ ERROR: Failed to pull image {platform.docker_image}.")
            exit(-1)

    # User configuration
    if logger.isEnabledFor(DEBUG):
        logger.debug(f"User config: force_start={platform.force_start}")
        logger.debug(f"User config: new_container={platform.new_container}")
        logger.debug(f"User config: include_stopped={platform.include_stopped}")

    # Check containers
    container_id = None
    # Iterated as (status, container) pairs; every status other than 'running' counts as stopped.
    container_match = platform.retrieve_match_containers()
    container_running = [container for status, container in container_match if status == 'running']
    container_stopped = [container for status, container in container_match if status != 'running']

    if logger.isEnabledFor(DEBUG):
        logger.debug(f"Found running matched containers: {container_running}")
        if platform.include_stopped:
            logger.debug(f"Found stopped matched containers: {container_stopped}")

    if platform.force_start:
        # Discard every matched container so that a fresh one is started at the end.
        if logger.isEnabledFor(DEBUG) and len(container_running) > 0:
            logger.debug(f"Stop all running containers {container_running}")
        stop_all_containers(container_running, keep_running=False)
        container_running = []

        if logger.isEnabledFor(DEBUG) and len(container_stopped) > 0 and platform.include_stopped:
            logger.debug(f"Stop all stopped containers {container_stopped}")
        stop_all_containers(container_stopped, keep_running=False)
        container_stopped = []

    # Reuse an existing container only when new_container is off and no container_prefix is set.
    if not platform.new_container and platform.container_prefix is None:
        if len(container_running) > 0:
            # Pick up the first running container (sorted most recently started first)
            container_running = sort_containers_by_start(container_running)
            container_id = container_running[0].short_id
            container = get_container(container_id)
            # NOTE(review): get_container() returns None if the container disappeared in the
            # meantime; container.exec_run below would then raise AttributeError.
            if sys_platform.system() not in ["Windows"]:
                # `ls -lart` on an existing directory presumably prints at least 3 lines
                # ("total", ".", ".."); fewer means data_mount is not visible in this container.
                command = f"bash -c '[ \"$(ls -lart {platform.data_mount} | wc -l)\" -ge 3 ] && echo exists || echo not_exists'"
                result = container.exec_run(command)
                output = result.output.decode().strip()
                if output == "not_exists":
                    # Unusable container: remove it; a new one is started further down.
                    stop_container(container_id, remove=True)
                    if logger.isEnabledFor(DEBUG):
                        logger.debug(f"Existing container {container_id} is not usable")
                    container_id = None

            if container_id is not None:
                if logger.isEnabledFor(DEBUG):
                    logger.debug(f"Pick running container {container_id}.")
        elif len(container_stopped) > 0:
            # Pick up the first stopped container and then restart it
            container_stopped = sort_containers_by_start(container_stopped)
            container = container_stopped[0]
            container.restart()
            container_id = container.short_id
            if logger.isEnabledFor(DEBUG):
                logger.debug(f"Pick and restart the stopped container {container.short_id}.")

    # Start the container
    if container_id is None:
        container_id = platform.start_container(**kwargs)
        if logger.isEnabledFor(DEBUG):
            logger.debug(f"Start container: {platform.docker_image}")
            logger.debug(f"New container ID: {container_id}")

    return container_id
def get_container(container_id) -> Any:
    """
    Look up a container object from its ID.

    Args:
        container_id: container id

    Returns:
        The matching container object, or None when Docker does not know the id or the
        lookup fails with an API error (both cases are logged at DEBUG level).
    """
    docker_client = docker.from_env()
    try:
        return docker_client.containers.get(container_id)
    except ErrorNotFound:
        # NotFound is a subclass of APIError, so it has to be handled first.
        logger.debug(f"Container with ID {container_id} not found.")
    except DockerAPIError as e:
        logger.debug(f"Error retrieving container with ID {container_id}: {str(e)}")
    return None
def find_container_by_image(image: str, include_stopped: bool = False) -> Dict:
    """
    Group the known containers by status, keeping only those created from ``image``.

    Args:
        image: docker image
        include_stopped: bool, if consider the stopped containers or not

    Returns:
        dict mapping each status key produced by get_containers() to the list of containers
        whose configured image (``attrs['Config']['Image']``) equals ``image``
    """
    by_status = get_containers(include_stopped)
    return {
        status: [ct for ct in members if ct.attrs['Config']['Image'] == image]
        for status, members in by_status.items()
    }
def stop_container(container: Union[str, Container], remove: bool = True) -> None:
    """
    Stop a container and, optionally, remove it.

    Args:
        container: container id or container object to be stopped
        remove: bool, if remove the container or not

    Returns:
        No return

    Raises:
        TypeError: if ``container`` is neither a str nor a Container object.
        SystemExit: with code -1 when the container cannot be found or a Docker API call fails.
    """
    try:
        if isinstance(container, str):
            container_id = container
            container = get_container(container_id)
            # get_container() swallows "not found" and returns None; fail here with a clear
            # message instead of an AttributeError on `None.status` below.
            if container is None:
                user_logger.error(f"Container with ID {container_id} not found.")
                exit(-1)
        elif not isinstance(container, Container):
            raise TypeError("Invalid container object.")

        # Stop the container
        if container.status == 'running':
            container.stop()
            if logger.isEnabledFor(DEBUG):
                logger.debug(f"Container {str(container)} has been stopped.")

        if remove:
            container.remove()
            if logger.isEnabledFor(DEBUG):
                logger.debug(f"Container {str(container)} has been removed.")
    except ErrorNotFound:
        ident = container if isinstance(container, str) else container.short_id
        # The process exits here, so tell the user why (not only at DEBUG level).
        user_logger.error(f"Container {ident} not found.")
        exit(-1)
    except DockerAPIError as e:
        user_logger.error(f"Error stopping container {str(container)}: {str(e)}")
        exit(-1)
def stop_all_containers(containers: List[Union[str, Container]], keep_running: bool = True,
                        remove: bool = True) -> None:
    """
    Stop all containers.

    Args:
        containers: list of container id or containers to be stopped
        keep_running: bool, if True a running container that still has running jobs
            (as reported by list_running_jobs) is left untouched
        remove: bool, if remove the container or not

    Returns:
        No return
    """
    for item in containers:
        # Ids are accepted per the signature: resolve them so `.status` / `.short_id` exist.
        container = get_container(item) if isinstance(item, str) else item
        if container is None:
            # The id no longer resolves to a container, so there is nothing to stop.
            logger.debug(f"Container with ID {item} not found, nothing to stop.")
            continue
        # Leave busy containers alone when asked to keep running ones.
        if keep_running and container.status == 'running' and list_running_jobs(container.short_id):
            continue
        stop_container(container, remove=remove)
def restart_container(container: Union[str, Container]) -> None:
    """
    Restart a container.

    Args:
        container: container id or container object to be restarted

    Returns:
        No return

    Raises:
        TypeError: if ``container`` is neither a str nor a Container object.
        SystemExit: with code -1 if the container is not found or the restart fails.
    """
    # Validate before the try block: the broad `except Exception` below would otherwise
    # swallow this TypeError and then fail while formatting its own message.
    if not isinstance(container, (str, Container)):
        raise TypeError("Invalid container object.")

    # Printable label captured before `container` can be rebound to None.
    label = container if isinstance(container, str) else container.short_id
    try:
        if isinstance(container, str):
            container = get_container(container)
            if container is None:
                user_logger.error(f"Container {label} not found.")
                exit(-1)  # SystemExit is not an Exception subclass, so it is not caught below
            label = container.short_id

        # Restart the container
        container.restart()
        if logger.isEnabledFor(DEBUG):
            logger.debug(f"Container {container.short_id} has been restarted.")
    except DockerAPIError as e:
        user_logger.error(f"Error restarting container {label}: {str(e)}")
        exit(-1)
    except Exception as e:
        user_logger.error(f"Restarting container {label} encounters an unexpected error: {e}")
        exit(-1)
def sort_containers_by_start(containers: List[Container], reverse: bool = True) -> List[Container]:
    """
    Order containers by the time they were last started.

    Args:
        containers: list of containers
        reverse: bool, if True (default) the most recently started container comes first

    Returns:
        a new sorted list of containers (the input list is not modified)
    """
    def _started_at(ct):
        # attrs['State']['StartedAt'] is parsed with parse_iso8601 to get a comparable value.
        return parse_iso8601(ct.attrs['State']['StartedAt'])

    ordered = list(containers)
    ordered.sort(key=_started_at, reverse=reverse)
    return ordered
def get_containers(include_stopped: bool = False) -> Dict:
    """
    Collect the Docker containers known to the job history, grouped by state.

    A container is kept only if its status is in CONTAINER_STATUS and
    ``JobHistory.verify_container`` accepts its short id.

    Args:
        include_stopped: bool, if consider the stopped containers or not
            (passed to Docker as ``all=``)

    Returns:
        dict with key 'running' (containers whose status is 'running') and key 'stopped'
        (every other kept container)
    """
    docker_client = docker.from_env()
    running, stopped = [], []
    for ct in docker_client.containers.list(all=include_stopped):
        if ct.status not in CONTAINER_STATUS or not JobHistory.verify_container(ct.short_id):
            continue
        if ct.status == 'running':
            running.append(ct)
        else:
            stopped.append(ct)
    return {'running': running, 'stopped': stopped}
def get_working_containers(container_id: str = None, entity: bool = False) -> List[Any]:
    """
    Get the running containers that the job history knows about.

    Args:
        container_id: Container ID; when given, only this container is checked
        entity: bool, if return the container object or container id

    Returns:
        list of working containers or IDs; empty when ``container_id`` is not in the history,
        no longer exists, or is not running
    """
    if container_id is None:
        running = get_containers().get('running', [])
        return running if entity else [ct.short_id for ct in running]

    # The container must be recorded in the history before we look at Docker.
    if not JobHistory.verify_container(container_id):
        logger.error(f"Container {container_id} not found in History.")
        return []

    # It is in the history; make sure it still exists and is running.
    container = get_container(container_id)
    if not container:
        logger.warning(f"Container {container_id} not found.")
        return []
    if container.status != 'running':
        logger.warning(f"Container {container_id} is not running.")
        return []
    return [container] if entity else [container.short_id]
def is_docker_installed() -> bool:
    """
    Check if Docker is installed.

    Runs ``docker --version`` and reports whether it could be executed successfully.

    Returns:
        True/False
    """
    try:
        # Run the 'docker --version' command
        result = subprocess.run(['docker', '--version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                text=True)
    except FileNotFoundError:
        # If the docker executable is not found, it means Docker is not installed
        if logger.isEnabledFor(DEBUG):
            logger.debug("Docker is not installed or not found in PATH.")
        return False
    except OSError as e:
        # The executable exists but cannot be launched (e.g. PermissionError); a yes/no
        # probe should answer False instead of crashing.
        if logger.isEnabledFor(DEBUG):
            logger.debug(f"Docker could not be executed: {e}")
        return False

    # Check the return code to see if it executed successfully
    if result.returncode == 0:
        if logger.isEnabledFor(DEBUG):
            logger.debug(f"Docker is installed: {result.stdout.strip()}")
        return True

    if logger.isEnabledFor(DEBUG):
        logger.debug(f"Docker is not installed. Error: {result.stderr.strip()}")
    return False
def is_docker_daemon_running() -> bool:
    """
    Check if the Docker daemon is running.

    Creates a client from the environment and pings the daemon; any failure is treated
    as "not running" (and logged at DEBUG level).

    Returns:
        True/False
    """
    try:
        docker.from_env().ping()
        if logger.isEnabledFor(DEBUG):
            logger.debug("Docker daemon is running.")
        return True
    except Exception as err:
        if logger.isEnabledFor(DEBUG):
            if isinstance(err, DockerAPIError):
                prefix = "Docker daemon is not running"
            else:
                prefix = "Error checking Docker daemon"
            logger.debug(f"{prefix}: {err}")
        return False
def check_local_image(image_name: str, tag: str = 'latest') -> bool:
    """
    Check if the image exists locally.

    Docker reports local image tags as ``name:tag``, so a reference without an explicit
    tag (e.g. ``repo/image``) is also matched as ``repo/image:<tag>``. This mirrors
    pull_docker_image, which pulls untagged names with the same default tag.

    Args:
        image_name: image name, optionally including a tag
        tag: tag assumed when ``image_name`` has none (default ``latest``)

    Returns:
        True/False
    """
    candidates = {image_name}
    # Only the last path component can carry a tag; a ':' before the last '/' is a registry port.
    if ':' not in image_name.rsplit('/', 1)[-1]:
        candidates.add(f'{image_name}:{tag}')

    client = docker.from_env()
    return any(
        image_tag in candidates
        for image in client.images.list()
        for image_tag in image.tags
    )
def pull_docker_image(image_name, tag='latest') -> bool:
    """
    Pull a docker image (e.g. from the IDM artifactory).

    Args:
        image_name: image name, optionally including a tag or digest
        tag: image tag used when ``image_name`` does not carry one

    Returns:
        True if the pull succeeded, False if Docker reported an API error
    """
    # Check if the image name contains the tag. Only the last path component can hold a
    # tag or digest: in "registry:5000/image" the ':' belongs to the registry port.
    if ':' in image_name.rsplit('/', 1)[-1]:
        full_image_name = image_name
    else:
        full_image_name = f'{image_name}:{tag}'

    # Pull the image
    user_logger.info(f'Pulling image {full_image_name} ...')
    try:
        client = docker.from_env()
        client.images.pull(full_image_name)
        if logger.isEnabledFor(DEBUG):
            logger.debug(f'Successfully pulled {full_image_name}')
        return True
    except DockerAPIError as e:
        if logger.isEnabledFor(DEBUG):
            logger.debug(f'Error pulling {full_image_name}: {e}')
        return False
def compare_mounts(mounts1: List[Dict], mounts2: List[Dict]) -> bool:
    """
    Tell whether two mount-configuration lists describe the same set of mounts.

    Order and duplicates are ignored; each mount is compared on its Type, Mode and its
    normalized Source and Destination paths.

    Args:
        mounts1: container mounting configurations
        mounts2: container mounting configurations

    Returns:
        True/False
    """
    def _signatures(mounts):
        return {
            (m['Type'], m['Mode'], normalize_path(m['Source']), normalize_path(m['Destination']))
            for m in mounts
        }

    return _signatures(mounts1) == _signatures(mounts2)
def compare_container_mount(container1: Union[str, Container], container2: Union[str, Container]) -> bool:
    """
    Compare the mount configurations of two containers.

    Args:
        container1: container object or id
        container2: container object or id

    Returns:
        True if both containers exist and have the same mounts (see compare_mounts);
        False otherwise, including when an id does not resolve to a container
    """
    # Get the container objects
    if isinstance(container1, str):
        container1 = get_container(container1)
    if isinstance(container2, str):
        container2 = get_container(container2)

    # get_container() returns None for unknown ids; guard against that instead of
    # failing with "'NoneType' object is not subscriptable" on `.attrs`.
    if container1 is None or container2 is None:
        logger.debug("Cannot compare mounts: at least one container was not found.")
        return False

    # Get the mount configurations
    return compare_mounts(container1.attrs['Mounts'], container2.attrs['Mounts'])
def list_running_jobs(container_id: str, limit: int = None) -> List[Job]:
    """
    List all running jobs on the container.

    Runs ``PS_QUERY`` inside the container through ``docker exec``. The first output line is
    treated as the header; every later line mentioning EXPERIMENT or SIMULATION whose command
    contains an ``EXPERIMENT:<id>`` / ``SIMULATION:<id>`` token becomes a Job.

    Args:
        container_id: Container ID
        limit: maximum number of jobs to return; None (default) returns all of them

    Returns:
        list of running jobs (empty when the command exits with status 1 or prints nothing)
    """
    # NOTE(review): the command is a shell string built from container_id, so container_id
    # must come from a trusted source.
    command = f'docker exec {container_id} bash -c "({PS_QUERY})"'
    proc = subprocess.run(command, shell=True, check=False, capture_output=True, text=True)

    if proc.returncode == 1:
        # Exit status 1 is treated as "no jobs found", not as an error.
        return []
    if proc.returncode != 0:
        logger.error(proc.stderr)
        user_logger.error(f"Command failed with return code {proc.returncode}")
        exit(-1)

    processes = proc.stdout.splitlines()
    if not processes:
        return []
    # Split at most (header columns - 1) times so the last column (the command) keeps its spaces.
    max_split = len(processes[0].split()) - 1

    running_jobs = []
    for line in processes[1:]:  # Skip the header line
        if 'EXPERIMENT' not in line and 'SIMULATION' not in line:
            continue
        columns = line.split(maxsplit=max_split)
        if len(columns) < 5:
            continue
        pid = int(columns[0])
        ppid = int(columns[1])
        pgid = int(columns[2])
        etime = columns[3]
        cmd = columns[4]

        is_experiment = 'EXPERIMENT' in cmd
        markers = [tok for tok in cmd.split() if tok.startswith(('EXPERIMENT', 'SIMULATION'))]
        # Ignore lines without a well-formed "<TYPE>:<id>" token instead of raising IndexError.
        if not markers or ':' not in markers[0]:
            continue

        running_jobs.append(Job(
            item_id=markers[0].split(':')[1],
            item_type=ItemType.EXPERIMENT if is_experiment else ItemType.SIMULATION,
            # Experiments are identified by their process group id, simulations by their pid.
            job_id=pgid if is_experiment else pid,
            group_pid=pgid,
            parent_pid=ppid,
            container_id=container_id,
            elapsed=etime,
        ))

    return running_jobs[:limit]
def find_running_job(item_id: Union[int, str], container_id: str = None) -> Job:
    """
    Find the running job for an item.

    Args:
        item_id: Experiment/Simulation ID or Running Job ID
        container_id: Container ID; when omitted, the container recorded in the job history
            is used, or every working container is searched

    Returns:
        the matching running Job, or None if no container runs it; the process exits with -1
        when the id matches jobs in several containers
    """
    if container_id:
        candidate_containers = [container_id]
    else:
        history_entry = JobHistory.get_job(item_id)
        # A history hit means item_id is an Experiment ID whose container is recorded;
        # otherwise it may be a Simulation ID or a Job ID, so search all working containers.
        candidate_containers = [history_entry['CONTAINER']] if history_entry else get_working_containers()

    target = str(item_id)
    matches = []
    for cid in candidate_containers:
        # One running container can't have multiple matches, so take the first hit only.
        found = next(
            (job for job in list_running_jobs(cid) if job.item_id == item_id or str(job.job_id) == target),
            None,
        )
        if found is not None:
            matches.append(found)

    if len(matches) > 1:
        # item_id must be a Job ID in this case and container_id must be None!
        user_logger.error(f"Multiple jobs found for Job ID {item_id}, please provide the Container ID or use Entity ID instead.")
        exit(-1)
    return matches[0] if matches else None