Source code for idmtools_platform_local.infrastructure.docker_io

"""idmtools docker input/output operations.

Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import io
import json
import logging
import os
import shutil
import tarfile
import time
from concurrent.futures.thread import ThreadPoolExecutor
from dataclasses import dataclass
from io import BytesIO
from logging import getLogger
from typing import BinaryIO, Dict, NoReturn, Optional, Union, Any
from docker.models.containers import Container
from idmtools import IdmConfigParser
from idmtools.core import TRUTHY_VALUES
from idmtools.core.system_information import get_data_directory
from idmtools.utils.decorators import ParallelizeDecorator
from idmtools_platform_local.infrastructure.postgres import PostgresContainer
from idmtools_platform_local.infrastructure.redis import RedisContainer
from idmtools_platform_local.infrastructure.workers import WorkersContainer

logger = getLogger(__name__)
# thread queue for docker copy operations
io_queue = ParallelizeDecorator()

# determine our default base directory. We almost always want to use the user's home directory,
# except in odd environments like docker-in-docker, special permissions, etc.
SERVICES = [PostgresContainer, RedisContainer, WorkersContainer]


@dataclass
class DockerIO:
    """
    Provides most of our file operations for our docker containers/local platform.
    """
    host_data_directory: str = get_data_directory()
    _fileio_pool = ThreadPoolExecutor()

    def __post_init__(self):
        """
        Acts like our constructor after the dataclass fields have been populated.

        Currently used to make sure the host data directory exists and to set the default timeout.
        """
        # Make sure the host_data_dir exists
        os.makedirs(self.host_data_directory, exist_ok=True)
        self.timeout = 1

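    # Illustrative example (not part of the module): constructing DockerIO with an
    # alternate data directory; the path below is hypothetical.
    #
    #   dio = DockerIO(host_data_directory='/tmp/idmtools_local_data')
    #   print(dio.host_data_directory)  # the directory is created by __post_init__
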
    def delete_files_below_level(self, directory, target_level=1, current_level=1):
        """
        Delete files below a certain depth in a target directory.

        Args:
            directory: Target directory
            target_level: Depth at which to begin deleting. Defaults to 1
            current_level: Current level. We call this recursively, so it should be 1 on the initial call.

        Returns:
            None

        Raises:
            PermissionError: Raised if there are issues listing a directory. Deletion errors are logged and suppressed.
        """
        for fn in os.listdir(directory):
            file_path = os.path.join(directory, fn)
            try:
                # we only delete items at or below the target level
                if os.path.isfile(file_path) and target_level <= current_level:
                    logger.debug(f"Deleting {file_path}")
                    os.remove(file_path)
                elif os.path.isdir(file_path):
                    # if this is the target level, delete the whole directory
                    if target_level <= current_level:
                        logger.debug(f"Deleting {file_path}")
                        shutil.rmtree(file_path)
                    else:
                        clevel = current_level + 1 if current_level >= 1 else 1
                        self.delete_files_below_level(file_path, target_level, clevel)
            except PermissionError as e:
                if logger.isEnabledFor(logging.DEBUG):
                    logger.exception(e)

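    # Illustrative example (not part of the module): clear everything two levels below a
    # hypothetical data directory while keeping the top-level folder layout intact.
    #
    #   dio = DockerIO()
    #   dio.delete_files_below_level('/tmp/idmtools_local_data', target_level=2)
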
    def cleanup(self, delete_data: bool = True, shallow_delete: bool = os.getenv('SHALLOW_DELETE', '0').lower() in TRUTHY_VALUES) -> NoReturn:
        """
        Stops the running services, removes local data, and removes the network.

        You can optionally disable the deletion of local data.

        Args:
            delete_data(bool): When true, deletes local data
            shallow_delete(bool): Deletes the data but not the container folders (redis, workers). Preferred, since it preserves permissions and avoids docker issues

        Returns:
            (NoReturn)
        """
        try:
            if delete_data and not shallow_delete:
                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug(f"Deleting local platform data at: {self.host_data_directory}")
                shutil.rmtree(self.host_data_directory, True)
            elif delete_data and shallow_delete:
                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug(f"Shallow deleting: {self.host_data_directory}")
                self.delete_files_below_level(self.host_data_directory, 2)
        except PermissionError:
            print(f"Cannot cleanup directory {self.host_data_directory} because it is still in use")
            logger.warning(f"Cannot cleanup directory {self.host_data_directory} because it is still in use")

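    # Illustrative example (not part of the module): a shallow cleanup clears the data but
    # keeps the redis/workers folders (and their permissions) in place.
    #
    #   dio = DockerIO()
    #   dio.cleanup(delete_data=True, shallow_delete=True)
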
    @io_queue.parallelize
    def copy_to_container(self, container: Container, destination_path: str, file: Optional[Union[str, bytes]] = None,
                          content: Union[str, bytes] = None, dest_name: Optional[str] = None) -> bool:
        """
        Copies a physical file or in-memory content to a container.

        You can also choose a different name for the destination file by using the dest_name option.

        Args:
            container: Container to copy the file to
            destination_path: Path within the container to copy the file to (should be a directory)
            file: Path to the file to copy
            content: Content to copy
            dest_name: Optional destination filename. By default, the source filename is used

        Returns:
            (bool) True if the copy succeeds, False otherwise
        """
        if content:
            if isinstance(content, dict):
                content = json.dumps(content)
            if isinstance(content, str):
                file = BytesIO(content.encode('utf-8'))
            else:
                file = content
        if file and isinstance(file, bytes):
            file = BytesIO(file)

        if isinstance(file, str):
            name = dest_name if dest_name else os.path.basename(file)
            if destination_path.startswith("/data"):
                destination_path = destination_path.replace('/data', '/workers')[1:]
            target_file = os.path.join(self.host_data_directory, destination_path, name)
            logger.debug(f'Copying {file} to docker container {container.id}:{target_file}')
            # Make sure to have the correct separators for the path
            target_file = target_file.replace('/', os.sep).replace('\\', os.sep)
            # Do the copy
            shutil.copy(file, target_file)
            return True
        elif isinstance(file, BytesIO):
            target_file = os.path.join(self.host_data_directory, destination_path.replace('/data', '/workers')[1:], dest_name)
            # Make sure to have the correct separators for the path
            target_file = target_file.replace('/', os.sep).replace('\\', os.sep)
            logger.debug(f'Copying {dest_name} to docker container {container.id}:{destination_path} '
                         f'through {target_file}')
            with open(target_file, 'wb') as of:
                of.write(file.getvalue())
            return True

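    # Illustrative example (not part of the module): copy_to_container is queued through
    # io_queue, so each call returns a future that sync_copy (below) resolves.
    # `workers_container` is a hypothetical docker Container handle.
    #
    #   dio = DockerIO()
    #   f1 = dio.copy_to_container(workers_container, destination_path='/data/exp_1',
    #                              file='/local/path/model.py')
    #   f2 = dio.copy_to_container(workers_container, destination_path='/data/exp_1',
    #                              content={'seed': 42}, dest_name='config.json')
    #   dio.sync_copy([f1, f2])  # blocks until both copies complete
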
    def sync_copy(self, futures):
        """
        Sync the copy operations queue in the io_queue.

        This allows us to take advantage of multi-threaded copying while also making it convenient
        to have sync points, such as uploading the assets in parallel but pausing just before the sync point.

        Args:
            futures: Futures to wait on.

        Returns:
            Final results of copy operations.
        """
        if not isinstance(futures, list):
            futures = [futures]
        return io_queue.get_results(futures)

    def copy_multiple_to_container(self, container: Container, files: Dict[str, Dict[str, Any]], join_on_copy: bool = True):
        """
        Copy multiple items to a container.

        Args:
            container: Target container
            files: Files to copy, in the form target directory -> file entries, where each entry is a dict of
                copy_to_container keyword arguments
            join_on_copy: Should we join the threads on copy (treat the copies as an atomic unit)

        Returns:
            True if the copy succeeded, False otherwise
        """
        results = []
        if IdmConfigParser.is_progress_bar_disabled():
            prog_items = files.items()
        else:
            from tqdm import tqdm
            prog_items = tqdm(files.items(), desc="Copying Assets to Local Platform", unit='file')
        for dest_path, sub_files in prog_items:
            for fn in sub_files:
                results.append(self.copy_to_container(container, destination_path=dest_path, **fn))

        if join_on_copy:
            return all(io_queue.get_results(results))
        # If we don't join, we assume the copy succeeds for now. This really means somewhere else should be
        # handling the data join for this
        return True

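    # Illustrative example (not part of the module): the `files` mapping groups
    # copy_to_container keyword arguments by destination directory; all paths and names
    # here are hypothetical, and `dio`/`workers_container` carry over from the example above.
    #
    #   files = {
    #       '/data/exp_1': [
    #           {'file': '/local/path/model.py'},
    #           {'content': b'raw bytes', 'dest_name': 'data.bin'},
    #       ],
    #   }
    #   ok = dio.copy_multiple_to_container(workers_container, files)  # True if every copy succeeded
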
    @staticmethod
    def create_archive_from_bytes(content: Union[bytes, BytesIO, BinaryIO], name: str) -> BytesIO:
        """
        Create a tar archive from bytes. Used to copy to docker.

        Args:
            content: Content to copy into the tar
            name: Name for the file in the archive

        Returns:
            (BytesIO) BytesIO object containing the tar archive
        """
        if isinstance(content, bytes):
            content = BytesIO(content)
        content.seek(0, io.SEEK_END)
        file_length = content.tell()
        content.seek(0)

        pw_tarstream = BytesIO()
        pw_tar = tarfile.TarFile(fileobj=pw_tarstream, mode='w')
        tarinfo = tarfile.TarInfo(name=name)
        tarinfo.size = file_length
        tarinfo.mtime = time.time()
        # tarinfo.mode = 0600
        pw_tar.addfile(tarinfo, content)
        pw_tar.close()
        pw_tarstream.seek(0)
        return pw_tarstream

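    # Illustrative example (not part of the module): build an in-memory tar and push it into
    # a container with docker-py's Container.put_archive; `some_container` is hypothetical.
    #
    #   tarstream = DockerIO.create_archive_from_bytes(b'hello world', 'hello.txt')
    #   some_container.put_archive('/tmp', tarstream)
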
    def create_directory(self, dir: str) -> bool:
        """
        Create a directory in a container.

        Args:
            dir: Path of the directory to create

        Returns:
            (bool) True once the directory has been created
        """
        path = os.path.join(self.host_data_directory, dir.replace('/data', '/workers')[1:])
        path = path.replace('/', os.sep).replace('\\', os.sep)
        os.makedirs(path, exist_ok=True)
        return True
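
# Illustrative usage sketch (not part of the original module): ties the pieces above together
# against a hypothetical, disposable data directory. cleanup() deletes local platform data, so
# only run this against a throwaway host_data_directory.
if __name__ == '__main__':  # pragma: no cover
    dio = DockerIO(host_data_directory='/tmp/idmtools_local_demo')
    dio.create_directory('/data/demo_experiment')
    dio.cleanup(delete_data=True, shallow_delete=True)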