Source code for idmtools_slurm_utils.watcher
"""
Provides facility to watch bridge files.
Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import time
from logging import getLogger
from os import PathLike
from watchdog.events import FileSystemEventHandler
from watchdog_gevent import Observer
from idmtools_slurm_utils.utils import process_job
logger = getLogger()
[docs]class IdmtoolsJobWatcher:
"""
Watches the bridge directory and communicates jobs to slurm.
"""
[docs] def __init__(self, directory_to_watch: PathLike, directory_for_status: PathLike, check_every: int = 5):
"""
Creates our watcher.
Args:
directory_to_watch: Directory to sync from
directory_for_status: Directory for status messages
check_every: How often should directory be synced
"""
self.observer = Observer()
self._directory_to_watch = directory_to_watch
self._directory_for_status = directory_for_status
self._check_every = check_every
[docs] def run(self):
"""
Run the watcher.
"""
event_handler = IdmtoolsJobHandler(self._directory_for_status)
self.observer.schedule(event_handler, str(self._directory_to_watch), recursive=False)
self.observer.start()
try:
while True:
time.sleep(self._check_every)
except Exception as e:
self.observer.stop()
logger.exception(e)
self.observer.join()
[docs]class IdmtoolsJobHandler(FileSystemEventHandler):
"""
Handles messages about new messages.
"""
[docs] def __init__(self, directory_for_status: PathLike, cleanup_job: bool = True):
"""
Creates handler.
Args:
directory_for_status: Directory to use for status
cleanup_job: Should the job be cleaned up after submission
"""
self._directory_for_status = directory_for_status
self._cleanup_job = cleanup_job
super().__init__()
[docs] def on_created(self, event):
"""
On Created events.
Args:
event: Event details.
"""
if str(event.src_path).endswith(".json"):
process_job(event.src_path, self._directory_for_status, self._cleanup_job)