190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
def get_class_with_defaults(classname, schema_path=None, schema_json=None):
    """
    Return the default parameter values for a datatype defined in schema.

    Args:
        classname (str): Name of target datatype
        schema_path (str): Filename of schema: DEPRECATED, use schema_json instead
        schema_json (dict): Contents of schema file

    Returns:
        (dict): Default parameter values for requested datatype. Abstract
        types yield an empty dict. An ``idmType:`` whose schema definition is
        a list yields a list holding at most one dict of defaults. Any other
        non-empty result is wrapped in a ReadOnlyDict that has the matching
        schema fragment attached through ``set_schema``.

    Raises:
        ValueError: If neither schema_path nor schema_json is usable, if
            schema_path does not exist, if classname cannot be found in the
            schema, or if a default value cannot be evaluated.
        KeyError: If the schema has no "idmTypes" entry, or a matched class
            definition has no "class" entry.
    """
    # Keys of a class definition that never become parameters
    skip_common = ("class", "Sim_Types")
    skip_restrictions = skip_common + ("Vector2d idmType:AdditionalRestrictions",)

    # Sections of schema["idmTypes"] that map class names to definitions,
    # paired with the keys to skip for each. Order matters: the first section
    # containing classname wins.
    registries = (
        ("idmAbstractType:CampaignEvent", skip_common),
        ("idmAbstractType:EventCoordinator", skip_common),
        ("idmAbstractType:IReport", skip_common),
        ("idmType:IReport", skip_common),
        ("idmAbstractType:NodeSet", skip_common),
        ("idmAbstractType:WaningEffect", skip_common),
        ("idmType:WaningEffect", skip_common),
        ("idmAbstractType:AdditionalRestrictions", skip_restrictions),
        ("idmType:AdditionalRestrictions", skip_restrictions),
        ("idmAbstractType:IndividualIntervention", skip_common),
        ("idmAbstractType:NodeIntervention", skip_common),
    )

    # Abstract types that are incorrectly named with the "idmType:" prefix
    misnamed_abstract = ("idmType:IReport", "idmType:AdditionalRestrictions")
    waning_key = "idmType:WaningEffect"
    intervention_key = "idmAbstractType:Intervention"

    # Function should be removed after schema_path argument is removed
    def get_schema(schema_path=None, schema_json=None):
        global schema_cache
        global _schema_path

        # Prefer schema-as-dict if provided
        if schema_json:
            return schema_json

        if not schema_path:
            raise ValueError("A valid schema_path or schema_json needs to be specified.")

        # Then check file path
        if not os.path.exists(schema_path):
            raise ValueError(f"ERROR: No file found at {schema_path}. "
                             f"A valid schema path needs to exist at the path specified.")

        # Re-read the file only when nothing is cached or the path changed
        if schema_cache is None or _schema_path != schema_path:
            with open(schema_path) as file:
                schema_cache = json.load(file)
            _schema_path = schema_path

        return schema_cache

    # Evaluate default value; assumes default containers are empty
    def eval_default(default_val):
        if type(default_val) is dict:
            return dict()
        if type(default_val) is list:
            return list()
        return default_val

    # Assign default value from schema
    def get_default(schema_obj, schema):
        default = None
        try:
            if "default" in schema_obj:
                default = eval_default(schema_obj["default"])
            elif "type" in schema_obj:
                type_name = schema_obj["type"]
                if "Vector2d idmType:AdditionalRestrictions" in type_name:
                    default = list()
                else:
                    default = get_class_with_defaults(type_name, schema_json=schema)
        except Exception as ex:
            raise ValueError(f"ERROR for object: {schema_obj}: {ex}") from ex
        return default

    # Build {"class": ..., <param>: <default>, ...} from one class definition
    def defaults_from_blob(blob, skip_keys):
        result = {"class": blob["class"]}
        for key_str, sub_schema in blob.items():
            if key_str in skip_keys:
                continue
            result[key_str] = get_default(sub_schema, schema)
        return result

    # Depending on the schema, a WaningEffect may be an abstract type or a
    # concrete type. If the text 'WaningEffect' is part of any of the keys in
    # idmType:WaningEffect, then the schema is using WaningEffect as an
    # abstract type, and uses_old_waning should return True.
    def uses_old_waning(schema_idm):
        if waning_key not in schema_idm:
            return True
        return any("WaningEffect" in k for k in schema_idm[waning_key].keys())

    schema = get_schema(schema_path, schema_json)
    schema_idm = schema["idmTypes"]
    schema_blob = None
    ret_json = dict()

    # Abstract types get initialized to empty object
    is_abstract = (
        classname.startswith("idmAbstractType")
        or classname in misnamed_abstract
        or (classname == waning_key and uses_old_waning(schema_idm))
    )

    if not is_abstract:
        for registry_key, skip_keys in registries:
            if registry_key in schema_idm and classname in schema_idm[registry_key]:
                schema_blob = schema_idm[registry_key][classname]
                ret_json = defaults_from_blob(schema_blob, skip_keys)
                break
        else:
            # Not found in any registry section
            if classname.startswith("idmType:"):
                # Check if class is an idmType
                if classname not in schema_idm:
                    raise ValueError(f"ERROR: '{classname}' not found in schema.")

                schema_blob = schema_idm[classname]
                if type(schema_blob) is list:
                    # List-typed definition: describe the element by its first
                    # entry (an empty list definition raises IndexError here)
                    ret_json = list()
                    schema_blob = schema_blob[0]
                if "NodeListConfig" in classname:  # KF: Need to remove NodeListConfig
                    schema_blob = dict()

                new_elem = dict()
                for type_key, type_schema in schema_blob.items():
                    # Keys starting with "<" are skipped
                    if type_key.startswith("<"):
                        continue
                    try:
                        if "default" in type_schema:
                            new_elem[type_key] = eval_default(type_schema["default"])
                        elif "min" in type_schema:
                            new_elem[type_key] = type_schema["min"]
                        elif "type" in type_schema:
                            new_elem[type_key] = get_class_with_defaults(type_schema["type"],
                                                                         schema_json=schema)
                        elif type_key != "class":
                            new_elem[type_key] = dict()
                    except Exception as ex:
                        raise ValueError(f"ERROR: {ex}") from ex

                if type(ret_json) is list:
                    if new_elem:
                        ret_json.append(new_elem)
                else:
                    ret_json.update(new_elem)
            else:
                # Looking for NodeIntervention or IndividualIntervention.
                # The section may be absent from the schema; treat that as
                # "no interventions" so an unknown classname gets the
                # ValueError below rather than a bare KeyError.
                for iv_group in schema_idm.get(intervention_key, {}).values():
                    if classname in iv_group:
                        schema_blob = iv_group[classname]
                        ret_json.update(defaults_from_blob(schema_blob, skip_common))
                if not ret_json:
                    raise ValueError(f"Failed to find {classname} in schema.")

    ret_this = ret_json
    # If non-empty dict, add schema
    if type(ret_json) is dict and ret_json:
        ret_this = ReadOnlyDict(ret_json)
        ret_this.set_schema(schema_blob)
    return ret_this
|