#! /usr/bin/env python3
"""Command line utility for plotting property reports."""
import argparse
from functools import reduce
import json
from pathlib import Path
from typing import Dict, List, Optional

import matplotlib.pyplot as plt
import numpy as np

from emod_api.channelreports.utils import read_json_file, get_report_channels, accumulate_channel_data, save_to_csv, plot_traces
from emod_api.channelreports.utils import _validate_property_report_channels, _validate_property_report_ips
def main(args: argparse.Namespace):
    """
    Drive the property report tool from an already-parsed set of options.

    The report named by `args.filename` is read and its channel data
    extracted. Depending on the options the channel/IP names are listed,
    the selected traces are plotted, or the selected traces are written
    to the CSV file given by `args.csv`.

    Note: when `args.normalize` is set, "Statistical Population" is
    appended to `args.channels` in place if it is not already present.
    """
    channel_data = get_report_channels(read_json_file(args.filename))
    pool_names = sorted(channel_data)

    if args.verbose:
        print("Channels:Pools-")
        print(json.dumps(pool_names, indent=4))

    # Listing is a stand-alone mode: no validation, plotting or CSV output.
    if args.list:
        list_channels_and_ips(pool_names)
        return

    _validate_property_report_channels(args.channels, channel_data)
    _validate_property_report_ips(args.groupby, channel_data)

    # The population traces are fetched alongside the requested channels
    # so that call_plot_traces() can use them as the normalization vector.
    needs_population = args.normalize and "Statistical Population" not in args.channels
    if needs_population:
        args.channels.append("Statistical Population")

    trace_values = accumulate_channel_data(args.channels, args.verbose, args.groupby, channel_data)

    if args.csv is not None:
        save_to_csv(trace_values, args.csv, args.transpose)
        return

    call_plot_traces(args, trace_values)
def list_channels_and_ips(channel_keys: List[str]) -> None:
    """
    Print the channel names and IP (individual property) names found in
    the keys of a property report.

    Keys are expected to look like "CHANNEL:IP:value,...,IP:value". The
    text before the first ":" is the channel name; the remainder is a
    comma separated list of IP:value pairs, of which only the IP part is
    reported.

    Args:
        channel_keys: keys of the report's channel dictionary.

    A key without any ":" contributes its whole text as a channel name
    and no IPs (rather than raising IndexError), and empty entries in
    the IP:value list are ignored.
    """
    channels = set()
    properties = set()

    # Each channel _should_ have the same set of IPs, but every key is
    # inspected so nothing is missed if they differ.
    for key in channel_keys:
        channel, _, pairs = key.partition(":")
        channels.add(channel)
        # "IP:value" -> "IP"; `if pair` drops the empty fragment produced
        # by a key with no IP section or by a stray comma.
        properties.update(pair.split(":")[0] for pair in pairs.split(",") if pair)

    print("\nChannels:")
    for channel in sorted(channels):
        print(f"\t{channel}")

    print("\nIPs:")
    for prop in sorted(properties):
        print(f"\t{prop}")

    print()
def call_plot_traces(args: argparse.Namespace, trace_values: Dict[str, np.ndarray]) -> None:
    """
    Hand the traces to `plot_traces`, optionally save the resulting figure
    as 'propertyReport.png', then display it with `plt.show()`.

    When `args.normalize` is set, every trace whose key starts with
    "Statistical Population" is removed from the plotted set and summed
    element-wise into a single normalization value passed to `plot_traces`.
    Otherwise all traces are plotted and no normalization value is passed.
    """
    if args.verbose:
        print(sorted(trace_values))

    traces = trace_values
    norms = None

    if args.normalize:
        population_prefix = "Statistical Population"
        traces = {}
        norms = 0   # stays the plain integer 0 if no population trace is present
        for name, series in trace_values.items():
            if name.startswith(population_prefix):
                # fold each population trace into one running vector
                norms = np.array(series) + norms
            else:
                traces[name] = series

    figure = plot_traces(traces, norms, args.overlay, args.channels, args.filename, args.legend)

    if args.saveFigure:
        print("Saving figure 'propertyReport.png'...")
        figure.savefig('propertyReport.png')

    plt.show()
def process_cmd_line(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """
    Parse the command line options for the property report plotter.

    Command line processing lives here rather than under
    `if __name__ == '__main__'` so it can be invoked programmatically.

    Args:
        argv: argument strings to parse, without the program name. When
            None (the default) argparse falls back to `sys.argv[1:]`.

    Returns:
        The parsed namespace with attributes `filename`, `channels`,
        `groupby`, `normalize`, `overlay`, `saveFigure`, `verbose`,
        `legend`, `list`, `csv` and `transpose`.

    Post-processing applied to the parsed values:
        * `channels` defaults to ['Infected'] when no channel is given.
        * a single groupby value of "all" (any case) becomes an empty list.

    Unless `--list` is given, a summary of the effective options is
    printed to stdout.
    """
    parser = argparse.ArgumentParser(description='Property Report Plotting')
    parser.add_argument('filename', nargs='?', default='PropertyReport.json', help='property report filename [PropertyReport.json]')
    parser.add_argument('-c', '--channel', action='append', help='channel(s) to display [Infected]', metavar='channelName', dest='channels')
    parser.add_argument('-g', '--groupby', action='append', help="IP(s) under which to aggregate other IP keys and values")
    parser.add_argument('-n', '--normalize', help='plot channel(s) normalized by statistical population', action='store_true')
    parser.add_argument('-o', '--overlay', help='overlay pools of the same channel', action='store_true')
    parser.add_argument('-s', '--save', help="save figure to file 'propertyReport.png'", action='store_true', dest='saveFigure')
    parser.add_argument('-v', '--verbose', action="store_true")
    # args.legend defaults to True; passing --no-legend sets it to False.
    parser.add_argument('--no-legend', action="store_false", dest="legend")
    parser.add_argument('-l', '--list', action="store_true", help="List channels and IP keys found in the report. No plotting is performed with this option.")
    parser.add_argument("--csv", type=Path, default=None, help="Write data for selected channel(s) to given file.")
    parser.add_argument("-t", "--transpose", action="store_true", help="write channels as columns rather than rows (only in effect with '--csv' option)")

    args = parser.parse_args(argv)

    if not args.channels:
        args.channels = ['Infected']

    # "-g all" is shorthand that is translated to an empty groupby list.
    if args.groupby is not None and len(args.groupby) == 1 and args.groupby[0].lower() == "all":
        args.groupby = []

    if not args.list:
        print(f"Filename: '{args.filename}'")
        print(f"Channel(s): {args.channels}")
        print(f"Groupby: {args.groupby}")
        print(f"Normalize: {args.normalize}")
        print(f"Overlay: {args.overlay}")
        print(f"Save: {args.saveFigure}")
        if args.csv:
            print(f"CSV filename: '{args.csv}'")
            print(f"Transpose CSV: {args.transpose}")

    return args
if __name__ == '__main__':
    # Script entry point: parse the command line, then run the tool.
    cli_args = process_cmd_line()
    main(cli_args)