"""Early draft of a very handy utility that takes a CCDL file (Concise Campaign Definition Language)and creates a graph(viz) visualization of it."""importsysimportgraphviz# This makes emod-api dependent on graphviz. Might be nice if this was optional?fromemod_api.interventions.ccdlimport*debug=False
[docs]defget_nickname_from_event(event_num,pieces):""" Allow nodes to get briefer and potentially more helpful nicknames. Default will probably remain a the nasty autogen above. Users can override this function with a callback of their own. """# If user has manually added an optional 5th 'column', use that, but turns certain characters into newlines.iflen(pieces)==5:event_name=pieces[4].strip().replace(" ","\n").replace("->","%%%").replace("-","\n").replace("%%%","->")else:event_name=f"[{event_num}]{pieces[WHEN_IDX]}*{pieces[WHERE_IDX]}*{pieces[WHO_IDX]}"returnevent_name
[docs]defget_colour_from_event(tokens):""" Allow nodes to get a content-dependent colour. Default to just white. Users can override this function with a callback of their own. Have been using colour to capture IP categories. """return"white"
[docs]defget_shape_from_event(tokens):""" Allow nodes to get a content-dependent shape. Default to circle. Users can override this function with a callback of their own. Have been using shape to capture 'epoch' categories. Possible shapes include ellipse, circle, square, and diamond. Full list can be found at: https://www.graphviz.org/doc/info/shapes.html """return"circle"
[docs]defset_beautifiers(name_cb=None,colour_cb=None,shape_cb=None):""" Override default no-op callbacks for setting nicknames, colours, and shapes of campaign nodes """globalget_nickname_from_event,get_colour_from_event,get_shape_from_eventifname_cb:get_nickname_from_event=name_cbifcolour_cb:get_colour_from_event=colour_cbifshape_cb:get_shape_from_event=shape_cbreturn
[docs]defviz(in_name="campaign.ccdl",out_name="camp.sv",display=True,whitelist=None):ccdl=open(in_name).readlines()dot=graphviz.Digraph(out_name,comment='Patient Pathway Design')producers={}consumers={}# Don't see how to avoid a specific list here. Not complete, needs more diagnostics.broadcasters=["BroadcastEvent","SimpleHealthSeekingBehavior","DiagnosticTreatNeg"]event_names={}event_num=0forcamp_eventinccdl:ifdebug:print(f"Processing line {camp_event}.")# We're going to do a brute force purge of DelayedInterventions for now.pieces=camp_event.strip().split(main_sep)iflen(pieces)<2:event_num+=1continueif"Outbreak"inpieces[-1]:# It is useful to 'hack' a few lines so that major infection lifecycle events get tied into# our visualization.camp_event=camp_event.replace(pieces[-1],pieces[-1]+"+BroadcastEvent(TBActivation)")pieces=camp_event.strip().split(main_sep)# Color represents IP# Shape represents Epochnode_name=get_nickname_from_event(event_num,pieces)event_names[event_num]=node_namenode_color=get_colour_from_event(pieces)node_shape=get_shape_from_event(pieces)ifdebug:print(f"Creating node with name {event_names[event_num]}.")dot.node(name=event_names[event_num],style='filled',fillcolor=node_color,shape=node_shape)triggered=pieces[WHAT_IDX].split(post_trigger_sep)iflen(triggered)>1:triggers=triggered[0].strip()fortriggerintriggers.split(multi_trigger_sep):iftriggernotinconsumers:consumers[trigger]=set()consumers[trigger].add(event_num)triggeree=triggered[-1].split(post_delay_sep)[-1].strip()forbroadcasterinbroadcasters:sender=(triggeree.split(multi_iv_sep)[-1]).strip()ifbroadcasterinsender:fromparseimportparseregex=broadcaster+"({})"#print( f"Looking for {regex} in {sender}" )tokens=parse(regex,sender)signals=tokens[0]forsignalinsignals.split(multi_signal_sep):#print( f"Found an event broadcasting {signal}" )ifsignalnotinproducers:producers[signal]=set()producers[signal].add(event_num)event_num+=1ifdebug:print("PRODUCERS\n")print(producers)print("\nCONSUMERS\n")print(consumers)# This is a list so we can do more than 1 eventually but for now lets just do 1.event_whitelist=[]ifwhitelist:event_whitelist.append(whitelist)foreventinproducers:ifevent_whitelistandeventnotinevent_whitelist:continuesenders=producers[event]ifeventinconsumers:receivers=consumers[event]forfrom_eventinsenders:forto_eventinreceivers:# Reject edges where IPs don't "overlap"ip_from=ccdl[from_event].split(main_sep)[2].split("/")[1]if"/"inccdl[from_event].split(main_sep)[2]else"*"ip_to=ccdl[to_event].split(main_sep)[2].split("/")[1]if"/"inccdl[to_event].split(main_sep)[2]else"*"#if ip_to != "*" and ip_from != "*" and ip_from.split(":")[0] == ip_to.split(":")[0] and ip_from.split(":")[1] != ip_to.split(":")[1]:ifip_to!="*"andip_from!="*":ip_from_key=ip_from.split("=")[0]ip_from_value=ip_from.split("=")[1]ip_to_key=ip_to.split("=")[0]ip_to_value=ip_to.split("=")[1]ifip_from_key==ip_to_keyandip_from_value!=ip_to_value:continue# aka do not continue# Reject edges where Times don't "overlap"time_from=ccdl[from_event].split(main_sep)[0]if"("intime_from:time_from=time_from.split("(")[0]time_to=ccdl[to_event].split(main_sep)[0]time_from_start=time_fromtime_to_start=time_totime_from_end=1e9time_to_end=1e9if"-"intime_from:time_from_start=float(time_from.split("-")[0])time_from_end=float(time_from.split("-")[1])if"-"intime_to:time_to_start=float(time_to.split("-")[0])time_to_end=float(time_to.split("-")[1])time_from_start=float(time_from_start)time_to_start=float(time_to_start)iftime_from_end<=time_to_startortime_from_start>=time_to_end:ifdebug:print(f"Discarding edge from {from_event} to {to_event} because not overlapping in time.")print(time_from_start,time_from_end,time_to_start,time_to_end)continueifdebug:#print( f"Creating edge from {from_event} to {to_event} for {event}." )#print( f"{from_event} ---{event} ---> {to_event}" )print(f"{event_names[from_event]} ---{event} ---> {event_names[to_event]}")#dot.edge( from_event, to_event, name=event )dot.edge(str(event_names[from_event]),str(event_names[to_event]),label=event)# Honestly not sure what the most general-purpose display solution is for graphviz# This seems to generate a pdf and open the registered pdf viewer if there is one.dot.render(view=True)
if__name__=="__main__":importargparseparser=argparse.ArgumentParser()parser.add_argument('-c','--ccdl',help='Path to existing campaign in CCDL format.',default="campaign.ccdl")parser.add_argument('-w','--whitelist',help='Optional trigger to limit visualization to.',default=None)args=parser.parse_args()viz(args.ccdl,args.whitelist)