Source code for idmtools.utils.decorators
Decorators defined for idmtools.
Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
import datetime
import functools
import importlib
import importlib.util
import os
import sys
import threading
from concurrent.futures import Executor, as_completed
from concurrent.futures.thread import ThreadPoolExecutor
from functools import wraps
from logging import getLogger, DEBUG
from typing import Callable, Union, Optional, Type
user_logger = getLogger('user')
logger = getLogger(__name__)
[docs]class abstractstatic(staticmethod):
A decorator for defining a method both as static and abstract.
__slots__ = ()
[docs] def __init__(self, function):
Initialize abstractstatic.
function: Function to wrap as abstract
super(abstractstatic, self).__init__(function)
function.__isabstractmethod__ = True
__isabstractmethod__ = True
[docs]def optional_decorator(decorator: Callable, condition: Union[bool, Callable[[], bool]]):
A decorator that adds a decorator only if condition is true.
decorator: Decorator to add
condition: Condition to determine. Condition can be a callable as well
Optionally wrapped func.
if callable(condition):
condition = condition()
def decorate_in(func):
if condition:
func = decorator(func)
def wrapper(*args, **kwargs):
func(*args, **kwargs)
return wrapper
return decorate_in
[docs]class SingletonMixin(object):
SingletonMixin defines a singleton that can be added to any class.
As a singleton, on one instance will be made per process.
__singleton_lock = threading.Lock()
__singleton_instance = None
[docs] @classmethod
def instance(cls):
Return the instance of the object.
If the instance has not been created, it will be initialized before returning.
The singleton instance
if not cls.__singleton_instance:
with cls.__singleton_lock:
if not cls.__singleton_instance:
cls.__singleton_instance = cls()
return cls.__singleton_instance
[docs]def cache_for(ttl=None) -> Callable:
Cache a value for a certain time period.
ttl: Expiration of cache
Wrapper Function
if ttl is None:
ttl = datetime.timedelta(minutes=1)
def wrap(func):
time, value = None, None
def wrapped(*args, **kw):
# if we are testing, disable caching of functions as it complicates test-all setups
from idmtools.core import TRUTHY_VALUES
if os.getenv('TESTING', '0').lower() in TRUTHY_VALUES:
return func(*args, **kw)
nonlocal time
nonlocal value
now =
if not time or now - time > ttl:
value = func(*args, **kw)
time = now
return value
return wrapped
return wrap
[docs]def optional_yaspin_load(*yargs, **ykwargs) -> Callable:
Adds a CLI spinner to a function based on conditions.
The spinner will be present if
* yaspin package is present.
* NO_SPINNER environment variable is not defined.
*yargs: Arguments to pass to yaspin constructor.
**ykwargs: Keyword arguments to pass to yaspin constructor.
@optional_yaspin_load(text="Loading test", color="yellow")
def test():
A callable wrapper function.
has_yaspin = importlib.util.find_spec("yaspin")
spinner = None
if has_yaspin and not os.getenv('NO_SPINNER', False):
from yaspin import yaspin
spinner = yaspin(*yargs, **ykwargs)
def decorate(func):
def wrapper(*args, **kwargs):
if spinner and not os.getenv('NO_SPINNER', False):
kwargs['spinner'] = spinner
result = func(*args, **kwargs)
except Exception as e:
if spinner:
raise e
if spinner:
return result
return wrapper
return decorate
[docs]class ParallelizeDecorator:
ParallelizeDecorator allows you to easily parallelize a group of code.
A simple of example would be following
op_queue = ParallelizeDecorator()
class Ops:
def heavy_op():
def do_lots_of_heavy():
futures = [self.heavy_op() for i in range(100)]
results = op_queue.get_results(futures)
[docs] def __init__(self, queue=None, pool_type: Optional[Type[Executor]] = ThreadPoolExecutor):
Initialize our ParallelizeDecorator.
queue: Queue to use. If not provided, one will be created.
pool_type: Pool type to use. Defaults to ThreadPoolExecutor.
if queue is None:
self.queue = pool_type()
self.queue = queue
[docs] def parallelize(self, func):
Wrap a function in parallelization.
func: Function to wrap with parallelization
Function wrapped with parallelization object
def wrapper(*args, **kwargs):
future = self.queue.submit(func, *args, **kwargs)
return future
return wrapper
[docs] def join(self):
Join our queue.
Join operation from queue
return self.queue.join()
[docs] def get_results(self, futures, ordered=False):
Get Results from our decorator.
futures: Futures to get results from
ordered: Do we want results in order provided or as they complete. Default is as they complete which is False.
Results from all the futures.
results = []
if ordered:
for f in futures:
for f in as_completed(futures):
if logger.isEnabledFor(DEBUG):
logger.debug("Parallelize Total Results: " + str(results))
return results
def __del__(self):
Delete our queue before deleting ourselves.
del self.queue
[docs]def check_symlink_capabilities(func):
"""Decorator to check symlink creation capabilities."""
def wrapper(*args, **kwargs):
if sys.platform == 'win32' and sys.version_info < (3, 8):
user_logger.debug('Need administrator privileges to create symbolic links on Windows.')
return func(*args, **kwargs)
return wrapper