Skip to content

utils

Helper functions, primarily for property reports, which are channel reports.

__get_trace_name(channel_title, key_value_pairs, groupby)

Return "canonical" trace name for a given channel, IP:value list, and groupby list.

Since we may be aggregating by IP values, trace name may not equal any particular channel name.

Example

title = "Infected"; key_value_pairs = ["Age_Bin:Age_Bin_Property_From_0_To_20","QualityOfCare:High","QualityOfCare1:High","QualityOfCare2:High"]

groupby = None: returns "Infected:Age_Bin:Age_Bin_Property_From_0_To_20,QualityOfCare:High,QualityOfCare1:High,QualityOfCare2:High"

groupby = ["Age_Bin"]: returns "Infected:Age_Bin:Age_Bin_Property_From_0_To_20"

groupby = ["Age_Bin", "QualityOfCare"]: returns "Infected:Age_Bin:Age_Bin_Property_From_0_To_20,QualityOfCare:High"

groupby = []: returns "Infected"

Source code in emod_api/channelreports/utils.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
def __get_trace_name(channel_title: str, key_value_pairs: List[str], groupby: List[str]) -> str:

    """
    Build the "canonical" trace name for a channel from its IP:value pairs and the groupby list.

    Traces may aggregate several IP values, so the resulting name does not
    necessarily match any single channel name in the report.

    Args:
        channel_title:   main channel name, e.g., "Infected"
        key_value_pairs: "IP:value" strings attached to the channel
        groupby:         IP names to keep in the trace name; None keeps every pair, [] keeps none

    Returns:
        the channel title, followed by ':' and the comma-joined retained pairs
        (in their original order) unless groupby is empty. Note that a non-empty
        groupby which matches none of the pairs yields the title with a trailing ':'.

    Example:
        title = "Infected"
        key_value_pairs = ["Age_Bin:Age_Bin_Property_From_0_To_20","QualityOfCare:High","QualityOfCare1:High","QualityOfCare2:High"]

        groupby = None
        return "Infected:Age_Bin:Age_Bin_Property_From_0_To_20,QualityOfCare:High,QualityOfCare1:High,QualityOfCare2:High"

        groupby = ["Age_Bin"]
        return = "Infected:Age_Bin:Age_Bin_Property_From_0_To_20"

        groupby = ["Age_Bin", "QualityOfCare"]
        return = "Infected:Age_Bin:Age_Bin_Property_From_0_To_20,QualityOfCare:High"

        groupby = []
        return = "Infected"
    """

    if groupby is None:
        # no grouping requested - every IP:value pair stays in the name
        retained = key_value_pairs
    elif not groupby:
        # empty groupby - everything is rolled up under the bare channel title
        return channel_title
    else:
        # keep only the pairs whose IP (text before the first ':') is being grouped on
        retained = [pair for pair in key_value_pairs if pair.split(":")[0] in groupby]

    return f"{channel_title}:{','.join(retained)}"

accumulate_channel_data(channels, verbose, groupby, channel_data)

Extract selected channel(s) from property report data.

Aggregate on groupby IP(s), if provided, otherwise on channel per unique IP:value pair (e.g., "QualityOfCare:High"), per main channel (e.g., "Infected").

Parameters:

Name Type Description Default
channels List[str]

names of channels to plot

required
verbose bool

output some "debugging"/progress information if true

required
groupby List[str]

IP(s) under which to aggregate other IP:value pairs

required
channel_data Dict

data for channels keyed on channel name

required

Returns:

Type Description
Dict[str, ndarray]

dictionary of aggregated data (float32 NumPy arrays), keyed on trace name; no normalization values are returned

Source code in emod_api/channelreports/utils.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def accumulate_channel_data(channels: List[str], verbose: bool, groupby: List[str], channel_data: Dict) -> Dict[str, np.ndarray]:

    """
    Pull the selected channel(s) out of property report data and sum them into traces.

    Channels are summed per groupby IP(s), if provided, otherwise each unique
    IP:value combination (e.g., "QualityOfCare:High") of each main channel
    (e.g., "Infected") becomes its own trace.

    Args:
        channels:       names of channels to plot
        verbose:        output some "debugging"/progress information if true
        groupby:        IP(s) under which to aggregate other IP:value pairs
        channel_data:   data for channels keyed on channel name

    Returns:
        dictionary of aggregated data (float32 NumPy arrays), keyed on trace name
    """

    trace_values = {}

    # walk the report keys in sorted order so traces are accumulated deterministically
    for pool_key in sorted(channel_data):

        # split "<channel title>:<IP:value>,<IP:value>,..." at the first ':' only
        title_and_ips = tuple(pool_key.split(":", 1))
        if title_and_ips[0] not in channels:
            continue

        (channel_title, key_value_pairs) = title_and_ips

        if verbose:
            print(f"Processing channel '{channel_title}:{key_value_pairs}'")

        key_value_pairs = key_value_pairs.split(',')
        trace_name = __get_trace_name(channel_title, key_value_pairs, groupby)

        # np.array() makes a fresh copy, so the in-place += below never touches channel_data
        trace_data = np.array(channel_data[pool_key]['Data'], dtype=np.float32)

        if trace_name in trace_values:
            if verbose:
                print(f"Add to trace: '{trace_name}'")
            trace_values[trace_name] += trace_data
        else:
            if verbose:
                print(f"New trace: '{trace_name}'")
            trace_values[trace_name] = trace_data

    return trace_values

plot_traces(trace_values, norm_values, overlay, channels, title, legend)

Plot trace data. One subplot per channel unless overlaying all variations of rolled-up IP(s) is requested.

A trace (like old-time pen and ink EKG) may represent the aggregation of several IP values so trace may not equal any particular channel data.

Parameters:

Name Type Description Default
trace_values Dict[str, ndarray]

channel data, keyed on channel name

required
norm_values Optional[Union[int, ndarray]]

normalization data for channels

required
overlay bool

whether or not to overlay all variations of a given channel on one subplot

required
channels List[str]

selection of channel names to plot

required
title str

plot title

required
legend bool

whether or not to include a legend on plots

required

Returns:

Type Description
Figure

plt.Figure

Source code in emod_api/channelreports/utils.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
def plot_traces(trace_values: Dict[str, np.ndarray],
                norm_values: Optional[Union[int, np.ndarray]],
                overlay: bool,
                channels: List[str],
                title: str,
                legend: bool) -> plt.Figure:

    """
    Draw trace data, one subplot per trace, or one per channel when overlaying.

    A trace (like old-time pen and ink EKG) may represent the aggregation of
    several IP values so trace may not equal any particular channel data.
    When norm_values is given, each subplot is followed by a second subplot
    showing the same trace(s) divided by norm_values.

    Args:
        trace_values: channel data, keyed on channel name
        norm_values:  normalization data for channels, None to skip normalized plots
        overlay:      whether or not to overlay all variations of a given channel on one subplot
        channels:     selection of channel names to plot
        title:        plot title
        legend:       whether or not to include a legend on plots

    Returns:
        plt.Figure, or None (after printing a message) when trace_values is empty
    """

    if not trace_values:
        print("Didn't find requested channel(s) in property report.")
        return

    normalize = norm_values is not None

    # one row per channel when overlaying, otherwise one row per trace;
    # normalized plots take an additional row right after each raw row
    plot_count = len(channels) if overlay else len(trace_values)
    if normalize:
        plot_count *= 2

    figure = plt.figure(title, figsize=(16, 9), dpi=300)
    trace_keys = sorted(trace_values)

    # first pass - draw the data
    for trace_name in trace_keys:
        row = __index_for(trace_name, channels, trace_keys, normalize, overlay)
        series = trace_values[trace_name]
        plt.subplot(plot_count, 1, row)
        plt.plot(series, label=trace_name)
        if not normalize:
            continue
        plt.subplot(plot_count, 1, row + 1)
        plt.ylim((0.0, 1.0))    # ylim accepts the (bottom, top) limits as a single tuple
        plt.plot(series / norm_values, label=trace_name)

    # second pass - titles and legends, done once all traces are drawn
    plt.subplot(plot_count, 1, 1)
    for trace_name in trace_keys:
        row = __index_for(trace_name, channels, trace_keys, normalize, overlay)
        plot_title = __title_for(trace_name, channels, overlay)
        plt.subplot(plot_count, 1, row)
        plt.title(plot_title)
        if legend:
            plt.legend()
        if not normalize:
            continue
        plt.subplot(plot_count, 1, row + 1)
        plt.title(f"{plot_title} normalized by 'Statistical Population'")
        if legend:
            plt.legend()

    plt.tight_layout()

    return figure

property_report_to_csv(source_file, csv_file, channels=None, groupby=None, transpose=False)

Write a property report to a CSV formatted file.

Optionally select a subset of the available channels. Optionally "roll up" IP:value sub-channels into a "parent" IP.

Parameters:

Name Type Description Default
source_file Union[str, Path]

filename of property report

required
channels Optional[List[str]]

list of channels to output, None results in writing all channels to output

None
groupby Optional[List[str]]

list of IPs into which to aggregate remaining IPs, None indicates no grouping, [] indicates all aggregated

None
csv_file Union[str, Path]

filename of CSV formatted result

required
transpose bool

write channels as columns rather than rows

False
Source code in emod_api/channelreports/utils.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def property_report_to_csv(source_file: Union[str, Path],
                           csv_file: Union[str, Path],
                           channels: Optional[List[str]] = None,
                           groupby: Optional[List[str]] = None,
                           transpose: bool = False) -> None:

    """
    Convert a property report file into a CSV formatted file.

    Optionally select a subset of the available channels.
    Optionally "roll up" IP:value sub-channels into a "parent" IP.

    Args:
        source_file: filename of property report
        csv_file:    filename of CSV formatted result
        channels:    list of channels to output (a single name may be given as a plain string),
                     None results in writing _all_ channels to output
        groupby:     list of IPs into which to aggregate remaining IPs (a single IP may be given
                     as a plain string), None indicates no grouping, [] indicates _all_ aggregated
        transpose:   write channels as columns rather than rows
    """

    channel_data = get_report_channels(read_json_file(Path(source_file)))

    # accept a bare string for convenience; default to every main channel in the report
    if isinstance(channels, str):
        channels = [channels]
    elif channels is None:
        # the main channel name is the text before the first ':' of each report key
        main_names = {name.split(":")[0] for name in channel_data}
        channels = sorted(main_names)

    if isinstance(groupby, str):
        groupby = [groupby]

    # check the selections against the report contents before doing any work
    _validate_property_report_channels(channels, channel_data)
    _validate_property_report_ips(groupby, channel_data)

    aggregated = accumulate_channel_data(channels, False, groupby, channel_data)
    save_to_csv(aggregated, csv_file, transpose)

save_to_csv(trace_values, filename, transpose=False)

Save property report to CSV. Uses underlying ChannelReport.to_csv() function.

Parameters:

Name Type Description Default
trace_values Dict[str, ndarray]

full set of available channels, keyed on channel name

required
filename Union[str, Path]

destination file for CSV data

required
transpose bool

write channels as columns rather than rows

False
Source code in emod_api/channelreports/utils.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def save_to_csv(trace_values: Dict[str, np.ndarray],
                filename: Union[str, Path],
                transpose: bool = False) -> None:

    """
    Write trace data to a CSV file by way of ChannelReport.to_csv().

    Args:
        trace_values: full set of available channels, keyed on channel name
        filename:     destination file for CSV data
        transpose:    write channels as columns rather than rows
    """

    # stage every trace in a fresh ChannelReport so its CSV writer can be reused
    csv_report = ChannelReport()
    for name in trace_values:
        csv_report.channels[name] = trace_values[name]

    # no channel selection is passed - by default, _all_ the channels just added are written
    destination = Path(filename)
    csv_report.to_csv(destination, transpose=transpose)