Source code for fpsim.analyzers

'''
Specify the core analyzers available in FPsim. Other analyzers can be
defined by the user by inheriting from these classes.
'''

import os
import numpy as np
import pandas as pd
import sciris as sc
import pylab as pl
from . import defaults as fpd


#%% Generic intervention classes

__all__ = ['Analyzer', 'snapshot', 'timeseries_recorder', 'age_pyramids', 'verbose_sim']


[docs]class Analyzer(sc.prettyobj): ''' Base class for analyzers. Based on the Intervention class. Analyzers are used to provide more detailed information about a simulation than is available by default -- for example, pulling states out of sim.people on a particular timestep before it gets updated in the next timestep. To retrieve a particular analyzer from a sim, use sim.get_analyzer(). Args: label (str): a label for the Analyzer (used for ease of identification) ''' def __init__(self, label=None): if label is None: label = self.__class__.__name__ # Use the class name if no label is supplied self.label = label # e.g. "Record ages" self.initialized = False self.finalized = False return
[docs] def initialize(self, sim=None): ''' Initialize the analyzer, e.g. convert date strings to integers. ''' self.initialized = True self.finalized = False return
[docs] def finalize(self, sim=None): ''' Finalize analyzer This method is run once as part of `sim.finalize()` enabling the analyzer to perform any final operations after the simulation is complete (e.g. rescaling) ''' if self.finalized: raise RuntimeError('Analyzer already finalized') # Raise an error because finalizing multiple times has a high probability of producing incorrect results e.g. applying rescale factors twice self.finalized = True return
[docs] def apply(self, sim): ''' Apply analyzer at each time point. The analyzer has full access to the sim object, and typically stores data/results in itself. This is the core method which each analyzer object needs to implement. Args: sim: the Sim instance ''' pass
[docs] def to_json(self): ''' Return JSON-compatible representation Custom classes can't be directly represented in JSON. This method is a one-way export to produce a JSON-compatible representation of the intervention. This method will attempt to JSONify each attribute of the intervention, skipping any that fail. Returns: JSON-serializable representation ''' # Set the name json = {} json['analyzer_name'] = self.label if hasattr(self, 'label') else None json['analyzer_class'] = self.__class__.__name__ # Loop over the attributes and try to process attrs = self.__dict__.keys() for attr in attrs: try: data = getattr(self, attr) try: attjson = sc.jsonify(data) json[attr] = attjson except Exception as E: json[attr] = f'Could not jsonify "{attr}" ({type(data)}): "{str(E)}"' except Exception as E2: json[attr] = f'Could not jsonify "{attr}": "{str(E2)}"' return json
[docs]class snapshot(Analyzer): ''' Analyzer that takes a "snapshot" of the sim.people array at specified points in time, and saves them to itself. Args: timesteps (list): list of timesteps on which to take the snapshot args (list): additional timestep(s) die (bool): whether or not to raise an exception if a date is not found (default true) kwargs (dict): passed to Analyzer() **Example**:: sim = fp.Sim(analyzers=fps.snapshot('2020-04-04', '2020-04-14')) sim.run() snapshot = sim.pars['analyzers'][0] people = snapshot.snapshots[0] ''' def __init__(self, timesteps, *args, die=True, **kwargs): super().__init__(**kwargs) # Initialize the Analyzer object timesteps = sc.promotetolist(timesteps) # Combine multiple days timesteps.extend(args) # Include additional arguments, if present self.die = die # Whether or not to raise an exception self.timesteps = timesteps # String representations self.snapshots = sc.odict() # Store the actual snapshots return
[docs] def apply(self, sim): """ Apply snapshot at each timestep listed in timesteps and save result at snapshot[str(timestep)] """ for t in self.timesteps: if np.isclose(sim.i, t): self.snapshots[str(sim.i)] = sc.dcp(sim.people) # Take snapshot! return
[docs]class timeseries_recorder(Analyzer): ''' Record every attribute in people as a timeseries. Attributes: self.i: The list of timesteps (ie, 0 to 261 steps). self.t: The time elapsed in years given how many timesteps have passed (ie, 25.75 years). self.y: The calendar year of timestep (ie, 1975.75). self.keys: A list of people states excluding 'dobs'. self.data: A dictionary where self.data[state][timestep] is the mean of the state at that timestep. ''' def __init__(self): """ Initializes self.i/t/y as empty lists and self.data as empty dictionary """ super().__init__() self.i = [] self.t = [] self.y = [] self.data = sc.objdict() return
[docs] def initialize(self, sim): """ Initializes self.keys from sim.people """ super().initialize() self.keys = [] for key in sim.people.keys(): if sc.isarray(sim.people[key]): self.keys.append(key) for key in self.keys: self.data[key] = [] return
[docs] def apply(self, sim): """ Applies recorder at each timestep """ self.i.append(sim.i) self.t.append(sim.t) self.y.append(sim.y) for k in self.keys: val = np.mean(sim.people[k]) self.data[k].append(val)
[docs] def plot(self, x='y', fig_args=None, pl_args=None): """ Plots time series of each state as a line graph """ xmap = dict(i=self.i, t=self.t, y=self.y) x = xmap[x] fig_args = sc.mergedicts(fig_args) pl_args = sc.mergedicts(pl_args) nkeys = len(self.keys) rows,cols = sc.get_rows_cols(nkeys) fig = pl.figure(**fig_args) for k,key in enumerate(self.keys): pl.subplot(rows,cols,k+1) try: data = np.array(self.data[key], dtype=float) mean = data.mean() label = f'mean: {mean}' pl.plot(x, data, label=label, **pl_args) pl.title(key) pl.legend() except: pl.title(f'Could not plot {key}') return fig
[docs]class age_pyramids(Analyzer): ''' Records age pyramids for each timestep. Attributes: self.bins: A list of ages, default is a sequence from 0 to max_age + 1. self.data: A matrix of shape (number of timesteps, number of bins - 1) containing age pyramid data. ''' def __init__(self, bins=None): """ Initializes bins and data variables """ super().__init__() self.bins = bins self.data = None return
[docs] def initialize(self, sim): """ Initializes bins and data with proper shapes """ super().initialize() if self.bins is None: self.bins = np.arange(0, sim.pars['max_age']+2) nbins = len(self.bins)-1 self.data = np.full((sim.npts, nbins), np.nan) self._raw = sc.dcp(self.data) return
[docs] def apply(self, sim): """ Records histogram of ages of all alive individuals at a timestep such that self.data[timestep] = list of proportions where index signifies age """ ages = sim.people.age[sc.findinds(sim.people.alive)] self._raw[sim.i, :] = np.histogram(ages, self.bins)[0] self.data[sim.i, :] = self._raw[sim.i, :]/self._raw[sim.i, :].sum()
[docs] def plot(self): """ Plots self.data as 2D pyramid plot """ fig = pl.figure() pl.pcolormesh(self.data.T) pl.xlabel('Timestep') pl.ylabel('Age (years)') return fig
[docs] def plot3d(self): """ Plots self.data as 3D pyramid plot """ print('Warning, very slow...') fig = pl.figure() sc.bar3d(self.data.T) pl.xlabel('Timestep') pl.ylabel('Age (years)') return fig
[docs]class verbose_sim(Analyzer): def __init__(self, to_csv=False, custom_csv_tables=None, to_file=False): """ Initializes a verbose_sim analyzer which extends the logging functionality of the sim with calculated channels, total state results of a sim run, the story() feature, and configurable file formatting for results """ self.to_csv = to_csv self.custom_csv_tables = custom_csv_tables self.to_file = to_file self.initialized = False self.total_results = sc.ddict(lambda: {}) self.dead_moms = set() self.is_sexactive = set() self.events = sc.ddict(dict) self.channels = ["Births", "Conceptions", "Miscarriages", "Deaths"] self.set_baseline = False self.states = list(fpd.person_defaults.keys()) + ['dobs'] # states saved by timestep
[docs] def apply(self, sim): """ Logs data for total_results and events at each timestep. Output: self.total_results::dict Dictionary of all individual results formatted as {timestep: attribute: [values]} keys correspond to fpsim.defaults debug_states self.events::dict Dictionary of events correponding to self.channels formatted as {timestep: channel: [indices]}. """ print('Warning, needs to be refactored to not use dataframes on each step') if not self.set_baseline: initial_pop = sim.pars['n_agents'] self.last_year_births = [0] * initial_pop self.last_year_gestations = [0] * initial_pop self.last_year_alive = [0] * initial_pop self.last_year_pregnant = [0] * initial_pop self.set_baseline = True for state in self.states: self.total_results[sim.y][state] = sc.dcp(getattr(sim.people, state)) # Getting births gestation and sexual_activity self.this_year_births = sc.dcp(self.total_results[sim.y]["parity"]) self.this_year_gestations = sc.dcp(self.total_results[sim.y]["gestation"]) self.this_year_alive = sc.dcp(self.total_results[sim.y]["alive"]) self.this_year_pregnant = sc.dcp(self.total_results[sim.y]["pregnant"]) for channel in self.channels: self.events[sim.y][channel] = [] # Comparing parity of previous year to this year, adding births for index, last_parity in enumerate(self.last_year_births): if last_parity < self.this_year_births[index]: for i in range(self.this_year_births[index] - last_parity): self.events[sim.y]['Births'].append(index) # Comparing pregnancy of previous year to get conceptions for index, last_pregnant in enumerate(self.last_year_pregnant): if last_pregnant == 0 and self.this_year_pregnant[index]: self.events[sim.y]['Conceptions'].append(index) # Comparing gestaton of previous year to get miscarriages for index, last_gestation in enumerate(self.last_year_gestations): # This is when miscarriages are checked in Sim if last_gestation == (sim.pars['end_first_tri'] - 1) and self.this_year_gestations[index] == 0: self.events[sim.y]['Miscarriages'].append(index) for index, alive in enumerate(self.last_year_alive): if alive > self.this_year_alive[index]: self.events[sim.y]['Deaths'].append(index) # Aggregate channels taken from people.results self.last_year_births = sc.dcp(self.this_year_births) self.last_year_gestations = sc.dcp(self.this_year_gestations) self.last_year_alive = sc.dcp(self.this_year_alive) self.last_year_pregnant = sc.dcp(self.this_year_pregnant)
[docs] def save(self, to_csv=True, to_json=False, custom_csv_tables=None): """ At the end of sim run, stores total_results as either a json or feather file. Inputs self.to_csv::bool If True, writes results to csv files in /sim_output where each state's history is a separate file self.to_json::bool If True, writes results to json file custom_csv_tables::list List of states that the user wants to write to csv, default is all Outputs: Either a json file at "sim_output/total_results.json" or a csv file for each state at "sim_output/{state}_state.csv" """ os.makedirs("sim_output", exist_ok=True) if to_json: sc.savejson(filename="sim_output/total_results.json", obj=self.total_results) if to_csv: states = self.states if self.custom_csv_tables is None else custom_csv_tables for state in states: state_frame = pd.DataFrame() max_length = len(self.total_results[max(self.total_results.keys())][state]) for timestep, _ in self.total_results.items(): colname = str(timestep) + "_" + state adjustment = max_length - len(self.total_results[timestep][state]) state_frame[colname] = list(self.total_results[timestep][state]) + [None] * adjustment # ONLY WORKS IF LAST YEAR HAS MOST PEOPLE state_frame.to_csv(f"sim_output/{state}_state.csv")
[docs] def story(self, index, output=False, debug=False): """ Prints a story of all major events in an individual's life based on calculated verbose_sim channels, base Sim channels, and statistics calculated within the function such as year of birth of individual. Args: index (int): index of the individual, must be less than population output (bool): return as output string rather than print debug (bool): print additional information Outputs: printed display of each major event in the individual's life """ string = '' if debug: print(self.events.keys()) def to_date(t): year = int(t) if debug: print(t) mo = round(((t) - year) * 12) month = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'][mo] return f'{year}-{month}' if len(self.events) == 0: errormsg = 'Story function can only be used after sim is run. Try Experiment.run_model() first' raise RuntimeError(errormsg) last_year = max(self.total_results.keys()) ages = self.total_results[last_year]['age'] # Progresses even if dead year_born = last_year - ages[index] if debug: print(last_year) print(year_born) print(ages[index]) string += f'This is the story of Person {index} who was born {to_date(year_born)}:\n' event_response_dict = { "Births": "gives birth", "Conceptions": "conceives", "Miscarriages": "has a miscarriage", "Deaths": "dies" } method_list = list(fpd.method_map.keys()) last_method = method_list[self.total_results[min(self.total_results.keys())]['method'][index]] for y in self.events: if y >= year_born: for new_channel in event_response_dict: if index in self.events[y][new_channel]: if new_channel == "Births": string += f"{to_date(y)}: Person {index} gives birth to child number {self.total_results[y]['parity'][index]}\n" else: string += f"{to_date(y)}: Person {index} {event_response_dict[new_channel]}\n" if self.total_results[y]['sexual_debut_age'][index] == 0: string += f"{to_date(y)}: Person {index} had their sexual debut\n" new_method = method_list[self.total_results[y]['method'][index]] if new_method != last_method: string += f"{to_date(y)}: Person {index} switched from {last_method} to {new_method}\n" last_method = new_method if not output: print(string) else: return string