Source code for vis_tools.CZMLWriter

# ==============================================================================
# CZMLWriter.py - Python wrapper for writing Cesium CZML animation files.
# ==============================================================================
"""CZMLWriter.py

This class simplifies the writing of Cesium CZML animation output files. It
has methods for writing Vis-Tools-specific animations (migration and marker).
Note that there are high-level methods on VisSet that implicitly create and use
CZMLWriter so in many cases you will not need to directly instantiate/call this
class yourself.

Usage::

    writer = CZMLWriter()
    writer.set_sim_duration(timestep_count)
    writer.add_event_markers(my_event_recorder, my_demographics,
        "New_Infection", "Triangle", "red", "Top", 2)
    writer.write(path.join(my_dir, "New_Infections.czml"))

"""

# imports
from builtins import object
from czml import czml
import functools
import sys
from datetime import date, timedelta
from vis_tools import Color, NamedColors, Gradient


# ==============================================================================
# CZMLWriter - a class for outputting high-level visualizations in CZML format
# ==============================================================================
[docs]class CZMLWriter(object): """Class for creating high-level visualizations in Cesium CZML format.""" # -------------------------------------------------------------------------- # Constants # -------------------------------------------------------------------------- k_default_node_point_size = 8 k_default_clock_multiplier = 60 * 60 * 4 # four hours in seconds k_default_migration_trail_duration = 60 * 60 * 24 # two days in seconds k_default_migration_duration_days = 2 k_default_migration_duration_seconds =\ k_default_migration_duration_days * 60 * 60 * 24 # -------------------------------------------------------------------------- def __init__(self, verbose=False): """Construct a CZMLWriter. Args: verbose (bool): Extra messaging from methods. """ self.doc = czml.CZML() self.doc.packets.append(czml.CZMLPacket(id="document", version="1.0")) self.timestep_count = 0 self.sim_end_date = None self.sim_start_date = None self._verbose = verbose # -------------------------------------------------------------------------- def __str__(self): """Generates a textual representation of CZMLWriter. This method allows the CZMLWriter to report the number of CZML packets it contains when it is printed. Returns: str: String containing number of packets. """ return "CZML: %d packet%s" % (len(self.doc.packets), "s" if len(self.doc.packets) != 1 else "") # --------------------------------------------------------------------------
[docs] def set_sim_dates(self, sim_start_date, total_timestep_count): """Set the sim start date and total timesteps. These are required because all CZML layers are time-synced to the Cesium clock. It is possible to use set_sim_duration in lieu of this function. Returns: None Args: sim_start_date (date): Start date for timestep 0 total_timestep_count (int): Number of timesteps in simulation data """ self.sim_start_date = sim_start_date self.sim_end_date = sim_start_date + timedelta(total_timestep_count) if self._verbose: print("CZMLWriter.set_sim_dates: Simulation date range set to ") "%s - %s" %\ (self.sim_start_date.isoformat(), self.sim_end_date.isoformat())
# --------------------------------------------------------------------------
[docs] def set_sim_duration(self, total_timestep_count, timestep_bounds=None, sim_start_date=None, emit_clock=True, anim_step_secs=None): """Set the sim duration and time range. This function allows you to set the start date and total timesteps, but also allows emitting the CZML over a subrange of timesteps less than the number of timesteps in total_timestep_count. This is effective for trimming migration animations that would otherwise be too big down to a workable subset. Returns: None Args: total_timestep_count (int): Total timesteps in sim timestep_bounds: (list (start, end)): Timestep subrange for animation sim_start_date (date): Start date for timestep 0 emit_clock (bool): Whether to emit a CZML Clock packet anim_step_secs (int): Clock multiplier for Clock packet """ self.timestep_count = total_timestep_count if anim_step_secs is None: anim_step_secs = CZMLWriter.k_default_clock_multiplier if sim_start_date is None: self.sim_end_date = date.today() self.sim_start_date = self.sim_end_date -\ timedelta(total_timestep_count) else: self.sim_start_date = sim_start_date self.sim_end_date = sim_start_date + timedelta(total_timestep_count) interval = self.sim_start_date.isoformat() + "/" +\ self.sim_end_date.isoformat() if timestep_bounds is None: current_time = self.sim_start_date.isoformat() else: current_time = (self.sim_start_date + timedelta(timestep_bounds[0])).isoformat() if emit_clock: clock = czml.Clock( currentTime=current_time, interval=interval, multiplier=anim_step_secs, range="CLAMPED", step="TICK_DEPENDENT" ) for packet in self.doc.packets: if packet.id == "document": packet.clock = clock break if self._verbose: print("CZMLWriter.set_sim_duration: Simulation date range set to ") "%s - %s" %\ (self.sim_start_date.isoformat(), self.sim_end_date.isoformat())
# --------------------------------------------------------------------------
[docs] def add_nodes(self, demographics, population_as_size=True, with_altitude=False, processor=None, namer=None): """Add demographics node data to the CZMLWriter object. DEPRECATED. This (DEPRECATED) method allows you to add node representations to a CZML output file, where the node point size is relative to the node's InitialPopulation. Since the Vis-Tools client does this in a much more flexible way, this method is deprecated, but may be educational for those looking to extend the CZMLWriter class. Returns: None Args: demographics (Demographics) A Demographics object with node data. population_as_size (bool): True to scale point size by InitialPopulation, fixed size otherwise with_altitude: True to emit node coordinates including their altitude, false to just emit them with default altitude processor: None or a function that processes the node and and the czml wrapper into a new CZML packet. namer: None or a function that converts a node into a name string """ pop_adjusted_max = demographics.calc_adjusted_pop_max() pop_range = pop_adjusted_max - demographics.population_min size_px_min = 6 size_px_max = 32 size_range = size_px_max - size_px_min for node in demographics.Nodes: name = repr(node["NodeID"]) if namer is not None: name = namer(node, name) attrs = node["NodeAttributes"] lon = attrs["Longitude"] lat = attrs["Latitude"] alt = attrs["Altitude"] if with_altitude else 0 if processor is None: packet = czml.CZMLPacket(id=name, name=name, position={"cartographicDegrees": [lon, lat, alt]}) size = CZMLWriter.k_default_node_point_size if population_as_size: pop = attrs["InitialPopulation"] if pop > pop_adjusted_max: pop = pop_adjusted_max norm_pop = (pop - demographics.population_min) / pop_range size = size_px_min + norm_pop * size_range # show=True is required below packet.point = czml.Point(pixelSize=size, show=True) else: packet = processor(self, czml, node) self.doc.packets.append(packet) if self._verbose: print("CZMLWriter.add_nodes: Added %s node%s" %\ (len(demographics.Nodes), "" if len(demographics.Nodes) == 1 else "s"))
# -------------------------------------------------------------------------- # Returns the number of migration paths emitted. # --------------------------------------------------------------------------
[docs] def add_migrations(self, migrations, demographics, with_altitude=False): """Adds migration animations to a CZML output. This function, given migrations in a particular format and demographics, adds "comet" animations for migration events. Returns: Number of infected human migrations in animation layer Args: migrations (obj): An object that describes migrations (see below) demographics (Demographics): A Demographics object describing nodes with_altitude: True to respect the altitude in the node coordinates migrations is a dictionary where the keys are <timestep> and the values are objects with keys <from_node_id>-<to_node_id> and the values are the number of migrations at that timestep from from_node_id to to_node_id. See MigrationHelpers.py for more details. Todo: * Color customization (comet heads, comet tails) * Duration customization (comet tails) """ count = 0 for timestep in list(migrations.keys()): timestep_rec = migrations[timestep] for migration_key in list(timestep_rec.keys()): from_node_id, to_node_id = migration_key.split("-") if not (from_node_id in demographics and to_node_id in demographics): continue from_node = demographics[from_node_id] to_node = demographics[to_node_id] availability = self._timestep_to_iso(timestep) + "/" +\ self._timestep_to_iso(int(timestep) + CZMLWriter.k_default_migration_duration_days) id_txt = "%s_%d" % (migration_key, count) # The thing to remember about paths is that their initial # duration is going to be availability, but then they are going # to be around for an *additional* trailTime seconds. So the # color fade in the path needs to run for dur_seconds + # trailTime. mig_path = czml.CZMLPacket(id=id_txt, availability=availability, position={ "epoch": self._timestep_to_iso(timestep), "cartographicDegrees": [ 0, # seconds from_node["NodeAttributes"]["Longitude"], from_node["NodeAttributes"]["Latitude"], from_node["NodeAttributes"]["Altitude"] if with_altitude else 0, CZMLWriter.k_default_migration_duration_seconds, to_node["NodeAttributes"]["Longitude"], to_node["NodeAttributes"]["Latitude"], to_node["NodeAttributes"]["Altitude"] if with_altitude else 0 ] }) pt = czml.Point(pixelSize=10, show=True, color={ "epoch": self._timestep_to_iso(timestep), "rgba": [ 0, 186, 127, 183, 255, CZMLWriter.k_default_migration_duration_seconds, 186, 127, 183, 0 ] }) path = czml.Path(leadTime=0, trailTime=60 * 60 * 12, # half-day resolution=CZMLWriter.k_default_clock_multiplier, material={ "solidColor": { "color": { "epoch": self._timestep_to_iso(timestep), "rgba": [0, 255, 255, 255, 255, CZMLWriter.k_default_migration_duration_seconds + 60 * 60 * 12, 255, 255, 255, 0] }} }, width=2.5) mig_path.point = pt mig_path.path = path self.doc.packets.append(mig_path) count += 1 if self._verbose: print("CZMLWriter.add_migrations: %d migrations added." % count) return count
# --------------------------------------------------------------------------
[docs] def add_vector_migrations(self, vector_migrations, demographics, migration_duration_timesteps, dot_color, dot_size_pixels, path_color, path_thickness_pixels, path_trail_time_factor): """Adds vector cohort migration animations to a CZML output. This function, given vector migrations in a particular format and a demographics file, adds "comet" animations for migration events. This function expects the following fields in vector_migrations: * Time (int): the timestep of the beginning of the migration event * FromNodeID (int): the node ID from which the migration emanates * ToNodeID (int): the node ID to which the migration completes Returns: Number of vector cohort migrations in animation layer Args: vector_migrations (CSVReport): The ReportVectorMigrations.csv report. demographics (Demographics): The Demographics object describing the nodes. migration_duration_timesteps (int): The duration of the migration animations in timesteps. dot_color (string): A CSS #rrggbb color for the comet dot. dot_size_pixels (int): Size in pixels of comet dot. path_color (string): A CSS #rrggbb color for the comet tail. path_thickness_pixels (float): Thickness in pixels of comet tail. path_trail_time_factor (float): Length of trail as a multiple of the migration_duration. E.g. if this is 1.0, the trail length will be the full distance from source node to the destination node. If 0.5, the trail length will be half the distance between the nodes. """ count = 0 for row in vector_migrations: timestep = int(row["Time"]) from_node_id = row["FromNodeID"] to_node_id = row["ToNodeID"] if not (from_node_id in demographics and to_node_id in demographics): continue from_node = demographics[from_node_id] to_node = demographics[to_node_id] dur_seconds = migration_duration_timesteps * 60 * 60 * 24 trail_seconds = dur_seconds * path_trail_time_factor availability = self._timestep_to_iso(timestep) + "/" +\ self._timestep_to_iso(timestep + migration_duration_timesteps) id_txt = "%s-%s@%d_%d" % (from_node_id, to_node_id, timestep, count) # The thing to remember about paths is that their initial duration # is going to be availability, but then they are going to be around # for an *additional* trailTime seconds. So the color fade in the # path needs to run for dur_seconds + trailTime. mig_path = czml.CZMLPacket(id=id_txt, availability=availability, position={ "epoch": self._timestep_to_iso(timestep), "cartographicDegrees": [ 0, # in seconds from_node["NodeAttributes"]["Longitude"], from_node["NodeAttributes"]["Latitude"], 0, # altitude dur_seconds, to_node["NodeAttributes"]["Longitude"], to_node["NodeAttributes"]["Latitude"], 0 # altitude ] }) dc = Color.from_html_hash(dot_color) pc = Color.from_html_hash(path_color) pt = czml.Point(pixelSize=dot_size_pixels, show=True, color={ "epoch": self._timestep_to_iso(timestep), "rgba": [0, dc.r, dc.g, dc.b, 255, dur_seconds, dc.r, dc.g, dc.b, 0] }) path = czml.Path(leadTime=0, trailTime=trail_seconds, resolution=CZMLWriter.k_default_clock_multiplier, material={ "solidColor": { "color": { "epoch": self._timestep_to_iso(timestep), "rgba": [0, pc.r, pc.g, pc.b, 255, dur_seconds + trail_seconds, pc.r, pc.g, pc.b, 0] }} }, width=path_thickness_pixels) mig_path.point = pt mig_path.path = path self.doc.packets.append(mig_path) count += 1 if self._verbose: print("CZMLWriter.add_vector_migrations: %d migrations added." %\ count) return count
# --------------------------------------------------------------------------
[docs] def add_simplified_vector_migrations(self, vector_migrations, demographics, migration_duration_timesteps, arrow_color, arrow_thickness_pixels): """Adds vector cohort migration animations to a CZML output. This function, given vector migrations in a particular format and a demographics file, adds "comet" animations for migration events. This function expects the following fields in vector_migrations: * Time (int): the timestep of the beginning of the migration event * FromNodeID (int): the node ID from which the migration emanates * ToNodeID (int): the node ID to which the migration completes Returns: Number of vector cohort migrations in animation layer Args: vector_migrations (CSVReport): The ReportVectorMigrations.csv report. demographics (Demographics): The Demographics object describing the nodes. migration_duration_timesteps (int): The duration of the migration animations in timesteps. arrow_color (string): A CSS #rrggbb color for the migration arrow. arrow_thickness_pixels (float): Thickness in pixels of comet tail. """ czml.Material._properties = \ ('grid', 'image', 'stripe', 'solidColor', 'polylineGlow', 'polylineOutline', 'polylineArrow') # This is a little ugly - I run-time extend Material with a # PolylineArrow property, since the one in module czml lacks that. class PolylineArrow(czml._CZMLBaseObject): """Colors the line with a color and an arrow.""" _color = None _properties = ('color',) czml.Material._polylineArrow = None czml.Material.polylineArrow = czml.class_property( PolylineArrow, 'polylineArrow', doc="""Colors the line with a color and an arrow.""") count = 0 for row in vector_migrations: timestep = int(row["Time"]) from_node_id = row["FromNodeID"] to_node_id = row["ToNodeID"] if not (from_node_id in demographics and to_node_id in demographics): continue from_node = demographics[from_node_id] to_node = demographics[to_node_id] dur_seconds = migration_duration_timesteps * 60 * 60 * 24 availability = self._timestep_to_iso(timestep) + "/" +\ self._timestep_to_iso(timestep + migration_duration_timesteps) id_txt = "%s-%s@%d_%d" % (from_node_id, to_node_id, timestep, count) ac = Color.from_html_hash(arrow_color) packet = czml.CZMLPacket(id=id_txt, availability=availability) polyline = czml.Polyline( followSurface=False, positions={ "cartographicDegrees": [ from_node["NodeAttributes"]["Longitude"], from_node["NodeAttributes"]["Latitude"], 0, # altitude to_node["NodeAttributes"]["Longitude"], to_node["NodeAttributes"]["Latitude"], 0 # altitude ]}, material={ "polylineArrow": {"color": { "epoch": self._timestep_to_iso(timestep), "rgba": [0, ac.r, ac.g, ac.b, 255, dur_seconds + 60 * 60 * 12, ac.r, ac.g, ac.b, 0] }} }, width=arrow_thickness_pixels) packet.polyline = polyline self.doc.packets.append(packet) count += 1 if self._verbose: print("CZMLWriter.add_vector_migrations: %d migrations added." %\ count) return count
# -------------------------------------------------------------------------- # Returns number of events emitted. # --------------------------------------------------------------------------
[docs] def add_event_markers(self, event_recorder, demographics, event_name, marker, color, placement, duration_days, with_altitude=False): """Adds event marker animation to a CZML output. This function looks for a particular event in an event recorder report and emits an animation that puts a marker somewhere around the node and leaves it there for a specified period after the event. Returns: Number of event markers in animation layer Args: event_recorder (CSVReport): Event recorder report object demographics (Demographics): Demographics object with node data event_name (str): Name of event to mark in the animation marker (str): Name of a marker icon ("Triangle" or "Cross") color (str): Any HTML or SVG named color (e.g. "red") or a CSS color string (e.g. "#ff0000") placement (str): Placement about the node point ("Top", "Left", "Bottom" or "Right") duration_days (int): Number of days for which the symbol should remain about the node after it fires. with_altitude (bool): True to set the symbol elevations to match the node altitude, or default altitude otherwise. """ czml.Billboard._properties =\ ('show', 'image', 'color', 'scale', 'verticalOrigin', 'horizontalOrigin') count = 0 timestep_offset = int(event_recorder.rows[0]["Time"]) if color in NamedColors.__dict__: color = NamedColors.__dict__[color] else: color = Color.from_html_hash(color) for event in event_recorder: if not event["Event_Name"] == event_name: continue if event["Node_ID"] in demographics: timestep = int(event["Time"]) - timestep_offset node = demographics[event["Node_ID"]] availability = self._timestep_to_iso(timestep) + "/" +\ self._timestep_to_iso(timestep + duration_days) attrs = node["NodeAttributes"] lon = attrs["Longitude"] lat = attrs["Latitude"] alt = attrs["Altitude"] if with_altitude else 0 ni_packet = czml.CZMLPacket( id=repr(count), availability=availability, position={ "epoch": self._timestep_to_iso(timestep), "cartographicDegrees": [ lon, lat, alt ] }) bb = czml.Billboard(image="/vistools/image/" + marker + placement + ".png", color={ "rgba": color.to_rgba_array() }) bb.verticalOrigin = self._vertical_origin(placement) bb.horizontalOrigin = self._horizontal_origin(placement) ni_packet.billboard = bb self.doc.packets.append(ni_packet) count += 1 if self._verbose: print("CZMLWriter.add_event_markers: %d event markers added." %\ count) return count
# --------------------------------------------------------------------------
[docs] def add_weighted_network(self, demographics, network, gradient_spec, opacity_func): """Adds a weighted network visualization layer to a CZML output. This method emits a CZML animation that provides a visual representation of a weighted network between nodes. Returns: Number of network segments added Args: demographics (Demographics): Demographics object for nodes. network (array): array of objects:: { from: <from-node-id>, to: <to-node-id>, weight: <float-weight> } gradient_spec (str): gradient spec for a gradient with which to color the network lines. opacity_func (function): function(weight, norm_weight) that returns the desired opacity in range [0,1]. """ # First pass - collect min/max rate min_weight = network[0]["weight"] max_weight = network[0]["weight"] for segment in network: weight = segment["weight"] min_weight = weight if weight < min_weight else min_weight max_weight = weight if weight > max_weight else max_weight weight_range = max_weight - min_weight # Second pass - precalculate norm_weight and opacity for segment in network: weight = segment["weight"] norm_weight = (weight - min_weight) / weight_range segment["norm_weight"] = norm_weight segment["opacity"] = opacity_func(weight, norm_weight) # Sort network by opacity, lowest opacity first # network.sort(key=lambda seg: seg["opacity"]) def sort_func(a, b): diff = a["opacity"] - b["opacity"] if diff < 0: return -1 elif diff == 0: return 0 else: return 1 if sys.version_info.major == 3: # python 3: use a key function network = sorted(network, key=functools.cmp_to_key(sort_func)) else: # python 2: use a cmp function network = sorted(network, cmp=sort_func) gradient = Gradient(gradient_spec) count = 0 for segment in network: from_node = demographics[segment["from"]] to_node = demographics[segment["to"]] color = gradient.sample(segment["norm_weight"]).to_rgba_array() color[3] = int(segment["opacity"] * 255) # id = repr(segment["from"]) + "-" + repr(segment["to"]) id = count packet = czml.CZMLPacket(id=id) positions = {"cartographicDegrees": [ from_node["NodeAttributes"]["Longitude"], from_node["NodeAttributes"]["Latitude"], 0, to_node["NodeAttributes"]["Longitude"], to_node["NodeAttributes"]["Latitude"], 0 ]} line = czml.Polyline(show=True, positions=positions, width=1, material={ "solidColor": {"color": {"rgba": color}} }) packet.polyline = line self.doc.packets.append(packet) count += 1 if self._verbose: print("CZMLWriter.add_network: %d network segments added." % count) return count
# --------------------------------------------------------------------------
[docs] def write_czml(self, file_path): """Write the CZML animation file. Returns: None Args: file_path (str): The file path to which to write the CZML animation. Raises: I/O exceptions. """ try: self.doc.write(file_path) except BaseException: if self._verbose: print("CZMLWriter.write_czml: Exception writing to %s" % file_path, file=sys.stderr) raise
# -------------------------------------------------------------------------- # Implementation # -------------------------------------------------------------------------- def _timestep_to_iso(self, timestep): return (self.sim_start_date + timedelta(days=int(timestep))).isoformat() # -------------------------------------------------------------------------- @staticmethod def _vertical_origin(placement): if placement is "Top": return "BOTTOM" if placement is "Bottom": return "TOP" return "CENTER" # -------------------------------------------------------------------------- @staticmethod def _horizontal_origin(placement): if placement is "Left": return "RIGHT" if placement is "Right": return "LEFT" return "CENTER"