Source code for idmtools.entities.iplatform_ops.utils

"""
Utils for platform operations.

Here we have mostly utilities to handle batch operations which tend to overlap across different item types.

Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
from concurrent.futures import as_completed, Future
from concurrent.futures.process import ProcessPoolExecutor
from concurrent.futures.thread import ThreadPoolExecutor
from functools import partial
from logging import getLogger, DEBUG
from os import cpu_count
from typing import List, Union, Generator, Iterable, Callable, Any
from more_itertools import chunked
from idmtools.core import EntityContainer
from idmtools.entities.templated_simulation import TemplatedSimulations

logger = getLogger(__name__)
user_logger = getLogger('user')
# Global executor
EXECUTOR = None


def batch_items(items: Union[Iterable, Generator], batch_size=16):
    """
    Batch items.

    Lazily splits ``items`` into consecutive lists of at most ``batch_size`` elements.

    Args:
        items: Items to batch
        batch_size: Size of the batch

    Returns:
        Generator yielding one list per batch; the final batch may be shorter than ``batch_size``.
    """
    for item_chunk in chunked(items, batch_size):
        logger.info('created chunk')
        yield item_chunk
    # No explicit ``raise StopIteration`` here: per PEP 479 (Python 3.7+) a StopIteration raised inside a
    # generator body is converted into RuntimeError. Falling off the end finishes the generator cleanly.
def item_batch_worker_thread(create_func: Callable, items: Union[List], **kwargs) -> List:
    """
    Default batch worker: apply the create function to every item of a batch.

    Args:
        create_func: Callable invoked once per item as ``create_func(item, **kwargs)``
        items: The batch of items to create
        **kwargs: Extra keyword arguments passed unchanged to every ``create_func`` call

    Returns:
        List holding the return value of ``create_func`` for each item, in input order
    """
    debug_enabled = logger.isEnabledFor(DEBUG)
    if debug_enabled:
        logger.debug(f'Create {len(items)}')
    return [create_func(entry, **kwargs) for entry in items]
def batch_create_items(items: Union[Iterable, Generator], batch_worker_thread_func: Callable[[List], List] = None,
                       create_func: Callable[..., Any] = None, display_progress: bool = True,
                       progress_description: str = "Commissioning items", unit: str = None, **kwargs):
    """
    Batch create items.

    Items are split into chunks, every chunk is submitted to the module-level ``EXECUTOR`` and the results of
    all chunks are collected into one list. You must specify either batch_worker_thread_func or create_func.

    The executor is created on the first call only and then reused, so the worker-count and pool-type
    settings only take effect while ``EXECUTOR`` is still ``None``.

    Args:
        items: Items to create
        batch_worker_thread_func: Optional Function to execute. Should take a list and return a list
        create_func: Optional Create function
        display_progress: Enable progress bar
        progress_description: Description to show in progress bar
        unit: Unit for progress bar
        **kwargs: ``max_workers`` and ``batch_size`` are read here and override the configured values. When
            ``batch_worker_thread_func`` is not given, all keyword arguments are also forwarded to ``create_func``.

    Returns:
        Batches created results

    Raises:
        ValueError: If neither ``batch_worker_thread_func`` nor ``create_func`` is provided
    """
    global EXECUTOR
    from idmtools.config import IdmConfigParser
    from idmtools.utils.collections import ExperimentParentIterator

    max_workers = kwargs.get('max_workers', None)

    # Consider values from the block that Platform uses
    _batch_size = int(IdmConfigParser.get_option(None, "batch_size", fallback=16))
    batch_size = kwargs.get('batch_size', None)
    if batch_size is not None:
        _batch_size = batch_size

    # Decide once whether a progress bar is shown, so that ``prog`` being None and the later ``prog.*`` calls
    # can never disagree, and the configuration is not queried again for every chunk.
    show_progress = bool(display_progress and not IdmConfigParser.is_progress_bar_disabled())
    if show_progress:
        from tqdm import tqdm
        extra_args = dict(unit=unit) if unit else dict()
        prog = tqdm(desc="Initializing objects for creation", **extra_args)
    else:
        prog = None

    if EXECUTOR is None:
        _workers_per_cpu = IdmConfigParser.get_option(None, "workers_per_cpu", fallback=None)
        if _workers_per_cpu:
            # os.cpu_count() returns None when the count cannot be determined; fall back to one CPU
            # instead of failing on ``int * None``.
            cpus = cpu_count() or 1
            _max_workers = int(_workers_per_cpu) * cpus
            if logger.isEnabledFor(DEBUG):
                logger.debug(f"workers set by cpu: {_workers_per_cpu} * {cpus}")
        else:
            _max_workers = int(IdmConfigParser.get_option(None, "max_workers", fallback=16))

        # An explicit max_workers keyword argument wins over both configuration options
        if max_workers is not None:
            _max_workers = max_workers

        logger.info(f'Creating {_max_workers} Platform Workers')
        default_pool_executor = IdmConfigParser.get_option(None, "default_pool_executor", fallback="thread").lower()
        if default_pool_executor == "process":
            EXECUTOR = ProcessPoolExecutor(max_workers=_max_workers)
        else:
            EXECUTOR = ThreadPoolExecutor(max_workers=_max_workers)

    if batch_worker_thread_func is None:
        if create_func is None:
            raise ValueError("You must provide either an item create callback or a item batch worker thread callback to"
                             " perform batches")
        # NOTE(review): kwargs is forwarded untouched, so 'max_workers'/'batch_size' (when supplied) also reach
        # create_func - confirm that this is intended.
        batch_worker_thread_func = partial(item_batch_worker_thread, create_func, **kwargs)

    if logger.isEnabledFor(DEBUG):
        logger.debug(f'Batching creation by {_batch_size}')

    futures = []
    total = 0
    parent = None
    # Unwrap ExperimentParentIterator so the underlying items are chunked, remembering the parent so it can be
    # assigned to every item before submission.
    if isinstance(items, ExperimentParentIterator) and isinstance(items.items, TemplatedSimulations):
        parent = items.parent
        i = items.items.simulations().generator
    elif isinstance(items, ExperimentParentIterator) and isinstance(items.items, EntityContainer):
        parent = items.parent
        i = items.items
    else:
        i = items

    # The total is only known up front when the input supports len()
    if show_progress and hasattr(items, '__len__'):
        prog.total = len(items)

    for chunk in chunked(i, _batch_size):
        total += len(chunk)
        if parent:
            for c in chunk:
                c.parent = parent
        if logger.isEnabledFor(DEBUG):
            logger.debug(f"Submitting chunk: {len(chunk)}")
        if show_progress:
            prog.update(len(chunk))
        futures.append(EXECUTOR.submit(batch_worker_thread_func, chunk))

    if show_progress:
        # Reuse the same bar for the second phase: restart it at zero with the now-known item count
        prog.set_description(progress_description)
        prog.reset(total)
        return show_progress_of_batch(prog, futures)

    results = []
    for future in futures:
        results.extend(future.result())
    return results
def show_progress_of_batch(progress_bar: 'tqdm', futures: List[Future]) -> List:  # noqa: F821
    """
    Collect the results of a batch of futures while advancing a progress bar.

    Args:
        progress_bar: Progress bar; advanced by the number of results of each finished future
        futures: List of futures that are still running/queued

    Returns:
        Flat list of all results, appended in the order in which the futures complete
    """
    collected = []
    for finished in as_completed(futures):
        batch = finished.result()
        progress_bar.update(len(batch))
        collected += batch
    return collected