import datetime as dt
import logging
import warnings
import numpy as np
import pandas as pd
from .. import cache, util
from ..dims.grid import get_affine_mod_to_world, is_structured, is_vertex
from ..dims.layers import get_first_active_layer
from ..dims.base import get_ds
from ..dims.shared import get_area
from ..dims.time import ds_time_to_pandas_index
logger = logging.getLogger(__name__)
[docs]
@cache.cache_netcdf(coords_3d=True, coords_time=True)
def get_recharge(
ds,
oc_knmi=None,
method="linear",
most_common_station=False,
add_stn_dimensions=False,
to_model_time=True,
hourly_precision=None,
):
"""Add recharge to model dataset from KNMI data.
Add recharge to the model dataset with knmi data by following these steps:
1. check for each cell (structured or vertex) which knmi measurement
stations (prec and evap) are the closest.
2. download precipitation and evaporation data for all knmi stations that
were found at 1
3. create a recharge array in which each cell has a reference to a
timeseries. Timeseries are created for each unique combination of
precipitation and evaporation. The following packages are created:
a. the rch package itself in which cells with the same
precipitation and evaporation stations are defined. This
package also refers to all the time series package (see b).
b. the time series packages in which the recharge flux is defined
for the time steps of the model. Each package contains the
time series for one or more cels (defined in a).
Supports structured and unstructred datasets.
Parameters
----------
ds : xr.DataSet
dataset containing relevant model grid information
oc_knmi : hpd.ObsCollection
ObsCollection with precipitation (RD) and evaporation (EV24) data from the knmi.
method : str, optional
If 'linear', calculate recharge by subtracting evaporation from precipitation.
If 'separate', add precipitation as 'recharge' and evaporation as 'evaporation'.
Method is only used when `add_stn_dimensions=False`. The default is 'linear'.
most_common_station : bool, optional
When True, only download data from the station that is most common in the model
area. The default is False
Returns
-------
ds : xr.DataSet
dataset with spatial model data including the rch raster
"""
if oc_knmi is None:
start = pd.Timestamp(ds.time.attrs["start"])
end = pd.Timestamp(ds.time.data[-1])
oc_knmi = download_knmi(
ds, start=start, end=end, most_common_station=most_common_station
)
return discretize_knmi(
ds,
oc_knmi,
method=method,
most_common_station=most_common_station,
add_stn_dimensions=add_stn_dimensions,
to_model_time=to_model_time,
hourly_precision=hourly_precision,
)
[docs]
@cache.cache_netcdf(coords_3d=True, coords_time=True)
def discretize_knmi(
ds,
oc_knmi,
method="linear",
most_common_station=True,
add_stn_dimensions=False,
to_model_time=True,
hourly_precision=None,
):
"""discretize knmi data to model grid
Create a dataset with recharge (and evaporation) data by following these steps:
1. Check for each cell (structured or vertex) which knmi measurement
stations (prec and evap) are the closest.
2. Download precipitation and evaporation data for all knmi stations that
were found at 1
3. Create a recharge array in which each cell has a reference to a
timeseries. Timeseries are created for each unique combination of
precipitation and evaporation. The following packages are created:
a. the rch package itself in which cells with the same
precipitation and evaporation stations are defined. This
package also refers to all the time series package (see b).
b. the time series packages in which the recharge flux is defined
for the time steps of the model. Each package contains the
time series for one or more cels (defined in a).
Parameters
----------
ds : xr.DataSet
dataset containing relevant model grid information
oc_knmi : hpd.ObsCollection
ObsCollection with precipitation (RD) and evaporation (EV24) data from the knmi.
method : str, optional
If 'linear', calculate recharge by subtracting evaporation from precipitation.
If 'separate', add precipitation as 'recharge' and evaporation as 'evaporation'.
Method is only used when `add_stn_dimensions=False`. The default is 'linear'.
most_common_station : bool, optional
When True, only use data from the station that is most common in the model
area. The default is True
add_stn_dimensions : bool, optional
When True, add the dimension `time` to the variable `recharge` (and `evaporation`
when `method='seperate'`). When True, add dimension `stn_RD` and `stn_EV24` to
the variable `recharge` and `evaporation`, and add variables "recharge_stn" and
"evaporation_stn" that specify for every grid cell which KNMI-stations are used.
When `add_stn_dimensions` is False, specify recharge (and evaporation when
`method='seperate'`) for every gridcell. The default is False.
to_model_time : bool, optional
When True, resample the recharge and evaporation to the dimension `time` in ds.
When False, save the original times of the KNMI-data in variables `time_RD` and
`time_EV24`. `to_model_time=False` is only supported for
`add_stn_dimensions=True`. The default is True.
hourly_precision : bool, optional
When True, take into account the hour of the daily knmi-measurements (1:00 for
evaporation and 9:00 for precipitation). The default is None, which will raise a
warning, mentioning that hourly_precision is set to False, but the default will
change to True in the future.
Returns
-------
ds : xr.DataSet
dataset with 'recharge' (and 'evaporation') variables.
Raises
------
AttributeError
if the dataset has no time dimension
TypeError
if time dimension does not have a datetime64[ns] type
ValueError
if grid is not vertex
"""
# check time settings
if "time" not in ds:
raise (
AttributeError(
"'Dataset' object has no 'time' dimension. "
"Please run nlmod.time.set_ds_time()"
)
)
if ds.time.dtype.kind != "M":
raise TypeError("get recharge requires a datetime64[ns] time index")
# get locations
locations = get_locations(
ds, oc_knmi=oc_knmi, most_common_station=most_common_station
)
# get recharge data array
keep_coords = ("y", "x")
if to_model_time:
keep_coords = ("time",) + keep_coords
ds_out = util.get_ds_empty(ds, keep_coords=keep_coords)
ds_out.attrs["gridtype"] = ds.gridtype
if is_structured(ds):
dims = ("y", "x")
elif is_vertex(ds):
dims = ("icell2d",)
else:
raise ValueError("gridtype should be structured or vertex")
if hourly_precision is None:
msg = (
"The default of hourly_precision=False will be changed to True in a future "
"version of nlmod. Pass hourly_precision=False to retain current behavior "
"or hourly_precision=True to adopt the future default and silence this "
"warning."
)
logger.warning(msg)
hourly_precision = False
if add_stn_dimensions:
nodata = -999
shape = [len(ds_out[dim]) for dim in dims]
variables = {"recharge": "stn_RD", "evaporation": "stn_EV24"}
for var in variables:
stn_var = variables[var]
ds_out[f"{var}_stn"] = dims, np.full(shape, nodata)
values = [int(x.split("_")[-1]) for x in locations[stn_var]]
if is_structured(ds):
ds_out[f"{var}_stn"].data[locations.row, locations.col] = values
elif is_vertex(ds):
ds_out[f"{var}_stn"].data[locations.index] = values
ds_out[f"{var}_stn"].attrs["nodata"] = nodata
stn_un = locations[stn_var].unique()
column = stn_var.split("_")[-1]
df = pd.DataFrame([x[column] for x in oc_knmi.loc[stn_un, "obs"]]).T
df.columns = [int(x.split("_")[-1]) for x in stn_un]
df.columns.name = stn_var
if to_model_time:
if hourly_precision:
df = df.resample("h").bfill()
else:
df = df.resample("D").nearest()
df = _resample_df_to_model_time(df, ds)
df.index.name = "time"
else:
df.index.name = f"time_{column}"
ds_out[var] = df
if to_model_time:
# make sure all attributes of ds.time are in ds_out.time
ds_out["time"] = ds.time
return ds_out
if not to_model_time:
raise (
NotImplementedError(
"`to_model_time=False` not implemented for `add_stn_dimensions=False`"
)
)
dims = ("time",) + dims
shape = [len(ds_out[dim]) for dim in dims]
if method in ["linear"]:
ds_out["recharge"] = dims, np.zeros(shape)
# find unique combination of precipitation and evaporation station
unique_combinations = locations.drop_duplicates(["stn_RD", "stn_EV24"])[
["stn_RD", "stn_EV24"]
].values
if unique_combinations.shape[1] > 2:
# bug fix for pandas 2.1 where three columns are returned
unique_combinations = unique_combinations[:, :2]
for stn_rd, stn_ev24 in unique_combinations:
# get locations with the same prec and evap station
mask = (locations["stn_RD"] == stn_rd) & (locations["stn_EV24"] == stn_ev24)
loc_sel = locations.loc[mask]
# calculate recharge time series
prec = oc_knmi.loc[stn_rd, "obs"]["RD"]
evap = oc_knmi.loc[stn_ev24, "obs"]["EV24"]
if hourly_precision:
prec = prec.resample("h").bfill()
evap = evap.resample("h").bfill()
else:
prec = prec.resample("D").nearest()
evap = evap.resample("D").nearest()
ts = (prec - evap).dropna()
ts.name = f"{prec.name}-{evap.name}"
_add_ts_to_ds(ts, loc_sel, "recharge", ds_out)
elif method == "separate":
ds_out["recharge"] = dims, np.zeros(shape)
for stn in locations["stn_RD"].unique():
ts = oc_knmi.loc[stn, "obs"]["RD"]
if hourly_precision:
ts = ts.resample("h").bfill()
else:
ts = ts.resample("D").nearest()
loc_sel = locations.loc[(locations["stn_RD"] == stn)]
_add_ts_to_ds(ts, loc_sel, "recharge", ds_out)
ds_out["evaporation"] = dims, np.zeros(shape)
for stn in locations["stn_EV24"].unique():
ts = oc_knmi.loc[stn, "obs"]["EV24"]
if hourly_precision:
ts = ts.resample("h").bfill()
else:
ts = ts.resample("D").nearest()
loc_sel = locations.loc[(locations["stn_EV24"] == stn)]
_add_ts_to_ds(ts, loc_sel, "evaporation", ds_out)
else:
raise (ValueError(f"Unknown method: {method}"))
for datavar in ds_out:
ds_out[datavar].attrs["source"] = "KNMI"
ds_out[datavar].attrs["date"] = dt.datetime.now().strftime("%Y%m%d")
ds_out[datavar].attrs["units"] = "m/day"
return ds_out
[docs]
def _resample_df_to_model_time(df, ds):
tmod = ds_time_to_pandas_index(ds)
by = pd.cut(df.index, tmod, right=True)
df_res = df.groupby(by, observed=False).mean()
df_res.index = tmod[1:]
# when the model frequency is higher than df.index,
# there will be NaN's, which we fill by backfill
if df_res.isna().any(axis=None):
df_res = df_res.bfill()
return df_res
[docs]
def _add_ts_to_ds(timeseries, loc_sel, variable, ds):
"""Add a timeseries to a variable at location loc_sel in model DataSet."""
model_recharge = _resample_df_to_model_time(timeseries, ds)
if model_recharge.isna().any():
raise (ValueError(f"There are NaN-values in {variable}."))
# add data to ds
values = np.repeat(model_recharge.values[:, np.newaxis], loc_sel.shape[0], 1)
if is_structured(ds):
ds[variable].data[:, loc_sel.row, loc_sel.col] = values
elif is_vertex(ds):
ds[variable].data[:, loc_sel.index] = values
[docs]
def _get_locations_vertex(ds):
"""Get dataframe with the locations of the grid cells of a vertex grid.
Parameters
----------
ds : xr.DataSet
dataset containing relevant model grid information
Returns
-------
locations : pandas DataFrame
DataFrame with the locations of all active grid cells.
includes the columns: x, y and layer
"""
# get active locations
fal = get_first_active_layer(ds)
icell2d_active = np.where(fal != fal.attrs["nodata"])[0]
# create dataframe from active locations
x = ds["x"].sel(icell2d=icell2d_active)
y = ds["y"].sel(icell2d=icell2d_active)
if "angrot" in ds.attrs and ds.attrs["angrot"] != 0.0:
# transform coordinates into real-world coordinates
affine = get_affine_mod_to_world(ds)
x, y = affine * (x, y)
layer = fal.sel(icell2d=icell2d_active)
locations = pd.DataFrame(
index=icell2d_active, data={"x": x, "y": y, "layer": layer}
)
hpd = util.import_hydropandas()
locations = hpd.ObsCollection(locations)
return locations
[docs]
def _get_locations_structured(ds):
"""Get dataframe with the locations of the grid cells of a structured grid.
Parameters
----------
ds : xr.DataSet
dataset containing relevant model grid information
Returns
-------
locations : pandas DataFrame
DataFrame with the locations of all active grid cells.
includes the columns: x, y, row, col and layer
"""
# store x and y mids in locations of active cells
fal = get_first_active_layer(ds)
rows, columns = np.where(fal != fal.attrs["nodata"])
x = np.array([ds["x"].data[col] for col in columns])
y = np.array([ds["y"].data[row] for row in rows])
if "angrot" in ds.attrs and ds.attrs["angrot"] != 0.0:
# transform coordinates into real-world coordinates
affine = get_affine_mod_to_world(ds)
x, y = affine * (x, y)
layers = [fal.data[row, col] for row, col in zip(rows, columns)]
hpd = util.import_hydropandas()
locations = hpd.ObsCollection(
pd.DataFrame(
data={"x": x, "y": y, "row": rows, "col": columns, "layer": layers}
)
)
return locations
[docs]
@cache.cache_pickle
def download_knmi(
ds=None,
extent=None,
delr=None,
delc=None,
start=None,
end=None,
most_common_station=False,
):
"""Get precipitation (RD) and evaporation (EV24) data from the knmi at the grid
cells.
Parameters
----------
ds : xr.DataSet
dataset containing relevant model grid information
most_common_station : bool, optional
When True, only download data from the station that is most common in the model
area. The default is False
start : str or datetime, optional
start date of measurements that you want, The default is '2010'.
end : str or datetime, optional
end date of measurements that you want, The default is None.
Raises
------
ValueError
If data is not available for the entire model period
Returns
-------
oc_knmi
hpd.ObsCollection
"""
if ds is None:
ds = get_ds(extent, delr=delr, delc=delc)
locations = get_locations(ds, most_common_station=most_common_station)
oc_knmi = _download_knmi_at_locations(locations, start=start, end=end)
# check if downloaded data is correct
if end is None:
end = ds.time.data[-1]
end = pd.Timestamp(end)
for obs in oc_knmi["obs"]:
msg = f"No data available for time series'{obs.name}'"
if obs.empty:
raise (ValueError(msg))
if obs.index[-1] < end:
raise ValueError(f"{msg} untill date {end}")
return oc_knmi
[docs]
def get_knmi(ds, most_common_station=False, start=None, end=None):
"""Get precipitation (RD) and evaporation (EV24) data from the knmi at the grid
cells.
.. deprecated:: 0.10.0
`get_knmi` will be removed in nlmod 1.0.0, it is replaced by
`download_knmi` because of new naming convention
https://github.com/gwmod/nlmod/issues/47
Parameters
----------
ds : xr.DataSet
dataset containing relevant model grid information
most_common_station : bool, optional
When True, only download data from the station that is most common in the model
area. The default is False
start : str or datetime, optional
start date of measurements that you want, The default is '2010'.
end : str or datetime, optional
end date of measurements that you want, The default is None.
Returns
-------
oc_knmi
hpd.ObsCollection
"""
warnings.warn(
"'get_knmi' is deprecated and will raise an error in the "
"future. Use 'nlmod.read.knmi.download_knmi' to get knmi data for your model",
DeprecationWarning,
)
if start is None:
start = pd.Timestamp(ds.time.attrs["start"])
if end is None:
end = pd.Timestamp(ds.time.data[-1])
return download_knmi(
ds=ds, start=start, end=end, most_common_station=most_common_station
)
[docs]
def get_locations(ds, oc_knmi=None, most_common_station=False):
"""Get the locations of the active grid cells in ds and the nearest (or most common)
precipitation and evaporation station.
Parameters
----------
ds : xr.DataSet
dataset containing relevant model grid information
oc_knmi : hpd.ObsCollection or None, optional
ObsCollection with knmi station data. If None the nearest of all knmi stations
is used.
most_common_station : bool, optional
When True, only download data from the station that is most common in the model
area. The default is False
Raises
------
ValueError
wrong grid type specified.
Returns
-------
locations : pd.DataFrame
each row contains a location (x and y) and the relevant precipitation (stn_RD)
and evaporation (stn_EV24) stations.
"""
# get locations
if is_structured(ds):
locations = _get_locations_structured(ds)
elif is_vertex(ds):
locations = _get_locations_vertex(ds)
else:
raise ValueError("gridtype should be structured or vertex")
util.import_hydropandas(method="nlmod.read.knmi.get_locations()")
from hydropandas.io import knmi
if oc_knmi is not None:
locations["stn_RD"] = knmi.get_nearest_station_df(
locations, stations=oc_knmi.loc[oc_knmi["meteo_var"] == "RD"]
)
locations["stn_EV24"] = knmi.get_nearest_station_df(
locations, stations=oc_knmi.loc[oc_knmi["meteo_var"] == "EV24"]
)
else:
locations["stn_RD"] = knmi.get_nearest_station_df(locations, meteo_var="RD")
locations["stn_EV24"] = knmi.get_nearest_station_df(locations, meteo_var="EV24")
if most_common_station:
if is_structured(ds):
# set the most common station to all locations
locations["stn_RD"] = locations["stn_RD"].value_counts().idxmax()
locations["stn_EV24"] = locations["stn_EV24"].value_counts().idxmax()
else:
# set the station with the largest area to all locations
if "area" not in ds:
ds["area"] = get_area(ds)
locations["area"] = ds["area"].loc[locations.index]
locations["stn_RD"] = locations.groupby("stn_RD").sum()["area"].idxmax()
locations["stn_EV24"] = locations.groupby("stn_EV24").sum()["area"].idxmax()
return locations
[docs]
def _download_knmi_at_locations(locations, start=None, end=None):
"""Get precipitation (RD) and evaporation (EV24) data from the knmi at the locations
Parameters
----------
locations : pd.DataFrame
each row contains a location (x and y) and the relevant precipitation (stn_RD)
and evaporation (stn_EV24) stations.
start : str or datetime, optional
start date of measurements that you want, The default is '2010'.
end : str or datetime, optional
end date of measurements that you want, The default is None.
Returns
-------
oc_knmi
hpd.ObsCollection
"""
stns_rd = locations["stn_RD"].unique()
stns_ev24 = locations["stn_EV24"].unique()
# get knmi data stations closest to any grid cell
hpd = util.import_hydropandas()
olist = []
for stnrd in stns_rd:
o = hpd.PrecipitationObs.from_knmi(
meteo_var="RD", stn=stnrd, start=start, end=end, fill_missing_obs=True
)
olist.append(o)
for stnev24 in stns_ev24:
o = hpd.EvaporationObs.from_knmi(
meteo_var="EV24", stn=stnev24, start=start, end=end, fill_missing_obs=True
)
olist.append(o)
oc_knmi = hpd.ObsCollection(olist)
return oc_knmi
[docs]
def get_knmi_at_locations(locations, ds=None, start=None, end=None):
"""Get precipitation (RD) and evaporation (EV24) data from the knmi at the locations
.. deprecated:: 0.10.0
`get_knmi_at_locations` will be removed in nlmod 1.0.0, it is replaced by
`_download_knmi_at_locations` because of new naming convention
https://github.com/gwmod/nlmod/issues/47
Parameters
----------
locations : pd.DataFrame
each row contains a location (x and y) and the relevant precipitation (stn_RD)
and evaporation (stn_EV24) stations.
ds : xr.DataSet or None, optional
dataset containing relevant time information. If None provide start and end.
start : str or datetime, optional
start date of measurements that you want, The default is '2010'.
end : str or datetime, optional
end date of measurements that you want, The default is None.
Returns
-------
oc_knmi
hpd.ObsCollection
"""
warnings.warn(
"'get_knmi_at_locations' is deprecated and will raise an error in the "
"future. Use 'nlmod.read.knmi._download_knmi_at_locations' to get knmi data",
DeprecationWarning,
)
if start is None:
start = pd.Timestamp(ds.time.attrs["start"])
if end is None:
end = pd.Timestamp(ds.time.data[-1])
return _download_knmi_at_locations(locations, start, end)