import datetime
import re
import shutil
from operator import attrgetter
from typing import Any, Union
import geopandas as gpd
import numpy as np
import pandas as pd
import zarr
from honeybees.library.raster import coord_to_pixel
from geb.store import DynamicArray
from geb.workflows.methods import multi_level_merge
# Built-in report configuration that is merged into the user's report config
# (via multi_level_merge) when the "_water_circle" meta entry is set to True
# in Reporter.__init__. Top-level keys are module names; each nested entry
# maps an output name to a reporter configuration with a "varname" (leading
# "." = attribute of the reporting module), a "type" ("scalar" or "HRU"),
# and — for HRU fields — the aggregation "function" applied per timestep.
WATER_CIRCLE_REPORT_CONFIG = {
    "hydrology": {
        "_water_circle_storage": {
            "varname": ".current_storage",
            "type": "scalar",
        },
        "_water_circle_routing_loss": {
            "varname": ".routing_loss_m3",
            "type": "scalar",
        },
    },
    "hydrology.snowfrost": {
        "_water_circle_rain": {
            "varname": ".rain",
            "type": "HRU",
            "function": "weightedsum",
        },
        "_water_circle_snow": {
            "varname": ".snow",
            "type": "HRU",
            "function": "weightedsum",
        },
    },
    "hydrology.routing": {
        "_water_circle_river_evaporation": {
            "varname": ".total_evaporation_in_rivers_m3",
            "type": "scalar",
        },
        "_water_circle_waterbody_evaporation": {
            "varname": ".total_waterbody_evaporation_m3",
            "type": "scalar",
        },
        "_water_circle_river_outflow": {
            "varname": ".total_outflow_at_pits_m3",
            "type": "scalar",
        },
    },
    "hydrology.water_demand": {
        "_water_circle_domestic_water_loss": {
            "varname": ".domestic_water_loss_m3",
            "type": "scalar",
        },
        "_water_circle_industry_water_loss": {
            "varname": ".industry_water_loss_m3",
            "type": "scalar",
        },
        "_water_circle_livestock_water_loss": {
            "varname": ".livestock_water_loss_m3",
            "type": "scalar",
        },
    },
    "hydrology.landcover": {
        "_water_circle_transpiration": {
            "varname": ".actual_transpiration",
            "type": "HRU",
            "function": "weightedsum",
        },
        "_water_circle_bare_soil_evaporation": {
            "varname": ".actual_bare_soil_evaporation",
            "type": "HRU",
            "function": "weightedsum",
        },
        "_water_circle_direct_evaporation": {
            "varname": ".open_water_evaporation",
            "type": "HRU",
            "function": "weightedsum",
        },
        "_water_circle_interception_evaporation": {
            "varname": ".interception_evaporation",
            "type": "HRU",
            "function": "weightedsum",
        },
        "_water_circle_snow_sublimation": {
            "varname": ".snow_sublimation",
            "type": "HRU",
            "function": "weightedsum",
        },
    },
}
# [docs]  (Sphinx viewcode marker — scrape artifact)
def create_time_array(
    start: datetime.datetime,
    end: datetime.datetime,
    timestep: datetime.timedelta,
    conf: dict,
) -> list:
    """Create a time array based on the start and end time, the timestep, and the frequency.

    Args:
        start: The start time.
        end: The end time.
        timestep: The timestep length.
        conf: The configuration for the frequency. ``conf["frequency"]`` is
            either the string "initial"/"final" or a dict with an "every"
            entry ("year", "month" or "day") plus "month"/"day" fields as
            required.

    Returns:
        time: The time array.

    Raises:
        ValueError: If the frequency, or its "every" entry, is not recognized.
    """
    # Default to daily reporting when no frequency is configured.
    if "frequency" not in conf:
        frequency = {"every": "day"}
    else:
        frequency = conf["frequency"]
    if "every" in frequency:
        every = frequency["every"]
        # Bug fix: previously an unrecognized "every" value silently produced
        # an empty time array; now it fails loudly.
        if every not in ("year", "month", "day"):
            raise ValueError(
                f"Frequency every {every} not recognized (must be 'year', 'month' or 'day')."
            )
        time = []
        current_time = start
        while current_time <= end:
            if every == "year":
                # Once per year, on the configured month and day.
                if (
                    frequency["month"] == current_time.month
                    and frequency["day"] == current_time.day
                ):
                    time.append(current_time)
            elif every == "month":
                # Once per month, on the configured day.
                if frequency["day"] == current_time.day:
                    time.append(current_time)
            else:  # every == "day"
                time.append(current_time)
            current_time += timestep
        return time
    elif frequency == "initial":
        return [start]
    elif frequency == "final":
        return [end]
    else:
        raise ValueError(f"Frequency {frequency} not recognized.")
# [docs]  (Sphinx viewcode marker — scrape artifact)
class Reporter:
"""This class is used to report data to disk.
Args:
model: The GEB model.
"""
    def __init__(self, model, clean: bool) -> None:
        """Set up the report folder and build the per-variable storage registry.

        Args:
            model: The GEB model.
            clean: If True, delete any existing report folder for this run
                before creating a fresh one.
        """
        self.model = model
        if self.model.simulate_hydrology:
            self.hydrology = model.hydrology
        self.report_folder = self.model.output_folder / "report" / self.model.run_name
        # optionally clean report model at start of run
        if clean:
            shutil.rmtree(self.report_folder, ignore_errors=True)
        self.report_folder.mkdir(parents=True, exist_ok=True)
        # variables maps module name -> variable name -> backing storage
        # (a zarr store for gridded output, or a list accumulating
        # (time, value) tuples for scalars/aggregates; see create_variable).
        self.variables = {}
        self.timesteps = []
        # Reporting is only activated for write-mode runs with a non-empty
        # "report" section in the model configuration.
        if (
            self.model.mode == "w"
            and "report" in self.model.config
            and self.model.config["report"]
        ):
            self.activated: bool = True
            report_config: dict[str, Any] = self.model.config["report"]
            to_delete: list[str] = []
            # Expand "meta" entries (keys starting with "_") into concrete
            # reporter configurations before the registry is built.
            for module_name, module_values in list(report_config.items()):
                if module_name.startswith("_"):
                    if module_name == "_discharge_stations":
                        if module_values is True:
                            # One sampled daily-discharge time series per
                            # snapped station location.
                            stations = gpd.read_parquet(
                                self.model.files["geom"][
                                    "discharge/discharge_snapped_locations"
                                ]
                            )
                            station_reporters = {}
                            for station_ID, station_info in stations.iterrows():
                                xy_grid = station_info["snapped_grid_pixel_xy"]
                                station_reporters[
                                    f"discharge_daily_m3_s_{station_ID}"
                                ] = {
                                    "varname": f"grid.var.discharge_m3_s",
                                    "type": "grid",
                                    "function": f"sample_xy,{xy_grid[0]},{xy_grid[1]}",
                                }
                            # NOTE(review): the registry loop below re-reads
                            # self.model.config["report"], so this assumes
                            # multi_level_merge mutates report_config in
                            # place — confirm against its implementation.
                            report_config = multi_level_merge(
                                report_config,
                                {"hydrology.routing": station_reporters},
                            )
                    elif module_name == "_water_circle":
                        if module_values is True:
                            report_config = multi_level_merge(
                                report_config,
                                WATER_CIRCLE_REPORT_CONFIG,
                            )
                    else:
                        raise ValueError(
                            f"Module {module_name} is not a valid module for reporting."
                        )
                    to_delete.append(module_name)
            # Remove the processed meta entries from the configuration
            # (deferred so the dict is not mutated while iterating).
            for module_name in to_delete:
                del self.model.config["report"][module_name]
            for module_name, configs in self.model.config["report"].items():
                self.variables[module_name] = {}
                for name, config in configs.items():
                    assert isinstance(config, dict), (
                        f"Configuration for {module_name}.{name} must be a dictionary, but is {type(config)}."
                    )
                    self.variables[module_name][name] = self.create_variable(
                        config, module_name, name
                    )
        else:
            self.activated = False
# [docs]  (Sphinx viewcode marker — scrape artifact)
    def create_variable(
        self, config: dict, module_name: str, name: str
    ) -> Union[list, zarr.storage.LocalStore]:
        """This function creates a variable for the reporter.

        For configurations without an aggregation function, a zarr file is
        created and its store is returned. For configurations with an
        aggregation function (and for scalars), the data is stored in memory
        — an empty accumulator list is returned — and exported on the final
        timestep by finalize().

        Args:
            config: The configuration for the variable. Mutated in place for
                zarr-backed outputs ("path"; for agents also "_file" and
                "_time_index").
            module_name: The name of the module to which the variable belongs.
            name: The name of the variable.

        Returns:
            An empty list (in-memory accumulation) or the zarr store backing
            the on-disk output.

        Raises:
            ValueError: If config["type"] is not one of 'scalar', 'grid',
                'agents' or 'HRU'.
        """
        if config["type"] == "scalar":
            # Scalars are buffered in memory and written to CSV in finalize().
            assert "function" not in config or config["function"] is None, (
                "Scalar variables cannot have a function. "
            )
            return []
        elif config["type"] in ("grid", "HRU"):
            if config["function"] is not None:
                # Aggregated grid/HRU values reduce to a scalar per timestep
                # and are buffered in memory.
                return []
            else:
                if config["type"] == "HRU":
                    raster = self.hydrology.HRU
                else:
                    raster = self.hydrology.grid
                zarr_path = self.report_folder / module_name / (name + ".zarr")
                zarr_path.parent.mkdir(parents=True, exist_ok=True)
                config["path"] = str(zarr_path)
                zarr_store = zarr.storage.LocalStore(zarr_path, read_only=False)
                zarr_group = zarr.open_group(zarr_store, mode="w")
                # Precompute the full time axis so each timestep can be
                # written into its slot by process_value().
                time = create_time_array(
                    start=self.model.current_time,
                    end=self.model.end_time,
                    timestep=self.model.timestep_length,
                    conf=config,
                )
                # Stored as int64 seconds since the Unix epoch (see attrs).
                time = (
                    np.array(time, dtype="datetime64[ns]")
                    .astype("datetime64[s]")
                    .astype(np.int64)
                )
                time_group = zarr_group.create_array(
                    "time",
                    shape=time.shape,
                    dtype=time.dtype,
                    dimension_names=["time"],
                )
                time_group[:] = time
                time_group.attrs.update(
                    {
                        "standard_name": "time",
                        "units": "seconds since 1970-01-01T00:00:00",
                        "calendar": "gregorian",
                    }
                )
                y_group = zarr_group.create_array(
                    "y",
                    shape=raster.lat.shape,
                    dtype=raster.lat.dtype,
                    dimension_names=["y"],
                )
                y_group[:] = raster.lat
                y_group.attrs.update(
                    {
                        "standard_name": "latitude",
                        "units": "degrees_north",
                    }
                )
                x_group = zarr_group.create_array(
                    "x",
                    shape=raster.lon.shape,
                    dtype=raster.lon.dtype,
                    dimension_names=["x"],
                )
                x_group[:] = raster.lon
                x_group.attrs.update(
                    {
                        "standard_name": "longitude",
                        "units": "degrees_east",
                    }
                )
                # Main data cube: one chunk per timestep covering the full
                # spatial extent, NaN fill for unwritten slots.
                zarr_data = zarr_group.create_array(
                    name,
                    shape=(
                        time.size,
                        raster.lat.size,
                        raster.lon.size,
                    ),
                    chunks=(
                        1,
                        raster.lat.size,
                        raster.lon.size,
                    ),
                    dtype=np.float32,
                    compressors=(
                        zarr.codecs.BloscCodec(
                            cname="zlib",
                            clevel=9,
                            shuffle=zarr.codecs.BloscShuffle.shuffle,
                        ),
                    ),
                    fill_value=np.nan,
                    dimension_names=["time", "y", "x"],
                )
                crs = raster.crs
                assert isinstance(crs, str)
                zarr_data.attrs.update(
                    {
                        "grid_mapping": "crs",
                        "coordinates": "time y x",
                        "units": "unknown",
                        "long_name": name,
                        "_CRS": {"wkt": crs},
                    }
                )
                return zarr_store
        elif config["type"] == "agents":
            if config["function"] is not None:
                # Aggregated agent values are buffered in memory.
                return []
            else:
                filepath = zarr_path = (
                    self.report_folder / module_name / (name + ".zarr")
                )
                filepath.parent.mkdir(parents=True, exist_ok=True)
                store = zarr.storage.LocalStore(filepath, read_only=False)
                zarr_group = zarr.open_group(store, mode="w")
                time = create_time_array(
                    start=self.model.current_time,
                    end=self.model.end_time,
                    timestep=self.model.timestep_length,
                    conf=config,
                )
                time = np.array(
                    [np.datetime64(t, "s").astype(np.int64) for t in time],
                    dtype=np.int64,
                )
                time_group = zarr_group.create_array(
                    "time",
                    shape=time.shape,
                    dtype=time.dtype,
                    dimension_names=["time"],
                )
                time_group[:] = time
                time_group.attrs.update(
                    {
                        "standard_name": "time",
                        "units": "seconds since 1970-01-01T00:00:00",
                        "calendar": "gregorian",
                    }
                )
                # The data array itself is created lazily on first write in
                # process_value(), once the agent count and dtype are known.
                config["_file"] = zarr_group
                config["_time_index"] = time
                return store
        else:
            raise ValueError(
                f"Type {config['type']} not recognized. Must be 'scalar', 'grid', 'agents' or 'HRU'."
            )
# [docs]  (Sphinx viewcode marker — scrape artifact)
def maybe_report_value(
self,
module_name: str,
name: Union[str, tuple[str, Any]],
module: Any,
local_variables: dict,
config: dict,
) -> None:
"""This method is used to save and/or export model values.
Args:
module_name: Name of the module to which the value belongs.
name: Name of the value to be exported.
module: The module to report data from.
local_variables: A dictionary of local variables from the function
that calls this one.
config: Configuration for saving the file. Contains options such a file format, and whether to export the data or save the data in the model.
"""
# here we return None if the value is not to be reported on this timestep
if "frequency" in config:
if config["frequency"] == "initial":
if self.model.current_timestep != 0:
return None
elif config["frequency"] == "final":
if self.model.current_timestep != self.model.n_timesteps - 1:
return None
elif "every" in config["frequency"]:
every = config["frequency"]["every"]
if every == "year":
month = config["frequency"]["month"]
day = config["frequency"]["day"]
if (
self.model.current_time.month != month
or self.model.current_time.day != day
):
return None
elif every == "month":
day = config["frequency"]["day"]
if self.model.current_time.day != day:
return None
elif every == "day":
pass
else:
raise ValueError(
f"Frequency every {config['every']} not recognized (must be 'yearly', or 'monthly')."
)
else:
raise ValueError(f"Frequency {config['frequency']} not recognized.")
varname = config["varname"]
fancy_index = re.search(r"\[.*?\]", varname)
if fancy_index:
fancy_index = fancy_index.group(0)
varname = varname.replace(fancy_index, "")
# get the variable
if varname.startswith("."):
varname = varname[1:]
try:
value = local_variables[varname]
except KeyError:
raise KeyError(
f"Variable {varname} not found in local variables of {module_name}. "
)
else:
try:
value = attrgetter(varname)(module)
except AttributeError:
raise AttributeError(
f"Attribute {varname} not found in module {module_name}. "
)
if fancy_index:
value = eval(f"value{fancy_index}")
# if the value is not None, we check whether the value is valid
if isinstance(value, list):
value = [v.item() for v in value]
for v in value:
assert not np.isnan(value) and not np.isinf(v)
elif np.isscalar(value):
value = value.item()
assert not np.isnan(value) and not np.isinf(value)
self.process_value(module_name, name, value, config)
# [docs]  (Sphinx viewcode marker — scrape artifact)
    def process_value(
        self, module_name: str, name: str, value: np.ndarray, config: dict
    ) -> None:
        """Exports an array of values to the export folder.

        Gridded (grid/HRU) values without an aggregation function are written
        directly into their zarr time slot; agent values without a function
        are appended to a lazily-created zarr array; everything else is
        reduced to a scalar and buffered in memory for finalize().

        Args:
            module_name: Name of the module to which the value belongs.
            name: Name of the value to be exported.
            value: The array itself.
            config: Configuration for saving the file. Contains options such a file format, and whether to export the array in this timestep at all.
        """
        if config["type"] in ("grid", "HRU"):
            if config["function"] is None:
                # Decompress the masked 1D array to the full 2D field before
                # writing it to the zarr store.
                if config["type"] == "HRU":
                    value = self.hydrology.HRU.decompress(value)
                else:
                    value = self.hydrology.grid.decompress(value)
                zarr_group = zarr.open_group(self.variables[module_name][name])
                # Only write if the current model time is a slot in the
                # precomputed time axis (see create_variable).
                if (
                    np.isin(self.model.current_time_unix_s, zarr_group["time"][:])
                    and value is not None
                ):
                    time_index = np.where(
                        zarr_group["time"][:] == self.model.current_time_unix_s
                    )[0].item()
                    if "substeps" in config:
                        # NOTE(review): time_index is already a scalar index,
                        # so np.where(time_index)[0][0] yields 0 for any
                        # nonzero index (and raises for index 0) — the
                        # intended substep start looks like it should simply
                        # be time_index. Verify before relying on substeps.
                        time_index_start = np.where(time_index)[0][0]
                        time_index_end = time_index_start + config["substeps"]
                        zarr_group[name][time_index_start:time_index_end, ...] = value
                    else:
                        zarr_group[name][time_index, ...] = value
                return None
            else:
                # Reduce the field to a scalar; falls through to the
                # in-memory buffering at the bottom of this method.
                function, *args = config["function"].split(",")
                if function == "mean":
                    value = np.mean(value)
                elif function == "nanmean":
                    value = np.nanmean(value)
                elif function == "sum":
                    value = np.sum(value)
                elif function == "nansum":
                    value = np.nansum(value)
                elif function == "sample_xy":
                    # args are pixel coordinates: sample_xy,<x>,<y>
                    if config["type"] == "grid":
                        decompressed_array = self.hydrology.grid.decompress(value)
                    elif config["type"] == "HRU":
                        decompressed_array = self.hydrology.HRU.decompress(value)
                    else:
                        raise ValueError(f"Unknown varname type {config['varname']}")
                    value = decompressed_array[int(args[1]), int(args[0])]
                elif function == "sample_lonlat":
                    # args are geographic coordinates: sample_lonlat,<lon>,<lat>
                    if config["type"] == "grid":
                        gt = self.model.hydrology.grid.gt
                        decompressed_array = self.hydrology.grid.decompress(value)
                    elif config["type"] == "HRU":
                        gt = self.hydrology.HRU.gt
                        decompressed_array = self.hydrology.HRU.decompress(value)
                    else:
                        raise ValueError(f"Unknown varname type {config['varname']}")
                    px, py = coord_to_pixel((float(args[0]), float(args[1])), gt)
                    try:
                        value = decompressed_array[py, px]
                    except IndexError:
                        raise IndexError(
                            f"The coordinate ({args[0]},{args[1]}) is outside the model domain."
                        )
                elif function in (
                    "weightedmean",
                    "weightednanmean",
                    "weightedsum",
                    "weightednansum",
                ):
                    # Area-weighted reductions over the (compressed) cells.
                    if config["type"] == "HRU":
                        cell_area = self.hydrology.HRU.var.cell_area
                    else:
                        cell_area = self.hydrology.grid.var.cell_area
                    if function == "weightedmean":
                        value = np.average(value, weights=cell_area)
                    elif function == "weightednanmean":
                        value = np.nansum(value * cell_area) / np.sum(cell_area)
                    elif function == "weightedsum":
                        value = np.sum(value * cell_area)
                    elif function == "weightednansum":
                        value = np.nansum(value * cell_area)
                else:
                    raise ValueError(f"Function {function} not recognized")
        elif config["type"] == "agents":
            if config["function"] is None:
                ds = config["_file"]
                if name not in ds:
                    # zarr file has not been created yet
                    if isinstance(value, (float, int)):
                        # Scalar per timestep: 1D time series, no compression.
                        shape = (ds["time"].size,)
                        chunks = (1,)
                        compressor = None
                        dtype = type(value)
                        array_dimensions = ["time"]
                    else:
                        # One value per agent per timestep.
                        shape = (ds["time"].size, value.size)
                        chunks = (1, value.size)
                        compressor = zarr.codecs.BloscCodec(
                            cname="zlib",
                            clevel=9,
                            shuffle=zarr.codecs.BloscShuffle.shuffle,
                        )
                        dtype = value.dtype
                        array_dimensions = ["time", "agents"]
                    # Fill value marks slots never written (NaN / -1 / False).
                    if dtype in (float, np.float32, np.float64):
                        fill_value = np.nan
                    elif dtype in (int, np.int32, np.int64):
                        fill_value = -1
                    elif dtype == bool:
                        fill_value = False
                    else:
                        raise ValueError(
                            f"Value {dtype} of type {type(dtype)} not recognized."
                        )
                    ds.create_array(
                        name,
                        shape=shape,
                        chunks=chunks,
                        dtype=dtype,
                        compressors=(compressor,),
                        fill_value=fill_value,
                        dimension_names=array_dimensions,
                    )
                # Locate the slot for the current model time in the
                # precomputed time index.
                index = np.argwhere(
                    config["_time_index"]
                    == np.datetime64(self.model.current_time, "s").astype(
                        config["_time_index"].dtype
                    )
                ).item()
                # If the agent population shrank since the array was created,
                # pad to the original width so the write fits.
                if value.size < ds[name][index].size:
                    print("Padding array with NaNs or -1 - temporary solution")
                    value = np.pad(
                        value.data if isinstance(value, DynamicArray) else value,
                        (0, ds[name][index].size - value.size),
                        mode="constant",
                        constant_values=np.nan
                        if value.dtype in (float, np.float32, np.float64)
                        else -1,
                    )
                ds[name][index] = value
                return None
            else:
                # Reduce agent values to a scalar; falls through to the
                # in-memory buffering below.
                function, *args = config["function"].split(",")
                if function == "mean":
                    value = np.mean(value)
                elif function == "nanmean":
                    value = np.nanmean(value)
                elif function == "sum":
                    value = np.sum(value)
                elif function == "nansum":
                    value = np.nansum(value)
                else:
                    raise ValueError(f"Function {function} not recognized")
        # Scalar / aggregated path: buffer (time, value) tuples in memory;
        # finalize() writes them to CSV at the end of the run. NaN is stored
        # as None so it serializes as an empty CSV cell.
        if np.isnan(value):
            value = None
        if module_name not in self.variables:
            self.variables[module_name] = {}
        if name not in self.variables[module_name]:
            self.variables[module_name][name] = []
        self.variables[module_name][name].append((self.model.current_time, value))
        return None
# [docs]  (Sphinx viewcode marker — scrape artifact)
def finalize(self) -> None:
"""At the end of the model run, all previously collected data is reported to disk."""
for module_name, variables in self.variables.items():
for name, values in variables.items():
if self.model.config["report"][module_name][name][
"type"
] == "scalar" or (
self.model.config["report"][module_name][name]["function"]
is not None
):
# if the variable is a scalar or has an aggregation function, we report
df = pd.DataFrame.from_records(
values, columns=["date", name], index="date"
)
folder = self.report_folder / module_name
folder.mkdir(parents=True, exist_ok=True)
df.to_csv(folder / (name + ".csv"))
# [docs]  (Sphinx viewcode marker — scrape artifact)
def report(self, module, local_variables, module_name) -> None:
"""This method is in every step function to report data to disk.
Args:
module: The module to report data from.
local_variables: A dictionary of local variables from the function
that calls this one.
module_name: The name of the module.
"""
if not self.activated:
return None
report = self.model.config["report"].get(module_name, None)
if report is not None:
for name, config in report.items():
self.maybe_report_value(
module_name=module_name,
name=name,
module=module,
local_variables=local_variables,
config=config,
)