Source code for laser.cholera.metapop.recorder
import gzip
import io
import logging
from datetime import datetime
from pathlib import Path
from types import MethodType
from typing import Union
import h5py as h5
logger = logging.getLogger(__name__)
[docs]
class Recorder:
def __init__(self, model) -> None:
self.model = model
return
[docs]
def check(self):
if not hasattr(self.model, "people"):
Warning("Recorder: model expected to have a 'people' attribute.")
if not hasattr(self.model, "patches"):
Warning("Recorder: model expected to have a 'patches' attribute.")
return
def __call__(self, model, tick: int) -> None:
# write the current state of the model to an HDF5 file if it is the last tick of the simulation
if tick == (model.params.nticks - 1):
# To get output must a) specify hdf5_output = true in the params file and b) specify properties to return
write_output = ("hdf5_output" in model.params) and model.params.hdf5_output and ("return" in model.params) and model.params["return"]
if write_output:
root = Path(model.params.outdir) if "outdir" in model.params and model.params.outdir else Path.cwd()
root.mkdir(parents=True, exist_ok=True)
filename = root / f"{datetime.now().strftime('%Y%m%d%H%M%S')}.h5" # noqa: DTZ005
if ("compress" in model.params) and model.params.compress:
filename = save_compressed_hdf5_parameters(model, filename)
else:
filename = save_hdf5_parameters(model, filename)
logger.info(f"Recorder: model state saved to {filename}")
else:
logger.info("Recorder: model state not saved to HDF5 file.")
logger.info(
f"\t'hdf5_output' {'is' if 'hdf5_output' in model.params else 'is not'} in the params file."
+ (f" hdf5_output = {model.params.hdf5_output}" if "hdf5_output" in model.params else "")
)
logger.info(
f"\t'return' {'is' if 'return' in model.params else 'is not'} in the params file."
+ (f" return = {model.params['return']}" if "return" in model.params else "")
)
return
# def plot(self, fig: Figure = None): # pragma: no cover
# yield
# return
[docs]
def save_hdf5_parameters(model, filename: Union[str, Path]) -> Path:
# Step 1: Write HDF5 content to the file
with h5.File(filename, "w") as h5file:
save_hdf5(h5file, model)
return filename # Unmodified
[docs]
def save_compressed_hdf5_parameters(model, filename: Union[str, Path]) -> Path:
# Step 1: Create an in-memory buffer for HDF5
hdf5_buffer = io.BytesIO()
# Step 2: Write HDF5 content to the buffer
with h5.File(hdf5_buffer, "w") as h5file:
save_hdf5(h5file, model)
# Step 3: Compress and save to disk
filename = filename.with_name(filename.name + ".gz")
with gzip.open(filename, "wb") as gz_file:
gz_file.write(hdf5_buffer.getvalue())
return filename # With .gz extension
[docs]
def save_hdf5(h5file: h5.File, model) -> None:
for frame in ["people", "patches"]:
if not hasattr(model, frame):
raise AttributeError(f"Recorder: model needs to have a '{frame}' attribute.")
lf = getattr(model, frame)
properties = dir(lf)
# TODO - automagically write all properties if a subset is not specified
properties = [p for p in properties if not p.startswith("_")]
properties = [p for p in properties if not isinstance(getattr(lf, p), (MethodType))]
# TODO - if a subset is specified, don't bother with the filtering above
properties = [p for p in properties if p in model.params["return"]]
group = h5file.create_group(frame)
for prop in properties:
logger.info(f"Recorder: saving {frame}.{prop} ...")
group.create_dataset(prop, data=getattr(lf, prop))
return