# -*- coding: utf-8 -*-
import datetime
import re
from operator import attrgetter
from typing import Any, Union
import numpy as np
import pandas as pd
import zarr
from honeybees.library.raster import coord_to_pixel
[docs]
def create_time_array(
start: datetime.datetime,
end: datetime.datetime,
timestep: datetime.timedelta,
conf: dict,
) -> list:
"""Create a time array based on the start and end time, the timestep, and the frequency.
Args:
start: The start time.
end: The end time.
timestep: The timestep length.
conf: The configuration for the frequency.
Returns:
time: The time array.
"""
if "frequency" not in conf:
frequency = {"every": "day"}
else:
frequency = conf["frequency"]
if "every" in frequency:
every = frequency["every"]
time = []
current_time = start
while current_time <= end:
if every == "year":
if (
frequency["month"] == current_time.month
and frequency["day"] == current_time.day
):
time.append(current_time)
elif every == "month":
if frequency["day"] == current_time.day:
time.append(current_time)
elif every == "day":
time.append(current_time)
current_time += timestep
return time
elif frequency == "initial":
return [start]
elif frequency == "final":
return [end]
else:
raise ValueError(f"Frequency {frequency} not recognized.")
[docs]
class Reporter:
"""This class is used to report data to disk.
Args:
model: The GEB model.
"""
def __init__(self, model) -> None:
self.model = model
self.hydrology = model.hydrology
self.report_folder = self.model.output_folder / "report" / self.model.run_name
self.report_folder.mkdir(parents=True, exist_ok=True)
self.variables = {}
self.timesteps = []
if (
self.model.mode == "w"
and "report" in self.model.config
and self.model.config["report"]
):
self.activated = True
for module_name, configs in self.model.config["report"].items():
self.variables[module_name] = {}
for name, config in configs.items():
self.variables[module_name][name] = self.create_variable(
config, module_name, name
)
else:
self.activated = False
[docs]
def create_variable(self, config: dict, module_name: str, name: str) -> None:
"""This function creates a variable for the reporter. For
configurations without a aggregation function, a zarr file is created. For
configurations with an aggregation function, the data
is stored in memory and exported on the final timestep.
Args:
config: The configuration for the variable.
module_name: The name of the module to which the variable belongs.
name: The name of the variable.
"""
if config["type"] in ("grid", "HRU"):
if config["function"] is not None:
return []
else:
if config["type"] == "HRU":
raster = self.hydrology.HRU
else:
raster = self.hydrology.grid
zarr_path = self.report_folder / module_name / (name + ".zarr")
zarr_path.parent.mkdir(parents=True, exist_ok=True)
config["path"] = str(zarr_path)
zarr_store = zarr.storage.LocalStore(zarr_path, read_only=False)
zarr_group = zarr.open_group(zarr_store, mode="w")
time = create_time_array(
start=self.model.current_time,
end=self.model.end_time,
timestep=self.model.timestep_length,
conf=config,
)
time = (
np.array(time, dtype="datetime64[ns]")
.astype("datetime64[s]")
.astype(np.int64)
)
time_group = zarr_group.create_array(
"time",
shape=time.shape,
dtype=time.dtype,
dimension_names=["time"],
)
time_group[:] = time
time_group.attrs.update(
{
"standard_name": "time",
"units": "seconds since 1970-01-01T00:00:00",
"calendar": "gregorian",
}
)
y_group = zarr_group.create_array(
"y",
shape=raster.lat.shape,
dtype=raster.lat.dtype,
dimension_names=["y"],
)
y_group[:] = raster.lat
y_group.attrs.update(
{
"standard_name": "latitude",
"units": "degrees_north",
}
)
x_group = zarr_group.create_array(
"x",
shape=raster.lon.shape,
dtype=raster.lon.dtype,
dimension_names=["x"],
)
x_group[:] = raster.lon
x_group.attrs.update(
{
"standard_name": "longitude",
"units": "degrees_east",
}
)
zarr_data = zarr_group.create_array(
name,
shape=(
time.size,
raster.lat.size,
raster.lon.size,
),
chunks=(
1,
raster.lat.size,
raster.lon.size,
),
dtype=np.float32,
compressor=zarr.codecs.BloscCodec(
cname="zlib",
clevel=9,
shuffle=zarr.codecs.BloscShuffle.shuffle,
),
fill_value=np.nan,
dimension_names=["time", "y", "x"],
)
zarr_data.attrs.update(
{
"grid_mapping": "crs",
"coordinates": "time y x",
"units": "unknown",
"long_name": name,
"_CRS": raster.crs,
}
)
return zarr_store
elif config["type"] == "agents":
if config["function"] is not None:
return []
else:
filepath = zarr_path = (
self.report_folder / module_name / (name + ".zarr")
)
filepath.parent.mkdir(parents=True, exist_ok=True)
store = zarr.storage.LocalStore(filepath, read_only=False)
ds = zarr.open_group(store, mode="w")
time = create_time_array(
start=self.model.current_time,
end=self.model.end_time,
timestep=self.model.timestep_length,
conf=config,
)
time = np.array(
[np.datetime64(t, "s").astype(np.int64) for t in time],
dtype=np.int64,
)
time_group = ds.create_array(
"time",
shape=time.shape,
dtype=time.dtype,
)
time_group[:] = time
time_group.attrs.update(
{
"standard_name": "time",
"units": "seconds since 1970-01-01T00:00:00",
"calendar": "gregorian",
"_ARRAY_DIMENSIONS": ["time"],
}
)
config["_file"] = ds
config["_time_index"] = time
else:
raise ValueError(
f"Type {config['type']} not recognized. Must be 'grid', 'agents' or 'HRU'."
)
[docs]
def maybe_report_value(
self,
module_name: str,
name: Union[str, tuple[str, Any]],
module: Any,
local_variables: dict,
config: dict,
) -> None:
"""This method is used to save and/or export model values.
Args:
name: Name of the value to be exported.
value: The array itself.
conf: Configuration for saving the file. Contains options such a file format, and whether to export the data or save the data in the model.
"""
# here we return None if the value is not to be reported on this timestep
if "frequency" in config:
if config["frequency"] == "initial":
if self.model.current_timestep != 0:
return None
elif config["frequency"] == "final":
if self.model.current_timestep != self.model.n_timesteps - 1:
return None
elif "every" in config["frequency"]:
every = config["frequency"]["every"]
if every == "year":
month = config["frequency"]["month"]
day = config["frequency"]["day"]
if (
self.model.current_time.month != month
or self.model.current_time.day != day
):
return None
elif every == "month":
day = config["frequency"]["day"]
if self.model.current_time.day != day:
return None
elif every == "day":
pass
else:
raise ValueError(
f"Frequency every {config['every']} not recognized (must be 'yearly', or 'monthly')."
)
else:
raise ValueError(f"Frequency {config['frequency']} not recognized.")
varname = config["varname"]
fancy_index = re.search(r"\[.*?\]", varname)
if fancy_index:
fancy_index = fancy_index.group(0)
varname = varname.replace(fancy_index, "")
if varname.startswith("."):
varname = varname[1:]
value = local_variables[varname]
else:
value = attrgetter(varname)(module)
if fancy_index:
value = eval(f"value{fancy_index}")
# if the value is not None, we check whether the value is valid
if isinstance(value, list):
value = [v.item() for v in value]
for v in value:
self.check_value(v)
elif np.isscalar(value):
value = value.item()
self.check_value(value)
self.process_value(module_name, name, value, config)
[docs]
def process_value(
self, module_name: str, name: str, value: np.ndarray, config: dict
) -> None:
"""Exports an array of values to the export folder.
Args:
module_name: Name of the module to which the value belongs.
name: Name of the value to be exported.
value: The array itself.
conf: Configuration for saving the file. Contains options such a file format, and whether to export the array in this timestep at all.
"""
if config["type"] in ("grid", "HRU"):
if config["function"] is None:
if config["type"] == "HRU":
value = self.hydrology.HRU.decompress(value)
else:
value = self.hydrology.grid.decompress(value)
zarr_group = zarr.open_group(self.variables[module_name][name])
if (
np.isin(self.model.current_time_unix_s, zarr_group["time"][:])
and value is not None
):
time_index = np.where(
zarr_group["time"][:] == self.model.current_time_unix_s
)[0].item()
if "substeps" in config:
time_index_start = np.where(time_index)[0][0]
time_index_end = time_index_start + config["substeps"]
zarr_group[name][time_index_start:time_index_end, ...] = value
else:
zarr_group[name][time_index, ...] = value
return None
else:
function, *args = config["function"].split(",")
if function == "mean":
value = np.mean(value)
elif function == "nanmean":
value = np.nanmean(value)
elif function == "sum":
value = np.sum(value)
elif function == "nansum":
value = np.nansum(value)
elif function == "sample":
decompressed_array = self.decompress(config["varname"], value)
value = decompressed_array[int(args[0]), int(args[1])]
elif function == "sample_coord":
if config["varname"].startswith("hydrology.grid"):
gt = self.model.hydrology.grid.gt
elif config["varname"].startswith("hydrology.HRU"):
gt = self.hydrology.HRU.gt
else:
raise ValueError
px, py = coord_to_pixel((float(args[1]), float(args[0])), gt)
decompressed_array = self.decompress(config["varname"], value)
try:
value = decompressed_array[py, px]
except IndexError:
raise IndexError(
f"The coordinate ({args[0]},{args[1]}) is outside the model domain."
)
elif function in ("weightedmean", "weightednanmean"):
if config["type"] == "HRU":
cell_area = self.hydrology.HRU.var.cell_area
else:
cell_area = self.hydrology.grid.var.cell_area
if function == "weightedmean":
value = np.average(value, weights=cell_area)
elif function == "weightednanmean":
value = np.nansum(value * cell_area) / np.sum(cell_area)
else:
raise ValueError(f"Function {function} not recognized")
elif config["type"] == "agents":
if config["function"] is None:
ds = config["_file"]
if name not in ds:
# zarr file has not been created yet
if isinstance(value, (float, int)):
shape = (ds["time"].size,)
chunks = (1,)
compressor = None
dtype = type(value)
array_dimensions = ["time"]
else:
shape = (ds["time"].size, value.size)
chunks = (1, value.size)
compressor = zarr.codecs.BloscCodec(
cname="zlib",
clevel=9,
shuffle=zarr.codecs.BloscShuffle.shuffle,
)
dtype = value.dtype
array_dimensions = ["time", "agents"]
if dtype in (float, np.float32, np.float64):
fill_value = np.nan
elif dtype in (int, np.int32, np.int64):
fill_value = -1
else:
raise ValueError(
f"Value {dtype} of type {type(dtype)} not recognized."
)
ds.create_array(
name,
shape=shape,
chunks=chunks,
dtype=dtype,
compressor=compressor,
fill_value=fill_value,
)
ds[name].attrs["_ARRAY_DIMENSIONS"] = array_dimensions
index = np.argwhere(
config["_time_index"]
== np.datetime64(self.model.current_time, "s").astype(
config["_time_index"].dtype
)
).item()
if value.size < ds[name][index].size:
print("Padding array with NaNs or -1 - temporary solution")
value = np.pad(
value,
(0, ds[name][index].size - value.size),
mode="constant",
constant_values=np.nan
if value.dtype in (float, np.float32, np.float64)
else -1,
)
ds[name][index] = value
return None
else:
function, *args = config["function"].split(",")
if function == "mean":
value = np.mean(value)
elif function == "nanmean":
value = np.nanmean(value)
elif function == "sum":
value = np.sum(value)
elif function == "nansum":
value = np.nansum(value)
else:
raise ValueError(f"Function {function} not recognized")
if np.isnan(value):
value = None
if module_name not in self.variables:
self.variables[module_name] = {}
if name not in self.variables[module_name]:
self.variables[module_name][name] = []
self.variables[module_name][name].append((self.model.current_time, value))
return None
[docs]
def finalize(self) -> None:
"""At the end of the model run, all previously collected data is reported to disk."""
for module_name, variables in self.variables.items():
for name, values in variables.items():
if (
self.model.config["report"][module_name][name]["function"]
is not None
):
df = pd.DataFrame.from_records(
values, columns=["date", name], index="date"
)
folder = self.report_folder / module_name
folder.mkdir(parents=True, exist_ok=True)
df.to_csv(folder / (name + ".csv"))
[docs]
def report(self, module, local_variables, module_name) -> None:
"""This method is in every step function to report data to disk.
Args:
module: The module to report data from.
variables: A dictionary of local variables from the function
that calls this one.
module_name: The name of the module.
"""
if not self.activated:
return None
report = self.model.config["report"].get(module_name, None)
if report is not None:
for name, config in report.items():
self.maybe_report_value(
module_name=module_name,
name=name,
module=module,
local_variables=local_variables,
config=config,
)