idmtools.utils.decorators module

Decorators defined for idmtools.

Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.

class idmtools.utils.decorators.abstractstatic(function)

Bases: staticmethod

A decorator for defining a method both as static and abstract.

__init__(function)

Initialize abstractstatic.

Parameters:

function – Function to wrap as abstract
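To illustrate what "both static and abstract" means in practice, here is a minimal sketch. The class `abstractstatic_sketch` is a hypothetical stand-in written for this example, not the idmtools source, and `Shape`, `Square`, and `can_instantiate` are likewise illustrative names.

```python
from abc import ABC


class abstractstatic_sketch(staticmethod):
    """Hypothetical stand-in: a staticmethod that is also flagged as abstract."""

    # ABCMeta looks for this attribute when collecting abstract members.
    __isabstractmethod__ = True

    def __init__(self, function):
        super().__init__(function)
        function.__isabstractmethod__ = True


class Shape(ABC):
    @abstractstatic_sketch
    def sides():
        """Subclasses must override this static method."""


class Square(Shape):
    @staticmethod
    def sides():
        return 4


def can_instantiate(cls):
    """Return True if cls() succeeds, False if it is still abstract."""
    try:
        cls()
        return True
    except TypeError:
        return False
```

Under these assumptions, `Shape` cannot be instantiated until a subclass overrides `sides`, while `Square.sides()` can be called without an instance.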

idmtools.utils.decorators.optional_decorator(decorator: Callable, condition: bool | Callable[[], bool])

A decorator that adds a decorator only if condition is true.

Parameters:
  • decorator – Decorator to add

  • condition – Condition that decides whether the decorator is applied. May be a bool or a callable that returns a bool.

Returns:

Optionally wrapped func.
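The conditional-wrapping pattern described above can be sketched as follows. `optional_decorator_sketch` is a hypothetical stand-in, not the library's implementation; in particular, evaluating the condition at decoration time is an assumption of this sketch. `shout`, `greet`, and `whisper` are illustrative names.

```python
import functools
from typing import Callable, Union


def optional_decorator_sketch(decorator: Callable, condition: Union[bool, Callable[[], bool]]):
    """Hypothetical stand-in: apply `decorator` only when `condition` holds."""

    def wrap(func):
        # Assumption: the condition is evaluated once, when the function is decorated.
        enabled = condition() if callable(condition) else condition
        return decorator(func) if enabled else func

    return wrap


def shout(func):
    """Example decorator that upper-cases the wrapped function's result."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs).upper()

    return wrapper


@optional_decorator_sketch(shout, True)
def greet():
    return "hello"


@optional_decorator_sketch(shout, lambda: False)
def whisper():
    return "hello"
```

Here `greet` is wrapped by `shout` because the condition is `True`, while `whisper` is returned unchanged because its callable condition evaluates to `False`.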

class idmtools.utils.decorators.SingletonMixin

Bases: object

SingletonMixin defines a singleton that can be added to any class.

As a singleton, only one instance will be made per process.

classmethod instance()

Return the instance of the object.

If the instance has not been created, it will be initialized before returning.

Returns:

The singleton instance
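The lazy-initialization behaviour of instance() can be sketched as below. `SingletonSketch` is a hypothetical stand-in, not the idmtools class, and `Registry` is an illustrative subclass. The sketch is not thread-safe; whether the real mixin guards against concurrent first calls is not stated here.

```python
class SingletonSketch:
    """Hypothetical stand-in for a singleton mixin with a lazy instance() accessor."""

    _instance = None

    @classmethod
    def instance(cls):
        # Look only at this class's own __dict__ so each subclass gets its own instance.
        if cls.__dict__.get("_instance") is None:
            cls._instance = cls()
        return cls._instance


class Registry(SingletonSketch):
    def __init__(self):
        self.items = []


first = Registry.instance()
second = Registry.instance()
first.items.append("a")
```

Both names refer to the same object, so the item appended through `first` is visible through `second`.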

idmtools.utils.decorators.cache_for(ttl=None) -> Callable

Cache a value for a certain time period.

Parameters:

ttl – Expiration of cache

Returns:

Wrapper Function
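As a rough illustration of time-limited caching, the sketch below stores each result with a timestamp and recomputes once the entry is older than the time-to-live. `cache_for_sketch` is a hypothetical stand-in, and expressing the expiration as `ttl_seconds` (a number of seconds) is an assumption of this example; the type accepted by the real `ttl` parameter is not documented above.

```python
import functools
import time


def cache_for_sketch(ttl_seconds=60.0):
    """Hypothetical stand-in: cache results per positional arguments for ttl_seconds."""

    def decorator(func):
        cache = {}  # maps args -> (timestamp, value)

        @functools.wraps(func)
        def wrapper(*args):
            now = time.monotonic()
            hit = cache.get(args)
            if hit is not None and now - hit[0] < ttl_seconds:
                return hit[1]  # entry is still fresh
            value = func(*args)
            cache[args] = (now, value)
            return value

        return wrapper

    return decorator


calls = []


@cache_for_sketch(ttl_seconds=60.0)
def lookup(key):
    calls.append(key)  # record every real invocation
    return key * 2


lookup(2)
lookup(2)  # served from the cache, so lookup's body does not run again
lookup(3)
```

After these three calls, `calls` contains one entry per distinct argument, because the repeated call was answered from the cache.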

idmtools.utils.decorators.optional_yaspin_load(*yargs, **ykwargs) -> Callable

Adds a CLI spinner to a function based on conditions.

The spinner is shown only if both of the following hold:

  • yaspin package is present.

  • NO_SPINNER environment variable is not defined.

Parameters:
  • *yargs – Arguments to pass to yaspin constructor.

  • **ykwargs – Keyword arguments to pass to yaspin constructor.

Examples

import time

from idmtools.utils.decorators import optional_yaspin_load

@optional_yaspin_load(text="Loading test", color="yellow")
def test():
    time.sleep(100)
Returns:

A callable wrapper function.

class idmtools.utils.decorators.ParallelizeDecorator(queue=None, pool_type: Type[concurrent.futures.Executor] | None = concurrent.futures.ThreadPoolExecutor)

Bases: object

ParallelizeDecorator allows you to easily parallelize a group of code.

A simple example follows.

Examples

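import time

from idmtools.utils.decorators import ParallelizeDecorator

op_queue = ParallelizeDecorator()

class Ops:
    @op_queue.parallelize
    def heavy_op(self):
        time.sleep(10)

    def do_lots_of_heavy(self):
        # Submit 100 calls, then gather their results through the decorator.
        futures = [self.heavy_op() for i in range(100)]
        results = op_queue.get_results(futures)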
__init__(queue=None, pool_type: Type[concurrent.futures.Executor] | None = concurrent.futures.ThreadPoolExecutor)

Initialize our ParallelizeDecorator.

Parameters:
  • queue – Queue to use. If not provided, one will be created.

  • pool_type – Pool type to use. Defaults to ThreadPoolExecutor.

parallelize(func)

Wrap a function in parallelization.

Parameters:

func – Function to wrap with parallelization

Returns:

Function wrapped with parallelization object

join()

Join our queue.

Returns:

Join operation from queue

get_results(futures, ordered=False)

Get results from futures produced by our decorator.

Parameters:
  • futures – Futures to get results from

  • ordered – Whether to return results in the order the futures were provided (True) or as they complete (False). Defaults to False.

Returns:

Results from all the futures.
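The difference between the two collection modes can be sketched with the standard library alone. `collect_results` and `slow_square` are hypothetical helpers written for this example; they mirror the documented `ordered` flag but are not the library's implementation.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def collect_results(futures, ordered=False):
    """Hypothetical helper mirroring the documented `ordered` flag."""
    if ordered:
        # Preserve submission order: block on each future in turn.
        return [f.result() for f in futures]
    # Otherwise yield results as each future finishes.
    return [f.result() for f in as_completed(futures)]


def slow_square(n, delay):
    time.sleep(delay)
    return n * n


jobs = [(1, 0.03), (2, 0.02), (3, 0.01)]

with ThreadPoolExecutor(max_workers=3) as pool:
    batch = [pool.submit(slow_square, n, d) for n, d in jobs]
    by_completion = collect_results(batch, ordered=False)

    batch = [pool.submit(slow_square, n, d) for n, d in jobs]
    in_order = collect_results(batch, ordered=True)
```

With `ordered=True` the results always line up with the submitted jobs. With `ordered=False` the same values come back, but their sequence depends on which job finishes first.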

Decorator to check symlink creation capabilities.