Source code for nlmod.gwf.output

import logging
import warnings

import flopy
import numpy as np
import pandas as pd
import xarray as xr

from ..dims.grid import get_icell2d_from_xy, get_row_col_from_xy
from ..mfoutput.mfoutput import (
    _get_budget_da,
    _get_flopy_data_object,
    _get_grb_file,
    _get_heads_da,
    _get_time_index,
)

logger = logging.getLogger(__name__)


[docs] def get_headfile(ds=None, gwf=None, fname=None, grb_file=None, **kwargs): """Get flopy HeadFile object. Provide one of ds, gwf or fname. Parameters ---------- ds : xarray.Dataset, optional model dataset, by default None gwf : flopy.mf6.ModflowGwf, optional groundwater flow model, by default None fname : str, optional path to heads file, by default None grb_file : str path to file containing binary grid information Returns ------- flopy.utils.HeadFile HeadFile object handle """ return _get_flopy_data_object("head", ds, gwf, fname, grb_file, **kwargs)
[docs] def get_heads_da( ds=None, gwf=None, fname=None, grb_file=None, delayed=False, chunked=False, precision="auto", **kwargs, ): """Read binary heads file. Parameters ---------- ds : xarray.Dataset Xarray dataset with model data. gwf : flopy ModflowGwf Flopy groundwaterflow object. fname : path, optional path to a binary heads file grb_file : str, optional path to file containing binary grid information, only needed if reading output from file using fname delayed : bool, optional if delayed is True, do not load output data into memory, default is False. chunked : bool, optional chunk data array containing output, default is False. precision : str, optional precision of floating point data in the head-file. Accepted values are 'auto', 'single' or 'double'. When precision is 'auto', it is determined from the head-file. Default is 'auto'. Returns ------- da : xarray.DataArray heads data array. """ hobj = get_headfile( ds=ds, gwf=gwf, fname=fname, grb_file=grb_file, precision=precision ) # gwf.output.head() defaults to a structured grid if gwf is not None and ds is None and fname is None: kwargs["modelgrid"] = gwf.modelgrid da = _get_heads_da(hobj, name="head", **kwargs) da.attrs["units"] = "m NAP" # set time index if ds/gwf are provided if ds is not None or gwf is not None: da["time"] = _get_time_index(hobj, ds=ds, gwf_or_gwt=gwf) if ds is not None: da["layer"] = ds.layer if chunked: # chunk data array da = da.chunk("auto") if not delayed: # load into memory da = da.compute() return da
[docs] def get_cellbudgetfile(ds=None, gwf=None, fname=None, grb_file=None, **kwargs): """Get flopy CellBudgetFile object. Provide one of ds, gwf or fname. Parameters ---------- ds : xarray.Dataset, optional model dataset, by default None gwf : flopy.mf6.ModflowGwf, optional groundwater flow model, by default None fname_cbc : str, optional path to cell budget file, by default None grb_file : str, optional path to file containing binary grid information, only needed if fname_cbc is passed as only argument. Returns ------- flopy.utils.CellBudgetFile CellBudgetFile object handle """ return _get_flopy_data_object("budget", ds, gwf, fname, grb_file, **kwargs)
[docs] def get_budget_da( text, ds=None, gwf=None, fname=None, grb_file=None, column="q", delayed=False, chunked=False, precision="auto", **kwargs, ): """Read binary budget file. Parameters ---------- text : str record to get from budget file ds : xarray.Dataset, optional xarray dataset with model data. One of ds or gwf must be provided. gwf : flopy ModflowGwf, optional Flopy groundwaterflow object. One of ds or gwf must be provided. fname : path, optional specify the budget file to load, if not provided budget file will be obtained from ds or gwf. grb_file : str path to file containing binary grid information, only needed if reading output from file using fname column : str name of column in rec-array to read, default is 'q' which contains the fluxes for most budget datasets. delayed : bool, optional if delayed is True, do not load output data into memory, default is False. chunked : bool, optional chunk data array containing output, default is False. precision : str, optional precision of floating point data in the budget-file. Accepted values are 'auto', 'single' or 'double'. When precision is 'auto', it is determined from the budget-file. Default is 'auto'. Returns ------- da : xarray.DataArray budget data array. """ cbcobj = get_cellbudgetfile( ds=ds, gwf=gwf, fname=fname, grb_file=grb_file, precision=precision ) da = _get_budget_da(cbcobj, text, column=column, **kwargs) da.attrs["units"] = "m3/d" # set time index if ds/gwt are provided if ds is not None or gwf is not None: da["time"] = _get_time_index(cbcobj, ds=ds, gwf_or_gwt=gwf) if ds is not None: da["layer"] = ds.layer if chunked: # chunk data array da = da.chunk("auto") if not delayed: # load into memory da = da.compute() return da
[docs] def get_gwl_from_wet_cells(head, layer="layer", botm=None): """Get the groundwater level from a multi-dimensional head array where dry cells are NaN. This methods finds the most upper non-nan-value of each cell or timestep. Parameters ---------- head : xarray.DataArray or numpy array A multi-dimensional array of head values. NaN-values represent inactive or dry cells. layer : string or int, optional The name of the layer dimension of head (if head is a DataArray) or the integer of the layer dimension of head (if head is a numpy array). The default is 'layer'. botm : xarray.DataArray, optional A DataArray with the botm of each model-cell. It can be used to set heads below the botm of the cells to NaN. botm is only used when head is a DataArray. Returns ------- gwl : numpy-array An array of the groundwater-level, without the layer-dimension. """ if isinstance(head, xr.DataArray): head_da = head if botm is not None: head_da = head_da.where(head_da > botm) head = head_da.data if isinstance(layer, str): layer = head_da.dims.index(layer) else: head_da = None # take the first non-nan value along the layer dimension (1) top_layer = np.expand_dims(np.isnan(head).argmin(layer), layer) gwl = np.take_along_axis(head, top_layer, axis=layer) gwl = np.take(gwl, 0, axis=layer) if head_da is not None: dims = list(head_da.dims) dims.pop(layer) coords = dict(head_da.coords) # store the layer in which the groundwater level is of each cell and time top_layer = np.take(top_layer, 0, axis=layer) coords["layer"] = (dims, head_da.layer.data[top_layer]) gwl = xr.DataArray(gwl, dims=dims, coords=coords) return gwl
[docs] def get_flow_residuals(ds, gwf=None, fname=None, grb_file=None, kstpkper=None): """Get the flow residuals of a MODFLOW 6 simulation. Parameters ---------- ds : xarray.Dataset Xarray dataset with model data. gwf : flopy ModflowGwf, optional Flopy groundwaterflow object. One of ds or gwf must be provided. fname : path, optional specify the budget file to load, if not provided budget file will be obtained from ds or gwf. grb_file : str The location of the grb-file. grb_file is determied from ds when None. The default is None. kstpkper : tuple of 2 ints, optional The index of the timestep and the stress period to include in the result. Include all data in the budget-file when None. The default is None. Returns ------- da : xr.DataArray The flow residual in each cell, in m3/d. """ if grb_file is None: grb_file = _get_grb_file(ds) grb = flopy.mf6.utils.MfGrdFile(grb_file) cbf = get_cellbudgetfile(ds=ds, gwf=gwf, fname=fname, grb_file=grb_file) dims = ds["botm"].dims coords = ds["botm"].coords flowja = cbf.get_data(text="FLOW-JA-FACE", kstpkper=kstpkper) mask_active = np.diff(grb.ia) > 0 flowja_index = grb.ia[:-1][mask_active] if kstpkper is None: # loop over all timesteps/stress-periods residuals = [] for iflowja in flowja: # residuals.append(flopy.mf6.utils.get_residuals(iflowja, grb_file)) # use our own faster method instead of a for loop: residual = np.full(grb.shape, np.nan) residual.ravel()[mask_active] = iflowja.flatten()[flowja_index] residuals.append(residual) dims = ("time",) + dims coords = dict(coords) | {"time": _get_time_index(cbf, ds)} else: # residuals = flopy.mf6.utils.get_residuals(flowja[0], grb_file) # use our own faster method instead of a for loop: residuals = np.full(grb.shape, np.nan) residuals.ravel()[mask_active] = flowja[0].flatten()[flowja_index] da = xr.DataArray(residuals, dims=dims, coords=coords) return da
[docs] def get_flow_lower_face( ds, gwf=None, fname=None, grb_file=None, kstpkper=None, lays=None ): """Get the flow over the lower face of all model cells. The flow Lower Face (flf) used to be written to the budget file in previous versions of MODFLOW. In MODFLOW 6 we determine these flows from the flow-ja-face-records. Parameters ---------- ds : xarray.Dataset Xarray dataset with model data. gwf : flopy ModflowGwf, optional Flopy groundwaterflow object. One of ds or gwf must be provided. fname : path, optional specify the budget file to load, if not provided budget file will be obtained from ds or gwf. grb_file : str, optional The location of the grb-file. grb_file is determied from ds when None. The default is None. kstpkper : tuple of 2 ints, optional The index of the timestep and the stress period to include in the result. Include all data in the budget-file when None. The default is None. lays : int or list of ints, optional The layers to include in the result. When lays is None, all layers are included. The default is None. Returns ------- da : xr.DataArray The flow over the lower face of each cell, in m3/d. """ if grb_file is None: grb_file = _get_grb_file(ds) cbf = get_cellbudgetfile(ds=ds, gwf=gwf, fname=fname, grb_file=grb_file) flowja = cbf.get_data(text="FLOW-JA-FACE", kstpkper=kstpkper) if ds.gridtype == "vertex": # determine flf_index first grb = flopy.mf6.utils.MfGrdFile(grb_file) if lays is None: lays = range(grb.nlay) if isinstance(lays, int): lays = [lays] shape = (len(lays), len(ds.icell2d)) flf_index = np.full(shape, -1) # get these properties outside of the for loop to increase speed grb_ia = grb.ia grb_ja = grb.ja for ilay, lay in enumerate(lays): ja_start_next_layer = (lay + 1) * grb.ncpl for icell2d in range(grb.ncpl): node = lay * grb.ncpl + icell2d ia = np.arange(grb_ia[node], grb_ia[node + 1]) mask = grb_ja[ia] >= ja_start_next_layer if mask.any(): # assert mask.sum() == 1 flf_index[ilay, icell2d] = int(ia[mask][0]) coords = ds["botm"][lays].coords else: coords = ds["botm"].coords dims = ds["botm"].dims if kstpkper is None: # loop over all tiesteps/stress-periods flfs = [] for iflowja in flowja: if ds.gridtype == "vertex": flf = np.full(shape, np.nan) mask = flf_index >= 0 flf[mask] = iflowja[0, 0, flf_index[mask]] else: _, _, flf = flopy.mf6.utils.get_structured_faceflows(iflowja, grb_file) flfs.append(flf) dims = ("time",) + dims coords = dict(coords) | {"time": _get_time_index(cbf, ds)} else: if ds.gridtype == "vertex": flfs = np.full(shape, np.nan) mask = flf_index >= 0 flfs[mask] = flowja[0][0, 0, flf_index[mask]] else: _, _, flfs = flopy.mf6.utils.get_structured_faceflows(flowja[0], grb_file) da = xr.DataArray(flfs, dims=dims, coords=coords) if ds.gridtype != "vertex" and lays is not None: da = da.isel(layer=lays) return da
[docs] def get_head_at_point(head, x, y, ds=None, gi=None, drop_nan_layers=True, rotated=True): """Get the head at a certain point from a head DataArray for all cells. Parameters ---------- head : xarray.DataArray A DataArray of heads, with dimensions (time, layer, y, x) or (time, layer, icell2d). x : float The x-coordinate of the requested head. y : float The y-coordinate of the requested head. ds : xarray.Dataset, optional Xarray dataset with model data. Only used when a Vertex grid is used. The default is None. gi : flopy.utils.GridIntersect, optional A GridIntersect class, to determine the cell at point x,y. Only used when a Vertex grid is used, and it is determined from ds when None. The default is None. drop_nan_layers : bool, optional Drop layers that are NaN at all timesteps. The default is True. rotated : bool, optional If the model grid has a rotation, and rotated is False, x and y are in model coordinates. Otherwise x and y are in real world coordinates. The default is True. Returns ------- head_point : xarray.DataArray A DataArray with dimensions (time, layer). """ if "icell2d" in head.dims: if ds is None: raise (ValueError("Please supply ds for a vertex grid")) icell2d = get_icell2d_from_xy(x, y, ds=ds, gi=gi, rotated=rotated) head_point = head.sel(icell2d=icell2d) else: row, col = get_row_col_from_xy(x, y, ds=ds, rotated=rotated) head_point = head.sel(x=ds.x[col], y=ds.y[row]) if drop_nan_layers: # only keep layers that are active at this location head_point = head_point[:, ~head_point.isnull().all("time")] return head_point
[docs] def _calculate_gxg( head_bimonthly: xr.DataArray, below_surfacelevel: bool = False ) -> xr.DataArray: import bottleneck as bn # Most efficient way of finding the three highest and three lowest is via a # partition. See: # https://bottleneck.readthedocs.io/en/latest/reference.html#bottleneck.partition def lowest3_mean(da: xr.DataArray): a = bn.partition(da.values, kth=2, axis=-1) with warnings.catch_warnings(): warnings.simplefilter("ignore", category=RuntimeWarning) result = np.nanmean(a[..., :3], axis=-1) template = da.isel(bimonth=0) return template.copy(data=result) def highest3_mean(da: xr.DataArray): a = bn.partition(-da.values, kth=2, axis=-1) with warnings.catch_warnings(): warnings.simplefilter("ignore", category=RuntimeWarning) result = np.nanmean(-a[..., :3], axis=-1) template = da.isel(bimonth=0) return template.copy(data=result) timesize = head_bimonthly["time"].size if timesize % 24 != 0: raise ValueError("head is not bimonthly for a full set of years") n_year = int(timesize / 24) # First and second date of March: 4, 5; first date of April: 6. month_index = np.array([4, 5, 6]) # Repeat this for every year in dataset, and increment by 24 per repetition. yearly_increments = (np.arange(n_year) * 24)[:, np.newaxis] # Broadcast to a full set gvg_index = xr.DataArray( data=(month_index + yearly_increments), dims=("hydroyear", "bimonth") ) gvg_data = head_bimonthly.isel(time=gvg_index) # Filters years without 3 available measurments. gvg_years = gvg_data.count("bimonth") == 3 gvg_data = gvg_data.where(gvg_years) # Hydrological years: running from 1 April to 1 April in the Netherlands. # Increment run from April (6th date) to April (30th date) for every year. # Broadcast to a full set newdims = ("hydroyear", "bimonth") gxg_index = xr.DataArray( data=(np.arange(6, 30) + yearly_increments[:-1]), dims=newdims, ) gxg_data = head_bimonthly.isel(time=gxg_index) dims = [dim for dim in gxg_data.dims if dim not in newdims] dims.extend(newdims) gxg_data = gxg_data.transpose(*dims) # Filter years without 24 measurements. gxg_years = gxg_data.count("bimonth") == 24 gxg_data = gxg_data.where(gxg_years) # First compute LG3 and HG3 per hydrological year, then compute the mean over the total. if gxg_data.chunks is not None: # If data is lazily loaded/chunked, process data of one year at a time. gxg_data = gxg_data.chunk({"hydroyear": 1}) lg3 = xr.map_blocks(lowest3_mean, gxg_data, template=gxg_data.isel(bimonth=0)) hg3 = xr.map_blocks(highest3_mean, gxg_data, template=gxg_data.isel(bimonth=0)) else: # Otherwise, just compute it in a single go. lg3 = lowest3_mean(gxg_data) hg3 = highest3_mean(gxg_data) gxg = xr.Dataset() gxg["gvg"] = gvg_data.mean(("hydroyear", "bimonth")) ghg = hg3.mean("hydroyear") glg = lg3.mean("hydroyear") if below_surfacelevel: gxg["glg"] = ghg gxg["ghg"] = glg else: gxg["glg"] = glg gxg["ghg"] = ghg # Add the numbers of years used in the calculation gxg["n_years_gvg"] = gvg_years.sum("hydroyear") gxg["n_years_gxg"] = gxg_years.sum("hydroyear") return gxg
[docs] def calculate_gxg( head: xr.DataArray, below_surfacelevel: bool = False, tolerance: pd.Timedelta = pd.Timedelta(days=7), ) -> xr.DataArray: """Calculate GxG groundwater characteristics from head time series. GLG and GHG (average lowest and average highest groundwater level respectively) are calculated as the average of the three lowest (GLG) or highest (GHG) head values per Dutch hydrological year (april - april), for head values measured at a semi-monthly frequency (14th and 28th of every month). GVG (average spring groundwater level) is calculated as the average of groundwater level on 14th and 28th of March, and 14th of April. Supplied head values are resampled (nearest) to the 14/28 frequency. Hydrological years without all 24 14/28 dates present are discarded for glg and ghg. Years without the 3 dates for gvg are discarded. This method is copied from imod-python, and edited so that head-DataArray does not need to contain dimensions 'x' and 'y', so this method also works for refined grids. The original method can be found in: https://gitlab.com/deltares/imod/imod-python/-/blob/master/imod/evaluate/head.py Parameters ---------- head : xr.DataArray of floats Head relative to sea level, in m, or m below surface level if `below_surfacelevel` is set to True. Must have dimenstion 'time'. below_surfacelevel : boolean, optional, default: False. False (default) if heads are relative to a datum (e.g. sea level). If True, heads are taken as m below surface level. tolerance: pd.Timedelta, default: 7 days. Maximum time window allowed when searching for dates around the 14th and 28th of every month. Returns ------- gxg : xr.Dataset Dataset containing ``glg``: average lowest head, ``ghg``: average highest head, ``gvg``: average spring head, ``n_years_gvg``: numbers of years used for gvg, ``n_years_gxg``: numbers of years used for glg and ghg. Examples -------- Load the heads, and calculate groundwater characteristics for the simulation period: >>> import nlmod >>> head = nlmod.gwf.get_heads_da(ds) >>> gxg = nlmod.gwf.output.calculate_gxg(head) """ # if not head.dims == ("time", "y", "x"): # raise ValueError('Dimensions must be ("time", "y", "x")') if not np.issubdtype(head["time"].dtype, np.datetime64): raise ValueError("Time must have dtype numpy datetime64") # Reindex to GxG frequency date_range: every 14th and 28th of the month. start = f"{int(head['time'][0].dt.year)}-01-01" end = f"{int(head['time'][-1].dt.year)}-12-31" dates = pd.date_range(start=start, end=end, freq="SMS") + pd.DateOffset(days=13) head_bimonthly = head.reindex(time=dates, method="nearest", tolerance=tolerance) gxg = _calculate_gxg(head_bimonthly, below_surfacelevel) return gxg