Source code for emod_api.spatialreports.spatial

#!/usr/bin/env python3

"""emod-api spatial report module. Exposes SpatialReport and SpatialNode objects."""

from pathlib import Path
from typing import Dict, List
import numpy as np


[docs]class SpatialNode(object): """ Class representing a single node of a spatial report. """ def __init__(self, node_id: int, data): self._id = node_id self._data = data return @property def id(self) -> int: """Node ID""" return self._id @property def data(self): """Time series data for this node.""" return self._data def __getitem__(self, item: int) -> float: """index into node data by time step""" return self._data[item] def __setitem__(self, key: int, value: float) -> None: """index into node data by time step""" self._data[key] = value return
NUM_STEPS_INDEX = 0 NUM_NODES_INDEX = 1
[docs]class SpatialReport(object): """ Class for reading (and, optionally, writing) spatial reports in EMOD/DTK format. "Filtered" reports will have start > 0 and/or reporting interval > 1. """ def __init__(self, filename: str = None, node_ids: List[int] = None, data: np.array=None, start:int = 0, interval:int = 1): """ Note: Args: filename: file from which to read data node_ids: list of node ids, must be integer values data: NumPy array of data, shape must be (#values, #nodes) start: time step of first sample (used with filtered reports) interval: # of time steps between samples (used with filtered reports) """ if isinstance(filename, str): self._from_file(filename) else: self._from_node_ids_and_data(node_ids, data, start, interval) return @property def data(self) -> np.array: """Returns full 2 dimensional NumPy array with report data. Shape is (#values, #nodes).""" return self._data @property def node_ids(self) -> List[int]: """Returns list of node IDs (integers) for nodes in the report.""" return self._node_ids @property def nodes(self) -> Dict[int, SpatialNode]: """Returns dictionary of SpatialNodes keyed on node ID.""" return self._nodes # index into report by node id def __getitem__(self, item: int) -> SpatialNode: return self._nodes[item] @property def node_count(self) -> int: """Number of nodes in the report.""" return self.data.shape[NUM_NODES_INDEX] @property def time_steps(self) -> int: """Number of samples in the report.""" return self.data.shape[NUM_STEPS_INDEX] @property def start(self) -> int: """Time step of first sample.""" return self._start @property def interval(self) -> int: """Interval, in time steps, between samples.""" return self._interval
[docs] def write_file(self, filename: str): """Save current nodes and timeseries data to given file.""" with open(filename, "wb") as file: np.array([self.node_count], dtype=np.uint32).tofile(file) np.array([self.time_steps], dtype=np.uint32).tofile(file) if self.start != 0 or self.interval != 1: np.array([self.start], dtype=np.float32).tofile(file) np.array([self.interval], dtype=np.float32).tofile(file) np.array([self.node_ids], dtype=np.uint32).tofile(file) self.data.tofile(file) return
def _from_file(self, filename: str): """ Read binary spatial report file. #nodes, #time steps, node ids (#nodes values), data (#nodes x #time steps values) """ # File format: # number of nodes - uint32 * 1 # number of time steps - uint32 * 1 # OPTIONAL: # starting time step - float32 * 1 (integral value in reality) # time step interval - float32 * 1 (integral value in reality) # node ids - uint32 * number of nodes # data - (float32 * number of nodes) * number of time_steps file_size = Path(filename).stat().st_size with open(filename, "rb") as file: num_nodes = np.fromfile(file, dtype=np.uint32, count=1)[0] num_time_steps = np.fromfile(file, dtype=np.uint32, count=1)[0] simple_size = (2 + num_nodes + (num_nodes * num_time_steps)) * 4 # num_nodes, num_time_steps, node_ids, and data filtered_size = simple_size + 8 # include starting time step and time step interval if file_size == simple_size: self._start = 0 self._interval = 1 elif file_size == filtered_size: self._start = int(np.fromfile(file, dtype=np.float32, count=1)[0]) self._interval = int(np.fromfile(file, dtype=np.float32, count=1)[0]) assert(self.start >= 0) assert(self.interval >= 1) else: raise RuntimeError(f"Unexpected file size {file_size}, expected {simple_size} (standard spatial report) or {filtered_size} (filtered spatial report.") node_ids = np.fromfile(file, dtype=np.uint32, count=num_nodes) data = np.fromfile(file, dtype=np.float32, count=num_nodes * num_time_steps) # let us index data[step, node] data = data.reshape((num_time_steps, num_nodes)) self._from_node_ids_and_data(node_ids, data, self._start, self._interval) return def _from_node_ids_and_data(self, node_ids: list, data: np.array, start: int, interval: int) -> None: assert _is_iterable(node_ids), "node_ids must be specified and iterable" concrete = list(node_ids) assert len(concrete) > 0, "node_ids must not be empty" assert all(map(lambda i: _isinteger(i), concrete)), "node_ids must be integers" assert len(set(concrete)) == len(concrete), "node_ids must be unique" self._node_ids = sorted(concrete) assert data.dtype is np.dtype("float32"), "data must be np.float32" assert data.shape[1] == len( self._node_ids ), "data shape must be (#values, #nodes)" self._data = data self._node_id_to_index_map = { node_ids[n]: n for n in range(data.shape[NUM_NODES_INDEX]) } self._nodes = { node_id: SpatialNode(node_id, data[:, self._node_id_to_index_map[node_id]]) for node_id in node_ids } assert int(start) >= 0, "start sample time must be >= 0" self._start = int(start) assert int(interval) >= 1, "sample interval must be >= 1" self._interval = int(interval) return
def _is_iterable(obj) -> bool: try: _ = iter(obj) return True except TypeError: return False def _isinteger(item) -> bool: return isinstance(item, (int, np.integer))