Source code for idmtools_platform_comps.utils.ssmt_utils.file_filter

"""idmtools ssmt file filter tools.

Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import argparse
import glob
import json
import os
import uuid
from collections import defaultdict
from concurrent.futures._base import as_completed, Future
from concurrent.futures.thread import ThreadPoolExecutor
from logging import DEBUG, getLogger
from pathlib import PurePath
from typing import List, Tuple, Set, Callable
import humanfriendly
from COMPS.Data import WorkItem, Experiment, Simulation, AssetCollectionFile, AssetCollection, QueryCriteria, CommissionableEntity
from COMPS.Data.Simulation import SimulationState
from COMPS.Data.WorkItem import RelationType
from tabulate import tabulate
from tqdm import tqdm
from idmtools_platform_comps.utils.file_filter_workitem import FilenameFormatFunction

try:
    from common import setup_verbose
except (FileNotFoundError, ImportError):
    from idmtools_platform_comps.utils.ssmt_utils.common import setup_verbose

logger = getLogger(__name__)
user_logger = getLogger('user')
# Our Asset Tuple we use to gather data on files
# The format is Source Filename, Destination Filename, Checksum, and then Filesize
AssetTuple = Tuple[str, str, uuid.UUID, int]
SetOfAssets = Set[AssetTuple]
# Store the Done State
DONE_STATE = [SimulationState.Failed, SimulationState.Canceled, SimulationState.Succeeded]
HPC_JOBS_QUERY = QueryCriteria().select_children(['tags', 'hpc_jobs']).orderby('date_created desc')

# Define our function that can be used as callbacks for filtering entities
EntityFilterFunc = Callable[[CommissionableEntity.CommissionableEntity], bool]


[docs]def get_common_parser(app_description): """Creates a get common argument parser used with file filter function.""" parser = argparse.ArgumentParser(app_description) parser.add_argument("--file-pattern", nargs="+", help="File Pattern to Filter") parser.add_argument("--exclude-pattern", default=None, nargs="+", help="Exclude File Pattern to Filter") parser.add_argument( "--simulation-prefix-format-str", default="{simulation.id}", help="Format for prefix of outputs from simulations. Defaults to the simulation id. When setting, you have access to the full simulation object. " "If you are filtering an experiment, you have also have access to experiment object" ) parser.add_argument("--no-simulation-prefix", default=False, action='store_true', help="No simulation prefix. Be careful because this could cause file collisions") parser.add_argument("--work-item-prefix-format-str", default=None, help="Format for prefix of workitem outputs. Defaults to None. Useful when combining outputs of multiple work-items") parser.add_argument("--assets", default=False, action='store_true', help="Include Assets") parser.add_argument("--verbose", default=False, action="store_true", help="Verbose logging") parser.add_argument("--pre-run-func", default=None, action='append', help="List of function to run before starting analysis. Useful to load packages up in docker container before run") parser.add_argument("--entity-filter-func", default=None, help="Name of function that can be used to filter items") parser.add_argument("--filename-format-func", default=None, help="Name of function that can be used to format filenames") parser.add_argument("--dry-run", default=False, action="store_true", help="Find files, but don't add") return parser
[docs]def gather_files(directory: str, file_patterns: List[str], exclude_patterns: List[str] = None, assets: bool = False, prefix: str = None, filename_format_func: FilenameFormatFunction = None) -> SetOfAssets: """ Gather file_list. Args: directory: Directory to gather from file_patterns: List of file patterns exclude_patterns: List of patterns to exclude assets: Should assets be included prefix: Prefix for file_list filename_format_func: Function that can format the filename Returns: Return files that match patterns. """ from idmtools.utils.hashing import calculate_md5 file_list = set() # Loop through our patterns for pattern in file_patterns: # User glob to search each directory using the pattern. We also do full recursion here sd = os.path.join(directory, pattern) if logger.isEnabledFor(DEBUG): logger.debug(f'Looking for files with pattern {sd}') for file in glob.iglob(sd, recursive=True): # Ensure it is a file and not a directory if logger.isEnabledFor(DEBUG): logger.debug(f'{file.encode("ascii", "ignore").decode("utf-8")} matching pattern. Is Dir: {os.path.isdir(file)}. Is Link: {os.path.islink(file)}') if os.path.isfile(file): if logger.isEnabledFor(DEBUG): logger.debug(f'Found file {file.encode("ascii", "ignore").decode("utf-8")}') # Create our shortname. This will remove the base directory from the file. Eg # If are scanning C:\ABC\, the file C:\ABC\DEF\123.txt will be DEF\123.txt short_name = file.replace(directory + os.path.sep, "") # Setup destination name which is just joining prefix if it exists dest_name = os.path.join(prefix if prefix else '', short_name) filesize = os.stat(file).st_size if filename_format_func: dest_name = filename_format_func(dest_name) # Process assets separately than regular files if short_name.startswith("Assets"): if assets: file_list.add((file, dest_name, uuid.UUID(calculate_md5(file)), filesize)) else: file_list.add((file, dest_name, uuid.UUID(calculate_md5(file)), filesize)) # Now strip file that match exclude patterns. We do this after since the regular expressions here are a bit more expensive, so a we are at the # minimum possible files we must scan as this point. We did possible calculate extra md5s here if logger.isEnabledFor(DEBUG): logger.debug(f"File count before excluding: {len(file_list)} in {directory}") result = set([f for f in file_list if not is_file_excluded(f[0], exclude_patterns)]) if logger.isEnabledFor(DEBUG): logger.debug(f"File count after excluding: {len(file_list)} in {directory}") return result
[docs]def is_file_excluded(filename: str, exclude_patterns: List[str]) -> bool: """ Is file excluded by excluded patterns. Args: filename: File to filter exclude_patterns: List of file patterns to exclude Returns: True is file is excluded """ for pattern in exclude_patterns: if PurePath(filename.lower()).match(pattern.lower()): return True return False
[docs]def filter_experiments(assets: bool, entity_filter_func: EntityFilterFunc, exclude_patterns_compiles: List, file_patterns: List[str], futures: List[Future], pool: ThreadPoolExecutor, simulation_prefix_format_str: str, work_item: WorkItem, filename_format_func: FilenameFormatFunction): # pragma: no cover """ Filter Experiments outputs using our patterns. Args: assets: Assets to filter entity_filter_func: Function to filter functions exclude_patterns_compiles: List of patterns to exclude file_patterns: File patterns to match futures: Future queue pool: Pool to execute jobs on simulation_prefix_format_str: Format string for prefix of Simulations work_item: Parent WorkItem filename_format_func: Function to filter filenames Returns: None """ # Start with experiments since they are the most complex portion experiments: List[Experiment] = work_item.get_related_experiments() for experiment in experiments: if logger.isEnabledFor(DEBUG): logger.debug(f'Running filter on {experiment.name}/{experiment.id}') if entity_filter_func(experiment): if logger.isEnabledFor(DEBUG): logger.debug(f'Loading simulations for filter on {experiment.name}/{experiment.id}') # Fetch simulations with the hpc_jobs criteria. This allows us to lookup the directory simulations = experiment.get_simulations(HPC_JOBS_QUERY) if logger.isEnabledFor(DEBUG): logger.debug(f"Total simulations to evaluate {len(simulations)}") if len(simulations) > 0: filter_experiment_assets(work_item, assets, entity_filter_func, exclude_patterns_compiles, experiment, file_patterns, futures, pool, simulation_prefix_format_str, simulations, filename_format_func=filename_format_func) # Loop through each simulation and queue it up for file matching filter_simulation_list(assets, entity_filter_func, exclude_patterns_compiles, file_patterns, futures, pool, simulation_prefix_format_str, simulations, work_item, experiment=experiment, filename_format_func=filename_format_func)
[docs]def get_simulation_prefix(parent_work_item: WorkItem, simulation: Simulation, simulation_prefix_format_str: str, experiment: Experiment = None) -> str: """ Get Simulation Prefix. Args: parent_work_item: Parent workitem simulation: Simulation to form simulation_prefix_format_str: Prefix format string experiment: Optional experiment to be used with the Returns: Name of the simulation """ prefix = simulation_prefix_format_str.format(simulation=simulation, experiment=experiment, parent_workitem=parent_work_item) if simulation_prefix_format_str else None if logger.isEnabledFor(DEBUG): logger.debug(f'Simulation Prefix: {prefix}') return prefix
[docs]def filter_experiment_assets( work_item: WorkItem, assets: bool, entity_filter_func: EntityFilterFunc, exclude_patterns_compiles: List, experiment: Experiment, file_patterns: List[str], futures: List[Future], pool: ThreadPoolExecutor, simulation_prefix_format_str: str, simulations: List[Simulation], filename_format_func: FilenameFormatFunction): # pragma: no cover """ Filter experiment assets. This method uses the first simulation to gather experiment assets. Args: work_item: Parent Workitem assets: Whether assets should be matched entity_filter_func: Entity Filter Function exclude_patterns_compiles: List of files to exclude experiment: Experiment file_patterns: File patterns to filter futures: List of futures pool: Pool to submit search jobs to simulation_prefix_format_str: Format string for simulation simulations: List of simulations filename_format_func: Name function for filename Returns: None """ # If we should gather assets, use the first simulation. It means we will duplicate some work, but the set will filter out duplicated if assets: # find the first simulation we can use to gather assets from i = 0 while not entity_filter_func(simulations[i]) and i < len(simulations): i += 1 if i < len(simulations): simulation = simulations[i] if logger.isEnabledFor(DEBUG): logger.debug(f'Loading assets for {experiment.name} from simulation {simulation.id}') # create prefix from the format var prefix = get_simulation_prefix(work_item, simulation, simulation_prefix_format_str, experiment) futures.append(pool.submit(gather_files, directory=simulation.hpc_jobs[0].working_directory, file_patterns=file_patterns, exclude_patterns=exclude_patterns_compiles, assets=assets, prefix=prefix, filename_format_func=filename_format_func))
[docs]def filter_simulations_files(assets: bool, entity_filter_func: EntityFilterFunc, exclude_patterns_compiles: List, file_patterns: List[str], futures: List[Future], pool: ThreadPoolExecutor, simulation_prefix_format_str: str, work_item: WorkItem, filename_format_func: FilenameFormatFunction): # pragma: no cover """ Filter Simulations files. Args: assets: Whether assets should be matched entity_filter_func: Entity Filter Function exclude_patterns_compiles: List of files to exclude file_patterns: File patterns to filter futures: List of futures pool: Pool to submit search jobs to simulation_prefix_format_str: Format string for simulation work_item: filename_format_func: Filename function Returns: None """ # Here we loop through simulations directly added by user. We do not simulations: List[Simulation] = work_item.get_related_simulations() filter_simulation_list(assets, entity_filter_func, exclude_patterns_compiles, file_patterns, futures, pool, simulation_prefix_format_str, simulations, work_item, filename_format_func=filename_format_func)
[docs]def filter_simulation_list(assets: bool, entity_filter_func: EntityFilterFunc, exclude_patterns_compiles: List, file_patterns: List[str], futures: List[Future], pool: ThreadPoolExecutor, simulation_prefix_format_str: str, simulations: List[Simulation], work_item: WorkItem, experiment: Experiment = None, filename_format_func: FilenameFormatFunction = None): # pragma: no cover """ Filter simulations list. This method is used for experiments and simulations. Args: assets: Whether assets should be matched entity_filter_func: Entity Filter Function exclude_patterns_compiles: List of files to exclude file_patterns: File patterns to filter futures: List of futures pool: Pool to submit search jobs to simulation_prefix_format_str: Format string for simulation simulations: List of simulations work_item: Parent workitem experiment: Optional experiment. filename_format_func: Filename function Returns: None """ for simulation in simulations: if entity_filter_func(simulation): if logger.isEnabledFor(DEBUG): logger.debug(f'Loading outputs from Simulation {simulation.id} with status of {simulation.state}') prefix = get_simulation_prefix(parent_work_item=work_item, experiment=experiment, simulation=simulation, simulation_prefix_format_str=simulation_prefix_format_str) if simulation.hpc_jobs is None: simulation = simulation.get(simulation.id, HPC_JOBS_QUERY) futures.append(pool.submit(gather_files, directory=simulation.hpc_jobs[0].working_directory, file_patterns=file_patterns, exclude_patterns=exclude_patterns_compiles, assets=assets, prefix=prefix, filename_format_func=filename_format_func))
[docs]def filter_work_items_files(assets: bool, entity_filter_func: EntityFilterFunc, exclude_patterns_compiles: List, file_patterns: List[str], futures: List[Future], pool: ThreadPoolExecutor, work_item: WorkItem, work_item_prefix_format_str: str, filename_format_func: FilenameFormatFunction): # pragma: no cover """ Filter work items files. Args: assets: Whether assets should be matched entity_filter_func: Entity Filter Function exclude_patterns_compiles: List of files to exclude file_patterns: File patterns to filter futures: List of futures pool: Pool to submit search jobs to work_item: WorkItem work_item_prefix_format_str: WorkItemPrefix filename_format_func: Filename function Returns: None """ # Here we loop through workitems work_items: List[WorkItem] = work_item.get_related_work_items() for related_work_item in work_items: if entity_filter_func(related_work_item): if logger.isEnabledFor(DEBUG): logger.debug(f'Loading outputs from WorkItem {related_work_item.name} - {related_work_item.id}') prefix = work_item_prefix_format_str.format(work_item=related_work_item, parent_work_item=work_item) if work_item_prefix_format_str else None futures.append(pool.submit(gather_files, directory=related_work_item.working_directory, file_patterns=file_patterns, exclude_patterns=exclude_patterns_compiles, assets=assets, prefix=prefix, filename_format_func=filename_format_func))
[docs]def filter_ac_files(wi: WorkItem, patterns, exclude_patterns) -> List[AssetCollectionFile]: # pragma: no cover """ Filter Asset Collection File. Args: wi: WorkItem patterns: File patterns exclude_patterns: Exclude patterns Returns: List of filters asset collection files """ if logger.isEnabledFor(DEBUG): logger.debug('Filtering asset collections') relates_acs: List[AssetCollection] = wi.get_related_asset_collections(relation_type=RelationType.DependsOn) filtered_ac_files = set() for ac in relates_acs: ac = ac.get(ac.id, QueryCriteria().select_children("assets")) for file in ac.assets: file_path = get_asset_file_path(file) for pattern in patterns: if PurePath(file_path).match(pattern): filtered_ac_files.add(file) # break out of pattern loop since there was a match break return [f for f in filtered_ac_files if not is_file_excluded(get_asset_file_path(f), exclude_patterns)]
[docs]def get_asset_file_path(file): """ Get asset file path which combined the relative path and filename if relative path is set. Otherwise we use just the filename. Args: file: Filename Returns: Filename """ return os.path.join(file.relative_path, file.file_name) if file.relative_path else file.file_name
[docs]class DuplicateAsset(Exception): """Error for when we encountered output paths that overlap.""" doc_link: str = "platforms/comps/assetize_output.html#errors"
[docs]def ensure_no_duplicates(ac_files, files): # pragma: no cover """ Ensure no duplicates are in asset. Args: ac_files: Ac files files: Simulation/Experiment/Workitem files Returns: None Raises: DuplicateAsset - if asset with same output path is found """ dest_paths = defaultdict(int) for file in ac_files: fn = os.path.join(file.relative_path, file.file_name) if file.relative_path else file.file_name dest_paths[fn] += 1 for file in files: dest_paths[file[1]] += 1 # we should have one count for all items(1). If we have more than one count, than there are duplicates if any([x > 1 for x in set(dest_paths.values())]): duplicate_assets = [x for x, count in dest_paths.items() if count > 1] error_files = [] # match up to assets for asset in ac_files: fn = os.path.join(asset.relative_path, asset.file_name) if asset.relative_path else asset.file_name if fn in duplicate_assets: error_files.append(f'{fn} from Asset Collections') for file in files: if file[1] in duplicate_assets: error_files.append(f'{file[1]}<{file[0]}> from Experiment, Simulation, or WorkItem') nl = "\n" raise DuplicateAsset(f"The following assets have duplicate destination paths:{nl} {nl.join(sorted(error_files))}")
[docs]def apply_custom_filters(args: argparse.Namespace): """ Apply user defined custom filter functions. The function does the following workflow. 1. Check if there is a pre_run_func(s) defined. 1b) If there are pre-run funcs, run each of those 2) Is there an entity_filter_func. This function allows us to filter items(Experiment/Simulations/etc) directly. If not defined, we use a default function returns true. 3) If filename format function is defined, we set that, otherwise we use the default which just uses the original file name Args: args: argparse namespace. Returns: entity_filter_func and filename format func """ if args.pre_run_func: import pre_run for pre_run_func in args.pre_run_func: if logger.isEnabledFor(DEBUG): logger.debug(f"Calling PreRunFunc: {pre_run_func}") getattr(pre_run, args.pre_run_func)() # set a default filter function that returns true if none are set if args.entity_filter_func: import entity_filter_func entity_filter_func = getattr(entity_filter_func, args.entity_filter_func) else: if logger.isEnabledFor(DEBUG): logger.debug("Setting default filter function") def default_filter_func(x): return True entity_filter_func = default_filter_func if args.filename_format_func: import filename_format_func fn_format_func = getattr(filename_format_func, args.filename_format_func) else: def default_format_func(s: str): return s fn_format_func = default_format_func return entity_filter_func, fn_format_func
[docs]def parse_filter_args_common(args: argparse.Namespace): """ Parse filter arguments from an argparse namespace. We need this because we use filtering across multiple scripts. Args: args: Argparse args Returns: entity_filter_func and filename formart func """ if args.verbose: setup_verbose(args) if "**" in args.file_pattern: args.file_pattern = ["**"] entity_filter_func, fn_format_func = apply_custom_filters(args) for i, a in enumerate(args.exclude_pattern): args.exclude_pattern[i] = a.replace("\\*", "*") for i, a in enumerate(args.file_pattern): if a.startswith("'") and a.endswith("'"): args.file_pattern[i] = a.replace("\\*", "*") for i in ['simulation_prefix_format_str', 'work_item_prefix_format_str']: si = getattr(args, i) if si and si.startswith("'") and si.endswith("'"): si = si.strip("'") setattr(args, i, si) return entity_filter_func, fn_format_func
[docs]def filter_files_and_assets(args: argparse.Namespace, entity_filter_func: EntityFilterFunc, wi: WorkItem, filename_format_func: FilenameFormatFunction) -> Tuple[SetOfAssets, List[AssetCollectionFile]]: """ Filter files and assets using provided parameters. Args: args: Argparse details entity_filter_func: Optional filter function for entities. This function is ran on every item. If it returns true, we return the item wi: WorkItem we are running in filename_format_func: Filename format function allows use to customize how we filter filenames for output. Returns: Files that matches the filter and the assets that matches the filter as well. """ files = gather_files_from_related( wi, file_patterns=args.file_pattern, exclude_patterns=args.exclude_pattern if args.exclude_pattern else [], assets=args.assets, work_item_prefix_format_str=args.work_item_prefix_format_str, simulation_prefix_format_str=args.simulation_prefix_format_str if not args.no_simulation_prefix else None, entity_filter_func=entity_filter_func, filename_format_func=filename_format_func ) files_from_ac: List[AssetCollectionFile] = filter_ac_files(wi, args.file_pattern, args.exclude_pattern) return files, files_from_ac