Source code for emod_api.schema_to_class

import json
import os
import pdb

from collections import OrderedDict

schema_cache = None
show_warnings = True


[docs]def disable_warnings(): """ Turn off warnings to console. These can get very verbose. """ show_warnings = False
[docs]class ReadOnlyDict(OrderedDict): def __missing__(self, key): raise KeyError(f"'{key}' not found in this object. List of keys = {self.keys()}.") def __getattr__(self, item): try: return self[item] except KeyError: raise AttributeError(item) # to allow deepcopy (from s/o) def __setattr__(self, key, value): # if key not in self and "Config" not in key and "List" not in key: # these are lame; # find way in schema to initialize complex types {}, [], or null if key not in self: # these are lame; find way in schema to initialize complex types {}, [], or null self.__missing__(key) # this should not be necessary if value is None: return if "schema" not in self: print( f"DEBUG: No schema in node for param {key}." ) else: if "type" in self["schema"][key]: if self["schema"][key]["type"] in [ "integer", "float" ]: if type(value) is str: raise ValueError(f"{value} is string when needs to be {self['schema'][key]['type']} " f"for parameter {key}.") elif value < self["schema"][key]["min"]: raise ValueError(f"{value} is below minimum {self['schema'][key]['min']} for parameter {key}.") elif value > self["schema"][key]["max"]: raise ValueError(f"{value} is above maximum {self['schema'][key]['max']} " f"for parameter {key}.") elif self["schema"][key]["type"] == "enum": if value not in self["schema"][key]["enum"]: raise ValueError(f"{value} is not list of possible values {self['schema'][key]['enum']} " f"for parameter {key}.") elif self["schema"][key]["type"] == "bool": if not any([value is False, value is True, value == 1, value == 0]): raise ValueError(f"value needs to be a bool for parameter {key}.") if value is False: value = 0 elif value is True: value = 1 elif "Vector" in self["schema"][key]["type"] and "idmType" not in self['schema'][key]['type']: if type(value) is not list: raise ValueError(f"Value needs to be a list for parameter {key}.") # if param is dependent on a param, set that param to the value. but needs to be recursive. if "depends-on" in self["schema"][key]: # set this value BUT NOT IF key == Simulation_Type! for k,v in dict(self["schema"][key]["depends-on"]).items(): if k == "Simulation_Type": if k not in self.keys(): # not supported yet for campaigns; need to provide campaign blobs access to config... continue elif self["Simulation_Type"] in [x.strip() for x in v.split(',')]: pass else: raise ValueError(f"ERROR: Simulation_Type needs to be one of {v} for you to be able to " f"set {key} to {value} but it seems to be {self.Simulation_Type}.") else: if type(v) is str and len(v.split(',')) > 1: v = v.split(',')[0] # pretty arbitrary but least arbitrary of options it seems if self["schema"][k]['default'] == self[k]: # only set implicit value if default (i.e. user didn't set it) self.__setattr__(k, v) if "implicits" not in self: # This should NOT be needed self["implicits"] = [] self["implicits"].append(key) if "default" in self["schema"][key] and self["schema"][key]["default"] == value and value == self[key]: return self[key] = value if "explicits" not in self: # This should NOT be needed self["explicits"] = [] self["explicits"].append(key) return
[docs] def set_schema(self, schema): """ Add schema node. """ self["schema"] = schema
[docs] def to_file(self, config_name="config.json"): """ Write 'clean' config file out to disk as json. Param: config_name (defaults to 'config.json') """ config = self.finalize() with open( config_name, "w" ) as config_file: json.dump( config, config_file, indent=4, sort_keys=True )
[docs] def finalize(self): """ Remove all params that are disabled by depends-on param being off and schema node. """ nuke_list = [] for key, v in self.items(): finalized_keys = [] if type(v) is ReadOnlyDict and "schema" in v.keys(): v.finalize() # experimental recursive code elif type(v) is list and len(v)>0 and type(v[0]) is ReadOnlyDict and "schema" in v[0].keys(): for elem in v: elem.finalize() # experimental recursive code if key in [ "schema", "explicits", "implicits" ]: continue elif key not in self["schema"]: if show_warnings: print(f"WARNING: During schema-based param purge, {key} not in schema.") elif "depends-on" in self["schema"][key]: def purge_key(key): if key not in self.keys() or "depends-on" not in self["schema"][key]: return for dep_k,dep_v in dict(self["schema"][key]["depends-on"]).items(): if dep_k not in nuke_list and dep_k not in finalized_keys: purge_key( dep_k ) # careful if type(dep_v) is str: vals = [x.strip() for x in str(dep_v).split(',')] if dep_k in self and self[dep_k] not in vals: nuke_list.append(key) else: if self[dep_k] != dep_v or dep_k in nuke_list: nuke_list.append(key) finalized_keys.append( key ) purge_key( key ) elif v == "UNINITIALIZED STRING": # work around current schema string defaults self[key] = "" # Logging parameters are implicit and only need to be retained if other than default if "logLevel_default" in self.keys(): ll_default = self["logLevel_default"] for key, val in self.items(): if key.startswith("logLevel_") and val == ll_default and key != "logLevel_default": nuke_list.append(key) if "Actual_IndividualIntervention_Config" in self.keys() and "Actual_NodeIntervention_Config" in self.keys(): # Need to purge one of these; yes this could be done cleverer but this is easy to follow and maintain ind_len = len( self["Actual_IndividualIntervention_Config"] ) nod_len = len( self["Actual_NodeIntervention_Config"] ) if ind_len > 0 and nod_len == 0: self.pop( "Actual_NodeIntervention_Config" ) elif nod_len > 0 and ind_len == 0: self.pop( "Actual_IndividualIntervention_Config" ) else: raise ValueError("We have both Actual_IndividualIntervention_Config " "and Actual_NodeIntervention_Config set.") # Note that nuke_list is not a set and is typically full of duplicates. There are many ways to de-dupe. for nuke_key in nuke_list: if nuke_key in self.keys(): if "explicits" in self and nuke_key in self['explicits']: raise ValueError(f"You set param {nuke_key} but it was disabled and is not being used.") self.pop(nuke_key) if "implicits" in self: self.pop("implicits") if "explicits" in self: self.pop( "explicits" ) try: self.pop("schema") except Exception as ex: raise ValueError(f"ERROR: Something bad happened during finalize: {ex}.") return self
[docs]def uses_old_waning(schema_path=None): global schema_cache if schema_path is not None: schema_cache = None waning_effects = get_schema(schema_path)["idmTypes"]["idmType:WaningEffect"].keys() return any(["WaningEffect" in k for k in waning_effects])
[docs]def get_schema( schema_path=None ): global schema_cache if schema_cache is None: if schema_path is None: schema_path = "schema.json" if type(schema_path) is not dict: if not os.path.exists(schema_path): raise ValueError(f"ERROR: No file found at {schema_path}. " f"A valid schema path needs to exist at the path specified.") with open(schema_path) as file: schema = json.load(file) schema_cache = schema else: schema = schema_path else: schema = schema_cache return schema
[docs]def get_class_with_defaults( classname, schema_path=None ): """ Returns the default config for a datatype in the schema. """ schema = get_schema(schema_path) ret_json = {} # there are some types that are actually arrays!? schema_blob = None def get_default( schema, key, types_schema ): default = None try: if "default" in schema[key]: default = schema[key]["default"] elif "idmType:" in schema[key]["type"]: idmtype = schema[key]["type"] default = get_class_with_defaults( idmtype, types_schema ) except Exception as ex: raise ValueError(f"ERROR for key '{key}': {ex}") return default if "campaignevent" in classname.lower(): if classname in schema["idmTypes"]["idmAbstractType:CampaignEvent"].keys(): schema_blob = schema["idmTypes"]["idmAbstractType:CampaignEvent"][classname] ret_json["class"] = schema_blob["class"] for ce_key in schema_blob.keys(): if ce_key == "class": continue try: if "default" in schema_blob[ce_key] and schema_blob[ce_key]["default"] != "null": ret_json[ce_key] = schema_blob[ce_key]["default"] elif ce_key == "Nodeset_Config": # this doesn't look a real pattern ret_json[ce_key] = get_class_with_defaults( "NodeSetAll", schema_path ) elif "type" in schema_blob[ce_key]: ret_json[ce_key] = get_class_with_defaults( schema_blob[ce_key]["type"], schema_path ) elif ce_key != "class": ret_json[ce_key] = {} except Exception as ex: raise ValueError(f"ERROR: {ex}") elif "coordinator" in classname.lower(): for ec_name in schema["idmTypes"]["idmAbstractType:EventCoordinator"].keys(): if ec_name == classname or classname.replace("EventCoordinator", "") in ec_name: schema_blob = schema["idmTypes"]["idmAbstractType:EventCoordinator"][ec_name] ret_json["class"] = schema_blob["class"] for ec_key in schema_blob.keys(): if ec_key == "class" or ec_key == "Sim_Types": continue ret_json[ec_key] = get_default(schema_blob, ec_key, schema["idmTypes"]) break # once we find it, stop elif "idmType:" in classname: # Might be full schema or might just be the idmTypes segment. Got to handle both. I think. if "idmTypes" in schema.keys(): schema = schema["idmTypes"] if classname in schema.keys(): schema_blob = schema[classname] if type(schema_blob) is list: ret_json = list() schema_blob = schema_blob[0] # schema_blob might be dict or list if schema_blob is None: raise ValueError(f"Wow. That's super-bad. {classname} is in schema but schema is null." f"Must be LarvalHabitat?") else: new_elem = dict() for type_key in schema_blob.keys(): if type_key.startswith( "<" ): continue try: if "default" in schema_blob[type_key] and schema_blob[type_key]["default"] != "null": new_elem[type_key] = schema_blob[type_key]["default"] elif "min" in schema_blob[type_key] and schema_blob[type_key]["min"] != "null": new_elem[type_key] = schema_blob[type_key]["min"] elif "type" in schema_blob[type_key]: new_elem[type_key] = get_class_with_defaults( schema_blob[type_key]["type"], schema ) elif type_key != "class": new_elem[type_key] = {} except Exception as ex: raise ValueError(f"ERROR: {ex}") if type(ret_json) is list: if new_elem: ret_json.append( new_elem ) else: ret_json.update( new_elem ) else: raise ValueError(f"ERROR: '{classname}' not found in schema.") elif classname == "WaningEffect": schema_blob = schema["idmTypes"]["idmType:WaningEffect"] ret_json["class"] = classname for effect in schema_blob.keys(): ret_json[effect] = get_default(schema_blob, effect, schema["idmTypes"]) elif "waning" in classname.lower(): if classname in schema["idmTypes"]["idmType:WaningEffect"].keys(): schema_blob = schema["idmTypes"]["idmType:WaningEffect"][classname] ret_json["class"] = schema_blob["class"] for wan_key in schema_blob.keys(): if wan_key == "class": continue ret_json[wan_key] = get_default( schema_blob, wan_key, schema["idmTypes"]) # IF the class is in idmType:AdditionalRestrictions, use this section elif "idmTypes" in schema.keys() and "idmType:AdditionalRestrictions" in schema["idmTypes"].keys() and classname in \ schema["idmTypes"]["idmType:AdditionalRestrictions"].keys(): schema_blob = schema["idmTypes"]["idmType:AdditionalRestrictions"][classname] ret_json["class"] = schema_blob["class"] for tar_key in schema_blob.keys(): if tar_key in [ "class", "Sim_Types" ]: continue ret_json[tar_key] = get_default( schema_blob, tar_key, schema["idmTypes"]) elif "nodeset" in classname.lower(): if classname in schema["idmTypes"]["idmAbstractType:NodeSet"].keys(): schema_blob = schema["idmTypes"]["idmAbstractType:NodeSet"][classname] ret_json["class"] = schema_blob["class"] for ns_key in schema_blob.keys(): if ns_key == "class": continue try: if "default" in schema_blob[ns_key]: ret_json[ns_key] = schema_blob[ns_key]["default"] elif "type" in schema_blob[ns_key]: if schema_blob[ns_key]["type"] == "idmType:NodeListConfig": # hack for now, might be schema bug ret_json[ns_key] = [] elif "Vector" in schema_blob[ns_key]["type"]: ret_json[ns_key] = [] else: raise ValueError(f"'type' not found in schema_blob[{ns_key}].") else: if show_warnings: print( f"WARNING: Not setting default for NodeSet key {ns_key}." ) except Exception as ex: raise ValueError(f"ERROR: Exception caught while processing {ns_key} in NodeSet family." f"Exception: {ex}") elif "idmTypes" in schema and "idmType:IReport" in schema["idmTypes"] and classname in schema["idmTypes"][ "idmType:IReport"].keys(): schema_blob = schema["idmTypes"]["idmType:IReport"][classname] ret_json["class"] = schema_blob["class"] for ce_key in schema_blob.keys(): if ce_key == "class": continue try: if "default" in schema_blob[ce_key] and schema_blob[ce_key]["default"] != "null": ret_json[ce_key] = schema_blob[ce_key]["default"] elif ce_key == "Nodeset_Config": # this doesn't look a real pattern ret_json[ce_key] = get_class_with_defaults( "NodeSetAll", schema_path ) elif ce_key != "class": ret_json[ce_key] = {} except Exception as ex: raise ValueError(f"ERROR: {ex}") else: for iv_type in schema["idmTypes"]["idmAbstractType:Intervention"].keys(): if classname in schema["idmTypes"]["idmAbstractType:Intervention"][iv_type].keys(): schema_blob = schema["idmTypes"]["idmAbstractType:Intervention"][iv_type][classname] ret_json["class"] = schema_blob["class"] for iv_key in schema_blob.keys(): if any( [ iv_key == "class", iv_key == "iv_type" ] ): continue try: if "default" in schema_blob[iv_key]: ret_json[iv_key] = schema_blob[iv_key]["default"] elif "_Config" in iv_key and iv_key.count("_") > 1: # this sucks, looking for things like Actual_IndividualIntervention_Config # and Positive_Diagnosis_Config ret_json[iv_key] = {} elif "type" in schema_blob[iv_key]: idmtype = schema_blob[iv_key]["type"] if "Vector" in idmtype: ret_json[iv_key] = [] elif "String" in idmtype: ret_json[iv_key] = "" elif "idmType:" in schema_blob[iv_key]["type"]: ret_json[iv_key] = get_class_with_defaults( idmtype, schema["idmTypes"] ) elif "List" in iv_key: # a bit lame: to handle Intervention_List which has bad schema bug ret_json[iv_key] = [] else: raise ValueError(f"Don't know how to make default for type {idmtype}.") elif iv_key not in ["Sim_Types"]: # very small whitelist of keys that are allowed to be ignored by this process. continue except Exception as ex: raise ValueError(f"ERROR: Exception caught while processing {iv_key} in Intervention family." f"Exception: {ex}") break if bool(ret_json) is False: raise ValueError(f"Failed to find {classname} in schema.") ret_this = ret_json if type(ret_json) is dict: ret_this = ReadOnlyDict(ret_json) if ret_this: # there is an edge-case where the returned dict is empty and we don't want/need to add to the schema. ret_this.set_schema(schema_blob) return ret_this