# -*- coding: utf-8 -*-
import calendar
import copy
import math
import os
from datetime import datetime
from typing import Tuple, Union
import numpy as np
import pandas as pd
from honeybees.library.neighbors import find_neighbors
from honeybees.library.raster import pixels_to_coords, sample_from_map
from numba import njit
from scipy.optimize import curve_fit
from scipy.stats import genextreme
from geb.workflows import TimingModule
from ..data import (
load_crop_data,
load_economic_data,
load_regional_crop_data_from_dict,
)
from ..HRUs import load_grid
from ..hydrology.landcover import GRASSLAND_LIKE, NON_PADDY_IRRIGATED, PADDY_IRRIGATED
from ..store import DynamicArray
from ..workflows import balance_check
from ..workflows.io import load_array
from .decision_module import DecisionModule
from .general import AgentBaseClass
from .workflows.crop_farmers import (
abstract_water,
crop_profit_difference_njit,
farmer_command_area,
get_farmer_groundwater_depth,
get_farmer_HRUs,
get_gross_irrigation_demand_m3,
plant,
)
NO_IRRIGATION = -1
CHANNEL_IRRIGATION = 0
RESERVOIR_IRRIGATION = 1
GROUNDWATER_IRRIGATION = 2
TOTAL_IRRIGATION = 3
SURFACE_IRRIGATION_EQUIPMENT = 0
WELL_ADAPTATION = 1
IRRIGATION_EFFICIENCY_ADAPTATION = 2
FIELD_EXPANSION_ADAPTATION = 3
[docs]
def cumulative_mean(mean, counter, update, mask=None):
"""Calculates the cumulative mean of a series of numbers. This function
operates in place.
Args:
mean: The cumulative mean.
counter: The number of elements that have been added to the mean.
update: The new elements that needs to be added to the mean.
"""
if mask is not None:
mean[mask] = (mean[mask] * counter[mask] + update[mask]) / (counter[mask] + 1)
counter[mask] += 1
else:
mean[:] = (mean * counter + update) / (counter + 1)
counter += 1
[docs]
def shift_and_update(array, update):
"""Shifts the array and updates the first element with the update value.
Args:
array: The array that needs to be shifted.
update: The value that needs to be added to the first element of the array.
"""
array[:, 1:] = array[:, :-1]
array[:, 0] = update
[docs]
def shift_and_reset_matrix(matrix: np.ndarray) -> None:
"""
Shifts columns to the right in the matrix and sets the first column to zero.
"""
matrix[:, 1:] = matrix[:, 0:-1] # Shift columns to the right
matrix[:, 0] = 0 # Reset the first column to 0
[docs]
def advance_crop_rotation_year(
current_crop_calendar_rotation_year_index: np.ndarray,
crop_calendar_rotation_years: np.ndarray,
):
"""Update the crop rotation year for each farmer. This function is used to update the crop rotation year for each farmer at the end of the year.
Args:
current_crop_calendar_rotation_year_index: The current crop rotation year for each farmer.
crop_calendar_rotation_years: The number of years in the crop rotation cycle for each farmer.
"""
current_crop_calendar_rotation_year_index[:] = (
current_crop_calendar_rotation_year_index + 1
) % crop_calendar_rotation_years
[docs]
class CropFarmers(AgentBaseClass):
"""The agent class for the farmers. Contains all data and behaviourial methods. The __init__ function only gets the model as arguments, the agent parent class and the redundancy. All other variables are loaded at later stages.
Args:
model: The GEB model.
agents: The class that includes all agent types (allowing easier communication between agents).
redundancy: a lot of data is saved in pre-allocated NumPy arrays. While this allows much faster operation, it does mean that the number of agents cannot grow beyond the size of the pre-allocated arrays. This parameter allows you to specify how much redundancy should be used. A lower redundancy means less memory is used, but the model crashes if the redundancy is insufficient.
"""
def __init__(self, model, agents, reduncancy: float) -> None:
super().__init__(model)
self.agents = agents
self.config = (
self.model.config["agent_settings"]["farmers"]
if "farmers" in self.model.config["agent_settings"]
else {}
)
if self.model.simulate_hydrology:
self.HRU = model.hydrology.HRU
self.grid = model.hydrology.grid
self.redundancy = reduncancy
self.decision_module = DecisionModule(self)
self.inflation_rate = load_economic_data(
self.model.files["dict"]["socioeconomics/inflation_rates"]
)
# self.lending_rate = load_economic_data(
# self.model.files["dict"]["socioeconomics/lending_rates"]
# )
self.electricity_cost = load_economic_data(
self.model.files["dict"]["socioeconomics/electricity_cost"]
)
self.why_10 = load_economic_data(
self.model.files["dict"]["socioeconomics/why_10"]
)
self.why_20 = load_economic_data(
self.model.files["dict"]["socioeconomics/why_20"]
)
self.why_30 = load_economic_data(
self.model.files["dict"]["socioeconomics/why_30"]
)
self.crop_prices = load_regional_crop_data_from_dict(
self.model, "crops/crop_prices"
)
self.cultivation_costs = load_regional_crop_data_from_dict(
self.model, "crops/cultivation_costs"
)
if self.model.in_spinup:
self.spinup()
@property
def name(self):
return "agents.crop_farmers"
def spinup(self):
self.var.crop_data_type, self.var.crop_data = load_crop_data(self.model.files)
self.var.crop_ids = self.var.crop_data["name"].to_dict()
# reverse dictionary
self.var.crop_names = {
crop_name: crop_id for crop_id, crop_name in self.var.crop_ids.items()
}
## Set parameters required for drought event perception, risk perception and SEUT
self.var.moving_average_threshold = self.model.config["agent_settings"][
"farmers"
]["expected_utility"]["drought_risk_calculations"]["event_perception"][
"drought_threshold"
]
self.var.previous_month = 0
# Assign risk aversion sigma, time discounting preferences, expenditure_cap
self.var.expenditure_cap = self.model.config["agent_settings"]["farmers"][
"expected_utility"
]["decisions"]["expenditure_cap"]
# New global well variables
self.var.pump_hours = self.model.config["agent_settings"]["farmers"][
"expected_utility"
]["adaptation_well"]["pump_hours"]
self.var.specific_weight_water = self.model.config["agent_settings"]["farmers"][
"expected_utility"
]["adaptation_well"]["specific_weight_water"]
self.var.max_initial_sat_thickness = self.model.config["agent_settings"][
"farmers"
]["expected_utility"]["adaptation_well"]["max_initial_sat_thickness"]
self.var.lifespan_well = self.model.config["agent_settings"]["farmers"][
"expected_utility"
]["adaptation_well"]["lifespan"]
self.var.pump_efficiency = self.model.config["agent_settings"]["farmers"][
"expected_utility"
]["adaptation_well"]["pump_efficiency"]
self.var.maintenance_factor = self.model.config["agent_settings"]["farmers"][
"expected_utility"
]["adaptation_well"]["maintenance_factor"]
self.var.p_droughts = np.array([100, 50, 25, 10, 5, 2, 1])
# Set water costs
self.var.water_costs_m3_channel = 0.20
self.var.water_costs_m3_reservoir = 0.20
self.var.water_costs_m3_groundwater = 0.20
# Irr efficiency variables
self.var.lifespan_irrigation = self.model.config["agent_settings"]["farmers"][
"expected_utility"
]["adaptation_well"]["lifespan"]
# load map of all subdistricts
self.var.subdistrict_map = load_grid(
self.model.files["region_subgrid"]["region_ids"]
)
region_mask = load_grid(self.model.files["region_subgrid"]["mask"])
self.HRU_regions_map = np.zeros_like(self.HRU.mask, dtype=np.int8)
self.HRU_regions_map[~self.HRU.mask] = self.var.subdistrict_map[
region_mask == 0
]
self.HRU_regions_map = self.HRU.compress(self.HRU_regions_map)
self.crop_prices = load_regional_crop_data_from_dict(
self.model, "crops/crop_prices"
)
self.adjust_cultivation_costs()
# Set the cultivation costs
cultivation_cost_fraction = self.model.config["agent_settings"]["farmers"][
"cultivation_cost_fraction"
] # Cultivation costs are set as a fraction of crop prices
date_index, cultivation_costs_array = self.cultivation_costs
adjusted_cultivation_costs_array = (
cultivation_costs_array * cultivation_cost_fraction
)
self.cultivation_costs = (date_index, adjusted_cultivation_costs_array)
# Test with a high variable for now
self.var.total_spinup_time = max(
self.model.config["general"]["start_time"].year
- self.model.config["general"]["spinup_time"].year,
30,
)
self.HRU.var.actual_evapotranspiration_crop_life = self.HRU.full_compressed(
0, dtype=np.float32
)
self.HRU.var.potential_evapotranspiration_crop_life = self.HRU.full_compressed(
0, dtype=np.float32
)
self.HRU.var.crop_map = np.full_like(self.HRU.var.land_owners, -1)
self.HRU.var.crop_age_days_map = np.full_like(self.HRU.var.land_owners, -1)
self.HRU.var.crop_harvest_age_days = np.full_like(self.HRU.var.land_owners, -1)
"""Calls functions to initialize all agent attributes, including their locations. Then, crops are initially planted."""
# If initial conditions based on spinup period need to be loaded, load them. Otherwise, generate them.
farms = self.model.hydrology.farms
# Get number of farmers and maximum number of farmers that could be in the entire model run based on the redundancy.
self.n = np.unique(farms[farms != -1]).size
self.max_n = self.get_max_n(self.n)
# The code below obtains the coordinates of the farmers' locations.
# First the horizontal and vertical indices of the pixels that are not -1 are obtained. Then, for each farmer the
# average of the horizontal and vertical indices is calculated. This is done by using the bincount function.
# Finally, the coordinates are obtained by adding .5 to the pixels and converting them to coordinates using pixel_to_coord.
vertical_index = (
np.arange(farms.shape[0])
.repeat(farms.shape[1])
.reshape(farms.shape)[farms != -1]
)
horizontal_index = np.tile(np.arange(farms.shape[1]), farms.shape[0]).reshape(
farms.shape
)[farms != -1]
pixels = np.zeros((self.n, 2), dtype=np.int32)
pixels[:, 0] = np.round(
np.bincount(farms[farms != -1], horizontal_index)
/ np.bincount(farms[farms != -1])
).astype(int)
pixels[:, 1] = np.round(
np.bincount(farms[farms != -1], vertical_index)
/ np.bincount(farms[farms != -1])
).astype(int)
self.var.locations = DynamicArray(
pixels_to_coords(pixels + 0.5, self.HRU.gt), max_n=self.max_n
)
self.set_social_network()
self.var.risk_aversion = DynamicArray(
n=self.n, max_n=self.max_n, dtype=np.float32, fill_value=np.nan
)
self.var.risk_aversion[:] = load_array(
self.model.files["array"]["agents/farmers/risk_aversion"]
)
self.var.discount_rate = DynamicArray(
n=self.n, max_n=self.max_n, dtype=np.float32, fill_value=np.nan
)
self.var.discount_rate[:] = load_array(
self.model.files["array"]["agents/farmers/discount_rate"]
)
self.var.intention_factor = DynamicArray(
n=self.n, max_n=self.max_n, dtype=np.float32, fill_value=np.nan
)
self.var.intention_factor[:] = load_array(
self.model.files["array"]["agents/farmers/intention_factor"]
)
# Load the region_code of each farmer.
self.var.region_id = DynamicArray(
input_array=load_array(
self.model.files["array"]["agents/farmers/region_id"]
),
max_n=self.max_n,
)
self.var.elevation = self.get_farmer_elevation()
self.var.crop_calendar = DynamicArray(
n=self.n,
max_n=self.max_n,
extra_dims=(3, 4),
extra_dims_names=("rotation", "calendar"),
dtype=np.int32,
fill_value=-1,
) # first dimension is the farmers, second is the rotation, third is the crop, planting and growing length
self.var.crop_calendar[:] = load_array(
self.model.files["array"]["agents/farmers/crop_calendar"]
)
# assert self.var.crop_calendar[:, :, 0].max() < len(self.var.crop_ids)
self.var.crop_calendar_rotation_years = DynamicArray(
n=self.n,
max_n=self.max_n,
dtype=np.int32,
fill_value=0,
)
self.var.crop_calendar_rotation_years[:] = load_array(
self.model.files["array"]["agents/farmers/crop_calendar_rotation_years"]
)
self.var.current_crop_calendar_rotation_year_index = DynamicArray(
n=self.n,
max_n=self.max_n,
dtype=np.int32,
fill_value=0,
)
# For each farmer set a random crop rotation year. The farmer starts in that year. First set a seed for reproducibility.
np.random.seed(42)
self.var.current_crop_calendar_rotation_year_index[:] = np.random.randint(
0, self.var.crop_calendar_rotation_years
)
self.var.adaptations = DynamicArray(
load_array(self.model.files["array"]["agents/farmers/adaptations"]),
max_n=self.max_n,
extra_dims_names=("adaptation_type",),
)
# the time each agent has been paying off their loan
# 0 = no cost adaptation, 1 = well, 2 = irr efficiency, 3 = irr. field expansion -1 if they do not have adaptations
self.var.time_adapted = DynamicArray(
n=self.n,
max_n=self.max_n,
extra_dims=self.var.adaptations.shape[1:],
extra_dims_names=self.var.adaptations.extra_dims_names,
dtype=np.int32,
fill_value=-1,
)
# Set the initial well depth
self.var.well_depth = DynamicArray(
n=self.n,
max_n=self.max_n,
fill_value=self.model.config["agent_settings"]["farmers"][
"expected_utility"
]["adaptation_well"]["max_initial_sat_thickness"],
dtype=np.float32,
)
# Set how long the agents have adapted somewhere across the lifespan of farmers, would need to be a bit more realistic likely
rng_wells = np.random.default_rng(17)
self.var.time_adapted[
self.var.adaptations[:, WELL_ADAPTATION] == 1, WELL_ADAPTATION
] = rng_wells.uniform(
1,
self.var.lifespan_well,
np.sum(self.var.adaptations[:, WELL_ADAPTATION] == 1),
)
# Initiate a number of arrays with Nan, zero or -1 values for variables that will be used during the model run.
self.var.channel_abstraction_m3_by_farmer = DynamicArray(
n=self.n, max_n=self.max_n, dtype=np.float32, fill_value=0
)
self.var.reservoir_abstraction_m3_by_farmer = DynamicArray(
n=self.n, max_n=self.max_n, dtype=np.float32, fill_value=0
)
self.var.groundwater_abstraction_m3_by_farmer = DynamicArray(
n=self.n, max_n=self.max_n, dtype=np.float32, fill_value=0
)
# 2D-array for storing yearly abstraction by farmer. 0: channel abstraction, 1: reservoir abstraction, 2: groundwater abstraction, 3: total abstraction
self.var.yearly_abstraction_m3_by_farmer = DynamicArray(
n=self.n,
max_n=self.max_n,
extra_dims=(4, self.var.total_spinup_time),
extra_dims_names=("abstraction_type", "year"),
dtype=np.float32,
fill_value=0,
)
# Yield ratio and crop variables
# 0 = kharif age, 1 = rabi age, 2 = summer age, 3 = total growth time
self.var.total_crop_age = DynamicArray(
n=self.n,
max_n=self.max_n,
extra_dims=(2,),
extra_dims_names=("abstraction_type",),
dtype=np.float32,
fill_value=0,
)
self.var.max_paddy_water_level = DynamicArray(
n=self.n,
max_n=self.max_n,
dtype=np.float32,
fill_value=0.05,
)
self.var.cumulative_SPEI_during_growing_season = DynamicArray(
n=self.n,
max_n=self.max_n,
dtype=np.float32,
fill_value=0,
)
self.var.cumulative_SPEI_count_during_growing_season = DynamicArray(
n=self.n,
max_n=self.max_n,
dtype=np.int32,
fill_value=0,
)
# set no irrigation limit for farmers by default
self.var.irrigation_limit_m3 = DynamicArray(
n=self.n,
max_n=self.max_n,
dtype=np.float32,
fill_value=np.nan, # m3
)
# set the remaining irrigation limit to the irrigation limit
self.var.remaining_irrigation_limit_m3 = DynamicArray(
n=self.n, max_n=self.max_n, fill_value=np.nan, dtype=np.float32
)
self.var.yield_ratios_drought_event = DynamicArray(
n=self.n,
max_n=self.max_n,
extra_dims=(self.var.p_droughts.size,),
extra_dims_names=("drought_event",),
dtype=np.float32,
fill_value=0,
)
self.var.actual_yield_per_farmer = DynamicArray(
n=self.n,
max_n=self.max_n,
dtype=np.float32,
fill_value=np.nan,
)
self.var.harvested_crop = DynamicArray(
n=self.n,
max_n=self.max_n,
dtype=np.int32,
fill_value=-1,
)
## Risk perception variables
self.var.risk_perception = DynamicArray(
n=self.n,
max_n=self.max_n,
dtype=np.float32,
fill_value=self.model.config["agent_settings"]["farmers"][
"expected_utility"
]["drought_risk_calculations"]["risk_perception"]["min"],
)
self.var.drought_timer = DynamicArray(
n=self.n, max_n=self.max_n, dtype=np.float32, fill_value=99
)
self.var.yearly_SPEI_probability = DynamicArray(
n=self.n,
max_n=self.max_n,
extra_dims=(self.var.total_spinup_time,),
extra_dims_names=("year",),
dtype=np.float32,
fill_value=0,
)
self.var.yearly_yield_ratio = DynamicArray(
n=self.n,
max_n=self.max_n,
extra_dims=(self.var.total_spinup_time,),
extra_dims_names=("year",),
dtype=np.float32,
fill_value=0,
)
# note that this is NOT inflation corrected
self.var.yearly_profits = DynamicArray(
n=self.n,
max_n=self.max_n,
extra_dims=(self.var.total_spinup_time,),
extra_dims_names=("year",),
dtype=np.float32,
fill_value=0,
)
# note that this is NOT inflation corrected
self.var.yearly_potential_profits = DynamicArray(
n=self.n,
max_n=self.max_n,
extra_dims=(self.var.total_spinup_time,),
extra_dims_names=("year",),
dtype=np.float32,
fill_value=0,
)
self.var.farmer_yield_probability_relation = DynamicArray(
n=self.n,
max_n=self.max_n,
extra_dims=(2,),
extra_dims_names=("log function parameters",),
dtype=np.float32,
fill_value=0,
)
self.var.household_size = DynamicArray(
n=self.n, max_n=self.max_n, dtype=np.int32, fill_value=-1
)
self.var.household_size[:] = load_array(
self.model.files["array"]["agents/farmers/household_size"]
)
self.var.yield_ratios_drought_event = DynamicArray(
n=self.n,
max_n=self.max_n,
extra_dims=(self.var.p_droughts.size,),
extra_dims_names=("drought_event",),
dtype=np.float32,
fill_value=0,
)
# Set irrigation efficiency data
irrigation_mask = self.is_irrigated
self.var.irrigation_efficiency = DynamicArray(
n=self.n, max_n=self.max_n, dtype=np.float32, fill_value=0.50
)
rng = np.random.default_rng(42)
self.var.irrigation_efficiency[irrigation_mask] = rng.choice(
[0.50, 0.90], size=irrigation_mask.sum(), p=[0.8, 0.2]
)
self.var.adaptations[:, IRRIGATION_EFFICIENCY_ADAPTATION][
self.var.irrigation_efficiency >= 0.90
] = 1
rng_drip = np.random.default_rng(70)
self.var.time_adapted[
self.var.adaptations[:, IRRIGATION_EFFICIENCY_ADAPTATION] == 1,
IRRIGATION_EFFICIENCY_ADAPTATION,
] = rng_drip.uniform(
1,
self.var.lifespan_irrigation,
np.sum(self.var.adaptations[:, IRRIGATION_EFFICIENCY_ADAPTATION] == 1),
)
# Set irrigation expansion data
self.var.fraction_irrigated_field = DynamicArray(
n=self.n, max_n=self.max_n, dtype=np.float32, fill_value=1
)
self.var.adaptations[:, FIELD_EXPANSION_ADAPTATION][
self.var.fraction_irrigated_field >= 1
] = 1
self.var.base_management_yield_ratio = DynamicArray(
n=self.n,
max_n=self.max_n,
dtype=np.float32,
fill_value=self.model.config["agent_settings"]["farmers"][
"base_management_yield_ratio"
],
)
# Initiate array that tracks the overall yearly costs for all adaptations
# 0 is input, 1 is microcredit, 2 is adaptation 1 (well), 3 is adaptation 2 (drip irrigation), 4 irr. field expansion, 5 is water costs, last is total
# Columns are the individual loans, i.e. if there are 2 loans for 2 wells, the first and second slot is used
self.n_loans = self.var.adaptations.shape[1] + 2
self.var.all_loans_annual_cost = DynamicArray(
n=self.n,
max_n=self.max_n,
extra_dims=(self.n_loans + 1, 5),
extra_dims_names=("loan_type", "loans"),
dtype=np.float32,
fill_value=0,
)
self.var.adjusted_annual_loan_cost = DynamicArray(
n=self.n,
max_n=self.max_n,
extra_dims=(self.n_loans + 1, 5),
extra_dims_names=("loan_type", "loans"),
dtype=np.float32,
fill_value=np.nan,
)
# 0 is input, 1 is microcredit, 2 is adaptation 1 (well), 3 is adaptation 2 (drip irrigation)
self.var.loan_tracker = DynamicArray(
n=self.n,
max_n=self.max_n,
extra_dims=(self.n_loans, 5),
extra_dims_names=("loan_type", "loans"),
dtype=np.int32,
fill_value=0,
)
self.var.farmer_base_class = DynamicArray(
n=self.n, max_n=self.max_n, dtype=np.int32, fill_value=-1
)
self.var.water_use = DynamicArray(
n=self.n,
max_n=self.max_n,
extra_dims=(4,),
extra_dims_names=("water_source",),
dtype=np.int32,
fill_value=0,
)
# Load the why class of agent's aquifer
self.var.why_class = DynamicArray(
n=self.n,
max_n=self.max_n,
dtype=np.int32,
fill_value=0,
)
why_map = load_grid(self.model.files["grid"]["groundwater/why_map"])
self.var.why_class[:] = sample_from_map(
why_map, self.var.locations.data, self.grid.gt
)
## Load in the GEV_parameters, calculated from the extreme value distribution of the SPEI timeseries, and load in the original SPEI data
self.var.GEV_parameters = DynamicArray(
n=self.n,
max_n=self.max_n,
extra_dims=(3,),
extra_dims_names=("gev_parameters",),
dtype=np.float32,
fill_value=np.nan,
)
for i, varname in enumerate(["gev_c", "gev_loc", "gev_scale"]):
GEV_grid = getattr(self.grid, varname)
self.var.GEV_parameters[:, i] = sample_from_map(
GEV_grid, self.var.locations.data, self.grid.gt
)
self.var.risk_perc_min = DynamicArray(
n=self.n,
max_n=self.max_n,
dtype=np.float32,
fill_value=self.model.config["agent_settings"]["farmers"][
"expected_utility"
]["drought_risk_calculations"]["risk_perception"]["min"],
)
self.var.risk_perc_max = DynamicArray(
n=self.n,
max_n=self.max_n,
dtype=np.float32,
fill_value=self.model.config["agent_settings"]["farmers"][
"expected_utility"
]["drought_risk_calculations"]["risk_perception"]["max"],
)
self.var.risk_decr = DynamicArray(
n=self.n,
max_n=self.max_n,
dtype=np.float32,
fill_value=self.model.config["agent_settings"]["farmers"][
"expected_utility"
]["drought_risk_calculations"]["risk_perception"]["coef"],
)
self.var.decision_horizon = DynamicArray(
n=self.n,
max_n=self.max_n,
dtype=np.int32,
fill_value=self.model.config["agent_settings"]["farmers"][
"expected_utility"
]["decisions"]["decision_horizon"],
)
self.var.cumulative_water_deficit_m3 = DynamicArray(
n=self.n,
max_n=self.max_n,
extra_dims=(366,),
extra_dims_names=("day",),
dtype=np.float32,
fill_value=0,
)
self.var.cumulative_water_deficit_current_day = DynamicArray(
n=self.n,
max_n=self.max_n,
dtype=np.float32,
fill_value=0,
)
self.var.field_indices_by_farmer = DynamicArray(
n=self.n,
max_n=self.max_n,
extra_dims=(2,),
dtype=np.int32,
fill_value=-1,
extra_dims_names=("index",),
)
self.update_field_indices()
[docs]
@staticmethod
@njit(cache=True)
def update_field_indices_numba(
land_owners: np.ndarray,
) -> tuple[np.ndarray, np.ndarray]:
"""Creates `field_indices_by_farmer` and `field_indices`. These indices are used to quickly find the fields for a specific farmer.
Args:
land_owners: Array of the land owners. Each unique ID is a different land owner. -1 means the land is not owned by anyone.
Returns:
field_indices_by_farmer: This array contains the indices where the fields of a farmer are stored in `field_indices`.
field_indices: This array contains the indices of all fields, ordered by farmer. In other words, if a farmer owns multiple fields, the indices of the fields are indices.
"""
agents = np.unique(land_owners)
if agents[0] == -1:
n_agents = agents.size - 1
else:
n_agents = agents.size
field_indices_by_farmer = np.full((n_agents, 2), -1, dtype=np.int32)
field_indices = np.full(land_owners.size, -1, dtype=np.int32)
land_owners_sort_idx = np.argsort(land_owners)
land_owners_sorted = land_owners[land_owners_sort_idx]
last_not_owned = np.searchsorted(land_owners_sorted, -1, side="right")
prev_land_owner = -1
for i in range(last_not_owned, land_owners.size):
land_owner = land_owners[land_owners_sort_idx[i]]
if land_owner != -1:
if land_owner != prev_land_owner:
field_indices_by_farmer[land_owner, 0] = i - last_not_owned
field_indices_by_farmer[land_owner, 1] = i + 1 - last_not_owned
field_indices[i - last_not_owned] = land_owners_sort_idx[i]
prev_land_owner = land_owner
field_indices = field_indices[:-last_not_owned]
return field_indices_by_farmer, field_indices
[docs]
def update_field_indices(self) -> None:
"""Creates `field_indices_by_farmer` and `field_indices`. These indices are used to quickly find the fields for a specific farmer."""
(
self.var.field_indices_by_farmer[:],
self.var.field_indices,
) = self.update_field_indices_numba(self.HRU.var.land_owners)
[docs]
def set_social_network(self) -> None:
"""
Determines for each farmer a group of neighbors which constitutes their social network
"""
nbits = 19
radius = self.model.config["agent_settings"]["farmers"]["social_network"][
"radius"
]
n_neighbor = self.model.config["agent_settings"]["farmers"]["social_network"][
"size"
]
self.var.social_network = DynamicArray(
n=self.n,
max_n=self.max_n,
extra_dims=(n_neighbor,),
extra_dims_names=("neighbors",),
dtype=np.int32,
fill_value=np.nan,
)
bounds = self.grid.bounds
self.var.social_network[:] = find_neighbors(
self.var.locations.data,
radius=radius,
n_neighbor=n_neighbor,
bits=nbits,
minx=bounds[0],
miny=bounds[1],
maxx=bounds[2],
maxy=bounds[3],
)
def adjust_cultivation_costs(self):
# Set the cultivation costs
self.cultivation_costs = load_regional_crop_data_from_dict(
self.model, "crops/cultivation_costs"
)
cultivation_cost_fraction = self.model.config["agent_settings"]["farmers"][
"cultivation_cost_fraction"
] # Cultivation costs are set as a fraction of crop prices
date_index, cultivation_costs_array = self.cultivation_costs
if (
"calibration" in self.model.config
and "KGE_crops" in self.model.config["calibration"]["calibration_targets"]
):
# Load price change factors 0 to 25 into a NumPy array
factors = np.array(
[
self.model.config["agent_settings"]["calibration_crops"][
f"price_{i}"
]
for i in range(len(self.var.crop_ids))
]
)
# Multiply the cultivation_costs_array by the factors along the last axis
cultivation_costs_array *= factors
else:
cultivation_costs_array = (
cultivation_costs_array * cultivation_cost_fraction
)
self.cultivation_costs = (date_index, cultivation_costs_array)
@property
def activation_order_by_elevation(self):
"""
Activation order is determined by the agent elevation, starting from the highest.
Agents with the same elevation are randomly shuffled.
"""
# if activation order is fixed. Get the random state, and set a fixed seet.
if self.model.config["agent_settings"]["fix_activation_order"]:
if (
hasattr(self, "activation_order_by_elevation_fixed")
and self.activation_order_by_elevation_fixed[0] == self.n
):
return self.activation_order_by_elevation_fixed[1]
random_state = np.random.get_state()
np.random.seed(42)
elevation = self.var.elevation
# Shuffle agent elevation and agent_ids in unision.
p = np.random.permutation(elevation.size)
# if activation order is fixed, set random state to previous state
if self.model.config["agent_settings"]["fix_activation_order"]:
np.random.set_state(random_state)
elevation_shuffled = elevation[p]
agent_ids_shuffled = np.arange(0, elevation.size, 1, dtype=np.int32)[p]
# Use argsort to find the order or the shuffled elevation. Using a stable sorting
# algorithm such that the random shuffling in the previous step is conserved
# in groups with identical elevation.
activation_order_shuffled = np.argsort(elevation_shuffled, kind="stable")[::-1]
# unshuffle the agent_ids to get the activation order
activation_order = agent_ids_shuffled[activation_order_shuffled]
if self.model.config["agent_settings"]["fix_activation_order"]:
self.activation_order_by_elevation_fixed = (self.n, activation_order)
# Check if the activation order is correct, by checking if elevation is decreasing
assert np.diff(elevation[activation_order]).max() <= 0
return activation_order
@property
def farmer_command_area(self):
return farmer_command_area(
self.n,
self.var.field_indices,
self.var.field_indices_by_farmer.data,
self.HRU.var.reservoir_command_areas,
)
@property
def is_in_command_area(self):
return self.farmer_command_area != -1
def save_water_deficit(self, discount_factor=0.2):
water_deficit_day_m3 = (
self.HRU.var.ETRef - self.HRU.pr
) * self.HRU.var.cell_area
water_deficit_day_m3[water_deficit_day_m3 < 0] = 0
water_deficit_day_m3_per_farmer = np.bincount(
self.HRU.var.land_owners[self.HRU.var.land_owners != -1],
weights=water_deficit_day_m3[self.HRU.var.land_owners != -1],
)
day_index = self.model.current_day_of_year - 1
(
self.var.cumulative_water_deficit_current_day,
self.var.cumulative_water_deficit_previous_day,
) = (
(self.var.cumulative_water_deficit_m3[:, day_index]).copy(),
self.var.cumulative_water_deficit_current_day,
)
if day_index == 0:
self.var.cumulative_water_deficit_m3[:, day_index] = (
self.var.cumulative_water_deficit_m3[:, day_index]
* (1 - discount_factor)
+ water_deficit_day_m3_per_farmer * discount_factor
)
else:
self.var.cumulative_water_deficit_m3[:, day_index] = (
self.var.cumulative_water_deficit_m3[:, day_index - 1]
+ water_deficit_day_m3_per_farmer * discount_factor
+ (1 - discount_factor)
* (
self.var.cumulative_water_deficit_m3[:, day_index]
- self.var.cumulative_water_deficit_previous_day
)
)
assert (
self.var.cumulative_water_deficit_m3[:, day_index]
>= self.var.cumulative_water_deficit_m3[:, day_index - 1]
).all()
# if this is the last day of the year, but not a leap year, the virtual
# 366th day of the year is the same as the 365th day of the year
# this avoids complications with the leap year
if day_index == 364 and not calendar.isleap(self.model.current_time.year):
self.var.cumulative_water_deficit_m3[:, 365] = (
self.var.cumulative_water_deficit_m3[:, 364]
)
def get_gross_irrigation_demand_m3(
self, potential_evapotranspiration, available_infiltration
) -> np.ndarray:
gross_irrigation_demand_m3 = get_gross_irrigation_demand_m3(
day_index=self.model.current_day_of_year - 1,
n=self.n,
currently_irrigated_fields=self.currently_irrigated_fields,
field_indices_by_farmer=self.var.field_indices_by_farmer.data,
field_indices=self.var.field_indices,
irrigation_efficiency=self.var.irrigation_efficiency.data,
fraction_irrigated_field=self.var.fraction_irrigated_field.data,
cell_area=self.model.hydrology.HRU.var.cell_area,
crop_map=self.HRU.var.crop_map,
topwater=self.HRU.var.topwater,
available_infiltration=available_infiltration,
potential_evapotranspiration=potential_evapotranspiration,
root_depth=self.HRU.var.root_depth,
soil_layer_height=self.HRU.var.soil_layer_height,
field_capacity=self.HRU.var.wfc,
wilting_point=self.HRU.var.wwp,
w=self.HRU.var.w,
ws=self.HRU.var.ws,
arno_beta=self.HRU.var.arnoBeta,
remaining_irrigation_limit_m3=self.var.remaining_irrigation_limit_m3.data,
cumulative_water_deficit_m3=self.var.cumulative_water_deficit_m3.data,
crop_calendar=self.var.crop_calendar.data,
crop_group_numbers=self.var.crop_data["crop_group_number"].values.astype(
np.float32
),
paddy_irrigated_crops=self.var.crop_data["is_paddy"].values,
current_crop_calendar_rotation_year_index=self.var.current_crop_calendar_rotation_year_index.data,
max_paddy_water_level=self.var.max_paddy_water_level.data,
minimum_effective_root_depth=self.model.hydrology.soil.var.minimum_effective_root_depth,
)
assert (
gross_irrigation_demand_m3 < self.model.hydrology.HRU.var.cell_area
).all()
return gross_irrigation_demand_m3
@property
def surface_irrigated(self):
return self.var.adaptations[:, SURFACE_IRRIGATION_EQUIPMENT] > 0
@property
def well_irrigated(self):
return self.var.adaptations[:, WELL_ADAPTATION] > 0
@property
def irrigated(self):
return self.surface_irrigated | self.well_irrigated # | is the OR operator
@property
def currently_irrigated_fields(self):
return self.farmer_to_field(self.is_irrigated, False) & (
self.HRU.var.crop_map != -1
)
[docs]
def abstract_water(
self,
gross_irrigation_demand_m3_per_field: np.ndarray,
available_channel_storage_m3: np.ndarray,
available_groundwater_m3: np.ndarray,
groundwater_depth: np.ndarray,
available_reservoir_storage_m3: np.ndarray,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
This function allows the abstraction of water by farmers for irrigation purposes. It's main purpose is to call the relevant numba function to do the actual abstraction. In addition, the function saves the abstraction from the various sources by farmer.
Args:
cell_area: the area of each subcell in m2.
HRU_to_grid: array to map the index of each subcell to the corresponding cell.
totalPotIrrConsumption: potential irrigation consumption.
available_channel_storage_m3: water available for irrigation from channels.
groundwater_head: groundwater head.
available_groundwater_m3: water available for irrigation from groundwater.
available_reservoir_storage_m3: water available for irrigation from reservoirs.
command_areas: command areas associated with reservoirs (i.e., which areas can access water from which reservoir.)
Returns:
water_withdrawal_m: water withdrawal in meters
water_consumption_m: water consumption in meters
returnFlowIrr_m: return flow in meters
addtoevapotrans_m: evaporated irrigation water in meters
"""
assert (available_channel_storage_m3 >= 0).all()
assert (available_groundwater_m3 >= 0).all()
assert (available_reservoir_storage_m3 >= 0).all()
if __debug__:
irrigation_limit_pre = self.var.remaining_irrigation_limit_m3.copy()
available_channel_storage_m3_pre = available_channel_storage_m3.copy()
available_groundwater_m3_pre = available_groundwater_m3.copy()
available_reservoir_storage_m3_pre = available_reservoir_storage_m3.copy()
(
self.var.channel_abstraction_m3_by_farmer[:],
self.var.reservoir_abstraction_m3_by_farmer[:],
self.var.groundwater_abstraction_m3_by_farmer[:],
water_withdrawal_m,
water_consumption_m,
returnFlowIrr_m,
addtoevapotrans_m,
) = abstract_water(
activation_order=self.activation_order_by_elevation,
field_indices_by_farmer=self.var.field_indices_by_farmer.data,
field_indices=self.var.field_indices,
irrigation_efficiency=self.var.irrigation_efficiency.data,
surface_irrigated=self.surface_irrigated,
well_irrigated=self.well_irrigated,
cell_area=self.model.hydrology.HRU.var.cell_area,
HRU_to_grid=self.HRU.var.HRU_to_grid,
nearest_river_grid_cell=self.HRU.var.nearest_river_grid_cell,
crop_map=self.HRU.var.crop_map,
available_channel_storage_m3=available_channel_storage_m3,
available_groundwater_m3=available_groundwater_m3,
available_reservoir_storage_m3=available_reservoir_storage_m3,
groundwater_depth=groundwater_depth,
farmer_command_area=self.farmer_command_area,
return_fraction=self.model.config["agent_settings"]["farmers"][
"return_fraction"
],
well_depth=self.var.well_depth.data,
remaining_irrigation_limit_m3=self.var.remaining_irrigation_limit_m3.data,
gross_irrigation_demand_m3_per_field=gross_irrigation_demand_m3_per_field,
)
assert (water_withdrawal_m < 1).all()
assert (water_consumption_m < 1).all()
assert (returnFlowIrr_m < 1).all()
assert (addtoevapotrans_m < 1).all()
if __debug__:
# make sure the withdrawal per source is identical to the total withdrawal in m (corrected for cell area)
balance_check(
name="water withdrawal_1",
how="sum",
influxes=(
self.var.channel_abstraction_m3_by_farmer,
self.var.reservoir_abstraction_m3_by_farmer,
self.var.groundwater_abstraction_m3_by_farmer,
),
outfluxes=[
(water_withdrawal_m * self.model.hydrology.HRU.var.cell_area)
],
tollerance=50,
)
# assert that the total amount of water withdrawn is equal to the total storage before and after abstraction
balance_check(
name="water withdrawal channel",
how="sum",
outfluxes=self.var.channel_abstraction_m3_by_farmer,
prestorages=available_channel_storage_m3_pre,
poststorages=available_channel_storage_m3,
tollerance=50,
)
balance_check(
name="water withdrawal reservoir",
how="sum",
outfluxes=self.var.reservoir_abstraction_m3_by_farmer,
prestorages=available_reservoir_storage_m3_pre,
poststorages=available_reservoir_storage_m3,
tollerance=50,
)
balance_check(
name="water withdrawal groundwater",
how="sum",
outfluxes=self.var.groundwater_abstraction_m3_by_farmer,
prestorages=available_groundwater_m3_pre,
poststorages=available_groundwater_m3,
tollerance=50,
)
# assert that the total amount of water withdrawn is equal to the total storage before and after abstraction
balance_check(
name="water withdrawal_2",
how="sum",
outfluxes=(
self.var.channel_abstraction_m3_by_farmer[
~np.isnan(self.var.remaining_irrigation_limit_m3)
],
self.var.reservoir_abstraction_m3_by_farmer[
~np.isnan(self.var.remaining_irrigation_limit_m3)
],
self.var.groundwater_abstraction_m3_by_farmer[
~np.isnan(self.var.remaining_irrigation_limit_m3)
],
),
prestorages=irrigation_limit_pre[
~np.isnan(self.var.remaining_irrigation_limit_m3)
],
poststorages=self.var.remaining_irrigation_limit_m3[
~np.isnan(self.var.remaining_irrigation_limit_m3)
],
tollerance=50,
)
# make sure the total water consumption plus 'wasted' irrigation water (evaporation + return flow) is equal to the total water withdrawal
balance_check(
name="water consumption",
how="sum",
influxes=(
water_consumption_m,
returnFlowIrr_m,
addtoevapotrans_m,
),
outfluxes=water_withdrawal_m,
tollerance=50,
)
assert water_withdrawal_m.dtype == np.float32
assert water_consumption_m.dtype == np.float32
assert returnFlowIrr_m.dtype == np.float32
assert addtoevapotrans_m.dtype == np.float32
return (
water_withdrawal_m,
water_consumption_m,
returnFlowIrr_m,
addtoevapotrans_m,
)
[docs]
@staticmethod
@njit(cache=True)
def get_yield_ratio_numba_GAEZ(
crop_map: np.ndarray, evap_ratios: np.ndarray, KyT
) -> float:
"""Calculate yield ratio based on https://doi.org/10.1016/j.jhydrol.2009.07.031
Args:
crop_map: array of currently harvested crops.
evap_ratios: ratio of actual to potential evapotranspiration of harvested crops.
KyT: Water stress reduction factor from GAEZ.
Returns:
yield_ratios: yield ratio (as ratio of maximum obtainable yield) per harvested crop.
"""
yield_ratios = np.full(evap_ratios.size, -1, dtype=np.float32)
assert crop_map.size == evap_ratios.size
for i in range(evap_ratios.size):
evap_ratio = evap_ratios[i]
crop = crop_map[i]
yield_ratios[i] = max(
1 - KyT[crop] * (1 - evap_ratio), 0
) # Yield ratio is never lower than 0.
return yield_ratios
[docs]
@staticmethod
@njit(cache=True)
def get_yield_ratio_numba_MIRCA2000(
crop_map: np.ndarray,
evap_ratios: np.ndarray,
alpha: np.ndarray,
beta: np.ndarray,
P0: np.ndarray,
P1: np.ndarray,
) -> float:
"""Calculate yield ratio based on https://doi.org/10.1016/j.jhydrol.2009.07.031
Args:
crop_map: array of currently harvested crops.
evap_ratios: ratio of actual to potential evapotranspiration of harvested crops.
alpha: alpha value per crop used in MIRCA2000.
beta: beta value per crop used in MIRCA2000.
P0: P0 value per crop used in MIRCA2000.
P1: P1 value per crop used in MIRCA2000.
Returns:
yield_ratios: yield ratio (as ratio of maximum obtainable yield) per harvested crop.
"""
yield_ratios = np.full(evap_ratios.size, -1, dtype=np.float32)
assert crop_map.size == evap_ratios.size
for i in range(evap_ratios.size):
evap_ratio = evap_ratios[i]
crop = crop_map[i]
if alpha[crop] * evap_ratio + beta[crop] > 1:
yield_ratio = 1
elif P0[crop] < evap_ratio < P1[crop]:
yield_ratio = (
alpha[crop] * P1[crop]
+ beta[crop]
- (P1[crop] - evap_ratio)
* (alpha[crop] * P1[crop] + beta[crop])
/ (P1[crop] - P0[crop])
)
elif evap_ratio < P0[crop]:
yield_ratio = 0
else:
yield_ratio = alpha[crop] * evap_ratio + beta[crop]
yield_ratios[i] = yield_ratio
return yield_ratios
[docs]
def get_yield_ratio(
self,
harvest: np.ndarray,
actual_transpiration: np.ndarray,
potential_transpiration: np.ndarray,
crop_map: np.ndarray,
) -> np.ndarray:
"""Gets yield ratio for each crop given the ratio between actual and potential evapostranspiration during growth.
Args:
harvest: Map of crops that are harvested.
actual_transpiration: Actual evapotranspiration during crop growth period.
potential_transpiration: Potential evapotranspiration during crop growth period.
crop_map: Subarray of type of crop grown.
Returns:
yield_ratio: Map of yield ratio.
TODO: Implement GAEZ crop stage function
"""
if self.var.crop_data_type == "GAEZ":
yield_ratio = self.get_yield_ratio_numba_GAEZ(
crop_map[harvest],
actual_transpiration[harvest] / potential_transpiration[harvest],
self.var.crop_data["KyT"].values,
)
elif self.var.crop_data_type == "MIRCA2000":
yield_ratio = self.get_yield_ratio_numba_MIRCA2000(
crop_map[harvest],
actual_transpiration[harvest] / potential_transpiration[harvest],
self.var.crop_data["a"].values,
self.var.crop_data["b"].values,
self.var.crop_data["P0"].values,
self.var.crop_data["P1"].values,
)
if np.any(yield_ratio == 0):
pass
else:
raise ValueError(
f"Unknown crop data type: {self.var.crop_data_type}, must be 'GAEZ' or 'MIRCA2000'"
)
assert not np.isnan(yield_ratio).any()
return yield_ratio
def field_to_farmer(self, array, method="sum"):
assert method == "sum", "Only sum is implemented"
farmer_fields = self.HRU.var.land_owners[self.HRU.var.land_owners != -1]
masked_array = array[self.HRU.var.land_owners != -1]
return np.bincount(farmer_fields, masked_array, minlength=self.n)
def farmer_to_field(self, array, nodata):
by_field = np.take(array, self.HRU.var.land_owners)
by_field[self.HRU.var.land_owners == -1] = nodata
return by_field
def decompress(self, array):
if np.issubdtype(array.dtype, np.floating):
nofieldvalue = np.nan
else:
nofieldvalue = -1
by_field = self.farmer_to_field(array, nodata=nofieldvalue)
return self.HRU.decompress(by_field)
@property
def mask(self):
mask = self.HRU.mask.copy()
mask[self.decompress(self.HRU.var.land_owners) == -1] = True
return mask
[docs]
@staticmethod
@njit(cache=True)
def harvest_numba(
n: np.ndarray,
field_indices_by_farmer: np.ndarray,
field_indices: np.ndarray,
crop_map: np.ndarray,
crop_age_days: np.ndarray,
crop_harvest_age_days: np.ndarray,
) -> np.ndarray:
"""This function determines whether crops are ready to be harvested by comparing the crop harvest age to the current age of the crop. If the crop is harvested, the crops next multicrop index and next plant day are determined.
Args:
n: Number of farmers.
start_day_per_month: Array containing the starting day of each month.
field_indices_by_farmer: This array contains the indices where the fields of a farmer are stored in `field_indices`.
field_indices: This array contains the indices of all fields, ordered by farmer. In other words, if a farmer owns multiple fields, the indices of the fields are indices.
crop_map: Subarray map of crops.
crop_age_days: Subarray map of current crop age in days.
crop: Crops grown by each farmer.
switch_crops: Whether to switch crops or not.
Returns:
harvest: Boolean subarray map of fields to be harvested.
"""
harvest = np.zeros(crop_map.shape, dtype=np.bool_)
for farmer_i in range(n):
farmer_fields = get_farmer_HRUs(
field_indices, field_indices_by_farmer, farmer_i
)
for field in farmer_fields:
crop_age = crop_age_days[field]
if crop_age >= 0:
crop = crop_map[field]
assert crop != -1
if crop_age == crop_harvest_age_days[field]:
harvest[field] = True
crop_harvest_age_days[field] = -1
else:
assert crop_map[field] == -1
return harvest
[docs]
def harvest(self):
"""
Determine which crops need to be harvested based on their current age and their harvest age.
Once harvested, compute various metrics related to the harvest including potential profit,
actual profit, crop age, drought perception, and update corresponding attributes of the model.
Save the corresponding SPEI over the last harvest.
Attributes:
harvest_numba: A helper function to obtain the harvest map.
get_yield_ratio: A function to calculate yield ratio based on the ratio of actual to potential evapotranspiration.
Note:
The function also updates the drought risk perception and tracks disposable income.
"""
# Using the helper function to determine which crops are ready to be harvested
harvest = self.harvest_numba(
n=self.n,
field_indices_by_farmer=self.var.field_indices_by_farmer.data,
field_indices=self.var.field_indices,
crop_map=self.HRU.var.crop_map,
crop_age_days=self.HRU.var.crop_age_days_map,
crop_harvest_age_days=self.HRU.var.crop_harvest_age_days,
)
self.var.actual_yield_per_farmer.fill(np.nan)
self.var.harvested_crop.fill(-1)
# If there are fields to be harvested, compute yield ratio and various related metrics
if np.count_nonzero(harvest):
# Get yield ratio for the harvested crops
yield_ratio_per_field = self.get_yield_ratio(
harvest,
self.HRU.var.actual_evapotranspiration_crop_life,
self.HRU.var.potential_evapotranspiration_crop_life,
self.HRU.var.crop_map,
)
assert (yield_ratio_per_field >= 0).all()
harvesting_farmer_fields = self.HRU.var.land_owners[harvest]
harvested_area = self.HRU.var.cell_area[harvest]
harvested_crops = self.HRU.var.crop_map[harvest]
max_yield_per_crop = np.take(
self.var.crop_data["reference_yield_kg_m2"].values, harvested_crops
)
harvesting_farmers = np.unique(harvesting_farmer_fields)
# it's okay for some crop prices to be nan, as they will be filtered out in the next step
crop_prices = self.agents.market.crop_prices
region_id_per_field = self.var.region_id
# Determine the region ids of harvesting farmers, as crop prices differ per region
region_id_per_field = self.var.region_id[self.HRU.var.land_owners]
region_id_per_field[self.HRU.var.land_owners == -1] = -1
region_id_per_harvested_field = region_id_per_field[harvest]
# Calculate the crop price per field
crop_price_per_field = crop_prices[
region_id_per_harvested_field, harvested_crops
]
# but it's not okay for the crop price to be nan now
assert not np.isnan(crop_price_per_field).any()
# Correct yield ratio
yield_ratio_per_field = (
self.var.base_management_yield_ratio[harvesting_farmer_fields]
* yield_ratio_per_field
)
# Calculate the potential yield per field
potential_yield_per_field = max_yield_per_crop * harvested_area
# Calculate the total yield per field
actual_yield_per_field = yield_ratio_per_field * potential_yield_per_field
# And sum the total yield per field to get the total yield per farmer
self.var.actual_yield_per_farmer[:] = np.bincount(
harvesting_farmer_fields,
weights=actual_yield_per_field,
minlength=self.n,
)
# get the harvested crop per farmer. This assumes each farmer only harvests one crop
# on the same day
self.var.harvested_crop[harvesting_farmers] = harvested_crops[
np.unique(self.HRU.var.land_owners[harvest], return_index=True)[1]
]
# Determine the actual and potential profits
potential_profit_per_field = (
potential_yield_per_field * crop_price_per_field
)
actual_profit_per_field = actual_yield_per_field * crop_price_per_field
assert (potential_profit_per_field >= 0).all()
assert (actual_profit_per_field >= 0).all()
# Convert from the profit and potential profit per field to the profit per farmer
potential_profit_farmer = np.bincount(
harvesting_farmer_fields,
weights=potential_profit_per_field,
minlength=self.n,
)
self.profit_farmer = np.bincount(
harvesting_farmer_fields,
weights=actual_profit_per_field,
minlength=self.n,
)
# Convert the yield_ratio per field to the average yield ratio per farmer
# yield_ratio_per_farmer = self.profit_farmer / potential_profit_farmer
# Get the current crop age
crop_age = self.HRU.var.crop_age_days_map[harvest]
total_crop_age = np.bincount(
harvesting_farmer_fields, weights=crop_age, minlength=self.n
) / np.bincount(harvesting_farmer_fields, minlength=self.n)
self.var.total_crop_age[harvesting_farmers, 0] = total_crop_age[
harvesting_farmers
]
harvesting_farmers_mask = np.zeros(self.n, dtype=bool)
harvesting_farmers_mask[harvesting_farmers] = True
self.save_yearly_profits(self.profit_farmer, potential_profit_farmer)
self.save_harvest_spei(harvesting_farmers)
self.drought_risk_perception(harvesting_farmers, total_crop_age)
## After updating the drought risk perception, set the previous month for the next timestep as the current for this timestep.
# TODO: This seems a bit like a quirky solution, perhaps there is a better way to do this.
self.var.previous_month = self.model.current_time.month
else:
self.profit_farmer = np.zeros(self.n, dtype=np.float32)
# Reset transpiration values for harvested fields
self.HRU.var.actual_evapotranspiration_crop_life[harvest] = 0
self.HRU.var.potential_evapotranspiration_crop_life[harvest] = 0
# Update crop and land use maps after harvest
self.HRU.var.crop_map[harvest] = -1
self.HRU.var.crop_age_days_map[harvest] = -1
self.HRU.var.land_use_type[harvest] = GRASSLAND_LIKE
# For unharvested growing crops, increase their age by 1
self.HRU.var.crop_age_days_map[(~harvest) & (self.HRU.var.crop_map >= 0)] += 1
assert (
self.HRU.var.crop_age_days_map <= self.HRU.var.crop_harvest_age_days
).all()
[docs]
def drought_risk_perception(
self, harvesting_farmers: np.ndarray, total_crop_age: np.ndarray
) -> None:
"""Calculate and update the drought risk perception for harvesting farmers.
Args:
harvesting_farmers: Index array of farmers that are currently harvesting.
This function computes the risk perception of farmers based on the difference
between their latest profits and potential profits. The perception is influenced
by the historical losses and time since the last drought event. Farmers who have
experienced a drought event will have their drought timer reset.
TODO: Perhaps move the constant to the model.yml
"""
# constants
HISTORICAL_PERIOD = min(5, self.var.yearly_potential_profits.shape[1]) # years
# Convert the harvesting farmers index array to a boolean array of full length
harvesting_farmers_long = np.zeros(self.n, dtype=bool)
harvesting_farmers_long[harvesting_farmers] = True
# Update the drought timer based on the months passed since the previous check
months_passed = (self.model.current_time.month - self.var.previous_month) % 12
self.var.drought_timer += months_passed / 12
# Create an empty drought loss np.ndarray
drought_loss_historical = np.zeros(
(self.n, HISTORICAL_PERIOD), dtype=np.float32
)
# Calculate the cumulative inflation from the start year to the current year for each farmer
# the base year is not important here as we are only interested in the relative change
cumulative_inflation_since_base_year = np.cumprod(
np.stack(
[
self.get_value_per_farmer_from_region_id(
self.inflation_rate,
datetime(year, 1, 1),
subset=harvesting_farmers_long,
)
for year in range(
self.model.current_time.year + 1 - HISTORICAL_PERIOD,
self.model.current_time.year + 1,
)
],
axis=1,
),
axis=1,
)
# Compute the percentage loss between potential and actual profits for harvesting farmers
potential_profits_inflation_corrected = (
self.var.yearly_potential_profits[
harvesting_farmers_long, :HISTORICAL_PERIOD
]
/ cumulative_inflation_since_base_year
)
actual_profits_inflation_corrected = (
self.var.yearly_profits[harvesting_farmers_long, :HISTORICAL_PERIOD]
/ cumulative_inflation_since_base_year
)
drought_loss_historical[harvesting_farmers_long] = (
(potential_profits_inflation_corrected - actual_profits_inflation_corrected)
/ potential_profits_inflation_corrected
) * 100
# Calculate the current and past average loss percentages
drought_loss_latest = drought_loss_historical[:, 0]
drought_loss_past = np.mean(drought_loss_historical[:, 1:], axis=1)
# Identify farmers who experienced a drought event based on loss comparison with historical losses
drought_loss_current = drought_loss_latest - drought_loss_past
experienced_drought_event = (
drought_loss_current >= self.var.moving_average_threshold
)
# Reset the drought timer for farmers who have harvested and experienced a drought event
self.var.drought_timer[
np.logical_and(harvesting_farmers_long, experienced_drought_event)
] = 0
# Update the risk perception of all farmers
self.var.risk_perception = (
self.var.risk_perc_max
* (1.6 ** (self.var.risk_decr * self.var.drought_timer))
+ self.var.risk_perc_min
)
print(
"Risk perception mean = ",
np.mean(self.var.risk_perception),
"STD",
np.std(self.var.risk_perception),
)
# Determine which farmers need emergency microcredit to keep farming
loaning_farmers = drought_loss_current >= self.var.moving_average_threshold
# Determine their microcredit
if (
np.any(loaning_farmers)
and "ruleset" in self.config
and not self.config["ruleset"] == "no-adaptation"
):
# print(np.count_nonzero(loaning_farmers), "farmers are getting microcredit")
self.microcredit(loaning_farmers, drought_loss_current, total_crop_age)
[docs]
def microcredit(
self,
loaning_farmers: np.ndarray,
drought_loss_current: np.ndarray,
total_crop_age: np.ndarray,
) -> None:
"""
Compute the microcredit for farmers based on their average profits, drought losses, and the age of their crops
with respect to their total cropping time.
Parameters:
- loaning_farmers: Boolean mask of farmers looking to obtain a loan, based on drought loss of harvesting farmers.
- drought_loss_current: Array of drought losses of the most recent harvest for each farmer.
- total_crop_age: Array of total age for crops of each farmer.
"""
# Compute the maximum loan amount based on the average profits of the last 10 years
max_loan = np.median(self.var.yearly_profits[loaning_farmers, :5], axis=1)
# Compute the crop age as a percentage of the average total time a farmer has had crops planted
crop_age_fraction = total_crop_age[loaning_farmers] / np.mean(
self.var.total_crop_age[loaning_farmers], axis=1
)
# Calculate the total loan amount based on drought loss, crop age percentage, and the maximum loan
total_loan = (
(drought_loss_current[loaning_farmers] / 100) * crop_age_fraction * max_loan
)
# Fetch loan configurations from the model settings
loan_duration = self.model.config["agent_settings"]["farmers"]["microcredit"][
"loan_duration"
]
# interest_rate = self.get_value_per_farmer_from_region_id(
# self.var.lending_rate, self.model.current_time
# )
interest_rate = np.full(self.n, 0.05, dtype=np.float32)
# Compute the annual cost of the loan using the interest rate and loan duration
annual_cost_microcredit = total_loan * (
interest_rate[loaning_farmers]
* (1 + interest_rate[loaning_farmers]) ** loan_duration
/ ((1 + interest_rate[loaning_farmers]) ** loan_duration - 1)
)
# Add the amounts to the individual loan slots
self.set_loans_numba(
all_loans_annual_cost=self.var.all_loans_annual_cost.data,
loan_tracker=self.var.loan_tracker.data,
loaning_farmers=loaning_farmers,
annual_cost_loan=annual_cost_microcredit,
loan_duration=loan_duration,
loan_type=1,
)
# Add it to the loan total
self.var.all_loans_annual_cost[loaning_farmers, -1, 0] += (
annual_cost_microcredit
)
@staticmethod
@njit(cache=True)
def set_loans_numba(
all_loans_annual_cost: np.ndarray,
loan_tracker: np.ndarray,
loaning_farmers: np.ndarray,
annual_cost_loan: np.ndarray,
loan_duration: int,
loan_type: int,
) -> None:
farmers_getting_loan = np.where(loaning_farmers)[0]
# Update the agent's loans and total annual costs with the computed annual cost
# Make sure it is in an empty loan slot
for farmer in farmers_getting_loan:
for i in range(4):
if all_loans_annual_cost[farmer, loan_type, i] == 0:
local_index = np.where(farmers_getting_loan == farmer)[0][0]
all_loans_annual_cost[farmer, loan_type, i] += annual_cost_loan[
local_index
]
loan_tracker[farmer, loan_type, i] = loan_duration
break # Exit the loop after adding to the first zero value
[docs]
def plant(self) -> None:
"""Determines when and what crop should be planted, mainly through calling the :meth:`agents.farmers.Farmers.plant_numba`. Then converts the array to cupy array if model is running with GPU."""
if self.cultivation_costs[0] is None:
cultivation_cost = self.cultivation_costs[1]
else:
index = self.cultivation_costs[0].get(self.model.current_time)
cultivation_cost = self.cultivation_costs[1][index]
assert cultivation_cost.shape[0] == len(self.model.regions)
assert cultivation_cost.shape[1] == len(self.var.crop_ids)
# interest_rate = self.get_value_per_farmer_from_region_id(
# self.var.lending_rate, self.model.current_time
# )
interest_rate = np.full(self.n, 0.05, dtype=np.float32)
plant_map, farmers_selling_land = plant(
n=self.n,
day_index=self.model.current_time.timetuple().tm_yday - 1, # 0-indexed
crop_calendar=self.var.crop_calendar.data,
current_crop_calendar_rotation_year_index=self.var.current_crop_calendar_rotation_year_index.data,
crop_map=self.HRU.var.crop_map,
crop_harvest_age_days=self.HRU.var.crop_harvest_age_days,
cultivation_cost=cultivation_cost,
region_ids_per_farmer=self.var.region_id.data,
field_indices_by_farmer=self.var.field_indices_by_farmer.data,
field_indices=self.var.field_indices,
field_size_per_farmer=self.field_size_per_farmer.data,
all_loans_annual_cost=self.var.all_loans_annual_cost.data,
loan_tracker=self.var.loan_tracker.data,
interest_rate=interest_rate,
farmers_going_out_of_business=False,
)
if farmers_selling_land.size > 0:
self.remove_agents(farmers_selling_land)
self.HRU.var.crop_map = np.where(
plant_map >= 0, plant_map, self.HRU.var.crop_map
)
self.HRU.var.crop_age_days_map[plant_map >= 0] = 1
assert (self.HRU.var.crop_age_days_map[self.HRU.var.crop_map > 0] >= 0).all()
is_paddy_crop = np.isin(
self.HRU.var.crop_map,
self.var.crop_data[self.var.crop_data["is_paddy"]].index,
)
self.HRU.var.land_use_type[(self.HRU.var.crop_map >= 0) & is_paddy_crop] = (
PADDY_IRRIGATED
)
self.HRU.var.land_use_type[(self.HRU.var.crop_map >= 0) & (~is_paddy_crop)] = (
NON_PADDY_IRRIGATED
)
[docs]
def water_abstraction_sum(self) -> None:
"""
Aggregates yearly water abstraction from different sources (channel, reservoir, groundwater) for each farmer
and also computes the total abstraction per farmer.
Note:
This function performs the following steps:
1. Updates the yearly channel water abstraction for each farmer.
2. Updates the yearly reservoir water abstraction for each farmer.
3. Updates the yearly groundwater water abstraction for each farmer.
4. Computes and updates the total water abstraction for each farmer.
"""
# Update yearly channel water abstraction for each farmer
self.var.yearly_abstraction_m3_by_farmer[:, CHANNEL_IRRIGATION, 0] += (
self.var.channel_abstraction_m3_by_farmer
)
# Update yearly reservoir water abstraction for each farmer
self.var.yearly_abstraction_m3_by_farmer[:, RESERVOIR_IRRIGATION, 0] += (
self.var.reservoir_abstraction_m3_by_farmer
)
# Update yearly groundwater water abstraction for each farmer
self.var.yearly_abstraction_m3_by_farmer[:, GROUNDWATER_IRRIGATION, 0] += (
self.var.groundwater_abstraction_m3_by_farmer
)
# Compute and update the total water abstraction for each farmer
self.var.yearly_abstraction_m3_by_farmer[:, TOTAL_IRRIGATION, 0] += (
self.var.channel_abstraction_m3_by_farmer
+ self.var.reservoir_abstraction_m3_by_farmer
+ self.var.groundwater_abstraction_m3_by_farmer
)
[docs]
def save_harvest_spei(self, harvesting_farmers) -> None:
"""
Update the monthly Standardized Precipitation Evapotranspiration Index (SPEI) array by shifting past records and
adding the SPEI for the current month.
Note:
This method updates the `monthly_SPEI` attribute in place.
"""
current_SPEI_per_farmer = sample_from_map(
array=self.model.hydrology.grid.spei_uncompressed,
coords=self.var.locations[harvesting_farmers],
gt=self.grid.gt,
)
full_size_SPEI_per_farmer = np.zeros_like(
self.var.cumulative_SPEI_during_growing_season
)
full_size_SPEI_per_farmer[harvesting_farmers] = current_SPEI_per_farmer
cumulative_mean(
mean=self.var.cumulative_SPEI_during_growing_season,
counter=self.var.cumulative_SPEI_count_during_growing_season,
update=full_size_SPEI_per_farmer,
mask=harvesting_farmers,
)
print(
"season SPEI",
np.mean(full_size_SPEI_per_farmer[harvesting_farmers]),
)
def save_yearly_spei(self):
assert self.model.current_time.month == 1
# calculate the SPEI probability using GEV parameters
SPEI_probability = genextreme.sf(
self.var.cumulative_SPEI_during_growing_season,
self.var.GEV_parameters[:, 0],
self.var.GEV_parameters[:, 1],
self.var.GEV_parameters[:, 2],
)
# SPEI_probability_norm = norm.cdf(self.var.cumulative_SPEI_during_growing_season)
print("Yearly probability", np.mean(1 - SPEI_probability))
shift_and_update(self.var.yearly_SPEI_probability, (1 - SPEI_probability))
# Reset the cumulative SPEI array at the beginning of the year
self.var.cumulative_SPEI_during_growing_season.fill(0)
self.var.cumulative_SPEI_count_during_growing_season.fill(0)
[docs]
def save_yearly_profits(
self,
profit: np.ndarray,
potential_profit: np.ndarray,
) -> None:
"""
Saves the latest profit and potential profit values for harvesting farmers to determine yearly profits, considering inflation and field size.
Args:
harvesting_farmers: Array of farmers who are currently harvesting.
profit: Array representing the profit value for each farmer per season.
potential_profit: Array representing the potential profit value for each farmer per season.
Note:
This function performs the following operations:
1. Asserts that all profit and potential profit values are non-negative.
2. Updates the latest profits and potential profits matrices by shifting all columns one column further.
The last column value is dropped.
3. Adjusts the yearly profits by accounting for the latest profit, field size, and inflation.
"""
# Ensure that all profit and potential profit values are non-negative
assert (profit >= 0).all()
assert (potential_profit >= 0).all()
# Adjust yearly profits by the cumulative inflation for each harvesting farmer
self.var.yearly_profits[:, 0] += profit
self.var.yearly_potential_profits[:, 0] += potential_profit
def calculate_yield_spei_relation_test_solo(self):
import matplotlib
matplotlib.use("Agg") # Use the 'Agg' backend for non-interactive plotting
import matplotlib.pyplot as plt
# Number of agents
n_agents = self.var.yearly_yield_ratio.shape[0]
# Define regression models
def linear_model(X, a, b):
return a * X + b
def exponential_model(X, a, b):
return a * np.exp(b * X)
def logarithmic_model(X, a, b):
return a * np.log(X) + b
def quadratic_model(X, a, b, c):
return a * X**2 + b * X + c
def power_model(X, a, b):
return a * X**b
# Initialize dictionaries for coefficients and R² values
model_names = ["linear", "exponential", "logarithmic", "quadratic", "power"]
r_squared_dict = {model: np.zeros(n_agents) for model in model_names}
coefficients_dict = {model: [] for model in model_names}
# Create a folder to save the plots
output_folder = "plot/relation_test"
if not os.path.exists(output_folder):
os.makedirs(output_folder)
# For each agent, perform regression with different models
for agent_idx in range(n_agents):
# Get data for the agent
y_data = self.var.yearly_yield_ratio[agent_idx, :] # shape (n_years,)
X_data = self.var.yearly_SPEI_probability[agent_idx, :] # shape (n_years,)
# Filter out invalid values
valid_mask = (
(~np.isnan(X_data)) & (~np.isnan(y_data)) & (X_data > 0) & (y_data != 0)
)
X_valid = X_data[valid_mask]
y_valid = y_data[valid_mask]
if len(X_valid) >= 2:
# Prepare data
X_log = np.log10(X_valid)
# Model 1: Linear in log-transformed X
try:
popt, _ = curve_fit(linear_model, X_log, y_valid, maxfev=10000)
a, b = popt
y_pred = linear_model(X_log, a, b)
ss_res = np.sum((y_valid - y_pred) ** 2)
ss_tot = np.sum((y_valid - np.mean(y_valid)) ** 2)
r_squared = 1 - ss_res / ss_tot if ss_tot != 0 else np.nan
r_squared_dict["linear"][agent_idx] = r_squared
coefficients_dict["linear"].append((a, b))
except RuntimeError:
r_squared_dict["linear"][agent_idx] = np.nan
coefficients_dict["linear"].append((np.nan, np.nan))
# Model 2: Exponential
try:
popt, _ = curve_fit(
exponential_model, X_valid, y_valid, maxfev=10000
)
y_pred = exponential_model(X_valid, *popt)
ss_res = np.sum((y_valid - y_pred) ** 2)
ss_tot = np.sum((y_valid - np.mean(y_valid)) ** 2)
r_squared = 1 - ss_res / ss_tot if ss_tot != 0 else np.nan
r_squared_dict["exponential"][agent_idx] = r_squared
coefficients_dict["exponential"].append(popt)
except RuntimeError:
r_squared_dict["exponential"][agent_idx] = np.nan
coefficients_dict["exponential"].append((np.nan, np.nan))
# Model 3: Logarithmic (ensure X > 0)
try:
popt, _ = curve_fit(
logarithmic_model, X_valid, y_valid, maxfev=10000
)
y_pred = logarithmic_model(X_valid, *popt)
ss_res = np.sum((y_valid - y_pred) ** 2)
ss_tot = np.sum((y_valid - np.mean(y_valid)) ** 2)
r_squared = 1 - ss_res / ss_tot if ss_tot != 0 else np.nan
r_squared_dict["logarithmic"][agent_idx] = r_squared
coefficients_dict["logarithmic"].append(popt)
except RuntimeError:
r_squared_dict["logarithmic"][agent_idx] = np.nan
coefficients_dict["logarithmic"].append((np.nan, np.nan))
# Model 4: Quadratic
try:
popt, _ = curve_fit(quadratic_model, X_valid, y_valid, maxfev=10000)
y_pred = quadratic_model(X_valid, *popt)
ss_res = np.sum((y_valid - y_pred) ** 2)
ss_tot = np.sum((y_valid - np.mean(y_valid)) ** 2)
r_squared = 1 - ss_res / ss_tot if ss_tot != 0 else np.nan
r_squared_dict["quadratic"][agent_idx] = r_squared
coefficients_dict["quadratic"].append(popt)
except RuntimeError:
r_squared_dict["quadratic"][agent_idx] = np.nan
coefficients_dict["quadratic"].append((np.nan, np.nan))
# Model 5: Power
try:
popt, _ = curve_fit(power_model, X_valid, y_valid, maxfev=10000)
y_pred = power_model(X_valid, *popt)
ss_res = np.sum((y_valid - y_pred) ** 2)
ss_tot = np.sum((y_valid - np.mean(y_valid)) ** 2)
r_squared = 1 - ss_res / ss_tot if ss_tot != 0 else np.nan
r_squared_dict["power"][agent_idx] = r_squared
coefficients_dict["power"].append(popt)
except RuntimeError:
r_squared_dict["power"][agent_idx] = np.nan
coefficients_dict["power"].append((np.nan, np.nan))
else:
# Not enough data points
for model in model_names:
r_squared_dict[model][agent_idx] = np.nan
coefficients_dict[model].append(None)
# Plotting code for this agent
# Create a new figure
plt.figure(figsize=(10, 6))
# Plot the data points
plt.scatter(X_valid, y_valid, label="Data", color="black")
# Generate x values for plotting fitted curves
x_min = np.min(X_valid)
x_max = np.max(X_valid)
x_plot = np.linspace(x_min, x_max, 100)
# Plot each fitted model with R² in the label
for model in model_names:
coeffs = coefficients_dict[model][agent_idx]
r_squared = r_squared_dict[model][agent_idx]
if (
coeffs is not None
and not any([np.isnan(c) for c in np.atleast_1d(coeffs)])
and not np.isnan(r_squared)
):
# Depending on the model, compute y values for plotting
if model == "linear":
a, b = coeffs
x_plot_log = np.log10(x_plot[x_plot > 0])
if len(x_plot_log) > 0:
y_plot = linear_model(x_plot_log, a, b)
plt.plot(
x_plot[x_plot > 0],
y_plot,
label=f"{model} (R²={r_squared:.3f})",
linewidth=2,
)
elif model == "exponential":
y_plot = exponential_model(x_plot, *coeffs)
plt.plot(
x_plot,
y_plot,
label=f"{model} (R²={r_squared:.3f})",
linewidth=2,
)
elif model == "logarithmic":
x_plot_positive = x_plot[x_plot > 0]
if len(x_plot_positive) > 0:
y_plot = logarithmic_model(x_plot_positive, *coeffs)
plt.plot(
x_plot_positive,
y_plot,
label=f"{model} (R²={r_squared:.3f})",
linewidth=2,
)
elif model == "quadratic":
y_plot = quadratic_model(x_plot, *coeffs)
plt.plot(
x_plot,
y_plot,
label=f"{model} (R²={r_squared:.3f})",
linewidth=2,
)
elif model == "power":
x_plot_positive = x_plot[x_plot > 0]
if len(x_plot_positive) > 0:
y_plot = power_model(x_plot_positive, *coeffs)
plt.plot(
x_plot_positive,
y_plot,
label=f"{model} (R²={r_squared:.3f})",
linewidth=2,
)
else:
continue # Skip models with invalid coefficients or R²
# Add labels and legend
plt.xlabel("SPEI Probability")
plt.ylabel("Yield Ratio")
plt.title(
f"Agent {agent_idx}, irr class {self.var.farmer_class[agent_idx]}, crop {self.var.crop_calendar[agent_idx, 0, 0]} "
)
plt.legend()
plt.grid(True)
# Save the plot to a file
filename = os.path.join(output_folder, f"agent_{agent_idx}.png")
plt.savefig(filename)
plt.close()
# Compute median R² for each model
for model in model_names:
valid_r2 = r_squared_dict[model][~np.isnan(r_squared_dict[model])]
median_r2 = np.median(valid_r2) if len(valid_r2) > 0 else np.nan
print(f"Median R² for {model}: {median_r2}")
def calculate_yield_spei_relation_test_group(self):
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
from scipy.optimize import curve_fit
# Create unique groups based on agent properties
crop_elevation_group = self.create_unique_groups(10)
unique_crop_combinations, group_indices = np.unique(
crop_elevation_group, axis=0, return_inverse=True
)
# Mask out empty rows (agents) where data is zero or NaN
mask_agents = np.any(self.var.yearly_yield_ratio != 0, axis=1) & np.any(
self.var.yearly_SPEI_probability != 0, axis=1
)
# Apply the mask to data and group indices
masked_yearly_yield_ratio = self.var.yearly_yield_ratio[mask_agents, :]
masked_SPEI_probability = self.var.yearly_SPEI_probability[mask_agents, :]
group_indices = group_indices[mask_agents]
# Number of groups
n_groups = unique_crop_combinations.shape[0]
# Define regression models
def linear_model(X, a, b):
return a * X + b
def exponential_model(X, a, b):
return a * np.exp(b * X)
def logarithmic_model(X, a, b):
return a * np.log(X) + b
def quadratic_model(X, a, b, c):
return a * X**2 + b * X + c
def power_model(X, a, b):
return a * X**b
# Initialize dictionaries for coefficients and R² values
model_names = ["linear", "exponential", "logarithmic", "quadratic", "power"]
r_squared_dict = {model: np.full(n_groups, np.nan) for model in model_names}
coefficients_dict = {model: [None] * n_groups for model in model_names}
# Create a folder to save the plots
output_folder = "plots/relation_test"
if not os.path.exists(output_folder):
os.makedirs(output_folder)
# For each group, perform regression with different models
for group_idx in range(n_groups):
# Get indices of agents in this group
agent_indices = np.where(group_indices == group_idx)[0]
if len(agent_indices) == 0:
# No data for this group
continue
# Get data for the group
y_data = masked_yearly_yield_ratio[
agent_indices, :
] # shape (num_agents_in_group, num_years)
X_data = masked_SPEI_probability[agent_indices, :] # same shape
# Remove values where SPEI probability is greater than 1
invalid_mask = X_data >= 1
y_data[invalid_mask] = np.nan
X_data[invalid_mask] = np.nan
# Compute mean over agents in the group (axis=0 corresponds to years)
y_group = np.nanmean(y_data, axis=0) # shape (num_years,)
X_group = np.nanmean(X_data, axis=0) # same shape
# Remove any years with NaN values
valid_indices = (~np.isnan(y_group)) & (~np.isnan(X_group)) & (X_group > 0)
y_group_valid = y_group[valid_indices]
X_group_valid = X_group[valid_indices]
if len(X_group_valid) >= 2:
# Prepare data
X_group_log = np.log10(X_group_valid)
# Model 1: Linear in log-transformed X
try:
popt, _ = curve_fit(
linear_model, X_group_log, y_group_valid, maxfev=10000
)
a, b = popt
y_pred = linear_model(X_group_log, a, b)
ss_res = np.sum((y_group_valid - y_pred) ** 2)
ss_tot = np.sum((y_group_valid - np.mean(y_group_valid)) ** 2)
r_squared = 1 - ss_res / ss_tot if ss_tot != 0 else np.nan
r_squared_dict["linear"][group_idx] = r_squared
coefficients_dict["linear"][group_idx] = (a, b)
except (RuntimeError, ValueError):
pass # Keep NaN in R² and None in coefficients
# Model 2: Exponential
try:
popt, _ = curve_fit(
exponential_model, X_group_valid, y_group_valid, maxfev=10000
)
y_pred = exponential_model(X_group_valid, *popt)
ss_res = np.sum((y_group_valid - y_pred) ** 2)
ss_tot = np.sum((y_group_valid - np.mean(y_group_valid)) ** 2)
r_squared = 1 - ss_res / ss_tot if ss_tot != 0 else np.nan
r_squared_dict["exponential"][group_idx] = r_squared
coefficients_dict["exponential"][group_idx] = popt
except (RuntimeError, ValueError):
pass
# Model 3: Logarithmic
try:
popt, _ = curve_fit(
logarithmic_model, X_group_valid, y_group_valid, maxfev=10000
)
y_pred = logarithmic_model(X_group_valid, *popt)
ss_res = np.sum((y_group_valid - y_pred) ** 2)
ss_tot = np.sum((y_group_valid - np.mean(y_group_valid)) ** 2)
r_squared = 1 - ss_res / ss_tot if ss_tot != 0 else np.nan
r_squared_dict["logarithmic"][group_idx] = r_squared
coefficients_dict["logarithmic"][group_idx] = popt
except (RuntimeError, ValueError):
pass
# Model 4: Quadratic
try:
popt, _ = curve_fit(
quadratic_model, X_group_valid, y_group_valid, maxfev=10000
)
y_pred = quadratic_model(X_group_valid, *popt)
ss_res = np.sum((y_group_valid - y_pred) ** 2)
ss_tot = np.sum((y_group_valid - np.mean(y_group_valid)) ** 2)
r_squared = 1 - ss_res / ss_tot if ss_tot != 0 else np.nan
r_squared_dict["quadratic"][group_idx] = r_squared
coefficients_dict["quadratic"][group_idx] = popt
except (RuntimeError, ValueError):
pass
# Model 5: Power
try:
popt, _ = curve_fit(
power_model, X_group_valid, y_group_valid, maxfev=10000
)
y_pred = power_model(X_group_valid, *popt)
ss_res = np.sum((y_group_valid - y_pred) ** 2)
ss_tot = np.sum((y_group_valid - np.mean(y_group_valid)) ** 2)
r_squared = 1 - ss_res / ss_tot if ss_tot != 0 else np.nan
r_squared_dict["power"][group_idx] = r_squared
coefficients_dict["power"][group_idx] = popt
except (RuntimeError, ValueError):
pass
# Plotting code for this group
plt.figure(figsize=(10, 6))
plt.scatter(X_group_valid, y_group_valid, label="Data", color="black")
# Generate x values for plotting fitted curves
x_min = np.min(X_group_valid)
x_max = np.max(X_group_valid)
x_plot = np.linspace(x_min, x_max, 100)
for model in model_names:
coeffs = coefficients_dict[model][group_idx]
r_squared = r_squared_dict[model][group_idx]
if (
coeffs is not None
and not any([np.isnan(c) for c in np.atleast_1d(coeffs)])
and not np.isnan(r_squared)
):
if model == "linear":
a, b = coeffs
x_plot_positive = x_plot[x_plot > 0]
x_plot_log = np.log10(x_plot_positive)
if len(x_plot_log) > 0:
y_plot = linear_model(x_plot_log, a, b)
plt.plot(
x_plot_positive,
y_plot,
label=f"{model} (R²={r_squared:.3f})",
linewidth=2,
)
elif model == "exponential":
y_plot = exponential_model(x_plot, *coeffs)
plt.plot(
x_plot,
y_plot,
label=f"{model} (R²={r_squared:.3f})",
linewidth=2,
)
elif model == "logarithmic":
x_plot_positive = x_plot[x_plot > 0]
if len(x_plot_positive) > 0:
y_plot = logarithmic_model(x_plot_positive, *coeffs)
plt.plot(
x_plot_positive,
y_plot,
label=f"{model} (R²={r_squared:.3f})",
linewidth=2,
)
elif model == "quadratic":
y_plot = quadratic_model(x_plot, *coeffs)
plt.plot(
x_plot,
y_plot,
label=f"{model} (R²={r_squared:.3f})",
linewidth=2,
)
elif model == "power":
x_plot_positive = x_plot[x_plot > 0]
if len(x_plot_positive) > 0:
y_plot = power_model(x_plot_positive, *coeffs)
plt.plot(
x_plot_positive,
y_plot,
label=f"{model} (R²={r_squared:.3f})",
linewidth=2,
)
# Add labels and legend
plt.xlabel("SPEI Probability")
plt.ylabel("Yield Ratio")
plt.title(f"Group {group_idx}")
plt.legend()
plt.grid(True)
# Save the plot to a file
filename = os.path.join(output_folder, f"group_{group_idx}.png")
plt.savefig(filename)
plt.close()
else:
# Not enough data points for this group
continue
# Compute median R² for each model across all groups
for model in model_names:
valid_r2 = r_squared_dict[model][~np.isnan(r_squared_dict[model])]
median_r2 = np.median(valid_r2) if len(valid_r2) > 0 else np.nan
print(f"Median R² for {model}: {median_r2}")
# Assign relations to agents based on their group
# Here, we'll choose the model with the highest median R²
# Alternatively, you can select the best model per group
# For simplicity, we'll assign the linear model coefficients to agents
# Example: Assign linear model coefficients to agents
a_array = np.full(len(group_indices), np.nan)
b_array = np.full(len(group_indices), np.nan)
for group_idx in range(n_groups):
if coefficients_dict["linear"][group_idx] is not None:
a, b = coefficients_dict["linear"][group_idx]
agent_mask = group_indices == group_idx
a_array[agent_mask] = a
b_array[agent_mask] = b
# Assign to agents
self.var.farmer_yield_probability_relation = np.column_stack((a_array, b_array))
# Print overall best-fitting model based on median R²
median_r2_values = {
model: np.nanmedian(r_squared_dict[model]) for model in model_names
}
best_model_overall = max(median_r2_values, key=median_r2_values.get)
print(f"Best-fitting model overall: {best_model_overall}")
def calculate_yield_spei_relation(self):
# Number of agents
n_agents = self.var.yearly_yield_ratio.shape[0]
# Initialize arrays for coefficients and R²
a_array = np.zeros(n_agents)
b_array = np.zeros(n_agents)
r_squared_array = np.zeros(n_agents)
# Loop over each agent
for agent_idx in range(n_agents):
# Get data for the agent
y_data = self.var.yearly_yield_ratio[agent_idx, :]
X_data = self.var.yearly_SPEI_probability[agent_idx, :]
# Log-transform X_data, handling zeros
with np.errstate(divide="ignore"):
X_data_log = np.log10(X_data)
# Mask out zeros and NaNs
valid_mask = (
(~np.isnan(y_data))
& (~np.isnan(X_data_log))
& (y_data != 0)
& (X_data != 0)
)
y_valid = y_data[valid_mask]
X_valid = X_data_log[valid_mask]
if len(X_valid) >= 2:
# Prepare matrices
X_matrix = np.vstack([X_valid, np.ones(len(X_valid))]).T
# Perform linear regression
coefficients = np.linalg.lstsq(X_matrix, y_valid, rcond=None)[0]
a, b = coefficients
# Calculate R²
y_pred = a * X_valid + b
ss_res = np.sum((y_valid - y_pred) ** 2)
ss_tot = np.sum((y_valid - np.mean(y_valid)) ** 2)
r_squared = 1 - ss_res / ss_tot if ss_tot != 0 else np.nan
else:
# Not enough data points
a, b, r_squared = np.nan, np.nan, np.nan
a_array[agent_idx] = a
b_array[agent_idx] = b
r_squared_array[agent_idx] = r_squared
# Assign relations to agents
self.var.farmer_yield_probability_relation = np.column_stack((a_array, b_array))
# Print median R²
valid_r2 = r_squared_array[~np.isnan(r_squared_array)]
print("Median R²:", np.median(valid_r2) if len(valid_r2) > 0 else "N/A")
def calculate_yield_spei_relation_group(self):
# Create unique groups
group_indices, n_groups = self.create_unique_groups(
self.var.adaptations[:, WELL_ADAPTATION] > 0,
)
assert (np.any(self.var.yearly_SPEI_probability != 0, axis=1) > 0).all()
# Apply the mask to data
masked_yearly_yield_ratio = self.var.yearly_yield_ratio
masked_SPEI_probability = self.var.yearly_SPEI_probability
# Initialize arrays for coefficients and R²
a_array = np.zeros(n_groups)
b_array = np.zeros(n_groups)
r_squared_array = np.zeros(n_groups)
for group_idx in range(n_groups):
agent_indices = np.where(group_indices == group_idx)[0]
# Get data for the group
y_data = masked_yearly_yield_ratio[agent_indices, :]
X_data = masked_SPEI_probability[agent_indices, :]
# Remove invalid values where SPEI probability >= 1 or yield <= 0
mask = (X_data >= 1) | (y_data <= 0)
y_data[mask] = np.nan
X_data[mask] = np.nan
# Compute mean over agents (axis=0 corresponds to years)
y_group = np.nanmean(y_data, axis=0) # shape (num_years,)
X_group = np.nanmean(X_data, axis=0) # shape (num_years,)
# Remove any entries with NaN values
valid_indices = (~np.isnan(y_group)) & (~np.isnan(X_group)) & (y_group > 0)
y_group_valid = y_group[valid_indices]
X_group_valid = X_group[valid_indices]
if len(X_group_valid) >= 2:
# Take the natural logarithm of y_group_valid
ln_y_group = np.log(y_group_valid)
# Prepare matrices for linear regression
X_matrix = np.vstack([X_group_valid, np.ones(len(X_group_valid))]).T
# Perform linear regression on ln(y) = b * X + ln(a)
coefficients = np.linalg.lstsq(X_matrix, ln_y_group, rcond=None)[0]
b, ln_a = coefficients
a = np.exp(ln_a)
# Calculate predicted y values
y_pred = a * np.exp(b * X_group_valid)
# Calculate R²
ss_res = np.sum((y_group_valid - y_pred) ** 2)
ss_tot = np.sum((y_group_valid - np.mean(y_group_valid)) ** 2)
r_squared = 1 - ss_res / ss_tot if ss_tot != 0 else np.nan
else:
# Not enough data points
a, b, r_squared = np.nan, np.nan, np.nan
a_array[group_idx] = a
b_array[group_idx] = b
r_squared_array[group_idx] = r_squared
# Assign relations to agents
farmer_yield_probability_relation = np.column_stack(
(a_array[group_indices], b_array[group_indices])
)
self.var.farmer_yield_probability_relation = farmer_yield_probability_relation
# Print median R²
weighted_r2 = r_squared_array[group_indices]
valid_r2 = weighted_r2[~np.isnan(weighted_r2)]
print(
"Median R² for exponential model:",
np.median(valid_r2) if len(valid_r2) > 0 else "N/A",
)
def adapt_crops(self) -> None:
# Fetch loan configuration
loan_duration = 2
index = self.cultivation_costs[0].get(self.model.current_time)
cultivation_cost = self.cultivation_costs[1][index]
cultivation_cost_current_crop = cultivation_cost[
self.var.region_id, self.var.crop_calendar[:, 0, 0]
]
annual_cost_empty = np.zeros(self.n, dtype=np.float32)
# No constraint
extra_constraint = np.ones_like(annual_cost_empty, dtype=bool)
# Set variable which indicates all possible crop options
unique_crop_calendars = np.unique(self.var.crop_calendar[:, :, 0], axis=0)
(
total_profits,
profits_no_event,
total_profits_adaptation,
profits_no_event_adaptation,
new_crop_nr,
new_farmer_id,
) = self.profits_SEUT_crops(unique_crop_calendars)
total_annual_costs_m2 = (
self.var.all_loans_annual_cost[:, -1, 0] / self.field_size_per_farmer
)
# Construct a dictionary of parameters to pass to the decision module functions
decision_params = {
"loan_duration": loan_duration,
"expenditure_cap": self.var.expenditure_cap,
"n_agents": self.n,
"sigma": self.var.risk_aversion.data,
"p_droughts": 1 / self.var.p_droughts[:-1],
"profits_no_event": profits_no_event,
"profits_no_event_adaptation": profits_no_event_adaptation,
"total_profits": total_profits,
"total_profits_adaptation": total_profits_adaptation,
"risk_perception": self.var.risk_perception.data,
"total_annual_costs": total_annual_costs_m2,
"adaptation_costs": annual_cost_empty,
"adapted": np.zeros(self.n, dtype=np.int32),
"time_adapted": np.full(self.n, 2),
"T": np.full(
self.n,
2,
),
"discount_rate": self.var.discount_rate.data,
"extra_constraint": extra_constraint,
}
# Determine the SEUT of the current crop
SEUT_do_nothing = self.decision_module.calcEU_do_nothing(**decision_params)
# Determine the SEUT of the other crop options
SEUT_crop_options = np.full(
(self.n, len(unique_crop_calendars)), 0, dtype=np.float32
)
for idx, crop_option in enumerate(unique_crop_calendars[:, 0]):
# Determine the cost difference between old and potential new crop
new_crop_nr_option = new_crop_nr[:, idx]
cultivation_cost_new_crop = cultivation_cost[
self.var.region_id, new_crop_nr_option
]
cost_difference_adaptation = (
cultivation_cost_new_crop - cultivation_cost_current_crop
)
decision_params_option = copy.deepcopy(decision_params)
decision_params_option.update(
{
"total_profits_adaptation": total_profits_adaptation[idx, :, :],
"profits_no_event_adaptation": profits_no_event_adaptation[idx, :],
"adaptation_costs": cost_difference_adaptation,
}
)
SEUT_crop_options[:, idx] = self.decision_module.calcEU_adapt(
**decision_params_option
)
assert np.any(SEUT_do_nothing != -1) or np.any(SEUT_crop_options != -1)
# Determine the best adaptation option
best_option_SEUT = np.max(SEUT_crop_options, axis=1)
chosen_option = np.argmax(SEUT_crop_options, axis=1)
# Determine the crop of the best option
row_indices = np.arange(new_crop_nr.shape[0])
new_crop_nr_temp = new_crop_nr[row_indices, chosen_option]
new_id_temp = new_farmer_id[row_indices, chosen_option]
# Determine for which agents it is beneficial to switch crops
SEUT_adaptation_decision = (
(best_option_SEUT > (SEUT_do_nothing)) & (new_id_temp != -1)
) # Filter out crops chosen due to small diff in do_nothing and adapt SEUT calculation
# Adjust the intention threshold based on whether neighbors already have similar crop
# Check for each farmer which crops their neighbors are cultivating
social_network_crops = self.var.crop_calendar[self.var.social_network, 0, 0]
# Check whether adapting agents have adaptation type in their network and create mask
network_has_crop = np.any(
social_network_crops == new_crop_nr_temp[:, None],
axis=1,
)
# Increase intention factor if someone in network has crop
intention_factor_adjusted = self.var.intention_factor.copy()
intention_factor_adjusted[network_has_crop] += 0.3
# Determine whether it passed the intention threshold
random_values = np.random.rand(*intention_factor_adjusted.shape)
intention_mask = random_values < intention_factor_adjusted
# Set the adaptation mask
SEUT_adaptation_decision = SEUT_adaptation_decision & intention_mask
new_crop_nr_final = new_crop_nr_temp[SEUT_adaptation_decision]
new_id_final = new_id_temp[SEUT_adaptation_decision]
print("Crop switching farmers", np.count_nonzero(SEUT_adaptation_decision))
assert not np.any(new_crop_nr_final == -1)
# Switch their crops and update their yield-SPEI relation
self.var.crop_calendar[SEUT_adaptation_decision, :, :] = self.var.crop_calendar[
new_id_final, :, :
]
# Update yield-SPEI relation
self.var.yearly_yield_ratio[SEUT_adaptation_decision, :] = (
self.var.yearly_yield_ratio[new_id_final, :]
)
self.var.yearly_SPEI_probability[SEUT_adaptation_decision, :] = (
self.var.yearly_SPEI_probability[new_id_final, :]
)
[docs]
def adapt_irrigation_well(
self, average_extraction_speed, energy_cost, water_cost
) -> None:
"""
Handle the adaptation of farmers to irrigation wells.
This function checks which farmers will adopt irrigation wells based on their expected utility
and the provided constraints. It calculates the costs and the benefits for each farmer and updates
their statuses (e.g., irrigation source, adaptation costs) accordingly.
Note:
TODO:
- Possibly externalize hard-coded values.
"""
groundwater_depth = self.groundwater_depth
groundwater_depth[groundwater_depth < 0] = 0
annual_cost, well_depth = self.calculate_well_costs_global(
groundwater_depth, average_extraction_speed
)
# Compute the total annual per square meter costs if farmers adapt during this cycle
# This cost is the cost if the farmer would adapt, plus its current costs of previous
# adaptations
total_annual_costs_m2 = (
annual_cost + self.var.all_loans_annual_cost[:, -1, 0]
) / self.field_size_per_farmer
# Solely the annual cost of the adaptation
annual_cost_m2 = annual_cost / self.field_size_per_farmer
loan_duration = self.model.config["agent_settings"]["farmers"][
"expected_utility"
]["adaptation_well"]["loan_duration"]
# Reset farmers' status and irrigation type who exceeded the lifespan of their adaptation
# and who's wells are much shallower than the groundwater depth
expired_adaptations = (
self.var.time_adapted[:, WELL_ADAPTATION] == self.var.lifespan_well
) | (groundwater_depth > self.var.well_depth)
self.var.adaptations[expired_adaptations, WELL_ADAPTATION] = -1
self.var.time_adapted[expired_adaptations, WELL_ADAPTATION] = -1
# Define extra constraints (farmers' wells must reach groundwater)
well_reaches_groundwater = self.var.well_depth > groundwater_depth
extra_constraint = well_reaches_groundwater
# To determine the benefit of irrigation, those who have a well are adapted
adapted = self.var.adaptations[:, WELL_ADAPTATION] > 0
(
energy_diff,
water_diff,
) = self.adaptation_water_cost_difference(adapted, energy_cost, water_cost)
(
total_profits,
profits_no_event,
total_profits_adaptation,
profits_no_event_adaptation,
) = self.profits_SEUT(adapted)
total_profits_adaptation = total_profits_adaptation + energy_diff + water_diff
profits_no_event_adaptation = (
profits_no_event_adaptation + energy_diff + water_diff
)
# Construct a dictionary of parameters to pass to the decision module functions
decision_params = {
"loan_duration": loan_duration,
"expenditure_cap": self.var.expenditure_cap,
"n_agents": self.n,
"sigma": self.var.risk_aversion.data,
"p_droughts": 1 / self.var.p_droughts[:-1],
"total_profits_adaptation": total_profits_adaptation,
"profits_no_event": profits_no_event,
"profits_no_event_adaptation": profits_no_event_adaptation,
"total_profits": total_profits,
"risk_perception": self.var.risk_perception.data,
"total_annual_costs": total_annual_costs_m2,
"adaptation_costs": annual_cost_m2,
"adapted": adapted,
"time_adapted": self.var.time_adapted[:, WELL_ADAPTATION],
"T": np.full(
self.n,
self.model.config["agent_settings"]["farmers"]["expected_utility"][
"adaptation_well"
]["decision_horizon"],
),
"discount_rate": self.var.discount_rate.data,
"extra_constraint": extra_constraint,
}
# Calculate the EU of not adapting and adapting respectively
SEUT_do_nothing = self.decision_module.calcEU_do_nothing(**decision_params)
SEUT_adapt = self.decision_module.calcEU_adapt(**decision_params)
assert (SEUT_do_nothing != -1).any() or (SEUT_adapt != -1).any()
SEUT_adaptation_decision = self.update_adaptation_decision(
adaptation_type=WELL_ADAPTATION,
adapted=adapted,
loan_duration=loan_duration,
annual_cost=annual_cost,
SEUT_do_nothing=SEUT_do_nothing,
SEUT_adapt=SEUT_adapt,
)
# Set their well depth
self.var.well_depth[SEUT_adaptation_decision] = well_depth[
SEUT_adaptation_decision
]
# Print the percentage of adapted households
percentage_adapted = round(
np.sum(self.var.adaptations[:, WELL_ADAPTATION] > 0) / self.n * 100,
2,
)
print("Irrigation well farms:", percentage_adapted, "(%)")
[docs]
def adapt_irrigation_efficiency(self, energy_cost, water_cost) -> None:
"""
Handle the adaptation of farmers to irrigation wells.
This function checks which farmers will adopt irrigation wells based on their expected utility
and the provided constraints. It calculates the costs and the benefits for each farmer and updates
their statuses (e.g., irrigation source, adaptation costs) accordingly.
Note:
TODO:
- Possibly externalize hard-coded values.
"""
# placeholder
m2_adaptation_costs = np.full(
self.n,
self.model.config["agent_settings"]["farmers"]["expected_utility"][
"adaptation_sprinkler"
]["m2_cost"],
)
loan_duration = self.model.config["agent_settings"]["farmers"][
"expected_utility"
]["adaptation_sprinkler"]["loan_duration"]
# Placeholder
costs_irrigation_system = m2_adaptation_costs * self.field_size_per_farmer
# interest_rate = self.get_value_per_farmer_from_region_id(
# self.var.lending_rate, self.model.current_time
# )
interest_rate = 0.05
annual_cost = costs_irrigation_system * (
interest_rate
* (1 + interest_rate) ** loan_duration
/ ((1 + interest_rate) ** loan_duration - 1)
)
total_annual_costs_m2 = (
annual_cost + self.var.all_loans_annual_cost[:, -1, 0]
) / self.field_size_per_farmer
# Solely the annual cost of the adaptation
annual_cost_m2 = annual_cost / self.field_size_per_farmer
# Create mask for those who have access to irrigation water
has_irrigation_access = ~np.all(
self.var.yearly_abstraction_m3_by_farmer[:, TOTAL_IRRIGATION, :] == 0,
axis=1,
)
# Reset farmers' status and irrigation type who exceeded the lifespan of their adaptation
# or who's never had access to irrigation water
expired_adaptations = (
self.var.time_adapted[:, IRRIGATION_EFFICIENCY_ADAPTATION]
== self.var.lifespan_irrigation
) | has_irrigation_access
self.var.adaptations[expired_adaptations, IRRIGATION_EFFICIENCY_ADAPTATION] = -1
self.var.time_adapted[
expired_adaptations, IRRIGATION_EFFICIENCY_ADAPTATION
] = -1
# To determine the benefit of irrigation, those who have above 90% irrigation efficiency have adapted
adapted = self.var.adapted[:, IRRIGATION_EFFICIENCY_ADAPTATION] > 0
(
energy_diff,
water_diff,
) = self.adaptation_water_cost_difference(adapted, energy_cost, water_cost)
(
total_profits,
profits_no_event,
total_profits_adaptation,
profits_no_event_adaptation,
) = self.profits_SEUT(IRRIGATION_EFFICIENCY_ADAPTATION, adapted)
total_profits_adaptation = total_profits_adaptation + energy_diff + water_diff
profits_no_event_adaptation = (
profits_no_event_adaptation + energy_diff + water_diff
)
# Construct a dictionary of parameters to pass to the decision module functions
decision_params = {
"loan_duration": loan_duration,
"expenditure_cap": self.var.expenditure_cap,
"n_agents": self.n,
"sigma": self.var.risk_aversion.data,
"p_droughts": 1 / self.var.p_droughts[:-1],
"total_profits_adaptation": total_profits_adaptation,
"profits_no_event": profits_no_event,
"profits_no_event_adaptation": profits_no_event_adaptation,
"total_profits": total_profits,
"risk_perception": self.var.risk_perception.data,
"total_annual_costs": total_annual_costs_m2,
"adaptation_costs": annual_cost_m2,
"adapted": adapted,
"time_adapted": self.var.time_adapted[:, IRRIGATION_EFFICIENCY_ADAPTATION],
"T": np.full(
self.n,
self.model.config["agent_settings"]["farmers"]["expected_utility"][
"adaptation_sprinkler"
]["decision_horizon"],
),
"discount_rate": self.var.discount_rate.data,
"extra_constraint": has_irrigation_access,
}
# Calculate the EU of not adapting and adapting respectively
SEUT_do_nothing = self.decision_module.calcEU_do_nothing(**decision_params)
SEUT_adapt = self.decision_module.calcEU_adapt(**decision_params)
assert (SEUT_do_nothing != -1).any() or (SEUT_adapt != -1).any()
SEUT_adaptation_decision = self.update_adaptation_decision(
adaptation_type=IRRIGATION_EFFICIENCY_ADAPTATION,
adapted=adapted,
loan_duration=loan_duration,
annual_cost=annual_cost,
SEUT_do_nothing=SEUT_do_nothing,
SEUT_adapt=SEUT_adapt,
)
# Update irrigation efficiency for farmers who adapted
self.var.irrigation_efficiency[SEUT_adaptation_decision] = 0.9
# Print the percentage of adapted households
percentage_adapted = round(
np.sum(self.var.adapted[:, IRRIGATION_EFFICIENCY_ADAPTATION])
/ len(self.var.adapted[:, IRRIGATION_EFFICIENCY_ADAPTATION])
* 100,
2,
)
print("Irrigation efficient farms:", percentage_adapted, "(%)")
def adapt_irrigation_expansion(self, energy_cost, water_cost) -> None:
# Constants
adaptation_type = 3
loan_duration = self.model.config["agent_settings"]["farmers"][
"expected_utility"
]["adaptation_sprinkler"]["loan_duration"]
# If the farmers have drip/sprinkler irrigation, they would also have additional costs of expanding that
# Costs are less than the initial expansion
adapted_irr_eff = np.where((self.var.adapted[:, 2] == 1), 1, 0)
total_costs = np.zeros(self.n, dtype=np.float32)
total_costs[adapted_irr_eff] = 2 * self.field_size_per_farmer * 0.5
# interest_rate = self.get_value_per_farmer_from_region_id(
# self.var.lending_rate, self.model.current_time
# )
interest_rate = 0.05
annual_cost = total_costs * (
interest_rate
* (1 + interest_rate) ** loan_duration
/ ((1 + interest_rate) ** loan_duration - 1)
)
# Farmers will have the same yearly water costs added if they expand
annual_cost += energy_cost
annual_cost += water_cost
# Will also have the input/labor costs doubled
annual_cost += np.sum(self.var.all_loans_annual_cost[:, 0, :], axis=1)
total_annual_costs_m2 = (
annual_cost + self.var.all_loans_annual_cost[:, -1, 0]
) / self.field_size_per_farmer
annual_cost_m2 = annual_cost / self.field_size_per_farmer
# Create mask for those who have access to irrigation water
has_irrigation_access = ~np.all(
self.var.yearly_abstraction_m3_by_farmer[:, TOTAL_IRRIGATION, :] == 0,
axis=1,
)
# Reset farmers' status and irrigation type who exceeded the lifespan of their adaptation
# or who's never had access to irrigation water
expired_adaptations = (
self.var.time_adapted[:, adaptation_type] == self.var.lifespan_irrigation
) | np.all(
self.var.yearly_abstraction_m3_by_farmer[:, TOTAL_IRRIGATION, :] == 0,
axis=1,
)
self.var.adapted[expired_adaptations, adaptation_type] = 0
self.var.time_adapted[expired_adaptations, adaptation_type] = -1
adapted = np.where((self.var.adapted[:, adaptation_type] == 1), 1, 0)
(
total_profits,
profits_no_event,
total_profits_adaptation,
profits_no_event_adaptation,
) = self.profits_SEUT(adaptation_type, adapted)
# Construct a dictionary of parameters to pass to the decision module functions
decision_params = {
"loan_duration": loan_duration,
"expenditure_cap": self.var.expenditure_cap,
"n_agents": self.n,
"sigma": self.var.risk_aversion.data,
"p_droughts": 1 / self.var.p_droughts[:-1],
"total_profits_adaptation": total_profits_adaptation,
"profits_no_event": profits_no_event,
"profits_no_event_adaptation": profits_no_event_adaptation,
"total_profits": total_profits,
"risk_perception": self.var.risk_perception.data,
"total_annual_costs": total_annual_costs_m2,
"adaptation_costs": annual_cost_m2,
"adapted": adapted,
"time_adapted": self.var.time_adapted[:, adaptation_type],
"T": np.full(
self.n,
self.model.config["agent_settings"]["farmers"]["expected_utility"][
"adaptation_sprinkler"
]["decision_horizon"],
),
"discount_rate": self.var.discount_rate.data,
"extra_constraint": has_irrigation_access,
}
# Calculate the EU of not adapting and adapting respectively
SEUT_do_nothing = self.decision_module.calcEU_do_nothing(**decision_params)
SEUT_adapt = self.decision_module.calcEU_adapt(**decision_params)
assert (SEUT_do_nothing != -1).any or (SEUT_adapt != -1).any()
SEUT_adaptation_decision = self.update_adaptation_decision(
adaptation_type=adaptation_type,
adapted=adapted,
loan_duration=loan_duration,
annual_cost=annual_cost,
SEUT_do_nothing=SEUT_do_nothing,
SEUT_adapt=SEUT_adapt,
)
# Update irrigation efficiency for farmers who adapted
self.var.fraction_irrigated_field[SEUT_adaptation_decision] = 1
# Print the percentage of adapted households
percentage_adapted = round(
np.sum(self.var.adapted[:, adaptation_type])
/ len(self.var.adapted[:, adaptation_type])
* 100,
2,
)
print("Irrigation expanded farms:", percentage_adapted, "(%)")
def update_adaptation_decision(
self,
adaptation_type,
adapted,
loan_duration,
annual_cost,
SEUT_do_nothing,
SEUT_adapt,
):
# Compare EU values for those who haven't adapted yet and get boolean results
SEUT_adaptation_decision = SEUT_adapt > SEUT_do_nothing
social_network_adaptation = adapted[self.var.social_network]
# Check whether adapting agents have adaptation type in their network and create mask
network_has_adaptation = np.any(social_network_adaptation == 1, axis=1)
# Increase intention factor if someone in network has crop
intention_factor_adjusted = self.var.intention_factor.copy()
intention_factor_adjusted[network_has_adaptation] += 0.3
# Determine whether it passed the intention threshold
random_values = np.random.rand(*intention_factor_adjusted.shape)
intention_mask = random_values < intention_factor_adjusted
SEUT_adaptation_decision = SEUT_adaptation_decision & intention_mask
# Update the adaptation status
self.var.adaptations[SEUT_adaptation_decision, adaptation_type] = 1
# Reset the timer for newly adapting farmers and update timers for others
self.var.time_adapted[SEUT_adaptation_decision, adaptation_type] = 0
self.var.time_adapted[
self.var.time_adapted[:, adaptation_type] != -1, adaptation_type
] += 1
# Update annual costs and disposable income for adapted farmers
self.var.all_loans_annual_cost[
SEUT_adaptation_decision, adaptation_type + 1, 0
] += annual_cost[SEUT_adaptation_decision] # For wells specifically
self.var.all_loans_annual_cost[SEUT_adaptation_decision, -1, 0] += annual_cost[
SEUT_adaptation_decision
] # Total loan amount
# set loan tracker
self.var.loan_tracker[SEUT_adaptation_decision, adaptation_type + 1, 0] += (
loan_duration
)
return SEUT_adaptation_decision
[docs]
def calculate_water_costs(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Calculate the water and energy costs per agent and the average extraction speed.
This method computes the energy costs for agents using groundwater, the water costs for all agents
depending on their water source, and the average extraction speed per agent. It also updates the
loans and annual costs associated with water and energy use.
Returns:
Tuple[np.ndarray, np.ndarray, np.ndarray]: A tuple containing:
- energy_costs (np.ndarray): Energy costs per agent (LCU/year).
- water_costs (np.ndarray): Water costs per agent (LCU/year).
- average_extraction_speed (np.ndarray): Average water extraction speed per agent (m³/s).
"""
# Get electricity costs per agent based on their region and current time
electricity_costs = np.full(
self.n,
self.get_value_per_farmer_from_region_id(
self.electricity_cost, self.model.current_time
),
dtype=np.float32,
)
# Initialize energy and water costs arrays
energy_costs = np.zeros(self.n, dtype=np.float32)
water_costs = np.zeros(self.n, dtype=np.float32)
# Compute total pump duration per agent (average over crops)
total_pump_duration = np.mean(self.var.total_crop_age, axis=1)
# Get groundwater depth per agent and ensure non-negative values
groundwater_depth = self.groundwater_depth.copy()
groundwater_depth[groundwater_depth < 0] = 0
# Create unique groups based on crop combinations and elevation
has_well = self.var.adaptations[:, WELL_ADAPTATION] > 0
group_indices, n_groups = self.create_unique_groups()
# Compute yearly water abstraction per m² per agent
yearly_abstraction_m3_per_m2 = (
self.var.yearly_abstraction_m3_by_farmer
/ self.field_size_per_farmer[..., None, None]
)
# Initialize array to store average extraction per agent
average_extraction_m2 = np.full(self.n, np.nan, dtype=np.float32)
# Loop over each unique crop group to compute average extraction
for group_idx in range(n_groups):
farmers_in_group = group_indices == group_idx
# Only select the agents with a well
farmers_in_group_with_well = np.where(farmers_in_group & has_well)
# Extract the abstraction values for the group, excluding zeros
extraction_values = yearly_abstraction_m3_per_m2[
farmers_in_group_with_well, :3, :
]
non_zero_extractions = extraction_values[extraction_values != 0]
# Compute average extraction for the group if there are non-zero values
if non_zero_extractions.size > 0:
average_extraction = np.mean(non_zero_extractions) # m³ per m² per year
else:
average_extraction = 0.0
# Store the average extraction for each group
average_extraction_m2[farmers_in_group] = average_extraction
# Compute average extraction per agent (m³/year)
average_extraction = average_extraction_m2 * self.field_size_per_farmer
# Compute average extraction speed per agent (m³/s)
average_extraction_speed = (
average_extraction / 365 / self.var.pump_hours / 3600
) # Convert from m³/year to m³/s
# Create boolean masks for different types of water sources
main_irrigation_sources = self.main_irrigation_source
mask_channel = main_irrigation_sources == CHANNEL_IRRIGATION
mask_reservoir = main_irrigation_sources == RESERVOIR_IRRIGATION
mask_groundwater = main_irrigation_sources == GROUNDWATER_IRRIGATION
# Compute power required for groundwater extraction per agent (kW)
power = (
self.var.specific_weight_water
* groundwater_depth[mask_groundwater]
* average_extraction_speed[mask_groundwater]
/ self.var.pump_efficiency
) / 1000 # Convert from W to kW
# Compute energy consumption per agent (kWh/year)
energy = power * (total_pump_duration[mask_groundwater] * self.var.pump_hours)
# Get energy cost rate per agent (LCU per kWh)
energy_cost_rate = electricity_costs[mask_groundwater]
# Compute energy costs per agent (LCU/year) for groundwater irrigating farmers
energy_costs[mask_groundwater] = energy * energy_cost_rate
# Compute water costs for agents using channel water (LCU/year)
water_costs[mask_channel] = (
average_extraction[mask_channel] * self.var.water_costs_m3_channel
)
# Compute water costs for agents using reservoir water (LCU/year)
water_costs[mask_reservoir] = (
average_extraction[mask_reservoir] * self.var.water_costs_m3_reservoir
)
# Compute water costs for agents using groundwater (LCU/year)
water_costs[mask_groundwater] = (
average_extraction[mask_groundwater] * self.var.water_costs_m3_groundwater
)
# Assume minimal interest rate as farmers pay directly
interest_rate_farmer = 0.0001 # Annual interest rate
loan_duration = 2 # Loan duration in years
# Compute annual cost of water and energy using annuity formula
# A = P * [r(1+r)^n] / [(1+r)^n -1], where P is principal, r is interest rate, n is loan duration
annuity_factor = (
interest_rate_farmer
* (1 + interest_rate_farmer) ** loan_duration
/ ((1 + interest_rate_farmer) ** loan_duration - 1)
)
annual_cost_water_energy = (water_costs + energy_costs) * annuity_factor
# Update loan records with the annual cost of water and energy
for i in range(4):
# Find the first available loan slot
if np.all(self.var.all_loans_annual_cost.data[:, 5, i] == 0):
self.var.all_loans_annual_cost.data[:, 5, i] = annual_cost_water_energy
self.var.loan_tracker[annual_cost_water_energy > 0, 5, i] = (
loan_duration
)
break
# Add the annual cost to the total loan annual costs
self.var.all_loans_annual_cost.data[:, -1, 0] += annual_cost_water_energy
return energy_costs, water_costs, average_extraction_speed
[docs]
def calculate_well_costs_global(
self, groundwater_depth: np.ndarray, average_extraction_speed: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
"""
Calculate the annual costs associated with well installation and operation globally.
This function computes the annual costs for installing wells, maintaining them, and the energy costs
associated with pumping groundwater for each agent (farmer). It takes into account regional variations
in costs and agent-specific parameters such as groundwater depth and extraction speed.
Parameters:
groundwater_depth (np.ndarray): Array of groundwater depths per agent (in meters).
average_extraction_speed (np.ndarray): Array of average water extraction speeds per agent (m³/s).
Returns:
Tuple[np.ndarray, np.ndarray]:
- annual_cost (np.ndarray): Annual cost per agent (local currency units per year).
- potential_well_length (np.ndarray): Potential well length per agent (in meters).
"""
# Retrieve aquifer-specific unit costs for well drilling per meter
well_cost_class_1 = self.get_value_per_farmer_from_region_id(
self.why_10, self.model.current_time
)
well_cost_class_2 = self.get_value_per_farmer_from_region_id(
self.why_20, self.model.current_time
)
well_cost_class_3 = self.get_value_per_farmer_from_region_id(
self.why_30, self.model.current_time
)
# Initialize the well unit cost array with zeros
well_unit_cost = np.zeros_like(self.var.why_class, dtype=np.float32)
# Assign unit costs to each agent based on their well class using boolean indexing
well_unit_cost[self.var.why_class == 1] = well_cost_class_1[
self.var.why_class == 1
]
well_unit_cost[self.var.why_class == 2] = well_cost_class_2[
self.var.why_class == 2
]
well_unit_cost[self.var.why_class == 3] = well_cost_class_3[
self.var.why_class == 3
]
# Get electricity costs per agent based on their region and current time
electricity_costs = self.get_value_per_farmer_from_region_id(
self.electricity_cost, self.model.current_time
)
# Calculate potential well length per agent
# Potential well length is the sum of the maximum initial saturated thickness and the groundwater depth
potential_well_length = self.var.max_initial_sat_thickness + groundwater_depth
# Calculate the installation cost per agent (cost per meter * potential well length)
install_cost = well_unit_cost * potential_well_length
# Calculate maintenance cost per agent (as a fraction of the installation cost)
maintenance_cost = self.var.maintenance_factor * install_cost
# Calculate the total pump duration per agent (average over crops)
total_pump_duration = np.mean(self.var.total_crop_age, axis=1) # days
# Calculate the power required per agent for pumping groundwater (in kilowatts)
# specific_weight_water (N/m³), groundwater_depth (m), average_extraction_speed (m³/s), pump_efficiency (%)
power = (
self.var.specific_weight_water
* groundwater_depth
* average_extraction_speed
/ self.var.pump_efficiency
) / 1000 # Convert from watts to kilowatts
# Calculate the energy consumption per agent (in kilowatt-hours)
# power (kW), total_pump_duration (days), pump_hours (hours per day)
energy = power * (total_pump_duration * self.var.pump_hours)
# Get energy cost rate per agent (local currency units per kilowatt-hour)
energy_cost_rate = electricity_costs
# Calculate the energy cost per agent (local currency units per year)
energy_cost = energy * energy_cost_rate
# Fetch loan configuration for well installation
loan_config = self.model.config["agent_settings"]["farmers"][
"expected_utility"
]["adaptation_well"]
loan_duration = loan_config["loan_duration"] # Loan duration in years
# Calculate annuity factor for loan repayment using the annuity formula
# A = P * [r(1+r)^n] / [(1+r)^n -1], where:
# A = annual payment, P = principal amount (install_cost), r = interest rate, n = loan duration
interest_rate = 0.05
n = loan_duration
annuity_factor = (interest_rate * (1 + interest_rate) ** n) / (
(1 + interest_rate) ** n - 1
)
# Calculate the annual cost per agent (local currency units per year)
annual_cost = (install_cost * annuity_factor) + energy_cost + maintenance_cost
return annual_cost, potential_well_length
[docs]
def profits_SEUT(
self, adapted: np.ndarray = None
) -> Union[
Tuple[np.ndarray, np.ndarray],
Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray],
]:
"""
Calculate total profits under different drought probability scenarios, with and without adaptation measures
for adaptation types 0 and 1.
Parameters:
adaptation_type (int): The type of adaptation to consider.
- 0: No adaptation.
- 1: Adaptation Type 1 (e.g., installing wells).
adapted (np.ndarray, optional): An array indicating which agents have adapted (relevant for adaptation_type == 1).
Returns:
Depending on adaptation_type, returns a tuple containing total profits and profits under 'no drought' scenario,
possibly including adaptation profits.
"""
# Main function logic
yield_ratios = self.convert_probability_to_yield_ratio()
# Create a mask for valid crops (exclude non-crop values)
crops_mask = (self.var.crop_calendar[:, :, 0] >= 0) & (
self.var.crop_calendar[:, :, 0]
< len(self.var.crop_data["reference_yield_kg_m2"])
)
# Create an array filled with NaNs for reference data
nan_array = np.full_like(
self.var.crop_calendar[:, :, 0], fill_value=np.nan, dtype=float
)
# Compute profits without adaptation
total_profits = self.compute_total_profits(yield_ratios, crops_mask, nan_array)
total_profits, profits_no_event = self.format_results(total_profits)
gains_adaptation = self.adaptation_yield_ratio_difference(adapted, yield_ratios)
# Clip yield ratios to physical boundaries of 0 and 1
yield_ratios_adaptation = np.clip(yield_ratios + gains_adaptation, 0, 1)
# Compute profits with adaptation
total_profits_adaptation = self.compute_total_profits(
yield_ratios_adaptation, crops_mask, nan_array
)
total_profits_adaptation, profits_no_event_adaptation = self.format_results(
total_profits_adaptation
)
return (
total_profits,
profits_no_event,
total_profits_adaptation,
profits_no_event_adaptation,
)
[docs]
def profits_SEUT_crops(
self, unique_crop_calendars
) -> Tuple[
np.ndarray,
np.ndarray,
np.ndarray,
np.ndarray,
np.ndarray,
np.ndarray,
np.ndarray,
np.ndarray,
]:
"""
Calculate total profits under different drought probability scenarios with crop adaptation measures (Adaptation Type 2).
Returns:
Tuple containing profits without adaptation, profits with adaptation options, and additional data.
"""
# Main function logic
yield_ratios = self.convert_probability_to_yield_ratio()
# Create a mask for valid crops (exclude non-crop values)
crops_mask = (self.var.crop_calendar[:, :, 0] >= 0) & (
self.var.crop_calendar[:, :, 0]
< len(self.var.crop_data["reference_yield_kg_m2"])
)
# Create an array filled with NaNs for reference data
nan_array = np.full_like(
self.var.crop_calendar[:, :, 0], fill_value=np.nan, dtype=float
)
# Compute profits without adaptation
total_profits = self.compute_total_profits(yield_ratios, crops_mask, nan_array)
profit_events, profits_no_event = self.format_results(total_profits)
crop_elevation_group = self.create_unique_groups()
unique_crop_groups, group_indices = np.unique(
crop_elevation_group, axis=0, return_inverse=True
)
# Calculate the yield gains for crop switching for different farmers
(
profit_gains,
new_crop_nr,
new_farmer_id,
) = crop_profit_difference_njit(
yield_ratios=total_profits,
crop_elevation_group=crop_elevation_group,
unique_crop_groups=unique_crop_groups,
group_indices=group_indices,
crop_calendar=self.var.crop_calendar.data,
unique_crop_calendars=unique_crop_calendars,
p_droughts=self.var.p_droughts,
)
total_profits_adaptation = np.full(
(len(unique_crop_calendars), len(self.var.p_droughts[:-1]), self.n),
0.0,
dtype=np.float32,
)
profits_no_event_adaptation = np.full(
(len(unique_crop_calendars), self.n),
0.0,
dtype=np.float32,
)
for crop_option in range(len(unique_crop_calendars)):
profit_gains_option = profit_gains[:, crop_option, :]
profits_adaptation_option = total_profits + profit_gains_option
(
total_profits_adaptation_option,
profits_no_event_adaptation_option,
) = self.format_results(profits_adaptation_option)
total_profits_adaptation[crop_option, :, :] = (
total_profits_adaptation_option
)
profits_no_event_adaptation[crop_option, :] = (
profits_no_event_adaptation_option
)
return (
profit_events,
profits_no_event,
total_profits_adaptation,
profits_no_event_adaptation,
new_crop_nr,
new_farmer_id,
)
[docs]
def compute_total_profits(
self, yield_ratios: np.ndarray, crops_mask: np.ndarray, nan_array: np.ndarray
) -> np.ndarray:
"""
Compute total profits for all agents across different drought scenarios.
Parameters:
yield_ratios (np.ndarray): Yield ratios for agents under different drought scenarios.
crops_mask (np.ndarray): Mask indicating valid crop entries.
nan_array (np.ndarray): Array filled with NaNs for reference.
Returns:
np.ndarray: Total profits for agents under each drought scenario.
"""
total_profits = np.zeros((self.n, yield_ratios.shape[1]))
for col in range(yield_ratios.shape[1]):
total_profits[:, col] = self.yield_ratio_to_profit(
yield_ratios[:, col], crops_mask, nan_array
)
return total_profits
[docs]
def convert_probability_to_yield_ratio(self) -> np.ndarray:
"""
Convert drought probabilities to yield ratios based on the given polynomial relationship.
For each farmer's yield-probability relationship (represented as a polynomial),
this function calculates the inverse of the relationship and then applies the
inverted polynomial to a set of given probabilities to obtain yield ratios.
The resulting yield ratios are then adjusted to lie between 0 and 1. The final
results are stored in `self.var.yield_ratios_drought_event`.
Note:
- It assumes that the polynomial relationship is invertible.
- Adjusts yield ratios to be non-negative and capped at 1.0.
"""
def logarithmic_function(probability, params):
a = params[:, 0]
b = params[:, 1]
x = probability[:, np.newaxis]
return a * np.log10(x) + b
def exponential_function(probability, params):
# Extract parameters
a = params[:, 0] # Shape: (num_agents,)
b = params[:, 1] # Shape: (num_agents,)
x = probability # Shape: (num_events,)
# Reshape arrays for broadcasting
a = a[:, np.newaxis] # Shape: (num_agents, 1)
b = b[:, np.newaxis] # Shape: (num_agents, 1)
x = x[np.newaxis, :] # Shape: (1, num_events)
# Compute the exponential function
return a * np.exp(b * x) # Shape: (num_agents, num_events)
yield_ratios = exponential_function(
1 / self.var.p_droughts, self.var.farmer_yield_probability_relation
)
# Adjust the yield ratios to lie between 0 and 1
yield_ratios = np.clip(yield_ratios, 0, 1)
# Store the results in a global variable
self.var.yield_ratios_drought_event = yield_ratios[:]
return self.var.yield_ratios_drought_event
[docs]
def create_unique_groups(self, *additional_diffentiators):
"""
Create unique groups based on elevation data and merge with crop calendar.
Parameters:
N (int): Number of groups to divide the elevation data into.
Returns:
numpy.ndarray: Merged array with crop calendar and elevation distribution groups.
"""
if additional_diffentiators:
agent_classes = np.stack(
[self.var.farmer_base_class.data, *additional_diffentiators], axis=1
)
else:
agent_classes = self.var.farmer_base_class
groups, group_indices = np.unique(agent_classes, axis=0, return_inverse=True)
return group_indices, groups.shape[0]
[docs]
def adaptation_yield_ratio_difference(
self, adapted: np.ndarray, yield_ratios
) -> np.ndarray:
"""
Calculate the relative yield ratio improvement for farmers adopting a certain adaptation.
This function determines how much better farmers that have adopted a particular adaptation
are doing in terms of their yield ratio as compared to those who haven't.
Args:
adaptation_type: The type of adaptation being considered.
Returns:
An array representing the relative yield ratio improvement for each agent.
TO DO: vectorize
"""
group_indices, n_groups = self.create_unique_groups()
# Initialize array to store relative yield ratio improvement for unique groups
gains_adaptation = np.zeros(
(self.n, self.var.p_droughts.size),
dtype=np.float32,
)
# Loop over each unique group of farmers to determine their average yield ratio
for group_idx in range(n_groups):
# Agents in the current group
group_members = group_indices == group_idx
# Split agents into adapted and unadapted within the group
unadapted_agents = group_members & (~adapted)
adapted_agents = group_members & adapted
if unadapted_agents.any() and adapted_agents.any():
# Calculate mean yield ratio over past years for both adapted and unadapted groups
unadapted_yield_ratio = np.mean(
yield_ratios[unadapted_agents, :], axis=0
)
adapted_yield_ratio = np.mean(yield_ratios[adapted_agents, :], axis=0)
yield_ratio_gain = adapted_yield_ratio - unadapted_yield_ratio
gains_adaptation[group_members, :] = yield_ratio_gain
assert np.max(gains_adaptation) != np.inf, "gains adaptation value is inf"
return gains_adaptation
[docs]
def adaptation_water_cost_difference(
self, adapted: np.ndarray, energy_cost, water_cost
) -> Tuple[np.ndarray, np.ndarray]:
"""
Calculate the relative yield ratio improvement for farmers adopting a certain adaptation.
This function determines how much better farmers that have adopted a particular adaptation
are doing in terms of their yield ratio as compared to those who haven't.
Args:
adapted (np.ndarray): Array indicating adaptation status (0 or 1) for each agent.
energy_cost (np.ndarray): Array of energy costs for each agent.
water_cost (np.ndarray): Array of water costs for each agent.
Returns:
Tuple[np.ndarray, np.ndarray]: Arrays representing the relative energy cost and water cost improvements for each agent.
"""
assert adapted.dtype == bool, "adapted should be a boolean array"
# Create unique groups based on elevation data
group_indices, n_groups = self.create_unique_groups()
# Initialize arrays to store gains per group
unique_water_cost_gain = np.zeros(n_groups, dtype=np.float32)
unique_energy_cost_gain = np.zeros(n_groups, dtype=np.float32)
# For each group, compute gains
for group_idx in range(n_groups):
# Agents in the current group
group_members = group_indices == group_idx
# Split agents into adapted and unadapted within the group
unadapted_agents = group_members & (~adapted)
adapted_agents = group_members & adapted
# Check if both adapted and unadapted agents are present
if np.any(unadapted_agents) and np.any(adapted_agents):
# Calculate mean water and energy costs for unadapted agents
unadapted_water_cost = np.mean(water_cost[unadapted_agents], axis=0)
unadapted_energy_cost = np.mean(energy_cost[unadapted_agents], axis=0)
# Calculate mean water and energy costs for adapted agents
adapted_water_cost = np.mean(water_cost[adapted_agents], axis=0)
adapted_energy_cost = np.mean(energy_cost[adapted_agents], axis=0)
# Calculate gains
water_cost_gain = adapted_water_cost - unadapted_water_cost
energy_cost_gain = adapted_energy_cost - unadapted_energy_cost
# Store gains for the group
unique_water_cost_gain[group_idx] = water_cost_gain
unique_energy_cost_gain[group_idx] = energy_cost_gain
else:
# If not enough data, set gains to zero or np.nan
unique_water_cost_gain[group_idx] = 0 # or np.nan
unique_energy_cost_gain[group_idx] = 0 # or np.nan
# Map gains back to agents using group indices
water_cost_adaptation_gain = unique_water_cost_gain[group_indices]
energy_cost_adaptation_gain = unique_energy_cost_gain[group_indices]
return energy_cost_adaptation_gain, water_cost_adaptation_gain
[docs]
def yield_ratio_to_profit(
self, yield_ratios: np.ndarray, crops_mask: np.ndarray, nan_array: np.ndarray
) -> np.ndarray:
"""
Convert yield ratios to monetary profit values.
This function computes the profit values for each crop based on given yield ratios.
The profit is calculated by multiplying the crop yield in kilograms per sqr. meter with
the average crop price. The function leverages various data inputs, such as current crop
prices and reference yields.
Args:
yield_ratios: The array of yield ratios for the crops.
crops_mask: A mask that denotes valid crops, based on certain conditions.
array_with_reference: An array initialized with NaNs, later used to store reference yields and crop prices.
Returns:
An array representing the profit values for each crop based on the given yield ratios.
Note:
- It assumes that the crop prices are non-NaN for the current model time.
- The function asserts that crop yields in kg are always non-negative.
TODO: Take the average crop prices over the last x years.
"""
# Create blank arrays with only nans
array_with_reference_yield = nan_array.copy()
array_with_price = nan_array.copy()
# Check if prices are monthly or yearly
price_frequency = self.model.config["agent_settings"]["market"][
"price_frequency"
]
if price_frequency == "monthly":
total_price = 0
month_count = 0
# Ending date and start date set to one year prior
end_date = self.model.current_time
start_date = datetime(end_date.year - 1, 1, 1)
# Loop through each month from start_date to end_date to get the sum of crop costs over the past year
current_date = start_date
while current_date <= end_date:
assert self.crop_prices[0] is not None, (
"behavior needs crop prices to work"
)
monthly_price = self.crop_prices[1][
self.crop_prices[0].get(current_date)
]
total_price += monthly_price
# Move to the next month
if current_date.month == 12:
current_date = datetime(current_date.year + 1, 1, 1)
else:
current_date = datetime(
current_date.year, current_date.month + 1, 1
)
month_count += 1
# Calculate the average price over the last year
crop_prices = total_price / month_count
else:
crop_prices = self.agents.market.crop_prices[self.var.region_id]
# Assign the reference yield and current crop price to the array based on valid crop mask
array_with_price[crops_mask] = np.take(
crop_prices, self.var.crop_calendar[:, :, 0][crops_mask].astype(int)
)
assert not np.isnan(
array_with_price[crops_mask]
).any() # Ensure there are no NaN values in crop prices
array_with_reference_yield[crops_mask] = np.take(
self.var.crop_data["reference_yield_kg_m2"].values,
self.var.crop_calendar[:, :, 0][crops_mask].astype(int),
)
# Calculate the product of the average reference yield and average crop price ignoring NaN values
reference_profit_m2 = np.nansum(
array_with_reference_yield * array_with_price, axis=1
)
assert (
reference_profit_m2 >= 0
).all() # Ensure all crop yields are non-negative
# Calculate profit by multiplying yield with price
profit_m2 = yield_ratios * reference_profit_m2
return profit_m2
def update_loans(self) -> None:
# Subtract 1 off each loan duration, except if that loan is at 0
self.var.loan_tracker -= self.var.loan_tracker != 0
# If the loan tracker is at 0, cancel the loan amount and subtract it of the total
expired_loan_mask = self.var.loan_tracker == 0
# Add a column to make it the same shape as the loan amount array
new_column = np.full((self.n, 1, 5), False)
expired_loan_mask = np.column_stack((expired_loan_mask, new_column))
# Sum the expired loan amounts
ending_loans = expired_loan_mask * self.var.all_loans_annual_cost
total_loan_reduction = np.sum(ending_loans, axis=(1, 2))
# Subtract it from the total loans and set expired loans to 0
self.var.all_loans_annual_cost[:, -1, 0] -= total_loan_reduction
self.var.all_loans_annual_cost[expired_loan_mask] = 0
# Adjust for inflation in separate array for export
# Calculate the cumulative inflation from the start year to the current year for each farmer
cumulative_inflation = np.prod(
[
self.get_value_per_farmer_from_region_id(
self.inflation_rate, datetime(year, 1, 1)
)
for year in range(
self.model.config["general"]["spinup_time"].year,
self.model.current_time.year + 1,
)
],
axis=0,
)
self.var.adjusted_annual_loan_cost = (
self.var.all_loans_annual_cost / cumulative_inflation[..., None, None]
)
def get_value_per_farmer_from_region_id(
self, data, time, subset=None
) -> np.ndarray:
index = data[0].get(time)
if subset is not None:
region_id = self.var.region_id[subset]
else:
region_id = self.var.region_id
unique_region_ids, inv = np.unique(region_id, return_inverse=True)
values = np.full_like(unique_region_ids, np.nan, dtype=np.float32)
for i, region_id in enumerate(unique_region_ids):
values[i] = data[1][region_id][index]
return values[inv]
[docs]
@staticmethod
@njit(cache=True)
def field_size_per_farmer_numba(
field_indices_by_farmer: np.ndarray,
field_indices: np.ndarray,
cell_area: np.ndarray,
) -> np.ndarray:
"""Gets the field size for each farmer.
Args:
field_indices_by_farmer: This array contains the indices where the fields of a farmer are stored in `field_indices`.
field_indices: This array contains the indices of all fields, ordered by farmer. In other words, if a farmer owns multiple fields, the indices of the fields are indices.
cell_area: Subarray of cell_area.
Returns:
field_size_per_farmer: Field size for each farmer in m2.
"""
field_size_per_farmer = np.zeros(
field_indices_by_farmer.shape[0], dtype=np.float32
)
for farmer in range(field_indices_by_farmer.shape[0]):
for field in get_farmer_HRUs(
field_indices, field_indices_by_farmer, farmer
):
field_size_per_farmer[farmer] += cell_area[field]
return field_size_per_farmer
@property
def field_size_per_farmer(self) -> np.ndarray:
"""Gets the field size for each farmer.
Returns:
field_size_per_farmer: Field size for each farmer in m2.
"""
return self.field_size_per_farmer_numba(
self.var.field_indices_by_farmer.data,
self.var.field_indices,
self.HRU.var.cell_area,
)
@property
def is_irrigated(self):
return (
self.var.adaptations[:, [SURFACE_IRRIGATION_EQUIPMENT, WELL_ADAPTATION]] > 0
).any(axis=1)
@property
def irrigated_fields(self) -> np.ndarray:
"""Gets the indices of fields that are irrigated.
Returns:
irrigated_fields: Indices of fields that are irrigated.
"""
irrigated_fields = np.take(
self.is_irrigated,
self.HRU.var.land_owners,
)
irrigated_fields[self.HRU.var.land_owners == -1] = False
return irrigated_fields
@property
def groundwater_depth(self):
groundwater_depth = get_farmer_groundwater_depth(
self.n,
self.model.hydrology.groundwater.groundwater_depth,
self.HRU.var.HRU_to_grid,
self.var.field_indices,
self.var.field_indices_by_farmer.data,
self.HRU.var.cell_area,
)
assert not np.isnan(groundwater_depth).any(), "groundwater depth is nan"
return groundwater_depth
def create_farmer_classes(self, *characteristics):
agent_classes = np.unique(
np.stack(characteristics), axis=1, return_inverse=True
)[1]
return agent_classes
@property
def main_irrigation_source(self):
# Set to 0 if channel abstraction is bigger than reservoir and groundwater, 1 for reservoir, 2 for groundwater and 3 no abstraction
main_irrigation_source = np.argmax(
self.var.yearly_abstraction_m3_by_farmer[:, :TOTAL_IRRIGATION, 0],
axis=1,
)
# Set to -1 for precipitation if there is no abstraction
main_irrigation_source[
self.var.yearly_abstraction_m3_by_farmer[:, :TOTAL_IRRIGATION, 0].sum(
axis=1
)
== 0
] = NO_IRRIGATION
return main_irrigation_source
[docs]
def step(self) -> None:
"""
This function is called at the beginning of each timestep.
Then, farmers harvest and plant crops.
"""
if not self.model.simulate_hydrology:
return
self.harvest()
self.plant()
self.water_abstraction_sum()
## yearly actions
if self.model.current_time.month == 1 and self.model.current_time.day == 1:
if self.model.current_time.year - 1 > self.model.spinup_start.year:
# reset the irrigation limit, but only if a full year has passed already. Otherwise
# the cumulative water deficit is not year completed.
self.var.remaining_irrigation_limit_m3[:] = 0
# Set yearly yield ratio based on the difference between saved actual and potential profit
self.var.yearly_yield_ratio = (
self.var.yearly_profits / self.var.yearly_potential_profits
)
self.save_yearly_spei()
# reset the irrigation limit, but only if a full year has passed already. Otherwise
# the cumulative water deficit is not year completed.
if self.model.current_time.year - 1 > self.model.spinup_start.year:
self.var.remaining_irrigation_limit_m3[:] = (
self.var.irrigation_limit_m3[:]
)
# Create a DataFrame with command area and elevation
df = pd.DataFrame(
{
"command_area": self.farmer_command_area,
"elevation": self.var.elevation,
}
)
# Compute group-specific median elevation
df["group_median"] = df.groupby("command_area")["elevation"].transform(
"median"
)
# Determine lower or higher part and assign distinct ids
up_or_downstream = np.where(df["elevation"] <= df["group_median"], 0, 1)
# create a unique index for each type of crop calendar that a farmer follows
crop_calendar_group = np.unique(
self.var.crop_calendar[:, :, 0], axis=0, return_inverse=True
)[1]
self.var.farmer_base_class[:] = self.create_farmer_classes(
self.farmer_command_area,
up_or_downstream,
crop_calendar_group,
)
print(
"well",
np.mean(
self.var.yearly_yield_ratio[
self.main_irrigation_source == GROUNDWATER_IRRIGATION, 1
]
),
"no irrigation",
np.mean(
self.var.yearly_yield_ratio[
self.main_irrigation_source == NO_IRRIGATION, 1
]
),
"total_mean",
np.mean(self.var.yearly_yield_ratio[:, 1]),
)
timer = TimingModule("crop_farmers")
energy_cost, water_cost, average_extraction_speed = (
self.calculate_water_costs()
)
timer.new_split("water & energy costs")
if (
not self.model.in_spinup
and "ruleset" in self.config
and not self.config["ruleset"] == "no-adaptation"
):
# Determine the relation between drought probability and yield
# self.calculate_yield_spei_relation()
self.calculate_yield_spei_relation_group()
# self.calculate_yield_spei_relation_test_group()
timer.new_split("yield-spei relation")
# These adaptations can only be done if there is a yield-probability relation
if not np.all(self.var.farmer_yield_probability_relation == 0):
if (
not self.config["expected_utility"]["adaptation_well"][
"ruleset"
]
== "no-adaptation"
):
self.adapt_irrigation_well(
average_extraction_speed, energy_cost, water_cost
)
timer.new_split("irr well")
if (
not self.config["expected_utility"]["adaptation_sprinkler"][
"ruleset"
]
== "no-adaptation"
):
self.adapt_irrigation_efficiency(energy_cost, water_cost)
timer.new_split("irr efficiency")
if (
not self.config["expected_utility"]["crop_switching"]["ruleset"]
== "no-adaptation"
):
self.adapt_crops()
timer.new_split("adapt crops")
# self.switch_crops_neighbors()
else:
raise AssertionError(
"Cannot adapt without yield - probability relation"
)
print(timer)
advance_crop_rotation_year(
current_crop_calendar_rotation_year_index=self.var.current_crop_calendar_rotation_year_index,
crop_calendar_rotation_years=self.var.crop_calendar_rotation_years,
)
# Update loans
self.update_loans()
# Reset total crop age
shift_and_update(self.var.total_crop_age, self.var.total_crop_age[:, 0])
for i in range(len(self.var.yearly_abstraction_m3_by_farmer[0, :, 0])):
shift_and_reset_matrix(
self.var.yearly_abstraction_m3_by_farmer[:, i, :]
)
# Shift the potential and yearly profits forward
shift_and_reset_matrix(self.var.yearly_profits)
shift_and_reset_matrix(self.var.yearly_potential_profits)
# if self.model.current_timestep == 100:
# self.add_agent(indices=(np.array([310, 309]), np.array([69, 69])))
# if self.model.current_timestep == 105:
# self.remove_agent(farmer_idx=1000)
self.report(self, locals())
def remove_agents(
self, farmer_indices: list[int], land_use_type: int
) -> np.ndarray:
farmer_indices = np.array(farmer_indices)
if farmer_indices.size > 0:
farmer_indices = np.sort(farmer_indices)[::-1]
HRUs_with_removed_farmers = []
for idx in farmer_indices:
HRUs_with_removed_farmers.append(self.remove_agent(idx, land_use_type))
return np.concatenate(HRUs_with_removed_farmers)
def remove_agent(self, farmer_idx: int, land_use_type: int) -> np.ndarray:
assert farmer_idx >= 0, "Farmer index must be positive."
assert farmer_idx < self.n, (
"Farmer index must be less than the number of agents."
)
last_farmer_HRUs = get_farmer_HRUs(
self.var.field_indices, self.var.field_indices_by_farmer.data, -1
)
last_farmer_field_size = self.field_size_per_farmer[-1] # for testing only
# disown the farmer.
HRUs_farmer_to_be_removed = get_farmer_HRUs(
self.var.field_indices,
self.var.field_indices_by_farmer.data,
farmer_idx,
)
self.HRU.var.land_owners[HRUs_farmer_to_be_removed] = -1
self.HRU.var.crop_map[HRUs_farmer_to_be_removed] = -1
self.HRU.var.crop_age_days_map[HRUs_farmer_to_be_removed] = -1
self.HRU.var.crop_harvest_age_days[HRUs_farmer_to_be_removed] = -1
self.HRU.var.land_use_type[HRUs_farmer_to_be_removed] = land_use_type
# reduce number of agents
self.n -= 1
if not self.n == farmer_idx:
# move data of last agent to the index of the agent that is to be removed, effectively removing that agent.
for name, agent_array in self.agent_arrays.items():
agent_array[farmer_idx] = agent_array[-1]
# reduce the number of agents by 1
assert agent_array.n == self.n + 1
agent_array.n = self.n
# update the field indices of the last agent
self.HRU.var.land_owners[last_farmer_HRUs] = farmer_idx
else:
for agent_array in self.agent_arrays.values():
agent_array.n = self.n
# TODO: Speed up field index updating.
self.update_field_indices()
if self.n == farmer_idx:
assert (
get_farmer_HRUs(
self.var.field_indices,
self.var.field_indices_by_farmer.data,
farmer_idx,
).size
== 0
)
else:
assert np.array_equal(
np.sort(last_farmer_HRUs),
np.sort(
get_farmer_HRUs(
self.var.field_indices,
self.var.field_indices_by_farmer.data,
farmer_idx,
)
),
)
assert math.isclose(
last_farmer_field_size,
self.field_size_per_farmer[farmer_idx],
abs_tol=1,
)
assert (self.HRU.var.land_owners[HRUs_farmer_to_be_removed] == -1).all()
return HRUs_farmer_to_be_removed
[docs]
def add_agent(
self,
indices,
values={
"risk_aversion": 1,
"interest_rate": 1,
"discount_rate": 1,
"adapted": False,
"time_adapted": False,
"SEUT_no_adapt": 1,
"EUT_no_adapt": 1,
"crops": -1,
"irrigation_source": -1,
"well_depth": -1,
"channel_abstraction_m3_by_farmer": 0,
"reservoir_abstraction_m3_by_farmer": 0,
"groundwater_abstraction_m3_by_farmer": 0,
"yearly_abstraction_m3_by_farmer": 0,
"total_crop_age": 0,
"per_harvest_yield_ratio": 0,
"per_harvest_SPEI": 0,
"monthly_SPEI": 0,
"disposable_income": 0,
"household_size": 2,
"yield_ratios_drought_event": 1,
"risk_perception": 1,
"drought_timer": 1,
"yearly_SPEI_probability": 1,
"yearly_yield_ratio": 1,
"yearly_profits": 1,
"yearly_potential_profits": 1,
"farmer_yield_probability_relation": 1,
"irrigation_efficiency": 0.9,
"base_management_yield_ratio": 1,
"yield_ratio_management": 1,
"annual_costs_all_adaptations": 1,
"farmer_class": 1,
"water_use": 1,
"GEV_parameters": 1,
"risk_perc_min": 1,
"risk_perc_max": 1,
"risk_decr": 1,
"decision_horizon": 1,
},
):
"""This function can be used to add new farmers."""
HRU = self.model.data.split(indices)
assert self.HRU.var.land_owners[HRU] == -1, "There is already a farmer here."
self.HRU.var.land_owners[HRU] = self.n
pixels = np.column_stack(indices)[:, [1, 0]]
agent_location = np.mean(
pixels_to_coords(pixels + 0.5, self.HRU.var.gt), axis=0
) # +.5 to use center of pixels
self.n += 1 # increment number of agents
for name, agent_array in self.agent_arrays.items():
agent_array.n += 1
if name == "locations":
agent_array[self.n - 1] = agent_location
elif name == "elevation":
agent_array[self.n - 1] = self.elevation_subgrid.sample_coords(
np.expand_dims(agent_location, axis=0)
)
elif name == "region_id":
agent_array[self.n - 1] = self.var.subdistrict_map.sample_coords(
np.expand_dims(agent_location, axis=0)
)
elif name == "field_indices_by_farmer":
# TODO: Speed up field index updating.
self.update_field_indices()
else:
agent_array[self.n - 1] = values[name]
@property
def n(self):
return self.var._n
@n.setter
def n(self, value):
self.var._n = value
def get_farmer_elevation(self):
# get elevation per farmer
elevation_subgrid = load_grid(
self.model.files["subgrid"]["landsurface/elevation"],
)
elevation_subgrid = np.nan_to_num(elevation_subgrid, copy=False, nan=0.0)
decompressed_land_owners = self.HRU.decompress(self.HRU.var.land_owners)
mask = decompressed_land_owners != -1
return np.bincount(
decompressed_land_owners[mask],
weights=elevation_subgrid[mask],
) / np.bincount(decompressed_land_owners[mask])