Source code for idmtools_platform_container.utils.job_history
"""
idmtools ContainerPlatform JobHistory Utility.
Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import diskcache
from pathlib import Path
from datetime import datetime
from typing import NoReturn, Dict, Tuple, List
from idmtools.core import ItemType
from idmtools.entities.experiment import Experiment
from idmtools_platform_container.utils.general import normalize_path, is_valid_uuid
from logging import getLogger
# Module logger for internal/debug messages, plus the shared 'user' logger that
# this module uses for user-facing messages (e.g. invalid ids, expired items).
logger, user_logger = getLogger(__name__), getLogger('user')
# Name of the folder (created under ~/.idmtools) that holds the job history cache.
JOB_HISTORY_DIR = "idmtools_container_history"
def initialize():
    """
    Initialization decorator factory for JobHistory.

    Applying ``@initialize()`` to a function makes every call first run
    ``JobHistory.initialization()`` (which opens the history cache if it is not
    open yet) and then delegate to the wrapped function with the same arguments.

    Returns:
        Decorator that wraps a function with the initialization step.
    """
    from functools import wraps

    def wrap(func):
        # functools.wraps keeps the wrapped method's __name__, __doc__ and
        # __wrapped__, so introspection and generated API docs see the real method.
        @wraps(func)
        def wrapped_f(*args, **kwargs):
            JobHistory.initialization()
            return func(*args, **kwargs)
        return wrapped_f
    return wrap
class JobHistory:
    """
    Job History Utility for idmtools Container Platform.

    Records live in a ``diskcache.Cache`` stored under
    ``~/.idmtools/idmtools_container_history`` and are keyed by experiment id. Each record
    is a dict with the keys JOB_DIRECTORY, SUITE_ID, EXPERIMENT_DIR, EXPERIMENT_NAME,
    EXPERIMENT_ID, CONTAINER and CREATED (see save_job); readers may add an EXPIRE entry.
    """

    # Shared diskcache.Cache instance; created lazily by initialization().
    history = None
    # Directory backing the cache: ~/.idmtools/<JOB_HISTORY_DIR>.
    history_path = Path.home().joinpath(".idmtools").joinpath(JOB_HISTORY_DIR)

    @classmethod
    def initialization(cls):
        """Initialize JobHistory: create the history directory and open the cache (only once)."""
        if cls.history is None:
            cls.history_path.mkdir(parents=True, exist_ok=True)
            cls.history = diskcache.Cache(str(cls.history_path))

    @staticmethod
    def _format_expire_time(expire_time):
        """Format a POSIX expiration timestamp as local 'YYYY-mm-dd HH:MM:SS'; a falsy value gives None."""
        if not expire_time:
            return None
        return datetime.fromtimestamp(expire_time).strftime('%Y-%m-%d %H:%M:%S')

    @classmethod
    def _iter_records(cls):
        """
        Yield (key, record) pairs from the history cache.

        Keys whose value reads back as None (e.g. expired entries) are skipped so callers
        can index the record without guarding against None. Assumes the cache is initialized.
        """
        cache = cls.history
        for key in cache:
            value = cache.get(key)
            if value is None:
                logger.debug(f"Skip job {key}: no readable record (possibly expired).")
                continue
            yield key, value

    @classmethod
    @initialize()
    def save_job(cls, job_dir: str, container_id: str, experiment: Experiment, platform=None) -> None:
        """
        Save job to history.

        The record is stored under ``experiment.id``; a previous record for the same
        experiment id is replaced.

        Args:
            job_dir: job directory
            container_id: container id
            experiment: Experiment
            platform: Platform; defaults to the current platform from the idmtools context

        Returns:
            None
        """
        from idmtools.core.context import get_current_platform
        if platform is None:
            platform = get_current_platform()
        cache = cls.history
        new_item = {
            "JOB_DIRECTORY": normalize_path(job_dir),
            "SUITE_ID": experiment.parent_id,
            "EXPERIMENT_DIR": normalize_path(platform.get_directory(experiment)),
            "EXPERIMENT_NAME": experiment.name,
            "EXPERIMENT_ID": experiment.id,
            "CONTAINER": container_id,
            # Local wall-clock time; view_history parses this exact format to sort.
            "CREATED": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }
        cache.set(experiment.id, new_item)
        cache.close()

    @classmethod
    @initialize()
    def get_job(cls, exp_id: str) -> Dict:
        """
        Get job from history.

        Args:
            exp_id: Experiment ID

        Returns:
            job data in dict (with an 'EXPIRE' entry when the record has an expiration
            time), or None if exp_id is not a valid UUID or no readable record exists
        """
        if not is_valid_uuid(exp_id):
            return None
        cache = cls.history
        value, expire_time = cache.get(exp_id, expire_time=True)
        if value is None:
            # list(cache) rather than `in cache`: presumably iteration still lists expired
            # keys, which distinguishes "expired" from "never stored" — verify with diskcache docs.
            if exp_id in list(cache):
                logger.debug(f"Item {exp_id} expired.")
            else:
                logger.debug(f"Item {exp_id} not found.")
            return None
        expire_time_str = cls._format_expire_time(expire_time)
        if expire_time_str:
            value['EXPIRE'] = expire_time_str
        return value

    @classmethod
    def get_job_dir(cls, exp_id: str) -> str:
        """
        Get job directory from history.

        Args:
            exp_id: Experiment ID

        Returns:
            job directory, or None if exp_id is invalid or has no record
        """
        if not is_valid_uuid(exp_id):
            user_logger.info(f"Invalid item id: {exp_id}")
            return None
        # get_job is decorated with @initialize(), so the cache is opened there.
        data = cls.get_job(exp_id)
        if data is None:
            return None
        return data['JOB_DIRECTORY']

    @classmethod
    @initialize()
    def get_item_path(cls, item_id: str) -> Tuple:
        """
        Get item path from history.

        Lookup order: experiment id (direct cache key), then for each stored job a
        matching suite id, then a simulation folder matching ``*<item_id>/metadata.json``
        inside the experiment directory.

        Args:
            item_id: Suite/Experiment/Simulation ID

        Returns:
            (item path, item type), or None if the id is invalid or not found
        """
        if not is_valid_uuid(item_id):
            logger.debug(f"Invalid item id: {item_id}")
            return None
        cache = cls.history
        # Experiment case: experiment ids are the cache keys.
        item = cache.get(item_id)
        if item:
            return Path(item['EXPERIMENT_DIR']), ItemType.EXPERIMENT
        pattern = f'*{item_id}/metadata.json'
        for _, value in cls._iter_records():
            exp_dir = value.get('EXPERIMENT_DIR')
            if exp_dir is None:
                # Without an experiment directory no suite/simulation path can be derived.
                continue
            # Suite case
            if value.get('SUITE_ID') == item_id:
                return Path(exp_dir).parent, ItemType.SUITE
            # Simulation case
            for meta_file in Path(exp_dir).glob(pattern):
                return meta_file.parent, ItemType.SIMULATION
        return None

    @classmethod
    @initialize()
    def view_history(cls, container_id: str = None) -> List:
        """
        View job history.

        Args:
            container_id: Container ID; when given, only jobs run in that container are returned

        Returns:
            list of job data, newest (by CREATED) first
        """
        cache = cls.history
        data = []
        for key in cache:
            value, expire_time = cache.get(key, expire_time=True)
            if value is None:
                if key in list(cache):
                    user_logger.info(f"Item {key} expired.")
                else:
                    user_logger.info(f"Item {key} not found.")
                continue
            expire_time_str = cls._format_expire_time(expire_time)
            if expire_time_str:
                value['EXPIRE'] = expire_time_str
            if container_id is None or value['CONTAINER'] == container_id:
                data.append(value)
        # Sort newest first; CREATED uses the format written by save_job.
        return sorted(data, key=lambda x: datetime.strptime(x["CREATED"], "%Y-%m-%d %H:%M:%S"), reverse=True)

    @classmethod
    @initialize()
    def delete(cls, exp_id: str) -> None:
        """
        Delete job from history.

        Args:
            exp_id: Experiment ID

        Returns:
            None
        """
        cache = cls.history
        cache.pop(exp_id)
        cache.close()

    @classmethod
    @initialize()
    def expire_history(cls, dt: str = None) -> None:
        """
        Expire job history based on the input expiration time.

        Args:
            dt: datetime to expire (format like "2024-07-30 15:12:05", interpreted as local
                time); when None, cache.expire() is called with now=None

        Returns:
            None
        """
        # Convert the datetime string into a POSIX timestamp (seconds since epoch).
        timestamp = datetime.strptime(dt, '%Y-%m-%d %H:%M:%S').timestamp() if dt else None
        cache = cls.history
        cache.expire(now=timestamp)
        cache.close()

    @classmethod
    @initialize()
    def clear(cls, container_id: str = None) -> None:
        """
        Clear job history.

        Args:
            container_id: Container ID; when given, only that container's jobs are removed

        Returns:
            None
        """
        cache = cls.history
        if container_id is None:
            cache.clear()
        else:
            for key in cache:
                value = cache.get(key)
                if value is None:
                    user_logger.info(f"key {key} not found in cache")
                    continue
                if value['CONTAINER'] == container_id:
                    cache.delete(key)
        cache.close()

    @classmethod
    @initialize()
    def volume(cls) -> int:
        """Return the disk usage of the job history cache as reported by ``cache.volume()``."""
        return cls.history.volume()

    @classmethod
    @initialize()
    def sync(cls) -> None:
        """Sync job history: remove jobs whose experiment directory no longer exists."""
        cache = cls.history
        for key, value in cls._iter_records():
            exp_dir = value.get('EXPERIMENT_DIR')
            if exp_dir is None or not Path(exp_dir).exists():
                cache.pop(key)
                logger.debug(f"Remove job {key} from job history.")
        cache.close()

    @classmethod
    @initialize()
    def count(cls, container_id: str = None) -> int:
        """
        Count job history.

        Args:
            container_id: Container ID; when given, only readable records for that container count

        Returns:
            job history count
        """
        if container_id is None:
            return len(cls.history)
        return sum(1 for _, value in cls._iter_records() if value.get('CONTAINER') == container_id)

    @classmethod
    @initialize()
    def container_history(cls) -> Dict:
        """Map each container id to the list of experiment ids (cache keys) run in it."""
        data = {}
        for key, value in cls._iter_records():
            data.setdefault(value['CONTAINER'], []).append(key)
        return data

    @classmethod
    @initialize()
    def verify_container(cls, container_id: str) -> bool:
        """Return True if container_id starts with the CONTAINER value of any stored job."""
        return any(container_id.startswith(value['CONTAINER']) for _, value in cls._iter_records())