idmtools.utils.decorators module
Decorators defined for idmtools.
Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
- class idmtools.utils.decorators.abstractstatic(function)
- Bases: staticmethod
- A decorator for defining a method as both static and abstract.
- __init__(function)
- Initialize abstractstatic.
- Parameters:
- function – Function to wrap as abstract
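To illustrate what "both static and abstract" means in practice, the following sketch uses only the standard library's abc module rather than abstractstatic itself. The class names Loader and JsonLoader are hypothetical and exist only for this example.

```python
from abc import ABC, abstractmethod


class Loader(ABC):  # hypothetical base class, for illustration only
    @staticmethod
    @abstractmethod
    def extensions():
        """Subclasses must provide this static method."""


class JsonLoader(Loader):  # hypothetical concrete subclass
    @staticmethod
    def extensions():
        return [".json"]


# The override can be called without creating an instance.
exts = JsonLoader.extensions()

# The abstract base cannot be instantiated while the method is not overridden.
try:
    Loader()
    base_instantiable = True
except TypeError:
    base_instantiable = False
```

A subclass that forgets to override extensions() would fail at instantiation in the same way as Loader.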
 
 
- idmtools.utils.decorators.optional_decorator(decorator: Callable, condition: Union[bool, Callable[[], bool]])
- A decorator that applies another decorator only if a condition is true.
- Parameters:
- decorator – Decorator to apply
- condition – Condition that controls whether the decorator is applied. May be a bool or a zero-argument callable that returns a bool
 
- Returns:
- Optionally wrapped func. 
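The idea of a conditionally applied decorator can be sketched in a few lines. This is not the idmtools implementation: apply_if and shout are hypothetical names, and evaluating a callable condition at decoration time is an assumption made for the example.

```python
from typing import Callable, Union


def apply_if(decorator: Callable, condition: Union[bool, Callable[[], bool]]) -> Callable:
    """Hypothetical stand-in: apply `decorator` only when `condition` holds."""
    def wrapper(func: Callable) -> Callable:
        # Assumption: a callable condition is resolved once, at decoration time.
        enabled = condition() if callable(condition) else condition
        return decorator(func) if enabled else func
    return wrapper


def shout(func):  # hypothetical decorator used for the demonstration
    def inner(*args, **kwargs):
        return func(*args, **kwargs).upper()
    return inner


@apply_if(shout, True)
def greet():
    return "hello"


@apply_if(shout, lambda: False)
def whisper():
    return "hello"
```

Here greet() is wrapped by shout, while whisper() is returned unchanged because its condition evaluates to False.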
 
- class idmtools.utils.decorators.SingletonMixin
- Bases: object
- SingletonMixin defines a singleton that can be added to any class.
- As a singleton, only one instance will be made per process.
- classmethod instance()
- Return the instance of the object.
- If the instance has not been created, it will be initialized before returning.
- Returns:
- The singleton instance 
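A minimal sketch of the lazy-initialization pattern described above, assuming a per-class instance guarded by a lock. LazySingleton and Settings are hypothetical names; the actual SingletonMixin may be implemented differently.

```python
import threading


class LazySingleton:  # hypothetical mixin, not idmtools' SingletonMixin
    _instance = None
    _lock = threading.Lock()

    @classmethod
    def instance(cls):
        # Look only at this class's own attributes so each subclass
        # gets its own instance, created on first access.
        if cls.__dict__.get("_instance") is None:
            with cls._lock:
                if cls.__dict__.get("_instance") is None:
                    cls._instance = cls()
        return cls._instance


class Settings(LazySingleton):  # hypothetical class using the mixin
    def __init__(self):
        self.values = {}


first = Settings.instance()
second = Settings.instance()
```

Both calls return the same object, so state stored on first.values is visible through second.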
 
 
- idmtools.utils.decorators.cache_for(ttl=None) → Callable
- Cache a value for a certain time period.
- Parameters:
- ttl – Time to live: how long a cached value remains valid before it expires
- Returns:
- Wrapper Function 
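The time-based caching idea can be sketched as follows. This is an illustration only: ttl_cache is a hypothetical name, and both the datetime.timedelta type for ttl and the one-minute default are assumptions, not facts taken from cache_for.

```python
import datetime
import functools


def ttl_cache(ttl=None):
    """Hypothetical TTL cache; `ttl` is assumed to be a datetime.timedelta."""
    if ttl is None:
        ttl = datetime.timedelta(minutes=1)  # assumed default, not from the source

    def decorator(func):
        stamp, value = None, None

        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            nonlocal stamp, value
            now = datetime.datetime.now()
            # Recompute on the first call or once the cached value has expired.
            # Note: a single value is cached regardless of the arguments.
            if stamp is None or now - stamp > ttl:
                value = func(*args, **kwargs)
                stamp = now
            return value
        return wrapped
    return decorator


calls = []


@ttl_cache(datetime.timedelta(seconds=60))
def expensive():
    calls.append(1)
    return len(calls)


first = expensive()
second = expensive()
```

The second call falls inside the 60-second window, so expensive() runs only once and both calls return the same value.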
 
- idmtools.utils.decorators.optional_yaspin_load(*yargs, **ykwargs) → Callable
- Adds a CLI spinner to a function based on conditions.
- The spinner will be present if:
- yaspin package is present.
- NO_SPINNER environment variable is not defined. 
 - Parameters:
- *yargs – Arguments to pass to yaspin constructor. 
- **ykwargs – Keyword arguments to pass to yaspin constructor. 
 
- Examples
    import time
    from idmtools.utils.decorators import optional_yaspin_load

    @optional_yaspin_load(text="Loading test", color="yellow")
    def test():
        time.sleep(100)
- Returns:
- A callable wrapper function. 
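The two conditions listed above can be sketched as a small wrapper. maybe_spinner is a hypothetical name, and checking the conditions at call time (rather than at decoration time) is an assumption; only the yaspin context-manager usage reflects the real yaspin package.

```python
import functools
import os


def maybe_spinner(*yargs, **ykwargs):
    """Hypothetical helper: spin only if yaspin imports and NO_SPINNER is unset."""
    def decorator(func):
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            try:
                from yaspin import yaspin
            except ImportError:
                yaspin = None
            # Fall back to a plain call when either condition fails.
            if yaspin is None or "NO_SPINNER" in os.environ:
                return func(*args, **kwargs)
            # yaspin(...) works as a context manager around the call.
            with yaspin(*yargs, **ykwargs):
                return func(*args, **kwargs)
        return wrapped
    return decorator


os.environ["NO_SPINNER"] = "1"  # disable the spinner for this demonstration


@maybe_spinner(text="Loading test", color="yellow")
def load():
    return "done"


result = load()
```

Because the function's return value passes through unchanged, callers do not need to know whether a spinner was shown.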
 
- class idmtools.utils.decorators.ParallelizeDecorator(queue=None, pool_type: Optional[Type[Executor]] = ThreadPoolExecutor)
- Bases: object
- ParallelizeDecorator allows you to easily parallelize a group of code.
- A simple example follows.
- Examples
    import time
    from idmtools.utils.decorators import ParallelizeDecorator

    op_queue = ParallelizeDecorator()

    class Ops:
        @op_queue.parallelize
        def heavy_op(self):
            time.sleep(10)

        def do_lots_of_heavy(self):
            # Each call yields a future; collect the results afterwards
            futures = [self.heavy_op() for i in range(100)]
            results = op_queue.get_results(futures)
- __init__(queue=None, pool_type: Optional[Type[Executor]] = ThreadPoolExecutor)
- Initialize our ParallelizeDecorator.
- Parameters:
- queue – Queue to use. If not provided, one will be created. 
- pool_type – Pool type to use. Defaults to ThreadPoolExecutor. 
 
 
- parallelize(func)
- Wrap a function in parallelization.
- Parameters:
- func – Function to wrap with parallelization 
- Returns:
- Function wrapped with parallelization object 
 
- join()
- Join our queue.
- Returns:
- Join operation from queue 
 
- get_results(futures, ordered=False)
- Get results from our decorator.
- Parameters:
- futures – Futures to get results from
- ordered – If True, return results in the order the futures were provided. If False (the default), return them as they complete.
 
- Returns:
- Results from all the futures.
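The submit-then-collect pattern behind parallelize() and get_results() can be sketched with the standard library alone. MiniParallelizer is a hypothetical stand-in written for this example; it omits the queue and join() and is not the ParallelizeDecorator implementation.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import functools


class MiniParallelizer:  # hypothetical stand-in, not ParallelizeDecorator itself
    def __init__(self, pool_type=ThreadPoolExecutor):
        self.pool = pool_type()

    def parallelize(self, func):
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            # Submit the call to the pool and hand back a future immediately.
            return self.pool.submit(func, *args, **kwargs)
        return wrapped

    def get_results(self, futures, ordered=False):
        if ordered:
            # Preserve the order in which the futures were supplied.
            return [f.result() for f in futures]
        # Otherwise collect results in completion order.
        return [f.result() for f in as_completed(futures)]


ops = MiniParallelizer()


@ops.parallelize
def square(x):
    return x * x


futures = [square(i) for i in range(5)]
ordered_results = ops.get_results(futures, ordered=True)
unordered_results = ops.get_results(futures)
ops.pool.shutdown()
```

With ordered=True the results line up with the inputs; without it, the same values come back in whatever order the work finished.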