# ==============================================================================
# SpatialBinary.py - python wrapper for IDM spatial binary reports
# ==============================================================================
"""SpatialBinary.py
This file contains:
* SpatialBinary - a wrapper for SpatialReport DTK output files.
SpatialBinary is a Python wrapper for DTK SpatialReport_* files. It can both
read and write them, and can combine them using combiner functions to make new
SpatialReports.
Usage::

    spatial_binary = SpatialBinary(os.path.join(my_dir,
        "SpatialReport_Prevalence.bin"))
    print(spatial_binary)
"""
# imports
from builtins import range
from builtins import object
import struct
import os
import array
import copy
import math
import sys
# ==============================================================================
# SpatialBinary - a class to hold IDM spatial binary report
# ==============================================================================
class SpatialBinary(object):
    """Class to hold DTK spatial binary report data.

    The class is constructed with the path to the report file in question.
    Thereafter the public data members described below may be used to directly
    access (or change) the data.

    Additionally, since SpatialBinary implements __len__, __iter__, and
    __getitem__, the object can be treated like an array on timestep, e.g.::

        timestep_rec = spatial_binary[timestep]     # Obtain one timestep

    The resulting timestep_rec is a dictionary<node_id, channel_value>.

    Public members:
        drop_zeros (bool): True: drop zero values from in-memory
            representation.

        source_file (str): A copy of the file_path that was used to construct
            the SpatialBinary object.

        channel_name (str): The channel name, pulled from source_file.

        node_count (int): The number of nodes in the SpatialBinary's node
            table.

        value_min (float): The minimum value for all nodes * timesteps.
            Holds sys.float_info.max until at least one value is processed.

        value_max (float): The maximum value for all nodes * timesteps.
            Holds -sys.float_info.max until at least one value is processed.

        timesteps (array): Array of dictionaries containing the spatial
            report's data.

    """

    # File name prefix that precedes the channel name in report file names.
    _FILE_PREFIX = "SpatialReport_"

    def __init__(self, file_path="", drop_zeros=False, excluded_node_ids=None,
                 verbose=False):
        """Construct a SpatialBinary.

        Args:
            file_path (str): The path to the SpatialReport file. If empty, an
                empty SpatialBinary is created and nothing is read.

            drop_zeros (bool): If true, zero values will not appear in the
                timestep dictionaries. This can dramatically reduce the in-
                memory size of the object, but requires an extra step on
                retrieval of doing node_id in timestep_rec to avoid an
                exception.

            excluded_node_ids (list): An iterable (e.g. array or list) of
                node_ids that should be excluded from the min/max statistics
                for this spatial binary. Note that these nodes are still
                present in the timesteps array, they just do not influence
                min/max. This is typically used for removing outlier nodes,
                such as the "elsewhere" node in simulations involving external
                migration.

            verbose (bool): If true, a message is printed to stderr when
                reading or writing fails (the exception is still raised).

        Raises:
            I/O and other exceptions.

        """
        # data members, some of which are gleaned by processing the data
        self.drop_zeros = drop_zeros
        self.source_file = ""
        self.channel_name = ""
        self.node_count = 0
        # Seed min/max with the extremes of the float range so that the first
        # value read replaces them. Note that sys.float_info.min is the
        # smallest POSITIVE float, so the correct seed for a running maximum
        # is -sys.float_info.max.
        self.value_min = sys.float_info.max
        self.value_max = -sys.float_info.max
        self._excluded_node_ids = set()
        if excluded_node_ids is not None:
            self._excluded_node_ids = set(excluded_node_ids)
        self._verbose = verbose

        # pythonized contents of spatial file
        # each element of timesteps is a dictionary<nodeId, float>
        self.timesteps = []

        # read if file path was given
        if file_path:
            self._read_binary(file_path)

    # --------------------------------------------------------------------------
    def __str__(self):
        """Generates a textual representation of a SpatialBinary.

        This method allows the SpatialBinary object to report the source file
        and number of timesteps when it is printed.

        Returns:
            str: String containing source file and timestep count, or
            "(empty)" if there are no timesteps.

        """
        if not self.timesteps:
            return "(empty)"
        return self.source_file + ": " + repr(len(self.timesteps)) + \
            " timesteps"

    # --------------------------------------------------------------------------
    def __len__(self):
        """Returns the number of timesteps in the timesteps array.

        Returns:
            int: The number of timesteps in the spatial binary data.

        """
        return len(self.timesteps)

    # --------------------------------------------------------------------------
    def __iter__(self):
        """Returns an iterator for the timesteps array.

        Returns:
            iterator: Iterator for timesteps array.

        """
        return iter(self.timesteps)

    # --------------------------------------------------------------------------
    def __getitem__(self, timestep_index):
        """Returns the timestep record for a given timestep index.

        Returns:
            obj: Dictionary<node_id, spatial_value> for given timestep.

        Raises:
            IndexError.

        """
        return self.timesteps[timestep_index]

    # --------------------------------------------------------------------------
    def value_range(self, for_json=False):
        """Returns an object with the value range of the data.

        Returns:
            obj: An object with the min/max values of the data, with either
            Python or Javascript naming conventions.

        Args:
            for_json (bool): If true, emit an object using Javascript naming
            conventions ("min"/"max"), otherwise use Python naming conventions
            ("value_min"/"value_max").

        """
        if for_json:
            return {
                "min": self.value_min,
                "max": self.value_max
            }
        return {
            "value_min": self.value_min,
            "value_max": self.value_max
        }

    # --------------------------------------------------------------------------
    def write_binary(self, bin_file_path):
        """Writes the SpatialBinary to a given file path.

        This function writes out the spatial data in the object to a
        SpatialReport-format binary file. Typically this is used when the
        caller has modified the data in a SpatialBinary object or used
        combine() to create a new one. Note that if zeros were dropped,
        write_binary will throw an exception.

        The node ids are taken from the first timestep record and are written
        in ascending order. On success, source_file is updated to
        bin_file_path.

        Returns:
            None.

        Args:
            bin_file_path (str): The file path to which to write.

        Raises:
            ValueError: if the SpatialBinary used drop_zeros on construction.
            IndexError: if there are no timesteps to take node ids from.
            KeyError: if a timestep record lacks one of the node ids.

        To do:
            * Make it work even for sparse spatial binaries. The zero values
              are implied in by missing keys in the timestep records, so no
              actual data is missing.

        """
        # We can't write zero-dropped (sparse) spatial binary representations
        # because we may not have all the nodes anymore.
        if self.drop_zeros:
            raise ValueError("Can't write sparse spatial binaries")

        # Collect all the node ids (unsigned 32-bit, same typecode as used
        # when reading).
        node_id_list = sorted(self.timesteps[0])
        node_ids = SpatialBinary._swap_if_big_endian(
            array.array("I", node_id_list))

        try:
            with open(bin_file_path, "wb") as bin_file:
                # Write out the two counts
                bin_file.write(struct.pack("<i", len(node_ids)))
                bin_file.write(struct.pack("<i", len(self.timesteps)))

                # Write out the node IDs
                node_ids.tofile(bin_file)

                # Write out the values for each timestep, in node id order
                for values_dict in self.timesteps:
                    values = array.array(
                        "f", (values_dict[node_id]
                              for node_id in node_id_list))
                    SpatialBinary._swap_if_big_endian(values).tofile(bin_file)
        except BaseException:
            # Report (if asked to) and re-raise unchanged.
            if self._verbose:
                print("SpatialBinary.write_binary: Exception writing spatial "
                      "binary %s" % bin_file_path, file=sys.stderr)
            raise
        self.source_file = bin_file_path

    # --------------------------------------------------------------------------
    def clone(self):
        """Returns a copy of this SpatialBinary in a new SpatialBinary object.

        Returns:
            obj: A new SpatialBinary object populated from self (deep copy).

        Args:
            None.

        """
        return copy.deepcopy(self)

    # --------------------------------------------------------------------------
    def print(self):
        """Prints the entire contents of the spatial binary. Can be lengthy.

        Returns:
            None.

        Args:
            None.

        """
        for timestep, ts_rec in enumerate(self.timesteps):
            for node_id in ts_rec:
                print("Timestep %08d, node %08d, value = %f" %
                      (timestep, node_id, ts_rec[node_id]))

    # --------------------------------------------------------------------------
    # Where combine_func is float combineFunc(float src1, float src2)
    # --------------------------------------------------------------------------
    @staticmethod
    def combine(bin_file_path_1, bin_file_path_2, channel_name, combine_func):
        """Combine two SpatialBinary objects into a new SpatialBinary object.

        This function takes two SpatialBinary objects (of the same exact
        dimensions in both timesteps and nodes) and combines them through a
        "combine function" to make an entirely new in-memory SpatialBinary.
        That resulting SpatialBinary would then typically be written out using
        write_binary().

        There are four simple arithmetic static combine functions built into
        SpatialBinary, but the user may pass in any valid combine function
        that has a compatible signature. (See Usage below.) For operations
        that are not commutative such as division, let it be known that
        argument value1 in the combine function comes from bin_file_path1, and
        value2 comes from bin_file_path2.

        Values are matched by node id, so the two files may list their nodes
        in different orders as long as they contain the same node ids.

        Beware: temporarily has both input SpatialBinaries in memory; the
        first one is modified in place and returned as the result.

        Usage::

            def rounded_multiply_combiner(value1, value2):
                return round(value1 * value2)

            inf_vec_count = SpatialBinary.combine(
                "output/SpatialReport_Adult_Vectors",
                "output/SpatialReport_Infected_Vectors",
                "Infected Vector Count", rounded_multiply_combiner)
            inf_vec_count.write_binary("SpatialReport_Infected_Vector_Count")

        Returns:
            obj: A new SpatialBinary object combining sources 1 and 2

        Args:
            bin_file_path_1 (str): File path of first spatial binary file.

            bin_file_path_2 (str): File path of second spatial binary file.

            channel_name (str): Channel name to assign to the result binary.

            combine_func (function): A function that combines the values from
                the two spatial binary inputs, one at a time. The signature of
                the combine_func is::

                    combine_func(value1, value2)
                        return value1 + value2    # for example

        Raises:
            ValueError: if SpatialBinary objects don't have same dimensions or
                nodes

        """
        sb1 = SpatialBinary(bin_file_path_1)
        sb2 = SpatialBinary(bin_file_path_2)

        # Do some validation
        if sb1.drop_zeros or sb2.drop_zeros:
            raise ValueError("One or more inputs to combine have drop_zeros "
                             "set which makes them inappropriate for combine.")
        if sb1.node_count != sb2.node_count:
            raise ValueError("Combine's inputs do not represent the same "
                             "number of nodes.")
        if len(sb1.timesteps) != len(sb2.timesteps):
            raise ValueError("Combine's inputs do not represent the same "
                             "number of timesteps.")
        # Only inspect node ids when there is a timestep to inspect; values
        # are looked up by node id below, so ordering is irrelevant.
        if sb1.timesteps and \
                set(sb1.timesteps[0]) != set(sb2.timesteps[0]):
            raise ValueError("Combine's inputs do not represent the same node "
                             "ids.")

        # sb1 is recycled as the result object: reset its statistics and
        # identity, then overwrite its values in place.
        result_min = sys.float_info.max
        result_max = -sys.float_info.max
        sb1.source_file = ""
        sb1.channel_name = channel_name
        for ts1, ts2 in zip(sb1.timesteps, sb2.timesteps):
            # Only values of existing keys are replaced, so iterating the
            # dictionary directly is safe.
            for node_id in ts1:
                new_value = combine_func(ts1[node_id], ts2[node_id])
                if new_value < result_min:
                    result_min = new_value
                if new_value > result_max:
                    result_max = new_value
                ts1[node_id] = new_value
        sb1.value_min = SpatialBinary._condition_value(result_min)
        sb1.value_max = SpatialBinary._condition_value(result_max)
        return sb1

    # --------------------------------------------------------------------------
    # Simple arithmetic combiner functions
    # --------------------------------------------------------------------------
    @staticmethod
    def multiply_combiner(value1, value2):
        """Combiner function that multiplies channel values.

        Returns:
            float: new value.

        Args:
            value1 (float): Value from input file 1.

            value2 (float): Value from input file 2.

        """
        return value1 * value2

    # --------------------------------------------------------------------------
    @staticmethod
    def add_combiner(value1, value2):
        """Combiner function that adds channel values.

        Returns:
            float: new value.

        Args:
            value1 (float): Value from input file 1.

            value2 (float): Value from input file 2.

        """
        return value1 + value2

    # --------------------------------------------------------------------------
    @staticmethod
    def subtract_combiner(value1, value2):
        """Combiner function that subtracts channel values.

        Returns:
            float: new value.

        Args:
            value1 (float): Value from input file 1.

            value2 (float): Value from input file 2.

        """
        return value1 - value2

    # --------------------------------------------------------------------------
    @staticmethod
    def divide_combiner(value1, value2):
        """Combiner function that divides channel values.

        Returns:
            float: new value.

        Args:
            value1 (float): Value from input file 1.

            value2 (float): Value from input file 2.

        Raises:
            ZeroDivisionError: if value2 is zero.

        """
        return value1 / value2

    # --------------------------------------------------------------------------
    # Implementation
    # --------------------------------------------------------------------------
    def _read_binary(self, bin_file_path):
        """Reads a SpatialReport binary file into this object.

        The file layout read here is: two little-endian 32-bit counts (node
        count, timestep count), then node_count 32-bit node ids, then for each
        timestep node_count 32-bit floats. Populates source_file,
        channel_name, node_count, timesteps, value_min and value_max.

        Args:
            bin_file_path (str): The path of the file to read.

        Raises:
            ValueError: if the header holds a negative count.
            I/O and other exceptions (e.g. for truncated files).

        """
        try:
            with open(bin_file_path, "rb") as bin_file:
                self.source_file = bin_file_path

                # Channel name is the file's base name without extension and
                # without the "SpatialReport_" prefix (when it has one).
                base_name = os.path.basename(bin_file_path)
                if base_name.startswith(SpatialBinary._FILE_PREFIX):
                    base_name = base_name[len(SpatialBinary._FILE_PREFIX):]
                self.channel_name = os.path.splitext(base_name)[0]

                # read counts
                self.node_count, timestep_count = struct.unpack(
                    "<ii", bin_file.read(8))
                if self.node_count < 0 or timestep_count < 0:
                    # A negative size would make read() slurp the whole file.
                    raise ValueError("Spatial binary header contains a "
                                     "negative count.")
                record_size = self.node_count * 4

                # read node IDs (unsigned, matching what write_binary emits)
                node_ids = array.array("I")
                node_ids.frombytes(bin_file.read(record_size))
                SpatialBinary._swap_if_big_endian(node_ids)

                # read timestep data
                self.timesteps = []
                excluded = self._excluded_node_ids
                for _ in range(timestep_count):
                    values = array.array("f")
                    values.frombytes(bin_file.read(record_size))
                    SpatialBinary._swap_if_big_endian(values)
                    entries = {}
                    # Indexing (rather than zip) keeps a truncated record an
                    # error instead of silently yielding a short timestep.
                    for i in range(self.node_count):
                        value = values[i]
                        node_id = node_ids[i]
                        # Note: set value_min/value_max BEFORE zero check.
                        if node_id not in excluded:
                            if value < self.value_min:
                                self.value_min = value
                            if value > self.value_max:
                                self.value_max = value
                        if self.drop_zeros and value == 0.0:
                            continue
                        entries[node_id] = value
                    self.timesteps.append(entries)

                # make sure we didn't end up with infinities or NaNs
                self.value_min = SpatialBinary._condition_value(self.value_min)
                self.value_max = SpatialBinary._condition_value(self.value_max)
        except BaseException:
            # Report (if asked to) and re-raise unchanged.
            if self._verbose:
                print("SpatialBinary._read_binary: Exception reading spatial "
                      "binary %s" % bin_file_path, file=sys.stderr)
            raise

    # --------------------------------------------------------------------------
    @staticmethod
    def _swap_if_big_endian(values):
        """Converts an array between native and little-endian byte order.

        The file header is packed explicitly little-endian, whereas
        array.array uses the host's native byte order, so the arrays need to
        be byte-swapped on big-endian hosts. On little-endian hosts this is a
        no-op.

        Returns:
            array: The same array object, swapped in place if needed.

        Args:
            values (array): The array to convert.

        """
        if sys.byteorder == "big":
            values.byteswap()
        return values

    # --------------------------------------------------------------------------
    @staticmethod
    def _condition_value(value):
        """Replaces NaN and infinite values with finite stand-ins.

        Returns:
            float: 0 for NaN, sys.float_info.max for +infinity,
            -sys.float_info.max for -infinity, otherwise value unchanged.

        Args:
            value (float): The value to condition.

        """
        if math.isnan(value):
            value = 0
        elif value == math.inf:
            value = sys.float_info.max
        elif value == -math.inf:
            # The most negative finite float; sys.float_info.min would be a
            # tiny POSITIVE number.
            value = -sys.float_info.max
        return value