# Source code for idmtools_platform_comps.utils.python_requirements_ac.create_asset_collection
"""idmtools create asset collection script.
This is part of the RequirementsToAssetCollection tool. This is run on the SSMT to convert installed files to an AssetCollection.
Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import os
import sys
from COMPS import Client
from COMPS.Data import AssetCollectionFile, QueryCriteria
from COMPS.Data import Experiment
from COMPS.Data.AssetCollection import AssetCollection
from idmtools.utils.hashing import calculate_md5
# Template for the tag key under which the requirements md5 is stored; main() formats it
# with the target OS string.
MD5_KEY = 'idmtools-requirements-md5-{}'
# Name of the file that main() writes the resulting asset collection id to.
AC_FILE = 'ac_info.txt'
# Directory name that is walked for library files; main() joins it onto the working
# directory of the simulation's HPC job.
LIBRARY_ROOT_PREFIX = 'L'
def build_asset_file_list(prefix=LIBRARY_ROOT_PREFIX):
    """
    Build asset entries for every file found under the library root.

    Walks ``prefix`` recursively and creates one AssetCollectionFile per file. Each asset's
    relative path is "site-packages" followed by the file's directory relative to ``prefix``;
    its checksum is whatever ``calculate_md5`` returns for the file's full path.

    Args:
        prefix: root directory that is walked for library files

    Returns:
        list of AssetCollectionFile objects, one per file found under ``prefix``
    """
    output = []
    for root, _, filenames in os.walk(prefix):
        # Directory of the current files relative to the walk root. A plain
        # ``root.replace(prefix, "")`` would remove *every* occurrence of the prefix text and
        # mangle paths whenever that text shows up deeper in the tree (e.g. with the default
        # prefix "L", "L/PIL" would become "PI").
        sub_dir = os.path.relpath(root, prefix)
        if sub_dir == os.curdir:
            # Files directly under the root go straight into "site-packages".
            sub_dir = ""
        # Identical for every file in this directory, so compute it once per directory.
        relative_path = os.path.join("site-packages", sub_dir).strip("/")
        for filename in filenames:
            asset = AssetCollectionFile(
                file_name=os.path.basename(filename),
                relative_path=relative_path,
                md5_checksum=calculate_md5(os.path.join(root, filename)),
            )
            output.append(asset)
    return output
def get_first_simulation_of_experiment(exp_id):
    """
    Fetch an experiment from COMPS and return the first of its simulations.

    The simulations are queried with their 'hpc_jobs' children selected.

    Args:
        exp_id: id of the experiment to look up

    Returns: the first simulation returned for the experiment
    """
    experiment = Experiment.get(exp_id)
    # Ask for the hpc_jobs children along with each simulation.
    criteria = QueryCriteria().select_children('hpc_jobs')
    return experiment.get_simulations(criteria)[0]
def main():  # pragma: no cover
    """Main entry point for our create asset collection script.

    Expects three command-line arguments after the script name: the experiment id, the
    COMPS endpoint and the target OS string. The script then:

    1. logs in to the endpoint and fetches the first simulation of the experiment,
    2. builds asset metadata for the files under ``LIBRARY_ROOT_PREFIX`` inside the working
       directory of that simulation's last HPC job,
    3. saves them as an AssetCollection tagged with the experiment's md5 tag and its
       non-reserved tags, retrying with explicit file paths for any files COMPS reports
       as missing,
    4. writes the resulting asset collection id to ``AC_FILE``.

    Raises:
        Exception: if fewer than three arguments are supplied.
    """
    print(sys.argv)

    # argv[0] is the script itself, so three user arguments mean at least four entries.
    # (Checking for "< 3" would let a call without <os_str> through and fail later with an
    # IndexError on sys.argv[3].)
    if len(sys.argv) < 4:
        raise Exception(
            "The script needs to be called with `python <model.py> <experiment_id> <endpoint> <os_str>'.\n{}".format(
                " ".join(sys.argv)))

    # Get the experiments
    exp_id = sys.argv[1]
    print('exp_id: ', exp_id)

    # Get endpoint
    endpoint = sys.argv[2]
    print('endpoint: ', endpoint)

    # Platform key
    os_target = sys.argv[3]
    print('os: ', os_target)

    client = Client()
    client.login(endpoint)

    # Retrieve the first simulation of the experiment
    comps_sim = get_first_simulation_of_experiment(exp_id)
    print('sim_id: ', comps_sim.id)

    # Build files metadata
    base_path = os.path.join(comps_sim.hpc_jobs[-1].working_directory, LIBRARY_ROOT_PREFIX)
    asset_files = build_asset_file_list(prefix=base_path)
    print('asset files count: ', len(asset_files))

    # Output files
    max_files = 10
    print('Display the first 10 files:\n',
          "\n".join([f"{a.relative_path}/{a.file_name}" for a in asset_files[0:max_files]]))

    # Retrieve experiment's tags (a single fetch serves both the md5 lookup and the user tags)
    comps_exp = Experiment.get(exp_id, QueryCriteria().select_children('tags'))
    exp_tags = comps_exp.tags

    # Tag key that stores the requirements md5 for this OS
    md5_key = MD5_KEY.format(os_target)

    # Everything except the reserved keys is carried over to the asset collection
    _reserved_tag = ['idmtools', 'task_type', md5_key]
    user_tags = {key: value for key, value in exp_tags.items() if key not in _reserved_tag}

    # Get md5_str
    md5_str = exp_tags.get(md5_key, None)

    # Collect ac's tags
    ac = AssetCollection()
    tags = {md5_key: md5_str}
    tags.update(user_tags)
    ac.set_tags(tags)

    # Create asset collection
    for af in asset_files:
        ac.add_asset(af)

    sys.stdout.flush()
    missing_files = ac.save(return_missing_files=True)

    # If COMPS responds that we're missing some files, then try creating it again,
    # uploading only the files that COMPS doesn't already have.
    if missing_files:
        print(f"Total of {len(ac.assets) - len(missing_files)} files currently in comps. Resolving missing files")
        ac2 = AssetCollection()
        ac2.set_tags(tags)

        for acf in ac.assets:
            if acf.md5_checksum in missing_files:
                rp = acf.relative_path
                fn = acf.file_name
                # Re-create the asset without a checksum and hand over the on-disk path
                # instead (presumably so COMPS uploads the content - confirm against pyCOMPS).
                acf2 = AssetCollectionFile(fn, rp, tags=acf.tags)
                # Only strip the leading "site-packages" that build_asset_file_list added;
                # a nested directory with the same name must stay in the path.
                rfp = os.path.join(base_path, rp.replace("site-packages", "", 1).strip(os.path.sep), fn)
                ac2.add_asset(acf2, rfp)
            else:
                ac2.add_asset(acf)

        print("\n\n\n=====================\nUploading files not in comps: " + "\n".join(
            [f"{a.relative_path}/{a.file_name}" for a in ac2.assets if
             a.md5_checksum in missing_files or a.md5_checksum is None]))

        sys.stdout.flush()
        ac2.save()
        ac = ac2

    # Output ac
    print('ac_id: ', ac.id)

    # write ac_id to file ac_info.txt
    with open(AC_FILE, 'w') as outfile:
        outfile.write(str(ac.id))

    sys.stdout.flush()
# Run main() only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":  # pragma: no cover
    main()