from collections import defaultdict
from datetime import datetime
from functools import partial
import json
from numbers import Integral
from os import environ, SEEK_SET
from pathlib import Path
from platform import system
from warnings import warn
import numpy as np
import csv
# for from_params()
import scipy.spatial.distance as spspd
from emod_api.demographics import Demographics as Demog
# for from_demog_and_param_gravity()
from geographiclib.geodesic import Geodesic
from .client import client
class Layer(dict):
"""
    The Layer object represents a mapping from source node (IDs) to destination node (IDs) for a particular
    age, gender, or age+gender combination, or for all individuals if there is no age or gender dependence.
    Users will not generally interact directly with Layer objects.
"""
def __init__(self):
super().__init__()
return
@property
def DatavalueCount(self) -> int:
"""Get (maximum) number of data values for any node in this layer
Returns:
Maximum number of data values for any node in this layer
"""
count = max([len(entry) for entry in self.values()]) if len(self) else 0
return count
@property
def NodeCount(self) -> int:
"""Get the number of (source) nodes with rates in this layer
Returns:
Number of (source) nodes with rates in this layer
"""
return len(self)
# @property
# def Nodes(self) -> dict:
# return self._nodes
def __getitem__(self, key):
"""Allows indexing directly into this object with source node id
Args:
key (int): source node id
Returns:
Dictionary of outbound rates for the given node id
"""
if key not in self:
if isinstance(key, Integral):
super().__setitem__(key, defaultdict(float))
else:
raise RuntimeError(f"Migration node IDs must be integer values (key = {key}).")
return super().__getitem__(key)
_METADATA = "Metadata"
_AUTHOR = "Author"
_DATECREATED = "DateCreated"
_TOOLNAME = "Tool"
_IDREFERENCE = "IdReference"
_MIGRATIONTYPE = "MigrationType"
_NODECOUNT = "NodeCount"
_DATAVALUECOUNT = "DatavalueCount"
_GENDERDATATYPE = "GenderDataType"
_AGESYEARS = "AgesYears"
_INTERPOLATIONTYPE = "InterpolationType"
_NODEOFFSETS = "NodeOffsets"
_EMODAPI = "emod-api"
class Migration(object):
"""Represents migration data in a mapping from source node (IDs) to destination node (IDs) with rates for each pairing.
Migration data may be age dependent, gender dependent, both, or the same for all ages and genders.
    A migration file (along with its JSON metadata) can be loaded with the module-level from_file() function and
    then inspected and/or modified.
    Migration objects can also be created from scratch with Migration(), populated with appropriate source-destination
    rate data, and saved to a file with the to_file() method.
Given migration = Migration(), syntax is as follows:
age and gender agnostic: migration[source_id][dest_id]
age dependent: migration[source_id:age] # age should be >= 0, ages > last bucket value use last bucket value
gender dependent: migration[source_id:gender] # gender one of Migration.MALE or Migration.FEMALE
age and gender dependent: migration[source_id:gender:age] # gender one of Migration.MALE or Migration.FEMALE
EMOD/DTK format migration files (and associated metadata files) can be written with migration.to_file(<filename>).
    EMOD/DTK format migration files (with associated metadata files) can be read with the module-level from_file(<filename>) function.
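    Example (a minimal sketch with hypothetical node IDs and rates):
        migration = Migration()
        migration.IdReference = Migration.IDREF_LEGACY
        migration[1][2] = 0.1    # age- and gender-agnostic rate from node 1 to node 2
        migration[1][3] = 0.05
        migration.to_file("minimal_migration.bin")    # also writes minimal_migration.bin.json
        gendered = Migration()
        gendered.GenderDataType = Migration.ONE_FOR_EACH_GENDER
        gendered[1:Migration.FEMALE][2] = 0.02    # female-specific rate from node 1 to node 2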
"""
SAME_FOR_BOTH_GENDERS = 0
ONE_FOR_EACH_GENDER = 1
LINEAR_INTERPOLATION = 0
PIECEWISE_CONSTANT = 1
LOCAL = 1
AIR = 2
REGIONAL = 3
SEA = 4
FAMILY = 5
INTERVENTION = 6
IDREF_LEGACY = "Legacy"
IDREF_GRUMP30ARCSEC = "Gridded world grump30arcsec"
IDREF_GRUMP2PT5ARCMIN = "Gridded world grump2.5arcmin"
IDREF_GRUMP1DEGREE = "Gridded world grump1degree"
MALE = 0
FEMALE = 1
MAX_AGE = 125
def __init__(self):
self._agesyears = []
        try:
            self._author = _author()
        except Exception:
            self._author = "Mystery Guest"
self._datecreated = datetime.now()
self._genderdatatype = self.SAME_FOR_BOTH_GENDERS
self._idreference = self.IDREF_LEGACY
self._interpolationtype = self.PIECEWISE_CONSTANT
self._migrationtype = self.LOCAL
self._tool = _EMODAPI
self._create_layers()
return
def _create_layers(self):
self._layers = []
for gender in range(0, self._genderdatatype+1):
for age in range(0, len(self.AgesYears) if self.AgesYears else 1):
self._layers.append(Layer())
return
@property
def AgesYears(self) -> list:
"""
List of ages - ages < first value use first bucket, ages > last value use last bucket.
"""
return self._agesyears
@AgesYears.setter
def AgesYears(self, ages: list) -> None:
"""
List of ages - ages < first value use first bucket, ages > last value use last bucket.
"""
if sorted(ages) != self.AgesYears:
if self.NodeCount > 0:
warn("Changing age buckets clears existing migration information.", category=UserWarning)
self._agesyears = sorted(ages)
self._create_layers()
return
@property
def Author(self) -> str:
"""str: Author value for metadata for this migration datafile"""
return self._author
@Author.setter
def Author(self, author: str) -> None:
self._author = author
return
@property
def DatavalueCount(self) -> int:
"""int: Maximum data value count for any layer in this migration datafile"""
count = max([layer.DatavalueCount for layer in self._layers])
return count
@property
def DateCreated(self) -> datetime:
"""datetime: date/time stamp of this datafile"""
return self._datecreated
@DateCreated.setter
def DateCreated(self, value) -> None:
if not isinstance(value, datetime):
raise RuntimeError(f"DateCreated must be a datetime value (got {type(value)}).")
self._datecreated = value
return
@property
def GenderDataType(self) -> int:
"""int: gender data type for this datafile - SAME_FOR_BOTH_GENDERS or ONE_FOR_EACH_GENDER"""
return self._genderdatatype
@GenderDataType.setter
def GenderDataType(self, value: int) -> None:
# integer value
if value in Migration._GENDER_DATATYPE_ENUMS.keys():
value = int(value)
# string value
elif value in Migration._GENDER_DATATYPE_LOOKUP.keys():
value = Migration._GENDER_DATATYPE_LOOKUP[value]
else:
expected = [f"{key}/{value}" for key, value in Migration._GENDER_DATATYPE_LOOKUP.items()]
raise RuntimeError(f"Unknown gender data type, {value}, expected one of {expected}.")
if (self.NodeCount > 0) and (value != self._genderdatatype):
warn("Changing gender data type clears existing migration information.", category=UserWarning)
if value != self._genderdatatype:
self._genderdatatype = int(value)
self._create_layers()
return
@property
def IdReference(self) -> str:
"""str: ID reference metadata value"""
return self._idreference
@IdReference.setter
def IdReference(self, value: str) -> None:
self._idreference = str(value)
return
@property
def InterpolationType(self) -> int:
"""int: interpolation type for this migration data file - LINEAR_INTERPOLATION or PIECEWISE_CONSTANT"""
return self._interpolationtype
@InterpolationType.setter
def InterpolationType(self, value: int) -> None:
# integer value
if value in Migration._INTERPOLATION_TYPE_ENUMS.keys():
self._interpolationtype = int(value)
# string value
elif value in Migration._INTERPOLATION_TYPE_LOOKUP.keys():
self._interpolationtype = Migration._INTERPOLATION_TYPE_LOOKUP[value]
else:
expected = [f"{key}/{value}" for key, value in Migration._INTERPOLATION_TYPE_LOOKUP.items()]
raise RuntimeError(f"Unknown interpolation type, {value}, expected one of {expected}.")
return
@property
def MigrationType(self) -> int:
"""int: migration type for this migration data file - LOCAL | AIR | REGIONAL | SEA | FAMILY | INTERVENTION"""
return self._migrationtype
@MigrationType.setter
def MigrationType(self, value: int) -> None:
# integer value
if value in Migration._MIGRATION_TYPE_ENUMS.keys():
self._migrationtype = int(value)
elif value in Migration._MIGRATION_TYPE_LOOKUP.keys():
self._migrationtype = Migration._MIGRATION_TYPE_LOOKUP[value]
else:
expected = [f"{key}/{value}" for key, value in Migration._MIGRATION_TYPE_LOOKUP.items()]
raise RuntimeError(f"Unknown migration type, {value}, expected one of {expected}.")
return
@property
def Nodes(self) -> list:
node_ids = set()
for layer in self._layers:
node_ids |= set(layer.keys())
node_ids = sorted(node_ids)
return node_ids
@property
def NodeCount(self) -> int:
"""int: maximum number of source nodes in any layer of this migration data file"""
count = max([layer.NodeCount for layer in self._layers])
return count
    def get_node_offsets(self, limit: int = 100) -> dict:
        """Get the byte offset of each source node's data within a layer of the binary data
        Args:
            limit (int): maximum number of data values per node to account for (default = 100)
        Returns:
            Mapping from source node ID to byte offset within a layer
        """
nodes = set()
for layer in self._layers:
nodes |= set(key for key in layer.keys())
count = min(self.DatavalueCount, limit)
# offsets = {}
# for index, node in enumerate(sorted(nodes)):
# offsets[node] = index * 12 * count
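        # each data value occupies 12 bytes: a 4-byte uint32 destination ID plus an 8-byte float64 rate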
offsets = {node: 12*index*count for index, node in enumerate(sorted(nodes))}
return offsets
@property
def NodeOffsets(self) -> dict:
"""dict: mapping from source node id to offset to destination and rate data in binary data"""
return self.get_node_offsets()
@property
def Tool(self) -> str:
"""str: tool metadata value"""
return self._tool
@Tool.setter
def Tool(self, value: str) -> None:
self._tool = str(value)
return
def __getitem__(self, key):
"""allows indexing on this object to read/write rate data
Args:
key (slice): source node id:gender:age (gender and age depend on GenderDataType and AgesYears properties)
Returns:
dict for specified node/gender/age
"""
if self.GenderDataType == Migration.SAME_FOR_BOTH_GENDERS:
if not self.AgesYears:
# Case 1 - no gender or age differentiation - key (integer) == node id
return self._layers[0][key]
else:
# Case 3 - age buckets, no gender differentiation - key (tuple or slice) == node id:age
if isinstance(key, tuple):
node_id, age = key
elif isinstance(key, slice):
node_id, age = key.start, key.stop
else:
raise RuntimeError(f"Invalid indexing for migration - {key}")
layer_index = self._index_for_gender_and_age(None, age)
return self._layers[layer_index][node_id]
else:
if not self.AgesYears:
# Case 2 - by gender, no age differentiation - key (tuple or slice) == node id:gender
if isinstance(key, tuple):
node_id, gender = key
elif isinstance(key, slice):
node_id, gender = key.start, key.stop
else:
raise RuntimeError(f"Invalid indexing for migration - {key}")
                if gender not in [Migration.MALE, Migration.FEMALE]:
raise RuntimeError(f"Invalid gender ({gender}) for migration.")
layer_index = self._index_for_gender_and_age(gender, None)
return self._layers[layer_index][node_id]
else:
# Case 4 - by gender and age - key (slice) == node id:gender:age
if isinstance(key, tuple):
node_id, gender, age = key
elif isinstance(key, slice):
node_id, gender, age = key.start, key.stop, key.step
else:
raise RuntimeError(f"Invalid indexing for migration - {key}")
                if gender not in [Migration.MALE, Migration.FEMALE]:
raise RuntimeError(f"Invalid gender ({gender}) for migration.")
layer_index = self._index_for_gender_and_age(gender, age)
return self._layers[layer_index][node_id]
# raise RuntimeError("Invalid state.")
def _index_for_gender_and_age(self, gender: int, age: float) -> int:
"""
Use age to determine age bucket, 0 if no age differentiation.
Use gender data type to offset by # age buckets if gender data type is one for each gender and gender is female
Ages < first value use first bucket, ages > last value use last bucket.
"""
age_offset = 0
for age_offset, edge in enumerate(self.AgesYears):
if edge >= age:
break
gender_span = len(self.AgesYears) if self.AgesYears else 1
gender_offset = gender * gender_span if self.GenderDataType == Migration.ONE_FOR_EACH_GENDER else 0
index = gender_offset + age_offset
return index
def __iter__(self):
return iter(self._layers)
_MIGRATION_TYPE_ENUMS = {
LOCAL: "LOCAL_MIGRATION",
AIR: "AIR_MIGRATION",
REGIONAL: "REGIONAL_MIGRATION",
SEA: "SEA_MIGRATION",
FAMILY: "FAMILY_MIGRATION",
INTERVENTION: "INTERVENTION_MIGRATION"
}
_GENDER_DATATYPE_ENUMS = {
SAME_FOR_BOTH_GENDERS: "SAME_FOR_BOTH_GENDERS",
ONE_FOR_EACH_GENDER: "ONE_FOR_EACH_GENDER"
}
_INTERPOLATION_TYPE_ENUMS = {
LINEAR_INTERPOLATION: "LINEAR_INTERPOLATION",
PIECEWISE_CONSTANT: "PIECEWISE_CONSTANT"
}
    def to_file(self, binaryfile: Path, metafile: Path = None, value_limit: int = 100):
"""Write current data to given file (and .json metadata file)
Args:
binaryfile (Path): path to output file (metadata will be written to same path with ".json" appended)
metafile (Path): override standard metadata file naming
value_limit (int): limit on number of destination values to write for each source node (default = 100)
Returns:
(Path): path to binary file
"""
binaryfile = Path(binaryfile).absolute()
metafile = metafile if metafile else binaryfile.parent / (binaryfile.name + ".json")
        actual_datavalue_count = min(self.DatavalueCount, value_limit)  # limited to value_limit destinations per node (default 100)
node_ids = set()
for layer in self._layers:
node_ids |= set(layer.keys())
node_ids = sorted(node_ids)
offsets = self.get_node_offsets(actual_datavalue_count)
node_offsets_string = ''.join([f"{node:08x}{offsets[node]:08x}" for node in sorted(offsets.keys())])
metadata = {
_METADATA: {
_AUTHOR: self.Author,
_DATECREATED: f"{self.DateCreated:%a %b %d %Y %H:%M:%S}",
_TOOLNAME: self.Tool,
_IDREFERENCE: self.IdReference,
_MIGRATIONTYPE: self._MIGRATION_TYPE_ENUMS[self.MigrationType],
_NODECOUNT: self.NodeCount,
_DATAVALUECOUNT: actual_datavalue_count,
# could omit this if SAME_FOR_BOTH_GENDERS since it is the default
_GENDERDATATYPE: self._GENDER_DATATYPE_ENUMS[self.GenderDataType],
# _AGESYEARS: self.AgesYears, # see below
_INTERPOLATIONTYPE: self._INTERPOLATION_TYPE_ENUMS[self.InterpolationType]
},
_NODEOFFSETS: node_offsets_string
}
if self.AgesYears:
# older versions of Eradication do not handle empty AgesYears lists robustly
metadata[_METADATA][_AGESYEARS] = self.AgesYears
print(f"Writing metadata to '{metafile}'")
with metafile.open("w") as handle:
json.dump(metadata, handle, indent=4, separators=(",", ": "))
def key_func(k, d=None):
return d[k]
# layers are in age bucket order by gender, e.g. male 0-5, 5-10, 10+, female 0-5, 5-10, 10+
# see _index_for_gender_and_age()
print(f"Writing binary data to '{binaryfile}'")
with binaryfile.open("wb") as file:
for layer in self:
for node in node_ids:
destinations = np.zeros(actual_datavalue_count, dtype=np.uint32)
rates = np.zeros(actual_datavalue_count, dtype=np.float64)
if node in layer:
# Sort keys descending on rate and ascending on node ID.
# That way if we are truncating the list, we include the "most important" nodes.
keys = sorted(layer[node].keys()) # sorted ascending on node ID
keys = sorted(keys, key=partial(key_func, d=layer[node]), reverse=True) # descending on rate
if len(keys) > actual_datavalue_count:
keys = keys[0:actual_datavalue_count]
# save rates in ascending order so small rates are not lost when looking at the cumulative sum
keys = list(reversed(keys))
destinations[0:len(keys)] = keys
rates[0:len(keys)] = [layer[node][key] for key in keys]
else:
warn(f"No destination nodes found for node {node}", category=UserWarning)
destinations.tofile(file)
rates.tofile(file)
return binaryfile
_MIGRATION_TYPE_LOOKUP = {
"LOCAL_MIGRATION": LOCAL,
"AIR_MIGRATION": AIR,
"REGIONAL_MIGRATION": REGIONAL,
"SEA_MIGRATION": SEA,
"FAMILY_MIGRATION": FAMILY,
"INTERVENTION_MIGRATION": INTERVENTION
}
_GENDER_DATATYPE_LOOKUP = {
"SAME_FOR_BOTH_GENDERS": SAME_FOR_BOTH_GENDERS,
"ONE_FOR_EACH_GENDER": ONE_FOR_EACH_GENDER
}
_INTERPOLATION_TYPE_LOOKUP = {
"LINEAR_INTERPOLATION": LINEAR_INTERPOLATION,
"PIECEWISE_CONSTANT": PIECEWISE_CONSTANT
}
def from_file(binaryfile: Path, metafile: Path = None):
    """Read a migration data file from the given binary file (and associated JSON metadata file)
Args:
binaryfile (Path): path to binary file (metadata file is assumed to be at same location with ".json" suffix)
metafile (Path): use given metafile rather than inferring metafile name from the binary file name
Returns:
Migration object representing binary data in the given file.
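    Example (hypothetical file name):
        migration = from_file(Path("outputs/local_migration.bin"))
        print(migration.NodeCount, migration.DatavalueCount)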
"""
binaryfile = Path(binaryfile).absolute()
metafile = metafile if metafile else binaryfile.parent / (binaryfile.name + ".json")
if not binaryfile.exists():
raise RuntimeError(f"Cannot find migration binary file '{binaryfile}'")
if not metafile.exists():
raise RuntimeError(f"Cannot find migration metadata file '{metafile}'.")
with metafile.open("r") as file:
jason = json.load(file)
# these are the minimum required entries to load a migration file
assert _METADATA in jason, f"Metadata file '{metafile}' does not have a 'Metadata' entry."
metadata = jason[_METADATA]
assert _NODECOUNT in metadata, f"Metadata file '{metafile}' does not have a 'NodeCount' entry."
assert _DATAVALUECOUNT in metadata, f"Metadata file '{metafile}' does not have a 'DatavalueCount' entry."
assert _NODEOFFSETS in jason, f"Metadata file '{metafile}' does not have a 'NodeOffsets' entry."
migration = Migration()
migration.Author = _value_with_default(metadata, _AUTHOR, _author())
migration.DateCreated = _try_parse_date(metadata[_DATECREATED]) if _DATECREATED in metadata else datetime.now()
migration.Tool = _value_with_default(metadata, _TOOLNAME, _EMODAPI)
migration.IdReference = _value_with_default(metadata, _IDREFERENCE, Migration.IDREF_LEGACY)
migration.MigrationType = Migration._MIGRATION_TYPE_LOOKUP[_value_with_default(metadata,
_MIGRATIONTYPE,
"LOCAL_MIGRATION")]
migration.GenderDataType = Migration._GENDER_DATATYPE_LOOKUP[_value_with_default(metadata,
_GENDERDATATYPE,
"SAME_FOR_BOTH_GENDERS")]
migration.AgesYears = _value_with_default(metadata, _AGESYEARS, [])
migration.InterpolationType = Migration._INTERPOLATION_TYPE_LOOKUP[_value_with_default(metadata,
_INTERPOLATIONTYPE,
"PIECEWISE_CONSTANT")]
node_count = metadata[_NODECOUNT]
node_offsets = jason[_NODEOFFSETS]
if len(node_offsets) != 16*node_count:
raise RuntimeError(f"Length of node offsets string {len(node_offsets)} != 16 * node count {node_count}.")
offsets = _parse_node_offsets(node_offsets, node_count)
datavalue_count = metadata[_DATAVALUECOUNT]
    with binaryfile.open("rb") as file:
        # Layers are written consecutively (see to_file()); NodeOffsets are relative to the start of each layer,
        # so add the layer's base offset when seeking. Each data value is 12 bytes (uint32 destination + float64 rate).
        layer_size = node_count * 12 * datavalue_count
        for gender in range(1 if migration.GenderDataType == Migration.SAME_FOR_BOTH_GENDERS else 2):
            for age in migration.AgesYears if migration.AgesYears else [0]:
                layer_index = migration._index_for_gender_and_age(gender, age)
                layer = migration._layers[layer_index]
                for node, offset in offsets.items():
                    file.seek(layer_index * layer_size + offset, SEEK_SET)
                    destinations = np.fromfile(file, dtype=np.uint32, count=datavalue_count)
                    rates = np.fromfile(file, dtype=np.float64, count=datavalue_count)
for destination, rate in zip(destinations, rates):
if rate > 0:
layer[node][destination] = rate
return migration
def examine_file(filename):
    """Load a migration file and print its metadata and contents to stdout."""
def name_for_gender_datatype(e: int) -> str:
return Migration._GENDER_DATATYPE_ENUMS[e] if e in Migration._GENDER_DATATYPE_ENUMS else "unknown"
def name_for_interpolation(e: int) -> str:
return Migration._INTERPOLATION_TYPE_ENUMS[e] if e in Migration._INTERPOLATION_TYPE_ENUMS else "unknown"
def name_for_migration_type(e: int) -> str:
return Migration._MIGRATION_TYPE_ENUMS[e] if e in Migration._MIGRATION_TYPE_ENUMS else "unknown"
migration = from_file(filename)
print(f"Author: {migration.Author}")
print(f"DatavalueCount: {migration.DatavalueCount}")
print(f"DateCreated: {migration.DateCreated:%a %B %d %Y %H:%M}")
print(f"GenderDataType: {migration.GenderDataType} ({name_for_gender_datatype(migration.GenderDataType)})")
print(f"IdReference: {migration.IdReference}")
print(f"InterpolationType: {migration.InterpolationType} ({name_for_interpolation(migration.InterpolationType)})")
print(f"MigrationType: {migration.MigrationType} ({name_for_migration_type(migration.MigrationType)})")
print(f"NodeCount: {migration.NodeCount}")
print(f"NodeOffsets: {migration.NodeOffsets}")
print(f"Tool: {migration.Tool}")
print(f"Nodes: {migration.Nodes}")
return
def _author() -> str:
username = "Unknown"
if system() == "Windows":
username = environ["USERNAME"]
elif "USER" in environ:
username = environ["USER"]
return username
def _parse_node_offsets(string: str, count: int) -> dict:
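    """Parse the NodeOffsets metadata string: each node contributes 16 hex characters, 8 for the node ID followed by 8 for its byte offset."""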
assert len(string) == 16*count, f"Length of node offsets string {len(string)} != 16 * node count {count}."
offsets = {}
for index in range(count):
base = 16 * index
offset = base + 8
offsets[int(string[base:base+8], 16)] = int(string[offset:offset+8], 16)
return offsets
def _try_parse_date(string: str) -> datetime:
patterns = [
"%a %b %d %Y %H:%M:%S",
"%a %b %d %H:%M:%S %Y",
"%m/%d/%Y",
"%Y-%m-%d %H:%M:%S.%f"
]
for pattern in patterns:
try:
timestamp = datetime.strptime(string, pattern)
return timestamp
except ValueError:
pass
timestamp = datetime.now()
warn(f"Could not parse date stamp '{string}', using datetime.now() ({timestamp})")
return timestamp
def _value_with_default(dictionary: dict, key: str, default: object) -> object:
return dictionary[key] if key in dictionary else default
"""
utility functions emodpy-utils?
"""
def from_params(demographics_file_path=None, pop=1e6, num_nodes=100, mig_factor=1.0, frac_rural=0.3, id_ref="from_params", migration_type=Migration.LOCAL):
"""
    Create a migration object to accompany a (multi-node) demographics file created from a few parameters,
    as opposed to one built from real-world data.
    Note that the 'demographics_file_path' parameter is not used at this time, but in the future it will be
    used to ensure that nodes, etc., match the demographics.
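    Example (a hypothetical invocation; the output file name is illustrative):
        migration = from_params(pop=1e5, num_nodes=10, mig_factor=1.0, frac_rural=0.3, id_ref="synthetic")
        migration.to_file("synthetic_migration.bin")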
"""
    # ***** Build migration data *****
    # NOTE: This builds the Migration object directly from the input parameters; writing to a file is a
    # separate, decoupled step (use to_file() on the returned object).
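    # Node locations are sampled uniformly in a rhombic unit cell (basis vectors below; 0.86603 ~ sqrt(3)/2)
    # and then tiled 3x3 so that inter-node distances respect periodic boundary conditions.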
ucellb = np.array([[1.0, 0.0], [-0.5, 0.86603]])
nlocs = np.random.rand(num_nodes, 2)
nlocs[0, :] = 0.5
nlocs = np.round(np.matmul(nlocs, ucellb), 4)
# Calculate inter-node distances on periodic grid
nlocs = np.tile(nlocs, (9, 1))
nlocs[0 * num_nodes:1 * num_nodes, :] += [0.0, 0.0]
nlocs[1 * num_nodes:2 * num_nodes, :] += [1.0, 0.0]
nlocs[2 * num_nodes:3 * num_nodes, :] += [-1.0, 0.0]
nlocs[3 * num_nodes:4 * num_nodes, :] += [0.0, 0.0]
nlocs[4 * num_nodes:5 * num_nodes, :] += [1.0, 0.0]
nlocs[5 * num_nodes:6 * num_nodes, :] += [-1.0, 0.0]
nlocs[6 * num_nodes:7 * num_nodes, :] += [0.0, 0.0]
nlocs[7 * num_nodes:8 * num_nodes, :] += [1.0, 0.0]
nlocs[8 * num_nodes:9 * num_nodes, :] += [-1.0, 0.0]
nlocs[0 * num_nodes:1 * num_nodes, :] += [0.0, 0.0]
nlocs[1 * num_nodes:2 * num_nodes, :] += [0.0, 0.0]
nlocs[2 * num_nodes:3 * num_nodes, :] += [0.0, 0.0]
nlocs[3 * num_nodes:4 * num_nodes, :] += [-0.5, 0.86603]
nlocs[4 * num_nodes:5 * num_nodes, :] += [-0.5, 0.86603]
nlocs[5 * num_nodes:6 * num_nodes, :] += [-0.5, 0.86603]
nlocs[6 * num_nodes:7 * num_nodes, :] += [0.5, -0.86603]
nlocs[7 * num_nodes:8 * num_nodes, :] += [0.5, -0.86603]
nlocs[8 * num_nodes:9 * num_nodes, :] += [0.5, -0.86603]
distgrid = spspd.squareform(spspd.pdist(nlocs))
nborlist = np.argsort(distgrid, axis=1)
npops = Demog.get_node_pops_from_params(pop, num_nodes, frac_rural)
migration = Migration()
migration.IdReference = id_ref
for source in range(num_nodes):
for index in range(1, 31):
if distgrid.shape[0] > index:
destination = int(np.mod(nborlist[source, index], num_nodes)) + 1
tnode = int(np.mod(nborlist[source, index], num_nodes))
idnode = nborlist[source, index]
rate = mig_factor * npops[tnode] / np.sum(npops) / distgrid[source, idnode]
else:
destination = 0
rate = 0.0
migration[source][destination] = rate
migration.MigrationType = migration_type
return migration
def from_demog_and_param_gravity_webservice(demographics_file_path: str, params: str, id_ref: str, migration_type=Migration.LOCAL) -> Migration:
"""
Calls a webservice (running on a GPU) to calculate the migration patterns quickly.
Args:
demographics_file_path: Path to the demographics file.
params: Path to the json file with parameters for gravity calculation and server url.
id_ref: Metadata tag that needs to match corresponding value in demographics file.
migration_type: Migration type.
Returns:
Migration object
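    Example (a sketch; the file names and parameter file contents are hypothetical):
        migration = from_demog_and_param_gravity_webservice("demographics.json", "gravity_params.json", id_ref="gravity-webservice")
        migration.to_file("gravity_migration.bin")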
"""
with Path(params).open("r") as f_params:
params_url = json.load(f_params)
# load
rates = client.run(Path(demographics_file_path), params_url)
demog = Demog.from_file(demographics_file_path)
migration = Migration()
nodes = [node.forced_id for node in demog.nodes]
# we need a 0-N index for the NumPy array and the node ID for the migration file
for i, src in enumerate(nodes):
for j, dst in enumerate(nodes):
if dst != src:
migration[dst][src] = rates[i, j]
migration.IdReference = id_ref
migration.MigrationType = migration_type
return migration
def from_demog_and_param_gravity(demographics_file_path, gravity_params, id_ref, migration_type=Migration.LOCAL):
    """Create migration rates from a gravity model applied to the nodes of the given demographics file."""
demog = Demog.from_file(demographics_file_path)
return _from_demog_and_param_gravity(demog, gravity_params, id_ref, migration_type)
def _from_demog_and_param_gravity(demographics, gravity_params, id_ref, migration_type=Migration.LOCAL):
"""
Create migration files from a gravity model and an input demographics file.
"""
def _compute_migr_prob(grav_params, home_population, dest_population, distance):
"""
Utility function for computing migration probabilities for gravity model.
"""
# If home/dest node has 0 pop, assume this node is the regional work node-- no local migration allowed
if home_population == 0 or dest_population == 0:
return 0.
else:
num_trips = grav_params[0] *\
home_population ** grav_params[1] *\
dest_population ** grav_params[2] *\
distance ** grav_params[3]
prob_trip = np.min([1., num_trips / home_population])
return prob_trip
def _compute_migr_dict(node_list, grav_params, **kwargs):
"""
Utility function for computing migration value map.
"""
excluded_nodes = set(kwargs["exclude_nodes"]) if "exclude_nodes" in kwargs else set()
mig = Migration()
geodesic = Geodesic.WGS84
for source_node in node_list:
source_id = source_node["NodeID"]
src_lat = source_node["NodeAttributes"]["Latitude"]
src_long = source_node["NodeAttributes"]["Longitude"]
src_pop = source_node["NodeAttributes"]["InitialPopulation"]
if source_id in excluded_nodes:
continue
for destination_node in node_list:
if destination_node == source_node:
continue
dest_id = destination_node["NodeID"]
if dest_id in excluded_nodes:
continue
dst_lat = destination_node["NodeAttributes"]["Latitude"]
dst_long = destination_node["NodeAttributes"]["Longitude"]
dst_pop = destination_node["NodeAttributes"]["InitialPopulation"]
distance = geodesic.Inverse(src_lat, src_long, dst_lat, dst_long, Geodesic.DISTANCE)['s12'] / 1000 # km
probability = _compute_migr_prob(grav_params, src_pop, dst_pop, distance)
mig[source_id][dest_id] = probability
return mig
# load
nodes = [node.to_dict() for node in demographics.nodes]
migration = _compute_migr_dict(nodes, gravity_params)
migration.IdReference = id_ref
migration.MigrationType = migration_type
return migration
# by gender, by age
_mapping_fns = {
(False, False): lambda m, i, g, a: m[i],
(False, True): lambda m, i, g, a: m[i:a],
(True, False): lambda m, i, g, a: m[i:g],
(True, True): lambda m, i, g, a: m[i:g:a]
}
# by gender, by age
_display_fns = {
(False, False): lambda i, g, a, d, r: f"{i},{d},{r}", # id only
(False, True): lambda i, g, a, d, r: f"{i},{a},{d},{r}", # id:age
(True, False): lambda i, g, a, d, r: f"{i},{g},{d},{r}", # id:gender
(True, True): lambda i, g, a, d, r: f"{i},{g},{a},{d},{r}" # id:gender:age
}
def to_csv(filename: Path):
    """Load a migration file and write its contents to stdout in CSV format.
    Columns are node (plus gender and/or age where applicable), destination, and rate.
    """
migration = from_file(filename)
mapping = _mapping_fns[(migration.GenderDataType == Migration.ONE_FOR_EACH_GENDER, bool(migration.AgesYears))]
display = _display_fns[(migration.GenderDataType == Migration.ONE_FOR_EACH_GENDER, bool(migration.AgesYears))]
print(display("node", "gender", "age", "destination", "rate"))
for gender in range(1 if migration.GenderDataType == Migration.SAME_FOR_BOTH_GENDERS else 2):
for age in migration.AgesYears if migration.AgesYears else [0]:
for node in migration.Nodes:
for destination, rate in mapping(migration, node, gender, age).items():
print(display(node, gender, age, destination, rate))
def from_csv(filename: Path, id_ref, mig_type=None):
    """Create a Migration object from a CSV file. The file should have columns 'source' for the source node ID,
    'destination' for the destination node ID, and 'rate' for the migration rate.
    Args:
        filename (Path): path to the CSV file
        id_ref (str): IdReference metadata value; should match the corresponding value in the demographics file
        mig_type (int): migration type, e.g. Migration.LOCAL or Migration.REGIONAL (default = Migration.LOCAL)
    Returns:
        Migration object
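    Example (a sketch; 'rates.csv' is a hypothetical file with 'source', 'destination', and 'rate' columns):
        migration = from_csv(Path("rates.csv"), id_ref="from_csv", mig_type=Migration.REGIONAL)
        migration.to_file("regional_migration.bin")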
"""
migration = Migration()
migration.IdReference = id_ref
    if not mig_type:
        mig_type = Migration.LOCAL
    migration.MigrationType = mig_type
with filename.open("r") as csvfile:
reader = csv.DictReader(csvfile)
csv_data_read = False
for row in reader:
csv_data_read = True
migration[int(row['source'])][int(row['destination'])] = float(row['rate'])
assert csv_data_read, "Csv file %s does not contain migration data." % filename
return migration