Source code for idmtools.analysis.map_worker_entry
"""
We define our map entry items here for analysis framework.
Most of these function are used either to initialize a thread or to handle exceptions while executing.
Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import itertools
from logging import getLogger, DEBUG
from idmtools.core.interfaces.ientity import IEntity
from idmtools.utils.file_parser import FileParser
from typing import TYPE_CHECKING, Dict
from idmtools.core.interfaces.iitem import IItem
from idmtools.entities.ianalyzer import TAnalyzerList
if TYPE_CHECKING: # pragma: no cover
from idmtools.entities.iplatform import IPlatform
logger = getLogger(__name__)
[docs]def map_item(item: IItem) -> Dict[str, Dict]:
"""
Initialize some worker-global values; a worker process entry point for analyzer item-mapping.
Args:
item: The item (often simulation) to process.
Returns:
Dict[str, Dict]
"""
# Retrieve the global variables coming from the pool initialization
if logger.isEnabledFor(DEBUG):
logger.debug(f"Init item {item.uid} in worker")
analyzers = map_item.analyzers
platform = map_item.platform
if item.platform is None:
item.platform = platform
return _get_mapped_data_for_item(item, analyzers, platform)
def _get_mapped_data_for_item(item: IEntity, analyzers: TAnalyzerList, platform: 'IPlatform') -> Dict[str, Dict]:
"""
Get mapped data from an item.
Args:
item: The :class:`~idmtools.entities.iitem.IItem` object to call analyzer
:meth:`~idmtools.analysis.AddAnalyzer.map` methods on.
analyzers: The :class:`~idmtools.analysis.IAnalyzer` items with
:meth:`~idmtools.analysis.AddAnalyzer.map` methods to call on the provided items.
platform: A platform object to query for information.
Returns:
Dict[str, Dict] - Array mapping file data to from str to contents
"""
try:
# determine which analyzers (and by extension, which filenames) are applicable to this item
# ensure item has a platform
item.platform = platform
analyzers_to_use = [a for a in analyzers if a.filter(item)]
analyzer_uids = [a.uid for a in analyzers]
filenames = set(itertools.chain(*(a.filenames for a in analyzers_to_use)))
filenames = [f.replace("\\", '/') for f in filenames]
if logger.isEnabledFor(DEBUG):
logger.debug(f"Analyzers to use on item: {str(analyzer_uids)}")
logger.debug(f"Filenames to analyze: {filenames}")
# The byte_arrays will associate filename with content
if len(filenames) > 0:
file_data = platform.get_files(item, filenames)
else:
file_data = dict()
# Selected data will be a dict with analyzer.uid: data entries
selected_data = {}
for analyzer in analyzers_to_use:
# If the analyzer needs the parsed data, parse
if analyzer.parse:
logger.debug(f'Parsing content for {analyzer.uid}')
data = {filename: FileParser.parse(filename, content) for filename, content in file_data.items() if filename in analyzer.filenames}
else:
# If the analyzer doesnt wish to parse, give the raw data
data = {filename: content for filename, content in file_data.items() if filename in analyzer.filenames}
# run the mapping routine for this analyzer and item
logger.debug("Running map on selected data")
selected_data[analyzer.uid] = analyzer.map(data, item)
# Store all analyzer results for this item in the result cache
if logger.isEnabledFor(DEBUG):
logger.debug(f"Setting result to cache on {item.id}")
logger.debug(f"Wrote Setting result to cache on {item.id}")
except Exception as e:
e.item = item
logger.error(e)
raise e
return selected_data