Source code for idmtools.assets.asset

"""
idmtools asset class definition.

Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""

import io
import os
from dataclasses import dataclass, field, InitVar
from functools import partial
from io import BytesIO
from logging import getLogger, DEBUG
from pathlib import PurePosixPath
from typing import TypeVar, Union, List, Callable, Any, Optional, Generator, BinaryIO
import backoff
import requests
from idmtools import IdmConfigParser
from idmtools.utils.file import file_content_to_generator, content_generator
from idmtools.utils.hashing import calculate_md5, calculate_md5_stream

logger = getLogger(__name__)


[docs]@dataclass(repr=False) class Asset: """ A class representing an asset. An asset can either be related to a physical asset present on the computer or directly specified by a filename and content. """ #: The absolute path of the asset. Optional if **filename** and **content** are given. absolute_path: Optional[str] = field(default=None) #: The relative path (compared to the simulation root folder). relative_path: Optional[str] = field(default=None) #: Name of the file. Optional if **absolute_path** is given. filename: Optional[str] = field(default=None) #: The content of the file. Optional if **absolute_path** is given. content: InitVar[Any] = None _content: bytes = field(default=None, init=False) _length: Optional[int] = field(default=None, init=False) #: Persisted tracks if item has been saved persisted: bool = field(default=False) #: Handler to api handler: Callable = field(default=str, metadata=dict(exclude_from_metadata=True)) #: Hook to allow downloading from platform download_generator_hook: Callable = field(default=None, metadata=dict(exclude_from_metadata=True)) #: Checksum of asset. Only required for existing assets checksum: InitVar[Any] = None _checksum: Optional[str] = field(default=None, init=False) def __post_init__(self, content, checksum): """ After dataclass setup, validate options. Args: content: Content to set checksum: Checksum to set on object Returns: None Raises: ValueError - If absolute path and content are both provided. You can only provide one. If absolute path is set and is a directory. Only file paths are supported. If absolute path, filename, checksum, and content is not set. FileNotFoundError - If absolute_path doesn't exist """ # Cache of our asset key self._key = None self._content = None if isinstance(content, property) else content self._checksum = checksum if not isinstance(checksum, property) else None self.filename = self.filename or (os.path.basename(self.absolute_path) if self.absolute_path else None) # populate absolute path for conditions where user does not supply info if not self._checksum and self._content is None and not self.absolute_path and self.filename and not self.persisted: # try relative path if self.relative_path and os.path.exists(self.short_remote_path()): self.absolute_path = os.path.abspath(self.short_remote_path()) else: self.absolute_path = os.path.abspath(self.filename) if self.absolute_path and self._content is not None: raise ValueError("Absolute Path and Content are mutually exclusive. Please provide only one of the options") elif self.absolute_path and not os.path.exists(self.absolute_path): raise FileNotFoundError(f"Cannot find specified asset: {self.absolute_path}") elif self.absolute_path and os.path.isdir(self.absolute_path) and not self.persisted: raise ValueError("Asset cannot be a directory!") elif not self.absolute_path and (not self.filename or (self.filename and not self._checksum and self._content is None and not self.persisted)): raise ValueError("Impossible to create the asset without either absolute path, filename and content, or filename and checksum!") def __repr__(self): """ String representation of Asset. Returns: String representation """ return f"<Asset: {os.path.join(self.relative_path, self.filename)} from {self.absolute_path}>" @property def checksum(self): # noqa: F811 """ Returns checksum of object. This will return None unless the user has provided checksum or called calculate checksum to avoid computation. If you need to guarantee a checksum value, call calculate_checksum beforehand. Returns: Checksum """ return self._checksum @checksum.setter def checksum(self, checksum): """ Set the checksum property. Args: checksum: Checksum set Returns: None """ self._checksum = checksum @property def extension(self) -> str: """ Returns extension of asset. Returns: Extension Notes: This does not preserve the case of the extension in the filename. Extensions will always be returned in lowercase. """ return os.path.splitext(self.filename)[1].lstrip('.').lower() @property def filename(self): # noqa: F811 """ Filename as asset. Returns: Filename """ return self._filename or "" @filename.setter def filename(self, filename): """ Set the filename. Args: filename: Filename Returns: None """ self._filename = filename if not isinstance(filename, property) and filename else None self._key = None @property def relative_path(self): # noqa: F811 """ Get the relative path. Returns: Relative path """ return self._relative_path or "" @relative_path.setter def relative_path(self, relative_path): """ Sets the relative path of an asset. We filter out strings ending in forward or backward slashes. Args: relative_path: Relative path for the item Returns: None """ self._relative_path = relative_path.strip(" \\/") if not isinstance(relative_path, property) and relative_path else None self._key = None @property def bytes(self): """ Bytes is the content as bytes. Returns: None """ if isinstance(self.content, bytes): return self.content return str.encode(self.handler(self.content)) @property def length(self): """ Get length of item. Returns: Length of the content """ if self._length is None: self._length = len(self.content) return self._length @length.setter def length(self, new_length): """ Set length of asset. Args: new_length: Length to set Returns: None """ self._length = new_length @property def content(self): # noqa: F811 """ Content of the asset. Returns: The content of the file, either from the content attribute or by opening the absolute path. """ if self._content is None and self.absolute_path: with open(self.absolute_path, "rb") as fp: self._content = fp.read() elif self.download_generator_hook: if logger.isEnabledFor(DEBUG): logger.debug(f"Fetching {self.filename} content from platform") self._content = self.download_stream().getvalue() return self._content @content.setter def content(self, content): """ Content property setting. Args: content: Content to set Returns: None """ self._content = None if isinstance(content, property) else content # Reset checksum to None until requested if self._checksum: self._checksum = None # region Equality and Hashing def __eq__(self, other: 'Asset'): """ Equality between assets. Assets are the same if the key is the same. Args: other: Other assets to compare with Returns: True if the keys are the same. """ return self.__key() == other.__key()
[docs] def deep_equals(self, other: 'Asset') -> bool: """ Performs a deep comparison of assets, including contents. Args: other: Other asset to compare Returns: True if filename, relative path, and contents are equal, otherwise false """ if self.filename == other.filename and self.relative_path == other.relative_path: return self.calculate_checksum() == other.calculate_checksum() return False
def __key(self): """ Get asset key. Asset key is filename and relative path. Returns: Asset key """ # We only care to check if filename and relative path is same. Goal here is not identical check but rather that # two files don't exist in same remote path if self._key is None: self._key = self.filename, self.relative_path return self._key def __hash__(self): """ Hash of Asset item. Returns: Asset hash """ return hash(self.__key()) # endregion
[docs] def download_generator(self) -> Generator[bytearray, None, None]: """ A Download Generator that returns chunks of bytes from the file. Returns: Generator of bytearray Raises: ValueError - When there is not a download generator hook defined Notes: TODO - Add a custom error with doclink. """ if not self.download_generator_hook: raise ValueError("To be able to download, the Asset needs to be fetched from a platform object") else: return self.download_generator_hook()
[docs] def download_stream(self) -> BytesIO: """ Get a bytes IO stream of the asset. Returns: BytesIO of the Asset """ if logger.isEnabledFor(DEBUG): logger.debug(f"Download {self.filename} to stream") io = BytesIO() self.__write_download_generator_to_stream(io) return io
@backoff.on_exception(backoff.expo, (requests.exceptions.Timeout, requests.exceptions.ConnectionError), max_tries=8) def __write_download_generator_to_stream(self, stream: BinaryIO, progress: bool = False): """ Write the download generator to another stream. Args: stream: Stream to download progress: Show progress Returns: None """ gen = self.download_generator() if progress and not IdmConfigParser.is_progress_bar_disabled(): from tqdm import tqdm gen = tqdm(gen, total=self.length) try: for chunk in gen: if progress: gen.update(len(chunk)) stream.write(chunk) finally: # close progress if we have it open if progress: gen.close()
[docs] def download_to_path(self, dest: str, force: bool = False): """ Download an asset to path. This requires loadings the object through the platform. Args: dest: Path to write to. If it is a directory, the asset filename will be added to it force: Force download even if file exists Returns: None """ if os.path.isdir(dest): path = os.path.join(dest, self.short_remote_path()) path = path.replace("\\", os.path.sep) os.makedirs(os.path.dirname(path), exist_ok=True) else: path = dest if not os.path.exists(path) or force: with open(path, 'wb') as out: if logger.isEnabledFor(DEBUG): logger.debug(f"Download {self.filename} to {path}") self.__write_download_generator_to_stream(out)
[docs] def calculate_checksum(self) -> str: """ Calculate checksum on asset. If previous checksum was calculated, that value will be returned. Returns: Checksum string """ if not self._checksum: if self.absolute_path: self._checksum = calculate_md5(self.absolute_path) elif self.content is not None: self._checksum = calculate_md5_stream(io.BytesIO(self.bytes)) return self._checksum
[docs] def short_remote_path(self) -> str: """ Returns the short remote path. This is the join of the relative path and filename. Returns: Remote Path + Filename """ if self.relative_path: path = PurePosixPath(self.relative_path.replace("\\", "/")).joinpath(self.filename) else: path = PurePosixPath(self.filename) return str(path)
[docs] def save_as(self, dest: str, force: bool = False): # noqa: F811 """ Download asset object to destination file. Args: dest: the file path force: force download Returns: None """ if self.absolute_path is not None: self.download_generator_hook = partial(file_content_to_generator, self.absolute_path) elif self.content: self.download_generator_hook = partial(content_generator, self.bytes) else: raise ValueError("Asset has no content or absolute path") self.download_to_path(dest, force)
[docs] def save_md5_checksum(self): """ Save the md5 checksum of the asset to a file in the same directory as the asset. Returns: None """ asset = Asset(filename=f"{self.filename}.md5", content=f"{self.filename}:md5:{self.checksum}") if asset.checksum is None: asset.calculate_checksum() asset.save_as(os.path.curdir, force=True)
TAsset = TypeVar("TAsset", bound=Asset) # Assets types TAssetList = List[TAsset] # Filters types TAssetFilter = Union[Callable[[TAsset], bool], Callable] TAssetFilterList = List[TAssetFilter]