Source code for emod_api.campaign

#!/usr/bin/env python
"""
You use this simple campaign builder by importing it, adding valid events via "add", and writing it out with "save".
"""

import json

schema_path = None
campaign_dict = {"Events": [], "Use_Defaults": 1}
pubsub_signals_subbing = []
pubsub_signals_pubbing = []
adhocs = []
custom_coordinator_events = []
custom_node_events = []
event_map = {}
use_old_adhoc_handling = False
unsafe = False
implicits = list()
trigger_list = None


[docs]def reset(): del (campaign_dict["Events"][:]) global pubsub_signals_subbing global pubsub_signals_pubbing global adhocs global event_map del (pubsub_signals_subbing[:]) del (pubsub_signals_pubbing[:]) del (adhocs[:]) del (custom_coordinator_events[:]) del (custom_node_events[:]) event_map = {} from emod_api import schema_to_class as s2c s2c.schema_cache = None del (implicits[:])
[docs]def set_schema(schema_path_in): """ Set the (path to) the schema file. And reset all campaign variables. This is essentially a "start_building_campaign" function. Args: schema_path_in. The path to a schema.json. Returns: Nothing """ reset() global schema_path schema_path = schema_path_in
[docs]def get_schema(): schema = None if schema_path: with open(schema_path) as schema_file: schema = json.load(schema_file) return schema
[docs]def add(event, name=None, first=False): """ Add a complete campaign event to the campaign builder. The new event is assumed to be a Python dict, and a valid event. The new event is not validated here. Set the first flag to True if this is the first event in a campaign because it functions as an accumulator and in some situations like sweeps it might have been used recently. """ event.finalize() if first: print("Use of 'first' flag is deprecated. Use set_schema to start build a new, empty campaign.") global campaign_dict campaign_dict["Events"] = [] if "Event_Name" not in event and name is not None: event["Event_Name"] = name if "Listening" in event: pubsub_signals_subbing.extend(event["Listening"]) event.pop("Listening") if "Broadcasting" in event: pubsub_signals_pubbing.extend(event["Broadcasting"]) event.pop("Broadcasting") campaign_dict["Events"].append(event)
[docs]def get_trigger_list(): global trigger_list if get_schema(): # This needs to be fixed in the schema post-processor: maybe create a new idmTime:EventEnum and replace # all the occurrences with a reference to that. try: trigger_list = get_schema()["idmTypes"]["idmAbstractType:EventCoordinator"]["BroadcastCoordinatorEvent"][ "Broadcast_Event"]["enum"] except Exception as ex: trigger_list = get_schema()["idmTypes"]["idmType:IncidenceCounter"]["Trigger_Condition_List"]["Built-in"] return trigger_list
[docs]def save(filename="campaign.json"): """ Save 'campaign_dict' as file named 'filename'. """ with open(filename, "w") as camp_file: json.dump(campaign_dict, camp_file, sort_keys=True, indent=4) import copy ignored_events = copy.deepcopy(set(pubsub_signals_pubbing)) non_camp_events = set() if len(pubsub_signals_subbing) > 0: for event in set(pubsub_signals_subbing): if event in ignored_events: ignored_events.remove(event) if len(non_camp_events) > 0: for event in set(non_camp_events): if event in get_adhocs() and not unsafe: raise RuntimeError(f"ERROR: Report is configured to LISTEN to the following non-existent event: \n" f"{event} \nPlease fix the error.\n") return filename
[docs]def get_adhocs(): return event_map
[docs]def get_custom_coordinator_events(): return list(set(custom_coordinator_events))
[docs]def get_custom_node_events(): return list(set(custom_node_events))
[docs]def get_recv_trigger(trigger, old=use_old_adhoc_handling): """ Get the correct representation of a trigger (also called signal or even event) that is being listened to. """ pubsub_signals_subbing.append(trigger) return get_event(trigger, old)
[docs]def get_send_trigger(trigger, old=use_old_adhoc_handling): """ Get the correct representation of a trigger (also called signal or even event) that is being broadcast. """ pubsub_signals_pubbing.append(trigger) return get_event(trigger, old)
[docs]def get_event(event, old=False): """ Basic placeholder functionality for now. This will map new ad-hoc events to GP_EVENTs and manage that 'cache' If event in built-ins, return event, else if in adhoc map, return mapped event, else add to adhoc_map and return mapped event. """ if event is None or event == "": raise ValueError("campaign.get_event() called with an empty event. Please specify a string.") return_event = None global trigger_list if trigger_list is None: trigger_list = get_trigger_list() if event in trigger_list: return_event = event elif event in event_map: return_event = event_map[event] else: # get next entry in GP_EVENT_xxx new_event_name = event if old else 'GP_EVENT_{:03d}'.format(len(event_map)) event_map[event] = new_event_name return_event = event_map[event] return return_event