idmtools.utils.decorators module
Decorators defined for idmtools.
Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
- class idmtools.utils.decorators.abstractstatic(function)
Bases:
staticmethod
A decorator for defining a method both as static and abstract.
- __init__(function)
Initialize abstractstatic.
- Parameters:
function – Function to wrap as abstract
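The idea of an abstract static method can be illustrated with a minimal, self-contained sketch. It is not the idmtools implementation: `abstractstatic_sketch`, `Platform`, and `Local` are hypothetical names, and the sketch assumes the decorator works by marking the wrapped function so that `abc.ABC` treats it as abstract.

```python
from abc import ABC


class abstractstatic_sketch(staticmethod):
    """Hypothetical stand-in: a staticmethod that ABC machinery sees as abstract."""

    # ABCMeta looks for this attribute on every member of the class body.
    __isabstractmethod__ = True

    def __init__(self, function):
        super().__init__(function)
        # Mark the wrapped function as abstract as well.
        function.__isabstractmethod__ = True


class Platform(ABC):
    @abstractstatic_sketch
    def name():
        """Subclasses must provide a static name()."""


class Local(Platform):
    @staticmethod
    def name():
        return "local"
```

In this sketch, `Platform()` raises `TypeError` because `name` is still abstract, while `Local.name()` can be called without an instance.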
- idmtools.utils.decorators.optional_decorator(decorator: Callable, condition: Union[bool, Callable[[], bool]])
A decorator that adds a decorator only if condition is true.
- Parameters:
decorator – Decorator to add
condition – Condition that determines whether the decorator is applied. May be a bool or a callable that returns a bool.
- Returns:
Optionally wrapped func.
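A rough sketch of conditional decoration is shown below. It is an illustration under stated assumptions rather than the library's code: `optional_decorator_sketch`, `shout`, `greet`, and `whisper` are hypothetical names, and the sketch assumes a callable condition is evaluated once, when the decorator is applied.

```python
from functools import wraps
from typing import Callable, Union


def optional_decorator_sketch(decorator: Callable,
                              condition: Union[bool, Callable[[], bool]]):
    """Apply `decorator` only when `condition` is (or returns) True."""
    # Assumption: a callable condition is resolved at decoration time.
    enabled = condition() if callable(condition) else condition

    def decorate(func):
        return decorator(func) if enabled else func

    return decorate


def shout(func):
    """Example decorator that upper-cases a string result."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs).upper()
    return wrapper


@optional_decorator_sketch(shout, True)
def greet():
    return "hello"


@optional_decorator_sketch(shout, lambda: False)
def whisper():
    return "hello"
```

Here `greet()` is wrapped by `shout`, while `whisper()` is left untouched because its condition evaluates to False.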
- class idmtools.utils.decorators.SingletonMixin
Bases:
object
SingletonMixin defines a singleton that can be added to any class.
As a singleton, only one instance will be made per process.
- classmethod instance()
Return the instance of the object.
If the instance has not been created, it will be initialized before returning.
- Returns:
The singleton instance
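The lazy-creation behavior described for instance() could look roughly like the following sketch. `SingletonSketch` and `Registry` are hypothetical names, and the lock-based double check is an assumption about how thread safety might be handled, not a statement about the idmtools source.

```python
import threading


class SingletonSketch:
    """Hypothetical mixin: one lazily created instance per subclass."""

    _lock = threading.Lock()
    _instance = None

    @classmethod
    def instance(cls):
        # Look in cls.__dict__ so each subclass gets its own instance.
        if cls.__dict__.get("_instance") is None:
            with cls._lock:
                # Re-check inside the lock in case another thread won the race.
                if cls.__dict__.get("_instance") is None:
                    cls._instance = cls()
        return cls._instance


class Registry(SingletonSketch):
    def __init__(self):
        self.items = []


first = Registry.instance()
second = Registry.instance()
```

Both calls return the same object. Note that in this sketch calling `Registry()` directly still creates a separate object; only `instance()` is shared.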
- idmtools.utils.decorators.cache_for(ttl=None) -> Callable
Cache a value for a certain time period.
- Parameters:
ttl – Expiration of cache
- Returns:
Wrapper Function
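To make the time-based caching idea concrete, here is a minimal sketch. It rests on assumptions the text above does not confirm: that `ttl` is a `datetime.timedelta`, that a default of one minute applies when `ttl` is None, and that the cache ignores call arguments. `cache_for_sketch` and `load_config` are hypothetical names.

```python
import datetime
import time
from functools import wraps


def cache_for_sketch(ttl=None):
    """Cache a function's last result until `ttl` has elapsed."""
    if ttl is None:
        ttl = datetime.timedelta(minutes=1)  # assumed default for this sketch

    def wrap(func):
        cached_at = None  # monotonic timestamp of the last real call
        value = None

        @wraps(func)
        def wrapped(*args, **kwargs):
            nonlocal cached_at, value
            now = time.monotonic()
            expired = cached_at is None or now - cached_at > ttl.total_seconds()
            if expired:
                value = func(*args, **kwargs)
                cached_at = now
            # Assumption: one cached value, regardless of arguments.
            return value

        return wrapped

    return wrap


calls = []


@cache_for_sketch(ttl=datetime.timedelta(seconds=30))
def load_config():
    calls.append(1)
    return {"version": len(calls)}
```

Calling `load_config()` twice within 30 seconds runs the underlying function once and returns the cached dictionary the second time.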
- idmtools.utils.decorators.optional_yaspin_load(*yargs, **ykwargs) -> Callable
Adds a CLI spinner to a function based on conditions.
The spinner is shown only if the yaspin package is installed and the NO_SPINNER environment variable is not defined.
- Parameters:
*yargs – Arguments to pass to yaspin constructor.
**ykwargs – Keyword arguments to pass to yaspin constructor.
Examples
    @optional_yaspin_load(text="Loading test", color="yellow")
    def test():
        time.sleep(100)
- Returns:
A callable wrapper function.
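The two conditions for showing the spinner can be sketched as follows. This is an illustrative approximation, not the idmtools source: `spinner_enabled`, `optional_spinner_sketch`, and `slow_sum` are hypothetical names, and the sketch assumes the conditions are checked each time the wrapped function is called.

```python
import importlib.util
import os
from functools import wraps


def spinner_enabled() -> bool:
    """True only if yaspin is importable and NO_SPINNER is not set."""
    has_yaspin = importlib.util.find_spec("yaspin") is not None
    return has_yaspin and "NO_SPINNER" not in os.environ


def optional_spinner_sketch(*yargs, **ykwargs):
    """Wrap a function in a yaspin spinner when the conditions allow it."""
    def decorate(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if not spinner_enabled():
                # Fall back to a plain call with no spinner.
                return func(*args, **kwargs)
            from yaspin import yaspin  # imported lazily; optional dependency
            with yaspin(*yargs, **ykwargs):
                return func(*args, **kwargs)
        return wrapper
    return decorate


@optional_spinner_sketch(text="Adding", color="yellow")
def slow_sum(a, b):
    return a + b
```

With NO_SPINNER set in the environment, or with yaspin not installed, `slow_sum(2, 3)` simply runs and returns its result without any spinner.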
- class idmtools.utils.decorators.ParallelizeDecorator(queue=None, pool_type: Optional[Type[concurrent.futures.Executor]] = concurrent.futures.ThreadPoolExecutor)
Bases:
object
ParallelizeDecorator lets you easily parallelize a group of function calls.
A simple example follows.
Examples
    op_queue = ParallelizeDecorator()

    class Ops:
        @op_queue.parallelize
        def heavy_op(self):
            time.sleep(10)

        def do_lots_of_heavy(self):
            futures = [self.heavy_op() for i in range(100)]
            results = op_queue.get_results(futures)
- __init__(queue=None, pool_type: Optional[Type[concurrent.futures.Executor]] = concurrent.futures.ThreadPoolExecutor)
Initialize our ParallelizeDecorator.
- Parameters:
queue – Queue to use. If not provided, one will be created.
pool_type – Pool type to use. Defaults to ThreadPoolExecutor.
- parallelize(func)
Wrap a function in parallelization.
- Parameters:
func – Function to wrap with parallelization
- Returns:
Function wrapped with parallelization object
- join()
Join our queue.
- Returns:
Join operation from queue
- get_results(futures, ordered=False)
Get results from the given futures.
- Parameters:
futures – Futures to get results from
ordered – Whether to return results in the order the futures were provided (True) or as they complete (False). Defaults to False.
- Returns:
Results from all the futures.
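The difference between ordered and as-completed results can be illustrated with a small stand-alone sketch built on `concurrent.futures`. `ParallelizeSketch` and `square` are hypothetical names, and the sketch assumes the decorated function is submitted to an executor and returns a future, which is consistent with the description above but not taken from the idmtools source.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


class ParallelizeSketch:
    """Hypothetical stand-in that submits decorated calls to a thread pool."""

    def __init__(self, pool_type=ThreadPoolExecutor):
        self.pool = pool_type()

    def parallelize(self, func):
        def wrapper(*args, **kwargs):
            # Each call returns a Future instead of the result itself.
            return self.pool.submit(func, *args, **kwargs)
        return wrapper

    def get_results(self, futures, ordered=False):
        # ordered=True keeps submission order; otherwise completion order.
        source = futures if ordered else as_completed(futures)
        return [future.result() for future in source]


op_queue = ParallelizeSketch()


@op_queue.parallelize
def square(n):
    return n * n


futures = [square(i) for i in range(5)]
ordered_results = op_queue.get_results(futures, ordered=True)
unordered_results = op_queue.get_results(futures)
op_queue.pool.shutdown()
```

With `ordered=True` the results line up with the inputs; without it, the same values come back in whatever order the futures finished, which can be faster to consume when task durations vary.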