Simple Spatial SIR Model with Synthetic Data

This example simulates a spatial Susceptible-Infected-Recovered (SIR) model with modular components. The population is distributed across 20 nodes in a 1-D chain, with infection spreading spatially from node 0 and agents migrating based on a migration matrix.

Spatial Arrangement

The model uses a 1-D spatial structure: - Node connections: Each node is connected to its next neighbor in the chain, allowing migration to occur sequentially (e.g., 0 → 1 → 2 …). - Infection propagation: Infection starts in node 0 and spreads to neighboring nodes as agents migrate.

Two migration matrix options are available: 1. Sequential Migration Matrix: Agents can only move to their next node in the chain. 2. Gravity Model Migration Matrix: This provides a more realistic 2-D spatial dynamic, where migration probabilities depend on node distances and population sizes.

Population Initialization

  • Skewed Distribution: The population is distributed across nodes using a rural-to-urban skew.

  • Timers: Migration timers are assigned to control agent migration frequency.

Model Components

The simulation uses modular components for migration, transmission, and recovery dynamics. Each component encapsulates the logic for its specific task.

Full Code Implementation

Imports

1import numpy as np
2import matplotlib.pyplot as plt
3import csv
4from laser_core.laserframe import LaserFrame
5from laser_core.demographics.spatialpops import distribute_population_skewed as dps
6from laser_core.migration import gravity

Model Class

  1# Define the model
  2class MultiNodeSIRModel:
  3    """
  4    A multi-node SIR (Susceptible-Infected-Recovered) disease transmission model.
  5
  6    Attributes:
  7        params (dict): Configuration parameters for the model.
  8        nodes (int): Number of nodes in the simulation.
  9        population (LaserFrame): Represents the population with agent-level properties.
 10        results (np.ndarray): Stores simulation results for S, I, and R per node.
 11        components (list): List of components (e.g., Migration, Transmission) added to the model.
 12    """
 13
 14    def __init__(self, params):
 15        """
 16        Initializes the SIR model with the given parameters.
 17
 18        Args:
 19            params (dict): Dictionary containing parameters such as population size,
 20                           number of nodes, timesteps, and rates for transmission/migration.
 21        """
 22        self.components = []
 23        self.params = params
 24        self.nodes = params["nodes"]
 25        self.population = LaserFrame(capacity=params["population_size"], initial_count=params["population_size"])
 26
 27        # Define properties
 28        self.population.add_scalar_property("node_id", dtype=np.int32)
 29        self.population.add_scalar_property("disease_state", dtype=np.int32, default=0)  # 0=S, 1=I, 2=R
 30        self.population.add_scalar_property("recovery_timer", dtype=np.int32, default=0)
 31        self.population.add_scalar_property("migration_timer", dtype=np.int32, default=0)
 32
 33        # Initialize population distribution
 34        node_pops = dps(params["population_size"], self.nodes, frac_rural=0.3)
 35        node_ids = np.concatenate([np.full(count, i) for i, count in enumerate(node_pops)])
 36        np.random.shuffle(node_ids)
 37        self.population.node_id[:params["population_size"]] = node_ids
 38
 39        # Reporting structure: Use LaserFrame for reporting
 40        self.results = LaserFrame( capacity=self.nodes ) # not timesteps for capacity
 41        for state in ["S", "I", "R"]:
 42            self.results.add_vector_property(state, length=params["timesteps"], dtype=np.int32)
 43
 44        # Record results: reporting could actually be a component if we wanted. Or it can be
 45        # done in a log/report function in the relevant component (e.g., Transmission)
 46        self.results.S[self.current_timestep, :] = [
 47            np.sum(self.population.disease_state[self.population.node_id == i] == 0)
 48            for i in range(self.nodes)
 49        ]
 50        self.results.I[self.current_timestep, :] = [
 51            np.sum(self.population.disease_state[self.population.node_id == i] == 1)
 52            for i in range(self.nodes)
 53        ]
 54        self.results.R[self.current_timestep, :] = [
 55            np.sum(self.population.disease_state[self.population.node_id == i] == 2)
 56            for i in range(self.nodes)
 57        ]
 58
 59    def add_component(self, component):
 60        """
 61        Adds a component (e.g., Migration, Transmission, Recovery) to the model.
 62
 63        Args:
 64            component: An instance of a component to be added.
 65        """
 66        self.components.append(component)
 67
 68    def step(self):
 69        """
 70        Advances the simulation by one timestep, updating all components and recording results.
 71        """
 72        for component in self.components:
 73            component.step()
 74
 75        # Record results
 76        for i in range(self.nodes):
 77            in_node = self.population.node_id == i
 78            self.results[self.current_timestep, i, 0] = (self.population.disease_state[in_node] == 0).sum()
 79            self.results[self.current_timestep, i, 1] = (self.population.disease_state[in_node] == 1).sum()
 80            self.results[self.current_timestep, i, 2] = (self.population.disease_state[in_node] == 2).sum()
 81
 82    def run(self):
 83        """
 84        Runs the simulation for the configured number of timesteps.
 85        """
 86        from tqdm import tqdm
 87        for self.current_timestep in tqdm(range(self.params["timesteps"])):
 88            self.step()
 89
 90    def save_results(self, filename):
 91        """
 92        Saves the simulation results to a CSV file.
 93
 94        Args:
 95            filename (str): Path to the output file.
 96        """
 97        with open(filename, mode='w', newline='') as file:
 98            writer = csv.writer(file)
 99            writer.writerow(["Timestep", "Node", "Susceptible", "Infected", "Recovered"])
100            for t in range(self.params["timesteps"]):
101                for node in range(self.nodes):
102                    writer.writerow([t, node, *self.results[t, node]])
103
104    def plot_results(self):
105        """
106        Plots the prevalence of infected agents over time for all nodes.
107        """
108        plt.figure(figsize=(10, 6))
109        for i in range(self.nodes):
110            prevalence = self.results.I[:, i] / (
111                self.results.S[:, i] +
112                self.results.I[:, i] +
113                self.results.R[:, i]
114            )
115            plt.plot(prevalence, label=f"Node {i}")
116
117        plt.title("Prevalence Across All Nodes")
118        plt.xlabel("Timesteps")
119        plt.ylabel("Prevalence of Infected Agents")
120        plt.legend()
121        plt.show()

Migration Component Class

 1class MigrationComponent:
 2    """
 3    Handles migration behavior of agents between nodes in the model.
 4
 5    Attributes:
 6        model (MultiNodeSIRModel): The simulation model instance.
 7        migration_matrix (ndarray): A matrix representing migration probabilities between nodes.
 8    """
 9
10    def __init__(self, model):
11        """
12        Initializes the MigrationComponent.
13
14        Args:
15            model (MultiNodeSIRModel): The simulation model instance.
16        """
17        self.model = model
18
19        # Set initial migration timers
20        max_timer = int(1 / model.params["migration_rate"])
21        model.population.migration_timer[:] = np.random.randint(1, max_timer + 1, size=model.params["population_size"])
22
23        self.migration_matrix = self.get_sequential_migration_matrix(model.nodes)
24
25        # Example customization: Disable migration from node 13 to 14
26        def break_matrix_node(matrix, from_node, to_node):
27            matrix[from_node][to_node] = 0
28        break_matrix_node(self.migration_matrix, 13, 14)
29
30    def get_sequential_migration_matrix(self, nodes):
31        """
32        Creates a migration matrix where agents can only migrate to the next sequential node.
33
34        Args:
35            nodes (int): Number of nodes in the simulation.
36
37        Returns:
38            ndarray: A migration matrix where migration is allowed only to the next node.
39        """
40        migration_matrix = np.zeros((nodes, nodes))
41        for i in range(nodes - 1):
42            migration_matrix[i, i + 1] = 1.0
43        return migration_matrix
44
45    def step(self):
46        """
47        Updates the migration state of the population by determining which agents migrate
48        and their destinations based on the migration matrix.
49        """
50        node_ids = self.model.population.node_id
51
52        # Decrement migration timers
53        self.model.population.migration_timer -= 1
54
55        # Identify agents ready to migrate
56        migrating_indices = np.where(self.model.population.migration_timer <= 0)[0]
57        if migrating_indices.size == 0:
58            return
59
60        # Shuffle migrants and assign destinations based on migration matrix
61        np.random.shuffle(migrating_indices)
62        destinations = np.empty(len(migrating_indices), dtype=int)
63        for origin in range(self.model.nodes):
64            origin_mask = node_ids[migrating_indices] == origin
65            num_origin_migrants = origin_mask.sum()
66
67            if num_origin_migrants > 0:
68                # Assign destinations proportionally to migration matrix
69                destination_counts = np.round(self.migration_matrix[origin] * num_origin_migrants).astype(int)
70                destination_counts = np.maximum(destination_counts, 0)  # Clip negative values
71                if destination_counts.sum() == 0:  # No valid destinations
72                    destinations[origin_mask] = origin  # Stay in the same node
73                    continue
74                destination_counts[origin] += num_origin_migrants - destination_counts.sum()  # Adjust rounding errors
75
76                # Create ordered destination assignments
77                destination_indices = np.repeat(np.arange(self.model.nodes), destination_counts)
78                destinations[origin_mask] = destination_indices[:num_origin_migrants]
79
80        # Update node IDs of migrants
81        node_ids[migrating_indices] = destinations
82
83        # Reset migration timers for migrated agents
84        self.model.population.migration_timer[migrating_indices] = np.random.randint(
85            1, int(1 / self.model.params["migration_rate"]) + 1, size=migrating_indices.size
86        )

Transmission Component Class

 1class TransmissionComponent:
 2    """
 3    Handles the disease transmission dynamics within the population.
 4
 5    Attributes:
 6        model (MultiNodeSIRModel): The simulation model instance.
 7    """
 8
 9    def __init__(self, model):
10        """
11        Initializes the TransmissionComponent and infects initial agents.
12
13        Args:
14            model (MultiNodeSIRModel): The simulation model instance.
15        """
16        self.model = model
17
18    def step(self):
19        """
20        Simulates disease transmission for each node in the current timestep.
21        """
22        for i in range(self.model.nodes):
23            in_node = self.model.population.node_id == i
24            susceptible = in_node & (self.model.population.disease_state == 0)
25            infected = in_node & (self.model.population.disease_state == 1)
26
27            num_susceptible = susceptible.sum()
28            num_infected = infected.sum()
29            total_in_node = in_node.sum()
30
31            if total_in_node > 0 and num_infected > 0 and num_susceptible > 0:
32                infectious_fraction = num_infected / total_in_node
33                susceptible_fraction = num_susceptible / total_in_node
34
35                new_infections = int(
36                    self.model.params["transmission_rate"] * infectious_fraction * susceptible_fraction * total_in_node
37                )
38
39                susceptible_indices = np.where(susceptible)[0]
40                newly_infected_indices = np.random.choice(susceptible_indices, size=new_infections, replace=False)
41
42                self.model.population.disease_state[newly_infected_indices] = 1
43                self.model.population.recovery_timer[newly_infected_indices] = np.random.randint(5, 15, size=new_infections)

Recovery Component Class

 1class RecoveryComponent:
 2    """
 3    Handles the recovery dynamics of infected individuals in the population.
 4
 5    Attributes:
 6        model (MultiNodeSIRModel): The simulation model instance.
 7    """
 8
 9    def __init__(self, model):
10        """
11        Initializes the RecoveryComponent.
12
13        Args:
14            model (MultiNodeSIRModel): The simulation model instance.
15        """
16        self.model = model
17
18    def step(self):
19        """
20        Updates the recovery state of infected individuals, moving them to the recovered state
21        if their recovery timer has elapsed.
22        """
23        infected = self.model.population.disease_state == 1
24        self.model.population.recovery_timer[infected] -= 1
25        self.model.population.disease_state[(infected) & (self.model.population.recovery_timer <= 0)] = 2

Run Everything

 1# Parameters
 2params = {
 3    "population_size": 1_000_000,
 4    "nodes": 20,
 5    "timesteps": 600,
 6    "initial_infected_fraction": 0.01,
 7    "transmission_rate": 0.25,
 8    "migration_rate": 0.001
 9}
10
11# Run simulation
12model = MultiNodeSIRModel(params)
13model.add_component(MigrationComponent(model))
14model.add_component(TransmissionComponent(model))
15model.add_component(RecoveryComponent(model))
16model.run()
17model.save_results("simulation_results.csv")
18model.plot_results()

Discussion

This simple spatial example with migration creates a set of nodes with synthetic populations and a linear connection structure. The 0th node is the ‘urban’ node, with the largest population, and where we seed the infection. The migration matrix just connects nodes to the next node (by index). So we expect to see infection travel sequentially from node to node. We break the connection at node 13 just to show we can.

Migration in 2 Dimensions

Now we make things a bit more interesting by taking a similar set of nodes, but creating a migration matrix from the gravtiy function, so we effectively have a 2D network with rates proportional to population sizes. Distances are still very synthetic. We change our migration function as well since we have to be a little smarter when our matrix isn’t sparse.

Migration Component (2D)

 1class MigrationComponent2D:
 2    """
 3    Handles migration behavior of agents between nodes in the model.
 4
 5    Attributes:
 6        model (MultiNodeSIRModel): The simulation model instance.
 7        migration_matrix (ndarray): A matrix representing migration probabilities between nodes.
 8    """
 9
10    def __init__(self, model):
11        """
12        Initializes the MigrationComponent.
13
14        Args:
15            model (MultiNodeSIRModel): The simulation model instance.
16        """
17        self.model = model
18
19        # Set initial migration timers
20        max_timer = int(1 / model.params["migration_rate"])
21        model.population.migration_timer[:] = np.random.randint(1, max_timer + 1, size=model.params["population_size"])
22
23        self.migration_matrix = self.get_gravity_migration_matrix(model.nodes)
24
25    def get_gravity_migration_matrix(self, nodes):
26        """
27        Generates a gravity-based migration matrix based on population and distances between nodes.
28
29        Args:
30            nodes (int): Number of nodes in the simulation.
31
32        Returns:
33            ndarray: A migration matrix where each row represents probabilities of migration to other nodes.
34        """
35        pops = np.array([np.sum(self.model.population.node_id == i) for i in range(nodes)])
36        distances = np.ones((nodes, nodes)) - np.eye(nodes)
37        migration_matrix = gravity(pops, distances, k=1.0, a=0.5, b=0.5, c=2.0)
38        migration_matrix = migration_matrix / migration_matrix.sum(axis=1, keepdims=True)
39        return migration_matrix
40
41    def step(self):
42
43        """
44        Executes the migration step for the agent-based model.
45
46        This function selects a fraction of agents to migrate based on expired migration timers.
47        It then changes their node_id according to the migration matrix, ensuring that movements
48        follow the prescribed probability distributions.
49
50        Steps:
51        - Selects a subset of the population for migration.
52        - Determines the origin nodes of migrating agents.
53        - Uses a multinomial draw to assign new destinations based on the migration matrix.
54        - Updates the agents' node assignments accordingly.
55
56        Returns:
57            None
58        """
59        # Decrement migration timers
60        self.model.population.migration_timer -= 1
61
62        # Identify agents ready to migrate
63        migrating_indices = np.where(self.model.population.migration_timer <= 0)[0]
64        if migrating_indices.size == 0:
65            return
66
67        np.random.shuffle(migrating_indices)
68
69        origins = model.population.node_id[migrating_indices]
70        origin_counts = np.bincount(origins, minlength=model.params["nodes"])
71
72        offset = 0
73
74        for origin in range(model.params["nodes"]):
75            count = origin_counts[origin]
76            if count == 0:
77                continue
78
79            origin_slice = migrating_indices[offset : offset + count]
80            offset += count
81
82            row = self.migration_matrix[origin]
83            row_sum = row.sum()
84            if row_sum <= 0:
85                continue
86
87            fraction_row = row / row_sum
88            destination_counts = np.random.multinomial(count, fraction_row)
89            destinations = np.repeat(np.arange(model.params["nodes"]), destination_counts)
90            model.population.node_id[origin_slice] = destinations[:count]
91
92        # Reset migration timers for migrated agents
93        self.model.population.migration_timer[migrating_indices] = np.random.randint(
94            1, int(1 / self.model.params["migration_rate"]) + 1, size=migrating_indices.size
95        )

Discussion

This example is more advanced than our first one since it moves from 1 dimension to 2, and is fully connected. We should see infection spread from the large seed node to most or potentially all of the other nodes (depending on transmission rates and migration rates and stochasticity) in a way that is broadly a function of the other nodes’ populations. Though since the smaller nodes don’t vary massively in population and since the model is stochastic, it will be a general correlation.

Simple Spatial SIR Model with Real Data

Design

Now we switch from synthetic population and spatial data to real population data. In this example, we have a csv file which starts off like this:

region_id,population,centroid_lat,centroid_long,birth_rate
Ryansoro,46482.66796875,-3.707268618580818,29.79879895068512,11.65647
Ndava,72979.296875,-3.391556716979041,29.753430749757815,15.881549
Buyengero,76468.8125,-3.8487418774123014,29.53299692786253,12.503805
Bugarama,44571.8515625,-3.6904789341549504,29.400408879716224,11.025566
Rumonge,300248.03125,-3.9622108122897663,29.45711276535364,19.567726
Burambi,63219.703125,-3.798641437985548,29.452423323952797,9.199019
Kanyosha1,115017.984375,-3.4309688424403473,29.41531324224386,37.951366
Kabezi,71913.8359375,-3.5311012728218527,29.369968675926785,31.831919
Muhuta,88141.7109375,-3.623512958448508,29.415218642943234,21.598902

Code

 1class SpatialSIRModelRealData:
 2    def __init__(self, params, population_data):
 3        """
 4            Initialize the mode, LASER-style, using the population_data loaded from a csv file (pandas).
 5            Create nodes and a migration_matrix based on populations and locations of each node.
 6
 7        """
 8        self.params = params
 9        # We scale down population here from literal values but this may not be necessary.
10        population_data["scaled_population"] = (population_data["population"] / params["scale_factor"]).round().astype(int)
11        total_population = int(population_data["scaled_population"].sum())
12        print( f"{total_population=}" )
13
14        # Set up the properties as before
15        self.population = LaserFrame(capacity=total_population, initial_count=total_population)
16        self.population.add_scalar_property("node_id", dtype=np.int32)
17        self.population.add_scalar_property("disease_state", dtype=np.int32, default=0)
18        self.population.add_scalar_property("recovery_timer", dtype=np.int32, default=0)
19        self.population.add_scalar_property("migration_timer", dtype=np.int32, default=0)
20
21        node_pops = population_data["scaled_population"].values
22        self.params["nodes"] = len(node_pops)
23
24        node_ids = np.concatenate([np.full(count, i) for i, count in enumerate(node_pops)])
25        np.random.shuffle(node_ids)
26        self.population.node_id[:total_population] = node_ids
27
28        # seed in big node
29        big_node_id = np.argmax( node_pops )
30        available_population = population_data["scaled_population"][big_node_id]
31        initial_infected = int(params["initial_infected_fraction"] * available_population)
32        infection_indices = np.random.choice(np.where(self.population.node_id == big_node_id)[0], initial_infected, replace=False)
33        self.population.disease_state[infection_indices] = 1
34        self.population.recovery_timer[infection_indices] = np.random.uniform(params["recovery_time"] - 3, params["recovery_time"] + 3, size=initial_infected).astype(int)
35
36        pop_sizes = np.array(node_pops)
37        latitudes = population_data["centroid_lat"].values
38        longitudes = population_data["centroid_long"].values
39        distances = np.zeros((self.params["nodes"], self.params["nodes"]))
40
41        # Nested for loop here is optimized for readbility, not performance
42        for i in range(self.params["nodes"]):
43            for j in range(self.params["nodes"]):
44                if i != j:
45                    distances[i, j] = distance(latitudes[i], longitudes[i], latitudes[j], longitudes[j])
46
47        # Set up our migration_matrix based on gravity model and input data (pops & distances)
48        self.distances = distances
49        self.migration_matrix = gravity(pop_sizes, distances, k=10.0, a=1.0, b=1.0, c=1.0)
50        self.migration_matrix /= self.migration_matrix.sum(axis=1, keepdims=True) # normalize

Discussion

We load the population data from the csv file, round and scale down (optional), and assign node ids. The shuffling is probably optional, but helps avoid biasing. We exploit the distance function from laser_utils (import not in code snippet).

Alternative Migration Approach

This section describes an alternative approach to modeling migration by using infection migration rather than individual movement. This method allows for computational efficiency while maintaining accurate disease transmission dynamics. Note we don’t use any MigrationComponent or migration_timer in this solution.

import numpy as np
from laser_core.migration import gravity
from laser_core.utils import calc_distances

class TransmissionComponent:
    """
    Transmission Component
    =======================

    This class models the transmission of disease using "infection migration"
    instead of human movement. Instead of tracking individual movement,
    infection is spread probabilistically based on a gravity-inspired network.

    This approach can significantly improve computational efficiency for
    large-scale spatial epidemic simulations.

    Attributes:
    ------------
    model : object
        The simulation model containing population and node information.
    network : ndarray
        A matrix representing the transmission probabilities between nodes.
    locations : ndarray
        Array of node latitude and longitude coordinates.
    """
    def __init__(self, model):
        """
        Initializes the transmission component by computing the infection migration network.

        Parameters:
        -----------
        model : object
            The simulation model containing population and node information.
        """
        self.model = model
        model.nodes.add_vector_property("network", length=model.nodes.count, dtype=np.float32)
        self.network = model.nodes.network

        # Extract node locations and populations from model.population_data
        self.locations = np.column_stack((model.population_data["centroid_lat"], model.population_data["centroid_long"]))
        initial_populations = np.array(model.population_data["population"])

        # Initialize heterogeneous transmission factor per agent (0.5 to 2.0)
        self.model.population.tx_hetero_factor = np.random.uniform(0.5, 2.0, size=model.population.capacity)

        # Compute the infection migration network based on node populations.
        a, b, c, k = self.model.params.a, self.model.params.b, self.model.params.c, self.model.params.k

        # Compute all pairwise distances in one call (this speeds up initialization significantly)
        # from laser_core.migration import gravity, row_normalizer
        # from laser_core.utils import calc_distances
        distances = calc_distances(self.locations[:, 0], self.locations[:, 1])
        self.network = gravity(initial_populations, distances, k, a, b, c)
        self.network /= np.power(initial_populations.sum(), c)  # Normalize
        self.network = row_normalizer(self.network, 0.01) # 0.01=max_frac

   def step(self):
        """
        Simulates disease transmission and infection migration across the network.

        New infections are determined deterministically based on contagion levels and susceptible fraction.
        """
        contagious_indices = np.where(self.model.population.disease_state == 1)[0]
        values = self.model.population.tx_hetero_factor[contagious_indices]  # Apply heterogeneity factor

        # Compute contagion levels per node
        contagion = np.bincount(
            self.model.population.node_id[contagious_indices],
            weights=values,
            minlength=self.model.nodes.count
        ).astype(np.float64)

        # Apply network-based infection movement
        transfer = (contagion * self.network).round().astype(np.float64)

        # Ensure net contagion remains positive after movement
        contagion += transfer.sum(axis=1) - transfer.sum(axis=0)
        contagion = np.maximum(contagion, 0)  # Prevent negative contagion

        # Infect susceptible individuals in each node deterministically
        for i in range(self.model.nodes.count):
            node_population = np.where(self.model.population.node_id == i)[0]
            susceptible = node_population[self.model.population.disease_state[node_population] == 0]

            if len(susceptible) > 0:
                # Compute new infections deterministically based on prevalence and susceptible fraction
                num_new_infections = int(min(len(susceptible), (
                    self.model.params.transmission_rate * contagion[i] * len(susceptible) / len(node_population)
                )))

                # Randomly select susceptible individuals for infection
                new_infected_indices = np.random.choice(susceptible, size=num_new_infections, replace=False)
                self.model.population.disease_state[new_infected_indices] = 1

                # Assign recovery timers to newly infected individuals
                self.model.population.recovery_timer[new_infected_indices] = np.random.randint(5, 15, size=num_new_infections)

        # TODO: Potential Performance Improvement: Consider using a sparse representation for `network`
        # if many connections have very low probability. This would speed up matrix multiplications significantly.

        # TODO: Investigate parallelization of contagion computation for large-scale simulations
        # using Numba or JIT compilation to optimize the loop structure.