Source code for laser_measles.biweekly.mixing
import numpy as np
import polars as pl
# from astropy.coordinates import angular_separation
# Constants for gravity diffusion model
MAX_DISTANCE = 100000000  # km, used to prevent self-migration
MIN_DISTANCE = 10  # km, minimum distance to prevent excessive neighbor migration
[docs]
def pairwise_haversine(df):  # TODO: use angular separation formula instead
    """Pairwise distances for all (lon, lat) points using the Haversine formula.
    Args:
        df (pl.DataFrame): Polars DataFrame with 'lon' and 'lat' columns
    Returns:
        Pairwise distances in kilometers
    """
    # mean earth radius in km
    earth_radius_km = 6367
    # convert from degrees to radians using polars
    data = np.deg2rad(df[["lon", "lat"]].to_numpy())
    lon = data[:, 0]
    lat = data[:, 1]
    # matrices of pairwise differences for latitudes & longitudes
    dlat = lat[:, None] - lat
    dlon = lon[:, None] - lon
    # vectorized haversine distance calculation
    d = np.sin(dlat / 2) ** 2 + np.cos(lat[:, None]) * np.cos(lat) * np.sin(dlon / 2) ** 2
    return 2 * earth_radius_km * np.arcsin(np.sqrt(d)) 
[docs]
def init_gravity_diffusion(df: pl.DataFrame, scale: float, dist_exp: float) -> np.ndarray:
    if len(df) == 1:
        return np.ones((1, 1))
    # Calculate pairwise distances
    distances = pairwise_haversine(df)
    pops = np.array(df["pop"])
    pops = pops[:, np.newaxis].T
    pops = np.repeat(pops, pops.size, axis=0).astype(np.float64)
    np.fill_diagonal(distances, 100000000)  # Prevent divide by zero errors and self migration
    diffusion_matrix = pops / (distances + 10) ** dist_exp  # minimum distance prevents excessive neighbor migration
    np.fill_diagonal(diffusion_matrix, 0)
    # normalize average total outbound migration to 1
    diffusion_matrix = diffusion_matrix / np.mean(np.sum(diffusion_matrix, axis=1))
    diffusion_matrix *= scale
    diagonal = 1 - np.sum(diffusion_matrix, axis=1)  # normalized outbound migration by source
    np.fill_diagonal(diffusion_matrix, diagonal)
    return diffusion_matrix 
[docs]
def pairwise_haversine_(lon: np.ndarray, lat: np.ndarray) -> np.ndarray:
    """Calculate pairwise distances for all (lon, lat) points using the Haversine formula.
    Args:
        lon: Array of longitude values in degrees
        lat: Array of latitude values in degrees
    Returns:
        Compressed matrix of pairwise distances in kilometers, where only the upper triangle
        (excluding diagonal) is stored. The full matrix can be reconstructed using:
        full_matrix = np.zeros((n, n))
        full_matrix[np.triu_indices(n, k=1)] = compressed_matrix
        full_matrix = full_matrix + full_matrix.T
    """
    earth_radius_km = 6367
    n = len(lon)
    # Get upper triangle indices (excluding diagonal)
    i, j = np.triu_indices(n, k=1)
    # Calculate differences only for the upper triangle
    dlat = lat[j] - lat[i]
    dlon = lon[j] - lon[i]
    # Calculate cosines only for the needed pairs
    cos_lat_i = np.cos(lat[i])
    cos_lat_j = np.cos(lat[j])
    # vectorized haversine distance calculation for upper triangle only
    d = np.sin(dlat / 2) ** 2 + cos_lat_i * cos_lat_j * np.sin(dlon / 2) ** 2
    return 2 * earth_radius_km * np.arcsin(np.sqrt(d)) 
[docs]
def init_gravity_diffusion_(df: pl.DataFrame | tuple[np.ndarray, np.ndarray], scale: float, dist_exp: float) -> np.ndarray:
    """Initialize a gravity diffusion matrix for population mixing.
    Args:
        df: Either a DataFrame with 'population', 'lat', and 'lon' columns,
            or a tuple of (lon, lat) arrays
        scale: Scaling factor for the diffusion matrix
        dist_exp: Distance exponent for the gravity model
    Returns:
        Normalized diffusion matrix where each row sums to 1
    """
    if len(df) == 1:
        return np.ones((1, 1))
    n = len(df)
    compressed_distances = pairwise_haversine(df["lon"].to_numpy(), df["lat"].to_numpy())
    pops = df["pop"].to_numpy()
    # Get the indices for the upper triangle
    i, j = np.triu_indices(n, k=1)
    # Calculate population products only for the upper triangle
    pop_products = pops[i] * pops[j]
    # Calculate diffusion values for upper triangle
    diffusion_values = pop_products / (compressed_distances + MIN_DISTANCE) ** dist_exp
    # Create the full matrix
    diffusion_matrix = np.zeros((n, n))
    diffusion_matrix[i, j] = diffusion_values
    diffusion_matrix[j, i] = diffusion_values  # Mirror the values
    # Calculate row sums for normalization
    row_sums = np.sum(diffusion_matrix, axis=1)
    mean_sum = np.mean(row_sums)
    # Normalize and scale
    diffusion_matrix = diffusion_matrix / mean_sum * scale
    # Calculate and set diagonal values
    diagonal = 1 - np.sum(diffusion_matrix, axis=1)
    np.fill_diagonal(diffusion_matrix, diagonal)
    return diffusion_matrix