Source code for nlmod.gwf.lake

import logging

import flopy
import numpy as np
import pandas as pd

from ..dims.layers import get_first_active_layer

logger = logging.getLogger(__name__)

LAKE_KWDS = [
    "STATUS",
    "STAGE",
    "RAINFALL",
    "EVAPORATION",
    "RUNOFF",
    "INFLOW",
    "WITHDRAWAL",
    "AUXILIARY",
    "RATE",
    "INVERT",
    "WIDTH",
    "SLOPE",
    "ROUGH",
]

# order of dictionary matters!
OUTLET_DEFAULT = {
    "couttype": "WEIR",
    "outlet_invert": "use_elevation",
    "outlet_width": 1.0,
    "outlet_rough": 0.0,
    "outlet_slope": 0.0,
}


[docs] def lake_from_gdf( gwf, gdf, ds, rainfall=None, evaporation=None, claktype="VERTICAL", boundname_column="identificatie", obs_type="STAGE", surfdep=0.05, pname="lak", gwt=None, obs_type_gwt="CONCENTRATION", rainfall_concentration=0.0, evaporation_concentration=0.0, **kwargs, ): """Add a lake from a geodataframe. Parameters ---------- gwf : flopy.mf6.ModflowGwf groundwater flow model. gdf : gpd.GeoDataframe geodataframe with the cellids as the index and the columns: lakeno : with the number of the lake strt : with the starting head of the lake clake : with the bed resistance of the lake optional columns are 'STATUS', 'STAGE', 'RAINFALL', 'EVAPORATION', 'RUNOFF', 'INFLOW', 'WITHDRAWAL', 'AUXILIARY', 'RATE', 'INVERT', 'WIDTH', 'SLOPE', 'ROUGH'. These columns should contain the name of a dataarray in ds with the dimension time. if the lake has any outlets they should be specified in the column lakeout : the lake number of the outlet, if this is -1 the water is removed from the model. optinal columns are 'couttype', 'outlet_invert', 'outlet_width', 'outlet_rough' and 'outlet_slope'. These columns should contain a unique value for each outlet. ds : xr.Dataset dataset containing relevant model grid and time information rainfall : int, float, str, np.array or pd.DataFrame, optional The rainfall to be applied on the lakes. If rainfall is a DataFrame, there should be one column for each lake, with lakeno as the column index, or the boundnames if boundname_column is specified. There should be one row for each stress period. To generate rainfall and evaporation from a model dataset, the method `clip_meteorological_data_from_ds` can be used. If rainfall is not a DataFrame, all lakes have the same rainfall-values. When rainfall is a pandas Series or a numpy array, a value for rainfall for each stress period is taken from this series/array. When rainfall is a float, it is directly used as the rainfall-value. The default is None. evaporation : int, float, str, np.array or pd.DataFrame, optional The evaporation to be applied on the lakes. If evaporation is a DataFrame, there should be one column for each lake, with lakeno as the column index, or the boundnames if boundname_column is specified. There should be one row for each stress period. To generate rainfall and evaporation from a model dataset, the method `clip_meteorological_data_from_ds` can be used. If evaporation is not a DataFrame, all lakes have the same evaporation-values. When evaporation is a pandas Series or a numpy array, a value for evaporation for each stress period is taken from this series/array. When evaporation is a float, it is directly used as the evaporation-value. The default is None. claktype : str, optional defines the lake-GWF connection type. For now only VERTICAL is supported. The default is 'VERTICAL'. boundname_column : str, optional The name of the column in gdf to use for the boundnames. The default is "identificatie", which is a unique identifier in the BGT. surfdep : float, optional Defines the surface depression depth for VERTICAL lake-GWF connections. The default is 0.05. pname : str, optional name of the lake package. The default is 'lak'. obs_type : str or list of str, optional observation or list of observations to add to the lake package. The default is 'STAGE'. obs_type_gwt : str or list of str, optional observation or list of observations to add to the lake transport package. The default is 'CONCENTRATION'. **kwargs : passed to flopy.mf6.ModflowGwflak. Raises ------ NotImplementedError Returns ------- lak : flopy lake package """ if claktype != "VERTICAL": raise NotImplementedError("function only tested for claktype=VERTICAL") if ds.gridtype != "vertex": raise NotImplementedError("only works with a vertex grid") assert ds.time.time_units.lower() == "days", "expected time unit days" time_conversion = 86400.0 # length unit is always meters in nlmod # TODO: Let's add a check for a length unit of meters if we ever add a length unit # to ds length_conversion = 1.0 packagedata = [] connectiondata = [] perioddata = {} for iper in range(ds.sizes["time"]): perioddata[iper] = [] if gwt is not None: packagedata_gwt = [] perioddata_gwt = {0: []} lake_settings = [setting for setting in LAKE_KWDS if setting in gdf.columns] outlets = [] outlet_no = 0 fal = get_first_active_layer(ds).data if "lakeno" not in gdf.columns: gdf = add_lakeno_to_gdf(gdf, boundname_column) for lakeno, lake_gdf in gdf.groupby("lakeno"): nlakeconn = lake_gdf.shape[0] if "strt" in lake_gdf: strt = _get_and_check_single_value(lake_gdf, "strt") else: # take the mean of the starting concentrations of the connected cells head = ds["starting_head"].data[fal[lake_gdf.index], lake_gdf.index] area = ds["area"].data[lake_gdf.index] strt = (head * area).sum() / area.sum() if boundname_column is not None: boundname = _get_and_check_single_value(lake_gdf, f"{boundname_column}") packagedata.append([lakeno, strt, nlakeconn, boundname]) else: packagedata.append([lakeno, strt, nlakeconn]) iconn = 0 for icell2d, row in lake_gdf.iterrows(): cellid = (fal[icell2d], icell2d) # assuming lake in the top layer # If BEDLEAK is specified to be NONE, the lake-GWF connection # conductance is solely a function of aquifer properties in the # connected GWF cell and lakebed sediments are assumed to be absent. clake = row["clake"] bedleak = 1 / clake belev = 0.0 # Any value can be specified if CLAKTYPE is VERTICAL telev = 0.0 # Any value can be specified if CLAKTYPE is VERTICAL connlen = 0.0 # Any value can be specified if CLAKTYPE is VERTICAL connwidth = 0.0 # Any value can be specified if CLAKTYPE is VERTICAL connectiondata.append( [ lakeno, iconn, cellid, claktype, bedleak, belev, telev, connlen, connwidth, ] ) iconn += 1 # add outlets to lake if ( "lakeout" in lake_gdf.columns and not lake_gdf["lakeout"].isna().all() and not lake_gdf["lakeout"].eq("").all() ): lakeout = _get_and_check_single_value(lake_gdf, "lakeout") if isinstance(lakeout, str): # when lakeout is a string, it represents the boundname # we need to find the lakeno that belongs to this boundname boundnameout = lakeout if boundname_column not in gdf.columns: raise KeyError( f"Make sure column {boundname_column} is present in gdf" ) mask = gdf[boundname_column] == boundnameout lakeout = gdf.loc[mask, "lakeno"].iloc[0] if not (gdf.loc[mask, "lakeno"] == lakeout).all(): raise ValueError( f"expected single value of lakeno for lakeout {boundnameout}, got {gdf.loc[mask, 'lakeno']}" ) assert lakeno != lakeout, "lakein and lakeout cannot be the same" outsettings = [] for outset, default_value in OUTLET_DEFAULT.items(): if outset not in lake_gdf.columns: logger.debug( f"no value specified for {outset} and lake no {lakeno}, using default value {default_value}" ) setval = default_value else: setval = lake_gdf[outset].iloc[0] if pd.notna(setval): if not (lake_gdf[outset] == setval).all(): raise ValueError( f"expected single data variable for {outset} and lake number {lakeno}, got {lake_gdf[outset]}" ) else: # setval is nan or None setval = default_value logger.debug( f"no value specified for {outset} and lake no {lakeno}, using default value {default_value}" ) if outset == "outlet_invert" and isinstance(setval, str): # setval can be the name of a timeseries # only when it is equal to "use_elevation" we set the invert to strt if setval == "use_elevation": setval = strt outsettings.append(setval) outlets.append([outlet_no, lakeno, lakeout] + outsettings) outlet_no += 1 if boundname_column is None: key = lakeno else: key = boundname for iper in range(ds.sizes["time"]): if rainfall is not None: value = _parse_laksetting_value(rainfall, ds, key, iper) perioddata[iper].append([lakeno, "RAINFALL", value]) if evaporation is not None: value = _parse_laksetting_value(evaporation, ds, key, iper) perioddata[iper].append([lakeno, "EVAPORATION", value]) # add other time variant settings to lake for lake_setting in lake_settings: datavar = _get_and_check_single_value(lake_gdf, lake_setting) if pd.isna(datavar) or datavar == "": # None or nan or "" logger.debug(f"no {lake_setting} given for lake no {lakeno}") continue perioddata[iper].append( [lakeno, lake_setting, ds[datavar].values[iper]] ) if gwt is not None: if "strt_concentration" in lake_gdf.columns: strt = _get_and_check_single_value(lake_gdf, "strt_concentration") else: # take the mean of the starting concentrations of the connected cells conc = ds["chloride"].data[fal[lake_gdf.index], lake_gdf.index] area = ds["area"].data[lake_gdf.index] strt = (conc * area).sum() / area.sum() if boundname_column is not None: packagedata_gwt.append([lakeno, strt, boundname]) else: packagedata_gwt.append([lakeno, strt]) if rainfall is not None: perioddata_gwt[0].append([lakeno, "rainfall", rainfall_concentration]) if evaporation is not None: perioddata_gwt[0].append( [lakeno, "evaporation", evaporation_concentration] ) if boundname_column is not None: observations = {} if isinstance(obs_type, str): obs_type = [obs_type] for otype in obs_type: obs_list = [(x, otype, x) for x in np.unique(gdf[boundname_column])] observations[f"{pname}_{otype}.csv"] = obs_list else: observations = None boundnames = boundname_column is not None lak = flopy.mf6.ModflowGwflak( gwf, surfdep=surfdep, time_conversion=time_conversion, length_conversion=length_conversion, nlakes=len(packagedata), packagedata=packagedata, connectiondata=connectiondata, perioddata=perioddata, boundnames=boundnames, observations=observations, budget_filerecord=f"{pname}.bgt", stage_filerecord=f"{pname}.hds", noutlets=len(outlets) if outlets else None, outlets=outlets if outlets else None, pname=pname, **kwargs, ) if gwt is not None: if boundname_column is not None: observations_gwt = {} if isinstance(obs_type_gwt, str): obs_type_gwt = [obs_type_gwt] for otype in obs_type_gwt: obs_list_gwt = [(x, otype, x) for x in np.unique(gdf[boundname_column])] observations_gwt[f"{pname}_{otype}.gwt.csv"] = obs_list_gwt else: observations_gwt = None lkt = flopy.mf6.ModflowGwtlkt( gwt, pname=lak.package_name, packagedata=packagedata_gwt, lakeperioddata=perioddata_gwt, boundnames=boundnames, observations=observations_gwt, budget_filerecord=f"{pname}_gwt.bgt", concentration_filerecord=f"{pname}_gwt.unc", ) return lak, lkt return lak
[docs] def _get_and_check_single_value(lake_gdf, column): value = lake_gdf[column].iloc[0] if lake_gdf[column].isna().all() or lake_gdf[column].eq("").all(): return value if not (lake_gdf[column] == value).all(): raise (AssertionError(f"A single lake should have a single {column}")) return value
[docs] def _parse_laksetting_value(value, ds, key, iper): if isinstance(value, (float, int, str)): return value elif isinstance(value, pd.Series): assert len(value.index) == len(ds.time) and (value.index == ds.time).all() return value.iloc[iper] elif isinstance(value, pd.DataFrame): assert len(value.index) == len(ds.time) and (value.index == ds.time).all() return value[key].iloc[iper] else: assert len(value) == len(ds.time) return value[iper]
[docs] def add_lakeno_to_gdf(gdf, boundname_column): if boundname_column not in gdf.columns: raise (KeyError(f"Cannot find column {boundname_column} in gdf")) names = gdf[boundname_column].unique() gdf["lakeno"] = None for lakeno, name in enumerate(names): mask = gdf[boundname_column] == name gdf.loc[mask, "lakeno"] = lakeno return gdf
[docs] def _copy_da_from_ds(gdf, ds, variable, boundname_column=None, set_to_0_in_ds=False): if boundname_column is None: columns = gdf["lakeno"].unique() else: columns = gdf[boundname_column].unique() df = pd.DataFrame(index=ds.time, columns=columns) for column in columns: if boundname_column is None: mask = gdf["lakeno"] == column else: mask = gdf[boundname_column] == column cellids = gdf.index[mask] area = ds["area"].loc[cellids] if "time" in ds[variable].dims: da_cells = ds[variable].loc[:, cellids].copy() if set_to_0_in_ds: ds[variable][:, cellids] = 0.0 # calculate thea area-weighted mean df[column] = (da_cells * area).sum("icell2d") / area.sum() else: da_cells = ds[variable].loc[cellids].copy() if set_to_0_in_ds: ds[variable][cellids] = 0.0 # calculate thea area-weighted mean df[column] = float((da_cells * area).sum("icell2d") / area.sum()) return df
[docs] def copy_meteorological_data_from_ds( gdf, ds, boundname_column=None, set_to_0_in_ds=False ): """ Copy meteorlogical data from the model dataset, and return rainfall and evaporation. This method retrieves the values of rainfall and evaporation from a model Dataset. It uses the 'recharge'variable, and optionally the 'evaporation'-variable, and returns a rainfall- and evaporation-DataFrame. These dataframes contain input for each of the lakes. The columns of this DataFrame are either the boundnames (when boundname_column is specified) or the lake-number (lakeno). Parameters ---------- gdf : gpd.GeoDataframe geodataframe with the cellids as the index ds : xr.Dataset dataset containing relevant model grid and time information boundname_column : str, optional The name of the column in gdf to use for the boundnames. When boundname_column is None, the lake-number (lakeno) is used to determine which rows in gdf belong to each lake, and the columns of rainfall and evaporation are set by the lake-number. If boundname_column is not None, the boundnames are used instead of the lake-number. The default is None. set_to_0_in_ds : bool, optional If True, sets the meteorological data to 0 in ds, which is not recommended (see issue https://github.com/gwmod/nlmod/issues/497). The default is False. Returns ------- rainfall : pd.DataFrame The rainfall of each lake (columns) in time (index). evaporation : pd.DataFrame The evaporation of each lake (columns) in time (index). """ if "evaporation" in ds: rainfall = _copy_da_from_ds( gdf, ds, "recharge", boundname_column=boundname_column, set_to_0_in_ds=set_to_0_in_ds, ) evaporation = _copy_da_from_ds( gdf, ds, "evaporation", boundname_column=boundname_column, set_to_0_in_ds=set_to_0_in_ds, ) else: recharge = _copy_da_from_ds( gdf, ds, "recharge", boundname_column=boundname_column, set_to_0_in_ds=set_to_0_in_ds, ) rainfall = recharge.where(recharge > 0.0, 0.0) evaporation = -recharge.where(recharge < 0.0, 0.0) return rainfall, evaporation
[docs] def clip_meteorological_data_from_ds( gdf, ds, boundname_column=None, ): """ Clip meteorlogical data from the model dataset, and return rainfall and evaporation. This method retrieves the values of rainfall and evaporation from a model Dataset. It uses the 'recharge'variable, and optionally the 'evaporation'-variable, and returns a rainfall- and evaporation-DataFrame. These dataframes contain input for each of the lakes. The columns of this DataFrame are either the boundnames (when boundname_column is specified) or the lake-number (lakeno). clip_meteorological_data_from_ds sets the meteorological data in the model detaset to 0. It turns out MODFLOW 6 already does this, and so this can create problemsm, see https://github.com/gwmod/nlmod/issues/497. Therefore, this method is deprected, and replaced by copy_meteorological_data_from_ds. Parameters ---------- gdf : gpd.GeoDataframe geodataframe with the cellids as the index ds : xr.Dataset dataset containing relevant model grid and time information boundname_column : str, optional The name of the column in gdf to use for the boundnames. When boundname_column is None, the lake-number (lakeno) is used to determine which rows in gdf belong to each lake, and the columns of rainfall and evaporation are set by the lake-number. If boundname_column is not None, the boundnames are used instead of the lake-number. The default is None. Returns ------- rainfall : pd.DataFrame The rainfall of each lake (columns) in time (index). evaporation : pd.DataFrame The evaporation of each lake (columns) in time (index). """ logger.warning( "`clip_meteorological_data_from_ds` is deprecated and replaced by " "`copy_meteorological_data_from_ds`. See " "https://github.com/gwmod/nlmod/issues/497." ) return copy_meteorological_data_from_ds( gdf, ds, boundname_column=boundname_column, set_to_0_in_ds=True )