Simple Spatial SIR Model with Synthetic Data¶
This example simulates a spatial Susceptible-Infected-Recovered (SIR) model with modular components. The population is distributed across 20 nodes in a 1-D chain, with infection spreading spatially from node 0 and agents migrating based on a migration matrix.
Spatial Arrangement¶
The model uses a 1-D spatial structure: - Node connections: Each node is connected to its next neighbor in the chain, allowing migration to occur sequentially (e.g., 0 → 1 → 2 …). - Infection propagation: Infection starts in node 0 and spreads to neighboring nodes as agents migrate.
Two migration matrix options are available: 1. Sequential Migration Matrix: Agents can only move to their next node in the chain. 2. Gravity Model Migration Matrix: This provides a more realistic 2-D spatial dynamic, where migration probabilities depend on node distances and population sizes.
Population Initialization¶
Skewed Distribution: The population is distributed across nodes using a rural-to-urban skew.
Timers: Migration timers are assigned to control agent migration frequency.
Model Components¶
The simulation uses modular components for migration, transmission, and recovery dynamics. Each component encapsulates the logic for its specific task.
Full Code Implementation¶
Imports¶
1import numpy as np
2import matplotlib.pyplot as plt
3import csv
4from laser_core.laserframe import LaserFrame
5from laser_core.demographics.spatialpops import distribute_population_skewed as dps
6from laser_core.migration import gravity
Model Class¶
1# Define the model
2class MultiNodeSIRModel:
3 """
4 A multi-node SIR (Susceptible-Infected-Recovered) disease transmission model.
5
6 Attributes:
7 params (dict): Configuration parameters for the model.
8 nodes (int): Number of nodes in the simulation.
9 population (LaserFrame): Represents the population with agent-level properties.
10 results (np.ndarray): Stores simulation results for S, I, and R per node.
11 components (list): List of components (e.g., Migration, Transmission) added to the model.
12 """
13
14 def __init__(self, params):
15 """
16 Initializes the SIR model with the given parameters.
17
18 Args:
19 params (dict): Dictionary containing parameters such as population size,
20 number of nodes, timesteps, and rates for transmission/migration.
21 """
22 self.components = []
23 self.params = params
24 self.nodes = params["nodes"]
25 self.population = LaserFrame(capacity=params["population_size"], initial_count=params["population_size"])
26
27 # Define properties
28 self.population.add_scalar_property("node_id", dtype=np.int32)
29 self.population.add_scalar_property("disease_state", dtype=np.int32, default=0) # 0=S, 1=I, 2=R
30 self.population.add_scalar_property("recovery_timer", dtype=np.int32, default=0)
31 self.population.add_scalar_property("migration_timer", dtype=np.int32, default=0)
32
33 # Initialize population distribution
34 node_pops = dps(params["population_size"], self.nodes, frac_rural=0.3)
35 node_ids = np.concatenate([np.full(count, i) for i, count in enumerate(node_pops)])
36 np.random.shuffle(node_ids)
37 self.population.node_id[:params["population_size"]] = node_ids
38
39 # Reporting structure: Use LaserFrame for reporting
40 self.results = LaserFrame( capacity=self.nodes ) # not timesteps for capacity
41 for state in ["S", "I", "R"]:
42 self.results.add_vector_property(state, length=params["timesteps"], dtype=np.int32)
43
44 # Record results: reporting could actually be a component if we wanted. Or it can be
45 # done in a log/report function in the relevant component (e.g., Transmission)
46 self.results.S[self.current_timestep, :] = [
47 np.sum(self.population.disease_state[self.population.node_id == i] == 0)
48 for i in range(self.nodes)
49 ]
50 self.results.I[self.current_timestep, :] = [
51 np.sum(self.population.disease_state[self.population.node_id == i] == 1)
52 for i in range(self.nodes)
53 ]
54 self.results.R[self.current_timestep, :] = [
55 np.sum(self.population.disease_state[self.population.node_id == i] == 2)
56 for i in range(self.nodes)
57 ]
58
59 def add_component(self, component):
60 """
61 Adds a component (e.g., Migration, Transmission, Recovery) to the model.
62
63 Args:
64 component: An instance of a component to be added.
65 """
66 self.components.append(component)
67
68 def step(self):
69 """
70 Advances the simulation by one timestep, updating all components and recording results.
71 """
72 for component in self.components:
73 component.step()
74
75 # Record results
76 for i in range(self.nodes):
77 in_node = self.population.node_id == i
78 self.results[self.current_timestep, i, 0] = (self.population.disease_state[in_node] == 0).sum()
79 self.results[self.current_timestep, i, 1] = (self.population.disease_state[in_node] == 1).sum()
80 self.results[self.current_timestep, i, 2] = (self.population.disease_state[in_node] == 2).sum()
81
82 def run(self):
83 """
84 Runs the simulation for the configured number of timesteps.
85 """
86 from tqdm import tqdm
87 for self.current_timestep in tqdm(range(self.params["timesteps"])):
88 self.step()
89
90 def save_results(self, filename):
91 """
92 Saves the simulation results to a CSV file.
93
94 Args:
95 filename (str): Path to the output file.
96 """
97 with open(filename, mode='w', newline='') as file:
98 writer = csv.writer(file)
99 writer.writerow(["Timestep", "Node", "Susceptible", "Infected", "Recovered"])
100 for t in range(self.params["timesteps"]):
101 for node in range(self.nodes):
102 writer.writerow([t, node, *self.results[t, node]])
103
104 def plot_results(self):
105 """
106 Plots the prevalence of infected agents over time for all nodes.
107 """
108 plt.figure(figsize=(10, 6))
109 for i in range(self.nodes):
110 prevalence = self.results.I[:, i] / (
111 self.results.S[:, i] +
112 self.results.I[:, i] +
113 self.results.R[:, i]
114 )
115 plt.plot(prevalence, label=f"Node {i}")
116
117 plt.title("Prevalence Across All Nodes")
118 plt.xlabel("Timesteps")
119 plt.ylabel("Prevalence of Infected Agents")
120 plt.legend()
121 plt.show()
Migration Component Class¶
1class MigrationComponent:
2 """
3 Handles migration behavior of agents between nodes in the model.
4
5 Attributes:
6 model (MultiNodeSIRModel): The simulation model instance.
7 migration_matrix (ndarray): A matrix representing migration probabilities between nodes.
8 """
9
10 def __init__(self, model):
11 """
12 Initializes the MigrationComponent.
13
14 Args:
15 model (MultiNodeSIRModel): The simulation model instance.
16 """
17 self.model = model
18
19 # Set initial migration timers
20 max_timer = int(1 / model.params["migration_rate"])
21 model.population.migration_timer[:] = np.random.randint(1, max_timer + 1, size=model.params["population_size"])
22
23 self.migration_matrix = self.get_sequential_migration_matrix(model.nodes)
24
25 # Example customization: Disable migration from node 13 to 14
26 def break_matrix_node(matrix, from_node, to_node):
27 matrix[from_node][to_node] = 0
28 break_matrix_node(self.migration_matrix, 13, 14)
29
30 def get_sequential_migration_matrix(self, nodes):
31 """
32 Creates a migration matrix where agents can only migrate to the next sequential node.
33
34 Args:
35 nodes (int): Number of nodes in the simulation.
36
37 Returns:
38 ndarray: A migration matrix where migration is allowed only to the next node.
39 """
40 migration_matrix = np.zeros((nodes, nodes))
41 for i in range(nodes - 1):
42 migration_matrix[i, i + 1] = 1.0
43 return migration_matrix
44
45 def step(self):
46 """
47 Updates the migration state of the population by determining which agents migrate
48 and their destinations based on the migration matrix.
49 """
50 node_ids = self.model.population.node_id
51
52 # Decrement migration timers
53 self.model.population.migration_timer -= 1
54
55 # Identify agents ready to migrate
56 migrating_indices = np.where(self.model.population.migration_timer <= 0)[0]
57 if migrating_indices.size == 0:
58 return
59
60 # Shuffle migrants and assign destinations based on migration matrix
61 np.random.shuffle(migrating_indices)
62 destinations = np.empty(len(migrating_indices), dtype=int)
63 for origin in range(self.model.nodes):
64 origin_mask = node_ids[migrating_indices] == origin
65 num_origin_migrants = origin_mask.sum()
66
67 if num_origin_migrants > 0:
68 # Assign destinations proportionally to migration matrix
69 destination_counts = np.round(self.migration_matrix[origin] * num_origin_migrants).astype(int)
70 destination_counts = np.maximum(destination_counts, 0) # Clip negative values
71 if destination_counts.sum() == 0: # No valid destinations
72 destinations[origin_mask] = origin # Stay in the same node
73 continue
74 destination_counts[origin] += num_origin_migrants - destination_counts.sum() # Adjust rounding errors
75
76 # Create ordered destination assignments
77 destination_indices = np.repeat(np.arange(self.model.nodes), destination_counts)
78 destinations[origin_mask] = destination_indices[:num_origin_migrants]
79
80 # Update node IDs of migrants
81 node_ids[migrating_indices] = destinations
82
83 # Reset migration timers for migrated agents
84 self.model.population.migration_timer[migrating_indices] = np.random.randint(
85 1, int(1 / self.model.params["migration_rate"]) + 1, size=migrating_indices.size
86 )
Transmission Component Class¶
1class TransmissionComponent:
2 """
3 Handles the disease transmission dynamics within the population.
4
5 Attributes:
6 model (MultiNodeSIRModel): The simulation model instance.
7 """
8
9 def __init__(self, model):
10 """
11 Initializes the TransmissionComponent and infects initial agents.
12
13 Args:
14 model (MultiNodeSIRModel): The simulation model instance.
15 """
16 self.model = model
17
18 def step(self):
19 """
20 Simulates disease transmission for each node in the current timestep.
21 """
22 for i in range(self.model.nodes):
23 in_node = self.model.population.node_id == i
24 susceptible = in_node & (self.model.population.disease_state == 0)
25 infected = in_node & (self.model.population.disease_state == 1)
26
27 num_susceptible = susceptible.sum()
28 num_infected = infected.sum()
29 total_in_node = in_node.sum()
30
31 if total_in_node > 0 and num_infected > 0 and num_susceptible > 0:
32 infectious_fraction = num_infected / total_in_node
33 susceptible_fraction = num_susceptible / total_in_node
34
35 new_infections = int(
36 self.model.params["transmission_rate"] * infectious_fraction * susceptible_fraction * total_in_node
37 )
38
39 susceptible_indices = np.where(susceptible)[0]
40 newly_infected_indices = np.random.choice(susceptible_indices, size=new_infections, replace=False)
41
42 self.model.population.disease_state[newly_infected_indices] = 1
43 self.model.population.recovery_timer[newly_infected_indices] = np.random.randint(5, 15, size=new_infections)
Recovery Component Class¶
1class RecoveryComponent:
2 """
3 Handles the recovery dynamics of infected individuals in the population.
4
5 Attributes:
6 model (MultiNodeSIRModel): The simulation model instance.
7 """
8
9 def __init__(self, model):
10 """
11 Initializes the RecoveryComponent.
12
13 Args:
14 model (MultiNodeSIRModel): The simulation model instance.
15 """
16 self.model = model
17
18 def step(self):
19 """
20 Updates the recovery state of infected individuals, moving them to the recovered state
21 if their recovery timer has elapsed.
22 """
23 infected = self.model.population.disease_state == 1
24 self.model.population.recovery_timer[infected] -= 1
25 self.model.population.disease_state[(infected) & (self.model.population.recovery_timer <= 0)] = 2
Run Everything¶
1# Parameters
2params = {
3 "population_size": 1_000_000,
4 "nodes": 20,
5 "timesteps": 600,
6 "initial_infected_fraction": 0.01,
7 "transmission_rate": 0.25,
8 "migration_rate": 0.001
9}
10
11# Run simulation
12model = MultiNodeSIRModel(params)
13model.add_component(MigrationComponent(model))
14model.add_component(TransmissionComponent(model))
15model.add_component(RecoveryComponent(model))
16model.run()
17model.save_results("simulation_results.csv")
18model.plot_results()
Discussion¶
This simple spatial example with migration creates a set of nodes with synthetic populations and a linear connection structure. The 0th node is the ‘urban’ node, with the largest population, and where we seed the infection. The migration matrix just connects nodes to the next node (by index). So we expect to see infection travel sequentially from node to node. We break the connection at node 13 just to show we can.
Migration in 2 Dimensions¶
Now we make things a bit more interesting by taking a similar set of nodes, but creating a migration matrix from the gravtiy function, so we effectively have a 2D network with rates proportional to population sizes. Distances are still very synthetic. We change our migration function as well since we have to be a little smarter when our matrix isn’t sparse.
Migration Component (2D)¶
1class MigrationComponent2D:
2 """
3 Handles migration behavior of agents between nodes in the model.
4
5 Attributes:
6 model (MultiNodeSIRModel): The simulation model instance.
7 migration_matrix (ndarray): A matrix representing migration probabilities between nodes.
8 """
9
10 def __init__(self, model):
11 """
12 Initializes the MigrationComponent.
13
14 Args:
15 model (MultiNodeSIRModel): The simulation model instance.
16 """
17 self.model = model
18
19 # Set initial migration timers
20 max_timer = int(1 / model.params["migration_rate"])
21 model.population.migration_timer[:] = np.random.randint(1, max_timer + 1, size=model.params["population_size"])
22
23 self.migration_matrix = self.get_gravity_migration_matrix(model.nodes)
24
25 def get_gravity_migration_matrix(self, nodes):
26 """
27 Generates a gravity-based migration matrix based on population and distances between nodes.
28
29 Args:
30 nodes (int): Number of nodes in the simulation.
31
32 Returns:
33 ndarray: A migration matrix where each row represents probabilities of migration to other nodes.
34 """
35 pops = np.array([np.sum(self.model.population.node_id == i) for i in range(nodes)])
36 distances = np.ones((nodes, nodes)) - np.eye(nodes)
37 migration_matrix = gravity(pops, distances, k=1.0, a=0.5, b=0.5, c=2.0)
38 migration_matrix = migration_matrix / migration_matrix.sum(axis=1, keepdims=True)
39 return migration_matrix
40
41 def step(self):
42
43 """
44 Executes the migration step for the agent-based model.
45
46 This function selects a fraction of agents to migrate based on expired migration timers.
47 It then changes their node_id according to the migration matrix, ensuring that movements
48 follow the prescribed probability distributions.
49
50 Steps:
51 - Selects a subset of the population for migration.
52 - Determines the origin nodes of migrating agents.
53 - Uses a multinomial draw to assign new destinations based on the migration matrix.
54 - Updates the agents' node assignments accordingly.
55
56 Returns:
57 None
58 """
59 # Decrement migration timers
60 self.model.population.migration_timer -= 1
61
62 # Identify agents ready to migrate
63 migrating_indices = np.where(self.model.population.migration_timer <= 0)[0]
64 if migrating_indices.size == 0:
65 return
66
67 np.random.shuffle(migrating_indices)
68
69 origins = model.population.node_id[migrating_indices]
70 origin_counts = np.bincount(origins, minlength=model.params["nodes"])
71
72 offset = 0
73
74 for origin in range(model.params["nodes"]):
75 count = origin_counts[origin]
76 if count == 0:
77 continue
78
79 origin_slice = migrating_indices[offset : offset + count]
80 offset += count
81
82 row = self.migration_matrix[origin]
83 row_sum = row.sum()
84 if row_sum <= 0:
85 continue
86
87 fraction_row = row / row_sum
88 destination_counts = np.random.multinomial(count, fraction_row)
89 destinations = np.repeat(np.arange(model.params["nodes"]), destination_counts)
90 model.population.node_id[origin_slice] = destinations[:count]
91
92 # Reset migration timers for migrated agents
93 self.model.population.migration_timer[migrating_indices] = np.random.randint(
94 1, int(1 / self.model.params["migration_rate"]) + 1, size=migrating_indices.size
95 )
Discussion¶
This example is more advanced than our first one since it moves from 1 dimension to 2, and is fully connected. We should see infection spread from the large seed node to most or potentially all of the other nodes (depending on transmission rates and migration rates and stochasticity) in a way that is broadly a function of the other nodes’ populations. Though since the smaller nodes don’t vary massively in population and since the model is stochastic, it will be a general correlation.
Simple Spatial SIR Model with Real Data¶
Design¶
Now we switch from synthetic population and spatial data to real population data. In this example, we have a csv file which starts off like this:
region_id,population,centroid_lat,centroid_long,birth_rate
Ryansoro,46482.66796875,-3.707268618580818,29.79879895068512,11.65647
Ndava,72979.296875,-3.391556716979041,29.753430749757815,15.881549
Buyengero,76468.8125,-3.8487418774123014,29.53299692786253,12.503805
Bugarama,44571.8515625,-3.6904789341549504,29.400408879716224,11.025566
Rumonge,300248.03125,-3.9622108122897663,29.45711276535364,19.567726
Burambi,63219.703125,-3.798641437985548,29.452423323952797,9.199019
Kanyosha1,115017.984375,-3.4309688424403473,29.41531324224386,37.951366
Kabezi,71913.8359375,-3.5311012728218527,29.369968675926785,31.831919
Muhuta,88141.7109375,-3.623512958448508,29.415218642943234,21.598902
Code¶
1class SpatialSIRModelRealData:
2 def __init__(self, params, population_data):
3 """
4 Initialize the mode, LASER-style, using the population_data loaded from a csv file (pandas).
5 Create nodes and a migration_matrix based on populations and locations of each node.
6
7 """
8 self.params = params
9 # We scale down population here from literal values but this may not be necessary.
10 population_data["scaled_population"] = (population_data["population"] / params["scale_factor"]).round().astype(int)
11 total_population = int(population_data["scaled_population"].sum())
12 print( f"{total_population=}" )
13
14 # Set up the properties as before
15 self.population = LaserFrame(capacity=total_population, initial_count=total_population)
16 self.population.add_scalar_property("node_id", dtype=np.int32)
17 self.population.add_scalar_property("disease_state", dtype=np.int32, default=0)
18 self.population.add_scalar_property("recovery_timer", dtype=np.int32, default=0)
19 self.population.add_scalar_property("migration_timer", dtype=np.int32, default=0)
20
21 node_pops = population_data["scaled_population"].values
22 self.params["nodes"] = len(node_pops)
23
24 node_ids = np.concatenate([np.full(count, i) for i, count in enumerate(node_pops)])
25 np.random.shuffle(node_ids)
26 self.population.node_id[:total_population] = node_ids
27
28 # seed in big node
29 big_node_id = np.argmax( node_pops )
30 available_population = population_data["scaled_population"][big_node_id]
31 initial_infected = int(params["initial_infected_fraction"] * available_population)
32 infection_indices = np.random.choice(np.where(self.population.node_id == big_node_id)[0], initial_infected, replace=False)
33 self.population.disease_state[infection_indices] = 1
34 self.population.recovery_timer[infection_indices] = np.random.uniform(params["recovery_time"] - 3, params["recovery_time"] + 3, size=initial_infected).astype(int)
35
36 pop_sizes = np.array(node_pops)
37 latitudes = population_data["centroid_lat"].values
38 longitudes = population_data["centroid_long"].values
39 distances = np.zeros((self.params["nodes"], self.params["nodes"]))
40
41 # Nested for loop here is optimized for readbility, not performance
42 for i in range(self.params["nodes"]):
43 for j in range(self.params["nodes"]):
44 if i != j:
45 distances[i, j] = distance(latitudes[i], longitudes[i], latitudes[j], longitudes[j])
46
47 # Set up our migration_matrix based on gravity model and input data (pops & distances)
48 self.distances = distances
49 self.migration_matrix = gravity(pop_sizes, distances, k=10.0, a=1.0, b=1.0, c=1.0)
50 self.migration_matrix /= self.migration_matrix.sum(axis=1, keepdims=True) # normalize
Discussion¶
We load the population data from the csv file, round and scale down (optional), and assign node ids. The shuffling is probably optional, but helps avoid biasing. We exploit the distance function from laser_utils (import not in code snippet).
Alternative Migration Approach¶
This section describes an alternative approach to modeling migration by using infection migration rather than individual movement. This method allows for computational efficiency while maintaining accurate disease transmission dynamics. Note we don’t use any MigrationComponent or migration_timer in this solution.
import numpy as np
from laser_core.migration import gravity
from laser_core.utils import calc_distances
class TransmissionComponent:
"""
Transmission Component
=======================
This class models the transmission of disease using "infection migration"
instead of human movement. Instead of tracking individual movement,
infection is spread probabilistically based on a gravity-inspired network.
This approach can significantly improve computational efficiency for
large-scale spatial epidemic simulations.
Attributes:
------------
model : object
The simulation model containing population and node information.
network : ndarray
A matrix representing the transmission probabilities between nodes.
locations : ndarray
Array of node latitude and longitude coordinates.
"""
def __init__(self, model):
"""
Initializes the transmission component by computing the infection migration network.
Parameters:
-----------
model : object
The simulation model containing population and node information.
"""
self.model = model
model.nodes.add_vector_property("network", length=model.nodes.count, dtype=np.float32)
self.network = model.nodes.network
# Extract node locations and populations from model.population_data
self.locations = np.column_stack((model.population_data["centroid_lat"], model.population_data["centroid_long"]))
initial_populations = np.array(model.population_data["population"])
# Initialize heterogeneous transmission factor per agent (0.5 to 2.0)
self.model.population.tx_hetero_factor = np.random.uniform(0.5, 2.0, size=model.population.capacity)
# Compute the infection migration network based on node populations.
a, b, c, k = self.model.params.a, self.model.params.b, self.model.params.c, self.model.params.k
# Compute all pairwise distances in one call (this speeds up initialization significantly)
# from laser_core.migration import gravity, row_normalizer
# from laser_core.utils import calc_distances
distances = calc_distances(self.locations[:, 0], self.locations[:, 1])
self.network = gravity(initial_populations, distances, k, a, b, c)
self.network /= np.power(initial_populations.sum(), c) # Normalize
self.network = row_normalizer(self.network, 0.01) # 0.01=max_frac
def step(self):
"""
Simulates disease transmission and infection migration across the network.
New infections are determined deterministically based on contagion levels and susceptible fraction.
"""
contagious_indices = np.where(self.model.population.disease_state == 1)[0]
values = self.model.population.tx_hetero_factor[contagious_indices] # Apply heterogeneity factor
# Compute contagion levels per node
contagion = np.bincount(
self.model.population.node_id[contagious_indices],
weights=values,
minlength=self.model.nodes.count
).astype(np.float64)
# Apply network-based infection movement
transfer = (contagion * self.network).round().astype(np.float64)
# Ensure net contagion remains positive after movement
contagion += transfer.sum(axis=1) - transfer.sum(axis=0)
contagion = np.maximum(contagion, 0) # Prevent negative contagion
# Infect susceptible individuals in each node deterministically
for i in range(self.model.nodes.count):
node_population = np.where(self.model.population.node_id == i)[0]
susceptible = node_population[self.model.population.disease_state[node_population] == 0]
if len(susceptible) > 0:
# Compute new infections deterministically based on prevalence and susceptible fraction
num_new_infections = int(min(len(susceptible), (
self.model.params.transmission_rate * contagion[i] * len(susceptible) / len(node_population)
)))
# Randomly select susceptible individuals for infection
new_infected_indices = np.random.choice(susceptible, size=num_new_infections, replace=False)
self.model.population.disease_state[new_infected_indices] = 1
# Assign recovery timers to newly infected individuals
self.model.population.recovery_timer[new_infected_indices] = np.random.randint(5, 15, size=num_new_infections)
# TODO: Potential Performance Improvement: Consider using a sparse representation for `network`
# if many connections have very low probability. This would speed up matrix multiplications significantly.
# TODO: Investigate parallelization of contagion computation for large-scale simulations
# using Numba or JIT compilation to optimize the loop structure.