Source code for idmtools_platform_comps.utils.assetize_output.assetize_ssmt_script
"""idmtools ssmt script.
This script is used on server side only and not meant to be ran on a local machine.
Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
# flake8: noqa F405 F403
import sys
from argparse import Namespace
from logging import getLogger
from COMPS.Data.WorkItem import RelationType
from typing import List, Dict
import os
from COMPS.Data import AssetCollectionFile, WorkItem
# we have to support two ways to load the utils. The first is load from Assets
# the fallback is the installed package
# this allows us to move forward with changes to utils without need for new images
try:
from file_filter import *
from common import *
except (FileNotFoundError, ImportError):
from idmtools_platform_comps.utils.ssmt_utils.file_filter import *
from idmtools_platform_comps.utils.ssmt_utils.common import *
logger = getLogger(__name__)
user_logger = getLogger('user')
[docs]class NoFileFound(Exception):
doc_link: str = "platforms/comps/errors.html#errors"
[docs]def create_asset_collection(file_list: SetOfAssets, ac_files: List[AssetCollectionFile], tags: Dict[str, str]): # pragma: no cover
"""
Args:
file_list:
ac_files: AC Files
tags: Tags to add
Returns:
"""
asset_collection = AssetCollection()
asset_collection.set_tags(tags)
# Maps checksum to AssetCollectionFile and the path on disk to file
asset_collection_map: Dict[uuid.UUID, Tuple[AssetCollectionFile, str, int]] = dict()
for file in file_list:
fn = os.path.basename(file[1])
acf = AssetCollectionFile(file_name=fn, relative_path=file[1].replace(fn, "").strip("/"), md5_checksum=file[2])
asset_collection_map[file[2]] = (acf, file[0], file[3])
asset_collection.add_asset(acf)
# add files from acs
for f in ac_files:
asset_collection.add_asset(f)
# do initial save to see what assets are in comps
missing_files = asset_collection.save(return_missing_files=True)
if missing_files:
user_logger.info(f"{len(missing_files)} not currently in COMPS as Assets")
ac2 = AssetCollection()
new_files = 0
total_to_upload = 0
for checksum, asset_details in asset_collection_map.items():
if checksum in missing_files:
new_files += 1
total_to_upload += asset_details[2]
acf = AssetCollectionFile(file_name=asset_details[0].file_name, relative_path=asset_details[0].relative_path)
ac2.add_asset(acf, asset_details[1])
else:
ac2.add_asset(asset_details[0])
user_logger.info(f"Saving {new_files} totaling {humanfriendly.format_size(total_to_upload)} assets to comps")
ac2.set_tags(tags)
ac2.save()
asset_collection = ac2
return asset_collection
[docs]def get_argument_parser():
p = get_common_parser("AssetizeOutputs")
p.add_argument("--asset-tag", action='append', help="List of tags as value pairs. Eg: ABC=123")
return p
if __name__ == "__main__": # pragma: no cover
# build our argument parser and then parse the command line
parser = get_argument_parser()
args = parser.parse_args()
# Set our JOB config global with config provided
JOB_CONFIG = vars(args)
# register our error handler
sys.excepthook = get_error_handler_dump_config_and_error(JOB_CONFIG)
print(args)
# Parse the common arguments common to filter scripts
entity_filter_func, fn_format_func = parse_filter_args_common(args)
# Parse our Tags
asset_tags = build_asset_tags(args)
# login
client = login_to_env()
# Load our workitem
wi = WorkItem.get(os.environ['COMPS_WORKITEM_GUID'])
# Gather all our files in Experiments, Simulations, and Asset Collections
files, files_from_ac = filter_files_and_assets(args, entity_filter_func, wi, fn_format_func)
if len(files) == 0 and len(files_from_ac) == 0:
raise NoFileFound("No files found with patterns specified. Please verify your filters.")
ensure_no_duplicates(files_from_ac, files)
if args.dry_run:
print_results(files_from_ac, files)
else:
ac = create_asset_collection(files, files_from_ac, tags=asset_tags)
with open('asset_collection.id', 'w') as o:
user_logger.info(ac.id)
o.write(str(ac.id))
wi.add_related_asset_collection(ac.id, RelationType.Created)