Source code for emod_api.campaign
#!/usr/bin/env python
"""
You use this simple campaign builder by importing it, adding valid events via "add", and writing it out with "save".
"""
import json
# Path to the schema file; assigned by set_schema() and read by get_schema().
schema_path = None

# The campaign under construction: add() appends to "Events", save() dumps the whole dict.
campaign_dict = {"Events": [], "Use_Defaults": 1}

# Signals the campaign listens to / broadcasts, collected by add() and the
# get_recv_trigger()/get_send_trigger() helpers and reported by save().
pubsub_signals_subbing = []
pubsub_signals_pubbing = []

# Emptied by reset(); not otherwise used in the visible part of this module.
adhocs = []

# Maps each ad-hoc (non built-in) event name to the name handed back by get_event().
event_map = {}

# Default for the 'old' argument of get_recv_trigger()/get_send_trigger().
use_old_adhoc_handling = False

# When True, save() does not raise on listened-to events found in the ad-hoc map.
unsafe = False

# Emptied by reset(); not otherwise used in the visible part of this module.
implicits = []
def reset():
    """
    Clear all accumulated campaign-building state so a new campaign can be started.

    Empties the event list, the listened-to/broadcast signal lists, 'adhocs' and
    'implicits'; replaces the ad-hoc event map with a fresh dict; drops the cached
    trigger list; and clears the schema cache held by emod_api.schema_to_class.

    Returns:
        N/A.
    """
    global event_map
    global trigger_list

    # These containers are emptied in place, so anything already holding a
    # reference to them sees the reset too.
    del campaign_dict["Events"][:]
    del pubsub_signals_subbing[:]
    del pubsub_signals_pubbing[:]
    del adhocs[:]
    del implicits[:]

    # Rebound rather than cleared: a map previously handed out stays untouched.
    event_map = {}

    # The trigger list is derived from the schema and cached by get_event(); it must
    # be forgotten together with the schema cache below, otherwise events would keep
    # being classified against the previous schema's built-in list.
    trigger_list = None

    from emod_api import schema_to_class as s2c
    s2c.schema_cache = None
def set_schema(schema_path_in):
    """
    Start building a new campaign against the given schema.

    All campaign state is cleared via reset() first, then the schema path is recorded
    for later use by get_schema().

    Args:
        schema_path_in: The path to a schema.json.

    Returns:
        N/A.
    """
    global schema_path
    reset()
    schema_path = schema_path_in
def add(event, name=None, first=False):
    """
    Add a complete campaign event to the campaign builder.

    The new event is assumed to be a dict-like, valid event; it is not validated here.
    Any "Listening" / "Broadcasting" entries are moved out of the event into the
    module's signal lists before the event is appended to the campaign.

    Args:
        event: The event to add. If it provides a finalize() method, that is called
            first; plain dicts without one are accepted as-is.
        name: Optional value for "Event_Name"; only applied when the event does not
            already have that key.
        first: Deprecated. When True, the campaign's event list is emptied before this
            event is added. Use set_schema to start a new, empty campaign instead.

    Returns:
        N/A.
    """
    # Only finalize objects that support it, so the plain Python dicts the contract
    # promises to accept do not fail with AttributeError.
    finalize = getattr(event, "finalize", None)
    if callable(finalize):
        finalize()

    if first:
        print( "Use of first flag is deprecated. Use set_schema to start build a new, empty campaign." )
        campaign_dict["Events"] = []

    if name is not None and "Event_Name" not in event:
        event["Event_Name"] = name

    # These keys are bookkeeping for save()'s report and are stripped from the
    # event before it is stored in the campaign.
    if "Listening" in event:
        pubsub_signals_subbing.extend(event.pop("Listening"))
    if "Broadcasting" in event:
        pubsub_signals_pubbing.extend(event.pop("Broadcasting"))

    campaign_dict["Events"].append(event)
# Cached list of built-in trigger names; filled by get_trigger_list() / get_event().
trigger_list = None


def get_trigger_list():
    """
    Refresh and return the list of built-in triggers found in the schema.

    The list is read from the BroadcastCoordinatorEvent "Broadcast_Event" enum; if
    that entry cannot be found, the IncidenceCounter "Trigger_Condition_List"
    built-ins are used instead. When no schema is available the currently cached
    value (possibly None) is returned unchanged.

    Returns:
        The module-level 'trigger_list'.
    """
    global trigger_list
    # get_schema() parses the schema file on every call, so load it exactly once here.
    schema = get_schema()
    if schema:
        idm_types = schema["idmTypes"]
        # This needs to be fixed in the schema post-processor: maybe create a new idmTime:EventEnum and replace all the occurrences with a reference to that.
        try:
            trigger_list = idm_types["idmAbstractType:EventCoordinator"]["BroadcastCoordinatorEvent"]["Broadcast_Event"]["enum"]
        except (KeyError, TypeError):
            # Preferred location is missing or has a different shape: fall back.
            trigger_list = idm_types["idmType:IncidenceCounter"]["Trigger_Condition_List"]["Built-in"]
    return trigger_list
def save(filename="campaign.json"):
    """
    Save 'campaign_dict' as 'filename' and print a report of the events involved.

    The file is written first. Afterwards the published, listened-to, ignored
    (published but not listened to) and non-campaign (listened to but not published)
    events are printed, each group in sorted order so the report is reproducible.

    Args:
        filename: Path of the campaign file to write.

    Returns:
        The filename that was written.

    Raises:
        RuntimeError: If a listened-to, non-campaign event is present in the ad-hoc
            event map and the module-level 'unsafe' flag is not set. The campaign
            file has already been written at that point.
    """
    with open(filename, "w") as camp_file:
        json.dump(campaign_dict, camp_file, sort_keys=True, indent=4)

    # For now we just print to screen the events discovered for human inspection.
    # TBD: 1) Check for any published-but-not-listened events.
    # TBD: 2) Check for any listened-but-not-published events -- but many events come from model, not campaign.
    # TBD: 3) Discover ad-hoc events (those not in schema) and map to GP_EVENTS.
    published = set(pubsub_signals_pubbing)
    subscribed = set(pubsub_signals_subbing)
    ignored_events = published - subscribed
    non_camp_events = subscribed - published

    def in_order(events):
        # Set iteration order is arbitrary; sort so output and the first reported
        # error are deterministic. key=str keeps mixed-type entries sortable.
        return sorted(events, key=str)

    if published:
        print( "Campaign is publishing the following events:" )
        for event in in_order(published):
            print(event)

    if subscribed:
        print( "Campaign is listening to the following events:" )
        for event in in_order(subscribed):
            print(event)

    if ignored_events:
        print( "Campaign is IGNORING the following events:" )
        for event in in_order(ignored_events):
            print(event)

    if non_camp_events:
        print( "WARNING: Campaign or Report is configured to LISTEN to the following non-campaign events:" )
        adhoc_events = get_adhocs()
        for event in in_order(non_camp_events):
            print(event)
            if event in adhoc_events and not unsafe:
                print( f"\nERROR: Report is configured to LISTEN to the following non-existent 'trigger': {event}" )
                raise RuntimeError( "Please fix above error." )

    return filename
def get_adhocs():
    """
    Return the module-level map of ad-hoc event names to their mapped names.

    The dict itself is returned, not a copy.
    """
    return event_map
def get_schema():
    """
    Load and return the schema from 'schema_path'.

    The file is opened and parsed on every call; nothing is cached here.

    Returns:
        The parsed JSON content of the schema file, or None when no schema path
        has been set.
    """
    if not schema_path:
        return None
    with open(schema_path) as schema_stream:
        return json.load(schema_stream)
def get_recv_trigger(trigger, old=None):
    """
    Get the correct representation of a trigger (also called signal or even event) that is being listened to.

    The trigger is also recorded in the list of signals the campaign listens to.

    Args:
        trigger: Name of the trigger being listened to.
        old: Passed through to get_event(). When left as None, the current value of
            the module-level 'use_old_adhoc_handling' flag is used.

    Returns:
        The event name returned by get_event().
    """
    # Resolve the flag at call time: a default written as 'old=use_old_adhoc_handling'
    # is evaluated once at definition, so later changes to the flag would be ignored.
    if old is None:
        old = use_old_adhoc_handling
    pubsub_signals_subbing.append(trigger)
    return get_event(trigger, old)
def get_send_trigger(trigger, old=None):
    """
    Get the correct representation of a trigger (also called signal or even event) that is being broadcast.

    The trigger is also recorded in the list of signals the campaign broadcasts.

    Args:
        trigger: Name of the trigger being broadcast.
        old: Passed through to get_event(). When left as None, the current value of
            the module-level 'use_old_adhoc_handling' flag is used.

    Returns:
        The event name returned by get_event().
    """
    # Resolve the flag at call time: a default written as 'old=use_old_adhoc_handling'
    # is evaluated once at definition, so later changes to the flag would be ignored.
    if old is None:
        old = use_old_adhoc_handling
    pubsub_signals_pubbing.append(trigger)
    return get_event(trigger, old)
def get_event(event, old=False):
    """
    Map an event name to the name that should be used in the campaign.

    If the event is in the built-in trigger list it is returned unchanged. Otherwise
    it is looked up in the ad-hoc event map; an unseen event is added to the map
    first, mapped either to itself (old=True) or to the next 'GP_EVENT_xxx' name,
    numbered by the current size of the map.

    Args:
        event: Name of the event; must be a non-empty string.
        old: When True, a new ad-hoc event keeps its own name instead of being
            mapped to a 'GP_EVENT_xxx' name.

    Returns:
        The built-in event name, or the mapped name of the ad-hoc event.

    Raises:
        ValueError: If 'event' is None or an empty string.
        RuntimeError: If no trigger list can be obtained (no schema available).
    """
    global trigger_list

    if event is None or event == "":
        raise ValueError( "campaign.get_event() called with an empty event. Please specify a string." )

    if trigger_list is None:
        trigger_list = get_trigger_list()
    if trigger_list is None:
        # Without the built-in list we cannot tell built-in from ad-hoc events; fail
        # with an actionable message instead of "NoneType is not iterable".
        raise RuntimeError( "campaign.get_event() could not obtain the list of built-in events. Call set_schema() with a valid schema file first." )

    if event in trigger_list:
        return event

    if event not in event_map:
        # get next entry in GP_EVENT_xxx
        event_map[event] = event if old else f"GP_EVENT_{len(event_map):03d}"
    return event_map[event]