# Source code for COMPS.Client

import os
import json
import logging
import re
import requests
try:
    from js import XMLHttpRequest, Blob, URLSearchParams, FormData
except ImportError:
    XMLHttpRequest = None
import time
import traceback
import urllib3
import COMPS

# Silence urllib3's InsecureRequestWarning for the whole process.
# NOTE(review): presumably needed because some COMPS endpoints are reached
# without certificate verification -- confirm against AuthManager / callers.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

class Client(object):
    """
    Client object for managing access to COMPS
    """

    # The single AuthManager shared by all class-level calls; None while logged out.
    __auth_manager = None

    def __init__(self):
        pass

    @classmethod
    def auth_manager(cls):
        """
        Retrieve the AuthManager.

        Must be logged in first, otherwise this raises a RuntimeError.

        :return: the AuthManager instance
        """
        if not cls.__auth_manager:
            raise RuntimeError('login() is required.')
        return cls.__auth_manager

    @classmethod
    def login(cls, hoststring, credential_prompt=None):
        """
        Log in to the COMPS service.

        The specified COMPS hoststring allows a couple points of flexibility:

        * Secure vs. Unsecure - Specifying the protocol as http or https allows the user to control
          whether the SSL transport is used for requests.  By default, https is used.
        * Port - Specifying a particular port allows the user to control the port to communicate
          over for requests.  By default, the standard port for the chosen protocol is used
          (i.e. 80 for http, 443 for https).

        For example, the following are all valid formats:

        * comps.idmod.org - uses secure https protocol over port 443.
        * http://internal.idmod.org - uses unsecure http protocol over port 80.
        * localhost:54321 - uses secure https protocol over port 54321.

        Calling login() when already logged into a different host is invalid and will raise a
        RuntimeError.  When already logged into the same host, nothing is done and the function
        returns immediately.

        :param hoststring: the COMPS host to connect to
        :param credential_prompt: a CredentialPrompt object that controls how the user will supply
            their login credentials.  By default, pyCOMPS will try to open a graphical prompt
            (TKCredentialPrompt) and fall back to console (ConsoleCredentialPrompt) if that fails.
        :raises RuntimeError: if hoststring is empty or malformed, or if already logged into a
            different host
        """
        if not hoststring:
            raise RuntimeError('COMPS host required for login')

        tmphoststring = hoststring.rstrip('/')

        # Raw string: '\w' and '\d' are not valid escapes in a plain string literal.
        match_obj = re.match(r'(?:(https?)(?:://))?([\w.-]*)(?:(?::)(\d+))?$', tmphoststring)

        # The pattern allows an empty host name (e.g. '/' or ':8080'), which would otherwise
        # normalize to the unusable 'https://'; reject it as malformed.
        if match_obj is None or not match_obj.group(2):
            raise RuntimeError('Invalid format for host string: "{0}". See help for correct usage.'.format(hoststring))

        logger.debug('Parsed login host: %s', match_obj.groups())

        protocol, host_name, port = match_obj.groups()

        tmphoststring = '{0}://{1}{2}'.format(protocol if protocol else 'https',
                                              host_name,
                                              ':{0}'.format(port) if port else '')

        if cls.__auth_manager is not None:
            oldhoststring = cls.__auth_manager.hoststring

            # A COMPS_SERVER environment variable, when set, takes the place of the requested
            # host in this comparison.
            if oldhoststring != (os.environ.get('COMPS_SERVER') or tmphoststring).rstrip('/'):
                raise RuntimeError('Already logged into host \'{0}\'. Must logout before logging into a different host.'.format(oldhoststring))

            if cls.__auth_manager.has_auth_token():
                logger.info('Already logged into %s. Skipping login.', oldhoststring)
                return
        else:
            cls.__auth_manager = COMPS.AuthManager(tmphoststring, credential_prompt=credential_prompt)

        # Runs for a freshly created manager and for an existing one that has no token yet.
        try:
            cls.__auth_manager.get_auth_token()  # force credentials prompt and caching of auth token
        except BaseException:
            # Any failure (including an interrupt) leaves the client logged out; then re-raise.
            cls.__auth_manager = None
            raise

    @classmethod
    def logout(cls, hoststring=None):
        """
        Log out of the COMPS service.

        If logged in, this clears any cached credentials and nulls the AuthManager instance.
        Otherwise, you may pass a hoststring parameter to clear cached credentials for a
        particular COMPS host.

        :param hoststring: the COMPS host to clear credentials for
        """
        if hoststring is not None:
            tmp_authmgr = COMPS.AuthManager(hoststring)

            # If we're not already logged into the endpoint the user specified, then we're not
            # really "logging out", just clearing cached credentials
            if cls.__auth_manager is None or tmp_authmgr.hoststring != cls.__auth_manager.hoststring:
                logger.info('Clearing cached credentials for %s', hoststring)
                tmp_authmgr.clear_auth_token()
                return

            # Otherwise, the hoststring is basically redundant, so just fall through...

        if cls.__auth_manager is not None:
            logger.info('Logging out of %s', cls.__auth_manager.hoststring)
            cls.__auth_manager.clear_auth_token()
            cls.__auth_manager = None

    @classmethod
    def post(cls, path, include_comps_auth_token=True, http_err_handle_exceptions=None, **kwargs):
        """Issue a POST via :meth:`request`; see that method for the parameters."""
        return cls.request('POST', path, include_comps_auth_token, http_err_handle_exceptions or [], **kwargs)

    @classmethod
    def put(cls, path, include_comps_auth_token=True, http_err_handle_exceptions=None, **kwargs):
        """Issue a PUT via :meth:`request`; see that method for the parameters."""
        return cls.request('PUT', path, include_comps_auth_token, http_err_handle_exceptions or [], **kwargs)

    @classmethod
    def get(cls, path, include_comps_auth_token=True, http_err_handle_exceptions=None, **kwargs):
        """Issue a GET via :meth:`request`; see that method for the parameters."""
        return cls.request('GET', path, include_comps_auth_token, http_err_handle_exceptions or [], **kwargs)

    @classmethod
    def delete(cls, path, include_comps_auth_token=True, http_err_handle_exceptions=None, **kwargs):
        """Issue a DELETE via :meth:`request`; see that method for the parameters."""
        return cls.request('DELETE', path, include_comps_auth_token, http_err_handle_exceptions or [], **kwargs)

    @classmethod
    def request(cls, method, path, include_comps_auth_token=True, http_err_handle_exceptions=None, **kwargs):
        """
        Send an HTTP request to the logged-in COMPS host, retrying transient failures.

        Uses ``requests`` in a normal Python interpreter and a synchronous ``XMLHttpRequest``
        when the ``js`` module is importable (pyodide).  Up to 10 attempts are made, sleeping
        ``attempt * 100`` ms between them.  GET is always retried; other methods are retried
        only for the connection-level errors recognised by ``__should_retry``.

        :param method: the HTTP verb, e.g. 'GET'
        :param path: the request path; '/api' is prefixed unless the path starts with 'asset/'
        :param include_comps_auth_token: whether to add the auth-token header from the AuthManager
        :param http_err_handle_exceptions: HTTP status codes for which the response is returned to
            the caller instead of being turned into an exception
        :param kwargs: passed through to ``requests.request`` (headers, params, json, files, ...)
        :return: the response object
        :raises RuntimeError: if not logged in, or for a 4xx/5xx response carrying a
            'ResponseMessage'
        :raises NotImplementedError: under pyodide, when a ``data`` body is supplied
        """
        http_err_handle_exceptions = http_err_handle_exceptions or []

        # Work on a copy so the auth token and defaults never leak into the caller's dict.
        caller_headers = kwargs.get('headers')
        headers = dict(caller_headers) if caller_headers else {}
        kwargs['headers'] = headers

        if include_comps_auth_token:
            authtoken = cls.auth_manager().get_auth_token()
            headers[authtoken[0]] = authtoken[1]

        headers['Accept'] = 'application/json'
        # With files, the Content-Type (multipart boundary) is left to the transport.
        if not kwargs.get('files') and 'Content-Type' not in headers:
            headers['Content-Type'] = 'application/json'

        max_tries = 10
        retry_delay_ms = 100

        req_url = cls.__build_url(path, method, kwargs.get('params'))

        use_xhr = XMLHttpRequest is not None
        if use_xhr:
            # pyodide scenario: build the URL and body once, outside the retry loop, so a retry
            # cannot append the query string a second time and an unsupported body fails fast.
            xhr_url = cls.__xhr_url(req_url, kwargs.get('params'))
            xhr_body = cls.__xhr_body(kwargs)

        for i in range(1, max_tries + 1):
            try:
                if use_xhr:
                    resp = cls.__xhr_send(method, xhr_url, headers, xhr_body)
                else:
                    # normal python scenario
                    resp = requests.request(method, req_url, **kwargs)

                if i > 1:
                    logger.debug('Succeeded on attempt %s of %s', i, max_tries)
                break
            except Exception as e:
                logger.debug('Failed attempt %s of %s: %s', i, max_tries,
                             traceback.format_exception_only(type(e), e)[0][:-1])

                # It should always be safe to retry GET calls, but for PUT/POST/DELETE, retrying
                # something that already succeeded on the server-side could have unintended
                # side-effects so we need to be more cautious
                if i < max_tries and (method == "GET" or cls.__should_retry(e)):
                    time.sleep(i * retry_delay_ms / 1000)
                    logger.debug('Retrying...')
                else:
                    logger.debug('NOT RETRYING')
                    logger.debug(type(e))
                    # Guard against exceptions with empty args so the diagnostics below can
                    # never mask the exception being re-raised.
                    if e.args and hasattr(e.args[0], 'reason'):
                        logger.debug(type(e.args[0].reason))
                    raise

        if 400 <= resp.status_code < 600 and resp.status_code not in http_err_handle_exceptions:
            # there was an error, let's try to get a good error-message from the response
            cls.raise_err_from_resp(resp)

        return resp

    @classmethod
    def raise_err_from_resp(cls, resp):
        """
        Raise an exception describing an error response.

        If the body is JSON containing a non-empty 'ResponseMessage', a RuntimeError carrying
        the status, reason, message and (when present) 'CorrelationId' is raised.  Otherwise
        ``resp.raise_for_status()`` is used.

        :param resp: the response object to report on
        :raises RuntimeError: when a 'ResponseMessage' could be extracted
        """
        resp_msg = None
        corr_id = None  # stays None when the body cannot be parsed
        try:
            json_resp = resp.json()
            resp_msg = json_resp['ResponseMessage']
            corr_id = json_resp.get('CorrelationId')
        except json.decoder.JSONDecodeError as e:
            logger.debug('Invalid response: %s', resp.content)
            logger.debug('Exception: %s', e)
            # raise this exception below to stop exception chaining
        except Exception as e:
            # couldn't get a good error-message from the response, just log and then exception out
            logger.debug('Invalid response: %s', resp.content)
            logger.debug('Exception: %s', e)
            resp.raise_for_status()

        if not resp_msg:
            resp.raise_for_status()

        raise RuntimeError('{0} {1} - {2}{3}'.format(str(resp.status_code),
                                                     resp.reason,
                                                     resp_msg,
                                                     ' (CorrelationId = {0})'.format(corr_id) if corr_id else ''))

    @classmethod
    def __build_url(cls, path, method, params):
        """Return the absolute URL for path on the logged-in host and log the request."""
        api_prefix = '' if path.startswith(('asset/', '/asset/')) else '/api'
        # auth_manager() raises RuntimeError('login() is required.') when logged out, rather
        # than failing with an AttributeError on None.
        url = '{0}{1}/{2}'.format(cls.auth_manager().hoststring,
                                  api_prefix,
                                  path[1:] if path.startswith('/') else path)
        logger.debug('REQUEST -> %s %s%s', method, url, ' ' + str(params) if params else '')
        return url

    @classmethod
    def __should_retry(cls, e):
        """Return True if e is a connection-level error considered safe to retry."""
        try:
            if isinstance(e, (requests.exceptions.ChunkedEncodingError, requests.exceptions.SSLError)):
                return True
            if isinstance(e, requests.exceptions.ConnectionError) and e.args:
                reason = getattr(e.args[0], 'reason', None)
                return isinstance(reason, (urllib3.exceptions.NewConnectionError,
                                           urllib3.exceptions.SSLError,
                                           requests.exceptions.SSLError,
                                           BrokenPipeError))
        except Exception as check_err:
            # Best effort only: a failure while classifying must not hide the original error.
            logger.debug('Could not classify exception for retry: %s', check_err)
        return False

    @classmethod
    def __xhr_url(cls, req_url, params):
        """Return req_url with params appended as a query string (pyodide only)."""
        if not params:
            return req_url
        query = URLSearchParams.new([[k, str(v)] for k, v in params.items()]).toString()
        return req_url + "?" + query

    @classmethod
    def __xhr_body(cls, kwargs):
        """Build the XMLHttpRequest body from the json/files/data kwargs (pyodide only)."""
        if kwargs.get('json') is not None:
            return Blob.new([json.dumps(kwargs['json'])], cls.__BlobPropertyBag('application/json'))
        if kwargs.get('files'):
            # Each entry is (field_name, (file_name, content, content_type)).
            form = FormData.new()
            for f in kwargs['files']:
                form.set(f[0], Blob.new([f[1][1]], cls.__BlobPropertyBag(f[1][2])), f[1][0])
            return form
        if kwargs.get('data'):
            raise NotImplementedError("'data' request bodies are not implemented for XMLHttpRequest")
        return None

    @classmethod
    def __xhr_send(cls, method, url, headers, body):
        """Perform one synchronous XMLHttpRequest and wrap the result (pyodide only)."""
        req = XMLHttpRequest.new()
        req.open(method, url, False)  # False -> synchronous
        for name, value in headers.items():
            req.setRequestHeader(name, value)
        req.send(body)
        return cls.__XmlHttpResponse(req)

    class __XmlHttpResponse(object):
        """Minimal stand-in for a ``requests`` response, built from a completed XMLHttpRequest."""

        status_code = None
        content = None
        headers = None
        reason = None

        def __init__(self, req):
            self.content = req.response
            self.status_code = req.status
            self.reason = req.statusText
            self.headers = {}  # TODO: fill this in

        def json(self):
            """Return the response body parsed as JSON."""
            return json.loads(self.content)

        def raise_for_status(self):
            """Raise ``requests.exceptions.HTTPError`` for a 4xx/5xx status; otherwise do nothing."""
            if self.status_code is None or not 400 <= self.status_code < 600:
                return
            kind = 'Client' if self.status_code < 500 else 'Server'
            raise requests.exceptions.HTTPError(
                '{0} {1} Error: {2}'.format(self.status_code, kind, self.reason), response=self)

    class __BlobPropertyBag:
        """Options object passed as the second argument to ``Blob.new``."""

        def __init__(self, type='', endings='transparent'):
            self.type = type
            self.endings = endings