Source code for idmtools.services.ipersistance_service

"""
IPersistenceService allows caching of items locally into a diskcache db that does not expire upon deletion.

Copyright 2021, Bill & Melinda Gates Foundation. All rights reserved.
"""
import os
import logging
import time
from pathlib import Path
from multiprocessing import cpu_count

import diskcache
from abc import ABCMeta

from idmtools.core import IDMTOOLS_USER_HOME

logger = logging.getLogger(__name__)


[docs]class IPersistenceService(metaclass=ABCMeta): """ IPersistenceService provides a persistent cache. This is useful for network heavy operations. """ cache_directory = None cache_name = None @classmethod def _open_cache(cls): """ Open cache. Returns: None """ import sqlite3 from idmtools import IdmConfigParser cls.cache_directory = Path( IdmConfigParser.get_option(option="cache_directory", fallback=IDMTOOLS_USER_HOME.joinpath("cache"))) # the more the cpus, the more likely we are to encounter a scaling issue. Let's try to scale with that up to # one second. above one second, we are introducing to much lag in processes default_timeout = min(max(0.25, cpu_count() * 0.025 * 2), 2) retries = 0 while retries < 5: try: os.makedirs(cls.cache_directory, exist_ok=True) cache = diskcache.FanoutCache(os.path.join(str(cls.cache_directory), 'disk_cache', cls.cache_name), timeout=default_timeout, shards=cpu_count() * 2) return cache except (sqlite3.OperationalError, FileNotFoundError): retries += 1 time.sleep(0.1)
[docs] @classmethod def retrieve(cls, uid): """ Retrieve item with id <uid> from cache. Args: uid: Id to fetch Returns: Item loaded from cache """ with cls._open_cache() as cache: obj = cache.get(uid, retry=True) return obj
[docs] @classmethod def save(cls, obj): """ Save an item to our cache. Args: obj: Object to save. Returns: Object uid """ with cls._open_cache() as cache: if logger.isEnabledFor(logging.DEBUG): logging.debug('Saving %s to %s', obj.uid, cls.cache_name) cache.set(obj.uid, obj, retry=True) return obj.uid
[docs] @classmethod def delete(cls, uid): """ Delete at item from our cache with id <uid>. Args: uid: Id to delete Returns: None """ with cls._open_cache() as cache: cache.delete(uid, retry=True)
[docs] @classmethod def clear(cls): """ Clear our cache. Returns: None """ with cls._open_cache() as cache: cache.clear(retry=True)
[docs] @classmethod def list(cls): """ List items in our cache. Returns: List of items in our cache """ with cls._open_cache() as cache: _list = list(cache) return _list
[docs] @classmethod def length(cls): """ Total length of our persistence cache. Returns: Count of our cache """ with cls._open_cache() as cache: _len = len(cache) return _len