Source code for geb.model

import copy
import datetime
from pathlib import Path
from time import time

import numpy as np
import xarray as xr
from dateutil.relativedelta import relativedelta
from honeybees.model import Model as ABM_Model

from geb.agents import Agents
from geb.artists import Artists
from geb.hazards.driver import HazardDriver
from geb.module import Module
from geb.reporter import Reporter
from geb.store import Store

from .evaluate import Evaluate
from .HRUs import load_geom
from .hydrology import Hydrology


[docs] class GEBModel(Module, HazardDriver, ABM_Model): """GEB parent class. Parameters ---------- config: Filepath of the YAML-configuration file (e.g. model.yml). files: Dictionary with the paths of the input files. mode: Mode of the model. Either `w` (write) or `r` (read). timing: Boolean indicating if the model steps should be timed. """ def __init__( self, config: dict, files: dict, mode: str = "w", timing: bool = False, ): self.timing = timing self.mode = mode Module.__init__(self, self, create_var=False) self._multiverse_name = None self.config = self.setup_config(config) # make a deep copy to avoid issues when the model is initialized multiple times self.files = copy.deepcopy(files) for data in self.files.values(): for key, value in data.items(): data[key] = self.input_folder / value self.mask = load_geom(self.files["geoms"]["mask"]) self.store = Store(self) self.artists = Artists(self) # Empty list to hold plantFATE models. If forests are not used, this will be empty self.plantFATE = [] @property def name(self) -> str: return "" def restore(self, store_location: str, timestep: int) -> None: self.store.load(store_location) self.hydrology.groundwater.modflow.restore(self.hydrology.grid.var.heads) self.current_timestep = timestep def multiverse(self): # copy current state of timestep and time store_timestep = copy.copy(self.current_timestep) # set a folder to store the initial state of the multiverse store_location = self.simulation_root / "multiverse" / "forecast" self.store.save(store_location) precipitation_dataarray = self.sfincs.precipitation_dataarray forecasts = xr.open_dataset( self.input_folder / "climate" / "forecasts" / f"{self.current_time.strftime('%Y%m%d')}.nc" ) end_date = forecasts.time[-1].dt.date.item() n_timesteps = (end_date - self.current_time.date()).days for member in forecasts.member: self.multiverse_name = member.item() # self.sfincs.precipitation_dataarray = ( # forecasts.sel(member=member).rename({"accum_precipitation": "precip"}) # / 3600 # ) self.sfincs.precipitation_dataarray = ( precipitation_dataarray / 100 * member.item() ) print(f"Running forecast member {member.item()}...") for _ in range(n_timesteps): self.step() # restore the initial state of the multiverse self.restore(store_location=store_location, timestep=store_timestep) print("Forecast finished, restoring all conditions...") # restore the precipitation dataarray, step out of the multiverse self.sfincs.precipitation_dataarray = precipitation_dataarray self.multiverse_name = None
[docs] def step(self) -> None: """ Forward the model by the given the number of steps. Args: step_size: Number of steps the model should take. Can be integer or string `day`, `week`, `month`, `year`, `decade` or `century`. """ # only if forecasts is used, and if we are not already in multiverse (avoiding infinite recursion) # and if the current date is in the list of forecast days if ( self.config["general"]["forecasts"]["use"] and not self.multiverse_name and self.current_time.date() in self.config["general"]["forecasts"]["days"] ): self.multiverse() t0 = time() HazardDriver.step(self, 1) self.agents.step() if self.simulate_hydrology: self.hydrology.step() self.report(self, locals()) t1 = time() print( f"{self.current_time} ({round(t1 - t0, 4)}s)", flush=True, ) self.current_timestep += 1
def create_datetime(self, date): return datetime.datetime.combine(date, datetime.time(0)) def _initialize( self, create_reporter, current_time, n_timesteps, timestep_length, in_spinup=False, simulate_hydrology=True, clean_report_folder=False, load_data_from_store=False, ) -> None: """Initializes the model.""" self.in_spinup = in_spinup self.simulate_hydrology = simulate_hydrology self.regions = load_geom(self.files["geoms"]["regions"]) self.output_folder.mkdir(parents=True, exist_ok=True) self.spinup_start = datetime.datetime.combine( self.config["general"]["spinup_time"], datetime.time(0) ) self.timestep_length = timestep_length if self.simulate_hydrology: self.hydrology = Hydrology(self) HazardDriver.__init__(self) ABM_Model.__init__( self, current_time, self.timestep_length, n_timesteps=n_timesteps, ) self.agents = Agents(self) if load_data_from_store: self.store.load() if self.simulate_hydrology: self.hydrology.groundwater.initalize_modflow_model() self.hydrology.soil.set_global_variables() if create_reporter: self.reporter = Reporter(self, clean=clean_report_folder)
[docs] def run(self, initialize_only=False) -> None: """Run the model for the entire period, and export water table in case of spinup scenario.""" if not self.store.path.exists(): raise FileNotFoundError( f"The initial conditions folder ({self.store.path.resolve()}) does not exist. Spinup is required before running the model. Please run the spinup first." ) current_time = self.create_datetime(self.config["general"]["start_time"]) end_time = self.create_datetime(self.config["general"]["end_time"]) timestep_length = datetime.timedelta(days=1) n_timesteps = (end_time + timestep_length - current_time) / timestep_length assert n_timesteps.is_integer() n_timesteps = int(n_timesteps) assert n_timesteps > 0, "End time is before or identical to start time" self._initialize( create_reporter=True, current_time=current_time, n_timesteps=n_timesteps, timestep_length=timestep_length, clean_report_folder=True, load_data_from_store=True, ) if initialize_only: return for _ in range(self.n_timesteps): self.step() print("Model run finished, finalizing report...") self.reporter.finalize()
def run_yearly(self) -> None: current_time = self.create_datetime(self.config["general"]["start_time"]) end_time = self.create_datetime(self.config["general"]["end_time"]) assert current_time.month == 1 and current_time.day == 1, ( "In yearly mode start time should be the first day of the year" ) assert end_time.month == 12 and end_time.day == 31, ( "In yearly mode end time should be the last day of the year" ) n_timesteps = end_time.year - current_time.year + 1 self._initialize( create_reporter=True, current_time=current_time, n_timesteps=n_timesteps, timestep_length=relativedelta(years=1), simulate_hydrology=False, clean_report_folder=True, load_data_from_store=True, ) for _ in range(self.n_timesteps): self.step() print("Model run finished, finalizing report...") self.reporter.finalize()
[docs] def spinup(self, initialize_only=False) -> None: """Run the model for the spinup period.""" # set the start and end time for the spinup. The end of the spinup is the start of the actual model run current_time = self.create_datetime(self.config["general"]["spinup_time"]) end_time_exclusive = self.create_datetime(self.config["general"]["start_time"]) if end_time_exclusive.year - current_time.year < 10: print( "Spinup time is less than 10 years. This is not recommended and may lead to issues later." ) timestep_length = datetime.timedelta(days=1) n_timesteps = (end_time_exclusive - current_time) / timestep_length assert n_timesteps.is_integer() n_timesteps = int(n_timesteps) assert n_timesteps > 0, "End time is before or identical to start time" # turn off any reporting for the ABM # self.config["report"] = { # "hydrology.routing": { # "discharge_daily": { # "varname": "grid.var.discharge_m3_s", # "type": "grid", # "function": None, # "format": "zarr", # "single_file": True, # } # } # } self.var = self.store.create_bucket("var") self._initialize( create_reporter=True, current_time=current_time, n_timesteps=n_timesteps, timestep_length=datetime.timedelta(days=1), clean_report_folder=True, in_spinup=True, ) if initialize_only: return for _ in range(self.n_timesteps): self.step() print("Spinup finished, saving conditions at end of spinup...") self.store.save() self.reporter.finalize()
[docs] def estimate_return_periods(self) -> None: """Estimate the risk of the model.""" current_time = self.create_datetime(self.config["general"]["start_time"]) self.config["general"]["name"] = "estimate_return_periods" self._initialize( create_reporter=False, current_time=current_time, n_timesteps=0, timestep_length=relativedelta(years=1), load_data_from_store=False, simulate_hydrology=False, clean_report_folder=False, ) HazardDriver.initialize(self, longest_flood_event=30) self.sfincs.get_return_period_maps()
def evaluate(self, *args, **kwargs) -> None: self.evaluate = Evaluate(self) self.evaluate.run(*args, **kwargs) print("Evaluating model...") @property def current_day_of_year(self) -> int: """Gets the current day of the year. Returns: day: current day of the year. """ return self.current_time.timetuple().tm_yday @property def current_time_unix_s(self) -> int: """Gets the current time in unix seconds. Returns: time: current time in unix seconds. """ return np.datetime64(self.current_time, "s").astype(np.int64).item() @property def simulation_root(self) -> Path: """Gets the simulation root. Returns: simulation_root: Path of the simulation root. """ folder = Path("simulation_root") / self.run_name folder.mkdir(parents=True, exist_ok=True) return folder @property def simulation_root_spinup(self) -> Path: """Gets the simulation root of the spinup. Returns: simulation_root: Path of the simulation root. """ folder = Path("simulation_root") / "spinup" folder.mkdir(parents=True, exist_ok=True) return folder @property def run_name(self): if self.mode == "w" and self.in_spinup: return "spinup" else: if "name" in self.config["general"]: return self.config["general"]["name"] else: print( 'No "name" specified in config file under general. Using "default".' ) return "default" @property def multiverse_name(self): return self._multiverse_name @multiverse_name.setter def multiverse_name(self, value): self._multiverse_name = str(value) if value else None @property def output_folder(self): return Path(self.config["general"]["output_folder"]) @property def input_folder(self): return Path(self.config["general"]["input_folder"]) @property def crs(self): return 4326 @property def bounds(self): return self.mask.total_bounds @property def xmin(self): return self.bounds[0] @property def xmax(self): return self.bounds[2] @property def ymin(self): return self.bounds[1] @property def ymax(self): return self.bounds[3]
[docs] def close(self) -> None: """Finalizes the model.""" if ( self.mode == "w" and hasattr(self, "simulate_hydrology") and self.simulate_hydrology ): Hydrology.finalize(self) from geb.workflows.io import all_async_readers for reader in all_async_readers: reader.close()
def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close()