#!/usr/bin/env python3
import json
from parse import parse
from emod_api.interventions.ccdl import *
# Named presets collected by encode() from "<name>map = <literal>" lines of a CCDL file.
presets = {}
# Event-name substitutions applied while decoding; decode() rebinds this to the
# config file's "Event_Map" entry when a config path is supplied and has one.
trigger_map = {}
def decorate_actual_iv(iv, signal=None):
    """
    Return the CCDL label for a single intervention config.

    Thin public entry point that forwards to ``decorate_actual_iv_impl``.

    Args:
        iv: Intervention config dictionary; must contain a ``"class"`` key.
        signal: Optional trigger name to prefix the label with.

    Returns:
        The decorated intervention name as a string.
    """
    return decorate_actual_iv_impl(iv, signal)
def _map_trigger(event):
    """Return the ``trigger_map`` replacement for *event*, or *event* unchanged."""
    return trigger_map[event] if event in trigger_map else event


def _delay_label(iv):
    """Build the ``DIST[/param[/param]]`` text for a delayed-intervention config."""
    key = "Delay_Period_Distribution" if "Delay_Period_Distribution" in iv else "Delay_Distribution"
    dist = iv[key].replace("_DURATION", "")
    if dist in ("FIXED", "EXPONENTIAL"):
        return f"{dist}/{iv['Delay_Period']}"
    if dist == "UNIFORM":
        # A missing minimum is rendered as 0.
        return f"{dist}/{iv.get('Delay_Period_Min', 0)}/{iv['Delay_Period_Max']}"
    if dist == "GAUSSIAN":
        return f"{dist}/{iv['Delay_Period_Mean']}/{iv['Delay_Period_Std_Dev']}"
    if dist == "WEIBULL":
        return f"{dist}/{iv['Delay_Period_Scale']}/{iv['Delay_Period_Shape']}"
    # Any other distribution name is shown without parameters.
    return dist


def decorate_actual_iv_impl(iv, signal=None):
    """
    Convert a json intervention config to its CCDL label.

    This relies on a lot of special-casing: the label starts as the
    intervention's class name (prefixed with ``signal->`` when a signal is
    given) and, for the classes handled below, gets a parenthesised payload
    built from the config's parameters. Unrecognised classes are returned as
    the bare (optionally prefixed) class name.

    Args:
        iv: Intervention config dictionary; must contain a ``"class"`` key and
            whichever parameter keys its class-specific branch reads.
        signal: Optional trigger name used as the label prefix.

    Returns:
        The decorated intervention name.

    Raises:
        KeyError: If ``"class"`` or a parameter required by the matching
            branch is missing (the diagnostic branch reports instead of
            raising).
    """
    orig_iv_name = iv["class"]
    iv_name = orig_iv_name
    if signal:
        # NOTE(review): "->" is hard-coded here while decode() splits on
        # post_trigger_sep; confirm the two are the same string.
        iv_name = f"{signal}->{iv_name}"

    if orig_iv_name == "PropertyValueChanger":
        key = iv["Target_Property_Key"]
        value = iv["Target_Property_Value"]
        iv_name += f"({key}:{value})"
    elif orig_iv_name == "MigrateIndividuals":
        dest = iv["NodeID_To_Migrate_To"]
        iv_name += f"({dest})"
    elif orig_iv_name.endswith("DelayedIntervention"):  # handle more than 1 kind of DelayedIntervention
        iv_name += f"({_delay_label(iv)})"
        if "Broadcast_Event" in iv:
            iv_name += f"=>BroadcastEvent({iv['Broadcast_Event']})"
    elif orig_iv_name == "BroadcastEvent":
        iv_name += f"({_map_trigger(iv['Broadcast_Event'])})"
    elif orig_iv_name == "SimpleHealthSeekingBehavior":
        goal = _map_trigger(iv["Actual_IndividualIntervention_Event"])
        tendency = iv["Tendency"]
        name = iv["Intervention_Name"]
        # The name is only shown when it differs from the literal "HSB".
        if name != "HSB":
            iv_name += f"({goal}/{tendency}/{name})"
        else:
            iv_name += f"({goal}/{tendency})"
    elif "Diagnostic" in orig_iv_name or "DrawBlood" in orig_iv_name:
        # Best effort: a config without the expected event keys is reported
        # and returned undecorated instead of aborting the caller.
        try:
            pos = _map_trigger(iv["Positive_Diagnosis_Event"])
            if "Negative_Diagnosis_Event" in iv:
                neg = _map_trigger(iv["Negative_Diagnosis_Event"])
            else:
                neg = "null"  # this is ugly; do better.
            iv_name += f"({pos}/{neg})"
        except (KeyError, TypeError) as ex:
            print(f"Exception {ex} assuming key in {iv_name}.")
    ### THESE WILL BE MOVED INTO A DISEASE-SPECIFIC EXTENSION OF THIS FILE THAT CALLS
    ### INTO THE BASE EMOD_API FILE
    ### HIV
    elif orig_iv_name == "HIVMuxer":
        signal = iv["Broadcast_Event"]
        # colorize / console / textColor are not defined in this module; they
        # can only arrive through the star import at the top of the file.
        # Looking the flag up defensively keeps this branch from raising
        # NameError when they are absent. NOTE(review): confirm where they
        # are meant to come from.
        if globals().get("colorize"):
            iv_name += f"({console.highlight(signal, textColor=textColor.RED)})"
        else:
            iv_name += f"({signal})"
    elif orig_iv_name == "HIVRandomChoice":
        choices = iv["Choices"]
        iv_name += f"({choices})"
    elif orig_iv_name == "PMTCT":
        eff = iv["Efficacy"]
        iv_name += f"({eff})"
    ### MALARIA
    elif orig_iv_name == "AntimalarialDrug":
        drug = iv["Drug_Type"]
        iv_name += f"({drug})"
    return iv_name
def handle_di(iv):
    """
    Return the ``=>...`` continuation describing what a delayed intervention delivers.

    Args:
        iv: Delayed-intervention config dictionary. One of three shapes is
            recognised, checked in this order:
            ``"Actual_IndividualIntervention_Configs"`` (list of configs),
            ``"Actual_IndividualIntervention_Config"`` (single config), or
            ``"Broadcast_Event"`` (event name).

    Returns:
        ``"=>"`` followed by the decorated child intervention(s), joined with
        ``multi_iv_sep``, or by ``Broadcast_Event(<name>)``. A config with
        none of the recognised keys yields just ``"=>"``.
    """
    prefix = "=>"
    if "Actual_IndividualIntervention_Configs" in iv:  # DelayedIntervention
        # join() places the separator only between labels; stripping it
        # afterwards would treat it as a character set on both ends.
        labels = [decorate_actual_iv(child) for child in iv["Actual_IndividualIntervention_Configs"]]
        return prefix + multi_iv_sep.join(labels)
    if "Actual_IndividualIntervention_Config" in iv:
        # Decorate the nested config; decorating ``iv`` itself would just
        # repeat the delayed intervention's own label.
        return prefix + decorate_actual_iv(iv["Actual_IndividualIntervention_Config"])
    if "Broadcast_Event" in iv:  # HIVDelayedIntervention
        return f"{prefix}Broadcast_Event({iv['Broadcast_Event']})"
    return prefix
def get_ip(coord):
    """
    Render the property restrictions of a config as ``key=value`` text.

    Args:
        coord: Config dictionary that may contain ``"Property_Restrictions"``
            (list of ``"key:value"`` strings) and/or
            ``"Property_Restrictions_Within_Node"`` (list of dictionaries).

    Returns:
        * Comma-separated ``key=value`` pairs when a restriction is present.
          ``Property_Restrictions`` wins when it is non-empty; otherwise the
          first dictionary of ``Property_Restrictions_Within_Node`` is used.
          Only that first dictionary is rendered; any further ones are
          ignored.
        * ``""`` when ``Property_Restrictions`` is present but empty and
          nothing else applies.
        * ``None`` when no restriction information is present.
    """
    restrictions = coord.get("Property_Restrictions")
    if isinstance(restrictions, str):
        # Tolerate a bare string instead of a one-element list.
        restrictions = [restrictions]
    if restrictions:
        return ",".join(str(entry) for entry in restrictions).replace(":", "=")

    # An empty Property_Restrictions list must not hide within-node restrictions.
    within_node = coord.get("Property_Restrictions_Within_Node")
    if within_node and within_node[0]:
        return ",".join(f"{ip_key}={ip_value}" for ip_key, ip_value in within_node[0].items())

    return "" if restrictions is not None else None
def get_ages(coord):
    """
    Extract the age bounds from a config, falling back to "no restriction".

    Args:
        coord: Config dictionary that may contain ``"Target_Age_Min"`` and/or
            ``"Target_Age_Max"``.

    Returns:
        ``(min_age, max_age)``. The defaults are ``0`` and ``120*365``; a
        bound replaces its default only when it is tighter (min > 0,
        max < 120*365). Numbers are returned as given; values of other types
        (e.g. numeric strings) are returned as ``float`` so callers can
        compare them. If a value cannot be converted, a message is printed
        and whatever was extracted up to that point is returned.
    """
    age_cap = 120 * 365
    min_age, max_age = 0, age_cap

    def _numeric(value):
        # Keep ints/floats untouched so their formatting does not change.
        return value if isinstance(value, (int, float)) else float(value)

    try:
        if "Target_Age_Min" in coord:
            candidate = _numeric(coord["Target_Age_Min"])
            if candidate > 0:
                min_age = candidate
        if "Target_Age_Max" in coord:
            candidate = _numeric(coord["Target_Age_Max"])
            if candidate < age_cap:
                max_age = candidate
    except (TypeError, ValueError) as ex:
        print("Exception extracting ages.")
        print(str(ex))
    return min_age, max_age
def decode(camp_path, config_path=None):
    """
    Print a one-line CCDL summary for every event of a campaign json file.

    The number of events is printed first, then one line per event in the
    form ``when :: where :: who :: what``. An event that cannot be decoded is
    reported (exception text plus the raw event) and skipped; it never stops
    the remaining events from being processed.

    Args:
        camp_path: Path to the campaign json file; it must have an
            ``"Events"`` list.
        config_path: Optional path to a config json file. When its
            ``"parameters"`` section has an ``"Event_Map"``, that mapping
            replaces the module-level ``trigger_map`` used to rename events.

    Returns:
        None. All output goes to stdout.

    Note:
        ``multi_iv_sep`` and ``post_trigger_sep`` are not defined in this
        file; presumably they come from the star import at the top of the
        module.
    """
    global trigger_map

    mid_name = "MultiInterventionDistributor"
    di_name = "DelayedIntervention"
    standard_coordinator = "StandardInterventionDistributionEventCoordinator"
    age_cap = 120 * 365

    def start_of(event):
        # Numeric start day; events without Start_Day are shown as day 1.
        return int(event['Start_Day']) if 'Start_Day' in event else 1

    def get_when(event):
        """Start day, with a ``(xREPS/_GAP)`` suffix for repeated events."""
        day = start_of(event)
        coord = event['Event_Coordinator_Config']
        if 'Number_Repetitions' in coord and coord['Number_Repetitions'] != 1:
            reps = coord['Number_Repetitions']
            gap = None
            if coord['Timesteps_Between_Repetitions'] != -1:
                gap = coord['Timesteps_Between_Repetitions']
            day = f"{day}(x{reps}/_{gap})"
        return day

    def get_where(event):
        """``"AllPlaces"`` for NodeSetAll, otherwise the configured node list."""
        nc = event['Nodeset_Config']
        if nc["class"] == "NodeSetAll":
            return "AllPlaces"
        return nc["Node_List"]

    def get_who(coord, frac, sex, ip, min_age, max_age):
        """Combine intervention-level and coordinator-level targeting."""
        # Coordinator values only fill in what the intervention left at its default.
        if "Demographic_Coverage" in coord and frac == 1:
            frac = coord["Demographic_Coverage"]
        if "Target_Gender" in coord and sex == "Both":
            sex = coord["Target_Gender"]
        coord_min, coord_max = get_ages(coord)
        # Same rule for ages: keep a bound the triggered intervention already
        # tightened instead of discarding it.
        if min_age is None or not min_age > 0:
            min_age = coord_min
        if max_age is None or not max_age < age_cap:
            max_age = coord_max
        tmp_ip = get_ip(coord)
        if tmp_ip:
            ip = tmp_ip

        # A string coverage (e.g. the "???" placeholder) cannot be a percentage.
        who = "STEERED" if isinstance(frac, str) else f"{float(frac)*100}%"
        if "ale" in sex:  # matches both "Male" and "Female"
            who += f"/{sex}"
        if min_age > 0:
            who += f"/>{min_age}"
        if max_age < age_cap:
            who += f"/<{max_age}"
        if ip and ip != "None":
            who += f"/{ip}"
        return who

    def get_what(event, day):
        """Intervention label plus the targeting found on the intervention itself."""
        coord = event['Event_Coordinator_Config']
        iv = coord['Intervention_Config']
        iv_name = decorate_actual_iv(iv)
        sex = "Both"
        ip = None
        min_age = None
        max_age = None
        frac = 1
        if coord["class"] != standard_coordinator:
            # Coverage of other coordinator types is not decoded.
            frac = "???"

        if iv_name == "NodeLevelHealthTriggeredIV":
            signal = multi_iv_sep.join(
                trigger_map[name] if name in trigger_map else name
                for name in iv["Trigger_Condition_List"]
            )
            if "Demographic_Coverage" in iv:
                frac = iv["Demographic_Coverage"]
            if "Target_Gender" in iv:
                sex = iv["Target_Gender"]
            ip = get_ip(iv)
            min_age, max_age = get_ages(iv)
            # A missing Duration is treated like -1, the "no end" value tested below.
            duration = iv.get("Duration", -1)
            if duration != -1:
                # ``day`` may already be a "start(xN/_G)" string, so the end
                # day is computed from the numeric start, not from ``day``.
                day = f"{day}-{start_of(event) + duration}"
            iv = iv["Actual_IndividualIntervention_Config"]
            iv_name = decorate_actual_iv(iv, signal)

        last_part = iv_name.split(post_trigger_sep)[-1]
        if iv_name.startswith(mid_name) or last_part.startswith(mid_name):
            # Drop the distributor's own name (as a suffix, not as a character
            # set) and list the interventions it distributes instead.
            if iv_name.endswith(mid_name):
                iv_name = iv_name[:-len(mid_name)]
            children = iv["Intervention_List"]
            iv_name += multi_iv_sep.join(decorate_actual_iv(child) for child in children)
            if children and iv_name.split(multi_iv_sep)[-1].startswith(di_name):
                iv_name += handle_di(children[-1])
        elif last_part.startswith(di_name):  # we should have a heuristic, or a list
            iv_name += handle_di(iv)
        return iv_name, frac, sex, ip, min_age, max_age, day

    if config_path is not None:
        with open(config_path) as conf:
            params = json.load(conf)["parameters"]
        if "Event_Map" in params:
            trigger_map = params["Event_Map"]

    with open(camp_path) as camp:
        cj = json.load(camp)

    print(f"{ len( cj['Events'] ) }")
    for event in cj['Events']:
        try:
            day = get_when(event)      # when
            nodes = get_where(event)   # where
            # what (may also refine ``day`` with a duration)
            iv_name, frac, sex, ip, min_age, max_age, day = get_what(event, day)
            # who
            who = get_who(event['Event_Coordinator_Config'], frac, sex, ip, min_age, max_age)
            print(f"{day} :: {nodes} :: {who} :: {iv_name}")
        except Exception as ex:
            # Deliberate best effort: report the event and keep going.
            print("Exception...")
            print(str(ex))
            print(str(event))
def params_to_dict(start_day, reps=None, gap=None, nodes=None, frac=None, sex=None, minage=None, maxage=None, ips=None, signal=None, iv_name=None, payload=None, delay=None):
    """
    Take all the CCDL params (When? Where? Who? What? Why?) and create a dictionary from them.

    ``start_day``, ``nodes``, ``frac`` and ``iv_name`` are always stored.
    ``gap`` is stored together with ``reps`` and only when ``reps`` is truthy.
    Every other parameter is stored under its own name only when it is
    truthy, so ``None``, ``0`` and empty values are left out.

    Returns:
        The new dictionary, with keys in the order start_day, reps, gap,
        nodes, frac, sex, minage, maxage, ips, signal, delay, iv_name,
        payload (absent keys skipped).
    """
    new_dict = {"start_day": start_day}
    if reps:
        new_dict["reps"] = reps
        new_dict["gap"] = gap
    new_dict["nodes"] = nodes
    new_dict["frac"] = frac

    optional = (
        ("sex", sex),
        ("minage", minage),
        ("maxage", maxage),
        ("ips", ips),
        ("signal", signal),
        ("delay", delay),
    )
    for key, value in optional:
        if value:
            new_dict[key] = value

    new_dict["iv_name"] = iv_name
    if payload:
        new_dict["payload"] = payload
    return new_dict
def _ccdl_number(text):
    """Convert CCDL numeric text to int/float; the literal ``None`` becomes None."""
    text = text.strip()
    if text == "None":
        return None
    number = float(text)
    return int(number) if number.is_integer() else number


def encode(encoded_path):
    """
    The encode function takes a CCDL file as input and returns a list of campaign events as dictionaries that can be used to create a campaign json from it using emod-api/emodpy functions.
    This is early code, use at your own risk, or contribute to its improvement. :)

    Each event line is split on ``main_sep`` into when / where / who / what
    fields and becomes one dictionary (see ``params_to_dict``). Lines without
    the separator are skipped, except ``<name>map = <literal>`` lines, which
    are stored in the module-level ``presets``. Lines whose coverage is
    ``STEERED`` are skipped with a message.

    Args:
        encoded_path: Path to the CCDL text file.

    Returns:
        List of event dictionaries, in file order.

    Note:
        ``main_sep``, ``multi_iv_sep``, ``post_trigger_sep`` and the
        ``*_IDX`` constants are not defined in this file; presumably they
        come from the star import at the top of the module.
    """
    output_list = []
    # TBD: Check that encoded_path exists and is the right format.
    with open(encoded_path) as ccdl:
        encoded = ccdl.readlines()

    for line in encoded:
        data = line.split(main_sep)
        if len(data) == 1:
            # might be a map preset
            tokens = line.split("=", 1)
            if "=" in line and "map" in tokens[0].lower():
                # SECURITY: eval() runs whatever expression the CCDL file
                # contains. Only feed this function trusted files.
                # TODO(review): switch to ast.literal_eval if presets are
                # always plain literals.
                presets[tokens[0]] = eval(tokens[1].strip())
            continue

        # extract start_day, and optional repetition number and interval
        when = data[WHEN_IDX]
        if "(" in when:
            result = parse("{}(x{}/_{})", when)
            # Convert so both forms of "when" yield numbers, not strings.
            start_day = float(result[0])
            reps = _ccdl_number(result[1])
            gap = _ccdl_number(result[2])
        else:
            start_day = float(when)
            reps = None
            gap = None

        # extract node list, usually All
        where = data[WHERE_IDX]
        nodes = []
        if "All" not in where:
            nodelist = parse("[{}]", where)
            # Accept "1, 2", "1,2" and "1 2" alike.
            nodes.extend(int(token) for token in nodelist[0].replace(",", " ").split())

        # extract coverage, and optional min age, max age, sex, and IPs.
        whos = data[WHO_IDX].split("/")
        if whos[0] == "STEERED":
            print("DEBUG: Reference Tracking not supported yet.")
            continue
        frac = float(whos[0].strip("%")) / 100
        sex = None
        minage = None
        maxage = None
        ips = None
        if len(whos) > 1 and "ale" in whos[1]:  # "Male" / "Female"
            sex = whos[1]
        for who_elem in whos[1:]:
            if who_elem.startswith(">"):
                minage = float(who_elem.strip(">"))
            elif who_elem.startswith("<"):
                maxage = float(who_elem.strip("<"))
            elif "=" in who_elem:  # IP has to include a=b
                ips = who_elem

        # extract interventions; this is where we'll do most of our work.
        what = data[WHAT_IDX]
        signal = None
        if post_trigger_sep in what:
            splits = what.split(post_trigger_sep)
            signal = splits[0]  # We're going to be calling TriggeredCampaignEvent(...)
            what = splits[1]
        interventions = what.split("=>")
        print(f"DEBUG: interventions = {interventions}")

        delay = None
        payload = None
        # Each "=>" step overwrites iv_name/payload, so the dictionary built
        # below describes the last step; a DelayedIntervention step is kept
        # separately in ``delay``.
        for iv in interventions:
            # check for multiintervention first.
            if multi_iv_sep in iv:
                iv_name = []
                payload = []
                for part in iv.split(multi_iv_sep):
                    result = parse("{}({})", part.strip())
                    if result is None:
                        print(f"DEBUG: Failed to parse {part} using standard intervention format.")
                        iv_name.append(part)
                        payload.append("")
                    else:
                        iv_name.append(result[0])
                        payload.append(result[1])
            elif "(" in iv:  # payload
                result = parse("{}({})", iv.strip())
                if result is None:
                    print(f"Had trouble parsing {iv}")
                    # Fall back to the raw text so iv_name is never left
                    # unset or carried over from a previous line.
                    iv_name = iv.strip()
                    payload = None
                else:
                    iv_name = result[0]
                    payload = result[1]
                    if iv_name == "DelayedIntervention":
                        delay = payload  # handle delay as its own thing
            else:
                iv_name = iv.strip()
                # No payload of its own: do not inherit the previous step's.
                payload = None
            print(f"DEBUG: {iv_name}")

        new_dict = params_to_dict(start_day, reps, gap, nodes, frac, sex, minage, maxage, ips, signal, iv_name, payload, delay)
        output_list.append(new_dict)
    return output_list
if __name__ == "__main__":
    # Command-line entry point: decode a campaign json to CCDL text (default)
    # or, when -e is given, encode a CCDL file into event dictionaries.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--campaign', help='Existing campaign.json file path.')
    parser.add_argument(
        '-e', '--encode',
        help='CCDL file to encode (when -c is also given, the -c path is encoded instead).',
    )
    parser.add_argument('-f', '--config', default=None, help='Existing config file path.')
    args = parser.parse_args()

    if args.encode:
        # -e takes a value, so "-e file" on its own must work. Invocations
        # that pass the CCDL path through -c keep working as before.
        encode(args.campaign if args.campaign else args.encode)
    else:
        if not args.campaign:
            parser.error("a campaign file (-c/--campaign) is required to decode")
        decode(args.campaign, args.config)