Source code for nlmod.dims.time

import datetime as dt
import logging
import warnings

import cftime
import numpy as np
import pandas as pd
import xarray as xr
from pandas._libs.tslibs.np_datetime import OutOfBoundsDatetime, OutOfBoundsTimedelta
from xarray import IndexVariable

from .attributes_encodings import dim_attrs

logger = logging.getLogger(__name__)


[docs] def set_ds_time_deprecated( ds, time=None, steady_state=False, steady_start=True, steady_start_perlen=3652.0, time_units="DAYS", start_time=None, transient_timesteps=0, perlen=1.0, nstp=1, tsmult=1.0, ): """Set timing for a model dataset. Parameters ---------- ds : xarray.Dataset Dataset to add time information to time : list, array or DatetimeIndex of pandas.Timestamps, optional an array with the start-time of the model and the ending times of the stress-periods. When steady_start is True, the first value of time is the start of the transient model period. The default is None. steady_state : bool, optional if True the model is steady state. The default is False. steady_start : bool, optional if True the model is transient with a steady state start time step. The default is True. steady_start_perlen : float, optional stress-period length of the first steady state stress period. Only used if steady_start is True. The period is used to determine the recharge in this steady state stress period. Default is 3652 days (approximately 10 years). time_units : str, optional time unit of the model. The default is 'DAYS', which is the only allowed value for now. start_time : str or datetime, optional start time of the model. When steady_start is True, this is the start_time of the transient model period. Input is ignored when time is assigned. The default is January 1, 2000. transient_timesteps : int, optional number of transient time steps. Only used when steady_state is False. Input is ignored when time is assigned or perlen is an iterable. The default is 0. perlen : float, int, list or np.array, optional length of each stress-period depending on the type: - float or int: this is the length of all the stress periods. - list or array: the items are the length of the stress-periods in days. The length of perlen should match the number of stress-periods (including steady-state stress-periods). When steady_start is True, the length of the first steady state stress period is defined by steady_start_perlen. Input for perlen is ignored when time is assigned. The default is 1.0. nstp : int, optional number of steps. The default is 1. tsmult : float, optional timestep multiplier. The default is 1.0. Returns ------- ds : xarray.Dataset dataset with time variant model data """ warnings.warn( "this function is deprecated and will eventually be removed, " "please use nlmod.time.set_ds_time() in the future.", DeprecationWarning, ) # checks if time_units.lower() != "days": raise NotImplementedError() if time is not None: if isinstance(time, str): time = pd.to_datetime(time) if not hasattr(time, "__iter__"): time = [time] start_time = time[0] if len(time) > 1: perlen = np.diff(time) / pd.to_timedelta(1, unit=time_units) else: perlen = [] if steady_start: perlen = np.insert(perlen, 0, steady_start_perlen) if start_time is None: start_time = "2000" start_time = pd.to_datetime(start_time) if steady_state: if isinstance(perlen, (float, int)): nper = 1 perlen = [perlen] * nper else: nper = len(perlen) else: if isinstance(perlen, (float, int)): perlen = [perlen] * (transient_timesteps + steady_start) if steady_start: perlen[0] = steady_start_perlen nper = len(perlen) if hasattr(perlen, "__iter__"): transient_timesteps = len(perlen) if steady_start: transient_timesteps = transient_timesteps - 1 nper = transient_timesteps if steady_start: start_time = start_time - dt.timedelta(days=perlen[0]) time_dt = start_time + np.cumsum(pd.to_timedelta(perlen, unit=time_units)) ds = ds.assign_coords(coords={"time": time_dt}) ds.time.attrs["time_units"] = time_units ds.time.attrs["start"] = str(start_time) ds.time.attrs["nstp"] = nstp ds.time.attrs["tsmult"] = tsmult # netcdf files cannot handle booleans ds.time.attrs["steady_start"] = int(steady_start) ds.time.attrs["steady_state"] = int(steady_state) # add to ds (for new version nlmod) # add steady, nstp and tsmult to dataset steady = int(steady_state) * np.ones(len(time_dt), dtype=int) if steady_start: steady[0] = 1 ds["steady"] = ("time",), steady if isinstance(nstp, (int, np.integer)): nstp = nstp * np.ones(len(time), dtype=int) ds["nstp"] = ("time",), nstp if isinstance(tsmult, float): tsmult = tsmult * np.ones(len(time)) ds["tsmult"] = ("time",), tsmult return ds
[docs] def _pd_timestamp_to_cftime(time_pd): """Convert a pandas timestamp into a cftime stamp Parameters ---------- time_pd : pd.Timestamp or list of pd.Timestamp datetimes Returns ------- cftime.datetime or list of cftime.datetime """ if hasattr(time_pd, "__iter__"): return [_pd_timestamp_to_cftime(tpd) for tpd in time_pd] else: return cftime.datetime( time_pd.year, time_pd.month, time_pd.day, time_pd.hour, time_pd.minute, time_pd.second, )
[docs] def set_ds_time( ds, start, time=None, perlen=None, steady=False, steady_start=True, time_units="DAYS", nstp=1, tsmult=1.0, ): """Set time discretisation for model dataset. Parameters ---------- ds : xarray.Dataset model dataset start : int, float, str, pandas.Timestamp or cftime.datetime model start. When start is an integer or float it is interpreted as the number of days of the first stress-period. When start is a string, pandas Timestamp or cftime datetime it is the start datetime of the simulation. Use cftime datetime when you get an OutOfBounds error using pandas. time : float, int or array-like, optional float(s) (indicating elapsed time) or timestamp(s) corresponding to the end of each stress period in the model. When time is a single value, the model will have only one stress period. When time is None, the stress period lengths have to be supplied via perlen. The default is None. perlen : float, int or array-like, optional length of each stress-period. Only used when time is None. When perlen is a single value, the model will have only one stress period. The default is None. steady : arraylike or bool, optional arraylike indicating which stress periods are steady-state, by default False, which sets all stress periods to transient with the first period determined by value of `steady_start`. steady_start : bool, optional whether to set the first period to steady-state, default is True, only used when steady is passed as single boolean. time_units : str, optional time units, by default "DAYS" nstp : int or array-like, optional number of steps per stress period, stored in ds.attrs, default is 1 tsmult : float, optional timestep multiplier within stress periods, stored in ds.attrs, default is 1.0 Returns ------- ds : xarray.Dataset model dataset with added time coordinate """ if time is None and perlen is None: raise (ValueError("Please specify either time or perlen in set_ds_time")) elif perlen is not None: if time is not None: msg = f"Cannot use both time and perlen. Ignoring perlen: {perlen}" logger.warning(msg) else: if isinstance(perlen, (int, np.integer, float)): perlen = [perlen] time = np.cumsum(perlen) if isinstance(time, str) or not hasattr(time, "__iter__"): time = [time] try: # parse start if isinstance(start, (int, np.integer, float)): if isinstance(time[0], (int, np.integer, float, str)): raise TypeError( "Make sure 'start' or 'time' argument is a valid TimeStamp" ) start = time[0] - pd.to_timedelta(start, "D") elif isinstance(start, str): start = pd.Timestamp(start) elif isinstance(start, (pd.Timestamp, cftime.datetime)): pass elif isinstance(start, np.datetime64): start = pd.Timestamp(start) else: raise TypeError("Cannot parse start datetime.") # parse time make sure 'time' and 'start' are same type (pd.Timestamps or cftime.datetime) if isinstance(time[0], (int, np.integer, float)): if isinstance(start, cftime.datetime): time = [start + dt.timedelta(days=int(td)) for td in time] else: time = start + pd.to_timedelta(time, time_units) elif isinstance(time[0], str): try: time = pd.to_datetime(time, format="mixed") except TypeError: # Fallback for older pandas versions without format="mixed". time = pd.to_datetime(time) if isinstance(start, cftime.datetime): time = _pd_timestamp_to_cftime(time) elif isinstance(time[0], (pd.Timestamp)): if isinstance(start, cftime.datetime): time = _pd_timestamp_to_cftime(time) elif isinstance(time[0], (np.datetime64, xr.core.variable.Variable)): logger.info( "time arguments with types np.datetime64, xr.core.variable.Variable not tested!" ) elif isinstance(time[0], cftime.datetime): start = _pd_timestamp_to_cftime(start) else: msg = f"Cannot process 'time' argument. Datatype -> {type(time)} not understood." raise TypeError(msg) except (OutOfBoundsDatetime, OutOfBoundsTimedelta) as e: msg = ( "cannot convert 'start' and 'time' to pandas datetime, use cftime " "types for 'start' and 'time'" ) raise type(e)(msg) from e if time[0] <= start: msg = ( "The timestamp of the first stress period cannot be before or " "equal to the model start time! Please modify `time` or `start`!" ) logger.error(msg) raise ValueError(msg) # create time coordinates ds = ds.assign_coords(coords={"time": time}) ds.coords["time"].attrs = dim_attrs["time"] # add steady, nstp and tsmult to dataset ds = set_time_variables( ds, start, time, steady, steady_start, time_units, nstp, tsmult ) return ds
[docs] def set_ds_time_numeric( ds, start, time=None, perlen=None, steady=False, steady_start=True, time_units="DAYS", nstp=1, tsmult=1.0, ): """Set a numerical time discretisation for a model dataset. Parameters ---------- ds : xarray.Dataset model dataset start : int, float, str, pandas.Timestamp or cftime.datetime model start. When start is an integer or float it is interpreted as the number of days of the first stress-period. When start is a string, pandas Timestamp or cftime datetime it is the start datetime of the simulation. Use cftime datetime when you get an OutOfBounds error using pandas. time : float, int or array-like, optional float(s) (indicating elapsed time) corresponding to the end of each stress period in the model. When time is a single value, the model will have only one stress period. When time is None, the stress period lengths have to be supplied via perlen. The default is None. perlen : float, int or array-like, optional length of each stress-period. Only used when time is None. When perlen is a single value, the model will have only one stress period. The default is None. steady : arraylike or bool, optional arraylike indicating which stress periods are steady-state, by default False, which sets all stress periods to transient with the first period determined by value of `steady_start`. steady_start : bool, optional whether to set the first period to steady-state, default is True, only used when steady is passed as single boolean. time_units : str, optional time units, by default "DAYS" nstp : int or array-like, optional number of steps per stress period, stored in ds.attrs, default is 1 tsmult : float, optional timestep multiplier within stress periods, stored in ds.attrs, default is 1.0 Returns ------- ds : xarray.Dataset model dataset with added time coordinate """ if time is None and perlen is None: raise (ValueError("Please specify either time or perlen in set_ds_time")) elif perlen is not None: if time is not None: msg = f"Cannot use both time and perlen. Ignoring perlen: {perlen}" logger.warning(msg) else: if isinstance(perlen, (int, np.integer, float)): perlen = [perlen] time = np.cumsum(perlen) if isinstance(time, str) or not hasattr(time, "__iter__"): time = [time] # check time if isinstance(time[0], (str, pd.Timestamp, cftime.datetime)): raise TypeError("'time' argument should be of a numerical type") time = np.asarray(time, dtype=float) if (time <= 0.0).any(): msg = "timesteps smaller or equal to 0 are not allowed" logger.error(msg) raise ValueError(msg) # create time coordinates ds = ds.assign_coords(coords={"time": time}) ds.coords["time"].attrs = dim_attrs["time"] # add steady, nstp and tsmult to dataset ds = set_time_variables( ds, start, time, steady, steady_start, time_units, nstp, tsmult ) return ds
[docs] def set_time_variables(ds, start, time, steady, steady_start, time_units, nstp, tsmult): """Add data variables: steady, nstp and tsmult, set attributes: start, time_units. Parameters ---------- ds : xarray.Dataset model dataset start : int, float, str, pandas.Timestamp or cftime.datetime model start. When start is an integer or float it is interpreted as the number of days of the first stress-period. When start is a string, pandas Timestamp or cftime datetime it is the start datetime of the simulation. Use cftime datetime when you get an OutOfBounds error using pandas. time : array-like, optional numerical (indicating elapsed time) or timestamps corresponding to the end of each stress period in the model. steady : arraylike or bool, optional arraylike indicating which stress periods are steady-state, by default False, which sets all stress periods to transient with the first period determined by value of `steady_start`. steady_start : bool, optional whether to set the first period to steady-state, default is True, only used when steady is passed as single boolean. time_units : str, optional time units, by default "DAYS" nstp : int or array-like, optional number of steps per stress period, stored in ds.attrs, default is 1 tsmult : float, optional timestep multiplier within stress periods, stored in ds.attrs, default is 1.0 Returns ------- ds : xarray.Dataset model dataset with attributes added to the time coordinates """ # add steady, nstp and tsmult to dataset if isinstance(steady, bool): steady = int(steady) * np.ones(len(time), dtype=int) if steady_start: steady[0] = 1 ds["steady"] = ("time",), steady if isinstance(nstp, (int, np.integer)): nstp = nstp * np.ones(len(time), dtype=int) ds["nstp"] = ("time",), nstp if isinstance(tsmult, float): tsmult = tsmult * np.ones(len(time)) ds["tsmult"] = ("time",), tsmult if time_units == "D": time_units = "DAYS" ds.time.attrs["time_units"] = time_units ds.time.attrs["start"] = str(start) return ds
[docs] def ds_time_idx_from_tdis_settings(start, perlen, nstp=1, tsmult=1.0, time_units="D"): """Get time index from TDIS perioddata: perlen, nstp, tsmult. Parameters ---------- start : str, pd.Timestamp start datetime perlen : array-like array of period lengths nstp : int, or array-like optional number of steps per period, by default 1 tsmult : float or array-like, optional timestep multiplier per period, by default 1.0 time_units : str, optional time units, by default "D" Returns ------- IndexVariable time coordinate for xarray data-array or dataset """ deltlist = [] for kper, delt in enumerate(perlen): if not isinstance(nstp, int): kstpkper = nstp[kper] else: kstpkper = nstp if not isinstance(tsmult, float): tsm = tsmult[kper] else: tsm = tsmult if tsm > 1.0: delt0 = delt * (tsm - 1) / (tsm**kstpkper - 1) delt = delt0 * tsm ** np.arange(kstpkper) else: delt = np.ones(kstpkper) * delt / kstpkper deltlist.append(delt) dt_arr = np.cumsum(np.concatenate(deltlist)) return ds_time_idx(dt_arr, start_datetime=start, time_units=time_units)
[docs] def estimate_nstp( forcing, perlen=1, tsmult=1.1, nstp_min=1, nstp_max=25, return_dt_arr=False ): """Scale the nstp's linearly between the min and max of the forcing. Ensures that the first time step of this stress period connects to the last time step of the previous stress period. The ratio between the two time-step durations can be at most tsmult. Parameters ---------- forcing : array-like Array with a forcing value for each stress period. Forcing can be for example a pumping rate or a rainfall intensity. perlen : float or array of floats (nper) An array of the stress period lengths. tsmult : float or array of floats (nper) Time step multiplier. nstp : int or array of ints (nper) Number of time steps in each stress period. nstp_min : int nstp value for the stress period with the smallest forcing. nstp_max : int nstp value for the stress period with the largest forcing. Returns ------- nstp : np.ndarray Array with a nstp for each stress period. dt_arr : np.ndarray (optional) if `return_dt_arr` is `True` returns the durations of the timesteps corresponding with the returned nstp. """ nt = len(forcing) # Scaled linear between min and max. array nstp will be modified along the way nstp = (forcing - np.min(forcing)) / (np.max(forcing) - np.min(forcing)) * ( nstp_max - nstp_min ) + nstp_min perlen = np.full(nt, fill_value=perlen) tsmult = np.full(nt, fill_value=tsmult) # Duration of the first time step of each stress period. Equation TM6A16 p.4-5 eq.1 dt0_arr = np.where( tsmult == 1.0, perlen / nstp, perlen * (tsmult - 1) / (tsmult**nstp - 1), ) for i in range(nt - 1): dt_end = dt0_arr[i] * tsmult[i] ** nstp[i] dt0_next = dt_end * tsmult[i + 1] if dt0_next < dt0_arr[i + 1]: dt0_arr[i + 1] = dt0_next # Equation derived from TM6A16 p.4-5 eq.1 if tsmult[i + 1] == 1.0: nstp[i + 1] = perlen[i + 1] / dt0_arr[i + 1] else: nstp[i + 1] = np.log( perlen[i + 1] * (tsmult[i + 1] - 1) / dt0_next + 1 ) / np.log(tsmult[i + 1]) nstp_ceiled = np.ceil(nstp).astype(int) if return_dt_arr: dt0_ceiled = np.where( tsmult == 1.0, perlen / nstp_ceiled, perlen * (tsmult - 1) / (tsmult**nstp_ceiled - 1), ) dt_lists = [ [dt0i * tsmulti**nstpii for nstpii in range(nstpi)] for dt0i, nstpi, tsmulti in zip(dt0_ceiled, nstp_ceiled, tsmult) ] dt_arr = np.concatenate(dt_lists) return nstp_ceiled, dt_arr else: return nstp_ceiled
[docs] def get_perlen(ds): """Get perlen from ds. Parameters ---------- ds : xarray.Dataset dataset with time variant model data Returns ------- perlen : list of floats length of a stress period. """ deltat = pd.to_timedelta(1, ds.time.time_units) if ds.time.dtype.kind == "M": # dtype is pandas timestamps perlen = [ (pd.to_datetime(ds["time"].data[0]) - pd.to_datetime(ds.time.start)) / deltat ] if len(ds["time"]) > 1: perlen.extend(np.diff(ds["time"]) / deltat) elif ds.time.dtype.kind == "O": perlen = [ (ds["time"].data[0] - _pd_timestamp_to_cftime(pd.Timestamp(ds.time.start))) / deltat ] if len(ds["time"]) > 1: perlen.extend(np.diff(ds["time"]) / deltat) elif ds.time.dtype.kind in ["i", "f"]: perlen = [ds["time"][0]] perlen.extend(np.diff(ds["time"].values)) return perlen
[docs] def get_time_step_length(perlen, nstp, tsmult): """Get the length of the timesteps within a single stress-period. Parameters ---------- perlen : float The length of the stress period, in the time unit of the model (generally days). nstp : int The numer of timesteps within the stress period. tsmult : float THe time step multiplier, generally equal or lager than 1. Returns ------- t : np.ndarray An array with the length of each of the timesteps within the stress period, in the same unit as perlen. """ t = np.array([tsmult**x for x in range(nstp)]) t = t * perlen / t.sum() return t
[docs] def ds_time_from_model(gwf): warnings.warn( "this function was renamed to `ds_time_idx_from_model`. " "Please use the new function name.", DeprecationWarning, ) return ds_time_idx_from_model(gwf)
[docs] def ds_time_idx_from_model(gwf): """Get time index variable from model (gwf or gwt). Parameters ---------- gwf : flopy MFModel object groundwater flow or groundwater transport model Returns ------- IndexVariable time coordinate for xarray data-array or dataset """ return ds_time_idx_from_modeltime(gwf.modeltime)
[docs] def ds_time_from_modeltime(modeltime): warnings.warn( "this function was renamed to `ds_time_idx_from_model`. " "Please use the new function name.", DeprecationWarning, ) return ds_time_idx_from_modeltime(modeltime)
[docs] def ds_time_idx_from_modeltime(modeltime): """Get time index variable from modeltime object. Parameters ---------- modeltime : flopy ModelTime object modeltime object (e.g. gwf.modeltime) Returns ------- IndexVariable time coordinate for xarray data-array or dataset """ return ds_time_idx( np.cumsum(modeltime.perlen), start_datetime=modeltime.start_datetime, time_units=modeltime.time_units, )
[docs] def ds_time_idx(t, start_datetime=None, time_units="D", dtype="datetime"): """Get time index variable from elapsed time array. Parameters ---------- t : np.array array containing elapsed time, usually in days start_datetime : str, pd.Timestamp, optional starting datetime time_units : str, optional time units, default is days dtype : str, optional dtype of time index. Can be 'datetime' or 'float'. If 'datetime' try to create a pandas datetime index if that fails use cftime lib. Default is 'datetime'. Returns ------- IndexVariable time coordinate for xarray data-array or dataset """ if (start_datetime is None) or (dtype in ["int", "float"]): times = t else: try: dtarr = pd.to_timedelta(t, time_units) times = pd.Timestamp(start_datetime) + dtarr except (OutOfBoundsDatetime, OutOfBoundsTimedelta) as e: msg = f"using cftime time index because of {e}" logger.debug(msg) start = _pd_timestamp_to_cftime(pd.Timestamp(start_datetime)) times = [start + dt.timedelta(days=int(td)) for td in t] time = IndexVariable(["time"], times) time.attrs["time_units"] = time_units if start_datetime is not None: time.attrs["start"] = str(start_datetime) return time
[docs] def dataframe_to_flopy_timeseries( df, ds=None, package=None, filename=None, time_series_namerecord=None, interpolation_methodrecord="stepwise", append=False, shift_label_from_right_to_left=False, ): """ Convert a pandas DataFrame to a FloPy time series. This function converts a time-indexed DataFrame into a list of tuples formatted for use with FloPy's time series input. If a FloPy package is provided, it will either initialize or append a time series package. Parameters ---------- df : pandas.DataFrame DataFrame containing the time series data. The index must be datetime-like if `ds` is provided. ds : xarray.Dataset, optional Model Dataset. package : flopy.mf6.ModflowGwfwel, flopy.mf6.ModflowGwfdrn or similar, optional The FloPy package to which the time series should be attached. filename : str, optional Filename for the time series package. If not provided, defaults to `{package.filename}_ts`. time_series_namerecord : list of str, optional List of time series names corresponding to DataFrame columns. If not provided, defaults to the DataFrame column names. interpolation_methodrecord : str or list of str, default "stepwise" Interpolation method(s) to use for the time series. Can be a single string or a list of methods (one for each column). append : bool, default False If True, the time series will be appended to an existing time series package. Otherwise, a new package will be initialized. shift_label_from_right_to_left : bool, default False If True, sets the index of df from the end of the timestep to the start of the timestep (which is required if interpolation_methodrecord="stepwise"). The default is False. Returns ------- list of tuple or flopy.mf6.ModflowUtlts If `package` is None, returns a list of (time, value1, value2, ...) tuples. If `package` is provided, returns the created or updated time series package. Raises ------ AssertionError If DataFrame contains NaNs or if `ds.time` is not datetime64. """ msg = "No nan values are allowed in dataframe_to_flopy_timeseries" assert not df.isna().any(axis=None), msg msg = "get recharge requires a datetime64[ns] time index" assert ds.time.dtype.kind == "M", msg if shift_label_from_right_to_left and ds is not None: df.loc[pd.to_datetime(ds.time.start)] = None df = df.sort_index().shift(-1).ffill() if ds is not None: # set index to days after the start of the simulation df = df.copy() df.index = (df.index - pd.to_datetime(ds.time.start)) / pd.Timedelta(1, "D") # generate a list of tuples with time as the first record, followed by the columns timeseries = [(i,) + tuple(v) for i, v in zip(df.index, df.values)] if package is None: return timeseries if filename is None: filename = f"{package.filename}_ts" if time_series_namerecord is None: time_series_namerecord = list(df.columns) if isinstance(interpolation_methodrecord, str): interpolation_methodrecord = [interpolation_methodrecord] * len(df.columns) # initialize or append a new package method = package.ts.append_package if append else package.ts.initialize return method( filename=filename, timeseries=timeseries, time_series_namerecord=time_series_namerecord, interpolation_methodrecord=interpolation_methodrecord, )
[docs] def ds_time_to_pandas_index(da_or_ds, include_start=True): """Convert xarray time index to pandas datetime index. Parameters ---------- da_or_ds : xarray.Dataset or xarray.DataArray dataset with time index or time data array include_start : bool, optional include the start time in the index, by default True Returns ------- pd.DatetimeIndex pandas datetime index """ if isinstance(da_or_ds, xr.Dataset): da = da_or_ds["time"] elif isinstance(da_or_ds, xr.DataArray): da = da_or_ds else: raise ValueError( "Either pass model dataset or time dataarray (e.g. `ds.time`)." ) if include_start: if da.dtype.kind == "M": # "M" is a numpy datetime-type return da.to_index().insert(0, pd.Timestamp(da.start)) elif da.dtype.kind == "O": start = _pd_timestamp_to_cftime(pd.Timestamp(da.start)) return da.to_index().insert(0, start) elif da.dtype.kind in ["i", "f"]: return da.to_index().insert(0, 0) else: raise TypeError("Unknown time dtype") else: return da.to_index()