Source code for idmtools_platform_comps.utils.download.download

"""idmtools download work item output.

Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import os
import sys
import warnings
import zipfile
from dataclasses import dataclass, field
from enum import Enum
from logging import getLogger, DEBUG
from pathlib import PurePath
from uuid import UUID
from COMPS.Data import WorkItem
from tqdm import tqdm

from idmtools import IdmConfigParser
from idmtools.assets.file_list import FileList
from idmtools.core import EntityStatus
from idmtools.entities.iplatform import IPlatform
from idmtools_platform_comps.utils.file_filter_workitem import FileFilterWorkItem
from idmtools_platform_comps.utils.general import get_file_as_generator

logger = getLogger(__name__)
user_logger = getLogger('user')


[docs]class CompressType(Enum): """Defines the compression types we support. lzma is the best balance between speed and compression ratio typically """ lzma = 'lzma' deflate = 'deflate' bz = "bz"
[docs]@dataclass(repr=False) class DownloadWorkItem(FileFilterWorkItem): """ DownloadWorkItem provides a utility to download items through a workitem with compression. The main advantage of this over Analyzers is the compression. This is most effective when the targets to download have many items that are similar to download. For example, an experiment with 1000 simulations with similar output can greatly benefit from downloading through this method. Notes: - TODO Link examples here. """ output_path: str = field(default_factory=os.getcwd) extract_after_download: bool = field(default=True) delete_after_download: bool = field(default=True) zip_name: str = field(default='output.zip') compress_type: CompressType = field(default=None) def __post_init__(self, item_name: str, asset_collection_id: UUID, asset_files: FileList, user_files: FileList, command: str): """ Constructor for DownloadWorkItem. Args: item_name: item name asset_collection_id: AssetCollection id asset_files: Asset files user_files: user asset files command: command Returns: None """ self._ssmt_script = str(PurePath(__file__).parent.joinpath("download_ssmt.py")) if self.compress_type is None: if (self.extract_after_download and self.delete_after_download) or sys.platform != 'win32': self.compress_type = CompressType.lzma elif sys.platform == 'win32': self.compress_type = CompressType.deflate else: self.compress_type = CompressType.lzma super().__post_init__(item_name, asset_collection_id, asset_files, user_files, command) warnings.warn( message="DownloadWorkItem is deprecated. Until a replacement is added, please use native COMPS scripts. See " "'https://github.com/InstituteforDiseaseModeling/pyCOMPS/blob/master/examples/retrieve_simulation_outputs_for_experiment.py'", category=FutureWarning) def _extra_command_args(self, command: str) -> str: """ Add specific additional arguments for the download command. In this case, only the zip name and compression type can be changed. Args: command: Command to append additional items to Returns: Updated command """ if self.zip_name != "output.zip": command += f" --zip-name {self.zip_name}" if self.compress_type != "lzma": command += f" --compress-type {self.compress_type.value}" return command
[docs] def wait(self, wait_on_done_progress: bool = True, timeout: int = None, refresh_interval=None, platform: 'IPlatform' = None) -> None: """ Waits on Download WorkItem to finish. This first waits on any dependent items to finish(Experiment/Simulation/WorkItems). Args: wait_on_done_progress: When set to true, a progress bar will be shown from the item timeout: Timeout for waiting on item. If none, wait will be forever refresh_interval: How often to refresh progress platform: Platform Returns: AssetCollection created if item succeeds """ # wait on related items before we wait on our item p = super()._check_for_platform_from_context(platform) opts = dict(wait_on_done_progress=wait_on_done_progress, timeout=timeout, refresh_interval=refresh_interval, platform=p) self._wait_on_children(**opts) super().wait(**opts) if self.status == EntityStatus.SUCCEEDED and not self.dry_run: # Download our zip po: WorkItem = self.get_platform_object(platform=self.platform) if self._uid: oi = po.retrieve_output_file_info([self.zip_name]) zip_name = PurePath(self.output_path).joinpath(self.zip_name) with tqdm(total=oi[0].length, unit='B', unit_scale=True, unit_divisor=1024, desc="Downloading Files") as pbar: self.__download_file(oi, pbar, zip_name) if self.extract_after_download: self.__extract_output(zip_name) if self.delete_after_download: if self.extract_after_download: if IdmConfigParser.is_output_enabled(): user_logger.debug(f"Removing {zip_name}") os.remove(zip_name) if IdmConfigParser.is_output_enabled(): user_logger.debug(f'Deleting workitem {self.uid}') po.delete() self.uid = None
def __extract_output(self, zip_name): """ Extra output from our zip file. Args: zip_name: Zip file to extract Returns: None """ if logger.isEnabledFor(DEBUG): logger.debug(f"Extracting {zip_name}") with zipfile.ZipFile(zip_name, 'r') as zin: zin.extractall(self.output_path) def __download_file(self, oi, pbar, zip_name): """ Download our file tracking progress as we go. Args: oi: Stream to download pbar: Prograss Bar zip_name: Zip file to save to Returns: None """ if logger.isEnabledFor(DEBUG): logger.debug(f"Downloading file to {zip_name}") parent_dir = PurePath(zip_name).parent os.makedirs(parent_dir, exist_ok=True) with open(zip_name, 'wb') as zo: for chunk in get_file_as_generator(oi[0]): pbar.update(len(chunk)) zo.write(chunk)