Source code for emod_api.schema_to_class

import json
import os

from collections import OrderedDict

schema_cache = None


class ReadOnlyDict(OrderedDict):
    """
    Ordered dict whose entries can also be read and assigned as attributes.

    Reading a key that is absent raises KeyError (AttributeError when done via
    attribute access). Attribute assignment only works for keys that already
    exist and, when a "schema" entry is present, the new value is validated
    against that schema before it is stored.

    Three bookkeeping entries are kept inside the dict itself:
    "schema" (see set_schema), "explicits" (keys assigned through attribute
    assignment) and "implicits" (keys whose assignment caused a depends-on
    parameter to be set). finalize() strips all three.
    """

    def __missing__(self, key):
        """Raise KeyError listing the available keys; called for absent keys."""
        raise KeyError(f"'{key}' not found in this object. List of keys = {self.keys()}.")

    def __getattr__(self, item):
        """Return self[item]; absent keys surface as AttributeError."""
        try:
            return self[item]
        except KeyError as err:
            # AttributeError (not KeyError) is what deepcopy/hasattr probes expect.
            raise AttributeError(item) from err

    def __setattr__(self, key, value):
        """
        Validate value against the schema entry for key, then store it.

        Assigning None is a no-op. If the node has no "schema" entry a DEBUG
        line is printed and the value is stored without validation.

        Raises:
            KeyError: key is not already present in this object.
            ValueError: value fails the type/range/enum check for key, or key
                depends on a Simulation_Type other than the current one.
        """
        # TODO: find a way in the schema to initialize complex types {}, [], or null.
        if key not in self:
            self.__missing__(key)  # always raises KeyError
        if value is None:
            return

        if "schema" not in self:
            print(f"DEBUG: No schema in node for param {key}.")
        else:
            spec = self["schema"][key]
            if "type" in spec:
                param_type = spec["type"]
                if param_type in ["integer", "float"]:
                    if type(value) is str:
                        raise ValueError(f"{value} is string when needs to be {param_type} "
                                         f"for parameter {key}.")
                    elif value < spec["min"]:
                        raise ValueError(f"{value} is below minimum {spec['min']} for parameter {key}.")
                    elif value > spec["max"]:
                        raise ValueError(f"{value} is above maximum {spec['max']} "
                                         f"for parameter {key}.")
                elif param_type == "enum":
                    if value not in spec["enum"]:
                        raise ValueError(f"{value} is not list of possible values {spec['enum']} "
                                         f"for parameter {key}.")
                elif param_type == "bool":
                    if not any([value is False, value is True, value == 1, value == 0]):
                        raise ValueError(f"value needs to be a bool for parameter {key}.")
                    # Booleans are stored as 0/1.
                    if value is False:
                        value = 0
                    elif value is True:
                        value = 1
                elif "Vector" in param_type and "idmType" not in param_type:
                    if type(value) is not list:
                        raise ValueError(f"Value needs to be a list for parameter {key}.")

            # If this param depends on another param, set that param to the required value.
            # The nested __setattr__ call makes this recursive.
            if "depends-on" in spec:
                for k, v in dict(spec["depends-on"]).items():
                    if k == "Simulation_Type":
                        # Simulation_Type is never set implicitly; it is only checked.
                        if k not in self.keys():
                            # not supported yet for campaigns; need to provide campaign blobs access to config...
                            continue
                        elif self["Simulation_Type"] in [x.strip() for x in v.split(',')]:
                            pass
                        else:
                            raise ValueError(f"ERROR: Simulation_Type needs to be one of {v} for you to be able to "
                                             f"set {key} to {value} but it seems to be {self.Simulation_Type}.")
                    else:
                        if type(v) is str and len(v.split(',')) > 1:
                            v = v.split(',')[0]  # pretty arbitrary but least arbitrary of options it seems
                        # only set implicit value if default (i.e. user didn't set it)
                        if self["schema"][k]['default'] == self[k]:
                            self.__setattr__(k, v)
                            if "implicits" not in self:  # This should NOT be needed
                                self["implicits"] = []
                            self["implicits"].append(key)

            # Re-assigning the default to a param that still holds it is not recorded as explicit.
            if "default" in spec and spec["default"] == value and value == self[key]:
                return

        self[key] = value
        if "explicits" not in self:  # This should NOT be needed
            self["explicits"] = []
        self["explicits"].append(key)
        return

    def set_schema(self, schema):
        """
        Add schema node.
        """
        self["schema"] = schema

    def to_file(self, config_name="config.json"):
        """
        Write 'clean' config file out to disk as json.
        Param: config_name (defaults to 'config.json')

        The object is finalized (see finalize) before it is written, so this
        modifies the object in place.
        """
        config = self.finalize()
        with open(config_name, "w") as config_file:
            json.dump(config, config_file, indent=4, sort_keys=True)

    def finalize(self, show_warnings: bool = False):
        """
        Remove all params that are disabled by depends-on param being off and schema node.

        Nested ReadOnlyDict values (and lists whose first element is one) that
        still carry a "schema" entry are finalized first. The bookkeeping
        entries "implicits", "explicits" and "schema" are removed at the end.

        Args:
            show_warnings (bool): Print a warning for every key that is not in
                the schema. Forwarded to nested objects.

        Returns:
            This same object, modified in place.

        Raises:
            ValueError: a param that was set explicitly turns out to be disabled;
                both Actual_*Intervention_Config entries are present without
                exactly one of them being non-empty; or the "schema" entry
                cannot be removed.
        """
        nuke_list = []
        finalized_keys = []

        def purge_key(param):
            # Mark param for removal when any of its depends-on conditions is unmet,
            # resolving the params it depends on first.
            if param not in self.keys() or "depends-on" not in self["schema"][param]:
                return
            for dep_k, dep_v in dict(self["schema"][param]["depends-on"]).items():
                if dep_k not in nuke_list and dep_k not in finalized_keys:
                    purge_key(dep_k)  # careful
                if type(dep_v) is str:
                    # String conditions are a comma-separated list of accepted values.
                    vals = [x.strip() for x in str(dep_v).split(',')]
                    if dep_k in self and self[dep_k] not in vals:
                        nuke_list.append(param)
                else:
                    if self[dep_k] != dep_v or dep_k in nuke_list:
                        nuke_list.append(param)
            finalized_keys.append(param)

        for key, v in self.items():
            finalized_keys = []  # rebinding is visible to purge_key through the closure
            if type(v) is ReadOnlyDict and "schema" in v.keys():
                v.finalize(show_warnings=show_warnings)  # experimental recursive code
            elif type(v) is list and len(v) > 0 and type(v[0]) is ReadOnlyDict and "schema" in v[0].keys():
                for elem in v:
                    elem.finalize(show_warnings=show_warnings)  # experimental recursive code

            if key in ["schema", "explicits", "implicits"]:
                continue
            elif key not in self["schema"]:
                if show_warnings:
                    print(f"WARNING: During schema-based param purge, {key} not in schema.")
            elif "depends-on" in self["schema"][key]:
                purge_key(key)
            elif v == "UNINITIALIZED STRING":  # work around current schema string defaults
                self[key] = ""

        # Logging parameters are implicit and only need to be retained if other than default
        if "logLevel_default" in self.keys():
            ll_default = self["logLevel_default"]
            for key, val in self.items():
                if key.startswith("logLevel_") and val == ll_default and key != "logLevel_default":
                    nuke_list.append(key)

        if "Actual_IndividualIntervention_Config" in self.keys() and "Actual_NodeIntervention_Config" in self.keys():
            # Need to purge one of these; yes this could be done cleverer but this is easy to follow and maintain
            ind_len = len(self["Actual_IndividualIntervention_Config"])
            nod_len = len(self["Actual_NodeIntervention_Config"])
            if ind_len > 0 and nod_len == 0:
                self.pop("Actual_NodeIntervention_Config")
            elif nod_len > 0 and ind_len == 0:
                self.pop("Actual_IndividualIntervention_Config")
            else:
                raise ValueError("We have both Actual_IndividualIntervention_Config "
                                 "and Actual_NodeIntervention_Config set.")

        # Note that nuke_list is not a set and is typically full of duplicates;
        # the membership test below makes repeated entries harmless.
        for nuke_key in nuke_list:
            if nuke_key in self.keys():
                if "explicits" in self and nuke_key in self['explicits']:
                    raise ValueError(f"You set param {nuke_key} but it was disabled and is not being used.")
                self.pop(nuke_key)

        if "implicits" in self:
            self.pop("implicits")
        if "explicits" in self:
            self.pop("explicits")
        try:
            self.pop("schema")
        except Exception as ex:
            raise ValueError(f"ERROR: Something bad happened during finalize: {ex}.") from ex
        return self
def clear_schema_cache():
    """
    Clear cached version of the schema.

    Resets the module-level ``schema_cache`` to None. Always returns None.
    """
    global schema_cache
    schema_cache = None
def get_class_with_defaults(classname, schema_path=None, schema_json=None, show_warnings: bool = False):
    """
    Return the default parameter values for a datatype defined in schema.

    The branch used to look the class up is chosen from ``classname`` itself
    (campaign events, event coordinators, "idmType:" types, waning effects,
    additional restrictions, node sets, reports, and finally interventions),
    in that order.

    Args:
        classname (str): Name of target datatype
        schema_path (str): Filename of schema: DEPRECATED, use schema_json instead
        schema_json (dict): Contents of schema file
        show_warnings (bool): Show warning associated with NodeSet

    Returns:
        (dict): Default parameter values for requested datatype. A non-empty
        dict is returned wrapped in a ReadOnlyDict carrying its schema node;
        an empty dict or a list is returned as-is.

    Raises:
        ValueError: no schema is available, the class cannot be found where a
            lookup is mandatory, or a default cannot be built for a parameter.
        AssertionError: the schema has no "idmTypes" section, or a waning
            lookup does not match the schema's waning style.
    """
    def get_schema(schema_path=None, schema_json=None):
        # Resolve the schema: explicit dict first, then the module cache, then the file.
        # Whatever is resolved from the dict or the file is stored in the module cache.
        global schema_cache
        schema_ret = None

        # Prefer schema-as-dict if provided
        if schema_json:
            schema_cache = schema_json
            schema_ret = schema_json
        # Check cache
        elif schema_cache:
            schema_ret = schema_cache
        # Then check file path
        elif schema_path:
            if not os.path.exists(schema_path):
                raise ValueError(f"ERROR: No file found at {schema_path}. "
                                 f"A valid schema path needs to exist at the path specified.")
            with open(schema_path) as file:
                schema_val = json.load(file)
                schema_cache = schema_val
                schema_ret = schema_val
        else:
            raise ValueError("A valid schema path needs to be specified.")

        return schema_ret

    def get_default(schema_blob, key, schema):
        # Default for one parameter: its "default" entry, an empty list for
        # vectors of AdditionalRestrictions, a recursively built object for
        # other "idmType:" types, otherwise None.
        default = None
        try:
            if "default" in schema_blob[key]:
                default = schema_blob[key]["default"]
            elif "Vector2d idmType:AdditionalRestrictions" in schema_blob[key]["type"]:
                default = list()
            elif "idmType:" in schema_blob[key]["type"]:
                default = get_class_with_defaults(schema_blob[key]["type"], schema_json=schema)
        except Exception as ex:
            raise ValueError(f"ERROR for key '{key}': {ex}") from ex
        return default

    # Depending on the schema, a WaningEffect may be an abstract type or a
    # concrete type. If the text 'WaningEffect' is part of any of the keys in
    # idmType:WaningEffect, then the schema is using WaningEffect as an
    # abstract type, and uses_old_waning should return True.
    def uses_old_waning(schema_idm):
        waning_effects = schema_idm["idmType:WaningEffect"].keys()
        return any(["WaningEffect" in k for k in waning_effects])

    schema = get_schema(schema_path, schema_json)
    schema_blob = None
    ret_json = dict()

    # NOTE(review): assert is kept so callers still see AssertionError; it is skipped under -O.
    assert "idmTypes" in schema.keys()
    schema_idm = schema["idmTypes"]

    if "campaignevent" in classname.lower():
        if classname in schema_idm["idmAbstractType:CampaignEvent"].keys():
            schema_blob = schema_idm["idmAbstractType:CampaignEvent"][classname]
            ret_json["class"] = schema_blob["class"]
            for ce_key in schema_blob.keys():
                if ce_key == "class":
                    continue
                try:
                    if "default" in schema_blob[ce_key] and schema_blob[ce_key]["default"] != "null":
                        ret_json[ce_key] = schema_blob[ce_key]["default"]
                    elif ce_key == "Nodeset_Config":  # this doesn't look a real pattern
                        ret_json[ce_key] = get_class_with_defaults("NodeSetAll", schema_json=schema)
                    elif "type" in schema_blob[ce_key]:
                        ret_json[ce_key] = get_class_with_defaults(schema_blob[ce_key]["type"], schema_json=schema)
                    elif ce_key != "class":
                        ret_json[ce_key] = {}
                except Exception as ex:
                    raise ValueError(f"ERROR: {ex}") from ex
    elif "coordinator" in classname.lower() and classname.lower() != "broadcastcoordinatoreventfromnode":
        for ec_name in schema_idm["idmAbstractType:EventCoordinator"].keys():
            if ec_name == classname or classname.replace("EventCoordinator", "") in ec_name:
                schema_blob = schema_idm["idmAbstractType:EventCoordinator"][ec_name]
                ret_json["class"] = schema_blob["class"]
                for ec_key in schema_blob.keys():
                    if ec_key == "class" or ec_key == "Sim_Types":
                        continue
                    ret_json[ec_key] = get_default(schema_blob, ec_key, schema)
                break  # once we find it, stop
    elif ("idmType:AdditionalRestrictions" == classname):
        ret_json = dict()
    elif ("idmType:WaningEffect" == classname and uses_old_waning(schema_idm)):
        ret_json = dict()
    elif "idmType:" in classname:
        if classname in schema_idm.keys():
            schema_blob = schema_idm[classname]
            if type(schema_blob) is list:
                ret_json = list()
                schema_blob = schema_blob[0]

            # schema_blob might be dict or list
            if schema_blob is None:
                raise ValueError(f"Wow. That's super-bad. {classname} is in schema but schema is null. "
                                 f"Must be LarvalHabitat?")
            else:
                new_elem = dict()
                for type_key in schema_blob.keys():
                    if type_key.startswith("<"):
                        continue
                    try:
                        if "default" in schema_blob[type_key] and schema_blob[type_key]["default"] != "null":
                            new_elem[type_key] = schema_blob[type_key]["default"]
                        elif "min" in schema_blob[type_key] and schema_blob[type_key]["min"] != "null":
                            new_elem[type_key] = schema_blob[type_key]["min"]
                        elif "type" in schema_blob[type_key]:
                            new_elem[type_key] = get_class_with_defaults(schema_blob[type_key]["type"],
                                                                         schema_json=schema)
                        elif type_key != "class":
                            new_elem[type_key] = {}
                    except Exception as ex:
                        raise ValueError(f"ERROR: {ex}") from ex

                # A list-typed schema node yields a one-element list (or an empty one).
                if type(ret_json) is list:
                    if new_elem:
                        ret_json.append(new_elem)
                else:
                    ret_json.update(new_elem)
        else:
            raise ValueError(f"ERROR: '{classname}' not found in schema.")
    elif classname == "WaningEffect":
        # Typical usage is recursive with classname == "idmType:WaningEffect"
        # Only here when directly calling with "WaningEffect" (probably in tests)
        assert not uses_old_waning(schema_idm)
        schema_blob = schema_idm["idmType:WaningEffect"]
        ret_json["class"] = classname
        for effect in schema_blob.keys():
            ret_json[effect] = get_default(schema_blob, effect, schema)
    elif "waning" in classname.lower():
        # Only used when there are multiple waning effect options
        assert uses_old_waning(schema_idm)
        if classname in schema_idm["idmType:WaningEffect"].keys():
            schema_blob = schema_idm["idmType:WaningEffect"][classname]
            ret_json["class"] = schema_blob["class"]
            for wan_key in schema_blob.keys():
                if wan_key == "class":
                    continue
                ret_json[wan_key] = get_default(schema_blob, wan_key, schema)
    elif ("idmType:AdditionalRestrictions" in schema_idm.keys()
          and classname in schema_idm["idmType:AdditionalRestrictions"].keys()):
        # Only used if the class is in idmType:AdditionalRestrictions
        schema_blob = schema_idm["idmType:AdditionalRestrictions"][classname]
        ret_json["class"] = schema_blob["class"]
        for tar_key in schema_blob.keys():
            if tar_key in ["class", "Sim_Types", "Vector2d idmType:AdditionalRestrictions"]:
                continue
            ret_json[tar_key] = get_default(schema_blob, tar_key, schema)
    elif "nodeset" in classname.lower():
        if classname in schema_idm["idmAbstractType:NodeSet"].keys():
            schema_blob = schema_idm["idmAbstractType:NodeSet"][classname]
            ret_json["class"] = schema_blob["class"]
            for ns_key in schema_blob.keys():
                if ns_key == "class":
                    continue
                try:
                    if "default" in schema_blob[ns_key]:
                        ret_json[ns_key] = schema_blob[ns_key]["default"]
                    elif "type" in schema_blob[ns_key]:
                        if schema_blob[ns_key]["type"] == "idmType:NodeListConfig":
                            # hack for now, might be schema bug
                            ret_json[ns_key] = []
                        elif "Vector" in schema_blob[ns_key]["type"]:
                            ret_json[ns_key] = []
                        else:
                            # Reached when a 'type' is present but is not one handled above.
                            raise ValueError(f"'type' not found in schema_blob[{ns_key}].")
                    else:
                        if show_warnings:
                            print(f"WARNING: Not setting default for NodeSet key {ns_key}.")
                except Exception as ex:
                    raise ValueError(f"ERROR: Exception caught while processing {ns_key} in NodeSet family. "
                                     f"Exception: {ex}") from ex
    elif ("idmType:IReport" in schema_idm and classname in schema_idm["idmType:IReport"].keys()):
        schema_blob = schema_idm["idmType:IReport"][classname]
        ret_json["class"] = schema_blob["class"]
        for ce_key in schema_blob.keys():
            if ce_key == "class":
                continue
            try:
                if "default" in schema_blob[ce_key] and schema_blob[ce_key]["default"] != "null":
                    ret_json[ce_key] = schema_blob[ce_key]["default"]
                elif ce_key == "Nodeset_Config":  # this doesn't look a real pattern
                    ret_json[ce_key] = get_class_with_defaults("NodeSetAll", schema_json=schema)
                elif ce_key != "class":
                    ret_json[ce_key] = {}
            except Exception as ex:
                raise ValueError(f"ERROR: {ex}") from ex
    else:
        # Looking for NodeIntervention or IndividualIntervention
        for iv_type in schema_idm["idmAbstractType:Intervention"].keys():
            if classname in schema_idm["idmAbstractType:Intervention"][iv_type].keys():
                schema_blob = schema_idm["idmAbstractType:Intervention"][iv_type][classname]
                ret_json["class"] = schema_blob["class"]
                for iv_key in schema_blob.keys():
                    if (iv_key == "class" or iv_key == "iv_type"):
                        continue
                    try:
                        if "default" in schema_blob[iv_key]:
                            ret_json[iv_key] = schema_blob[iv_key]["default"]
                        elif "type" in schema_blob[iv_key]:
                            idmtype = schema_blob[iv_key]["type"]
                            if "idmAbstractType:" in idmtype:
                                ret_json[iv_key] = dict()
                            elif "Vector" in idmtype:
                                ret_json[iv_key] = []
                            elif "String" in idmtype:
                                ret_json[iv_key] = ""
                            elif "idmType:" in idmtype:
                                ret_json[iv_key] = get_class_with_defaults(idmtype, schema_json=schema)
                            elif "List" in iv_key:
                                # a bit lame: to handle Intervention_List which has bad schema bug
                                ret_json[iv_key] = []
                            else:
                                raise ValueError(f"Don't know how to make default for type {idmtype}.")
                        elif iv_key not in ["Sim_Types"]:
                            # very small whitelist of keys that are allowed to be ignored by this process.
                            continue
                    except Exception as ex:
                        raise ValueError(f"ERROR: Exception caught while processing {iv_key} in Intervention family. "
                                         f"Exception: {ex}") from ex
                break
        # Only the intervention fallback treats "nothing found" as an error.
        if bool(ret_json) is False:
            raise ValueError(f"Failed to find {classname} in schema.")

    ret_this = ret_json

    # If non-empty dict, add schema
    if (type(ret_json) is dict and ret_json):
        ret_this = ReadOnlyDict(ret_json)
        ret_this.set_schema(schema_blob)

    return ret_this