import matplotlib.pyplot as plt
import numpy as np
[docs]
class DerivedValues:
def __init__(self, model) -> None:
self.model = model
assert hasattr(model, "patches"), "DerivedValues: model needs to have a 'patches' attribute."
assert hasattr(model, "params"), "DerivedValues: model needs to have a 'params' attribute."
# There's no spatial hazard calculation at the start of the simulation, but we will allocate nticks + 1 to match other outputs.
model.patches.add_vector_property("spatial_hazard", length=model.params.nticks + 1, dtype=np.float32, default=0.0)
model.patches.add_array_property("coupling", shape=(model.patches.count, model.patches.count), dtype=np.float32, default=0.0)
return
[docs]
def check(self):
assert hasattr(self.model, "people"), "DerivedValues: model needs to have an 'people' attribute."
assert hasattr(self.model.people, "S"), "DerivedValues: model.people needs to have 'S' attribute."
assert hasattr(self.model.people, "Isym"), "DerivedValues: model.people needs to have 'Isym' attribute."
assert hasattr(self.model.people, "Iasym"), "DerivedValues: model.people needs to have 'Iasym' attribute."
assert hasattr(self.model.people, "V1sus"), "DerivedValues: model.people needs to have 'V1sus' attribute."
assert hasattr(self.model.people, "V2sus"), "DerivedValues: model.people needs to have 'V2sus' attribute."
assert hasattr(self.model, "patches"), "DerivedValues: model needs to have a 'patches' attribute."
assert hasattr(self.model.patches, "N"), "DerivedValues: model.patches needs to have 'N' attribute."
assert hasattr(self.model.patches, "beta_jt_human"), "DerivedValues: model.patches needs to have 'beta_jt_human' attribute."
assert hasattr(self.model.patches, "pi_ij"), "DerivedValues: model.patches needs to have 'pi_ij' attribute."
assert "beta_j0_hum" in self.model.params, "DerivedValues: model.params needs to have 'beta_j0_hum' attribute."
assert "p" in self.model.params, "DerivedValues: model.params needs to have 'p' attribute."
assert "tau_i" in self.model.params, "DerivedValues: model.params needs to have 'tau_i' attribute."
return
[docs]
def __call__(self, model, tick: int) -> None:
"""Calculate derived values for the model.
Spatial hazard and coupling are calculated at the end of the simulation.
.. math::
h(j,t) = \\frac {\\beta^{hum}_{jt} (1 - e^{-((1 - \\tau_j) (S_{jt} + V^{sus}_{1,jt} + V^{sus}_{2,jt}) / N_{jt}) \\sum_{\\forall i \\ne j} \\pi_{ij} \\tau_i ((I^{sym}_{it} + I^{asym}_{it}) / N_{it})})} {1/(1 + \\beta^{hum}_{jt}(1 - \\tau_j) (S_{jt} + V^{sus}_{1,jt} + V^{sus}_{2,jt}))}
.. math::
y_{it} = \\frac {I^{sym}_{it} + I^{asym}_{it}} {N_{it}}
.. math::
\\bar y_{i} = \\frac 1 T \\sum_{t=1}^{T} y_{it}
.. math::
C_{ij} = \\frac { \\sum_{t=1}^T {(y_{it} - \\bar y_i) (y_{jt} - \\bar y_j)} } { \\sqrt {\\sum_{t=1}^T {(y_{it} - \\bar y_i)}^2} \\sqrt {\\sum_{t=1}^T {(y_{jt} - \\bar y_j)}^2} }
.. math::
C_{ij} = \\frac {(y_{it} - \\bar y_{i}) (y_{jt} - \\bar y_{j})} { \\sqrt {\\text {var}(y_{i}) \\text {var}(y_{j})} }
"""
if tick == model.params.nticks - 1:
calculate_spatial_hazard(
model.params.nticks,
model.patches.beta_jt_human.T,
model.params.tau_i,
model.people.S[1:, :].T,
model.people.V1sus[1:, :].T,
model.people.V2sus[1:, :].T,
model.patches.N[1:, :].T,
model.patches.pi_ij,
model.people.Iasym[1:, :].T,
model.people.Isym[1:, :].T,
model.patches.spatial_hazard[1:, :].T,
)
calculate_coupling(model.people.Isym, model.people.Iasym, model.patches.N, model.patches.coupling)
return
[docs]
def plot(self, fig=None): # pragma: no cover
_fig = plt.figure(figsize=(12, 9), dpi=128, num="Spatial Hazard by Location Over Time") if fig is None else fig
plt.imshow(self.model.patches.spatial_hazard.T, aspect="auto", cmap="Reds", interpolation="nearest")
plt.colorbar(label="Spatial Hazard")
plt.xlabel("Time (Days)")
plt.ylabel("Location")
plt.yticks(ticks=np.arange(len(self.model.params.location_name)), labels=self.model.params.location_name)
yield "Spatial Hazard by Location Over Time"
return
[docs]
def calculate_spatial_hazard_for_model(model):
"""Calculate the spatial hazard for the model.
Helpful for calling from R without needing to specify all the parameters.
"""
calculate_spatial_hazard(
model.params.nticks,
model.patches.beta_jt_human.T,
model.params.tau_i,
model.people.S[1:, :].T,
model.people.V1sus[1:, :].T,
model.people.V2sus[1:, :].T,
model.patches.N[1:, :].T,
model.patches.pi_ij,
model.people.Iasym[1:, :].T,
model.people.Isym[1:, :].T,
model.patches.spatial_hazard[1:, :].T,
)
return
[docs]
def calculate_spatial_hazard(nticks, beta_jt_human, tau_i, S, V1sus, V2sus, Njt, pi_ij, Iasym, Isym, spatial_hazard):
"""Calculate the spatial hazard for each location at each time step.
The spatial hazard is calculated using the formula:
https://institutefordiseasemodeling.github.io/MOSAIC-docs/model-description.html#the-spatial-hazard
.. math::
h(j,t) = \\frac {\\beta^{hum}_{jt} (1 - e^{-((1 - \\tau_j) (S_{jt} + V^{sus}_{1,jt} + V^{sus}_{2,jt}) / N_{jt}) \\sum_{\\forall i \\ne j} \\pi_{ij} \\tau_i ((I^{sym}_{it} + I^{asym}_{it}) / N_{it})})} {1/(1 + \\beta^{hum}_{jt}(1 - \\tau_j) (S_{jt} + V^{sus}_{1,jt} + V^{sus}_{2,jt}))}
Note: To simplify coding and debugging, all arrays are passed in R order: [j, t]
"""
beta = beta_jt_human
# Reference implementation:
# values for comparison/debugging
# S_star = np.zeros_like(S, dtype=np.float64)
# x = np.zeros_like(S, dtype=np.float64)
# y_bar = np.zeros_like(S, dtype=np.float64)
# nlocs = Njt.shape[0]
# for t in range(nticks):
# Nkt = Njt[:, t].sum()
# for j in range(nlocs):
# S_star_jt = (1.0 - tau_i[j]) * (S[j, t] + V1sus[j, t] + V2sus[j, t])
# # S_star[j, t] = S_star_jt
# x_jt = S_star_jt / Njt[j, t]
# # x[j, t] = x_jt
# sum_tau_pi_i = 0.0
# for i in range(nlocs):
# if i != j:
# sum_tau_pi_i += tau_i[i] * pi_ij[i, j] * (Isym[i, t] + Iasym[i, t])
# y_bar_jt = ((1.0 - tau_i[j]) * (Isym[j, t] + Iasym[j, t]) + sum_tau_pi_i) / Nkt
# # y_bar[j, t] = y_bar_jt
# H_jt = beta[j, t] * S_star_jt * (1.0 - np.exp(-x_jt * y_bar_jt)) / (1.0 + beta[j, t] * S_star_jt)
# spatial_hazard[j, t] = H_jt
# Opimized implementation:
Nt = Njt.sum(axis=0) # Total population at time t
xfer_ij = (tau_i * pi_ij.T).T # fraction emmigrating from i * fraction going to j = fraction of i going to j
S_star_jt = ((1.0 - tau_i) * (S + V1sus + V2sus).T).T
x_jt = S_star_jt / Njt # S_star / N for each location and time step
beta_S_star_jt = beta * S_star_jt # beta_jt_human * S_star for each location and time step
Ijt = Isym + Iasym
# (Remaining) infections in location j at time t accounting for the fraction of emmigrants by location
Iloc_jt = ((1.0 - tau_i) * Ijt.T).T
for t in range(nticks):
I_local = Iloc_jt[:, t] # Local infections at time t, syntactic sugar
# Calculate the incoming infections at time t
I_incoming = np.dot(Ijt[:, t], xfer_ij) # @, matmul, raises RuntimeWarnings on MacOS, but works fine on Linux
y_bar_t = (I_local + I_incoming) / Nt[t]
H_t = beta_S_star_jt[:, t] * (-np.expm1(-x_jt[:, t] * y_bar_t)) / (1.0 + beta_S_star_jt[:, t])
spatial_hazard[:, t] = H_t
return
[docs]
def calculate_coupling_for_model(model):
"""Calculate the coupling for the model.
Helpful for calling from R without needing to specify all the parameters.
"""
calculate_coupling(model.people.Isym, model.people.Iasym, model.patches.N, model.patches.coupling)
return
[docs]
def calculate_coupling(Isym, Iasym, N, C):
"""Calculate the coupling between locations.
https://institutefordiseasemodeling.github.io/MOSAIC-docs/model-description.html#coupling-among-locations
.. math::
C_{ij} = \\frac {(y_{it} - \\bar y_{i}) (y_{jt} - \\bar y_{j})} { \\sqrt {\\text {var}(y_{i}) \\text {var}(y_{j})} }
"""
assert Isym.shape == Iasym.shape, "Isym and Iasym must have the same shape."
assert Isym.shape == N.shape, "Isym and N must have the same shape."
_T, L = N.shape
assert C.shape == (L, L), "C must be a square matrix of shape (L, L)."
y = (Isym + Iasym) / N
y_bar = np.mean(y, axis=0) # mean over time (axis=0) for each location
diff = y - y_bar # difference from mean
for i in range(L):
for j in range(i, L):
numerator = np.sum(diff[:, i] * diff[:, j])
denominator = np.sqrt(np.sum(diff[:, i] ** 2) * np.sum(diff[:, j] ** 2))
if denominator != 0:
C_ij = numerator / denominator
else:
C_ij = np.nan
C[i, j] = C[j, i] = C_ij
return