Source code for emodpy_malaria.weather.weather_request

#!/usr/bin/env python3

"""
Weather service methods for submitting and working with weather data requests.
"""

from __future__ import annotations

import json
import pandas as pd
import tempfile

from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, NoReturn, Tuple, Union

from idmtools.core import ItemType
from idmtools.core.platform_factory import Platform
from idmtools.entities.relation_type import RelationType
from idmtools_platform_comps.comps_platform import COMPSPlatform
from idmtools_platform_comps.comps_platform import AssetCollection
from idmtools_platform_comps.ssmt_work_items.comps_workitems import SSMTWorkItem

from emodpy_malaria.weather.data_sources import _get_data_source_metadata
from emodpy_malaria.weather.weather_utils import make_path, parse_date, ymd
from emodpy_malaria.weather.weather_variable import WeatherVariable
from emodpy_malaria.weather.weather_metadata import _META_DEFAULT_ID_REFERENCE
from emodpy_malaria.weather.weather_set import WeatherSet

_DATE_MIN = datetime(year=2000, month=1, day=1)
_DATE_MAX = datetime(year=2030, month=12, day=31)


[docs]class WeatherArgs: """Arguments defining weather request space and time scope.""" def __init__(self, site_file: Union[str, Path], start_date: Union[int, str, datetime], end_date: Union[int, str, datetime] = None, node_column: str = "node", lat_column: str = "lat", lon_column: str = "lon", id_reference: str = _META_DEFAULT_ID_REFERENCE): """ Initializes and validates weather arguments. Args: site_file: CSV (.csv) or demographics (.json) file containing a set of sites (points) defined with lat/lon. CSV file must contain columns for: EMOD node ids (node), latitude (lat) and longitude (lon). Demographics file must match EMOD demographics file schema. start_date: Start date, in formats: year (2018), year and day-of-year (2018001) or date (20180101) end_date: (Optional) End date, in formats: year (2018), year and day-of-year (2018365) or date (20181231) node_column: (Optional) Name of a column containing EMOD node ids. The default is "nodes". lat_column: (Optional) Name of a column containing site (point) latitude. lon_column: (Optional) Name of a column containing site (point) longitude. id_reference: (Optional) Value of weather metadata IdReference attribute. Default is "Default". """ self.site_file: Path = Path(site_file) end_date = end_date or start_date start_date = start_date if isinstance(start_date, datetime) else parse_date(start_date, 1, 1) end_date = end_date if isinstance(end_date, datetime) else parse_date(end_date, 12, 31) self.start_date: datetime = start_date self.end_date: datetime = end_date self.node_column: str = node_column or "nodes" self.lat_column: str = lat_column or "lat" self.lon_column: str = lon_column or "lon" self.id_reference = id_reference or _META_DEFAULT_ID_REFERENCE self.validate()
[docs] def validate(self): """Validates: site file (exists, is readable, and it contains specified columns) and dates range.""" f = self.site_file # Validate sites file assert f.is_file(), f"File not found: {str(f)}" if f.suffix == ".json": # Sites demographics file, sanity checks. content = json.loads(f.read_text()) assert len(content["Nodes"]) > 0, f"Invalid demographics file: {str(f)}, " for node in content["Nodes"]: assert -90 < node["NodeAttributes"]["Latitude"] < 90 assert -180 < node["NodeAttributes"]["Longitude"] < 180 or 0 < node["NodeAttributes"]["Longitude"] < 360 else: # Sites .csv file df = pd.read_csv(f) assert len(df) > 0, f"Invalid sites file: {str(f)}" # Validate column names msg = "Correct {name} column must be specified." cols = df.columns.values assert self.node_column and self.node_column in cols, msg.format("node") assert self.lat_column and self.lat_column in cols, msg.format("latitude") assert self.lon_column and self.lon_column in cols, msg.format("longitude") # Validate year range assert _DATE_MIN <= self.start_date <= self.end_date <= _DATE_MAX, "Start and end years are not valid."
[docs]class RequestReport: """Specifies an object containing weather request operational reports.""" download: Dict[str, List[str]] = None # Status of downloaded files: ok, fail, skip.
[docs]class DataSource: def __init__(self, name: str = None): """Initiate DataSource object based on name. If name is not provided the default is used.""" # TODO: Implement methods to retrieve the list of available data sources (when supported by the service) self._content = _get_data_source_metadata() assert "defaults" in self._content and "name" in self._content["defaults"] assert "data_sources" in self._content if name and name not in self._content["data_sources"]: raise ValueError(f"Unsupported datasource name {name}.") self._name = name or self._content["defaults"]["name"] @property def name(self) -> str: """Data source name property.""" return self._name @property def file_prefix(self) -> str: """Weather file prefix based on the current data source resolution.""" # TODO: Consider a case when a data source supports more than one resolution. arc_min = int(self._info["arc_seconds"][0]/60) return f"dtk_{arc_min}arcmin_" @property def weather_variables(self) -> List[WeatherVariable]: """List of weather variables supported by the current data source.""" return [v for v in WeatherVariable.list() if v.name in self._info["weather_variables"]] @property def _info(self) -> Dict[str]: """Info dictionary for the current data source.""" return self._content["data_sources"][self._name]
[docs]class WeatherRequest: """Functionality for requesting and downloading weather files. Leverages idmtools API for COMPS SSMT.""" _image: str = "idm-docker-{}.packages.idmod.org/dse/weather-files" # weather tool image name. _create_asset: bool = True # flag to indicate creation of a weather asset. _platform: COMPSPlatform = None # The name of COMPS platfrom on which to run the SSMT work item. def __init__(self, platform: Union[str, COMPSPlatform], local_dir: str = None, data_source: str = None, is_staging: bool = None): """ Initializes a weather request per specified time-space, weather files and SSMT arguments. Args: platform: SSMT platform name or COMPSPlatform object. Determined where the work item will run. local_dir: (Optional) Local dir where files will be downloaded. If not specified a temp dir is created. data_source: (Optional) Data source name to be used by SSMT platform. is_staging: (Optional) Flag determining weather image. By default, set based on the platform endpoint. """ # Initialize the platform object platform = platform or Platform("SLURMStage") self._platform = platform if isinstance(platform, COMPSPlatform) else Platform(platform) is_staging = is_staging or self._platform.endpoint == "https://comps2.idmod.org" self._image = self._image.format("staging" if is_staging else "production") # Exposed as properties self._local_dir: Union[str, None] = local_dir self._data_source: DataSource = DataSource(data_source) # The data source name, as used by weather SSMT. self._asset_collection_id: Union[str, None] = None self._report: RequestReport = RequestReport() # Operational self._asset_file_tuples: Union[List[Tuple[str, Path]], None] = None @property def data_id(self) -> str: """Expose asset collection id as interface data id property.""" return self._asset_collection_id @data_id.setter def data_id(self, value) -> NoReturn: """Setter for the data_id property.""" self._asset_collection_id = value @property def local_dir(self) -> str: """Local dir to/from where weather files will be downloaded.""" if not self._local_dir: self._local_dir = tempfile.mkdtemp() return self._local_dir @local_dir.setter def local_dir(self, value: str) -> NoReturn: """Setter for the local_dir property.""" self._local_dir = str(value) @property def files(self) -> List[str]: """List expected weather file paths.""" bin_files = WeatherSet.make_file_paths(dir_path=self.local_dir, prefix=self._data_source.file_prefix, weather_variables=self._data_source.weather_variables) bin_files = list(bin_files.values()) files = bin_files + [f"{f}.json" for f in bin_files] return files @property def files_exist(self) -> bool: """Returns True if all expected weather files exist in the local dir.""" return all([Path(f).exists() for f in self.files]) @property def report(self) -> RequestReport: """Returns report object.""" return self._report @property def _asset_files(self) -> List[Tuple[Any, Path]]: """Returns a list of tuples of weather asset objects and corresponding file weather names.""" if self._asset_file_tuples is None: assert self._asset_collection_id is not None, "Data id is not set. Either set it or run 'generate'." asc = self._fetch_asset_collection(self._asset_collection_id) # Transform the list of asset object to (asset, file name) tuples. self._asset_file_tuples = [(a, Path(self.local_dir).joinpath(a.filename)) for a in asc] # Validate asset file names match expected pattern. expected_files = sorted(self.files) actual_files = sorted([str(f) for _, f in self._asset_file_tuples]) odd_files = set(actual_files) - set(expected_files) if len(odd_files) > 0: print("Warning: Some weather files have unexpected names:") print("\n".join(odd_files)) return self._asset_file_tuples def _fetch_asset_collection(self, asset_collection_id) -> AssetCollection: """Get asset collection object.""" asset_collection = self._platform.get_item(item_id=asset_collection_id, item_type=ItemType.ASSETCOLLECTION) return asset_collection def _construct_command(self, weather_args: WeatherArgs) -> str: """ Constructs SSMT command to run within the weather tool image, to generate weather files. Args: weather_args: Arguments defining space and time scope and weather files' id reference. Returns: String representing a command to be run within the weather tool image. """ assert weather_args, "Space and time scope is not defined." st = weather_args optional_args = f"--ds {self._data_source.name} --id-ref '{st.id_reference}' --node-col '{st.node_column}' " optional_args += "--create-asset" if self._create_asset else "" command_args = f"{st.site_file.name} {ymd(st.start_date)} {ymd(st.end_date)} {optional_args}" command = f"python /app/generate_weather_asset_collection.py {command_args}" return command def _init_work_item(self, weather_args: WeatherArgs, command: str, name: str = None) -> SSMTWorkItem: """ Initializes SSMT work item. Args: weather_args: Arguments defining space and time scope and weather files' id reference. command: Command to be run within the weather tool image. name: Work item name. Returns: Initialized, ready to run, SSMTWorkItem object. """ st = weather_args if not name: label = f"{st.site_file.name} {ymd(st.start_date)}-{ymd(st.end_date)}" name = f"{self._data_source.name} weather for {label}" # Instantiate work item and upload site_details.csv wi = SSMTWorkItem(item_name=name, docker_image=self._image, command=command) wi.tags = {'weather': None, self._data_source.name: None} wi.transient_assets.add_asset(st.site_file) return wi
[docs] def generate(self, weather_args: WeatherArgs, request_name: str = None, force: bool = False) -> Union[WeatherRequest, None]: """ Submits the weather request and when data is ready sets the data_id property. Args: weather_args: Arguments defining space and time scope and weather files' id reference. request_name: (Optional) Name to be used for the weather SSMT work item. force: (Optional) Force the download, even if target weather files already exist in the local dir. Returns: Returns this WeatherRequest object (to support method chaining). """ # Skip if files already exist, unless the 'force' flag is set. if not force and self.files_exist: print("Skipping weather request, files already exist.") return self self._asset_collection_id: Union[str, None] = None # TODO: add date range validation (when supported by the service) command = self._construct_command(weather_args=weather_args) work_item: SSMTWorkItem = self._init_work_item(weather_args=weather_args, command=command, name=request_name) try: # Run work item # Note: For simplicity reasons only synchronous scenario is supported (covers the majority of use cases). work_item.run(wait_on_done=True) comps_wi = work_item.get_platform_object(force=True) # Get asset collection and set data id acs = comps_wi.get_related_asset_collections(RelationType.Created) assert acs and len(acs) > 0, f"Failed to get asset collection for work item {work_item.id}" self._asset_collection_id = str(acs[0].id) print(f"Generated asset collection ID: {self._asset_collection_id}") except ValueError: return None return self
[docs] def download(self, data_id: str = None, local_dir: Union[str, Path] = None, force: bool = False) -> WeatherRequest: """ Downloads weather files. Args: data_id: (Optional) Asset collection ID to be downloaded, even if not generated by this request. local_dir: (Optional) Local dir where files will be downloaded. If not specified a temp dir is created. force: (Optional) Force the download, even if target weather files already exist in the local dir. Returns: Returns this WeatherRequest object (to support method chaining). """ # Override asset collection id and local dir is specified. if data_id: self._asset_collection_id = data_id if local_dir: self._local_dir = local_dir # Skip if files already exist, unless the 'force' flag is set. if self.files_exist and not force: self.report.download = {"ok": [], "fail": [], "skip": self.files} print("Skipping download, files already exist.") return self assert len(self._asset_collection_id) == 36, "Invalid 'asset collection id' length." make_path(self._local_dir) result = {"ok": [], "fail": [], "skip": []} for asset, file_path in self._asset_files: assert asset.filename == file_path.name, "Asset and file name do not match." try: mtime_before = file_path.stat().st_mtime if file_path.is_file() else 0 if not file_path.is_file() or force: asset.download_to_path(str(file_path), force=force) if not file_path.is_file(): key = "fail" elif file_path.stat().st_mtime > mtime_before: key = "ok" else: key = "skip" # TODO: More specific exception handling except Exception as ex: print(str(ex)) key = "fail" result[key].append(str(file_path)) self.report.download = result return self