Source code for nlmod.gwf.surface_water

import logging
import warnings

import flopy
import numpy as np
import pandas as pd
import xarray as xr
from shapely.geometry import Polygon

from ..cache import cache_pickle
from ..dims.grid import (
    gdf_to_grid,
    get_delc,
    get_delr,
    get_extent_polygon,
)
from ..dims.layers import get_idomain
from ..read import bgt, waterboard
from ..util import extent_to_polygon, gdf_intersection_join, zonal_statistics, tqdm

logger = logging.getLogger(__name__)


[docs] @cache_pickle def aggregate(gdf, method, ds=None): """Aggregate surface water features. Parameters ---------- gdf : geopandas.GeoDataFrame GeoDataFrame containing surfacewater polygons per grid cell. Must contain columns 'stage' (waterlevel), 'c0' (bottom resistance), and 'botm' (bottom elevation) method : str, optional "area_weighted" for area-weighted params, "max_area" for max area params "de_lange" for De Lange formula for conductance ds : xr.Dataset, optional DataSet containing model layer information (only required for method='de_lange') Returns ------- celldata : pd.DataFrame DataFrame with aggregated surface water parameters per grid cell """ required_cols = {"stage", "c0", "botm"} missing_cols = required_cols.difference(gdf.columns) if len(missing_cols) > 0: raise ValueError(f"Missing columns in DataFrame: {missing_cols}") # Post process intersection result gr = gdf.groupby(by="cellid") celldata = pd.DataFrame(index=gr.groups.keys()) for cid, group in tqdm(gr, desc="Aggregate surface water data"): stage, cond, rbot = get_surfacewater_params(group, method, cid=cid, ds=ds) celldata.loc[cid, "stage"] = stage celldata.loc[cid, "cond"] = cond celldata.loc[cid, "rbot"] = rbot celldata.loc[cid, "area"] = group.area.sum() return celldata
[docs] def get_surfacewater_params(group, method, cid=None, ds=None, delange_params=None): if method == "area_weighted": # stage stage = agg_area_weighted(group, "stage") # cond c0 = agg_area_weighted(group, "c0") cond = group.area.sum() / c0 # rbot rbot = group["botm"].min() elif method == "max_area": # stage stage = agg_max_area(group, "stage") # cond c0 = agg_max_area(group, "c0") cond = group.area.sum() / c0 # rbot rbot = group["botm"].min() elif method == "de_lange": if ds is None: raise ValueError("Please supply model dataset (ds) when method=='de_lange'") # get additional requisite parameters if delange_params is None: delange_params = {} # defaults c1 = delange_params.pop("c1", 0.0) N = delange_params.pop("N", 1e-3) # stage stage = agg_area_weighted(group, "stage") # cond c0 = agg_area_weighted(group, "c0") _, _, cond = agg_de_lange(group, cid, ds, c1=c1, c0=c0, N=N) # rbot rbot = group["botm"].min() else: raise ValueError(f"Method '{method}' not recognized!") return stage, cond, rbot
[docs] def agg_max_area(gdf, col): return gdf.loc[gdf.area.idxmax(), col]
[docs] def agg_area_weighted(gdf, col): nanmask = gdf[col].isna() aw = (gdf.area * gdf[col]).sum(skipna=True) / gdf.loc[~nanmask].area.sum() return aw
[docs] def agg_de_lange(group, cid, ds, c1=0.0, c0=1.0, N=1e-3, crad_positive=True): (A, laytop, laybot, kh, kv) = get_subsurface_params_by_cellid(ds, cid) rbot = group["botm"].min() # select active layers thickness = -np.diff(np.hstack((laytop, laybot))) active = thickness > 0 laybot = laybot[active] kh = kh[active] kv = kv[active] thickness = thickness[active] # layer thickn. if np.isnan(rbot): raise ValueError(f"rbot is NaN in cell {cid}") H0 = laytop - laybot[laybot < rbot][0] ilay = 0 rlay = np.where(laybot < rbot)[0][0] # equivalent hydraulic conductivities H = thickness[ilay : rlay + 1] kv = kv[ilay : rlay + 1] kh = kh[ilay : rlay + 1] kveq = np.sum(H) / np.sum(H / kv) kheq = np.sum(H * kh) / np.sum(H) # length len_est = estimate_polygon_length(group) li = len_est.sum() # correction if group contains multiple shapes # but covers whole cell if group.area.sum() == A: delr = get_delr(ds) assert len(np.unique(delr)) == 1, "Variable grid size is not yet supported" delc = get_delc(ds) assert len(np.unique(delc)) == 1, "Variable grid size is not yet supported" li = A / np.max([delr[0], delc[0]]) # width B = group.area.sum(skipna=True) / li # mean water level p = group.loc[group.area.idxmax(), "stage"] # waterlevel # calculate params pstar, cstar, cond = de_lange_eqns( A, H0, kveq, kheq, c1, li, B, c0, p, N, crad_positive=crad_positive ) return pstar, cstar, cond
[docs] def get_subsurface_params_by_cellid(ds, cid): r, c = cid A = ds.area.isel(x=c, y=r).data laytop = ds["top"].isel(x=c, y=r).data laybot = ds["botm"].isel(x=c, y=r).data kv = ds["kv"].isel(x=c, y=r).data kh = ds["kh"].isel(x=c, y=r).data return A, laytop, laybot, kh, kv
[docs] def de_lange_eqns(A, H0, kv, kh, c1, li, Bin, c0, p, N, crad_positive=True): """Calculates the conductance according to De Lange. Parameters ---------- A : float celoppervlak (m2) H0 : float doorstroomde dikte (m) kv : float verticale doorlotendheid (m/d) kh : float horizontale doorlatendheid (m/d) c1 : float deklaagweerstand (d) li : float lengte van de waterlopen (m) Bin : float bodembreedte (m) c0 : float slootbodemweerstand (d) p : float water peil N : float grondwateraanvulling crad_positive: bool, optional whether to allow negative crad values. If True, crad will be set to 0 if it is negative. Returns ------- float Conductance (m2/d) """ if li > 1e-3 and Bin > 1e-3 and A > 1e-3: Bcor = max(Bin, 1e-3) # has no effect L = A / li - Bcor y = c1 + H0 / kv labdaL = np.sqrt(y * kh * H0) if L > 1e-3: xL = L / (2 * labdaL) FL = xL * coth(xL) else: FL = 0.0 labdaB = np.sqrt(y * kh * H0 * c0 / (y + c0)) xB = Bcor / (2 * labdaB) FB = xB * coth(xB) CL = (c0 + y) * FL + (c0 * L / Bcor) * FB if CL == 0.0: CB = 1.0 else: CB = (c1 + c0 + H0 / kv) / (CL - c0 * L / Bcor) * CL # volgens Kees Maas mag deze ook < 0 zijn... # er miste ook een correctie in de log voor anisotropie # Crad = max(0., L / (np.pi * np.sqrt(kv * kh)) # * np.log(4 * H0 / (np.pi * Bcor))) crad = radial_resistance(L, Bcor, H0, kh, kv) if crad_positive: crad = max([0.0, crad]) # Conductance pSl = Bcor * li / A if pSl >= 1.0 - 1e-10: Wp = 1 / (pSl / CB) + crad - c1 else: Wp = 1 / ((1.0 - pSl) / CL + pSl / CB) + crad - c1 cond = A / Wp # cstar, pstar cLstar = CL + crad pstar = p + N * (cLstar - y) * (y + c0) * L / (Bcor * cLstar + L * y) cstar = cLstar * (c0 + y) * (Bcor + L) / (Bcor * cLstar + L * y) return pstar, cstar, cond else: return 0.0, 0.0, 0.0
[docs] def radial_resistance(L, B, H, kh, kv): return ( L / (np.pi * np.sqrt(kh * kv)) * np.log(4 * H * np.sqrt(kh) / (np.pi * B * np.sqrt(kv))) )
[docs] def coth(x): return 1.0 / np.tanh(x)
[docs] def estimate_polygon_length(gdf): # estimate length from polygon (for shapefactor > 4) shape_factor = gdf.length / np.sqrt(gdf.area) len_est1 = (gdf.length - np.sqrt(gdf.length**2 - 16 * gdf.area)) / 4 len_est2 = (gdf.length + np.sqrt(gdf.length**2 - 16 * gdf.area)) / 4 len_est = pd.concat([len_est1, len_est2], axis=1).max(axis=1) # estimate length from minimum rotated rectangle (for shapefactor < 4) min_rect = gdf.geometry.apply(lambda g: g.minimum_rotated_rectangle) xy = min_rect.apply( lambda g: np.sqrt( (np.array(g.exterior.xy[0]) - np.array(g.exterior.xy[0][0])) ** 2 + (np.array(g.exterior.xy[1]) - np.array(g.exterior.xy[1][0])) ** 2 ) ) len_est3 = xy.apply(lambda a: np.partition(a.flatten(), -2)[-2]) # update length estimate where shape factor is lower than 4 len_est.loc[shape_factor < 4] = len_est3.loc[shape_factor < 4] return len_est
[docs] def distribute_cond_over_lays( cond, cellid, rivbot, laytop, laybot, idomain=None, kh=None, stage=None ): """Distribute the conductance in a cell over the layers in that cell, based on the the river-bottom and the layer bottoms, and optionally based on the stage and the hydraulic conductivity. """ if isinstance(rivbot, (np.ndarray, xr.DataArray)): rivbot = float(rivbot[cellid]) if len(laybot.shape) == 3: # the grid is structured grid laytop = laytop[cellid[0], cellid[1]] laybot = laybot[:, cellid[0], cellid[1]] if idomain is not None: idomain = idomain[:, cellid[0], cellid[1]] if kh is not None: kh = kh[:, cellid[0], cellid[1]] elif len(laybot.shape) == 2: # the grid is a vertex grid laytop = laytop[cellid] laybot = laybot[:, cellid] if idomain is not None: idomain = idomain[:, cellid] if kh is not None: kh = kh[:, cellid] if stage is None or isinstance(stage, str): lays = np.arange(int(np.sum(rivbot < laybot)) + 1) elif np.isfinite(stage): lays = np.arange(int(np.sum(stage < laybot)), int(np.sum(rivbot < laybot)) + 1) else: lays = np.arange(int(np.sum(rivbot < laybot)) + 1) if idomain is not None: # only distribute conductance over active layers lays = lays[idomain[lays] > 0] topbot = np.hstack((laytop, laybot)) topbot[topbot < rivbot] = rivbot d = -1 * np.diff(topbot) if kh is not None: kd = kh * d else: kd = d if np.all(kd <= 0): # when for some reason the kd is 0 in all layers (for example when the # river bottom is above all the layers), add to the first active layer if idomain is not None: try: first_active = np.where(idomain > 0)[0][0] except IndexError: warnings.warn(f"No active layers in {cellid}, returning NaNs.") return np.nan, np.nan else: first_active = 0 lays = [first_active] kd[first_active] = 1.0 conds = cond * kd[lays] / np.sum(kd[lays]) return np.array(lays), np.array(conds)
[docs] def build_spd( celldata, pkg, ds, layer_method="lay_of_rbot", desc=None, silent=False, ): """Build stress period data for package (RIV, DRN, GHB). Parameters ---------- celldata : geopandas.GeoDataFrame GeoDataFrame containing data. Cellid must be the index, and must have columns "rbot", "stage" and "cond". Optional columns are 'boundname' and 'aux'. These columns should have type str. pkg : str Modflow package: RIV, DRN or GHB ds : xr.Dataset DataSet containing model layer information layer_method: layer_method : str, optional The method used to distribute the conductance over the layers. Possible values are 'lay_of_rbot' and 'distribute_cond_over_lays'. The default is "lay_of_rbot". desc : string, optional The description of the progressbar. The default is None, so desc will be "Building stress period data RIV/DRN/GHB". silent : bool, optional Do not show a progressbar when silent is True. The default is False. Returns ------- spd : list list containing stress period data: - RIV: [(cellid), stage, cond, rbot] - DRN: [(cellid), elev, cond] - GHB: [(cellid), elev, cond] """ spd = [] top = ds.top.data botm = ds.botm.data idomain = get_idomain(ds).data kh = ds.kh.data # ignore records without a stage mask = celldata["stage"].isna() if mask.any(): logger.warning(f"{mask.sum()} records without a stage ignored") celldata = celldata[~mask] if desc is None: desc = f"Building stress period data {pkg}" for cellid, row in tqdm( celldata.iterrows(), total=celldata.index.size, desc=desc, disable=silent, ): # check if there is an active layer for this cell if ds.gridtype == "vertex": idomain_cell = idomain[:, cellid] botm_cell = botm[:, cellid] elif ds.gridtype == "structured": idomain_cell = idomain[:, cellid[0], cellid[1]] botm_cell = botm[:, cellid[0], cellid[1]] if (idomain_cell <= 0).all(): continue # rbot if "rbot" in row.index: rbot = row["rbot"] if np.isnan(rbot): raise ValueError(f"rbot is NaN in cell {cellid}") elif pkg == "RIV": raise ValueError("Column 'rbot' required for building RIV package!") else: rbot = np.nan # stage stage = row["stage"] if not isinstance(stage, str) and stage < rbot and np.isfinite(rbot): logger.warning( f"WARNING: stage below bottom elevation in {cellid}, " "stage reset to rbot!" ) stage = rbot # conductance cond = row["cond"] # check value if np.isnan(cond): raise ValueError( f"Conductance is NaN in cell {cellid}. Info: area={row.area:.2f} " f"len={row.len_estimate:.2f}, BL={row['rbot']}" ) if cond < 0: raise ValueError( f"Conductance is negative in cell {cellid}. Info: area={row.area:.2f} " f"len={row.len_estimate:.2f}, BL={row['rbot']}" ) if layer_method == "distribute_cond_over_lays": # if surface water penetrates multiple layers: lays, conds = distribute_cond_over_lays( cond, cellid, rbot, top, botm, idomain, kh, stage, ) elif layer_method == "lay_of_rbot": mask = (rbot > botm_cell) & (idomain_cell > 0) if not mask.any(): # rbot is below the bottom of the model, maybe the stage is above it? mask = (stage > botm_cell) & (idomain_cell > 0) if not mask.any(): raise ( ValueError("rbot and stage are below the bottom of the model") ) lays = [np.where(mask)[0][0]] conds = [cond] else: raise (ValueError(f"Method {layer_method} unknown")) auxlist = [] if "aux" in row: auxlist.append(row["aux"]) if "boundname" in row: auxlist.append(row["boundname"]) if ds.gridtype == "vertex": cellid = (cellid,) # write SPD for lay, cond in zip(lays, conds): cid = (lay,) + cellid if pkg == "RIV": spd.append([cid, stage, cond, rbot] + auxlist) elif pkg in ["DRN", "GHB"]: spd.append([cid, stage, cond] + auxlist) return spd
[docs] def add_info_to_gdf(*args, **kwargs): logger.warning( "nlmod.gwf.surface_water.add_info_to_gdf is deprecated. " "Use nlmod.util.gdf_intersection_join instead." ) return gdf_intersection_join(*args, **kwargs)
[docs] def get_gdf_stage(gdf, season="winter"): """Get the stage from a GeoDataFrame for a specific season. Parameters ---------- gdf : GeoDataFrame A GeoDataFrame of the polygons of the BGT with added information in the columns 'summer_stage', 'winter_stage', and 'ahn_min'. season : str, optional The season for which the stage needs to be determined. The default is "winter". Returns ------- stage : pandas.Series The stage for each of the records in the GeoDataFrame. """ stage = gdf[f"{season}_stage"].copy() if "ahn_min" in gdf: # when the minimum surface level is above the stage # or when no stage is available # use the minimum surface level stage = pd.concat((stage, gdf["ahn_min"]), axis=1).max(axis=1) return stage
[docs] def download_level_areas( gdf=None, extent=None, config=None, raise_exceptions=True, drop_duplicates=True, **kwargs, ): """Download level areas (peilgebieden) of bronhouders. Parameters ---------- gdf : geopandas.GeoDataFrame, optional A GeoDataFrame with surface water features, containing the column "bronhouder". extent : list, tuple or np.array, optional Model extent (xmin, xmax, ymin, ymax). When extent is None, all data of the water boards in gdf are downloaded downloaded. config : dict, optional A dictionary with information about the webservices of the water boards. When config is None, it is created with nlmod.read.waterboard.get_configuration(). The default is None. raise_exceptions : bool, optional Raises exceptions, mostly caused by a webservice that is offline. When raise_exceptions is False, the error is raised as a warning. The default is True. drop_duplicates : bool, optional Drop features with a duplicate index, keeping the first occurence. The default is True. Returns ------- la : dict A dictionary with the name of the waterboards as keys and GeoDataFrames with level areas as values. """ if config is None: config = waterboard.get_configuration() wbs = _get_waterboard_selection(gdf=gdf, extent=extent, config=config) la = {} data_kind = "level_areas" for wb in wbs: logger.info(f"Downloading {data_kind} for {wb}") try: lawb = waterboard.download_data(wb, data_kind, extent, **kwargs) if len(lawb) == 0: logger.info(f"No {data_kind} for {wb} found within model area") continue if drop_duplicates: mask = lawb.index.duplicated() if mask.any(): msg = "Dropping {} level area(s) of {} with duplicate indexes" logger.warning(msg.format(mask.sum(), wb)) lawb = lawb.loc[~mask] la[wb] = lawb mask = ~la[wb].is_valid if mask.any(): logger.warning( f"{mask.sum()} geometries of level areas of {wb} are invalid. Thet are made valid by adding a buffer of 0.0." ) # first copy to prevent ValueError: assignment destination is read-only la[wb] = la[wb].copy() la[wb].loc[mask, "geometry"] = la[wb][mask].buffer(0.0) except Exception as e: if str(e) == f"{data_kind} not available for {wb}": logger.warning(e) elif raise_exceptions: raise else: logger.warning(e) return la
[docs] def download_watercourses( gdf=None, extent=None, config=None, raise_exceptions=True, **kwargs ): """Download watercourses of bronhouders. Parameters ---------- gdf : geopandas.GeoDataFrame, optional A GeoDataFrame with surface water features, containing the column "bronhouder". Determine the required waterboards for this gdf, when not None. The default is None. extent : list, tuple or np.array, optional Model extent (xmin, xmax, ymin, ymax). When extent is None, all data of the water boards in gdf are downloaded downloaded. The default is None. config : dict, optional A dictionary with information about the webservices of the water boards. When config is None, it is created with nlmod.read.waterboard.get_configuration(). The default is None. raise_exceptions : bool, optional Raises exceptions, mostly caused by a webservice that is offline. When raise_exceptions is False, the error is raised as a warning. The default is True. Returns ------- wc : dict A dictionary with the name of the waterboards as keys and GeoDataFrames with watercourses as values. """ if config is None: config = waterboard.get_configuration() wbs = _get_waterboard_selection(gdf=gdf, extent=extent, config=config) wc = {} data_kind = "watercourses" for wb in wbs: logger.info(f"Downloading {data_kind} for {wb}") try: wcwb = waterboard.download_data(wb, data_kind, extent, **kwargs) if len(wcwb) == 0: logger.info(f"No {data_kind} for {wb} found within model area") continue wc[wb] = wcwb except Exception as e: if str(e) == f"{data_kind} not available for {wb}": logger.warning(e) elif raise_exceptions: raise else: logger.warning(e) return wc
[docs] def _get_waterboard_selection(gdf=None, extent=None, config=None): """Internal method to select waterboards to get data from.""" if config is None: config = waterboard.get_configuration() if gdf is None and extent is None: raise (ValueError("Please specify either gdf or extent")) if gdf is not None: bronhouders = gdf["bronhouder"].unique() wbs = [] for wb in config.keys(): if config[wb]["bgt_code"] in bronhouders: wbs.append(wb) elif extent is not None: wb_gdf = waterboard.download_polygons() wbs = wb_gdf.index[wb_gdf.intersects(extent_to_polygon(extent))] return wbs
[docs] def add_stages_from_waterboards( gdf, la=None, extent=None, columns=None, config=None, min_total_overlap=0.0, silent=False, ): """Add information from level areas (peilgebieden) to bgt-polygons. Parameters ---------- gdf : geopandas.GeoDataFrame A GeoDataFrame with surface water features, containing the column "bronhouder". la : dict, optional A dictionary with the name of the waterboards as keys and GeoDataFrames with level areas as values. It is generated by download_level_areas when None. The default is None. extent : list, tuple or np.array Model extent (xmin, xmax, ymin, ymax). When extent is None, all data of the water boards in gdf are downloaded downloaded. columns : TYPE, optional The columns that are added to gdf. Columns defaults to 'summer_stage' and 'winter_stage' when None. The default is None. config : dict, optional A dictionary with information about the webservices of the water boards. When config is None, it is created with nlmod.read.waterboard.get_configuration(). The default is None. min_total_overlap : float, optional Only add data from waterboards to gdf when the total overlap between a feature in gdf with all the features from the waterboard is larger than the fraction min_total_overlap. The default is 0.0. silent : bool, optional If true, do not show prgressbars. The default is False. Returns ------- gdf : geopandas.GeoDataFrame A GeoDataFrame with surface water features, with the added columns """ if config is None: config = waterboard.get_configuration() if la is None: la = download_level_areas(gdf, extent=extent, config=config) if columns is None: columns = ["summer_stage", "winter_stage"] gdf[columns] = np.nan for wb in la.keys(): if len(la[wb]) == 0: continue mask = gdf["bronhouder"] == config[wb]["bgt_code"] gdf.loc[mask, columns] = gdf_intersection_join( la[wb], gdf[mask], columns=columns, min_total_overlap=min_total_overlap, desc=f"Adding {columns} from {wb}", silent=silent, )[columns] return gdf
[docs] def add_bottom_height_from_waterboards( gdf, wc=None, extent=None, columns=None, config=None, min_total_overlap=0.0 ): """Add information from watercourses to bgt-polygons. Parameters ---------- gdf : geopandas.GeoDataFrame A GeoDataFrame with surface water features, containing the column "bronhouder". wc : dict, optional A dictionary with the name of the waterboards as keys and GeoDataFrames with watercourses as values. It is generated by download_watercourses when None. The default is None. extent : list, tuple or np.array Model extent (xmin, xmax, ymin, ymax). When extent is None, all data of the water boards in gdf are downloaded downloaded. columns : TYPE, optional The columns that are added to gdf. Columns defaults to 'bottom_height' when None. The default is None. config : dict, optional A dictionary with information about the webservices of the water boards. When config is None, it is created with nlmod.read.waterboard.get_configuration(). The default is None. min_total_overlap : float, optional Only add data from waterboards to gdf when the total overlap between a feature in gdf with all the features from the waterboard is larger than the fraction min_total_overlap. The default is 0.0. Returns ------- gdf : geopandas.GeoDataFrame A GeoDataFrame with surface water features, with the added columns """ if config is None: config = waterboard.get_configuration() if wc is None: wc = download_watercourses(gdf, extent=extent, config=config) if columns is None: columns = ["bottom_height"] gdf[columns] = np.nan for wb in wc.keys(): if len(wc[wb]) == 0: continue mask = gdf["bronhouder"] == config[wb]["bgt_code"] gdf.loc[mask, columns] = gdf_intersection_join( wc[wb], gdf[mask], columns=columns, min_total_overlap=min_total_overlap, desc=f"Adding {columns} from {wb}", geom_type="LineString", )[columns] return gdf
[docs] def get_gdf(ds=None, extent=None, fname_ahn=None, ahn=None, buffer=0.0): """Generate a GeoDataFrame based on BGT-data and data from waterboards. Parameters ---------- ds : TYPE, optional The Model Dataset, used to determine the extent (when None) and to grid the surface level features. The default is None. extent : list, tuple or np.array Model extent (xmin, xmax, ymin, ymax). When extent is None, extent is extracted from ds fname_ahn : str, optional When not None, fname_ahn is the path to a tiff-file with ahn-data, to calculate the minimum height of the surface level near the surface water features. The default is None. ahn : xr.DataArray, optional When not None, ahn is a DataArray containing the height of the surface level and is used to calculate the minimum height of the surface level near the surface water features. The default is None. buffer : float, optional The buffer that is applied around surface water features to calculate the minimum surface level near these features. The default is 0.0. Returns ------- gdf : geopandas.GeoDataFrame A GeoDataFrame with surface water features, with added columns from waterboards and gridded to the model grid (when ds is aupplied) """ if extent is None: if ds is None: raise (ValueError("Please supply either ds or extent to get_gdf")) extent = get_extent_polygon(ds) gdf = bgt.download_bgt(extent) if fname_ahn is not None: from rasterstats import zonal_stats stats = zonal_stats(gdf.geometry.buffer(buffer), fname_ahn, stats="min") gdf["ahn_min"] = [x["min"] for x in stats] if ahn is not None: if fname_ahn is not None: logger.warning("Data from {fname_ahn} is overwritten by data from ahn") gdf = add_min_ahn_to_gdf(gdf, ahn, buffer=buffer) if isinstance(extent, Polygon): bs = extent.bounds extent = [bs[0], bs[2], bs[1], bs[3]] gdf = add_stages_from_waterboards(gdf, extent=extent) gdf = add_bottom_height_from_waterboards(gdf, extent=extent) if ds is not None: return gdf_to_grid(gdf, ds).set_index("cellid") return gdf
[docs] def add_min_ahn_to_gdf( gdf, ahn, buffer=0.0, column="ahn_min", statistic="min", **kwargs ): """Add a column with the minimum surface level height near surface water features. Parameters ---------- gdf : geopandas.GeoDataFrame A GeoDataFrame with surface water features ahn : xr.DataArray A DataArray containing the height of the surface level. buffer : float, optional The buffer that is applied around surface water features to calculate the minimum surface level near these features. The default is 0.0. column : string, optional The name of the new column in gdf containing the minimum surface level height. The default is 'ahn_min'. statistic : string, optional The statistic to calculate at each surface water feature. The default is 'min'. progressbar : bool, optional Show a progressbar when True. The default is False. Returns ------- gdf : geopandas.GeoDataFrame A GeoDataFrame with surface water features, with an added column containing the minimum surface level height near the features. """ gdf = zonal_statistics( gdf, ahn, columns=column, buffer=buffer, statistics=statistic, **kwargs ) return gdf
[docs] def gdf_to_seasonal_pkg( gdf, gwf, ds, pkg="DRN", default_water_depth=0.5, boundname_column="identificatie", c0=1.0, summer_months=(4, 5, 6, 7, 8, 9), layer_method="lay_of_rbot", silent=False, start_summer=None, start_winter=None, season_filename="season.ts", **kwargs, ): """Add a surface water package to a groundwater-model, based on input from a GeoDataFrame. This method adds two boundary conditions for each record in the GeoDataFrame: one for the winter_stage and one for the summer_stage. The conductance of each record is a time-series called 'winter' or 'summer' with values of either 0 or 1. These conductance values are multiplied by an auxiliary variable that contains the actual conductance. Parameters ---------- gdf : GeoDataFrame A GeoDataFrame with Polygon-data. Cellid must be the index and must have columns 'winter_stage' and 'summer_stage'. gwf : flopy ModflowGwf groundwaterflow object. ds : xr.Dataset Dataset with model data. pkg: str, optional The package to generate. Possible options are 'DRN', 'RIV' and 'GHB'. The default is 'DRN'. default_water_depth : float, optional The default water depth, only used when there is no 'rbot' column in gdf or when this column contains nans. The default is 0.5. boundname_column : str, optional The name of the column in gdf to use for the boundnames. The default is "identificatie", which is a unique identifier in the BGT. c0 : float, optional The resistance of the surface water, in days. Only used when there is no 'cond' column in gdf. The default is 1.0. summer_months : list or tuple, optional The months in which 'summer_stage' is active (one-based). The parameter summer_months is used to calculate start_summer or start_winter, when they are None. The default is (4, 5, 6, 7, 8, 9), which means summer is from april through september. layer_method : str, optional The method used to distribute the conductance over the layers. Possible values are 'lay_of_rbot' and 'distribute_cond_over_lays'. The default is "lay_of_rbot". silent : bool, optional Do not show a progressbar when silent is True. The default is False. start_summer : str, optional A string with the month and day of the start of summer (one-based), seperated by '-'. For example '4-1' for april 1st. When start_summer is None it is calculated from the parameter 'summer_months'. The default is None. start_winter : str, optional A string with the month and day of the start of winter (one-based), seperated by '-'. For example '10-1' for october 1st. When start_winter is None it is calculated from the parameter 'summer_months'. The default is None. season_filename : str, optional A string with the name of the file containing the timeseries of the seasons. The default is 'season.ts'. **kwargs : dict Kwargs are passed onto ModflowGwfdrn, ModflowGwfriv or ModflowGwfghb. Returns ------- package : ModflowGwfdrn, ModflowGwfriv or ModflowGwfghb The generated flopy-package """ if gdf.index.name != "cellid": # if "cellid" not in gdf: # gdf = gdf_to_grid(gdf, gwf) gdf = gdf.set_index("cellid") else: # make sure changes to the DataFrame are temporarily gdf = gdf.copy() stages = ( get_gdf_stage(gdf, "winter"), get_gdf_stage(gdf, "summer"), ) # make sure we have a bottom height if "rbot" not in gdf: gdf["rbot"] = np.nan mask = gdf["rbot"].isna() if mask.any(): logger.info( f"Filling {mask.sum()} NaN's in rbot using a water depth of {default_water_depth} meter." ) min_stage = pd.concat(stages, axis=1).min(axis=1) gdf.loc[mask, "rbot"] = min_stage[mask] - default_water_depth if "cond" not in gdf: logger.info( f"Calcluating {pkg}-conductance based on as resistance of {c0} days." ) gdf["cond"] = gdf.geometry.area / c0 if boundname_column is not None: gdf["boundname"] = gdf[boundname_column] spd = [] seasons = ["winter", "summer"] for iseason, season in enumerate(seasons): # use a winter and summer level gdf["stage"] = stages[iseason] mask = gdf["stage"] < gdf["rbot"] gdf.loc[mask, "stage"] = gdf.loc[mask, "rbot"] gdf["aux"] = season spd.extend( build_spd( gdf, pkg, ds, layer_method=layer_method, desc=f"Building stress period data for {season} {pkg}", silent=silent, ) ) # from the release notes (6.3.0): # When this AUXMULTNAME option is used, the multiplier value in the # AUXMULTNAME column should not be represented with a time series unless # the value to scale is also represented with a time series # So we switch the conductance (column 2) and the multiplier (column 3/4) spd = np.array(spd, dtype=object) if len(spd) > 0: if pkg == "RIV": spd[:, [2, 4]] = spd[:, [4, 2]] else: spd[:, [2, 3]] = spd[:, [3, 2]] spd = spd.tolist() if boundname_column is None: observations = None else: observations = [] for boundname in np.unique(gdf[boundname_column]): observations.append((boundname, pkg, boundname)) observations = {f"{pkg}_flows.csv": observations} if pkg == "DRN": cl = flopy.mf6.ModflowGwfdrn elif pkg == "RIV": cl = flopy.mf6.ModflowGwfriv elif pkg == "GHB": cl = flopy.mf6.ModflowGwfghb else: raise (ValueError(f"Unknown package: {pkg}")) package = cl( gwf, stress_period_data={0: spd}, boundnames=boundname_column is not None, auxmultname="cond_fact", auxiliary=["cond_fact"], observations=observations, **kwargs, ) # add timeseries for the seasons 'winter' and 'summer' add_season_timeseries( ds, package, summer_months=summer_months, winter_name="winter", summer_name="summer", start_summer=start_summer, start_winter=start_winter, filename=season_filename, ) return package
[docs] def add_season_timeseries( ds, package, summer_months=(4, 5, 6, 7, 8, 9), filename="season.ts", summer_name="summer", winter_name="winter", start_summer=None, start_winter=None, ): """Add time series indicating which season is active (e.g. summer/winter). Parameters ---------- ds : xr.Dataset Xarray dataset used for time discretization. package : flopy.mf6 package Modflow 6 package to add time series to. summer_months : tuple, optional The summer months (one-based). The parameter summer_months is used to calculate start_summer or start_winter, when they are None. The default is (4, 5, 6, 7, 8, 9), so from april to september. filename : str, optional The name of time series file. The default is "season.ts". winter_name : str, optional The name of the time-series with ones in winter. The default is "winter". summer_name : str, optional The name of the time-series with ones in summer. The default is "summer". start_summer : str, optional A string with the month and day of the start of summer (one-based), seperated by '-'. For example '4-1' for april 1st. When start_summer is None it is calculated from the parameter 'summer_months'. The default is None. start_winter : str, optional A string with the month and day of the start of winter (one-based), seperated by '-'. For example '10-1' for october 1st. When start_winter is None it is calculated from the parameter 'summer_months'. The default is None. """ if ds.time.dtype.kind != "M": raise TypeError("add_season_timeseries requires a datetime64[ns] time index") start_summer, start_winter = _get_start_summer_and_winter( start_summer, start_winter, summer_months ) tmin = pd.to_datetime(ds.time.start) if _is_in_summer(tmin, start_summer, start_winter): ts_data = [(0.0, 0.0, 1.0)] else: ts_data = [(0.0, 1.0, 0.0)] tmax = pd.to_datetime(ds["time"].data[-1]) years = range(tmin.year, tmax.year + 1) for year in years: # add a record for the start of summer time = pd.Timestamp(f"{year}-{start_summer}") time = (time - tmin) / pd.to_timedelta(1, "D") if time > 0: ts_data.append((time, 1.0, 0.0)) # add a record for the start of winter time = pd.Timestamp(f"{year}-{start_winter}") time = (time - tmin) / pd.to_timedelta(1, "D") if time > 0: ts_data.append((time, 0.0, 1.0)) return package.ts.initialize( filename=filename, timeseries=ts_data, time_series_namerecord=[summer_name, winter_name], interpolation_methodrecord=["stepwise", "stepwise"], )
[docs] def _get_start_summer_and_winter(start_summer, start_winter, summer_months): if start_summer is None: start_summer = f"{summer_months[0]}-1" if start_winter is None: start_winter = f"{summer_months[-1] + 1}-1" return start_summer, start_winter
[docs] def _is_in_summer(time, start_summer, start_winter): after_start_summer = time >= pd.Timestamp(f"{time.year}-{start_summer}") before_start_winter = time < pd.Timestamp(f"{time.year}-{start_winter}") return after_start_summer and before_start_winter
[docs] def get_seaonal_timeseries( ds, summer_value, winter_value, summer_months=(4, 5, 6, 7, 8, 9), start_summer=None, start_winter=None, ): """ Get a time-series of winter-values and summer-values. The index of the returned series is set at the start of the period that the value describes. This is the way that flopy uses this value in a timeseries, when interpolation_methodrecord is set to 'stepwise'. Parameters ---------- ds : xr.Dataset Xarray dataset used for time discretization. summer_value : float The value to be used in summer. winter_value : float The value to be used in winter. summer_months : tuple, optional summer months (one-based). The parameter summer_months is used to calculate start_summer or start_winter, when they are None. The default is (4, 5, 6, 7, 8, 9), so from april to september. start_summer : str, optional A string with the month and day of the start of summer (one-based), seperated by '-'. For example '4-1' for april 1st. When start_summer is None it is calculated from the parameter 'summer_months'. The default is None. start_winter : str, optional A string with the month and day of the start of winter (one-based), seperated by '-'. For example '10-1' for october 1st. When start_winter is None it is calculated from the parameter 'summer_months'. The default is None. Returns ------- s : pd.Series A series with stages as the values and time as the index. """ start_summer, start_winter = _get_start_summer_and_winter( start_summer, start_winter, summer_months ) tmin = pd.to_datetime(ds.time.start) index = [tmin] if _is_in_summer(tmin, start_summer, start_winter): values = [summer_value] else: values = [winter_value] tmax = pd.to_datetime(ds["time"].data[-1]) years = range(tmin.year, tmax.year + 1) for year in years: # add a record for the start of summer time = pd.Timestamp(f"{year}-{start_summer}") if time > tmin: index.append(time) values.append(summer_value) # add a record for the start of winter time = pd.Timestamp(f"{year}-{start_winter}") if time > tmin: index.append(time) values.append(winter_value) s = pd.Series(values, index=index) return s
[docs] def rivdata_from_xylist(gwf, xylist, layer, stage, cond, rbot, aux=None): gi = flopy.utils.GridIntersect(gwf.modelgrid) df = gi.intersect(xylist, shapetype="linestring", geo_dataframe=True) riv_data = [] if "row" in df.columns: # structured grid for row, col in zip(df["row"], df["col"]): idata = [(layer, row, col), stage, cond, rbot] if aux is not None: idata.append(aux) riv_data.append(idata) else: # unstructured grid for cid in df["cellid"]: idata = [(layer, cid), stage, cond, rbot] if aux is not None: idata.append(aux) riv_data.append(idata) return riv_data