import datetime as dt
import logging
import os
import warnings
import numpy as np
import pandas as pd
import xarray as xr
from .. import NLMOD_DATADIR, cache
from ..dims.layers import insert_layer, remove_layer, remove_layer_dim_from_top
from ..util import MissingValueError
logger = logging.getLogger(__name__)
GEOTOP_URL = "https://www.dinodata.nl/opendap/GeoTOP/geotop.nc"
[docs]
def get_lithok_props(rgb_colors=True):
fname = os.path.join(NLMOD_DATADIR, "geotop", "litho_eenheden.csv")
df = pd.read_csv(fname, index_col=0)
if rgb_colors:
df["color"] = pd.Series(get_lithok_colors())
return df
[docs]
def get_lithok_colors():
colors = {
0: (200, 200, 200),
1: (157, 78, 64),
2: (0, 146, 0),
3: (194, 207, 92),
5: (255, 255, 0),
6: (243, 225, 6),
7: (231, 195, 22),
8: (216, 163, 32),
9: (95, 95, 255),
}
colors = {key: tuple(x / 255 for x in color) for key, color in colors.items()}
return colors
[docs]
def get_strat_props():
fname = os.path.join(NLMOD_DATADIR, "geotop", "REF_GTP_STR_UNIT.csv")
df = pd.read_csv(fname, keep_default_na=False, na_values="")
# rename the columns to previously used values
# so existing nlmod-code will keep working
df = df.rename(
columns={"STR_UNIT_CD": "code", "VOXEL_NR": "strat", "DESCRIPTION": "name"}
)
# calculate color from red, green and blue columns
color = {}
for index in df.index:
color[index] = (
df.at[index, "RED_DEC"] / 255,
df.at[index, "GREEN_DEC"] / 255,
df.at[index, "BLUE_DEC"] / 255,
)
df["color"] = color
df = df.drop(columns=["RED_DEC", "GREEN_DEC", "BLUE_DEC"]).set_index("strat")
return df
[docs]
def get_kh_kv_table(kind="Brabant"):
if kind == "Brabant":
fname = os.path.join(
NLMOD_DATADIR,
"geotop",
"hydraulische_parameterisering_geotop_noord-brabant_en_noord-_en_midden-limburg.csv",
)
df = pd.read_csv(fname)
else:
raise (ValueError(f"Unknown kind in get_kh_kv_table: '{kind}'"))
return df
[docs]
def split_layers_on_geul(strat, units_no_geul, geulen):
"""Modifies the stratigraphic data from geotop in such a way that every stratigraphic
unit is completely above or completely below any other unit (and never both above
and below the same unit). This is useful for creating a layer model from the
stratigraphic units.
This function splits the geulen over multiple layers and splits other layers
locally when a geul is partly above and partly below that layer.
Some extra logic is added to minimize the number of geul layers by finding the
most efficient layer to add the geul to.
Parameters
----------
strat : xarray DataArray
with dimensions z, y and x, with values corresponding to the stratigraphic
unit in each voxel.
units_no_geul : list
Ordered list of stratigraphic units without geulen.
geulen : list
Ordered list of geulen units.
Returns
-------
strat : xarray DataArray
Modified stratigraphic data, including all the new geul units and the non-geul
units that are split because the geul was in between them.
new_unit_order : list
Ordered list with new stratigraphic units, including all the new geul units
and the non-geul units that are split because the geul was in between them.
Notes
-----
the new stratigraphic data contains more unit numbers than the original
stratigraphic data. When a layer is split by a geul, the part of the layer
below the geul gets a new unit number. The same goes for the geul itself when it
occurs across multiple layers.
"""
unit_order = units_no_geul.copy()
strat = strat.copy()
if not np.unique(strat.z.diff(dim="z")) == -0.5:
raise ValueError(
"this function assumes a layer thickness of 0.5 m, please check the z values of the strat DataArray"
)
z = (
strat["z"]
.data[:, np.newaxis, np.newaxis]
.repeat(len(strat.y), 1)
.repeat(len(strat.x), 2)
)
for ilay_geul, geul in enumerate(geulen):
# 1 get top/bot units
# a. get top/bot for this geul
mask = strat == geul
maxz = np.max(np.where(mask, z, -np.inf), axis=0)
minz = np.min(np.where(mask, z, np.inf), axis=0)
top_geul = np.where(np.isfinite(maxz), maxz + 0.25, np.nan)
bot_geul = np.where(np.isfinite(minz), minz - 0.25, np.nan)
# b. get top and bottom height (m NAP) of all other stratographic layers
# top layer is a dummy layer in order to obtain a different value for the
# case where no maximum is found (0) and the case where the maximum is found
# in the top layer (1).
shape_no_geul = (len(unit_order) + 1, len(strat.y), len(strat.x))
top = np.full(shape_no_geul, np.nan)
bot = np.full(shape_no_geul, np.nan)
for layer, unit in enumerate(unit_order):
mask = strat == unit
# Use finite sentinels to avoid all-NaN slice warnings for empty cells.
maxz = np.max(np.where(mask, z, -np.inf), axis=0)
minz = np.min(np.where(mask, z, np.inf), axis=0)
top[layer + 1] = np.where(np.isfinite(maxz), maxz + 0.25, np.nan)
bot[layer + 1] = np.where(np.isfinite(minz), minz - 0.25, np.nan)
# c. get top and bottom height of the model
lay_top = (np.isfinite(strat)).argmax(dim="z").values
z_top = (
np.take_along_axis(z, lay_top[np.newaxis, ...], axis=0).squeeze(axis=0)
+ 0.25
) # z value of top layer
lay_bot = (np.isfinite(strat[::-1])).argmax(dim="z").values
z_bot = (
np.take_along_axis(z[::-1], lay_bot[np.newaxis, ...], axis=0).squeeze(
axis=0
)
- 0.25
) # z value of bottom layer (nearly always -50.25)
# 2 find units above and below the geul
# a find for each cell the layer on top of the geul
top_lay_geul = (top_geul == bot).argmax(
axis=0
) # index (+1) of unit directly above the geul
top_lay_geul[
np.isnan(top_geul)
] = -999 # -999 if geul is not present in vertical
geul_between_lay = ((top_geul < top) & (top_geul > bot)).argmax(axis=0)
top_lay_geul = np.where(
geul_between_lay != 0, -geul_between_lay, top_lay_geul
) # -index if geul is in between a unit.
top_lay_geul[top_lay_geul == 0] = -888 # -888 if layer above geul is also geul.
top_lay_geul[top_geul == z_top] = 0 # 0 if geul is the top layer
# b find for each cell the layer below the geul
bot_lay_geul = (bot_geul == top).argmax(
axis=0
) # index (+1) of unit directly below the geul
bot_lay_geul[
np.isnan(bot_geul)
] = -999 # -999 if geul is not present in vertical
bot_lay_geul[bot_lay_geul == 0] = -888 # -888 if layer below geul is also geul.
bot_lay_geul[bot_geul == z_bot] = (
len(unit_order) + 1
) # 999 if geul is the bottom layer
bot_lay_geul = np.where(
bot_lay_geul == -888, (bot_geul > bot).argmax(axis=0), bot_lay_geul
) # if layer below geul is also geul, take the first non-geul layer.
geul_between_lay = ((bot_geul > bot) & (bot_geul < top)).argmax(axis=0)
bot_lay_geul = np.where(
geul_between_lay != 0, -geul_between_lay, bot_lay_geul
) # -index if geul is in between a unit.
# check assumption if geul is in both arrays in between a unit
if not (
(bot_lay_geul < 0) & (bot_lay_geul > -100)
== (top_lay_geul < 0) & (top_lay_geul > -100)
).all():
raise ValueError(
"unexpected geulen configuration, probably because a geul is cut by another stratigraphic unit."
)
# 3 decide where to add the geul layer
# create an empty array to store for each cell the index number of the unit.
# The geul will be added right below this unit.
lay_geul = np.ones_like(top_lay_geul) * np.nan
# Get the layers where the geul is in between a unit.
layers, counts = np.unique(
bot_lay_geul[(bot_lay_geul < 0) & (bot_lay_geul != -999)],
return_counts=True,
)
if len(layers) == 0: # a. The geul is never in between a unit.
layers, counts = np.unique(
[top_lay_geul, bot_lay_geul - 1], return_counts=True
)
layers = layers[np.argsort(counts)][::-1]
layers = [lay for lay in layers if lay >= 0]
for lay in layers:
mask = np.isnan(lay_geul) & (~np.isnan(top_geul))
if (~mask).all():
break # all cells have a layer assigned
lay_geul = np.where(
mask & (top_lay_geul <= lay) & (bot_lay_geul - 1 >= lay),
lay,
lay_geul,
) # assign geul to unit
else: # b. In some places the geul is in between a unit.
layers = layers[np.argsort(counts)]
for lay in layers:
# Assign geul where it is in between a unit
lay_geul[lay == top_lay_geul] = lay
# Assign geul to same unit wherever that is possible
lay_geul[(abs(lay) >= top_lay_geul) & (abs(lay) <= bot_lay_geul)] = abs(
lay
)
# c. In some places the geul is still not assigned to a unit.
# In those cases, assign the geul to the layer closest to the in
# between unit.
# absolute difference between possible top and in between unit
dif_top = np.ones((len(layers), *top_geul.shape)) * np.nan
# absolute difference between possible bottom and in between unit
dif_bot = np.ones((len(layers), *top_geul.shape)) * np.nan
for i, lay in enumerate(layers):
lay = abs(lay)
dif_top[i] = np.abs(np.abs(top_lay_geul) - lay)
dif_bot[i] = np.abs(np.abs(bot_lay_geul) - 1 - lay)
top_min = np.min(
dif_top, axis=0
) # minimal difference between possible top and in between unit
bot_min = np.min(
dif_bot, axis=0
) # minimal difference between possible bottom and in between unit
topbot_min = np.argmin((top_min, bot_min), axis=0)
# closest possible layer to inbetween unit
closest_lay = np.where(topbot_min == 1, bot_lay_geul - 1, top_lay_geul)
# assign geul to closest layer where it is not yet assigned
mask = np.isnan(lay_geul) & (~np.isnan(top_geul))
lay_geul = np.where(mask, closest_lay, lay_geul)
# 4. Modify strat in such a way that the geul is inserted as separate layers.
new_unit_order = unit_order.copy()
layers = np.unique(lay_geul)
layers_abs = np.unique(np.abs(layers))[
::-1
] # sort absolute values in descending order
layers_ordered = [
-l if (-l in layers) else l for l in layers_abs
] # use negative value if available
for geul_lay_count, lay in enumerate(layers_ordered):
if np.isnan(lay):
continue
geul_subset = geul + (10000 * (geul_lay_count + 1))
if lay == 0: # geul is the top layer
logger.debug(f"adding geul {geul} on top of model as {geul_subset}")
new_unit_order = [geul_subset] + new_unit_order
mask1 = np.abs(lay_geul) == np.abs(lay)
mask4 = strat == geul
strat = xr.where(mask4 & mask1, geul_subset, strat)
continue
ilay = abs(int(lay)) - 1 # correction for dummy layer
unit = unit_order[ilay]
# Add new geul units to strat and add the geul units to the ordered units.
if lay < 0: # geul is in between a unit
logger.debug(f"geul {geul} below and above unit {unit}")
unit_above = unit + (10000 * (ilay_geul + 1))
logger.debug(f"split {unit}, part above geul is {unit_above}")
mask2 = strat == unit # 3d mask of where unit is present
mask3 = z > bot_geul # 3d mask of where z value is above geul
strat = xr.where(
(mask2 & mask3), unit_above, strat
) # add unit above geul
logger.debug(
f"adding geul {geul} below unit {unit_above} as {geul_subset}"
)
mask1 = np.abs(lay_geul) == np.abs(
lay
) # 2d mask of where geul can be added to this layer
mask4 = strat == geul # 3d mask of where geul is present
strat = xr.where(mask4 & mask1, geul_subset, strat) # add geul
new_unit_order = (
new_unit_order[:ilay]
+ [unit_above, geul_subset]
+ new_unit_order[ilay:]
) # update order with (part of) unit above geul and geul
else:
# add geul below unit
logger.debug(f"adding geul {geul} below unit {unit} as {geul_subset}")
mask1 = np.abs(lay_geul) == np.abs(lay)
mask4 = strat == geul
strat = xr.where(mask4 & mask1, geul_subset, strat)
new_unit_order = (
new_unit_order[: ilay + 1]
+ [geul_subset]
+ new_unit_order[ilay + 1 :]
)
logger.debug(f"new order of units: {new_unit_order}")
unit_order = new_unit_order.copy()
return strat, new_unit_order
[docs]
@cache.cache_netcdf()
def to_model_layers(
geotop_ds,
strat_props=None,
method_geulen="add_to_layer_below",
drop_layer_dim_from_top=True,
**kwargs,
):
"""Convert geotop voxel dataset to layered dataset.
Converts geotop data to dataset with layer elevations and hydraulic conductivities.
Optionally uses hydraulic conductivities provided present in geotop_ds.
Parameters
----------
geotop_ds : xr.DataSet
geotop voxel dataset (download using `get_geotop(extent)`)
strat_props : pd.DataFrame, optional
The properties (code and name) of the stratigraphic units. Load with
get_strat_props() when None. The default is None.
method_geulen : str, optional
strat-units >=6000 are so-called 'geulen' (paleochannels, gullies). These are
difficult to add to the layer model, because they can occur above and/or below
any other unit. Multiple methods are available to handle these 'geulen'.
The method "add_to_layer_below" adds the thickness of the 'geul' to the layer
with a positive thickness below the 'geul'. The method "add_to_layer_above"
adds the thickness of the 'geul' to the layer with a positive thickness above
the 'geul'. The method "add_as_layer" tries to add the 'geulen' as one or more
layers, which can fail if a 'geul' is locally both below the top and above the
bottom of another layer (splitting the layer in two, which is not supported).
The method "split_layers" splits layers when a 'geul' is both below the top and
above the bottom of another layer. The default is "add_to_layer_below".
drop_layer_dim_from_top : bool, optional
When True, fill NaN values in top and botm and drop the layer dimension from
top. This will transform top and botm to the data model in MODFLOW. An advantage
of this data model is that the layer model is consistent by definition, with no
possibilities of gaps between layers. The default is True.
kwargs : dict
Kwargs are passed to `aggregate_to_ds()`
Returns
-------
ds: xr.DataSet
dataset with top and botm (and optionally kh and kv) per geotop layer
"""
if strat_props is None:
strat_props = get_strat_props()
# get all strat-units in Dataset
strat = geotop_ds["strat"]
units = np.unique(strat)
units = units[~np.isnan(units)].astype(int)
if "SEQ_NR" in strat_props.columns:
# sort units based on SEQ_NR in strat_props
units = strat_props.loc[units, "SEQ_NR"].sort_values().index.values
else:
# stratigraphy unit (geo eenheid) 2000 is above 1130
if (2000 in units) and (1130 in units):
units[(units == 2000) + (units == 1130)] = [2000, 1130]
if method_geulen == "split_layers":
# remove geulen from units
logger.warning(
"the 'split_layers' method for geulen is still experimental and not yet thoroughly tested."
)
units_no_geul = [unit for unit in units if unit < 6000]
geulen = [unit for unit in units if unit >= 6000]
strat, units = split_layers_on_geul(strat, units_no_geul, geulen)
shape = (len(units), len(geotop_ds.y), len(geotop_ds.x))
# fill top and bot
top = np.full(shape, np.nan)
bot = np.full(shape, np.nan)
z = (
geotop_ds["z"]
.data[:, np.newaxis, np.newaxis]
.repeat(len(geotop_ds.y), 1)
.repeat(len(geotop_ds.x), 2)
)
layers = []
geulen = geulen if method_geulen == "split_layers" else []
uc = np.unique([str(u)[-4:] for u in units], return_counts=True)
split_unit_counter = {int(unit): count for unit, count in zip(*uc)}
for layer, unit in enumerate(units):
mask = strat.values == unit
# Use finite sentinels to avoid all-NaN slice warnings for empty cells.
maxz = np.max(np.where(mask, z, -np.inf), axis=0)
minz = np.min(np.where(mask, z, np.inf), axis=0)
top[layer] = np.where(np.isfinite(maxz), maxz + 0.25, np.nan)
bot[layer] = np.where(np.isfinite(minz), minz - 0.25, np.nan)
if int(unit) in strat_props.index:
layers.append(strat_props.at[unit, "code"])
else:
str_unit = str(int(unit))
if method_geulen == "split_layers" and len(str_unit) > 4:
unit = int(str_unit[-4:])
if unit in split_unit_counter:
split_unit_counter[unit] -= 1
else:
logger.warning(f"Unknown strat-value: {str_unit}")
subset = split_unit_counter[unit]
if unit in strat_props.index:
layers.append(f"{strat_props.at[unit, 'code']}_{subset}")
else:
logger.warning(f"Unknown strat-value: {unit}")
layers.append(unit)
else:
logger.warning(f"Unknown strat-value: {unit}")
layers.append(unit)
if unit >= 6000 and method_geulen != "split_layers":
geulen.append(layers[-1])
dims = ("layer", "y", "x")
coords = {"layer": layers, "y": geotop_ds.y, "x": geotop_ds.x}
ds = xr.Dataset({"top": (dims, top), "botm": (dims, bot)}, coords=coords)
if method_geulen is None or method_geulen == "split_layers":
pass
elif method_geulen == "add_as_layer":
top = ds["top"].copy(deep=True)
bot = ds["botm"].copy(deep=True)
for geul in geulen:
ds = remove_layer(ds, geul)
for geul in geulen:
ds = insert_layer(ds, geul, top.loc[geul], bot.loc[geul])
elif method_geulen == "add_to_layer_below":
top = ds["top"].copy(deep=True)
bot = ds["botm"].copy(deep=True)
for geul in geulen:
ds = remove_layer(ds, geul)
for geul in geulen:
todo = (top.loc[geul] - bot.loc[geul]) > 0.0
for layer in ds.layer:
if not todo.any():
continue
# adds the thickness of the geul to the layer below the geul
mask = (top.loc[geul] > bot.loc[layer]) & todo
if mask.any():
ds["top"].loc[layer].data[mask] = np.maximum(
top.loc[geul].data[mask], top.loc[layer].data[mask]
)
todo.data[mask] = False
if todo.any():
# unless the geul is the bottom layer
# then its thickness is added to the last active layer
# idomain = get_idomain(ds)
# fal = get_last_active_layer_from_idomain(idomain)
logger.warning(
f"Geul {geul} is at the bottom of the GeoTOP-dataset in {int(todo.sum())} cells, where it is ignored"
)
elif method_geulen == "add_to_layer_above":
top = ds["top"].copy(deep=True)
bot = ds["botm"].copy(deep=True)
for geul in geulen:
ds = remove_layer(ds, geul)
for geul in geulen:
todo = (top.loc[geul] - bot.loc[geul]) > 0.0
for layer in reversed(ds.layer):
if not todo.any():
continue
# adds the thickness of the geul to the layer above the geul
mask = (bot.loc[geul] < top.loc[layer]) & todo
if mask.any():
ds["botm"].loc[layer].data[mask] = np.minimum(
bot.loc[geul].data[mask], bot.loc[layer].data[mask]
)
todo.data[mask] = False
if todo.any():
# unless the geul is the top layer
# then its thickness is added to the last active layer
# idomain = get_idomain(ds)
# fal = get_first_active_layer_from_idomain(idomain)
logger.warning(
f"Geul {geul} is at the top of the GeoTOP-dataset in {int(todo.sum())} cells, where it is ignored"
)
else:
raise (ValueError(f"Unknown method to deal with geulen: '{method_geulen}'"))
ds.attrs["geulen"] = geulen
if drop_layer_dim_from_top:
ds = remove_layer_dim_from_top(ds)
if "kh" in geotop_ds and "kv" in geotop_ds:
aggregate_to_ds(geotop_ds, ds, **kwargs)
# add atributes
for datavar in ds:
ds[datavar].attrs["source"] = "Geotop"
ds[datavar].attrs["url"] = GEOTOP_URL
ds[datavar].attrs["date"] = dt.datetime.now().strftime("%Y%m%d")
if datavar in ["top", "bot"]:
ds[datavar].attrs["units"] = "mNAP"
elif datavar in ["kh", "kv"]:
ds[datavar].attrs["units"] = "m/day"
return ds
[docs]
def get_geotop(*args, **kwargs):
"""Get a slice of the geotop netcdf url within the extent, set the x and y
coordinates to match the cell centers and keep only the strat and lithok data
variables.
.. deprecated:: 0.10.0
`get_geotop` will be removed in nlmod 1.0.0, it is replaced by
`download_geotop` because of new naming convention
https://github.com/gwmod/nlmod/issues/47
Parameters
----------
extent : list, tuple or np.array
desired model extent (xmin, xmax, ymin, ymax)
url : str, optional
url of geotop netcdf file. The default is nlmod.read.geotop.GEOTOP_URL:
http://www.dinodata.nl/opendap/GeoTOP/geotop.nc
probabilities : bool, optional
if True, also download probability data. The default is False.
Returns
-------
gt : xarray Dataset
slices geotop netcdf.
"""
warnings.warn(
"this function is deprecated and will eventually be removed, "
"please use nlmod.read.geotop.download_geotop() in the future.",
DeprecationWarning,
)
return download_geotop(*args, **kwargs)
[docs]
@cache.cache_netcdf()
def download_geotop(extent, url=None, probabilities=False, chunks="auto"):
"""Get a slice of the geotop netcdf url within the extent, set the x and y
coordinates to match the cell centers and keep only the strat and lithok data
variables.
Parameters
----------
extent : list, tuple or np.array
desired model extent (xmin, xmax, ymin, ymax)
url : str, optional
url of geotop netcdf file. The default is nlmod.read.geotop.GEOTOP_URL:
http://www.dinodata.nl/opendap/GeoTOP/geotop.nc
probabilities : bool, optional
if True, also download probability data. The default is False.
chunks : int, dict, 'auto' or None. The default is 'auto'.
If provided, used to load the data into dask arrays.
- ``chunks="auto"`` will use dask ``auto`` chunking.
- ``chunks=None`` skips using dask. This uses xarray's internally private lazy
indexing classes, but data is eagerly loaded into memory as numpy arrays when
accessed. This can be more efficient for smaller arrays or when large arrays
are sliced before computation.
See dask chunking for more details.
Returns
-------
gt : xarray Dataset
slices geotop netcdf.
"""
if url is None:
url = GEOTOP_URL
gt = xr.open_dataset(url, chunks=chunks)
# only download requisite data
data_vars = ["strat", "lithok"]
if probabilities:
data_vars += [
"kans_1",
"kans_2",
"kans_3",
"kans_4",
"kans_5",
"kans_6",
"kans_7",
"kans_8",
"kans_9",
"onz_lk",
"onz_ls",
]
# set x and y dimensions to cell center
for dim in ["x", "y"]:
old_dim = gt[dim].values
gt[dim] = old_dim + (old_dim[1] - old_dim[0]) / 2
# get data vars and slice extent
gt = gt[data_vars].sel(x=slice(extent[0], extent[1]), y=slice(extent[2], extent[3]))
# change order of dimensions from x, y, z to z, y, x
gt = gt.transpose("z", "y", "x")
# flip z, and y coordinates
gt = gt.isel(z=slice(None, None, -1), y=slice(None, None, -1))
# add missing value
# gt.strat.attrs["missing_value"] = -127
return gt
[docs]
def add_top_and_botm(ds):
"""Add the top and bottom of the voxels to the GeoTOP Dataset.
This makes sure the structure of the geotop dataset is more like regis, and we can
use the cross-section class (DatasetCrossSection from nlmod.
Parameters
----------
ds : xr.Dataset
The geotop-dataset.
Returns
-------
ds : xr.Dataset
The geotop-dataset, with added variables "top" and "botm".
"""
bottom = np.expand_dims(ds.z.values - 0.25, axis=(1, 2))
bottom = np.repeat(np.repeat(bottom, len(ds.y), 1), len(ds.x), 2)
bottom[np.isnan(ds.strat.values)] = np.nan
ds["botm"] = ("z", "y", "x"), bottom
top = np.expand_dims(ds.z.values + 0.25, axis=(1, 2))
top = np.repeat(np.repeat(top, len(ds.y), 1), len(ds.x), 2)
top[np.isnan(ds.strat.values)] = np.nan
ds["top"] = ("z", "y", "x"), top
return ds
[docs]
def add_kh_and_kv(
gt,
df,
stochastic=None,
kh_method="arithmetic_mean",
kv_method="harmonic_mean",
anisotropy=1.0,
kh="kh",
kv="kv",
kh_df="kh",
kv_df="kv",
):
"""Add kh and kv variables to the voxels of the GeoTOP Dataset.
Parameters
----------
gt : xr.Dataset
The geotop dataset, at least with variable lithok.
df : pd.DataFrame
A DataFrame with information about the kh and optionally kv, for different
lithoclasses or stratigraphic units. The DataFrame must contain the columns
'lithok' and 'kh', and optionally 'strat' and 'kv'. As an example see
nlmod.read.geotop.get_kh_kv_table().
stochastic : bool, str or None, optional
When stochastic is True or a string, use the stochastic data of GeoTOP. The only
supported method right now is "linear", which means kh and kv are determined
from a linear weighted mean of the voxels. For kh the method from kh_method is
used to calculate the mean. For kv the method from kv_method is used. When
stochastic is False or None, the stochastic data of GeoTOP is not used. The
default is None.
kh_method : str, optional
The method to calculate the weighted mean of kh values when stochastic is True
or "linear". Allowed values are "arithmetic_mean" and "harmonic_mean". The
default is "arithmetic_mean".
kv_method : str, optional
The method to calculate the weighted mean of kv values when stochastic is True
or "linear". Allowed values are "arithmetic_mean" and "harmonic_mean". The
default is "arithmetic_mean".
anisotropy : float, optional
The anisotropy value used when there are no kv values in df. The default is 1.0.
kh : str, optional
THe name of the new variable with kh values in gt. The default is "kh".
kv : str, optional
The name of the new variable with kv values in gt. The default is "kv".
kh_df : str, optional
The name of the column with kh values in df. The default is "kh".
kv_df : str, optional
THe name of the column with kv values in df. The default is "kv".
Raises
------
DESCRIPTION.
Returns
-------
gt : xr.Dataset
Datset with voxel-data, with the added variables 'kh' and 'kv'.
"""
if isinstance(stochastic, bool):
if stochastic:
stochastic = "linear"
else:
stochastic = None
if kh_method not in ["arithmetic_mean", "harmonic_mean"]:
raise (ValueError("Unknown kh_method: {kh_method}"))
if kv_method not in ["arithmetic_mean", "harmonic_mean"]:
raise (ValueError("Unknown kv_method: {kv_method}"))
strat = gt["strat"].values
msg = "Determining kh and kv of geotop-data based on lithoclass"
if df.index.name in ["lithok", "strat"]:
df = df.reset_index()
if "strat" in df:
msg = f"{msg} and stratigraphy"
logger.info(msg)
if kh_df not in df:
raise (MissingValueError(f"No {kh_df} defined in df"))
if kv_df not in df:
logger.info(f"Setting kv equal to kh / {anisotropy}")
if stochastic is None:
# calculate kh and kv from most likely lithoclass
lithok = gt["lithok"].values
kh_ar = np.full(lithok.shape, np.nan)
kv_ar = np.full(lithok.shape, np.nan)
if "strat" in df:
combs = np.column_stack((strat.ravel(), lithok.ravel()))
# drop nans
combs = combs[~np.isnan(combs).any(1)].astype(int)
# get unique combinations of strat and lithok
combs_un = np.unique(combs, axis=0)
for istrat, ilithok in combs_un:
mask = (strat == istrat) & (lithok == ilithok)
kh_ar[mask], kv_ar[mask] = _get_kh_kv_from_df(
df, ilithok, istrat, anisotropy=anisotropy, mask=mask
)
else:
for ilithok in np.unique(lithok[~np.isnan(lithok)]):
mask = lithok == ilithok
kh_ar[mask], kv_ar[mask] = _get_kh_kv_from_df(
df,
ilithok,
anisotropy=anisotropy,
mask=mask,
)
elif stochastic == "linear":
strat_un = np.unique(strat[~np.isnan(strat)])
kh_ar = np.full(strat.shape, 0.0)
kv_ar = np.full(strat.shape, 0.0)
probability_total = np.full(strat.shape, 0.0)
for ilithok in df["lithok"].unique():
if ilithok == 0:
# there are no probabilities defined for lithoclass 'antropogeen'
continue
probability = gt[f"kans_{ilithok}"].values
if "strat" in df:
khi, kvi = _handle_nans_in_stochastic_approach(
np.nan, np.nan, kh_method, kv_method
)
khi = np.full(strat.shape, khi)
kvi = np.full(strat.shape, kvi)
for istrat in strat_un:
mask = (strat == istrat) & (probability > 0)
if not mask.any():
continue
kh_sel, kv_sel = _get_kh_kv_from_df(
df, ilithok, istrat, anisotropy=anisotropy, mask=mask
)
if np.isnan(kh_sel):
probability[mask] = 0.0
kh_sel, kv_sel = _handle_nans_in_stochastic_approach(
kh_sel, kv_sel, kh_method, kv_method
)
khi[mask], kvi[mask] = kh_sel, kv_sel
else:
khi, kvi = _get_kh_kv_from_df(df, ilithok, anisotropy=anisotropy)
if np.isnan(khi):
probability[:] = 0.0
khi, kvi = _handle_nans_in_stochastic_approach(
khi, kvi, kh_method, kv_method
)
if kh_method == "arithmetic_mean":
kh_ar = kh_ar + probability * khi
else:
kh_ar = kh_ar + (probability / khi)
if kv_method == "arithmetic_mean":
kv_ar = kv_ar + probability * kvi
else:
kv_ar = kv_ar + (probability / kvi)
probability_total += probability
if kh_method == "arithmetic_mean":
kh_ar = np.divide(
kh_ar,
probability_total,
out=np.full_like(kh_ar, np.nan),
where=probability_total > 0,
)
else:
kh_ar = np.divide(
probability_total,
kh_ar,
out=np.full_like(kh_ar, np.nan),
where=kh_ar != 0,
)
if kv_method == "arithmetic_mean":
kv_ar = np.divide(
kv_ar,
probability_total,
out=np.full_like(kv_ar, np.nan),
where=probability_total > 0,
)
else:
kv_ar = np.divide(
probability_total,
kv_ar,
out=np.full_like(kv_ar, np.nan),
where=kv_ar != 0,
)
else:
raise (ValueError(f"Unsupported value for stochastic: '{stochastic}'"))
dims = gt["strat"].dims
gt[kh] = dims, kh_ar
gt[kv] = dims, kv_ar
return gt
[docs]
def _get_kh_kv_from_df(df, ilithok, istrat=None, anisotropy=1.0, mask=None):
mask_df = df["lithok"] == ilithok
if istrat is not None:
mask_df = mask_df & (df["strat"] == istrat)
if not np.any(mask_df):
msg = f"No conductivities found for stratigraphic unit {istrat}"
if istrat is not None:
msg = f"{msg} and lithoclass {ilithok}"
if mask is None:
msg = f"{msg}. Setting values of voxels to NaN."
else:
msg = f"{msg}. Setting values of {mask.sum()} voxels to NaN."
logger.warning(msg)
return np.nan, np.nan
kh = df.loc[mask_df, "kh"].mean()
if "kv" in df:
kv = df.loc[mask_df, "kv"].mean()
if np.isnan(kv):
kv = kh / anisotropy
if np.isnan(kh):
kh = kv * anisotropy
else:
kv = kh / anisotropy
return kh, kv
[docs]
def _handle_nans_in_stochastic_approach(kh, kv, kh_method, kv_method):
if np.isnan(kh):
if kh_method == "arithmetic_mean":
kh = 0.0
else:
kh = np.inf
if np.isnan(kv):
if kv_method == "arithmetic_mean":
kv = 0.0
else:
kv = np.inf
return kh, kv
[docs]
def aggregate_to_ds(
gt, ds, kh="kh", kv="kv", kd="kD", c="c", kh_gt="kh", kv_gt="kv", add_kd_and_c=False
):
"""Aggregate voxels from GeoTOP to layers in a model DataSet with top and botm, to
calculate kh and kv.
Parameters
----------
gt : xr.Dataset
A Dataset containing the Geotop voxel data.
ds : xr.Dataset
A Dataset containing the top and botm of the layers.
kh : str, optional
The name of the new variable for the horizontal conductivity in ds. The default
is "kh".
kv : str, optional
The name of the new variable for the vertical conductivity in ds. The default is
"kv".
kd : str, optional
The name of the variable for the horizontal transmissivity. Only used when
add_kd_and_c is True. The default is "kD".
c : str, optional
The name of the variable for the vertical reistance. Only used when add_kd_and_c
is True The default is "c".
kh_gt : str, optional
The name of the variable for the horizontal conductivity in gt. The default is
"kh".
kv_gt : str, optional
The name of the variable for the vertical conductivity in gt. The default is
"kv".
add_kd_and_c : bool, optional
Add the variables kd and c to ds. The default is False.
Returns
-------
ds : xr.Dataset
The Dataset ds, with added variables kh and kv (and optionally kd and c).
"""
assert (ds.x == gt.x).all() and (ds.y == gt.y).all()
msg = "Please add '{}' to geotop-Dataset first, using add_kh_and_kv()"
if kh_gt not in gt:
raise (MissingValueError(msg.format(kh_gt)))
if kv_gt not in gt:
raise (MissingValueError(msg.format(kv_gt)))
kD_ar = []
c_ar = []
kh_ar = []
kv_ar = []
for ilay in range(len(ds.layer)):
if ilay == 0:
top = ds["top"]
if "layer" in top.dims:
top = top[0].drop_vars("layer")
else:
if "layer" in ds["top"].dims:
top = ds["top"][ilay].drop_vars("layer")
else:
top = ds["botm"][ilay - 1].drop_vars("layer")
bot = ds["botm"][ilay].drop_vars("layer")
gt_top = (gt["z"] + 0.25).broadcast_like(gt[kh_gt])
gt_bot = (gt["z"] - 0.25).broadcast_like(gt[kh_gt])
gt_top = gt_top.where(gt_top < top, top)
gt_top = gt_top.where(gt_top > bot, bot)
gt_bot = gt_bot.where(gt_bot < top, top)
gt_bot = gt_bot.where(gt_bot > bot, bot)
gt_thk = gt_top - gt_bot
# kD is the sum of thickness multiplied by conductivity
kD_ar.append((gt_thk * gt[kh_gt]).sum("z"))
# c is the sum of thickness devided by conductivity
c_ar.append((gt_thk / gt[kv_gt]).sum("z"))
# caluclate kh and hv
d_gt = gt_top - gt_bot
# use only the thickness with valid kh-values
D = d_gt.where(~np.isnan(gt[kh_gt])).sum("z")
kh_ar.append(kD_ar[-1] / D)
# use only the thickness with valid kv-values
D = d_gt.where(~np.isnan(gt[kv_gt])).sum("z")
kv_ar.append(D / c_ar[-1])
if add_kd_and_c:
ds[kd] = xr.concat(kD_ar, ds.layer)
ds[c] = xr.concat(c_ar, ds.layer)
ds[kh] = xr.concat(kh_ar, ds.layer)
ds[kv] = xr.concat(kv_ar, ds.layer)
return ds
[docs]
def _save_excel_files_as_csv():
"""
This method takes the files REF_GTP_STR_UNIT.xlsx and REF_GTP_LITHO_CLASS.xlsx that
are taken from the GeoTOP 1.6 zipfile downloaded from DINOloket, and saves them as
csv-files. In this way version-control can better process the changes in future
versions of GeoTOP.
Returns
-------
None.
"""
for name in ["REF_GTP_STR_UNIT.xlsx", "REF_GTP_LITHO_CLASS.xlsx"]:
fname = os.path.join(NLMOD_DATADIR, "geotop", name)
df = pd.read_excel(fname, keep_default_na=False)
df.to_csv(fname.replace(".xlsx", ".csv"), index=False)