Source code for emod_api.weather.weather

#!/usr/bin/env python3

"""
emod-api Weather module - Weather, Metadata, and WeatherNode objects along with IDREF and CLIMATE_UPDATE constants.
"""

from collections import namedtuple
from functools import reduce
import csv
from datetime import datetime
import getpass
import json
import numpy as np
from typing import Dict, List


IDREF_LEGACY = "Legacy"
IDREF_GRUMP30ARCSEC = "Gridded world grump30arcsec"
IDREF_GRUMP2PT5ARCMIN = "Gridded world grump2.5arcmin"
IDREF_GRUMP1DEGREE = "Gridded world grump1degree"

CLIMATE_UPDATE_YEAR = "CLIMATE_UPDATE_YEAR"
CLIMATE_UPDATE_MONTH = "CLIMATE_UPDATE_MONTH"
CLIMATE_UPDATE_WEEK = "CLIMATE_UPDATE_WEEK"
CLIMATE_UPDATE_DAY = "CLIMATE_UPDATE_DAY"
CLIMATE_UPDATE_HOUR = "CLIMATE_UPDATE_HOUR"


class WeatherNode(object):
    """Represents information for a single node: ID and timeseries data."""

    def __init__(self, node_id: int, data):
        self._id = node_id
        self._data = data
        return

    @property
    def id(self) -> int:
        """Node ID"""
        return self._id

    @property
    def data(self):
        """Time series data for this node."""
        return self._data

    # index into node by time step
    def __getitem__(self, item: int) -> float:
        return self._data[item]

    def __setitem__(self, key: int, value: float) -> None:
        self._data[key] = value
        return
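
# Illustrative sketch (not part of the module): a WeatherNode wraps a node ID and a
# per-time-step series and supports indexed access by time step. The node ID 1 and
# 365-value series below are assumed example values.
#
#   node = WeatherNode(1, np.zeros(365, dtype=np.float32))
#   node[0] = 30.0          # set the value for time step 0 via __setitem__
#   first_value = node[0]   # read it back via __getitem__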


def _is_iterable(obj) -> bool:
    try:
        _ = iter(obj)
        return True
    except TypeError:
        return False


def _isinteger(obj) -> bool:
    # bool() ensures the annotated return type; falsy values (None, 0) stay False
    return bool(obj) and isinstance(obj, (int, np.integer))


class Metadata(object):
    """
    Metadata:

    * [DateCreated]
    * [Author]
    * [OriginalDataYears]
    * [StartDayOfYear]
    * [DataProvenance]
    * IdReference
    * NodeCount
    * DatavalueCount
    * UpdateResolution
    * NodeOffsets
    """

    def __init__(
        self,
        node_ids: List[int],
        datavalue_count: int,
        author: str = None,
        created: datetime = None,
        frequency: str = None,
        provenance: str = None,
        reference: str = None,
    ):
        assert int(datavalue_count) > 0, "datavalue_count must be > 0"
        self._data_count = int(datavalue_count)
        self._author = f"{author}" if author else getpass.getuser()
        self._creation = (
            created if created and isinstance(created, datetime) else datetime.now()
        )
        self._id_reference = f"{reference}" if reference else IDREF_LEGACY
        self._provenance = f"{provenance}" if provenance else "unknown"
        self._update_frequency = f"{frequency}" if frequency else CLIMATE_UPDATE_DAY
        assert _is_iterable(node_ids), "node_ids must be iterable"
        concrete = list(node_ids)  # if node_ids is a generator, make a concrete list
        assert len(concrete) > 0, "node_ids must not be empty"
        assert all(
            map(lambda i: isinstance(i, int), concrete)
        ), "node_ids must be integers"
        assert len(set(concrete)) == len(concrete), "node_ids must be unique"
        sorted_ids = sorted(concrete)
        self._nodes = {
            node_id: datavalue_count * sorted_ids.index(node_id) * 4
            for node_id in sorted_ids
        }
        return

    @property
    def author(self) -> str:
        """Author of this file."""
        return self._author

    @property
    def creation_date(self) -> datetime:
        """Creation date of this file."""
        return self._creation

    @property
    def datavalue_count(self) -> int:
        """Number of data values in each timeseries, should be > 0."""
        return self._data_count

    @property
    def id_reference(self) -> str:
        """
        'Schema' for node IDs. Commonly `Legacy`, `Gridded world grump2.5arcmin`,
        and `Gridded world grump30arcsec`.

        `Legacy` usually indicates a 0- or 1-based scheme with increasing ID numbers.
        `Gridded world grump2.5arcmin` and `Gridded world grump30arcsec` encode
        latitude and longitude values in the node ID with the following formula::

            latitude = (((nodeid - 1) & 0xFFFF) * resolution) - 90
            longitude = ((nodeid >> 16) * resolution) - 180

            # nodeid = 90967271 @ 2.5 arcmin resolution
            # longitude = -122.1667, latitude = 47.5833
        """
        return self._id_reference

    @property
    def node_count(self) -> int:
        return len(self.nodes)

    @property
    def node_ids(self) -> List[int]:
        return sorted(self.nodes.keys())

    @property
    def provenance(self) -> str:
        return self._provenance

    @property
    def update_resolution(self) -> str:
        return self._update_frequency

    @property
    def nodes(self) -> Dict[int, int]:
        """WeatherNode offsets keyed by node id."""
        return self._nodes
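
    # Illustrative sketch (not part of the module): node offsets are byte offsets into
    # the binary data file, computed as (sorted node index) * datavalue_count * 4, since
    # each value is a 4-byte float32. The node IDs and datavalue_count below are assumed
    # example values.
    #
    #   meta = Metadata(node_ids=[3, 1, 2], datavalue_count=365)
    #   meta.nodes  # -> {1: 0, 2: 1460, 3: 2920}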

    def write_file(self, filename: str) -> None:
        """Write this metadata to a JSON file."""
        metadata = dict(
            DateCreated=f"{self.creation_date:%a %B %d %Y %H:%M:%S}",
            Author=self.author,
            DataProvenance=self.provenance,
            IdReference=self.id_reference,
            NodeCount=len(self.nodes),
            DatavalueCount=self.datavalue_count,
            UpdateResolution=self.update_resolution,
        )
        node_offsets = reduce(
            lambda s, n: s + f"{n:08X}{self.nodes[n]:08X}", self.node_ids, ""
        )
        jason = dict(Metadata=metadata, NodeOffsets=node_offsets)
        with open(filename, "wt") as file:
            json.dump(jason, file, indent=2, separators=(",", ": "))
        return
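
    # Illustrative sketch (not part of the module): write_file() emits a JSON file whose
    # 'NodeOffsets' entry is one hex string with 16 characters per node -- 8 hex digits
    # of node ID followed by 8 hex digits of byte offset. Continuing the assumed example
    # above (node IDs 1..3, datavalue_count=365); the filename is hypothetical:
    #
    #   meta.write_file("air_temperature_daily.bin.json")
    #   # NodeOffsets == "000000010000000000000002000005B40000000300000B68"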

    @classmethod
    def from_file(cls, filename: str):
        """
        Read a weather metadata file. The 'Metadata' and 'NodeOffsets' keys are required.
        'DatavalueCount', 'UpdateResolution', and 'IdReference' are required in 'Metadata'.
        """
        with open(filename, "rb") as file:
            jason = json.load(file)

        meta = jason["Metadata"]
        offsets = jason["NodeOffsets"]
        node_ids = sorted(
            [int(offsets[i * 16 : i * 16 + 8], 16) for i in range(len(offsets) // 16)]
        )
        metadata = Metadata(
            node_ids,
            meta["DatavalueCount"],
            author=meta["Author"] if "Author" in meta else None,
            created=meta["DateCreated"] if "DateCreated" in meta else None,
            frequency=meta["UpdateResolution"],
            provenance=meta["DataProvenance"] if "DataProvenance" in meta else None,
            reference=meta["IdReference"],
        )
        return metadata
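

# Illustrative sketch (not part of the module): Metadata.from_file() parses the JSON
# written above, recovering node IDs from the first 8 hex digits of each 16-character
# chunk of 'NodeOffsets'. The filename below is an assumed example.
#
#   meta = Metadata.from_file("air_temperature_daily.bin.json")
#   meta.node_ids          # -> [1, 2, 3]
#   meta.datavalue_count   # -> 365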


class Weather(object):
    def __init__(
        self,
        filename: str = None,
        node_ids: List[int] = None,
        datavalue_count: int = None,
        author: str = None,
        created: datetime = None,
        frequency: str = None,
        provenance: str = None,
        reference: str = None,
        data: np.array = None,
    ):
        if filename and isinstance(filename, str):
            self._from_file(filename)
        else:
            # create "empty" Weather object
            assert _is_iterable(node_ids), "node_ids must be provided and be iterable"
            assert _isinteger(datavalue_count), "datavalue_count must be provided"
            assert datavalue_count > 0, "datavalue_count must be >= 1"
            self._metadata = Metadata(
                node_ids,
                datavalue_count,
                author,
                created,
                frequency,
                provenance,
                reference,
            )
            node_ids = self._metadata.node_ids
            self._data = (
                data
                if data is not None
                else np.zeros(
                    (len(node_ids), self._metadata.datavalue_count), dtype=np.float32
                )
            )
            self._nodes_and_map()
        return

    def _nodes_and_map(self):
        node_ids = self._metadata.node_ids
        self._node_id_to_index_map = {node_ids[n]: n for n in range(len(node_ids))}
        self._nodes = {
            node_id: WeatherNode(
                node_id, self._data[self._node_id_to_index_map[node_id], :]
            )
            for node_id in node_ids
        }
        return

    @property
    def data(self) -> np.array:
        """Raw data as numpy array[node index, time step]."""
        return self._data

    @property
    def metadata(self) -> Metadata:
        return self._metadata

    # begin pass-through

    @property
    def author(self) -> str:
        return self._metadata.author

    @property
    def creation_date(self) -> datetime:
        return self._metadata.creation_date

    @property
    def datavalue_count(self) -> int:
        """>= 1"""
        return self._metadata.datavalue_count

    @property
    def id_reference(self) -> str:
        return self._metadata.id_reference

    @property
    def node_count(self) -> int:
        """>= 1"""
        return self._metadata.node_count

    @property
    def node_ids(self) -> List[int]:
        return self._metadata.node_ids

    @property
    def provenance(self) -> str:
        return self._metadata.provenance

    @property
    def update_resolution(self) -> str:
        return self._metadata.update_resolution

    # end pass-through

    @property
    def nodes(self) -> Dict[int, WeatherNode]:
        """WeatherNodes indexed by node id."""
        return self._nodes

    # retrieve node by node id
    def __getitem__(self, item: int):
        return self._nodes[item]
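
    # Illustrative sketch (not part of the module): a Weather object can be built either
    # from an existing file pair (filename plus filename.json) or "empty" from node IDs
    # and a value count, then filled in through its nodes. Values below are assumed
    # examples.
    #
    #   weather = Weather(node_ids=[1, 2, 3], datavalue_count=365)
    #   weather[2][:] = 25.0 + np.random.random(365)   # e.g. daily air temperature for node 2
    #   weather.data.shape                             # -> (3, 365)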

    def write_file(self, filename: str) -> None:
        """Writes data to filename and metadata to filename.json."""
        self.metadata.write_file(filename + ".json")
        with open(filename, "wb") as file:
            self._data.tofile(file)
        return
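
    # Illustrative sketch (not part of the module): write_file() produces the flat binary
    # file (float32 values, node-major) plus the companion metadata JSON; this class reads
    # the pair back by the same naming convention. The filename is an assumed example.
    #
    #   weather.write_file("air_temperature_daily.bin")
    #   # -> air_temperature_daily.bin and air_temperature_daily.bin.json
    #   roundtrip = Weather("air_temperature_daily.bin")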

    def _from_file(self, filename: str):
        """Reads metadata from filename.json and data from filename."""
        self._metadata = Metadata.from_file(filename + ".json")
        data = np.fromfile(filename, dtype=np.float32)
        expected = self._metadata.node_count * self._metadata.datavalue_count
        assert (
            len(data) == expected
        ), f"length of data ({len(data)}) != #nodes * #values ({expected})"
        self._data = data.reshape(
            self._metadata.node_count, self._metadata.datavalue_count
        )
        self._nodes_and_map()
        return

    @classmethod
    def from_csv(
        cls,
        filename: str,
        var_column: str = "airtemp",
        id_column: str = "node_id",
        step_column: str = "step",
        author: str = None,
        provenance: str = None,
    ):
        """
        Create weather from a CSV file with the specified variable column, node ID
        column, and time step column.

        Note:

        * Column order in the CSV file is not significant, but column names must
          match what is passed to this function.
        * Because a CSV might hold air temperature (may be negative and well outside
          the 0-1 range), relative humidity (must _not_ be negative, must be in the
          interval [0-1]), or rainfall (must _not_ be negative, likely > 1), this
          function does not validate incoming data.
        """
        Entry = namedtuple("Entry", ["id", "step", "value"])
        with open(filename) as csv_file:
            reader = csv.DictReader(csv_file)
            entries = [
                Entry(
                    int(row[id_column]), int(row[step_column]), float(row[var_column])
                )
                for row in reader
            ]
        node_ids = set([entry.id for entry in entries])
        steps_list = [
            sorted([entry.step for entry in entries if entry.id == node_id])
            for node_id in sorted(node_ids)
        ]
        for i in range(1, len(steps_list)):
            test = len(steps_list[i])
            expected = len(steps_list[0])
            assert (
                test == expected
            ), f"number of data values for nodes is not consistent ({test} != {expected})"
            test = steps_list[i]
            expected = steps_list[0]
            assert (
                test == expected
            ), f"time steps for node {sorted(node_ids)[i]} != time steps for node {sorted(node_ids)[0]}"
        steps = sorted(steps_list[0])
        expected = [i for i in range(1, len(steps) + 1)]
        assert steps == expected, f"time steps do not cover all values 1...{len(steps)}"
        data_count = len(steps)
        w = Weather(
            node_ids=node_ids,
            datavalue_count=data_count,
            author=author,
            provenance=provenance,
        )
        for node_id in node_ids:
            sorted_entries_for_node = sorted(
                [entry for entry in entries if entry.id == node_id],
                key=lambda e: e.step,
            )
            data_for_node = [entry.value for entry in sorted_entries_for_node]
            w.nodes[node_id][:] = data_for_node
        return w
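

# Illustrative sketch (not part of the module): from_csv() expects one row per
# (node, time step) pair, with time steps covering 1..N for every node. The file name
# below is an assumed example; its column names match the default arguments.
#
#   # node_temperature.csv:
#   #   node_id,step,airtemp
#   #   1,1,24.5
#   #   1,2,25.1
#   #   2,1,23.9
#   #   2,2,24.2
#
#   weather = Weather.from_csv("node_temperature.csv")
#   weather.datavalue_count   # -> 2
#   weather[1][0]             # -> 24.5 (node 1, time step index 0)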