Source code for laser_cholera.metapop.recorder

import gzip
import io
import logging
from datetime import datetime
from pathlib import Path
from types import MethodType
from typing import Union

import h5py as h5

logger = logging.getLogger(__name__)


[docs] class Recorder: def __init__(self, model) -> None: self.model = model return
[docs] def check(self): if not hasattr(self.model, "agents"): Warning("Recorder: model expected to have a 'agents' attribute.") if not hasattr(self.model, "patches"): Warning("Recorder: model expected to have a 'patches' attribute.") return
def __call__(self, model, tick: int) -> None: # write the current state of the model to an HDF5 file if it is the last tick of the simulation if tick == (model.params.nticks - 1): # To get output must a) specify hdf5_output = true in the params file and b) specify properties to return write_output = ( ("hdf5_output" in model.params) and model.params.hdf5_output and ("return" in model.params) and model.params["return"] ) if write_output: root = Path(model.params.outdir) if "outdir" in model.params and model.params.outdir else Path.cwd() root.mkdir(parents=True, exist_ok=True) filename = root / f"{datetime.now().strftime('%Y%m%d%H%M%S')}.h5" # noqa: DTZ005 if ("compress" in model.params) and model.params.compress: filename = save_compressed_hdf5_parameters(model, filename) else: filename = save_hdf5_parameters(model, filename) logger.info(f"Recorder: model state saved to {filename}") else: logger.info("Recorder: model state not saved to HDF5 file.") logger.info( f"\t'hdf5_output' {'is' if 'hdf5_output' in model.params else 'is not'} in the params file." + (f" hdf5_output = {model.params.hdf5_output}" if "hdf5_output" in model.params else "") ) logger.info( f"\t'return' {'is' if 'return' in model.params else 'is not'} in the params file." + (f" return = {model.params['return']}" if "return" in model.params else "") ) return
# def plot(self, fig: Figure = None): # pragma: no cover # yield # return
[docs] def save_hdf5_parameters(model, filename: Union[str, Path]) -> Path: # Step 1: Write HDF5 content to the file with h5.File(filename, "w") as h5file: save_hdf5(h5file, model) return filename # Unmodified
[docs] def save_compressed_hdf5_parameters(model, filename: Union[str, Path]) -> Path: # Step 1: Create an in-memory buffer for HDF5 hdf5_buffer = io.BytesIO() # Step 2: Write HDF5 content to the buffer with h5.File(hdf5_buffer, "w") as h5file: save_hdf5(h5file, model) # Step 3: Compress and save to disk filename = filename.with_name(filename.name + ".gz") with gzip.open(filename, "wb") as gz_file: gz_file.write(hdf5_buffer.getvalue()) return filename # With .gz extension
[docs] def save_hdf5(h5file: h5.File, model) -> None: for frame in ["agents", "patches"]: if not hasattr(model, frame): raise AttributeError(f"Recorder: model needs to have a '{frame}' attribute.") lf = getattr(model, frame) properties = dir(lf) # TODO - automagically write all properties if a subset is not specified properties = [p for p in properties if not p.startswith("_")] properties = [p for p in properties if not isinstance(getattr(lf, p), (MethodType))] # TODO - if a subset is specified, don't bother with the filtering above properties = [p for p in properties if p in model.params["return"]] group = h5file.create_group(frame) for prop in properties: logger.info(f"Recorder: saving {frame}.{prop} ...") group.create_dataset(prop, data=getattr(lf, prop)) return