Source code for emod_api.interventions.ccdl_viz
"""
Early draft of a very handy utility that takes a CCDL file (Concise Campaign Definition Language)
and creates a graph(viz) visualization of it.
"""
import sys
import graphviz # This makes emod-api dependent on graphviz. Might be nice if this was optional?
from emod_api.interventions.ccdl import *
# Module-level switch: when set to True, viz() prints diagnostics to stdout
# (each line processed, nodes created, producer/consumer maps, edges kept or discarded).
debug = False
def get_nickname_from_event(event_num, pieces):
    """
    Build the display name used for a campaign-event node.

    This is the default naming hook; users can swap in their own callback
    (same signature) to get briefer or more helpful nicknames.

    Args:
        event_num: Index of the event, used in the auto-generated name.
        pieces: The fields of one CCDL line, already split on the main separator.

    Returns:
        The optional 5th field (when exactly 5 fields are present) with spaces
        and single dashes turned into newlines while "->" arrows are kept
        intact; otherwise an auto-generated "[num]when*where*who" string.
    """
    if len(pieces) != 5:
        return f"[{event_num}]{pieces[WHEN_IDX]}*{pieces[WHERE_IDX]}*{pieces[WHO_IDX]}"

    # Arrows are parked behind a placeholder so that their dash is not
    # converted into a newline, then restored in the final step.
    substitutions = (
        (" ", "\n"),
        ("->", "%%%"),
        ("-", "\n"),
        ("%%%", "->"),
    )
    label = pieces[4].strip()
    for old, new in substitutions:
        label = label.replace(old, new)
    return label
def get_colour_from_event(tokens):
    """
    Pick the fill colour for a campaign-event node.

    Default hook: ignores ``tokens`` and always answers "white". Users can
    replace it with their own callback to make the colour content-dependent
    (colour has been used to capture IP categories).

    Args:
        tokens: The fields of one CCDL line (unused by this default).

    Returns:
        The colour name "white".
    """
    default_fill = "white"
    return default_fill
def get_shape_from_event(tokens):
    """
    Pick the graphviz shape for a campaign-event node.

    Default hook: ignores ``tokens`` and always answers "circle". Users can
    replace it with their own callback to make the shape content-dependent
    (shape has been used to capture 'epoch' categories). Possible shapes
    include ellipse, circle, square, and diamond; the full list is at
    https://www.graphviz.org/doc/info/shapes.html

    Args:
        tokens: The fields of one CCDL line (unused by this default).

    Returns:
        The shape name "circle".
    """
    default_shape = "circle"
    return default_shape
def set_beautifiers(name_cb=None, colour_cb=None, shape_cb=None):
    """
    Install user callbacks in place of the default node-decoration hooks.

    Each truthy argument rebinds the corresponding module-level function;
    arguments left as None (or otherwise falsy) leave the current hook alone.

    Args:
        name_cb: Replacement for get_nickname_from_event.
        colour_cb: Replacement for get_colour_from_event.
        shape_cb: Replacement for get_shape_from_event.
    """
    replacements = {
        "get_nickname_from_event": name_cb,
        "get_colour_from_event": colour_cb,
        "get_shape_from_event": shape_cb,
    }
    # Writing into globals() rebinds the module-level names, which is what
    # the rest of the module looks up when it calls the hooks.
    module_namespace = globals()
    for hook_name, callback in replacements.items():
        if callback:
            module_namespace[hook_name] = callback
def _ip_constraint(ccdl_line):
    """
    Return the (key, value) pair found in the second "/"-separated part of a
    CCDL line's third field, or None when there is no such "key=value" part.
    """
    # NOTE(review): index 2 is presumably the 'who' column (WHO_IDX) -- confirm.
    who = ccdl_line.split(main_sep)[2]
    if "/" not in who:
        return None
    parts = who.split("/")[1].split("=")
    if len(parts) < 2:
        # Not a key=value restriction (e.g. "*"): treat as unconstrained
        # instead of failing on a missing value.
        return None
    return parts[0], parts[1]


def _time_window(ccdl_line):
    """
    Return (start, end) as floats from the first field of a CCDL line.

    A "(...)" suffix is dropped, "a-b" gives an explicit window, and a single
    number gives an open-ended window whose end is 1e9.
    """
    when = ccdl_line.split(main_sep)[0].split("(")[0]
    if "-" in when:
        bounds = when.split("-")
        return float(bounds[0]), float(bounds[1])
    return float(when), 1e9


def viz(in_name="campaign.ccdl", out_name="camp.sv", display=True, whitelist=None):
    """
    Render a CCDL campaign file as a graphviz directed graph.

    Every campaign line with at least two fields becomes a node (named,
    coloured and shaped by the get_*_from_event hooks). An edge is drawn from
    an event that broadcasts a signal to every event triggered by that signal,
    unless the two events carry the same IP key with different values or
    their time windows do not overlap.

    Args:
        in_name: Path of the CCDL file to read.
        out_name: Name handed to graphviz.Digraph for the generated graph.
        display: When truthy, ask graphviz to open the rendered output in a
            viewer; when falsy, only render it.
        whitelist: Optional single signal name; when given, only edges for
            that signal are drawn.

    Raises:
        ValueError: If a time field of a connected event is not numeric.
    """
    # Context manager so the file handle is closed as soon as it is read.
    with open(in_name) as ccdl_file:
        ccdl = ccdl_file.readlines()

    dot = graphviz.Digraph(out_name, comment='Patient Pathway Design')
    producers = {}  # signal -> set of event numbers that broadcast it
    consumers = {}  # trigger -> set of event numbers that listen for it
    # Don't see how to avoid a specific list here. Not complete, needs more diagnostics.
    broadcasters = ["BroadcastEvent", "SimpleHealthSeekingBehavior", "DiagnosticTreatNeg"]
    event_names = {}

    # event_num is the line index, skipped lines included, so it can be used
    # to look the original line up in ccdl again when building edges.
    for event_num, camp_event in enumerate(ccdl):
        if debug:
            print( f"Processing line {camp_event}." )
        # We're going to do a brute force purge of DelayedInterventions for now.
        pieces = camp_event.strip().split(main_sep)
        if len(pieces) < 2:
            continue
        if "Outbreak" in pieces[-1]:
            # It is useful to 'hack' a few lines so that major infection lifecycle
            # events get tied into our visualization.
            camp_event = camp_event.replace(pieces[-1], pieces[-1] + "+BroadcastEvent(TBActivation)")
            pieces = camp_event.strip().split(main_sep)

        # Color represents IP
        # Shape represents Epoch
        node_name = get_nickname_from_event(event_num, pieces)
        event_names[event_num] = node_name
        node_color = get_colour_from_event(pieces)
        node_shape = get_shape_from_event(pieces)
        if debug:
            print( f"Creating node with name {event_names[event_num]}." )
        dot.node(name=node_name, style='filled', fillcolor=node_color, shape=node_shape)

        triggered = pieces[WHAT_IDX].split(post_trigger_sep)
        if len(triggered) > 1:
            triggers = triggered[0].strip()
            for trigger in triggers.split(multi_trigger_sep):
                consumers.setdefault(trigger, set()).add(event_num)

        triggeree = triggered[-1].split(post_delay_sep)[-1].strip()
        # Only the last intervention of the chain is inspected; it does not
        # depend on the broadcaster, so work it out once.
        sender = triggeree.split(multi_iv_sep)[-1].strip()
        for broadcaster in broadcasters:
            if broadcaster not in sender:
                continue
            # Imported only when needed, so campaigns without broadcasts do
            # not require the third-party 'parse' package.
            from parse import parse
            tokens = parse(broadcaster + "({})", sender)
            if tokens is None:
                # The name is a substring of the sender but the sender is not
                # of the form Broadcaster(...); nothing to extract.
                continue
            for signal in tokens[0].split(multi_signal_sep):
                producers.setdefault(signal, set()).add(event_num)

    if debug:
        print( "PRODUCERS\n" )
        print( producers )
        print( "\nCONSUMERS\n" )
        print( consumers )

    # This is a list so we can do more than 1 eventually but for now lets just do 1.
    event_whitelist = [whitelist] if whitelist else []

    for event, senders in producers.items():
        if event_whitelist and event not in event_whitelist:
            continue
        if event not in consumers:
            continue
        receivers = consumers[event]
        for from_event in senders:
            for to_event in receivers:
                # Reject edges where IPs don't "overlap"
                ip_from = _ip_constraint(ccdl[from_event])
                ip_to = _ip_constraint(ccdl[to_event])
                if ip_from is not None and ip_to is not None:
                    if ip_from[0] == ip_to[0] and ip_from[1] != ip_to[1]:
                        continue

                # Reject edges where Times don't "overlap"
                time_from_start, time_from_end = _time_window(ccdl[from_event])
                time_to_start, time_to_end = _time_window(ccdl[to_event])
                if time_from_end <= time_to_start or time_from_start >= time_to_end:
                    if debug:
                        print( f"Discarding edge from {from_event} to {to_event} because not overlapping in time." )
                        print( time_from_start, time_from_end, time_to_start, time_to_end )
                    continue

                if debug:
                    print( f"{event_names[from_event]} ---{event} ---> {event_names[to_event]}" )
                dot.edge(str(event_names[from_event]), str(event_names[to_event]), label=event)

    # Honestly not sure what the most general-purpose display solution is for graphviz.
    # This seems to generate a pdf and, when display is set, open the registered
    # pdf viewer if there is one.
    dot.render(view=bool(display))
if __name__ == "__main__":
    # Command-line entry point: visualize a CCDL file, optionally restricted
    # to the edges of a single trigger.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--ccdl', help='Path to existing campaign in CCDL format.', default="campaign.ccdl")
    parser.add_argument('-w', '--whitelist', help='Optional trigger to limit visualization to.', default=None)
    args = parser.parse_args()
    # Pass by keyword: the second positional parameter of viz() is out_name,
    # so a positional whitelist would rename the output instead of filtering.
    viz(in_name=args.ccdl, whitelist=args.whitelist)