Source code for laser_measles.biweekly.mixing
import numpy as np
import polars as pl
# from astropy.coordinates import angular_separation
# Constants for gravity diffusion model
MAX_DISTANCE = 100000000 # km, used to prevent self-migration
MIN_DISTANCE = 10 # km, minimum distance to prevent excessive neighbor migration
[docs]
def pairwise_haversine(df): # TODO: use angular separation formula instead
"""Pairwise distances for all (lon, lat) points using the Haversine formula.
Args:
df (pl.DataFrame): Polars DataFrame with 'lon' and 'lat' columns
Returns:
Pairwise distances in kilometers
"""
# mean earth radius in km
earth_radius_km = 6367
# convert from degrees to radians using polars
data = np.deg2rad(df[["lon", "lat"]].to_numpy())
lon = data[:, 0]
lat = data[:, 1]
# matrices of pairwise differences for latitudes & longitudes
dlat = lat[:, None] - lat
dlon = lon[:, None] - lon
# vectorized haversine distance calculation
d = np.sin(dlat / 2) ** 2 + np.cos(lat[:, None]) * np.cos(lat) * np.sin(dlon / 2) ** 2
return 2 * earth_radius_km * np.arcsin(np.sqrt(d))
[docs]
def init_gravity_diffusion(df: pl.DataFrame, scale: float, dist_exp: float) -> np.ndarray:
if len(df) == 1:
return np.ones((1, 1))
# Calculate pairwise distances
distances = pairwise_haversine(df)
pops = np.array(df["pop"])
pops = pops[:, np.newaxis].T
pops = np.repeat(pops, pops.size, axis=0).astype(np.float64)
np.fill_diagonal(distances, 100000000) # Prevent divide by zero errors and self migration
diffusion_matrix = pops / (distances + 10) ** dist_exp # minimum distance prevents excessive neighbor migration
np.fill_diagonal(diffusion_matrix, 0)
# normalize average total outbound migration to 1
diffusion_matrix = diffusion_matrix / np.mean(np.sum(diffusion_matrix, axis=1))
diffusion_matrix *= scale
diagonal = 1 - np.sum(diffusion_matrix, axis=1) # normalized outbound migration by source
np.fill_diagonal(diffusion_matrix, diagonal)
return diffusion_matrix
[docs]
def pairwise_haversine_(lon: np.ndarray, lat: np.ndarray) -> np.ndarray:
"""Calculate pairwise distances for all (lon, lat) points using the Haversine formula.
Args:
lon: Array of longitude values in degrees
lat: Array of latitude values in degrees
Returns:
Compressed matrix of pairwise distances in kilometers, where only the upper triangle
(excluding diagonal) is stored. The full matrix can be reconstructed using:
full_matrix = np.zeros((n, n))
full_matrix[np.triu_indices(n, k=1)] = compressed_matrix
full_matrix = full_matrix + full_matrix.T
"""
earth_radius_km = 6367
n = len(lon)
# Get upper triangle indices (excluding diagonal)
i, j = np.triu_indices(n, k=1)
# Calculate differences only for the upper triangle
dlat = lat[j] - lat[i]
dlon = lon[j] - lon[i]
# Calculate cosines only for the needed pairs
cos_lat_i = np.cos(lat[i])
cos_lat_j = np.cos(lat[j])
# vectorized haversine distance calculation for upper triangle only
d = np.sin(dlat / 2) ** 2 + cos_lat_i * cos_lat_j * np.sin(dlon / 2) ** 2
return 2 * earth_radius_km * np.arcsin(np.sqrt(d))
[docs]
def init_gravity_diffusion_(df: pl.DataFrame | tuple[np.ndarray, np.ndarray], scale: float, dist_exp: float) -> np.ndarray:
"""Initialize a gravity diffusion matrix for population mixing.
Args:
df: Either a DataFrame with 'population', 'lat', and 'lon' columns,
or a tuple of (lon, lat) arrays
scale: Scaling factor for the diffusion matrix
dist_exp: Distance exponent for the gravity model
Returns:
Normalized diffusion matrix where each row sums to 1
"""
if len(df) == 1:
return np.ones((1, 1))
n = len(df)
compressed_distances = pairwise_haversine(df["lon"].to_numpy(), df["lat"].to_numpy())
pops = df["pop"].to_numpy()
# Get the indices for the upper triangle
i, j = np.triu_indices(n, k=1)
# Calculate population products only for the upper triangle
pop_products = pops[i] * pops[j]
# Calculate diffusion values for upper triangle
diffusion_values = pop_products / (compressed_distances + MIN_DISTANCE) ** dist_exp
# Create the full matrix
diffusion_matrix = np.zeros((n, n))
diffusion_matrix[i, j] = diffusion_values
diffusion_matrix[j, i] = diffusion_values # Mirror the values
# Calculate row sums for normalization
row_sums = np.sum(diffusion_matrix, axis=1)
mean_sum = np.mean(row_sums)
# Normalize and scale
diffusion_matrix = diffusion_matrix / mean_sum * scale
# Calculate and set diagonal values
diagonal = 1 - np.sum(diffusion_matrix, axis=1)
np.fill_diagonal(diffusion_matrix, diagonal)
return diffusion_matrix