import logging
import warnings
import flopy
import numpy as np
try:
import numba
_NUMBA_AVAILABLE = True
except ImportError:
numba = None
_NUMBA_AVAILABLE = False
import xarray as xr
from geopandas import GeoSeries, points_from_xy
from ..util import LayerError, _get_value_from_ds_datavar
from . import grid
from .resample import fillnan_da
from .shared import GridTypeDims
logger = logging.getLogger(__name__)
[docs]
def calculate_thickness(ds, top="top", bot="botm"):
"""Calculate thickness from dataset.
Parameters
----------
ds : xarray.Dataset
dataset containing information about top and bottom elevations
of layers
top : str, optional
name of data variable containing tops, by default "top"
bot : str, optional
name of data variable containing bottoms, by default "botm"
Returns
-------
thickness : xarray.DataArray
DataArray containing thickness information
"""
# calculate thickness
if ds[top].ndim == ds[bot].ndim and ds[top].ndim in [2, 3]:
if ds[top].shape[0] == ds[bot].shape[0]:
# top is 3D, every layer has top and bot
thickness = ds[top] - ds[bot]
else:
raise ValueError("3d top and bot should have same number of layers")
elif ds[top].ndim == (ds[bot].ndim - 1) and ds[top].ndim in [1, 2]:
if ds[top].shape[-1] == ds[bot].shape[-1]:
# top is only top of first layer
thickness = xr.zeros_like(ds[bot])
for lay, _ in enumerate(thickness):
if lay == 0:
thickness[lay] = ds[top] - ds[bot][lay]
else:
thickness[lay] = ds[bot][lay - 1] - ds[bot][lay]
else:
raise ValueError("2d top should have same last dimension as bot")
# subtracting floats can result in rounding errors. Mainly anoying for zero thickness layers.
thickness = thickness.where(~np.isclose(thickness, 0.0), 0.0)
if isinstance(ds[bot], xr.DataArray):
thickness.name = "thickness"
if hasattr(ds[bot], "long_name"):
thickness.attrs["long_name"] = "thickness"
if hasattr(ds[bot], "standard_name"):
thickness.attrs["standard_name"] = "thickness_of_layer"
if hasattr(ds[bot], "units"):
if ds[bot].units == "mNAP":
thickness.attrs["units"] = "m"
else:
thickness.attrs["units"] = ds[bot].units
return thickness
[docs]
def calculate_transmissivity(
ds, kh="kh", thickness="thickness", top="top", botm="botm"
):
"""Calculate the transmissivity (T) as the product of the horizontal conductance
(kh) and the thickness (D).
Parameters
----------
ds : xarray.Dataset
dataset containing information about top and bottom elevations
of layers
kh : str, optional
name of data variable containing horizontal conductivity, by default 'kh'
thickness : str, optional
name of data variable containing thickness, if this data variable does not exists
thickness is calculated using top and botm. By default 'thickness'
top : str, optional
name of data variable containing tops, only used to calculate thickness if not
available in dataset. By default "top"
botm : str, optional
name of data variable containing bottoms, only used to calculate thickness if not
available in dataset. By default "botm"
Returns
-------
T : xarray.DataArray
DataArray containing transmissivity (T). NaN where layer thickness is zero
"""
if thickness in ds:
thickness = ds[thickness]
else:
thickness = calculate_thickness(ds, top=top, bot=botm)
# nan where layer does not exist (thickness is 0)
thickness_nan = xr.where(thickness == 0, np.nan, thickness)
# calculate transmissivity
T = thickness_nan * ds[kh]
if hasattr(T, "long_name"):
T.attrs["long_name"] = "transmissivity"
if hasattr(T, "standard_name"):
T.attrs["standard_name"] = "T"
if hasattr(thickness, "units"):
if hasattr(ds[kh], "units"):
if ds[kh].units == "m/day" and thickness.units in ["m", "mNAP"]:
T.attrs["units"] = "m2/day"
else:
T.attrs["units"] = ""
else:
T.attrs["units"] = ""
return T
[docs]
def calculate_resistance(
ds, kv="kv", thickness="thickness", top="top", botm="botm", between_layers=None
):
"""Calculate vertical resistance (c) of model layers from the vertical conductivity
(kv) and the thickness.
Parameters
----------
ds : xarray.Dataset
dataset containing information about top and bottom elevations
of layers
kv : str, optional
name of data variable containing vertical conductivity, by default 'kv'
thickness : str, optional
name of data variable containing thickness, if this data variable does not exists
thickness is calculated using top and botm. By default 'thickness'
top : str, optional
name of data variable containing tops, only used to calculate thickness if not
available in dataset. By default "top"
botm : str, optional
name of data variable containing bottoms, only used to calculate thickness if not
available in dataset. By default "botm"
between_layers : bool, optional
If True, calculate the resistance between the layers, which MODFLOW uses to
calculate the flow. The resistance between two layers is then assigned to the
top layer, and the bottom model layer gets a resistance of infinity.
If False, calculate the resistance of the layers themselves. The default is
True. However, in a future version of nlmod the default will be changed to
False.
Returns
-------
c : xarray.DataArray
DataArray containing vertical resistance (c). NaN where layer thickness is zero
"""
if between_layers is None:
logger.warning(
(
"The default of between_layers=True in calculate_resistance is "
"deprecated and will be changed to False in a future version of nlmod. "
"Pass between_layers=True to retain current behavior or "
"between_layers=False to adopt the future default and silence "
"this warning."
)
)
between_layers = True # will be changed to False in future version of nlmod
if thickness in ds:
thickness = ds[thickness]
else:
thickness = calculate_thickness(ds, top=top, bot=botm)
if between_layers:
# nan where layer does not exist (thickness is 0)
thickness_nan = xr.where(thickness == 0, np.nan, thickness)
kv_nan = xr.where(thickness == 0, np.nan, ds[kv])
# backfill thickness and kv to get the right value for the layer below
thickness_bfill = thickness_nan.bfill(dim="layer")
kv_bfill = kv_nan.bfill(dim="layer")
# calculate resistance
c = xr.zeros_like(thickness)
for ilay in range(ds.sizes["layer"] - 1):
ctop = (thickness_nan.sel(layer=ds.layer[ilay]) * 0.5) / kv_nan.sel(
layer=ds.layer[ilay]
)
cbot = (thickness_bfill.sel(layer=ds.layer[ilay + 1]) * 0.5) / kv_bfill.sel(
layer=ds.layer[ilay + 1]
)
c[ilay] = ctop + cbot
c[ilay + 1] = np.inf
else:
c = thickness / ds[kv]
if hasattr(c, "long_name"):
c.attrs["long_name"] = "resistance"
if hasattr(c, "standard_name"):
c.attrs["standard_name"] = "c"
if hasattr(thickness, "units"):
if hasattr(ds[kv], "units"):
if ds[kv].units == "m/day" and thickness.units in ["m", "mNAP"]:
c.attrs["units"] = "day"
else:
c.attrs["units"] = ""
else:
c.attrs["units"] = ""
return c
[docs]
def split_layers_ds(
ds,
split_dict,
layer="layer",
top="top",
bot="botm",
return_reindexer=False,
start_suffix_at=1,
):
"""Split layers based in Dataset.
Parameters
----------
ds : xarray.Dataset
xarray Dataset containing information about layers (layers, top and bot)
split_dict : dict
dictionary with name (string) or index (integer) of layers to split as keys.
There are two options for the values of the dictionary, to indicate how to split
up layer: an iterable of factors. E.g. {'BXk1': [1, 3]} will split layer 'BXk1'
into 2 layers, with the first layer equal to 0.25 of the original thickness and
the second layer equal to 0.75 of the original thickness.
The second option would be to set the value to the number of layers to split the
layer into, e.g. {'BXk1': 2}, which is equal to {'BXk1': [0.5, 0.5]}.
layer : str, optional
name of layer dimension, by default 'layer'
top : str, optional
name of data variable containing top of layers, by default 'top'
bot : str, optional
name of data variable containing bottom of layers, by default 'botm'
return_reindexer : bool, optional
Return a dictionary that can be used to reindex variables from the original
layer-dimension to the new layer-dimension when True. The default is False.
start_suffix_at : int, optional
The suffix that the first splitted layer will receive, for layers that were
splitted into multiple sub-layers. The default is 1.
Returns
-------
ds : xarray.Dataset
Dataset with new tops and bottoms taking into account split layers, and filled
data for other variables.
"""
layers = list(ds.layer.data)
# Work on a shallow copy of split_dict
split_dict = split_dict.copy()
# do some input-checking on split_dict
for lay0 in list(split_dict):
if isinstance(lay0, int) & (ds.layer.dtype != int):
# if layer is an integer, and ds.layer is not of integer type
# replace lay0 by the name of the layer
split_dict[layers[lay0]] = split_dict.pop(lay0)
lay0 = layers[lay0]
if isinstance(split_dict[lay0], (int, np.integer)):
# If split_dict[lay0] is of integer type
# split the layer in evenly thick layers
split_dict[lay0] = [1 / split_dict[lay0]] * split_dict[lay0]
elif hasattr(split_dict[lay0], "__iter__"):
# make sure the fractions add up to 1
assert np.isclose(np.sum(split_dict[lay0]), 1), (
f"Fractions for splitting layer '{lay0}' do not add up to 1."
)
split_dict[lay0] = split_dict[lay0] / np.sum(split_dict[lay0])
else:
raise ValueError(
"split_dict should contain an iterable of factors or an integer"
)
logger.info(f"Splitting layers {list(split_dict)}")
if "layer" in ds["top"].dims:
msg = "Top in ds has a layer dimension. split_layers_ds will remove the layer dimension from top in ds."
logger.warning(msg)
else:
ds = ds.copy()
ds["top"] = ds["botm"] + calculate_thickness(ds)
layers_org = layers.copy()
# add extra layers (keep the original ones for now, as we will copy data first)
for lay0 in split_dict:
for i, _ in enumerate(split_dict[lay0]):
index = layers.index(lay0)
layers.insert(index, lay0 + "_" + str(i + start_suffix_at))
layers_org.insert(index, lay0)
ds = ds.reindex({"layer": layers})
# calclate a new top and botm, and fill other variables with original data
th = calculate_thickness(ds, top=top, bot=bot)
for lay0 in split_dict:
logger.info(f"Split '{lay0}' into {len(split_dict[lay0])} sub-layers")
th0 = th.loc[lay0]
for var in ds:
if layer not in ds[var].dims:
continue
if lay0 == list(split_dict)[0] and var not in [top, bot]:
logger.info(
f"Fill values for variable '{var}' in split"
" layers with the values from the original layer."
)
ds = _split_var(
ds, var, lay0, th0, split_dict[lay0], top, bot, start_suffix_at
)
# drop the original layers
ds = ds.drop_sel(layer=list(split_dict))
# remove layer dimension from top again
ds = remove_layer_dim_from_top(ds, inconsistency_threshold=0.001)
if return_reindexer:
# determine reindexer
reindexer = dict(zip(layers, layers_org))
for lay0 in split_dict:
reindexer.pop(lay0)
return ds, reindexer
return ds
[docs]
def _split_var(ds, var, layer, thickness, fctrs, top, bot, start_suffix_at):
"""Internal method to split a variable of one layer in multiple layers."""
for i in range(len(fctrs)):
name = layer + "_" + str(i + start_suffix_at)
if var == top:
# take orignal top and subtract thickness of higher splitted layers
ds[var].loc[name] = ds[var].loc[layer] - np.sum(fctrs[:i]) * thickness
elif var == bot:
# take original bottom and add thickness of lower splitted layers
ds[var].loc[name] = ds[var].loc[layer] + np.sum(fctrs[i + 1 :]) * thickness
else:
# take data from the orignal layer
ds[var].loc[name] = ds[var].loc[layer]
return ds
[docs]
def layer_combine_top_bot(ds, combine_layers, layer="layer", top="top", bot="botm"):
"""Calculate new tops and bottoms for combined layers.
Parameters
----------
ds : xarray.Dataset
xarray Dataset containing information about layers
(layers, top and bot)
combine_layers : list of tuple of ints
list of tuples, with each tuple containing integers indicating
layer indices to combine into one layer. E.g. [(0, 1), (2, 3)] will
combine layers 0 and 1 into a single layer (with index 0) and layers
2 and 3 into a second layer (with index 1).
layer : str, optional
name of layer dimension, by default 'layer'
top : str, optional
name of data variable containing top of layers, by default 'top'
bot : str, optional
name of data variable containing bottom of layers, by default 'botm'
Returns
-------
new_top, new_bot : xarray.DataArrays
DataArrays containing new tops and bottoms after splitting layers.
reindexer : dict
dictionary mapping new to old layer indices.
"""
# calculate new number of layers
new_nlay = (
ds[layer].size - sum((len(c) for c in combine_layers)) + len(combine_layers)
)
# create new DataArrays for storing new top/bot
new_bot = _get_empty_layered_da(ds[bot], nlay=new_nlay)
new_top = _get_empty_layered_da(ds[top], nlay=new_nlay)
# dict to keep track of old and new layer indices
reindexer = {}
j = 0 # new layer index
icomb = 0 # combine layer index
# loop over original layers
for i in range(ds.layer.size):
# check whether to combine layers
if i in np.concatenate(combine_layers):
# get indices of layers
c = combine_layers[icomb]
old_names = ds.layer.isel(layer=list(c)).to_numpy().tolist()
# store new and original layer indices
reindexer[j] = c
# only need to calculate new top/bot once for each merged layer
if i == np.min(c):
with np.printoptions(legacy="1.21"):
logger.debug(
f"{j:2d}: Merge layers {c} ({old_names}) as layer {j}, "
"calculate new top/bot."
)
tops = ds[top].data[c, ...]
bots = ds[bot].data[c, ...]
new_top.data[j] = np.nanmax(tops, axis=0)
new_bot.data[j] = np.nanmin(bots, axis=0)
elif i == np.max(c):
# advance combine layer index after merging layers
icomb += 1
# advance new layer index
j += 1
continue
else:
# no need to merge more than once, so continue loop
continue
else:
# do not merge, only map old layer index to new layer index
logger.debug(
f"{j:2d}: Do not merge, map old layer index {i} to new layer index {j}."
)
new_top.data[j] = ds[top].data[i]
new_bot.data[j] = ds[bot].data[i]
reindexer[j] = i
j += 1
return new_top, new_bot, reindexer
[docs]
def sum_param_combined_layers(da, reindexer):
"""Calculate combined layer parameter with sum.
Parameters
----------
da : xarray.DataArray
data array to calculate combined parameters for
reindexer : dict
dictionary mapping new layer indices to old layer indices
Returns
-------
da_new : xarray.DataArray
data array containing new parameters for combined layers and old
parameters for unmodified layers.
"""
da_new = _get_empty_layered_da(da, nlay=list(reindexer.keys())[-1] + 1)
for k, v in reindexer.items():
if isinstance(v, tuple):
psum = np.sum(da.data[v, ...], axis=0)
else:
psum = da.data[v]
da_new.data[k] = psum
return da_new
[docs]
def _get_empty_layered_da(da, nlay):
"""Get empty DataArray with number of layer specified by nlay.
Parameters
----------
da : xarray.DataArray
original data array
nlay : int
number of layers
Returns
-------
da_new : xarray.DataArray
new empty data array with updated number of layers
"""
if set(GridTypeDims.STRUCTURED_LAYERED.value).issubset(da.dims):
dims = GridTypeDims.STRUCTURED_LAYERED.value
coords = {
"layer": np.arange(nlay),
"y": da["y"],
"x": da["x"],
}
elif set(GridTypeDims.VERTEX_LAYERED.value).issubset(da.dims):
dims = GridTypeDims.VERTEX_LAYERED.value
coords = {
"layer": np.arange(nlay),
"icell2d": da["icell2d"],
}
else:
raise TypeError("Cannot determine grid type of data array.")
return xr.DataArray(data=np.nan, dims=dims, coords=coords)
[docs]
def kheq_combined_layers(kh, thickness, reindexer):
"""Calculate equivalent horizontal hydraulic conductivity.
Parameters
----------
kh : xarray.DataArray
data array containing horizontal hydraulic conductivity
thickness : xarray.DataArray
data array containing thickness per layer
reindexer : OrdererDict
dictionary mapping new layer indices to old layer indices
Returns
-------
da_kh : xarray.DataArray
data array containing equivalent horizontal hydraulic conductivity
for combined layers and original hydraulic conductivity in unmodified
layers
"""
da_kh = _get_empty_layered_da(kh, nlay=list(reindexer.keys())[-1] + 1)
for k, v in reindexer.items():
if isinstance(v, tuple):
kheq = np.nansum(
thickness.data[v, ...] * kh.data[v, ...], axis=0
) / np.nansum(thickness.data[v, ...], axis=0)
kheq[np.isinf(kheq)] = np.nan
else:
kheq = kh.data[v]
da_kh.data[k] = kheq
return da_kh
[docs]
def kveq_combined_layers(kv, thickness, reindexer):
"""Calculate equivalent vertical hydraulic conductivity.
Parameters
----------
kv : xarray.DataArray
data array containing vertical hydraulic conductivity
thickness : xarray.DataArray
data array containing thickness per layer
reindexer : OrdererDict
dictionary mapping new layer indices to old layer indices
Returns
-------
da_kv : xarray.DataArray
data array containing equivalent vertical hydraulic conductivity
for combined layers and original hydraulic conductivity in unmodified
layers
"""
da_kv = _get_empty_layered_da(kv, nlay=list(reindexer.keys())[-1] + 1)
for k, v in reindexer.items():
if isinstance(v, tuple):
numerator = np.nansum(thickness.data[v, ...], axis=0)
denominator = np.nansum(thickness.data[v, ...] / kv.data[v, ...], axis=0)
kveq = np.divide(
numerator,
denominator,
out=np.full_like(numerator, np.nan),
where=denominator != 0,
)
else:
kveq = kv.data[v]
da_kv.data[k] = kveq
return da_kv
[docs]
def combine_layers_ds(
ds,
combine_layers,
layer="layer",
top="top",
bot="botm",
kh="kh",
kv="kv",
kD="kD",
c="c",
return_reindexer=False,
):
"""Combine layers in Dataset.
Parameters
----------
ds : xarray.Dataset
xarray Dataset containing information about layers
(layers, top and bot)
combine_layers : dict or list of iterables
dictionary with new layer names as keys and a collection of layer names
or indices to merge as values. Alternatively a list of iterables, with
each iterable containing strings or integers indicating layers to
merge. The new layer will be named after the first of the layers to be merged.
layer : str, optional
name of layer dimension, by default 'layer'
top : str, optional
name of data variable containing top of layers, by default 'top'
bot : str, optional
name of data variable containing bottom of layers, by default 'botm'
kh : str, optional
name of data variable containg horizontal hydraulic conductivity,
by default 'kh'. Not parsed if set to None.
kv : str, optional
name of data variable containg vertical hydraulic conductivity,
by default 'kv'. Not parsed if set to None.
kD : str, optional
name of data variable containg transmissivity or kD,
by default 'kD'. Not parsed if set to None.
c : str, optional
name of data variable containg resistance or c,
by default 'c'. Not parsed if set to None.
return_reindexer : bool, optional
Return a dictionary that can be used to reindex variables from the original
layer-dimension to the new layer-dimension when True. The default is False.
Returns
-------
ds_combine : xarray.Dataset
Dataset with new tops and bottoms taking into account combined layers,
and recalculated values for parameters (kh, kv, kD, c).
Examples
--------
Given some layer model Dataset with named layers. Specifying which layers to merge
can be done the following ways.
As a dictionary:
>>> combine_layers = {
"new_layer_name": [0, 1] # as layer indices
"PZWAz": ["PZWAz2", "PZWAz3", "PZWAz4"], # as strings
}
As a list of iterables:
>>> combine_layers = [
(0, 1), # as layer indices
("PZWAz2", "PZWAz3", "PZWAz4"), # as strings
]
Note
----
When passing integers to combine_layers, these are always intepreted as the
layer index (i.e. starting at 0 and numbered consecutively), and not the
layer "name". If the dataset layer names are integers, only the layer index
can be used to specify which layers to merge.
"""
data_vars = []
for dv in [kh, kv, kD, c]:
if dv is not None:
data_vars.append(dv)
parsed_dv = set([top, bot] + data_vars)
check_remaining_dv = set(ds.data_vars.keys()) - parsed_dv
keep_dv = []
for dv in check_remaining_dv:
if "layer" in ds[dv].dims:
msg = f"Data variable has 'layer' dimension and will be dropped: {dv}"
logger.warning(msg)
else:
keep_dv.append(dv)
# calculate new tops/bots
logger.info("Calculating new layer tops and bottoms...")
if "layer" in ds[top].dims:
msg = (
f"Datavar {top} has a layer dimension. combine_layers_ds will"
" remove the layer dimension from {top}."
)
logger.info(msg)
else:
ds = ds.copy()
ds[top] = ds[bot] + calculate_thickness(ds)
if isinstance(combine_layers, dict):
# remove single layer entries if they exist:
combine_layers = {k: v for k, v in combine_layers.items() if len(v) > 1}
new_layer_names = combine_layers.keys()
combine_layers_integer = [
tuple(np.where(ds.layer.isin(x))[0]) if isinstance(x[0], str) else x
for x in combine_layers.values()
]
else:
# remove single layer entries if they exist:
combine_layers = [x for x in combine_layers if len(x) > 1]
combine_layers_integer = [
tuple(np.where(ds.layer.isin(x))[0]) if isinstance(x[0], str) else x
for x in combine_layers
]
new_layer_names = [ds.layer.data[x[0]] for x in combine_layers_integer]
if len(combine_layers_integer) == 0:
logger.warning("No layers are combined")
return ds[[x for x in parsed_dv if x in ds] + keep_dv]
# make sure there are no layers in between
check = [(np.diff(x) == 1).all() for x in combine_layers_integer]
if not np.all(check):
msg = ""
with np.printoptions(legacy="1.21"):
for m in np.nonzero(~np.array(check))[0]:
if isinstance(combine_layers, dict):
layer_names = combine_layers[list(combine_layers.keys())[m]]
else:
layer_names = combine_layers[m]
msg += f"\n{m}: {layer_names} {combine_layers_integer[m]}"
raise AssertionError(
f"Only consecutive layers can be combined. Check input: {msg}"
)
# set new layer name dictionary
new_layer_names = dict(zip(combine_layers_integer, new_layer_names))
# collection for data arrays
da_dict = {}
new_top, new_bot, reindexer = layer_combine_top_bot(
ds, combine_layers_integer, layer=layer, top=top, bot=bot
)
da_dict[top] = new_top
da_dict[bot] = new_bot
# calculate original thickness
thickness = calculate_thickness(ds, top=top, bot=bot)
# calculate equivalent kh/kv
if kh is not None:
logger.info("Calculate equivalent '%s' for combined layers.", kh)
da_dict[kh] = kheq_combined_layers(ds[kh], thickness, reindexer)
if kv is not None:
logger.info("Calculate equivalent '%s' for combined layers.", kv)
da_dict[kv] = kveq_combined_layers(ds[kv], thickness, reindexer)
if kD is not None and kD in ds:
logger.info("Calculate value '%s' for combined layers with sum.", kD)
da_dict[kD] = sum_param_combined_layers(ds[kD], reindexer)
if c is not None and c in ds:
logger.info("Calculate value '%s' for combined layers with sum.", c)
da_dict[c] = sum_param_combined_layers(ds[c], reindexer)
# get new layer names, based on first sub-layer from each combined layer
layer_names = []
for _, i in reindexer.items():
if isinstance(i, tuple):
layercode = new_layer_names[i]
else:
layercode = ds[layer].data[i]
layer_names.append(layercode)
# assign new layer names
for k, da in da_dict.items():
da_dict[k] = da.assign_coords(layer=layer_names)
# add original data variables to new dataset
for dv in keep_dv:
da_dict[dv] = ds[dv]
# create new dataset
logger.info("Done! Created new dataset with combined layers!")
ds_combine = xr.Dataset(da_dict, attrs=ds.attrs)
# remove layer dimension from top
ds_combine = remove_layer_dim_from_top(ds_combine, inconsistency_threshold=0.001)
if return_reindexer:
return ds_combine, reindexer
return ds_combine
[docs]
def add_kh_kv_from_ml_layer_to_ds(
ml_layer_ds, ds, anisotropy, fill_value_kh, fill_value_kv
):
"""Add kh and kv from a model layer dataset to the model dataset.
Supports structured and vertex grids.
Parameters
----------
ml_layer_ds : xarray.Dataset
dataset with model layer data with kh and kv
ds : xarray.Dataset
dataset with model data where kh and kv are added to
anisotropy : int or float
factor to calculate kv from kh or the other way around
fill_value_kh : int or float, optional
use this value for kh if there is no data in the layer model. The
default is 1.0.
fill_value_kv : int or float, optional
use this value for kv if there is no data in the layer model. The
default is 1.0.
Returns
-------
ds : xarray.Dataset
dataset with model data with new kh and kv
Notes
-----
some model dataset, such as regis, also have 'c' and 'kd' values. These
are ignored at the moment
"""
warnings.warn(
"add_kh_kv_from_ml_layer_to_ds is deprecated. Please use nlmod.grid.update_ds_from_layer_ds instead.",
DeprecationWarning,
)
ds.attrs["anisotropy"] = anisotropy
ds.attrs["fill_value_kh"] = fill_value_kh
ds.attrs["fill_value_kv"] = fill_value_kv
logger.info("add kh and kv from model layer dataset to modflow model")
kh, kv = get_kh_kv(
ml_layer_ds["kh"],
ml_layer_ds["kv"],
anisotropy,
fill_value_kh=fill_value_kh,
fill_value_kv=fill_value_kv,
)
ds["kh"] = kh
ds["kv"] = kv
return ds
[docs]
def set_model_top(ds, top, min_thickness=0.0, copy=True):
"""Set the model top, changing layer bottoms when necessary as well.
If the new top is higher than the previous top, the extra thickness is added to the
highest layer with a thickness larger than 0.
Parameters
----------
ds : xarray.Dataset
The model dataset, containing the current top.
top : xarray.DataArray
The new model top, with the same shape as the current top.
min_thickness : float, optional
The minimum thickness of top layers. When the thickness is less than
min_thickness, the thicjness is added to a deeper layer. The default is 0.0.
copy : bool, optional
If copy=True, data in the return value is always copied, so the original Dataset
is not altered. The default is True.
Returns
-------
ds : xarray.Dataset
The model dataset, containing the new top.
"""
if "layer" in ds["top"].dims:
raise (ValueError("set_model_top does not support top with a layer dimension"))
if copy:
# make a copy, so we are sure we do not alter the original DataSet
ds = ds.copy(deep=True)
if isinstance(top, (float, int)):
top = xr.full_like(ds["top"], top)
elif not top.shape == ds["top"].shape:
raise (
ValueError("Please make sure the new top has the same shape as the old top")
)
if np.any(np.isnan(top)):
raise (ValueError("Please make sure the new top does not contain nans"))
# where the botm is equal to the top, the layer is inactive
# set the botm to the new top at these locations
ds["botm"] = ds["botm"].where(ds["botm"] != ds["top"], top)
# make sure the botm is never higher than the new top
ds["botm"] = ds["botm"].where((top - ds["botm"]) > min_thickness, top)
# change the current top
ds["top"] = top
# remove inactive layers
ds = remove_inactive_layers(ds)
return ds
[docs]
def set_layer_top(ds, layer, top, copy=True):
"""Set the top of a layer."""
assert layer in ds.layer
if "layer" in ds["top"].dims:
raise (ValueError("set_layer_top does not support top with a layer dimension"))
if copy:
# make a copy, so we are sure we do not alter the original DataSet
ds = ds.copy(deep=True)
lay = np.where(ds.layer == layer)[0][0]
if lay == 0:
# change the top of the model
ds["top"] = top
# make sure the botm of all layers is never higher than the new top
ds["botm"] = ds["botm"].where(ds["botm"] < top, top)
else:
# change the botm of the layer above
ds["botm"][lay - 1] = top
# make sure the top of the layers above is never lower than the new top
ds["top"] = ds["top"].where(ds["top"] > top, top)
# make sure the botm of the layers above is never higher than the new top
ds["botm"][: lay - 1] = ds["botm"][: lay - 1].where(
ds["botm"][: lay - 1] > top, top
)
# make sure the botms of lower layers are lower than top
ds["botm"][lay:] = ds["botm"][lay:].where(ds["botm"][lay:] < top, top)
return ds
[docs]
def set_layer_botm(ds, layer, botm, copy=True):
"""Set the bottom of a layer."""
assert layer in ds.layer
if "layer" in ds["top"].dims:
raise (ValueError("set_layer_botm does not support top with a layer dimension"))
if copy:
# make a copy, so we are sure we do not alter the original DataSet
ds = ds.copy(deep=True)
lay = np.where(ds.layer == layer)[0][0]
# make sure the botm of the layers above is lever lower than the new botm
ds["botm"][:lay] = ds["botm"][:lay].where(ds["botm"][:lay] > botm, botm)
ds["botm"][lay] = botm
# make sure the botm of the layers below is never higher than the new botm
mask = ds["botm"][lay + 1 :] < botm
ds["botm"][lay + 1 :] = ds["botm"][lay + 1 :].where(mask, botm)
return ds
[docs]
def set_layer_thickness(ds, layer, thickness, change="botm", copy=True):
"""Set the layer thickness by changing the bottom of the layer."""
assert layer in ds.layer
assert change == "botm", "Only change=botm allowed for now"
if copy:
# make a copy, so we are sure we do not alter the original DataSet
ds = ds.copy(deep=True)
lay = np.where(ds.layer == layer)[0][0]
if lay == 0:
top = ds["top"]
else:
top = ds["botm"][lay - 1]
new_botm = top - thickness
ds = set_layer_botm(ds, layer, new_botm)
return ds
[docs]
def get_zcellcenters(ds):
"""Calculate the z-coordinates of cell centers. Equivalent of modelgrid.zcellcenters in flopy.
Parameters
----------
ds : xarray.Dataset
dataset containing information about layers, including top and botm
Returns
-------
zcellcenters : xarray.DataArray
data array containing the z-coordinates of cell centers
"""
if "layer" in ds["top"].dims:
top = ds["top"]
else:
top = ds["botm"] + calculate_thickness(ds)
return (top + ds["botm"]) / 2
[docs]
def set_minimum_layer_thickness(ds, layer, min_thickness, change="botm", copy=True):
"""Make sure layer has a minimum thickness by lowering the botm of the layer where
neccesary.
"""
assert layer in ds.layer
assert change == "botm", "Only change=botm allowed for now"
if copy:
# make a copy, so we are sure we do not alter the original DataSet
ds = ds.copy(deep=True)
lay = np.where(ds.layer == layer)[0][0]
if lay == 0:
top = ds["top"]
else:
top = ds["botm"][lay - 1]
botm = ds["botm"][lay]
mask = (top - botm) > min_thickness
new_botm = botm.where(mask, top - min_thickness)
ds = set_layer_botm(ds, layer, new_botm)
return ds
[docs]
def remove_thin_layers(
ds, min_thickness=0.1, update_thickness_every_layer=False, copy=True
):
"""Remove cells with a thickness less than min_thickness (setting the thickness to
0)
The thickness of the removed cells is added to the first active layer below
Parameters
----------
ds : xr,Dataset
Dataset containing information about layers.
min_thickness : float, optional
THe minimum thickness of a layer. The default is 0.1.
update_thickness_every_layer : bool, optional
If True, loop over the layers, from the top down, and remove thin layers, adding
the thickness to the first active layer below. If the thickness of this layer is
increased more than min_thickness (even if it originally was thinner than
min_thickness), the layer is kept. If update_thickness_every_layer is False, all
cells that originally were thinner than min_thickness are removed. The default
is False.
copy : bool, optional
If copy=True, data in the return value is always copied, so the original Dataset
is not altered. The default is True.
Returns
-------
ds : xr.Dataset
Dataset containing information about layers.
"""
if "layer" in ds["top"].dims:
msg = "remove_thin_layers does not support top with a layer dimension"
raise (ValueError(msg))
if copy:
# make a copy, so we are sure we do not alter the original DataSet
ds = ds.copy(deep=True)
thickness = calculate_thickness(ds)
if update_thickness_every_layer:
for lay_org in range(len(ds.layer)):
# determine where the layer is too thin
mask = thickness[lay_org] < min_thickness
if not mask.any():
continue
# we will set the botm to the top in these cells, so we first get the top
if lay_org == 0:
top = ds["top"]
else:
top = ds["botm"][lay_org - 1]
# loop over the layers, starting from lay_org
for lay in range(lay_org, len(ds.layer)):
if lay > lay_org:
# only keep cells in mask that had no thickness to begin with
# we need to increase the botm in these cells as well
mask = mask & (thickness[lay] <= 0)
if not mask.any():
break
# set the botm equal to the top in the cells in mask
ds["botm"][lay] = ds["botm"][lay].where(~mask, top)
# calculate the thickness again, using the new botms
thickness = calculate_thickness(ds)
else:
mask = thickness >= min_thickness
# set botm of layers with a small thickness to NaN
ds["botm"] = ds["botm"].where(mask)
# fill botm of the first layer by top (setting the cell thickness to 0)
ds["botm"][0] = ds["botm"][0].fillna(ds["top"])
# fill nans in botm by copying values from higher layers
ds["botm"] = ds["botm"].ffill("layer")
return ds
[docs]
def get_kh_kv(kh, kv, anisotropy, fill_value_kh=1.0, fill_value_kv=0.1, idomain=None):
"""Create kh and kv grid data for flopy from existing kh, kv and anistropy grids with
nan values (typically from REGIS).
fill nans in kh grid in these steps:
1. take kv and multiply by anisotropy, if this is nan:
2. take fill_value_kh
fill nans in kv grid in these steps:
1. take kh and divide by anisotropy, if this is nan:
2. take fill_value_kv
Supports structured and vertex grids.
Parameters
----------
kh : xarray.DataArray
kh from regis with nan values shape(nlay, nrow, ncol) or
shape(nlay, len(icell2d))
kv : xarray.DataArray
kv from regis with nan values shape(nlay, nrow, ncol) or
shape(nlay, len(icell2d))
anisotropy : int or float
factor to calculate kv from kh or the other way around
fill_value_kh : int or float, optional
use this value for kh if there is no data in kh, kv and
anisotropy. The default is 1.0.
fill_value_kv : int or float, optional
use this value for kv if there is no data in kv, kh and
anisotropy. The default is 1.0.
idomain : xarray.DataArray, optional
The idomain DataArray, used in log-messages, to report the number of active
cells that are filled. When idomain is None, the total number of cells that are
filled is reported, and not just the active cells. The default is None.
Returns
-------
kh : np.ndarray
kh without nan values (nlay, nrow, ncol) or shape(nlay, len(icell2d))
kv : np.ndarray
kv without nan values (nlay, nrow, ncol) or shape(nlay, len(icell2d))
"""
for layer in kh.layer.data:
if ~np.all(np.isnan(kh.loc[layer])):
logger.debug(f"layer {layer} has a kh")
elif ~np.all(np.isnan(kv.loc[layer])):
logger.debug(f"layer {layer} has a kv")
else:
logger.info(f"kv and kh both undefined in layer {layer}")
# fill kh by kv * anisotropy
msg_suffix = f" of kh by multipying kv with an anisotropy of {anisotropy}"
kh = _fill_var(kh, kv * anisotropy, idomain, msg_suffix)
# fill kv by kh / anisotropy
msg_suffix = f" of kv by dividing kh by an anisotropy of {anisotropy}"
kv = _fill_var(kv, kh / anisotropy, idomain, msg_suffix)
# fill kh by fill_value_kh
msg_suffix = f" of kh with a value of {fill_value_kh}"
if "units" in kh.attrs:
msg_suffix = f"{msg_suffix} {kh.units}"
kh = _fill_var(kh, fill_value_kh, idomain, msg_suffix)
# fill kv by fill_value_kv
msg_suffix = f" of kv with a value of {fill_value_kv}"
if "units" in kv.attrs:
msg_suffix = f"{msg_suffix} {kv.units}"
kv = _fill_var(kv, fill_value_kv, idomain, msg_suffix)
return kh, kv
[docs]
def _fill_var(var, by, idomain, msg_suffix=""):
mask = np.isnan(var)
if isinstance(by, xr.DataArray):
mask = mask & (~np.isnan(by))
if mask.any():
var = var.where(~mask, by)
if idomain is not None:
mask = mask & (idomain > 0)
if mask.any():
logger.info(
f"Filling {int(mask.sum())} values in active cells{msg_suffix}"
)
else:
logger.info(f"Filling {int(mask.sum())} values {msg_suffix}")
return var
[docs]
def fill_top_bot_kh_kv_at_mask(ds, fill_mask):
"""Fill values in top, bot, kh and kv.
Fill where:
1. the cell is True in fill_mask
2. the cell thickness is greater than 0
Fill values:
- top: 0
- bot: minimum of bottom_filled or top
- kh: kh_filled if thickness is greater than 0
- kv: kv_filled if thickness is greater than 0
Parameters
----------
ds : xr.DataSet
model dataset
fill_mask : xr.DataArray
1 where a cell should be replaced by masked value.
Returns
-------
ds : xr.DataSet
model dataset with adjusted data variables: 'top', 'botm', 'kh', 'kv'
"""
# zee cellen hebben altijd een top gelijk aan 0
ds["top"].values = np.where(fill_mask, 0, ds["top"])
for lay in range(ds.sizes["layer"]):
bottom_nan = xr.where(fill_mask, np.nan, ds["botm"][lay])
bottom_filled = fillnan_da(bottom_nan, ds=ds)
kh_nan = xr.where(fill_mask, np.nan, ds["kh"][lay])
kh_filled = fillnan_da(kh_nan, ds=ds)
kv_nan = xr.where(fill_mask, np.nan, ds["kv"][lay])
kv_filled = fillnan_da(kv_nan, ds=ds)
if lay == 0:
# top ligt onder bottom_filled -> laagdikte wordt 0
# top ligt boven bottom_filled -> laagdikte o.b.v. bottom_filled
mask_top = ds["top"] < bottom_filled
ds["botm"][lay] = xr.where(fill_mask * mask_top, ds["top"], bottom_filled)
else:
# top ligt onder bottom_filled -> laagdikte wordt 0
# top ligt boven bottom_filled -> laagdikte o.b.v. bottom_filled
mask_top = ds["botm"][lay - 1] < bottom_filled
ds["botm"][lay] = xr.where(
fill_mask * mask_top, ds["botm"][lay - 1], bottom_filled
)
ds["kh"][lay] = xr.where(fill_mask * mask_top, ds["kh"][lay], kh_filled)
ds["kv"][lay] = xr.where(fill_mask * mask_top, ds["kv"][lay], kv_filled)
return ds
[docs]
def fill_nan_top_botm_kh_kv(
ds,
anisotropy=10.0,
fill_value_kh=1.0,
fill_value_kv=0.1,
remove_nan_layers=True,
):
"""Update a model dataset, by removing nans and adding necessary info.
Steps:
1. Compute top and botm values, by filling nans by data from other layers
2. Remove inactive layers, with no positive thickness anywhere
3. Compute kh and kv, filling nans with anisotropy or fill_values
"""
# 1
ds = remove_layer_dim_from_top(ds)
# 2
if remove_nan_layers:
# remove inactive layers
ds = remove_inactive_layers(ds)
# 3
idomain = get_idomain(ds)
ds["kh"], ds["kv"] = get_kh_kv(
ds["kh"],
ds["kv"],
anisotropy,
fill_value_kh=fill_value_kh,
fill_value_kv=fill_value_kv,
idomain=idomain,
)
return ds
[docs]
def fill_nan_top_botm(ds):
"""Remove Nans in non-existent layers in botm and top variables.
The NaNs are removed by setting the value to the top and botm of higher/lower
layers that do exist.
Parameters
----------
ds : xr.DataSet
model DataSet
Returns
-------
ds : xarray.Dataset
dataset with filled top and botm data
"""
if "layer" in ds["top"].dims:
top_max = ds["top"].max("layer")
else:
top_max = ds["top"]
# fill nans in botm of the first layer
ds["botm"][0] = ds["botm"][0].where(~ds["botm"][0].isnull(), top_max)
# then use ffill to fill nans in deeper layers
ds["botm"] = ds["botm"].ffill("layer")
if "layer" in ds["top"].dims:
# remove nans from top by setting it equal to botm
# which sets the layer thickness to 0
ds["top"] = ds["top"].where(~ds["top"].isnull(), ds["botm"])
return ds
[docs]
def set_nan_top_and_botm(ds, copy=True):
"""Sets Nans for non-existent layers in botm and top variables.
Nans are only added to top when it contains a layer dimension.
Parameters
----------
ds : xr.DataSet
model DataSet
copy : bool, optional
If copy=True, data in the return value is always copied, so the original Dataset
is not altered. The default is True.
Returns
-------
ds : xarray.Dataset
dataset with nans in top and botm at non-existent layers.
"""
if copy:
# make a copy, so we are sure we do not alter the original DataSet
ds = ds.copy(deep=True)
thickness = calculate_thickness(ds)
mask = thickness > 0
if "layer" in ds["top"].dims:
ds["top"] = ds["top"].where(mask)
ds["botm"] = ds["botm"].where(mask)
return ds
[docs]
def remove_layer_dim_from_top(
ds,
check=True,
set_non_existing_layers_to_nan=False,
inconsistency_threshold=0.0,
return_inconsistencies=False,
copy=True,
):
"""Change top from 3d to 2d, removing NaNs in top and botm in the process.
This method sets variable `top` to the top of the upper layer (like the definition
in MODFLOW). This removes redundant data, as the top of all layers (except
the first layer) is equal to the botm of the layer above.
Parameters
----------
ds : xr.DataSet
model DataSet
check : bool, optional
If True, checks for inconsistencies in the layer model and report to logger as
warning. The default is True.
set_non_existing_layers_to_nan bool, optional
If True, sets the value of the botm-variable to NaN for non-existent layers.
This is not recommended, as this might break some procedures in nlmod. The
default is False.
inconsistency_threshold : float, optional
The threshold, above which to report inconsistencies in the layer model (where
the botm of a layer is not equal to top of a deeper layer). The default is 0.0.
return_inconsistencies : bool, optional
if True, return the difference between the top of layers and the botm of a
deeper layer, at the location where inconsitencies have been reported.
copy : bool, optional
If copy=True, data in the return value is always copied, so the original Dataset
is not altered. The default is True.
Returns
-------
ds : xarray.Dataset or numpy.array
dataset without a layer dimension in top, if return_inconsistencies is False. If
return_inconsistencies is True, a numpy arrays with non-equal top and bottoms is
returned.
"""
if copy:
# make a copy, so we are sure we do not alter the original DataSet
ds = ds.copy(deep=True)
if "layer" in ds["top"].dims:
ds = fill_nan_top_botm(ds)
if check:
dz = ds["botm"][:-1].data - ds["top"][1:].data
inconsistencies = np.abs(dz) > inconsistency_threshold
n = inconsistencies.sum()
if n > 0:
msg = f"Botm of layer is not equal to top of deeper layer in {n} cells"
logger.warning(msg)
if return_inconsistencies:
return np.vstack(
(
ds["botm"].data[:-1][inconsistencies],
ds["top"].data[1:][inconsistencies],
)
)
ds["top"] = ds["top"][0]
if set_non_existing_layers_to_nan:
ds = set_nan_top_and_botm(ds, copy=False)
return ds
[docs]
def add_layer_dim_to_top(ds, set_non_existing_layers_to_nan=True, copy=True):
"""Change top from 2d to 3d, setting top and botm to NaN for non-existent layers.
Parameters
----------
ds : xr.DataSet
model DataSet
set_non_existing_layers_to_nan : bool, optional
If True, set the values of top and botm to . The default is True.
copy : bool, optional
If copy=True, data in the return value is always copied, so the original Dataset
is not altered. The default is True.
Returns
-------
ds : xarray.Dataset
dataset with a layer dimension in top.
"""
if copy:
# make a copy, so we are sure we do not alter the original DataSet
ds = ds.copy(deep=True)
ds["top"] = ds["botm"] + calculate_thickness(ds)
if set_non_existing_layers_to_nan:
ds = set_nan_top_and_botm(ds, copy=False)
return ds
[docs]
def convert_to_modflow_top_bot(ds, **kwargs):
"""Removes the layer dimension from top and fills nans in top and botm.
Alias to remove_layer_dim_from_top
"""
ds = remove_layer_dim_from_top(ds, **kwargs)
[docs]
def convert_to_regis_top_bot(ds, **kwargs):
"""Adds a layer dimension to top and sets non-existing cells to nan in top and botm.
Alias to add_layer_dim_to_top
"""
ds = add_layer_dim_to_top(ds, **kwargs)
[docs]
def remove_inactive_layers(ds):
"""Remove layers which only contain inactive cells.
Parameters
----------
ds : xr.Dataset
The model Dataset.
Returns
-------
ds : xr.Dataset
The model Dataset without inactive layers.
"""
idomain = get_idomain(ds)
# only keep layers with at least one active cell
ds = ds.sel(layer=(idomain > 0).any(idomain.dims[1:]))
return ds
[docs]
def get_idomain(ds, active_domain="active_domain"):
"""Get idomain from a model Dataset.
Idomain is calculated from the thickness of the layers, and will be 1 for all layers
with a positive thickness, and -1 (pass-through) otherwise. Additionally, an
"active_domain" DataArray can be applied which can be 2d or 3d.
Idomain is set to 0 where "active_domain" is False or 0.
Parameters
----------
ds : xr.Dataset
The model Dataset.
active_domain : str or xr.DataArray, optional
boolean array indicating the active model domain (True = active). Idomain is
set to 0 everywhere else. If passed as str, this variable is taken from ds, if
passed as xr.DataArray, this DataArray is used. The default is "active_domain".
Returns
-------
ds : xr.DataArray
DataArray of idomain-variable.
"""
# set idomain with a default of -1 (pass-through)
idomain = xr.full_like(ds["botm"], -1, int)
idomain.name = None
# drop attributes inherited from botm
idomain.attrs.clear()
# set idomain of cells with a positive thickness to 1
thickness = calculate_thickness(ds)
# subtracting floats can result in rounding errors. Mainly anoying for zero
# thickness layers.
thickness = thickness.where(~np.isclose(thickness, 0.0), 0.0)
idomain.data[thickness.data > 0.0] = 1
# set idomain above/below the first/last active layer to 0
idomain.data[idomain.where(idomain > 0).ffill(dim="layer").isnull()] = 0
idomain.data[idomain.where(idomain > 0).bfill(dim="layer").isnull()] = 0
# set idomain to 0 in the inactive part of the model
active = _get_value_from_ds_datavar(
ds, "active_domain", datavar=active_domain, default=None, warn=False
)
if active is not None:
idomain = idomain.where(active, 0)
return idomain
[docs]
def get_first_active_layer(ds, **kwargs):
"""Get the first active layer in each cell from a model ds.
Parameters
----------
ds : xr.DataSet
Model Dataset
**kwargs : dict
Kwargs are passed on to get_first_active_layer_from_idomain.
Returns
-------
first_active_layer : xr.DataArray
raster in which each cell has the zero based number of the first
active layer. Shape can be (y, x) or (icell2d)
"""
idomain = get_idomain(ds)
return get_first_active_layer_from_idomain(idomain, **kwargs)
[docs]
def get_first_active_layer_from_idomain(idomain, nodata=-999):
"""Get the first (top) active layer in each cell from the idomain.
Parameters
----------
idomain : xr.DataArray
idomain. Shape can be (layer, y, x) or (layer, icell2d)
nodata : int, optional
nodata value. used for cells that are inactive in all layers.
The default is -999.
Returns
-------
first_active_layer : xr.DataArray
raster in which each cell has the zero based number of the first
active layer. Shape can be (y, x) or (icell2d)
"""
logger.debug("get first active modellayer for each cell in idomain")
first_active_layer = xr.where(idomain[0] == 1, 0, nodata)
for i in range(1, idomain.shape[0]):
if not (first_active_layer == nodata).any():
break
first_active_layer = xr.where(
(first_active_layer == nodata) & (idomain[i] == 1),
i,
first_active_layer,
)
first_active_layer.attrs["nodata"] = nodata
return first_active_layer
[docs]
def get_last_active_layer(ds, **kwargs):
"""Get the last active layer in each cell from a model ds.
Parameters
----------
ds : xr.DataSet
Model Dataset
**kwargs : dict
Kwargs are passed on to get_last_active_layer_from_idomain.
Returns
-------
last_active_layer : xr.DataArray
raster in which each cell has the zero based number of the last
active layer. Shape can be (y, x) or (icell2d)
"""
idomain = get_idomain(ds)
return get_last_active_layer_from_idomain(idomain, **kwargs)
[docs]
def get_last_active_layer_from_idomain(idomain, nodata=-999):
"""Get the last (bottom) active layer in each cell from the idomain.
Parameters
----------
idomain : xr.DataArray
idomain. Shape can be (layer, y, x) or (layer, icell2d)
nodata : int, optional
nodata value. used for cells that are inactive in all layers.
The default is -999.
Returns
-------
last_active_layer : xr.DataArray
raster in which each cell has the zero based number of the last
active layer. Shape can be (y, x) or (icell2d)
"""
logger.debug("get last active modellayer for each cell in idomain")
last_active_layer = xr.where(idomain[-1] == 1, idomain.shape[0] - 1, nodata)
for i in range(idomain.shape[0] - 2, -1, -1):
if not (last_active_layer == nodata).any():
break
last_active_layer = xr.where(
(last_active_layer == nodata) & (idomain[i] == 1),
i,
last_active_layer,
)
last_active_layer.attrs["nodata"] = nodata
return last_active_layer
[docs]
def get_layer_of_z(ds, z, above_model=-999, below_model=-999):
"""Get the layer of a certain z-value in all cells from a model ds.
Parameters
----------
ds : xr.DataSet
Model Dataset
z : float or xr.DataArray
The z-value for which the layer is determined
above_model : int, optional
value used for cells where z is above the top of the model. The default is -999.
below_model : int, optional
value used for cells where z is below the top of the model. The default is -999.
Returns
-------
layer : xr.DataArray
DataArray with values representing the integer layer index. Shape can be (y, x)
or (icell2d)
"""
layer = xr.where(ds["botm"][0] < z, 0, below_model)
for i in range(1, len(ds.layer)):
layer = xr.where((layer == below_model) & (ds["botm"][i] < z), i, layer)
# set layer to above_model where z is above top
if "layer" not in ds["top"].dims:
layer = xr.where(z > ds["top"], above_model, layer)
else:
layer = xr.where(z > ds["top"].isel(layer=0), above_model, layer)
# set nodata attribute
layer.attrs["above_model"] = above_model
layer.attrs["below_model"] = below_model
# drop layer coordinates, as it is inherited from one of the actions above
layer = layer.drop_vars("layer")
return layer
[docs]
def update_idomain_from_thickness(idomain, thickness, mask):
"""Get new idomain from thickness in the cells where mask is 1 (or True).
Idomain becomes:
- 1: if cell thickness is bigger than 0
- 0: if cell thickness is 0 and it is the top layer
- -1: if cell thickness is 0 and the layer is in between active cells
Parameters
----------
idomain : xr.DataArray
raster with idomain of each cell. dimensions should be (layer, y, x) or
(layer, icell2d).
thickness : xr.DataArray
raster with thickness of each cell. dimensions should be (layer, y, x)
or (layer, icell2d).
mask : xr.DataArray
raster with ones in cell where the ibound is adjusted. dimensions
should be (y, x) or (icell2d).
Returns
-------
idomain : xr.DataArray
raster with adjusted idomain of each cell. dimensions should be
(layer, y, x) or (layer, icell2d).
"""
warnings.warn(
"update_idomain_from_thickness is deprecated. Please use get_idomain instead.",
DeprecationWarning,
)
for ilay, thick in enumerate(thickness):
if ilay == 0:
mask1 = (thick == 0) * mask
idomain[ilay] = xr.where(mask1, 0, idomain[ilay])
mask2 = (thick > 0) * mask
idomain[ilay] = xr.where(mask2, 1, idomain[ilay])
else:
mask1 = (thick == 0) * mask * (idomain[ilay - 1] == 0)
idomain[ilay] = xr.where(mask1, 0, idomain[ilay])
mask2 = (thick == 0) * mask * (idomain[ilay - 1] != 0)
idomain[ilay] = xr.where(mask2, -1, idomain[ilay])
mask3 = (thick != 0) * mask
idomain[ilay] = xr.where(mask3, 1, idomain[ilay])
return idomain
[docs]
def aggregate_by_weighted_mean_to_ds(ds, source_ds, var_name):
"""Aggregate source data to a model dataset using the weighted mean.
The weighted average per model layer is calculated for the variable in the
source dataset. The datasets must have the same grid.
Parameters
----------
ds : xr.Dataset
model dataset containing layer information (x, y, top, botm)
source_ds : xr.Dataset
dataset containing x, y, top, botm and a data variable to aggregate.
var_name : str
name of the data array to aggregate
Returns
-------
da : xarray.DataArray
data array containing aggregated values from source dataset
Raises
------
ValueError
if source_ds does not have a layer dimension
See Also
--------
nlmod.read.geotop.aggregate_to_ds
"""
msg = "x and/or y coordinates do not match between 'ds' and 'source_ds'"
assert (ds.x == source_ds.x).all() and (ds.y == source_ds.y).all(), msg
if "layer" in ds["top"].dims:
# make sure there is no layer dimension in top
ds["top"] = ds["top"].max(dim="layer")
if "layer" not in source_ds.dims:
raise ValueError("Requires 'source_ds' to have a 'layer' dimension!")
agg_ar = []
for ilay in range(len(ds.layer)):
if ilay == 0:
top = ds["top"]
else:
top = ds["botm"][ilay - 1].drop_vars("layer")
bot = ds["botm"][ilay].drop_vars("layer")
s_top = source_ds.top
s_bot = source_ds.bottom
s_top = s_top.where(s_top < top, top)
s_top = s_top.where(s_top > bot, bot)
s_bot = s_bot.where(s_bot < top, top)
s_bot = s_bot.where(s_bot > bot, bot)
s_thk = s_top - s_bot
agg_ar.append(
(s_thk * source_ds[var_name]).sum("layer")
/ s_thk.where(~np.isnan(source_ds[var_name])).sum("layer")
)
return xr.concat(agg_ar, ds.layer)
[docs]
def check_elevations_consistency(ds):
if "layer" in ds["top"].dims:
tops = ds["top"].data
top_ref = np.full(tops.shape[1:], np.nan)
for lay, layer in zip(range(tops.shape[0]), ds.layer.data):
top = tops[lay]
mask = ~np.isnan(top)
higher = top[mask] > top_ref[mask]
if np.any(higher):
n = int(higher.sum())
logger.warning(
f"The top of layer {layer} is higher than the top of a previous layer in {n} cells"
)
top_ref[mask] = top[mask]
bots = ds["botm"].data
bot_ref = np.full(bots.shape[1:], np.nan)
for lay, layer in zip(range(bots.shape[0]), ds.layer.data):
bot = bots[lay]
mask = ~np.isnan(bot)
higher = bot[mask] > bot_ref[mask]
if np.any(higher):
n = int(higher.sum())
logger.warning(
f"The bottom of layer {layer} is higher the bottom of a previous layer in {n} cells"
)
bot_ref[mask] = bot[mask]
thickness = calculate_thickness(ds)
mask = thickness < 0.0
if mask.any():
logger.warning(f"Thickness of layers is negative in {int(mask.sum())} cells.")
[docs]
def insert_layer(ds, name, top, bot, kh=None, kv=None, copy=True):
"""Inserts a layer in a model Dataset, burning it in an existing layer model.
This method loops over the existing layers, and checks if (part of) the new layer
needs to be inserted above the existing layer, and if the top or bottom of the
existing layer needs to be altered.
When comparing the height of the new layer with an existing layer, there are 7
options:
1 The new layer is entirely above the existing layer: layer is added completely
above existing layer. When the bottom of the new layer is above the top of the
existing layer (which can happen for the first layer), this creates a gap in the
layer model. In the returned Dataset, this gap is added to the existing layer.
2 part of the new layer lies within an existing layer, but the bottom of the new
layer is never below the bottom of the existing layer: layer is added above the
existing layer, and the top of existing layer is lowered.
3 there are locations where the new layer is above the bottom of the existing layer,
but below the top of the existing layer. The new layer splits the existing layer
into two sub-layers. This is not supported (yet) and raises an Exception.
4 part of the new layer lies above the bottom of the existing layer, while at other
locations the new layer is below the existing layer. The new layer is split, part
of the layer is added above the existing layer, and part of the new layer is added
to the layer model in the next iteration(s) (above the next layer).
5 Only the upper part of the new layer overlaps with the existing layer: the layer
is not added above the extsing layer, but the bottom of the existing layer is
raised because of the overlap.
6 The new layer is below the existing layer everywhere. Nothing happens, move on to
the next existing layer.
7 When (part of) the new layer is not added to the layer model after comparison
with the last existing layer, the (remaining part of) the new layer is added below
the existing layers, at the bottom of the model.
Parameters
----------
ds : xarray.Dataset
xarray Dataset containing information about layers
name : string
The name of the new layer.
top : xr.DataArray
The top of the new layer.
bot : xr.DataArray
The bottom of the new layer..
kh : xr.DataArray, optional
The horizontal conductivity of the new layer. The default is None.
kv : xr.DataArray, optional
The vertical conductivity of the new layer. The default is None.
copy : bool, optional
If copy=True, data in the return value is always copied, so the original Dataset
is not altered. The default is True.
Returns
-------
ds : xarray.Dataset
xarray Dataset containing the new layer(s)
"""
shape = ds["botm"].shape[1:]
assert top.shape == shape
assert bot.shape == shape
if copy:
# make a copy, so we are sure we do not alter the original DataSet
ds = ds.copy(deep=True)
if "layer" in ds["top"].dims:
msg = "Top in ds has a layer dimension. insert_layer will remove the layer dimension from top in ds."
logger.warning(msg)
else:
ds["top"] = ds["botm"] + calculate_thickness(ds)
if kh is not None:
assert kh.shape == shape
if kv is not None:
assert kv.shape == shape
todo = ~(np.isnan(top.data) | np.isnan(bot.data)) & ((top - bot).data > 0)
if not todo.any():
logger.warning(f"Thickness of new layer {name} is never larger than 0")
isplit = None
for layer in ds.layer.data:
if not todo.any():
continue
# determine the top and bottom of layer, taking account they could be NaN
# we assume a zero thickness when top or bottom is NaN
top_layer = ds["top"].loc[layer:].max("layer").data
bot_layer = ds["botm"].loc[layer].data
mask = np.isnan(bot_layer)
bot_layer[mask] = top_layer[mask]
top_higher_than_bot = top.data > bot_layer
if not top_higher_than_bot[todo].any():
# 6 the new layer is entire below the existing layer, do nothing
continue
bot_lower_than_top = bot.data < top_layer
bot_lower_than_bot = bot.data < bot_layer
if not bot_lower_than_top[todo].any():
# 1 the new layer can be added on top of the existing layer
if isplit is not None:
isplit += 1
ds = _insert_layer_above(
ds, layer, name, isplit, todo, top, bot, kh, kv, copy
)
todo[todo] = False
continue
# do not increase top of layer to bottom of new layer
if bot_lower_than_top[todo].any():
# the new layer can be added on top of the existing layer,
# possibly only partly
if not bot_lower_than_bot[todo].any():
# 2 the top of the existing layer needs to be lowered
mask = todo & bot_lower_than_top
new_top_layer = ds["top"].loc[layer]
new_top_layer.data[mask] = bot.data[mask]
ds["top"].loc[layer] = new_top_layer
# the new layer can be added on top of the existing layer
if isplit is not None:
isplit += 1
ds = _insert_layer_above(
ds, layer, name, isplit, todo, top, bot, kh, kv, copy
)
todo[todo] = False
continue
if not bot_lower_than_bot[todo].all():
bot_higher_than_bot = bot.data > bot_layer
if not bot_higher_than_bot[todo].any():
continue
top_lower_than_top = top.data < top_layer
if (todo & bot_higher_than_bot & top_lower_than_top).any():
# 3 the existing layer needs to be split,
# as part of it is below and part is above the new layer
msg = (
f"Existing layer {layer} exists in some cells both above and "
f"below the inserted layer {name}. Therefore existing layer "
f"{layer} needs to be split in two, which is not supported."
)
raise (LayerError(msg))
# 4 the new layer needs to be split, as part of the new layer is
# above the bottom of the existing layer, and part of it is below the
# existing layer
if isplit is None:
isplit = 1
else:
isplit += 1
# the top of the existing layer needs to be lowered
mask = todo & bot_higher_than_bot & bot_lower_than_top
new_top_layer = ds["top"].loc[layer]
new_top_layer.data[mask] = bot.data[mask]
ds["top"].loc[layer] = new_top_layer
# and we insert the new layer
mask = todo & bot_higher_than_bot
ds = _insert_layer_above(
ds, layer, name, isplit, mask, top, bot, kh, kv, copy
)
todo[mask] = False
mask = todo & top_higher_than_bot
if mask.any():
# 5 when the new layer is not added above the existing layer, as the bottom
# of the new layer is always lower than the bottom of the existing
# layer: the bottom of the existing layer needs to be raised to the top
# of the new layer
new_bot_layer = ds["botm"].loc[layer]
new_bot_layer.data[mask] = top.data[mask]
ds["botm"].loc[layer] = new_bot_layer
if todo.any():
# 7 the new layer needs to be added to the bottom of the model
if isplit is not None:
isplit += 1
ds = _insert_layer_below(ds, None, name, isplit, mask, top, bot, kh, kv, copy)
# remove layer dimension from top again
ds = remove_layer_dim_from_top(ds, inconsistency_threshold=0.001)
return ds
[docs]
def _insert_layer_above(ds, above_layer, name, isplit, mask, top, bot, kh, kv, copy):
new_layer_name = _get_new_layer_name(name, isplit)
layers = list(ds.layer.data)
if above_layer is None:
above_layer = layers[0]
layers.insert(layers.index(above_layer), new_layer_name)
ds = ds.reindex({"layer": layers}, copy=copy)
ds = _set_new_layer_values(ds, new_layer_name, mask, top, bot, kh, kv)
return ds
[docs]
def _insert_layer_below(ds, below_layer, name, isplit, mask, top, bot, kh, kv, copy):
new_layer_name = _get_new_layer_name(name, isplit)
layers = list(ds.layer.data)
if below_layer is None:
below_layer = layers[-1]
layers.insert(layers.index(below_layer) + 1, new_layer_name)
ds = ds.reindex({"layer": layers}, copy=copy)
ds = _set_new_layer_values(ds, new_layer_name, mask, top, bot, kh, kv)
return ds
[docs]
def _set_new_layer_values(ds, new_layer_name, mask, top, bot, kh, kv):
ds["top"].loc[new_layer_name].data[mask] = top.data[mask]
ds["botm"].loc[new_layer_name].data[mask] = bot.data[mask]
if kh is not None:
ds["kh"].loc[new_layer_name].data[mask] = kh.data[mask]
if kv is not None:
ds["kv"].loc[new_layer_name].data[mask] = kv.data[mask]
return ds
[docs]
def _get_new_layer_name(name, isplit):
new_layer_name = name
if isplit is not None:
new_layer_name = new_layer_name + "_" + str(isplit)
return new_layer_name
[docs]
def remove_layer(ds, layer):
"""Removes a layer from a Dataset, without changing elevations of other layers.
This will create gaps in the layer model.
"""
layers = list(ds.layer.data)
if layer not in layers:
raise (KeyError(f"layer '{layer}' not present in Dataset"))
if "layer" not in ds["top"].dims:
# make a copy, so we are sure we do not alter the original DataSet
ds = ds.copy(deep=True)
index = layers.index(layer)
if index == 0:
# lower the top to the botm of the first layer
ds["top"] = ds["botm"].loc[layers[0]]
layers.remove(layer)
ds = ds.reindex({"layer": layers})
return ds
[docs]
def get_isosurface_1d(da, z, value, left=np.nan, right=np.nan):
"""Linear interpolation to get the first elevation corresponding to value.
This function interpolates linearly along z, if da crosses the given interpolation
value at multiple depths, the first elevation is returned.
Note
----
This function is no longer used in nlmod, but is kept for backward compatibility,
and as a reference for the implementations in get_isosurface.
Parameters
----------
da : 1d-array
array of values, e.g. concentration, pressure, etc.
z : 1d-array
array of elevations
value : float
value for which to compute the elevations corresponding to value
left : float, optional
value to return when value is below the minimum of da. The default is np.nan.
right : float, optional
value to return when value is above the maximum of da. The default is np.nan.
Returns
-------
float
first elevation at which data crosses value
See Also
--------
get_isosurface : generalization of this function to 3D and 4D DataArrays, with
support for numba and numpy implementations.
_get_isosurface_1d_numpy : vectorized numpy implementation of this function
_get_isosurface_1d_numba : numba implementation of this function
"""
mask_valid = np.isfinite(da)
z, da = z[mask_valid], da[mask_valid]
f = da - value
if len(z) < 2:
return np.nan
# exact first hit
idx0 = np.flatnonzero(f == 0)
if idx0.size:
return z[idx0[0]]
# first sign change interval
s = f[:-1] * f[1:]
idx = np.flatnonzero(s < 0)
# no crossing
if not idx.size:
if value < da.min():
return left
elif value > da.max():
return right
else:
return np.nan
i = idx[0]
return z[i] + (value - da[i]) * (z[i + 1] - z[i]) / (da[i + 1] - da[i])
[docs]
def _get_isosurface_1d_numpy(da_arr, z_arr, value, left=np.nan, right=np.nan):
"""
Vectorized numpy implementation of get_isosurface_1d.
da_arr, z_arr: ndarrays with layer as the LAST axis. When called via
xr.apply_ufunc with input_core_dims=[["layer"], ["layer"], []], xarray
automatically moves the layer dimension to the last axis before calling
this function.
Returns array of shape da_arr.shape[:-1].
"""
valid = np.isfinite(da_arr)
f = np.where(valid, da_arr - value, np.nan)
# --- exact hits ---
exact = f == 0.0
has_exact = exact.any(axis=-1)
idx_exact = np.argmax(exact, axis=-1)
z_exact = np.take_along_axis(z_arr, idx_exact[..., np.newaxis], axis=-1)[..., 0]
# --- first sign change between adjacent valid pairs ---
sc = (f[..., :-1] * f[..., 1:] < 0) & valid[..., :-1] & valid[..., 1:]
has_sc = sc.any(axis=-1)
idx_sc = np.argmax(sc, axis=-1)
z_i = np.take_along_axis(z_arr, idx_sc[..., np.newaxis], axis=-1)[..., 0]
z_i1 = np.take_along_axis(z_arr, (idx_sc + 1)[..., np.newaxis], axis=-1)[..., 0]
da_i = np.take_along_axis(da_arr, idx_sc[..., np.newaxis], axis=-1)[..., 0]
da_i1 = np.take_along_axis(da_arr, (idx_sc + 1)[..., np.newaxis], axis=-1)[..., 0]
interp = z_i + (value - da_i) * (z_i1 - z_i) / (da_i1 - da_i)
# --- out-of-bounds ---
da_min = np.nanmin(da_arr, axis=-1)
da_max = np.nanmax(da_arr, axis=-1)
out = np.full(da_arr.shape[:-1], np.nan)
out = np.where(has_exact, z_exact, out)
out = np.where(~has_exact & has_sc, interp, out)
out = np.where(~has_exact & ~has_sc & (value < da_min), left, out)
out = np.where(~has_exact & ~has_sc & (value > da_max), right, out)
return out
if _NUMBA_AVAILABLE:
@numba.guvectorize(
["(float64[:], float64[:], float64, float64, float64, float64[:])"],
"(n),(n),(),(),()->()",
nopython=True,
target="parallel", # or "cpu" for single-threaded
cache=True,
)
def _get_isosurface_1d_gufunc_numba(da, z, value, left, right, out): # numba impl
"""Numba implementation of get_isosurface_1d.
This is some wizardry that automatically returns an out variable without
having to specify that in the call. The signature of the gufunc is specified
in the decorator.
Parameters
----------
da : 1d-array
array of values, e.g. concentration, pressure, etc.
z : 1d-array
array of elevations
value : float
value for which to compute the elevations corresponding to value
left : float
value to return when value is below the minimum of da.
right : float
value to return when value is above the maximum of da.
Returns
-------
out : float
first elevation at which data crosses value
"""
# collect valid entries
n_valid = 0
for i in range(len(da)):
if np.isfinite(da[i]):
n_valid += 1
if n_valid < 2:
out[0] = np.nan
return
z_v = np.empty(n_valid)
da_v = np.empty(n_valid)
j = 0
for i in range(len(da)):
if np.isfinite(da[i]):
z_v[j] = z[i]
da_v[j] = da[i]
j += 1
f0 = da_v[0] - value
da_min = da_v[0]
da_max = da_v[0]
# exact first hit
if f0 == 0.0:
out[0] = z_v[0]
return
for i in range(1, n_valid):
fi = da_v[i] - value
if fi == 0.0:
out[0] = z_v[i]
return
if da_v[i] < da_min:
da_min = da_v[i]
if da_v[i] > da_max:
da_max = da_v[i]
# first sign change
fp = da_v[0] - value
for i in range(1, n_valid):
fc = da_v[i] - value
if fp * fc < 0.0:
out[0] = z_v[i - 1] + (value - da_v[i - 1]) * (z_v[i] - z_v[i - 1]) / (
da_v[i] - da_v[i - 1]
)
return
fp = fc
# no crossing
if value < da_min:
out[0] = left
elif value > da_max:
out[0] = right
else:
out[0] = np.nan
[docs]
def _get_isosurface_1d_numba(
da: np.ndarray,
z: np.ndarray,
value: float,
left: float,
right: float,
) -> np.ndarray:
"""Typed wrapper so linters see the correct signature."""
return _get_isosurface_1d_gufunc_numba(da, z, value, left, right) # pylint: disable=no-value-for-parameter
[docs]
def _get_isosurface_numba(da, z, value, left=np.nan, right=np.nan, **kwargs):
"""Wrapper for numba implementation of get_isosurface_1d.
This wrapper is needed to move the layer dimension to the last position, as required
by the gufunc, and to move the result back to an xarray DataArray with the correct
dimensions and coordinates.
Parameters
----------
da : xr.DataArray
3D or 4D DataArray with values, e.g. concentration, pressure
z : xr.DataArray
3D DataArray with elevations
value : float
value at which to compute the elevations of the isosurface
left : float, optional
value to return when value is above the maximum of da. The default is np.nan.
right : float, optional
value to return when value is below the minimum of da. The default is np.nan.
kwargs : dict
additional arguments passed to xarray.apply_ufunc, not used in this function but
included for consistency with get_isosurface.
Returns
-------
xr.DataArray
2D/3D DataArray with elevations of the isosurface
"""
# move layer axis to last position
layer_dim = next(d for d in da.dims if d not in {"time", "x", "y", "icell2d"})
da_t = da.transpose(..., layer_dim)
z_t = z.transpose(..., layer_dim)
result_np = _get_isosurface_1d_numba(
da_t.values,
z_t.values,
np.float64(value),
np.float64(left),
np.float64(right),
)
dims = [d for d in da.dims if d != layer_dim]
return xr.DataArray(
result_np,
dims=dims,
coords={d: da.coords[d] for d in dims if d in da.coords},
)
[docs]
def get_isosurface(
da,
z,
value,
left=np.nan,
right=np.nan,
method="numba",
input_core_dims=None,
exclude_dims=None,
**kwargs,
):
"""Linear interpolation to compute the elevation of an isosurface.
Currently only supports linear interpolation.
Parameters
----------
da : xr.DataArray
3D or 4D DataArray with values, e.g. concentration, pressure, etc.
z : xr.DataArray
3D DataArray with elevations
value : float
value at which to compute the elevations of the isosurface
left : float, optional
value to return when value is above the maximum of da. The default is np.nan.
right : float, optional
value to return when value is below the minimum of da. The default is np.nan.
method : str, optional
method to compute the isosurface. The default is "numba".
Other option is "numpy". The numba method is usually faster than the
numpy method, but the numpy method can be faster for small datasets, and does
not require numba to be installed.
input_core_dims : list of lists, optional
list of core dimensions for each input, if not provided assumes core dimensions
are any dimensions that are not x, y or icell2d. Example input_core_dims for
structured datasets da ("time", "layer", "y", "x") and z ("layer", "y", "x"),
and value (float) would be:
`input_core_dims=[["layer"], ["layer"], []]`.
exclude_dims : set, optional
set of dimensions that can change shape in computation. The default is None,
which assumes the layer dimension is allowed to change. In the example
above this would mean `exclude_dims={"layer"}`. This will result in the
an isosurface for each time step.
kwargs : dict
additional arguments passed to xarray.apply_ufunc
Returns
-------
xr.DataArray
2D/3D DataArray with elevations of the isosurface
"""
if method == "numba":
if not _NUMBA_AVAILABLE:
logger.warning(
"numba is not installed, falling back to numpy method for get_isosurface."
)
method = "numpy"
else:
return _get_isosurface_numba(
da,
z,
value,
left=left,
right=right,
**kwargs,
)
if method == "numpy":
if input_core_dims is None:
dims_da = set(da.dims) - {"time", "x", "y", "icell2d"}
dims_z = set(z.dims) - {"x", "y", "icell2d"}
input_core_dims = [list(dims_da), list(dims_z), []]
if exclude_dims is None:
exclude_dims = {"layer"}
return xr.apply_ufunc(
_get_isosurface_1d_numpy,
da,
z,
value,
input_core_dims=input_core_dims,
exclude_dims=exclude_dims,
dask="parallelized",
output_dtypes=[float],
kwargs={"right": right, "left": left},
**kwargs,
)
[docs]
def add_bathymetry_to_layer_model(
ds, datavar_sea="northsea", datavar_bathymetry="bathymetry", kh_sea=10, kv_sea=10
):
"""Add bathymetry to a layer model.
Performs the following steps:
a) fill top, bot, kh and kv add northsea cell by extrapolation
b) add bathymetry to the layer model.
c) remove inactive layers
Parameters
----------
ds : xr.Dataset
model dataset
datavar_northsea: str, optional
checks if this datavar is available in ds, if not it will create the datavar
by calling 'get_northsea'.
"""
logger.info(
"Filling NaN values in top/botm and kh/kv in "
"North Sea using bathymetry data from jarkus"
)
# fill top, bot, kh, kv at sea cells
fal = get_first_active_layer(ds)
fill_mask = (fal == fal.attrs["nodata"]) * ds[datavar_sea]
ds = fill_top_bot_kh_kv_at_mask(ds, fill_mask)
# add bathymetry
ds = _add_bathymetry_to_top_bot_kh_kv(
ds, ds[datavar_bathymetry], fill_mask, kh_sea=kh_sea, kv_sea=kv_sea
)
# remove inactive layers
ds = remove_inactive_layers(ds)
return ds
[docs]
def _add_bathymetry_to_top_bot_kh_kv(ds, bathymetry, fill_mask, kh_sea, kv_sea):
"""Add bathymetry to the top and bot of each layer for all cells with fill_mask.
This method sets the top of the model at fill_mask to 0 m, and changes the first
layer to sea, by setting the botm of this layer to bathymetry, kh to kh_sea and kv
to kv_sea. If deeper layers are above bathymetry. the layer depth is set to
bathymetry.
Parameters
----------
ds : xarray.Dataset
dataset with model data, should
bathymetry : xarray DataArray
bathymetry data
fill_mask : xr.DataArray
cell value is 1 if you want to add bathymetry
kh_sea : int or float, optional
the horizontal conductance of the sea cells
kv_sea : int or float, optional
the vertical conductance of the sea cells
Returns
-------
ds : xarray.Dataset
dataset with model data where the top, bot, kh and kv are changed
"""
ds["top"].values = np.where(fill_mask, 0.0, ds["top"])
lay = 0
ds["botm"][lay] = xr.where(fill_mask, bathymetry, ds["botm"][lay])
ds["kh"][lay] = xr.where(fill_mask, kh_sea, ds["kh"][lay])
ds["kv"][lay] = xr.where(fill_mask, kv_sea, ds["kv"][lay])
# reset bot for all layers based on bathymetry
for lay in range(1, ds.sizes["layer"]):
ds["botm"][lay] = np.where(
ds["botm"][lay] > ds["botm"][lay - 1],
ds["botm"][lay - 1],
ds["botm"][lay],
)
return ds
[docs]
def get_modellayers_screens(ds, screen_top, screen_bottom, xy=None, icell2d=None):
"""Get the model layer of a well.
Parameters
----------
ds : xarray.Dataset
xarray Dataset with with top and bottoms, can be structured or vertex.
screen_top : np.ndarray of shape(nobs)
collection of screen top values).
screen_bottom : np.ndarray of shape(nobs)
collection of screen bottom values. Should have the same length as screen_top
and xy.
xy : np.ndarray of shape(nobs, 2), optional
list of x,y coordinates
icell2d : np.ndarray of shape(nobs), optional
To speed up the process for vertex grids a list of icell2d indices can be
given instead of a list of xy coordinates.
Returns
-------
list of floats
zero-based indices of modellayers
nan if screen above or below model boundaries
if screen spans multiple layers, choose layer with most screen length.
"""
if grid.is_vertex(ds):
if icell2d is None:
gi = flopy.utils.GridIntersect(grid.modelgrid_from_ds(ds))
icell2d = [grid.get_icell2d_from_xy(x, y, ds, gi=gi) for x, y in xy]
# make dataset of observations
ds_obs = ds.sel(icell2d=icell2d)
ds_obs["screen_top"] = (("icell2d"), screen_top)
ds_obs["screen_bot"] = (("icell2d"), screen_bottom)
dimname = "icell2d"
elif grid.is_structured(ds):
# make dataset of observations
dimname = "n_obs"
x = xr.DataArray(np.asarray(xy)[:, 0], dims=dimname)
y = xr.DataArray(np.asarray(xy)[:, 1], dims=dimname)
ds_obs = ds.sel(x=x, y=y, method="nearest")
ds_obs["screen_top"] = ((dimname), screen_top)
ds_obs["screen_bot"] = ((dimname), screen_bottom)
modellayers = _get_modellayers_dsobs(ds_obs, dimname=dimname)
return modellayers
[docs]
def _get_modellayers_dsobs(ds_obs, dimname="n_obs"):
"""Get modellayers from a dataset of observation point data.
Parameters
----------
ds_obs : xr.Dataset
typically a subset of a model dataset with only data for observations.
dimname : str, optional
name of the observation dimension. 'icell2d' is used for vertex grids
and 'n_obs' for structured grid, by default 'n_obs'.
Returns
-------
list of floats
zero-based indices of modellayers
nan if screen above or below model boundaries
if screen spans multiple layers, choose layer with most screen length.
Raises
------
ValueError
If any screen top is lower or equal to screen bottom.
"""
if (ds_obs["screen_top"] < ds_obs["screen_bot"]).any():
errors = ds_obs.where(ds_obs["screen_top"] < ds_obs["screen_bot"], drop=True)
raise ValueError(
f"screen top is equal to or below screen bottom: {errors[dimname].values}"
)
# get model layers for screen top and bottom
ds_obs["modellayer_top"] = (
(dimname,),
[
np.argmax(ds_obs["screen_top"].values[i] > ds_obs["botm"].values[:, i])
for i in range(ds_obs.sizes[dimname])
],
)
ds_obs["modellayer_top"] = xr.where(
ds_obs["screen_top"] >= ds_obs["top"], np.inf, ds_obs["modellayer_top"]
)
ds_obs["modellayer_top"] = xr.where(
ds_obs["screen_top"] <= ds_obs["botm"].isel(layer=-1),
-np.inf,
ds_obs["modellayer_top"],
)
ds_obs["modellayer_bot"] = (
(dimname,),
[
np.argmax(ds_obs["screen_bot"].values[i] > ds_obs["botm"].values[:, i])
for i in range(ds_obs.sizes[dimname])
],
)
ds_obs["modellayer_bot"] = xr.where(
ds_obs["screen_bot"] >= ds_obs["top"], np.inf, ds_obs["modellayer_bot"]
)
ds_obs["modellayer_bot"] = xr.where(
ds_obs["screen_bot"] <= ds_obs["botm"].isel(layer=-1),
-np.inf,
ds_obs["modellayer_bot"],
)
# screen top is above model top but screen bottom is below model top
mask = (ds_obs["modellayer_top"] == np.inf) & ~(ds_obs["modellayer_bot"] == np.inf)
ds_obs["modellayer_top"] = xr.where(mask, 0, ds_obs["modellayer_top"])
ds_obs["screen_top"] = xr.where(mask, ds_obs["top"], ds_obs["screen_top"])
# screen bot is below model botm but screen top is above model botm
mask = (ds_obs["modellayer_bot"] == -np.inf) & ~(
ds_obs["modellayer_top"] == -np.inf
)
ds_obs["modellayer_bot"] = xr.where(
mask, ds_obs.sizes["layer"] - 1, ds_obs["modellayer_bot"]
)
ds_obs["screen_bot"] = xr.where(mask, ds_obs["botm"][-1], ds_obs["screen_bot"])
# combine modellayer_top and modellayer_bot to get modellayer
def get_max_overlap_model_layer(i):
mtop = ds_obs["modellayer_top"].values[i]
mbot = ds_obs["modellayer_bot"].values[i]
if ~np.isfinite(mtop) & ~np.isfinite(mbot):
return np.nan # observation below or above model boundaries
if mtop == mbot:
return mtop # screen top and bot in same layer
# find modellayer with the longest screen length
ftop = ds_obs["screen_top"].values[i]
fbot = ds_obs["screen_bot"].values[i]
botm_single_obs = (
ds_obs["botm"]
.isel(**{dimname: i}, layer=range(int(mtop - 1), int(mbot + 1)))
.values
)
botm_single_obs[0] = ftop
botm_single_obs[-1] = fbot
return np.argmin(np.diff(botm_single_obs)) + mtop
modellayer = [get_max_overlap_model_layer(i) for i in range(ds_obs.sizes[dimname])]
return modellayer
[docs]
def get_modellayers_indexer(
ds,
df,
x="x",
y="y",
screen_top="screen_top",
screen_bottom="screen_bottom",
full_output=False,
drop_nan_layers=True,
):
"""Get a model layer indexer (dataset) for a dataframe with observation wells.
The dataframe must contain the spatial data of the observation wells, i.e.
the x, y coordinates and the screen top and bottom values. The column names
corresponding to these values can be specified with keyword arguments.
Note that the returned x and y data (if grid is structured) are the cell
coordinates and not the original coordinates of the observation points!
Parameters
----------
ds : xr.Dataset
Model Dataset containing layer elevations (top, botm).
df : pd.DataFrame
DataFrame with x, y coordinates and screen_top and screen_bottom values.
x : str, optional
name of the x-coordinate column in df. The default is "x".
y : str, optional
name of the y-coordinate column in df. The default is "y".
screen_top : str, optional
name of the screen top column in df. The default is "screen_top".
screen_bottom : str, optional
name of the screen bottom column in df. The default is "screen_bottom".
full_output : bool, optional
If True, return all variables needed to construct the model layer indexer.
If False, return only the variables needed to index a data array.
drop_nan_layers : bool, optional
If True, drop the observation wells for which the model layer cannot be
determined. These probably lie above or below the model. The default is True.
If False, the model layer indexer will contain NaN values for these wells.
Returns
-------
obs_ds : xr.Dataset
Dataset with the following variables:
- x: x-coordinate of the cell in which the observation point is located
- y: y-coordinate of the cell in which the observation point is located
- layer : model layer of each observation point
Optionally returns the following variables if full_output is True:
- icell2d: cell id of the observation point
- screen_top: top of the screen
- screen_bottom: bottom of the screen
- top: top elevation of the model at the observation point
- botm: bottom elevations of the model layers at the observation point
- x_obs_local: local x-coordinate of the observation point (if grid is rotated)
- y_obs_local: local y-coordinate of the observation point (if grid is rotated)
Examples
--------
Given some model dataset `ds` and a dataframe `df` containing the metadata for
multiple observation wells:
>>> idx = nlmod.layers.get_modellayer_indexer(ds, df)
This indexer dataset can be used to obtain the simulated heads for each observation
well:
>>> heads.sel(**idx)
"""
# create geodataframe of points
pts = GeoSeries(points_from_xy(df["x"], df["y"]))
pts_to_cellid = grid.get_cellids_from_xy(df["x"].values, df["y"].values, ds=ds)
npts_outside_domain = pts_to_cellid.isna().sum()
if npts_outside_domain > 0:
maskpts = ~pts_to_cellid.isna()
pts = pts[maskpts]
pts_to_cellid = pts_to_cellid[maskpts]
df = df.loc[maskpts.values].copy()
logger.warning(
"Warning! Dropped %d points outside the model domain.", npts_outside_domain
)
# ensure result is integer, should not contain nans after masking pts outside domain
pts_to_cellid = pts_to_cellid.astype(int)
# build obs dataset
rename_dict = {
x: "x",
y: "y",
screen_top: "screen_top",
screen_bottom: "screen_bot",
}
obs_ds = (
df.loc[:, [x, y, screen_top, screen_bottom]]
.to_xarray()
.rename_vars(rename_dict)
)
dim = obs_ds["x"].dims[0] # get dimension name
obs_ds["icell2d"] = ((dim,), pts_to_cellid.values)
# get tops and bottoms from modelgrid
rename_vars = {"modellayer": "layer"} # rename to use result directly as indexer
if grid.is_structured(ds):
shape = ds["botm"].shape
_, irow, icol = grid.node_to_lrc(obs_ds["icell2d"], shape)
obs_ds["top"] = (dim,), ds["top"].isel(y=irow, x=icol).values
obs_ds["botm"] = ("layer", dim), ds["botm"].isel(y=irow, x=icol).values
obs_ds["xlocal"] = (dim,), ds.x.isel(x=icol).values
obs_ds["ylocal"] = (dim,), ds.y.isel(y=irow).values
rename_vars.update({"x": "x_obs", "y": "y_obs", "xlocal": "x", "ylocal": "y"})
elif grid.is_vertex(ds):
obs_ds["top"] = (dim,), ds["top"].isel(icell2d=obs_ds["icell2d"]).values
obs_ds["botm"] = (
("layer", dim),
ds["botm"].isel(icell2d=obs_ds["icell2d"]).values,
)
# compute modellayer
obs_ds["modellayer"] = ((dim,), _get_modellayers_dsobs(obs_ds, dimname=dim))
# use dataset layer index as result if there are no NaNs in result
if not obs_ds["modellayer"].isnull().any():
obs_ds["modellayer"].values = ds["layer"][
obs_ds["modellayer"].astype(int)
].values
elif drop_nan_layers:
nan_mask = obs_ds["modellayer"].isnull()
pts = pts.loc[~nan_mask.values]
obs_ds = obs_ds.dropna(dim, subset=["modellayer"])
obs_ds["modellayer"].values = ds["layer"][
obs_ds["modellayer"].astype(int)
].values
else:
logger.warning(
"There are observation wells above/below the model top/bottom. "
"The model layer indexer will contain NaN values for these wells."
)
# rename variables to match original input
obs_ds = obs_ds.rename_vars({v: k for k, v in rename_dict.items()})
# if full_output is False drop vars to keep only what we need for indexing
if not full_output:
drop = [
"top",
"botm",
"screen_top",
"screen_bottom",
"modellayer_top",
"modellayer_bot",
]
rename_dims = None
else:
drop = []
# avoid conflicts between layer dimension from model dataset and computed
# modellayer
rename_dims = {"layer": "ilayer"}
if not full_output and grid.is_vertex(ds):
drop += ["x", "y"]
elif not full_output and grid.is_structured(ds):
drop += ["icell2d", "x_obs", "y_obs"]
# add local x, y coords of observation points if structured grid is rotated
if full_output and grid.is_rotated(ds) and grid.is_structured(ds):
affine = grid.get_affine_world_to_mod(ds)
pts_local = pts.affine_transform(affine.to_shapely())
obs_ds["x_obs_local"] = (dim,), pts_local.x.values
obs_ds["y_obs_local"] = (dim,), pts_local.y.values
return obs_ds.rename_vars(rename_vars).drop_vars(drop).rename_dims(rename_dims)