import calendar
import numpy as np
import numpy.typing as npt
from ..hydrology.lakes_reservoirs import RESERVOIR
from ..store import DynamicArray
from .general import AgentBaseClass
IRRIGATION_RESERVOIR: int = 1
FLOOD_RESERVOIR: int = 2
RESERVOIR_MEMORY_YEARS: int = 20
[docs]
class ReservoirOperators(AgentBaseClass):
"""This class is used to simulate the government.
Args:
model: The GEB model.
agents: The class that includes all agent types (allowing easier communication between agents).
"""
def __init__(self, model, agents):
super().__init__(model)
self.agents = agents
self.config = (
self.model.config["agent_settings"]["reservoir_operators"]
if "reservoir_operators" in self.model.config["agent_settings"]
else {}
)
self.environmental_flow_requirement = 0.0
self.water_conveyance_efficiency = 1.0
if self.model.in_spinup:
self.spinup()
@property
def name(self) -> str:
return "agents.reservoir_operators"
def spinup(self):
self.reservoirs = self.model.hydrology.lakes_reservoirs.var.water_body_data[
self.model.hydrology.lakes_reservoirs.var.water_body_data["waterbody_type"]
== 2
].copy()
assert (self.reservoirs["volume_total"] > 0).all()
self.var.active_reservoirs = self.reservoirs[
self.reservoirs["waterbody_type"] == RESERVOIR
]
self.var.reservoir_release_factors = DynamicArray(
np.full(
len(self.var.active_reservoirs),
self.model.config["agent_settings"]["reservoir_operators"][
"max_reservoir_release_factor"
],
)
)
# set the storage at the beginning of the year
self.var.storage_year_start = self.storage.copy()
# set all reservoirs to a capacity reduction factor of 0.85 (Hanasaki et al., 2006)
# https://doi.org/10.1016/j.jhydrol.2005.11.011
self.var.alpha = np.full_like(
self.var.storage_year_start, 0.85, dtype=np.float32
)
total_monthly_inflow = (
self.var.active_reservoirs["average_discharge"].values * 30 * 24 * 3600
)
# Make all reservoirs irrigation reservoirs. This could be changed in the future
self.var.reservoir_purpose = np.full_like(
self.storage, IRRIGATION_RESERVOIR, dtype=np.int8
)
# Create arrays for n year moving averages of mean total inflow and mean irrigation demand.
self.var.multi_year_monthly_total_inflow = np.full(
(self.storage.size, 12, RESERVOIR_MEMORY_YEARS), np.nan, dtype=np.float32
)
# set this hydrological year to 0 so that we can start counting
self.var.multi_year_monthly_total_inflow[..., 0] = 0
self.var.multi_year_monthly_total_inflow[..., 1] = total_monthly_inflow[
:, np.newaxis
]
self.var.multi_year_monthly_total_irrigation_demand_m3 = (
self.var.multi_year_monthly_total_inflow
) * 0.25
self.var.multi_year_monthly_usable_command_area_release_m3 = (
self.var.multi_year_monthly_total_inflow * 0.25
)
self.var.hydrological_year_counter = (
0 # Number of hydrological years for each reservoir
)
def get_command_area_release(self, gross_irrigation_demand_m3):
assert gross_irrigation_demand_m3.size == self.storage.size
# add the irrigation demand to the multi_year_monthly_total_irrigation_demand_m3, use the current month
self.var.multi_year_monthly_total_irrigation_demand_m3[
:, self.current_month_index, 0
] += gross_irrigation_demand_m3
self.remaining_command_area_release = np.zeros_like(gross_irrigation_demand_m3)
self.command_area_release_m3 = np.zeros_like(gross_irrigation_demand_m3)
# environmental release is not used for irrigation
usable_release_m3, environmental_release_m3 = self._get_release(
irrigation_demand_m3=gross_irrigation_demand_m3,
daily_substeps=1,
enforce_minimum_usable_release_m3=False,
)
# limit command area release to the irrigation demand
self.command_area_release_m3 = np.minimum(
usable_release_m3, gross_irrigation_demand_m3
)
self.usable_command_area_release_m3 = (
self.command_area_release_m3 * self.water_conveyance_efficiency
)
self.remaining_command_area_release = self.command_area_release_m3.copy()
self.gross_irrigation_demand_m3 = gross_irrigation_demand_m3
self.var.multi_year_monthly_usable_command_area_release_m3[
:, self.current_month_index, 0
] += self.usable_command_area_release_m3
# print(
# "fullfillment",
# np.round(
# (self.command_area_release_m3 / self.gross_irrigation_demand_m3) * 100,
# decimals=1,
# ),
# )
return self.usable_command_area_release_m3
[docs]
def get_maximum_abstraction_m3_by_farmer(
self,
farmer_command_areas: npt.NDArray[np.float32],
gross_irrigation_demand_m3_per_farmer: npt.NDArray[np.float32],
) -> npt.NDArray[np.float32]:
"""Get the maximum abstraction from reservoirs for each farmer.
If the configuration is set to equal abstraction, the maxmimum abstraction
per farmer is calculated based on the irrigation demand of the farmers.
Args:
farmer_command_areas: The command areas of the farmers in m2.
gross_irrigation_demand_m3_per_farmer: The gross irrigation demand per farmer in m3.
Returns:
The maximum abstraction from reservoirs for each farmer in m3.
"""
if self.config["equal_abstraction"] is True:
command_area_mask = farmer_command_areas != -1
demand_per_command_area = np.bincount(
farmer_command_areas[command_area_mask],
weights=gross_irrigation_demand_m3_per_farmer[command_area_mask],
minlength=self.model.hydrology.lakes_reservoirs.n,
)
command_area_release_m3 = np.full(
self.model.hydrology.lakes_reservoirs.n, np.nan, dtype=np.float32
)
command_area_release_m3[
self.model.hydrology.lakes_reservoirs.is_reservoir
] = self.command_area_release_m3
correction_factor = command_area_release_m3 / demand_per_command_area
correction_factor_per_farmer = correction_factor[farmer_command_areas]
correction_factor_per_farmer[~command_area_mask] = np.nan
return gross_irrigation_demand_m3_per_farmer * correction_factor_per_farmer
else:
return np.full_like(farmer_command_areas, np.inf, dtype=np.float32)
[docs]
def track_inflow(self, inflow_m3: npt.NDArray[np.float32]) -> None:
"""Track the inflow to the reservoirs. Is called from the routing module every time step.
Args:
inflow_m3: The inflow to the reservoirs in m3.
"""
if inflow_m3.size == 0:
return np.zeros_like(inflow_m3), np.zeros_like(inflow_m3)
# add the inflow to the multi_year_monthly_total_inflow, use the current month
self.var.multi_year_monthly_total_inflow[:, self.current_month_index, 0] += (
inflow_m3
)
def release(
self, daily_substeps: int, current_substep: int
) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]:
command_area_release_substep: npt.NDArray[np.float64] = (
self.command_area_release_m3 / daily_substeps
)
# subtract the evaporation in this timestep from the remaining evaporation
self.remaining_command_area_release -= command_area_release_substep
usable_release_m3, environmental_release_m3 = self._get_release(
irrigation_demand_m3=self.gross_irrigation_demand_m3,
daily_substeps=daily_substeps,
enforce_minimum_usable_release_m3=True,
)
# main channel release is the usable release, the environmental release
# minus the command area release. The command area release
# is divided by the daily timesteps to get the release per timestep.
main_channel_release: npt.NDArray[np.float64] = (
usable_release_m3 + environmental_release_m3 - command_area_release_substep
)
assert (main_channel_release >= 0).all()
if (
current_substep + 1 == daily_substeps and __debug__
): # current_substep starts counting at 0
np.testing.assert_allclose(
self.remaining_command_area_release, 0, atol=1e-7
)
np.testing.assert_allclose(
main_channel_release + command_area_release_substep,
usable_release_m3 + environmental_release_m3,
atol=1e-7,
)
return main_channel_release, command_area_release_substep
def _get_release(
self, irrigation_demand_m3, daily_substeps, enforce_minimum_usable_release_m3
) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]:
if irrigation_demand_m3.size == 0:
return np.zeros_like(irrigation_demand_m3), np.zeros_like(
irrigation_demand_m3
)
days_in_month: int = calendar.monthrange(
self.model.current_time.year, self.model.current_time.month
)[1]
n_monthly_substeps: int = daily_substeps * days_in_month
month_weights: npt.NDArray[np.float32] = np.array(
[31, 28.2425, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31], dtype=np.float32
)
month_weights: npt.NDArray[np.float32] = np.broadcast_to(
month_weights, (irrigation_demand_m3.shape[0], 12)
)[..., np.newaxis].repeat(self.var.history_fill_index - 1, axis=2)
# get the long term inflow across all time. Do not consider the current month as
# it is not yet complete.
long_term_monthly_inflow_m3 = np.average(
self.var.multi_year_monthly_total_inflow[
..., 1 : self.var.history_fill_index
],
axis=(1, 2),
weights=month_weights,
)
# get the long term inflow for this month. Do not consider the current month as
# it is not yet complete.
long_term_monthly_inflow_this_month_m3 = np.average(
self.var.multi_year_monthly_total_inflow[
:, self.current_month_index, 1 : self.var.history_fill_index
],
axis=1,
)
long_term_monthly_irrigation_demand_m3 = np.average(
self.var.multi_year_monthly_total_irrigation_demand_m3[
..., 1 : self.var.history_fill_index
],
axis=(1, 2),
weights=month_weights,
)
provisional_reservoir_release_m3 = np.full_like(
self.storage, np.nan, dtype=np.float32
)
# Calculate reservoir release for each reservoir type in m3/s. Only execute if there are reservoirs of that type.
irrigation_reservoirs = self.var.reservoir_purpose == IRRIGATION_RESERVOIR
if np.any(irrigation_reservoirs):
provisional_reservoir_release_m3[irrigation_reservoirs] = (
self.get_irrigation_reservoir_release(
self.capacity[irrigation_reservoirs],
self.var.storage_year_start[irrigation_reservoirs],
long_term_monthly_inflow_m3[irrigation_reservoirs],
long_term_monthly_inflow_this_month_m3[irrigation_reservoirs],
irrigation_demand_m3[irrigation_reservoirs],
long_term_monthly_irrigation_demand_m3[irrigation_reservoirs],
self.var.alpha[irrigation_reservoirs],
n_monthly_substeps,
)
)
if np.any(self.var.reservoir_purpose == FLOOD_RESERVOIR):
raise NotImplementedError
environmental_flow_requirement_m3 = (
long_term_monthly_inflow_m3
* self.environmental_flow_requirement
/ n_monthly_substeps
)
assert (provisional_reservoir_release_m3 >= 0).all()
usable_release_m3, environmental_release_m3 = self._release_corrections(
provisional_reservoir_release_m3=provisional_reservoir_release_m3,
storage_m3=self.storage,
capacity_m3=self.capacity,
minimum_usable_release_m3=self.command_area_release_m3 / daily_substeps
if enforce_minimum_usable_release_m3
else None,
environmental_flow_requirement_m3=environmental_flow_requirement_m3,
alpha=self.var.alpha,
daily_substeps=daily_substeps,
)
assert (usable_release_m3 >= 0).all()
assert (environmental_release_m3 >= 0).all()
assert (
usable_release_m3 >= self.command_area_release_m3 / daily_substeps
).all()
return usable_release_m3, environmental_release_m3
def _release_corrections(
self,
provisional_reservoir_release_m3,
storage_m3,
capacity_m3,
minimum_usable_release_m3,
environmental_flow_requirement_m3,
alpha,
daily_substeps,
):
"""Adjusts the provisional reservoir release to ensure it meets environmental flow requirements, does not exceed the reservoir capacity, and maintains a minimum usable release."""
# release is at least 10% of the mean monthly inflow (environmental flow)
reservoir_release_m3 = np.maximum(
provisional_reservoir_release_m3, environmental_flow_requirement_m3
)
assert (reservoir_release_m3 >= 0).all()
# get provisional storage given inflow and release
provisional_storage_m3 = storage_m3 - reservoir_release_m3
# release any over capacity
normal_capacity = alpha * capacity_m3
over_capacity = provisional_storage_m3 - normal_capacity
reservoir_release_m3 = np.where(
over_capacity > 0,
reservoir_release_m3 + over_capacity,
reservoir_release_m3,
)
assert (reservoir_release_m3 >= 0).all()
# storage can never drop below 10% of the capacity
# also make sure the remaining storage is enough to provide the command area release
# and remaining evaporative demand
minimum_storage_m3 = 0.1 * capacity_m3 + self.remaining_command_area_release
reservoir_release_m3_ = np.where(
provisional_storage_m3 < minimum_storage_m3,
np.maximum(storage_m3 - minimum_storage_m3, 0),
reservoir_release_m3,
)
assert (reservoir_release_m3_ >= 0).all()
environmental_flow_release_m3 = np.minimum(
reservoir_release_m3_, environmental_flow_requirement_m3
)
usable_release_m3 = reservoir_release_m3_ - environmental_flow_release_m3
if minimum_usable_release_m3 is not None:
# make sure the usable release is at least the command area release
# divided by the number of daily substeps
usable_release_m3 = np.maximum(usable_release_m3, minimum_usable_release_m3)
assert (
usable_release_m3 >= self.command_area_release_m3 / daily_substeps
).all()
return usable_release_m3, environmental_flow_release_m3
[docs]
def get_irrigation_reservoir_release(
self,
capacity,
storage_year_start,
long_term_monthly_inflow_m3,
long_term_monthly_inflow_this_month_m3,
current_irrigation_demand_m3,
long_term_monthly_irrigation_demand_m3,
alpha,
n_monthly_substeps,
):
"""https://github.com/gutabeshu/xanthos-wm/blob/updatev1/xanthos-wm/xanthos/reservoirs/WaterManagement.py."""
# Based on Shin et al. (2019)
# https://agupubs.onlinelibrary.wiley.com/doi/10.1029/2018WR023025
M = 0.1
ratio_long_term_demand_to_inflow = (
long_term_monthly_irrigation_demand_m3 / long_term_monthly_inflow_m3
) # here the units don't matter as long as they are the same
irrigation_dominant_reservoirs = ratio_long_term_demand_to_inflow > 1.0 - M
provisional_release = np.full_like(capacity, np.nan, dtype=np.float32)
# equation 2a in Shin et al. (2019)
# if the irrigation demand is not dominant, this assumes there is sufficient
# water in the reservoir to meet the demand
long_term_inflow_irrigation_delta_m3 = (
long_term_monthly_inflow_m3[~irrigation_dominant_reservoirs]
- long_term_monthly_irrigation_demand_m3[~irrigation_dominant_reservoirs]
)
provisional_release[~irrigation_dominant_reservoirs] = (
long_term_inflow_irrigation_delta_m3 / n_monthly_substeps
+ current_irrigation_demand_m3[~irrigation_dominant_reservoirs]
)
provisional_release[irrigation_dominant_reservoirs] = (
long_term_monthly_inflow_m3[irrigation_dominant_reservoirs]
/ n_monthly_substeps
* (
M
+ (1 - M)
* current_irrigation_demand_m3[irrigation_dominant_reservoirs]
/ (
long_term_monthly_irrigation_demand_m3[
irrigation_dominant_reservoirs
]
/ n_monthly_substeps
)
)
) # equation 2b in Shin et al. (2019)
# Targeted monthly release (Shin et al., 2019)
ratio_capacity_to_total_inflow = capacity / (long_term_monthly_inflow_m3 * 12)
# R in Shin et al. (2019). "As R varies from 0 to 1, the reservoir release changes
# from run-of-the-river flow to demand-controlled release.""
demand_controlled_release_ratio = np.minimum(
alpha * ratio_capacity_to_total_inflow, 1.0
)
# kᵣₗₛ [−], "the ratio between the initial storage at the
# beginning of the operational year (S0 [L3])
# and the long-term target storage (αC [L3])" (Shin et al., 2019)
release_coefficient = storage_year_start / (alpha * capacity)
final_release = (
demand_controlled_release_ratio * release_coefficient * provisional_release
+ (1 - demand_controlled_release_ratio)
* long_term_monthly_inflow_this_month_m3
/ n_monthly_substeps
)
return final_release
[docs]
def get_flood_control_reservoir_release(
self, cpa, cond_ppose, qin, S_begin_yr, mtifl, alpha
):
"""Computes release from flood control reservoirs.
cpa = reservoir capacity (m^3)
cond_ppose = array containing irrigation reservoir cells
based on selection mask
qin = inflow (m^3/s)
Sini = initial storage (m^3)
mtifl = annual mean total annual inflow (m^3/s)
alpha = reservoir capacity reduction factor (dimensionless).
"""
# flood Reservoirs
# initialization
Nx = len(cond_ppose)
Rprovisional = np.zeros(
[
Nx,
]
) # Provisional Release
Rflood_final = np.zeros(
[
Nx,
]
) # Final Release
# water management
mtifl_flood = mtifl[cond_ppose] # mean flow: m^3/s
cpa_flood = cpa[cond_ppose] # capacity: m^3
qin_flood = qin[cond_ppose] # mean flow: m^3/s
Sbeginning_ofyear = S_begin_yr[cond_ppose] # capacity: m^3
# Provisional Release
Rprovisional = mtifl_flood.copy()
# Final Release
# capacity & annual total infow
c = np.divide(cpa_flood, (mtifl_flood * 365 * 24 * 3600))
cond1 = np.where(c >= 0.5)[0] # c = capacity/imean >= 0.5
cond2 = np.where(c < 0.5)[0] # c = capacity/imean < 0.5
# c = capacity/imean >= 0.5
Krls = np.divide(Sbeginning_ofyear, (alpha * cpa_flood))
Rflood_final[cond1] = np.multiply(Krls[cond1], mtifl_flood[cond1])
# c = capacity/imean < 0.5
temp1 = (c[cond2] / 0.5) ** 2
temp2 = np.multiply(temp1, Krls[cond2])
temp3 = np.multiply(temp2, Rprovisional[cond2])
temp4 = np.multiply((1 - temp1), qin_flood[cond2])
Rflood_final[cond2] = temp3 + temp4
return Rflood_final
def step(self) -> None:
# operational year should start after the end of the rainy season
# this could also maybe be determined based on the start of the irrigation season
# thus crop calendars
if self.model.current_time.day == 1 and self.model.current_time.month == 10:
# in the second year, we want to discard the default data that was estimated
# from external sources
if self.var.hydrological_year_counter == 1:
self.var.multi_year_monthly_total_inflow[..., 1] = np.nan
self.var.multi_year_monthly_total_irrigation_demand_m3[..., 1] = np.nan
self.var.multi_year_monthly_usable_command_area_release_m3[..., 1] = (
np.nan
)
# in the first year, we don't want to save the data, because we don't have a full year yet
# so no shifting should be done
if self.var.hydrological_year_counter > 0:
self.var.multi_year_monthly_total_inflow[..., 1:] = (
self.var.multi_year_monthly_total_inflow[..., 0:-1]
)
self.var.multi_year_monthly_total_irrigation_demand_m3[..., 1:] = (
self.var.multi_year_monthly_total_irrigation_demand_m3[..., 0:-1]
)
self.var.multi_year_monthly_usable_command_area_release_m3[..., 1:] = (
self.var.multi_year_monthly_usable_command_area_release_m3[
..., 0:-1
]
)
# always reset the counters for the next year
self.var.multi_year_monthly_total_inflow[..., 0] = 0
self.var.multi_year_monthly_total_irrigation_demand_m3[..., 0] = 0
self.var.multi_year_monthly_usable_command_area_release_m3[..., 0] = 0
self.var.hydrological_year_counter += 1
self.var.storage_year_start = self.storage.copy()
self.var.history_fill_index = max(
2, min(RESERVOIR_MEMORY_YEARS, self.var.hydrological_year_counter)
)
self.report(self, locals())
@property
def storage(self):
return self.model.hydrology.lakes_reservoirs.reservoir_storage
@storage.setter
def storage(self, value):
self.model.hydrology.lakes_reservoirs.reservoir_storage = value
# @property
# def evaporation_m3(self):
# return self.model.hydrology.lakes_reservoirs.potential_evaporation_per_water_body_m3_reservoir
@property
def capacity(self):
return self.model.hydrology.lakes_reservoirs.reservoir_capacity
@capacity.setter
def capacity(self, value):
self.model.hydrology.lakes_reservoirs.reservoir_capacity = value
@property
def fill_ratio(self):
return self.storage / self.capacity
@property
def current_month_index(self):
return self.model.current_time.month - 1
@property
def waterbody_ids(self):
return self.model.hydrology.lakes_reservoirs.var.waterbody_ids_original[
self.model.hydrology.lakes_reservoirs.is_reservoir
]
@property
def yearly_usuable_release_m3(self) -> npt.NDArray[np.float32]:
"""Get the yearly usable release in m3.
We do not use the current year, as it may not be complete yet, and
we only use up to the history fill index, because earlier years are not
yet run and thus contain no data.
"""
yearly_usable_release_m3 = self.agents.reservoir_operators.var.multi_year_monthly_usable_command_area_release_m3.sum(
axis=1
)[:, 1 : self.agents.reservoir_operators.var.history_fill_index]
assert not np.isnan(yearly_usable_release_m3).any()
return yearly_usable_release_m3