Source code for idmtools_platform_comps.utils.package_version

"""idmtools tools to filter versions of packages for requirements for asset collections.

Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import functools
import operator
import json
import os
import re
from abc import ABC
from datetime import datetime
from logging import getLogger, DEBUG
from typing import Optional, List, Type
from urllib import request
import requests
from packaging.requirements import Requirement
from packaging.version import Version, parse
from html.parser import HTMLParser

user_logger = getLogger('user')


PKG_PYPI = 'https://pypi.python.org/pypi/{}/json'
PYPI_PRODUCTION_SIMPLE = 'https://packages.idmod.org/artifactory/api/pypi/pypi-production/simple'

IDM_DOCKER_PROD = 'https://packages.idmod.org/artifactory/list/docker-production'
IDMTOOLS_DOCKER_PROD = f'{IDM_DOCKER_PROD}/idmtools/'
MANIFEST_URL = "https://hub.docker.com/v2/repositories/library/{repository}/tags/?page_size=25&page=1&name={tag}"

logger = getLogger(__name__)


class PackageHTMLParser(HTMLParser, ABC):
    """Shared base class for the HTML parsers in this module.

    Subclasses add whatever they extract from the fed HTML to ``pkg_version``.
    """

    # Class-level defaults. ``pkg_version`` is replaced by a fresh set on every instance (see __init__).
    pkg_version = None
    previous_tag = None

    def __init__(self):
        """Set up the underlying ``HTMLParser`` and give the instance its own empty result set."""
        super(PackageHTMLParser, self).__init__()
        self.pkg_version = set()
class LinkHTMLParser(PackageHTMLParser):
    """Parse hrefs from links."""

    def handle_starttag(self, tag, attrs):
        """
        Record the href of every ``<a>`` tag, with trailing slashes removed.

        Args:
            tag: Name of the tag being opened
            attrs: List of ``(name, value)`` pairs for the attributes of the tag

        Returns:
            None
        """
        self.previous_tag = tag
        if tag != 'a':
            return
        # An anchor may have no href at all (e.g. <a name="top">), and HTMLParser reports a
        # valueless attribute as None. Skip both instead of raising KeyError/AttributeError.
        href = dict(attrs).get('href')
        if href is None:
            return
        self.pkg_version.add(href.rstrip('/'))
class LinkNameParser(PackageHTMLParser):
    """
    Parser for package listings from pypi/artifactory.

    The text of each link is split on ``-`` and the second piece is treated as the version candidate.
    It is kept when it matches ``ver_pattern`` or, failing that, when it ends in ``.zip`` or ``.tar.gz``
    (the extension is removed before storing).
    """

    in_link = False
    ver_pattern = re.compile(r'^[\d\.brcdev\+nightly]+$')

    def handle_starttag(self, tag, attrs):
        """Remember the tag and flag whether we are now inside a link."""
        self.previous_tag = tag
        self.in_link = (tag == "a")

    def handle_endtag(self, tag):
        """Clear the in-link flag when a link closes."""
        if tag == "a":
            self.in_link = False

    def handle_data(self, data):
        """Extract a version from link text and add it to ``pkg_version``."""
        if not self.in_link:
            return
        pieces = data.split("-")
        if len(pieces) < 2:
            return
        candidate = pieces[1]
        if self.ver_pattern.match(candidate):
            self.pkg_version.add(candidate)
            return
        # Not a bare version: accept archive names and drop the extension.
        for suffix in (".zip", ".tar.gz"):
            if candidate.endswith(suffix):
                self.pkg_version.add(candidate[:-len(suffix)])
                return
def get_latest_package_version_from_pypi(pkg_name, display_all=False):
    """
    Utility to get the latest non-prerelease version of a package from PyPI.

    Args:
        pkg_name: package name given
        display_all: determine if output all package releases

    Returns:
        The latest non-prerelease version string, or None when the lookup fails or the package
        has no non-prerelease version
    """
    url = PKG_PYPI.format(pkg_name)
    try:
        # Use the response as a context manager so the connection is always closed.
        with request.urlopen(url) as response:
            releases = json.loads(response.read())['releases']
    except Exception:
        # Best effort: any network/parsing problem is reported to the caller as "unknown".
        return None

    all_releases = sorted(releases, key=parse, reverse=True)
    if display_all:
        print(all_releases)

    release_versions = [ver for ver in all_releases if not parse(ver).is_prerelease]
    # A package that only has pre-releases yields an empty list; do not index into it.
    return release_versions[0] if release_versions else None
def get_latest_pypi_package_version_from_artifactory(pkg_name, display_all=False, base_version: str = None):
    """
    Look up the latest version of a package in the Artifactory PyPI index.

    Args:
        pkg_name: package name given
        display_all: determine if output all package releases
        base_version: Base version

    Returns:
        Whatever ``get_latest_version_from_site`` returns for the package's index page
    """
    index_page = "/".join((PYPI_PRODUCTION_SIMPLE, pkg_name))
    latest = get_latest_version_from_site(index_page, display_all=display_all, base_version=base_version)
    return latest
def get_pypi_package_versions_from_artifactory(pkg_name, display_all=False, base_version: str = None,
                                               exclude_pre_release: bool = True):
    """
    List the versions of a package available in the Artifactory PyPI index.

    Args:
        pkg_name: package name given
        display_all: determine if output all package releases
        base_version: Base version
        exclude_pre_release: Exclude any prerelease versions

    Returns:
        The versions reported by ``get_versions_from_site`` for the package's index page
    """
    index_page = "/".join((PYPI_PRODUCTION_SIMPLE, pkg_name))
    found = get_versions_from_site(index_page, base_version, display_all=display_all, parser=LinkNameParser,
                                   exclude_pre_release=exclude_pre_release)
    return found
def get_latest_ssmt_image_version_from_artifactory(pkg_name='comps_ssmt_worker', base_version: Optional[str] = None,
                                                   display_all=False):
    """
    Utility to get the latest SSMT image version from Artifactory.

    Args:
        pkg_name: package name given
        base_version: Optional base version. Only its first two dot-separated components are used, and any
            "+nightly" marker is removed. When omitted, no base version filter is applied.
        display_all: determine if output all package releases

    Returns:
        Whatever ``get_latest_version_from_site`` returns for the image's listing page
    """
    pkg_path = IDMTOOLS_DOCKER_PROD
    pkg_url = "/".join([pkg_path, pkg_name])
    # base_version defaults to None; only normalise it when the caller actually supplied one,
    # otherwise the default call would fail with AttributeError on None.replace.
    if base_version is not None:
        base_version = ".".join(base_version.replace("+nightly", "").split(".")[:2])
    return get_latest_version_from_site(pkg_url, base_version=base_version, display_all=display_all,
                                        parser=LinkHTMLParser)
def get_docker_manifest(image_path="idmtools/comps_ssmt_worker", repo_base=IDM_DOCKER_PROD):
    """
    Get docker manifest from IDM Artifactory.

    It mimics latest even when user has no latest tag defined: when the tag is ``latest`` (or no tag is
    given), the listing page of the image is fetched and the entry with the newest date shown in the
    listing is used as the tag.

    Args:
        image_path: Path of docker image we want, optionally followed by ``:tag``
        repo_base: Base of the repo. Used for both the tag listing and the manifest download.

    Returns:
        Tuple of (manifest decoded from JSON, image path including the resolved tag)

    Raises:
        ValueError - When the manifest cannot be found or no tag can be determined from the listing
    """
    if ":" not in image_path:
        image_path += ":latest"
    # Split on the last colon only, so an earlier colon in the path does not break the unpacking.
    path, tag = image_path.rsplit(":", 1)
    if tag == "latest":
        # List tags from the same repository the manifest is downloaded from.
        listing_url = "/".join([repo_base, path])
        response = requests.get(listing_url)
        content = response.text
        # Each kept row splits into: anchor open tag, '<name>/</a', and the trailing date/size text.
        rows = [line.split(">") for line in content.split("\n") if "<a href" in line and "pre" not in line]
        tag_dates = {
            row[1].replace("/</a", ''): datetime.strptime(row[2].strip(" -"), '%d-%b-%Y %H:%M')
            for row in rows
        }
        if not tag_dates:
            raise ValueError(f"Could not determine the latest tag for {path}")
        tag = max(tag_dates.items(), key=operator.itemgetter(1))[0]
        image_path = ":".join([path, tag])
    final_path = "/".join([path, tag, "manifest.json"])
    pkg_path = f'{repo_base}/{final_path}'
    response = requests.get(pkg_path)
    if response.status_code != 200:
        raise ValueError("Could not find manifest for file")
    return response.json(), image_path
def get_digest_from_docker_hub(repo, tag='latest'):
    """
    Get the digest for image from docker.

    Args:
        repo: string, repository name. MANIFEST_URL already places it under ``library/``, so pass the
            bare name (e.g. 'fedora').
        tag: string, tag of the repository (e.g. 'latest')

    Returns:
        Digest of the first amd64 image of the first matching tag, or None when the request fails,
        nothing matches, or there is no amd64 image
    """
    response = requests.get(
        MANIFEST_URL.format(repository=repo, tag=tag)
    )
    # Check the status before decoding: an error response is not guaranteed to carry a JSON body.
    if not response.ok:
        return None
    manifest = response.json()
    if not manifest.get('count'):
        return None
    for image in manifest['results'][0]['images']:
        if image['architecture'] == "amd64":
            return image['digest']
    return None
@functools.lru_cache(8)
def fetch_versions_from_server(pkg_url: str, parser: Type[PackageHTMLParser] = LinkHTMLParser) -> List[str]:
    """
    Download a listing page and extract the versions it advertises.

    Args:
        pkg_url: Url to fetch
        parser: Parser class to use; it is instantiated, fed the page, and its ``pkg_version`` is read

    Returns:
        The extracted entries (those starting with '.' are dropped) sorted newest first, or an empty
        list when the server does not answer with status 200
    """
    response = requests.get(pkg_url)
    pkg_name = pkg_url.rsplit('/', 1)[-1]
    if response.status_code != 200:
        logger.warning(f"Error fetching package {pkg_name} information. Status code: {response.status_code}")
        return []

    html_parser = parser()
    html_parser.feed(response.text)
    # Entries beginning with a dot are skipped before sorting.
    visible = (entry for entry in html_parser.pkg_version if not entry.startswith('.'))
    return sorted(visible, key=parse, reverse=True)
def fetch_versions_from_artifactory(pkg_name: str, parser: Type[PackageHTMLParser] = LinkHTMLParser) -> Optional[List[str]]:
    """
    Fetch all versions of a package from the docker-production listing in Artifactory.

    Args:
        pkg_name: Path of the package below IDM_DOCKER_PROD
        parser: Parser class to use; it is instantiated, fed the page, and its ``pkg_version`` is read

    Returns:
        Available releases sorted newest first, or None when the listing cannot be fetched
    """
    pkg_path = IDM_DOCKER_PROD
    # Build the URL with '/' explicitly: os.path.join uses the OS separator (a backslash on Windows).
    pkg_url = "/".join([pkg_path, pkg_name])

    resp = requests.get(pkg_url)
    if resp.status_code != 200:
        logger.warning('Could not fetch URL')
        return None

    html_parser = parser()
    html_parser.feed(resp.text)
    # Entries beginning with a dot are skipped before sorting.
    releases = [v for v in html_parser.pkg_version if not v.startswith('.')]
    return sorted(releases, key=parse, reverse=True)
def _starts_with_base_version(version: str, base_version: str) -> bool:
    """Tell whether ``version`` begins with ``base_version`` without cutting a number in half."""
    if not version.startswith(base_version):
        return False
    remainder = version[len(base_version):]
    # "1.1" must not match "1.10.0": reject when the prefix ends in a digit and the next character
    # of the version is a digit too.
    return not (base_version[-1:].isdigit() and remainder[:1].isdigit())


@functools.lru_cache(3)
def get_versions_from_site(pkg_url, base_version: Optional[str] = None, display_all=False,
                           parser: Type[PackageHTMLParser] = LinkNameParser, exclude_pre_release: bool = True):
    """
    Utility to get the available versions for a package.

    The default properties filter out pre releases. You can also include a base version to only list items
    starting with a particular version.

    Args:
        pkg_url: Url of the page listing the package
        base_version: Optional base version. Only versions starting with it are kept. For example, to get
            versions 1.18.5, 1.18.4, 1.18.3, 1.18.2 pass 1.18 (1.180.0 would not be included)
        display_all: determine if output all package releases
        parser: Parser needs to be a HTMLParser that returns a pkg_versions
        exclude_pre_release: Exclude prerelease versions (anything not made only of digits and dots)

    Returns:
        List of matching versions, in the order returned by ``fetch_versions_from_server``

    Raises:
        ValueError - If the versions cannot be fetched (the fetch returned None)
    """
    all_releases = fetch_versions_from_server(pkg_url, parser=parser)
    if all_releases is None:
        raise ValueError(
            f"Could not determine latest version for package {pkg_url}. You can manually specify a version to avoid this error")

    if display_all:
        print(all_releases)

    if exclude_pre_release:
        ver_pattern = re.compile(r'^[\d\.]+$')
        release_versions = [ver for ver in all_releases if ver_pattern.match(ver)]
    else:
        release_versions = all_releases

    if base_version:
        release_versions = [ver for ver in release_versions if _starts_with_base_version(ver, base_version)]

    # comps_ssmt_worker will store only x.x.x.x
    if 'comps_ssmt_worker' in pkg_url.lower():
        release_versions = [ver for ver in release_versions if len(ver.split('.')) == 4]
    return release_versions
@functools.lru_cache(3)
def get_latest_version_from_site(pkg_url, base_version: Optional[str] = None, display_all=False,
                                 parser: Type[PackageHTMLParser] = LinkNameParser, exclude_pre_release: bool = True):
    """
    Pick the first version listed for a package page, optionally restricted to a base version.

    Args:
        pkg_url: Url of the page listing the package
        base_version: Optional base version. When given, the first listed version whose first two
            dot-separated components equal those of the base version is returned.
        display_all: determine if output all package releases
        parser: Parser needs to be a HTMLParser that returns a pkg_versions
        exclude_pre_release: Exclude pre-release versions

    Returns:
        The selected version, or None when nothing matches
    """
    if logger.isEnabledFor(DEBUG):
        logger.debug(f"Fetching version from {pkg_url} with base {base_version}")

    release_versions = get_versions_from_site(pkg_url, base_version, display_all=display_all, parser=parser,
                                              exclude_pre_release=exclude_pre_release)

    if not base_version:
        # No base version requested: the head of the list wins, if there is one.
        return release_versions[0] if release_versions else None

    # only use the longest match latest
    version_compatible_portion = ".".join(base_version.split(".")[:2])
    if logger.isEnabledFor(DEBUG):
        logger.debug(
            f"Finding latest of matches for version {base_version} from {release_versions} using {version_compatible_portion}")

    matching = (ver for ver in release_versions if ".".join(ver.split('.')[:2]) == version_compatible_portion)
    return next(matching, None)
def fetch_package_versions_from_pypi(pkg_name):
    """
    Utility to fetch the release information of a package from PyPI.

    Args:
        pkg_name: package name given

    Returns:
        The value stored under 'releases' in the PyPI JSON document for the package (presumably a mapping
        keyed by version string - confirm against the PyPI JSON API), or None when the lookup fails
    """
    url = PKG_PYPI.format(pkg_name)
    try:
        # Use the response as a context manager so the connection is always closed.
        with request.urlopen(url) as response:
            return json.loads(response.read())['releases']
    except Exception:
        # Best effort: any network/parsing problem is reported to the caller as None.
        return None
def fetch_package_versions(pkg_name, is_released=True, sort=True, display_all=False):
    """
    Utility to get the available versions for a given package name.

    Artifactory is queried first; PyPI is used as a fallback when Artifactory yields nothing.

    Args:
        pkg_name: package name given
        is_released: get released version only
        sort: make version sorted or not
        display_all: determine if output all package releases

    Returns:
        List of versions of the package (empty when none could be found)
    """
    # First fetch versions from Artifactory
    pkg_url = "/".join([PYPI_PRODUCTION_SIMPLE, pkg_name])
    versions = fetch_versions_from_server(pkg_url, parser=LinkNameParser)

    # fetch_versions_from_server reports a failed lookup as an empty list, so test for emptiness
    # (not just None); otherwise the PyPI fallback is never used.
    if not versions:
        versions = fetch_package_versions_from_pypi(pkg_name)

    # The PyPI lookup may give None on failure. Normalise to a new list so the code below always has
    # a list to work with and never hands out the cached list of fetch_versions_from_server.
    versions = list(versions) if versions else []

    if sort:
        versions = sorted(versions, key=parse, reverse=True)

    if is_released:
        versions = [ver for ver in versions if not parse(ver).is_prerelease]

    if display_all:
        print(versions)

    return versions
def get_highest_version(pkg_requirement: str):
    """
    Find the highest available version that satisfies a requirement string.

    Args:
        pkg_requirement: package requirement given (parsed with ``Requirement``)

    Returns:
        The highest version accepted by the requirement's specifier as a string, or None when no
        available version is accepted
    """
    requirement = Requirement(pkg_requirement)

    # Keep only the available versions that the specifier accepts
    satisfying = []
    for candidate in fetch_package_versions(requirement.name):
        if parse(candidate) in requirement.specifier:
            satisfying.append(Version(candidate))

    if satisfying:
        return str(max(satisfying))
    # Nothing matched the specifier
    return None
def get_latest_version(pkg_name):
    """
    Utility to get the latest version for a given package name.

    Args:
        pkg_name: package name given. It is handed to ``get_highest_version`` unchanged.

    Returns:
        the latest version of package

    Raises:
        SystemExit - With exit code 1 when no valid version is found
    """
    version = get_highest_version(pkg_name)
    if version is None:
        user_logger.info(f"No valid versions found for '{pkg_name}'")
        # Raise SystemExit directly: the bare exit() builtin is installed by the site module and is
        # not guaranteed to exist (e.g. python -S or frozen applications).
        raise SystemExit(1)
    return version
def get_latest_compatible_version(pkg_name, base_version=None, versions=None, validate=True):
    """
    Utility to get the latest compatible version from a given version list.

    Args:
        base_version: Optional base version. Only versions starting with it are considered; a
            "+nightly" marker is removed first.
        pkg_name: package name given
        versions: user input of version list; expected newest first, since the first match is returned.
            When None, the list is fetched with ``fetch_package_versions``.
        validate: bool, if True, base_version must be present in the version list

    Returns:
        the latest compatible version from versions, or None when the list is empty, validation fails
        or no version is compatible with base_version
    """
    if versions is None:
        versions = fetch_package_versions(pkg_name)

    # Return None if given version list is None or empty
    if not versions:
        return None

    # Return the latest version if no base_version is given
    if base_version is None:
        return versions[0]

    # Cleanup
    base_version = base_version.replace('+nightly', '')

    # Make sure the input is valid
    parsed_version_to_check = parse(base_version)

    # Check if the version is in the list
    if validate and not any(parsed_version_to_check == parse(version) for version in versions):
        user_logger.info(f"Could not find the version of '{base_version}' for '{pkg_name}'.")
        return None

    def _is_compatible(version):
        # Prefix match that does not cut a number in half: "1.1" must not accept "1.10.0".
        if not version.startswith(base_version):
            return False
        remainder = version[len(base_version):]
        return not (base_version[-1:].isdigit() and remainder[:1].isdigit())

    # Pick the first (latest) candidate; there may be none, e.g. when validate is False.
    for version in versions:
        if _is_compatible(version):
            return version

    user_logger.info(f"Could not find the version of '{base_version}' for '{pkg_name}'.")
    return None