"""idmtools Analyzer manager.
AnalyzerManager is the "driver" of analysis. Analysis is mostly a map reduce operation.
Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import os
import sys
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from logging import getLogger, DEBUG
from typing import NoReturn, List, Dict, Tuple, Optional, TYPE_CHECKING
from tqdm import tqdm
from idmtools import IdmConfigParser
from idmtools.analysis.map_worker_entry import map_item
from idmtools.core import NoPlatformException
from idmtools.core.enums import ItemType
from idmtools.core.interfaces.ientity import IEntity
from idmtools.core.logging import VERBOSE, SUCCESS
from idmtools.entities.ianalyzer import IAnalyzer
from idmtools.utils.language import on_off, verbose_timedelta
if TYPE_CHECKING: # pragma: no cover
from idmtools.entities.iplatform import IPlatform
logger = getLogger(__name__)
user_logger = getLogger('user')
[docs]def pool_worker_initializer(func, analyzers, platform: 'IPlatform') -> NoReturn:
"""
Initialize the pool worker, which allows the process pool to associate the analyzers, cache, and path mapping to the function executed to retrieve data.
Using an initializer improves performance.
Args:
func: The function that the pool will call.
analyzers: The list of all analyzers to run.
platform: The platform to communicate with to retrieve files from.
Returns:
None
"""
func.analyzers = analyzers
func.platform = platform
[docs]class AnalyzeManager:
"""
Analyzer Manager Class. This is the main driver of analysis.
"""
ANALYZE_TIMEOUT = 3600 * 8 # Maximum seconds before timing out - set to 8 hours
WAIT_TIME = 1.15 # How much time to wait between check if the analysis is done
EXCEPTION_KEY = '__EXCEPTION__'
[docs] class TimeOutException(Exception):
"""
TimeOutException is raised when the analysis times out.
"""
pass
[docs] class ItemsNotReady(Exception):
"""
ItemsNotReady is raised when items to be analyzed are still running.
Notes:
TODO - Add doc_link
"""
pass
[docs] def __init__(self, platform: 'IPlatform' = None, configuration: dict = None,
ids: List[Tuple[str, ItemType]] = None,
analyzers: List[IAnalyzer] = None, working_dir: str = None,
partial_analyze_ok: bool = False, max_items: Optional[int] = None, verbose: bool = True,
force_manager_working_directory: bool = False,
exclude_ids: List[str] = None, analyze_failed_items: bool = False,
max_workers: Optional[int] = None, executor_type: str = 'process'):
"""
Initialize the AnalyzeManager.
Args:
platform (IPlatform): Platform
configuration (dict, optional): Initial Configuration. Defaults to None.
ids (Tuple[str, ItemType], optional): List of ids as pair of Tuple and ItemType. Defaults to None.
analyzers (List[IAnalyzer], optional): List of Analyzers. Defaults to None.
working_dir (str, optional): The working directory. Defaults to os.getcwd().
partial_analyze_ok (bool, optional): Whether partial analysis is ok. When this is True, Experiments in progress or Failed can be analyzed. Defaults to False.
max_items (int, optional): Max Items to analyze. Useful when developing and testing an Analyzer. Defaults to None.
verbose (bool, optional): Print extra information about analysis. Defaults to True.
force_manager_working_directory (bool, optional): [description]. Defaults to False.
exclude_ids (List[str], optional): [description]. Defaults to None.
analyze_failed_items (bool, optional): Allows analyzing of failed items. Useful when you are trying to aggregate items that have failed. Defaults to False.
max_workers (int, optional): Set the max workers. If not provided, falls back to the configuration item *max_threads*. If max_workers is not set in configuration, defaults to CPU count
executor_type: (str): Whether to use process or thread pooling. Process pooling is more efficient but threading might be required in some environments
"""
super().__init__()
if working_dir is None:
working_dir = os.getcwd()
if executor_type.lower() in ['process', 'thread']:
self.executor_type = executor_type.lower()
else:
raise ValueError(f'{executor_type} is not a valid type for executor_type. Choose either "process" or "thread"')
self.configuration = configuration or {}
# load platform from context or from passed in value
self.platform = platform
self.__check_for_platform_from_context(platform)
if max_workers is None:
# check for max workers on platform, then in common
if self.platform and hasattr(self.platform, '_config_block') and IdmConfigParser.get_option(self.platform._config_block, "max_workers", None):
self.configuration['max_workers'] = int(IdmConfigParser.get_option(self.platform._config_block, "max_workers", None))
elif IdmConfigParser().get_option('COMMON', 'max_workers', None):
self.configuration['max_workers'] = int(IdmConfigParser().get_option('COMMON', 'max_workers'))
# validate max_workers
if max_workers is not None and max_workers < 1:
raise ValueError("max_workers must be greater or equal to one")
# ensure max workers is int
self.max_processes = max_workers if max_workers is not None else self.configuration.get('max_workers', os.cpu_count())
if logger.isEnabledFor(DEBUG):
logger.debug(f'AnalyzeManager set to {self.max_processes}')
# Should we continue analyzing even when we encounter an error?
self.continue_on_error = False
# should we attempt to analyze failed items
self.analyze_failed_items = analyze_failed_items
# analyze at most this many items, regardless of how many have been given
self.max_items_to_analyze = max_items
# allows analysis to be performed even if some items are not ready for analysis
self.partial_analyze_ok = partial_analyze_ok or (self.max_items_to_analyze is not None)
# Each analyzers results will be in the working_dir directory if not specified by them directly.
# force_wd overrides this by forcing all results to be in working_dir .
self.working_dir = working_dir
self.force_wd = force_manager_working_directory
# Take the provided ids and determine the full set of unique root items (e.g. simulations) in them to analyze
logger.debug("Load information about items from platform")
ids = list(set(ids or list())) # uniquify
items: List[IEntity] = []
for oid, otype in ids:
logger.debug(f'Getting metadata for {oid} and {otype}')
item = self.platform.get_item(oid, otype, force=True, raw=True)
item.uid = str(item.id)
item.platform = self.platform
items.append(item)
self.potential_items: List[IEntity] = []
for i in items:
logger.debug(f'Flattening items for {i.uid}')
self.potential_items.extend(self.platform.flatten_item(item=i, raw=True))
# These are leaf items to be ignored in analysis. Prune them from analysis.
self.exclude_ids = exclude_ids or []
for index, oid in enumerate(self.exclude_ids):
self.exclude_ids[index] = str(oid)
self.potential_items = [item for item in self.potential_items if item.uid not in self.exclude_ids]
for item in self.potential_items:
item.platform = self.platform
logger.debug(f"Potential items to analyze: {len(self.potential_items)}")
self._items = dict() # filled in later by _get_items_to_analyze
self.analyzers = analyzers or list()
self.verbose = verbose
def __check_for_platform_from_context(self, platform) -> 'IPlatform': # noqa: F821
"""
Try to determine platform of current object from self or current platform.
Args:
platform: Passed in platform object
Raises:
NoPlatformException: when no platform is on current context
Returns:
Platform object
"""
if self.platform is None:
# check context for current platform
if platform is None:
from idmtools.core.context import CURRENT_PLATFORM
if CURRENT_PLATFORM is None:
raise NoPlatformException("No Platform defined on object, in current context, or passed to run")
platform = CURRENT_PLATFORM
self.platform = platform
return self.platform
[docs] def add_item(self, item: IEntity) -> NoReturn:
"""
Add an additional item for analysis.
Args:
item: The new item to add for analysis.
Returns:
None
"""
self.potential_items.extend(self.platform.flatten_item(item=item, raw=True))
def _get_items_to_analyze(self) -> Dict[str, IEntity]:
"""
Get a list of items derived from :meth:`self._items` that are available to analyze.
Returns:
A list of :class:`~idmtools.entities.iitem.IItem` objects.
"""
# First sort items by whether they can currently be analyzed
can_analyze = {}
cannot_analyze = {}
for item in self.potential_items:
valid = self.platform.validate_item_for_analysis(item, self.analyze_failed_items)
if valid:
can_analyze[item.uid] = item
else:
cannot_analyze[item.uid] = item
# now consider item limiting arguments
if self.partial_analyze_ok:
if self.max_items_to_analyze is not None:
return {item.uid: item for item in list(can_analyze.values())[0:self.max_items_to_analyze]}
return can_analyze
if len(cannot_analyze) > 0:
raise self.ItemsNotReady('There are %d items that cannot be analyzed and partial_analyze_ok is off.' %
len(cannot_analyze))
return can_analyze
[docs] def add_analyzer(self, analyzer: IAnalyzer) -> NoReturn:
"""
Add another analyzer to use on the items to be analyzed.
Args:
analyzer: An analyzer object (:class:`~idmtools.entities.ianalyzer.IAnalyzer`).
Returns:
None
"""
self.analyzers.append(analyzer)
def _update_analyzer_uids(self) -> NoReturn:
"""
Ensure that each analyzer has a unique ID in this context by updating them as needed.
Returns:
None
"""
unique_uids = {analyzer.uid for analyzer in self.analyzers}
if len(unique_uids) < len(self.analyzers):
for i in range(len(self.analyzers)):
self.analyzers[i].uid += f'-{i}'
logger.debug(f'Analyzer {i.__class__} id set to {self.analyzers[i].uid}')
def _initialize_analyzers(self) -> NoReturn:
"""
Do the steps needed to prepare analyzers for item analysis.
Returns:
None
"""
logger.debug("Initializing Analyzers")
# Setup the working directory and call initialize() on each analyzer
for analyzer in self.analyzers:
if self.force_wd:
analyzer.working_dir = self.working_dir
else:
analyzer.working_dir = analyzer.working_dir or self.working_dir
if logger.isEnabledFor(DEBUG):
logger.debug(f"Analyzer working directory set to {analyzer.working_dir}")
analyzer.initialize()
# make sure each analyzer in self.analyzers has a unique uid
self._update_analyzer_uids()
def _print_configuration(self, n_items: int, n_processes: int) -> NoReturn:
"""
Display some information about an ongoing analysis.
Args:
n_items: The number of items being analyzed.
n_processes: The number of active item processing handlers.
Returns:
None
"""
n_ignored_items = len(self.potential_items) - n_items
user_logger.log(VERBOSE, 'Analyze Manager')
user_logger.log(VERBOSE, f' | {n_items} item(s) selected for analysis')
user_logger.log(VERBOSE, f' | partial_analyze_ok is {self.partial_analyze_ok}, max_items is '
f'{self.max_items_to_analyze}, and {n_ignored_items} item(s) are being ignored')
user_logger.log(VERBOSE, ' | Analyzer(s): ')
for analyzer in self.analyzers:
user_logger.log(VERBOSE, f' | - {analyzer.uid} File parsing: {on_off(analyzer.parse)} / Use '
f'cache: {on_off(hasattr(analyzer, "cache"))}')
if hasattr(analyzer, 'need_dir_map'):
user_logger.log(VERBOSE, f' | (Directory map: {on_off(analyzer.need_dir_map)}')
user_logger.log(VERBOSE, f' | Pool of {n_processes} analyzing process(es)')
def _run_and_wait_for_mapping(self, executor) -> Tuple[Dict, bool]:
"""
Run and manage the mapping call on each item.
Args:
executor: A pool of workers.
Returns:
False if an exception occurred processing **.map** on any item; otherwise True (succeeded).
"""
# add items to process (map)
n_items = len(self._items)
logger.debug(f"Number of items for analysis: {n_items}")
logger.debug("Mapping the items for analysis")
futures = dict()
results = dict()
status = True
# create status bar and then queue our futures
with tqdm(total=len(self._items)) as progress:
for i in self._items.values():
future = executor.submit(map_item, i)
future.add_done_callback(lambda p: progress.update())
futures[future] = i
# wait on our futures to complete, catch exceptions, and aggregate results
for future in as_completed(futures.keys()):
if future.exception():
status = False
ex = future.exception()
user_logger.error(ex)
if not self.continue_on_error:
raise ex
else:
results[futures[future]] = future.result()
logger.debug(f"Result fetching status: : {status}")
return results, status
def _run_and_wait_for_reducing(self, executor, results) -> dict:
"""
Run and manage the reduce call on the combined item results (by analyzer).
Args:
executor: A pool of workers.
Returns:
An analyzer ID keyed dictionary of finalize results.
"""
# the keys in self.cache from map() calls are expected to be item ids. Each keyed value
# contains analyzer_id: item_results_for_analyzer entries.
logger.debug("Running reduce results")
futures = {}
finalize_results = {}
# create a progress bar
with tqdm(total=len(self.analyzers), desc="Running Analyzer Reduces") as progress:
# for each analyzer, queue our futures
for analyzer in self.analyzers:
logger.debug(f"Gather data for {analyzer.uid}")
item_data_for_analyzer = {}
for item, data in results.items():
if analyzer.uid in data:
item_data_for_analyzer[item] = data[analyzer.uid]
future = executor.submit(analyzer.reduce, item_data_for_analyzer)
future.add_done_callback(lambda p: progress.update())
logger.debug(f"Queueing {analyzer.uid}")
futures[future] = analyzer.uid
# wait on our futures, catch exceptions, and aggregate results
logger.debug("Waiting for results")
for future in as_completed(futures.keys()):
if future.exception():
user_logger.error(f'Reduce for Analyzer {futures[future]} failed')
user_logger.exception(future.exception())
user_logger.error("See log for details")
if not self.continue_on_error:
sys.exit(-1)
else:
finalize_results[futures[future]] = future.result()
if logger.isEnabledFor(DEBUG):
logger.debug("Finished reducing results")
for future in futures.keys():
future.cancel()
return finalize_results
[docs] def analyze(self) -> bool:
"""
Process the provided items with the provided analyzers. This is the main driver method of :class:`AnalyzeManager`.
Args:
kwargs: extra parameters
Returns:
True on success; False on failure/exception.
"""
start_time = time.time()
# If no analyzers or simulations have been provided, there is nothing to do
if len(self.analyzers) == 0:
user_logger.error('No analyzers were provided; cannot run analysis.')
return False
self._initialize_analyzers()
if len(self.potential_items) == 0:
user_logger.error('No items were provided; cannot run analysis.')
return False
# trim processing to those items that are ready and match requested limits
self._items: Dict[str, IEntity] = self._get_items_to_analyze()
if len(self._items) == 0:
user_logger.error('No items are ready; cannot run analysis.')
return False
# initialize mapping results cache/storage
n_items = len(self._items)
n_processes = min(self.max_processes, max(n_items, 1))
logger.info(f'Analyzing {n_items}')
# do any platform-specific initializations
logger.debug("Triggering per group functions")
for analyzer in self.analyzers:
analyzer.per_group(items=self._items)
if self.verbose:
self._print_configuration(n_items, n_processes)
no_print_config_exists = False
# Before we initialize processes, ensure no warning about config are set
if 'IDMTOOLS_NO_PRINT_CONFIG_USED' not in os.environ:
os.environ['IDMTOOLS_NO_PRINT_CONFIG_USED'] = "1"
os.environ['IDMTOOLS_HIDE_DEV_WARNING'] = "1"
os.environ['IDMTOOLS_NO_CONFIG_WARNING'] = "1"
else:
no_print_config_exists = True
# create worker pool
try:
# To ensure subprocesses reuse same config file, pass it through environment vars
config_file = IdmConfigParser().get_config_path()
if config_file:
os.environ['IDMTOOLS_CONFIG_FILE'] = config_file
# our options for our executor
opts = dict(max_workers=n_processes, initializer=pool_worker_initializer, initargs=(map_item, self.analyzers, self.platform))
# determine type. Most cases we want a process, but sometimes(like in Jupyter notebooks, we want to use threads)
if self.executor_type == 'process':
executor = ProcessPoolExecutor(**opts)
else:
executor = ThreadPoolExecutor(**opts)
map_results, status = self._run_and_wait_for_mapping(executor)
finalize_results = self._run_and_wait_for_reducing(executor, map_results)
finally:
# because of debug mode, we have to leave executor and let python handle the shutdown through del
# see https://youtrack.jetbrains.com/issue/PY-34432
os.environ['NO_LOGGING_INIT'] = 'n'
logger.debug("Shutting down workers")
for analyzer in self.analyzers:
analyzer.results = finalize_results[analyzer.uid]
logger.debug("Destroying analyzers")
for analyzer in self.analyzers:
analyzer.destroy()
if not no_print_config_exists:
del os.environ['IDMTOOLS_NO_PRINT_CONFIG_USED']
del os.environ['IDMTOOLS_HIDE_DEV_WARNING']
del os.environ['IDMTOOLS_NO_CONFIG_WARNING']
if 'IDMTOOLS_CONFIG_FILE' in os.environ:
del os.environ['IDMTOOLS_CONFIG_FILE']
if self.verbose:
total_time = time.time() - start_time
time_str = verbose_timedelta(total_time)
user_logger.log(SUCCESS, '\r | Analysis complete. Took {} '
'(~ {:.3f} per item)'.format(time_str, total_time / n_items))
return True