"""Functions to aggregate data over time and space."""
import datetime
import logging
import os
import warnings
from collections.abc import Sequence
from copy import deepcopy
from pathlib import Path
from types import ModuleType
from typing import Optional, Union
import geopandas as gpd
import numpy as np
import pandas as pd
import scipy
import shapely
import xarray as xr
import xclim as xc
import xclim.core.calendar
try:
import xesmf as xe
except ImportError:
xe = None
from xclim.core.indicator import Indicator
from .config import parse_config
from .extract import subset_warming_level
from .indicators import compute_indicators
from .spatial import subset
from .utils import standardize_periods, unstack_dates, update_attr
logger = logging.getLogger(__name__)
__all__ = [
"climatological_mean",
"climatological_op",
"compute_deltas",
"produce_horizon",
"spatial_mean",
]
# Dummy function to make gettext aware of translatable-strings
def _(s):
return s
[docs]
@parse_config
def climatological_mean(
ds: xr.Dataset,
*,
window: Optional[int] = None,
min_periods: Optional[int] = None,
interval: int = 1,
periods: Optional[Union[list[str], list[list[str]]]] = None,
to_level: Optional[str] = "climatology",
) -> xr.Dataset:
"""Compute the mean over 'year' for given time periods, respecting the temporal resolution of ds.
Parameters
----------
ds : xr.Dataset
Dataset to use for the computation.
window : int, optional
Number of years to use for the time periods.
If left at None and periods is given, window will be the size of the first period.
If left at None and periods is not given, the window will be the size of the input dataset.
min_periods : int, optional
For the rolling operation, minimum number of years required for a value to be computed.
If left at None and the xrfreq is either QS or AS and doesn't start in January, min_periods will be one less than window.
If left at None, it will be deemed the same as 'window'.
interval : int
Interval (in years) at which to provide an output.
periods : list of str or list of lists of str, optional
Either [start, end] or list of [start, end] of continuous periods to be considered.
This is needed when the time axis of ds contains some jumps in time.
If None, the dataset will be considered continuous.
to_level : str, optional
The processing level to assign to the output.
If None, the processing level of the inputs is preserved.
Returns
-------
xr.Dataset
Returns a Dataset of the climatological mean, by calling climatological_op with option op=='mean'.
"""
warnings.warn(
"xs.climatological_mean is deprecated and will be abandoned in a future release. "
"Use xs.climatological_op with option op=='mean' instead.",
category=FutureWarning,
)
return climatological_op(
ds,
op="mean",
window=window,
min_periods=min_periods,
stride=interval,
periods=periods,
rename_variables=False,
to_level=to_level,
horizons_as_dim=False,
)
[docs]
@parse_config
def climatological_op( # noqa: C901
ds: xr.Dataset,
*,
op: Union[str, dict] = "mean",
window: Optional[int] = None,
min_periods: Optional[Union[int, float]] = None,
stride: int = 1,
periods: Optional[Union[list[str], list[list[str]]]] = None,
rename_variables: bool = True,
to_level: str = "climatology",
horizons_as_dim: bool = False,
) -> xr.Dataset:
"""Perform an operation 'op' over time, for given time periods, respecting the temporal resolution of ds.
Parameters
----------
ds : xr.Dataset
Dataset to use for the computation.
op : str or dict
Operation to perform over time.
The operation can be any method name of xarray.core.rolling.DatasetRolling, 'linregress', or a dictionary.
If 'op' is a dictionary, the key is the operation name and the value is a dict of kwargs
accepted by the operation. While other operations are technically possible,
the following are recommended and tested:
['max', 'mean', 'median', 'min', 'std', 'sum', 'var', 'linregress'].
Operations beyond methods of xarray.core.rolling.DatasetRolling include:
- 'linregress' : Computes the linear regression over time, using
scipy.stats.linregress and employing years as regressors.
The output will have a new dimension 'linreg_param' with coordinates:
['slope', 'intercept', 'rvalue', 'pvalue', 'stderr', 'intercept_stderr'].
Only one operation per call is supported, so len(op)==1 if a dict.
window : int, optional
Number of years to use for the rolling operation.
If left at None and periods is given, window will be the size of the first period. Hence, if periods are of
different lengths, the shortest period should be passed first.
If left at None and periods is not given, the window will be the size of the input dataset.
min_periods : int or float, optional
For the rolling operation, minimum number of years required for a value to be computed.
If left at None and the xrfreq is either QS or AS and doesn't start in January,
min_periods will be one less than window.
Otherwise, if left at None, it will be deemed the same as 'window'.
If passed as a float value between 0 and 1, this will be interpreted as the floor of the fraction of the window size.
stride : int
Stride (in years) at which to provide an output from the rolling window operation.
periods : list of str or list of lists of str, optional
Either [start, end] or list of [start, end] of continuous periods to be considered.
This is needed when the time axis of ds contains some jumps in time.
If None, the dataset will be considered continuous.
rename_variables : bool
If True, '_clim_{op}' will be added to variable names.
to_level : str, optional
The processing level to assign to the output.
If None, the processing level of the inputs is preserved.
horizons_as_dim : bool
If True, the output will have 'horizon' and the frequency as 'month', 'season' or 'year' as
dimensions and coordinates. The 'time' coordinate will be unstacked to horizon and frequency dimensions.
Horizons originate from periods and/or windows and their stride in the rolling operation.
Returns
-------
xr.Dataset
Dataset with the results from the climatological operation.
"""
# Daily data is not supported
if len(ds.time) > 3 and xr.infer_freq(ds.time) == "D":
raise NotImplementedError(
"xs.climatological_op does not currently support daily data."
)
# more than one operation per call is not supported (yet), case for dict
if isinstance(op, dict) and len(op) > 1:
raise NotImplementedError(
"xs.climatological_op does not currently support more than one operation per call."
)
# unstack 1D time in coords (day, month, and year) to make climatological mean faster
try:
mindex_coords = xr.Coordinates.from_pandas_multiindex(
pd.MultiIndex.from_arrays(
[
ds.time.dt.year.values,
ds.time.dt.month.values,
ds.time.dt.day.values,
],
names=["year", "month", "day"],
),
dim="time",
)
ds_unstack = ds.assign_coords(coords=mindex_coords).unstack("time")
except (
AttributeError,
ValueError,
): # Fixme when xscen is pinned to xarray >= 2023.11.0
ind = pd.MultiIndex.from_arrays(
[ds.time.dt.year.values, ds.time.dt.month.values, ds.time.dt.day.values],
names=["year", "month", "day"],
)
ds_unstack = ds.assign(time=ind).unstack("time")
# Rolling will ignore gaps in time, so raise an exception beforehand
if (not all(ds_unstack.year.diff(dim="year", n=1) == 1)) & (periods is None):
raise ValueError("Data is not continuous. Use the 'periods' argument.")
# define periods, windows, and min_periods
periods = standardize_periods(
periods or [[int(ds_unstack.year[0]), int(ds_unstack.year[-1])]]
)
window = window or int(periods[0][1]) - int(periods[0][0]) + 1
# there is one less occurrence when a period crosses years
freq_across_year = [
f"{f}-{mon}"
for mon in xr.coding.cftime_offsets._MONTH_ABBREVIATIONS.values()
for f in ["AS", "QS"]
if mon != "JAN"
]
if (
any(
x in freq_across_year
for x in [
ds.attrs.get("cat:xrfreq"),
(xr.infer_freq(ds.time) if len(ds.time) > 3 else None),
]
)
and min_periods is None
):
min_periods = window - 1
# unpack min_periods as fraction of window
if isinstance(min_periods, float):
if 0 < min_periods <= 1:
min_periods = int(np.floor(min_periods * window))
else:
raise ValueError(
f"When 'min_periods' is passed as a 'float', it must be between 0 and 1. Got {min_periods}."
)
# set min_periods
min_periods = min_periods or window
if min_periods > window:
raise ValueError("'min_periods' should be smaller or equal to 'window'")
# if op is a dict, unpack it
if isinstance(op, dict):
op, op_kwargs = list(op.items())[0]
op_kwargs.setdefault("keep_attrs", True)
else:
op_kwargs = {"keep_attrs": True}
# special case for averaging standard deviations: need to convert to variance before averaging
ds_has_std = False
std_v = []
if op == "mean":
for vv in ds_unstack.data_vars:
if (
"std" in vv
or "standard deviation"
in ds_unstack[vv].attrs.get("description", "").lower()
):
ds_unstack[vv] = np.square(ds_unstack[vv])
ds_has_std = True
std_v.extend([vv])
# Compute the climatological operation
concats = []
for period in periods:
# Rolling average
ds_rolling = ds_unstack.sel(year=slice(period[0], period[1])).rolling(
year=window, min_periods=min_periods
)
# apply operation on rolling object
if hasattr(ds_rolling, op) and callable(getattr(ds_rolling, op)):
if op not in ["max", "mean", "median", "min", "std", "sum", "var"]:
warnings.warn(
f"The requested operation '{op}' has not been tested and may produce unexpected results."
)
ds_rolling = getattr(ds_rolling, op)(**op_kwargs)
# revert variance to std, where applicable
if op == "mean" and ds_has_std:
for vv in std_v:
ds_rolling[vv] = np.sqrt(ds_rolling[vv])
# Select the windows at provided stride, starting from the first full window's operation result
ds_rolling = ds_rolling.isel(year=slice(window - 1, None, stride))
elif op == "linregress":
def _ulinregress(x, y, **kwargs):
# Wrapper for scipy.stats.linregress to unpack multiple return values in xr.apply_ufunc
valid_x = ~np.isnan(x)
valid_y = ~np.isnan(y)
mask = valid_x & valid_y
if np.sum(mask) >= kwargs.get("min_periods", 1):
reg = scipy.stats.linregress(
x, y, alternative=kwargs.get("alternative", "two-sided")
)
out = np.array(
[
reg.slope,
reg.intercept,
reg.rvalue,
reg.pvalue,
reg.stderr,
reg.intercept_stderr,
]
)
else:
out = np.full(6, np.nan)
return out
# prepare kwargs
linreg_kwargs = {
k: v for k, v in op_kwargs.items() if "keep_attrs" not in k
}
linreg_kwargs["min_periods"] = min_periods
# unwrap DatasetRolling object and select years subset
dsr_construct = ds_rolling.construct(window_dim="window", keep_attrs=True)
dsr_construct = dsr_construct.isel(year=slice(window - 1, None, stride))
# construct array to use years as x values (==regressors) in xr.apply_ufunc
years_as_x_values = xr.DataArray(
np.arange(dsr_construct.window.size)
.repeat(dsr_construct.year.size)
.reshape(dsr_construct.window.size, dsr_construct.year.size)
+ dsr_construct.year.values
- window
+ 1,
dims=["window", "year"],
coords={
"window": dsr_construct.window.values,
"year": dsr_construct.year.values,
},
)
# apply linregress along windows
ds_rolling = xr.apply_ufunc(
_ulinregress,
years_as_x_values,
dsr_construct,
input_core_dims=[["window"], ["window"]],
output_core_dims=[["linreg_param"]],
vectorize=True,
dask="parallelized",
output_dtypes=["float32"],
dask_gufunc_kwargs={"output_sizes": {"linreg_param": 6}},
keep_attrs="no_conflicts",
kwargs=linreg_kwargs,
)
# label new coords
ds_rolling.coords["linreg_param"] = [
"slope",
"intercept",
"rvalue",
"pvalue",
"stderr",
"intercept_stderr",
]
else:
raise ValueError(f"Operation '{op}' not implemented.")
# build horizons
horizons = xr.DataArray(
[f"{yr - (window - 1)}-{yr}" for yr in ds_rolling.year.values],
dims=dict(year=ds_rolling.year),
).astype(str)
ds_rolling = ds_rolling.assign_coords(horizon=horizons)
# revert to 1D time, rebuilding time coord
ds_rolling = ds_rolling.stack(time=("year", "month", "day"))
if isinstance(ds.indexes["time"], pd.core.indexes.datetimes.DatetimeIndex):
time_coord = pd.to_datetime(
{
"year": ds_rolling.year.values - window + 1,
"month": ds_rolling.month.values,
"day": ds_rolling.day.values,
}
).to_list()
elif isinstance(ds.indexes["time"], xr.coding.cftimeindex.CFTimeIndex):
time_coord = [
xclim.core.calendar.datetime_classes[ds.time.dt.calendar](
y - window + 1, m, d
)
for y, m, d in zip(
ds_rolling.year.values,
ds_rolling.month.values,
ds_rolling.day.values,
)
]
else:
raise ValueError("The type of 'time' was not understood.")
ds_rolling = ds_rolling.drop_vars({"month", "year", "time", "day"})
ds_rolling = ds_rolling.assign_coords(time=time_coord).transpose("time", ...)
# append to list of results
concats.extend([ds_rolling])
# end loop over periods
# concatenate results
ds_rolling = xr.concat(concats, dim="time", data_vars="minimal")
# update data_vars names, attrs, history
if rename_variables:
ds_rolling = ds_rolling.rename_vars(
{vv: f"{vv}_clim_{op}" for vv in ds_rolling.data_vars}
)
for vv in ds_rolling.data_vars:
for a in ["description", "long_name"]:
try:
op_format = dict.fromkeys(
("mean", "std", "var", "sum"), "adj"
) | dict.fromkeys(("max", "min"), "noun")
operation = xc.core.formatting.default_formatter.format_field(
op, op_format[op]
)
except (KeyError, ValueError):
operation = op
update_attr(
ds_rolling[vv],
a,
_("{window}-year climatological {operation} of {attr}."),
window=window,
operation=operation,
)
new_history = (
f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {window}-year climatological {operation} "
f"over window (non-centered), with a minimum of {min_periods} years of data - xarray v{xr.__version__}"
)
history = (
new_history + " \n " + ds_rolling[vv].attrs["history"]
if "history" in ds_rolling[vv].attrs
else new_history
)
ds_rolling[vv].attrs["history"] = history
# update processing level
if to_level is not None:
ds_rolling.attrs["cat:processing_level"] = to_level
if horizons_as_dim:
# restructure output to have horizons as a dimension instead of stacked horizons per year/season/month
new_coords = {1: "year", 4: "season", 12: "month"}
new_coord_len = len(np.unique(ds_rolling.time.dt.month))
if new_coord_len == 1:
all_horizons = [
ds_rolling.sel(time=ds_rolling.horizon == horizon)
.swap_dims({"time": "horizon"})
.assign(
time_2D=xr.DataArray(
ds_rolling.time.sel(
time=ds_rolling.horizon == horizon
).values.copy(),
dims=["time"],
)
)
.drop_vars("time")
.rename({"time_2D": "time"})
.set_coords("time")
.squeeze(dim="time")
for horizon in np.unique(ds_rolling.horizon.values)
]
else:
all_horizons = [
unstack_dates(
ds_rolling.sel(time=ds_rolling.horizon == horizon).assign(
time_2D=xr.DataArray(
ds_rolling.time.sel(
time=ds_rolling.horizon == horizon
).values.copy(),
dims=["time"],
)
),
new_dim=new_coords[new_coord_len],
)
.drop_vars("horizon")
.assign_coords(horizon=("time", [horizon]))
.swap_dims({"time": "horizon"})
.drop_vars("time")
.rename({"time_2D": "time"})
.set_coords("time")
for horizon in np.unique(ds_rolling.horizon.values)
]
return xr.concat(all_horizons, dim="horizon")
else:
return ds_rolling
[docs]
@parse_config
def compute_deltas( # noqa: C901
ds: xr.Dataset,
reference_horizon: Union[str, xr.Dataset],
*,
kind: Union[str, dict] = "+",
rename_variables: bool = True,
to_level: Optional[str] = "deltas",
) -> xr.Dataset:
"""Compute deltas in comparison to a reference time period, respecting the temporal resolution of ds.
Parameters
----------
ds : xr.Dataset
Dataset to use for the computation.
reference_horizon : str or xr.Dataset
Either a YYYY-YYYY string corresponding to the 'horizon' coordinate of the reference period,
or a xr.Dataset containing the climatological mean.
kind : str or dict
['+', '/', '%'] Whether to provide absolute, relative, or percentage deltas.
Can also be a dictionary separated per variable name.
rename_variables : bool
If True, '_delta_YYYY-YYYY' will be added to variable names.
to_level : str, optional
The processing level to assign to the output.
If None, the processing level of the inputs is preserved.
Returns
-------
xr.Dataset
Returns a Dataset with the requested deltas.
"""
if isinstance(reference_horizon, str):
if reference_horizon not in ds.horizon:
raise ValueError(
f"The reference horizon {reference_horizon} is not in the dataset."
)
# Separate the reference from the other horizons
if xc.core.utils.uses_dask(ds["horizon"]):
ds["horizon"].load()
ref = ds.where(ds.horizon == reference_horizon, drop=True)
elif isinstance(reference_horizon, xr.Dataset):
ref = reference_horizon
if "horizon" in ref:
reference_horizon = np.unique(ref["horizon"])
if len(reference_horizon) != 1:
raise ValueError(
"The reference dataset appears to contain multiple horizons."
)
reference_horizon = reference_horizon[0]
else:
reference_horizon = "unknown_horizon"
else:
raise ValueError(
"reference_horizon should be either a string or an xarray.Dataset."
)
if "time" in ds:
if (len(ds.time) >= 3) and (xr.infer_freq(ds.time) == "D"):
raise NotImplementedError(
"xs.compute_deltas does not currently support daily data."
)
# Remove references to 'year' in REF
try:
mindex_coords_1 = xr.Coordinates.from_pandas_multiindex(
pd.MultiIndex.from_arrays(
[ref.time.dt.month.values, ref.time.dt.day.values],
names=["month", "day"],
),
dim="time",
)
ref = ref.assign_coords(coords=mindex_coords_1).unstack("time")
mindex_coords_2 = xr.Coordinates.from_pandas_multiindex(
pd.MultiIndex.from_arrays(
[
ds.time.dt.year.values,
ds.time.dt.month.values,
ds.time.dt.day.values,
],
names=["year", "month", "day"],
),
dim="time",
)
other_hz = ds.assign_coords(coords=mindex_coords_2).unstack("time")
except (
AttributeError,
ValueError,
): # Fixme when xscen is pinned to xarray >= 2023.11.0
ind = pd.MultiIndex.from_arrays(
[ref.time.dt.month.values, ref.time.dt.day.values],
names=["month", "day"],
)
ref = ref.assign(time=ind).unstack("time")
ind = pd.MultiIndex.from_arrays(
[
ds.time.dt.year.values,
ds.time.dt.month.values,
ds.time.dt.day.values,
],
names=["year", "month", "day"],
)
other_hz = ds.assign(time=ind).unstack("time")
else:
other_hz = ds
ref = ref.squeeze()
deltas = xr.Dataset(coords=other_hz.coords, attrs=other_hz.attrs)
# Calculate deltas
for vv in list(ds.data_vars):
v_name = (
vv
if rename_variables is False
else f"{vv}_delta_{reference_horizon.replace('-', '_')}"
)
with xr.set_options(keep_attrs=True):
if (isinstance(kind, dict) and kind[vv] == "+") or kind == "+":
_kind = "abs."
deltas[v_name] = other_hz[vv] - ref[vv]
elif (isinstance(kind, dict) and kind[vv] == "/") or kind == "/":
_kind = "rel."
deltas[v_name] = other_hz[vv] / ref[vv]
deltas[v_name].attrs["units"] = ""
elif (isinstance(kind, dict) and kind[vv] == "%") or kind == "%":
_kind = "pct."
deltas[v_name] = 100 * (other_hz[vv] - ref[vv]) / ref[vv]
deltas[v_name].attrs["units"] = "%"
else:
raise ValueError(
f"Delta 'kind' not understood. Should be '+', '/' or '%', received {kind}."
)
# modify attrs and history
deltas[v_name].attrs["delta_kind"] = _kind
deltas[v_name].attrs["delta_reference"] = reference_horizon
for a in ["description", "long_name"]:
update_attr(
deltas[v_name],
a,
_("{attr1}: {kind} delta compared to {refhoriz}."),
others=[other_hz[vv]],
refhoriz=reference_horizon,
kind=_kind,
)
new_history = f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {_kind} delta vs. {reference_horizon} - xarray v{xr.__version__}"
history = (
new_history + " \n " + deltas[v_name].attrs["history"]
if "history" in deltas[v_name].attrs
else new_history
)
deltas[v_name].attrs["history"] = history
if "time" in ds:
# get back to 1D time
deltas = deltas.stack(time=("year", "month", "day"))
# rebuild time coord
if isinstance(ds.indexes["time"], pd.core.indexes.datetimes.DatetimeIndex):
time_coord = list(
pd.to_datetime(
{
"year": deltas.year.values,
"month": deltas.month.values,
"day": deltas.day.values,
}
).values
)
elif isinstance(ds.indexes["time"], xr.coding.cftimeindex.CFTimeIndex):
time_coord = [
xclim.core.calendar.datetime_classes[ds.time.dt.calendar](y, m, d)
for y, m, d in zip(
deltas.year.values, deltas.month.values, deltas.day.values
)
]
else:
raise ValueError("The type of 'time' could not be understood.")
deltas = (
deltas.drop_vars({"year", "day", "month", "time"})
.assign(time=time_coord)
.transpose("time", ...)
)
deltas = deltas.reindex_like(ds)
if to_level is not None:
deltas.attrs["cat:processing_level"] = to_level
return deltas
[docs]
@parse_config
def spatial_mean( # noqa: C901
ds: xr.Dataset,
method: str,
*,
spatial_subset: Optional[bool] = None,
call_clisops: Optional[bool] = False,
region: Optional[Union[dict, str]] = None,
kwargs: Optional[dict] = None,
simplify_tolerance: Optional[float] = None,
to_domain: Optional[str] = None,
to_level: Optional[str] = None,
) -> xr.Dataset:
"""Compute the spatial mean using a variety of available methods.
Parameters
----------
ds : xr.Dataset
Dataset to use for the computation.
method : str
'cos-lat' will weight the area covered by each pixel using an approximation based on latitude.
'interp_centroid' will find the region's centroid (if coordinates are not fed through kwargs),
then perform a .interp() over the spatial dimensions of the Dataset.
The coordinate can also be directly fed to .interp() through the 'kwargs' argument below.
'xesmf' will make use of xESMF's SpatialAverager. This will typically be more precise,
especially for irregular regions, but can be much slower than other methods.
spatial_subset : bool, optional
If True, xscen.spatial.subset will be called prior to the other operations. This requires the 'region' argument.
If None, this will automatically become True if 'region' is provided and the subsetting method is either 'cos-lat' or 'mean'.
region : dict or str, optional
Description of the region and the subsetting method (required fields listed in the Notes).
If method=='interp_centroid', this is used to find the region's centroid.
If method=='xesmf', the bounding box or shapefile is given to SpatialAverager.
Can also be "global", for global averages.
This is simply a shortcut for `{'name': 'global', 'method': 'bbox', 'lon_bnds' [-180, 180], 'lat_bnds': [-90, 90]}`.
kwargs : dict, optional
Arguments to send to either mean(), interp() or SpatialAverager().
For SpatialAverager, one can give `skipna` or `output_chunks` here, to be passed to the averager call itself.
simplify_tolerance : float, optional
Precision (in degree) used to simplify a shapefile before sending it to SpatialAverager().
The simpler the polygons, the faster the averaging, but it will lose some precision.
to_domain : str, optional
The domain to assign to the output.
If None, the domain of the inputs is preserved.
to_level : str, optional
The processing level to assign to the output.
If None, the processing level of the inputs is preserved.
Returns
-------
xr.Dataset
Returns a Dataset with the spatial dimensions averaged.
Notes
-----
'region' required fields:
name: str
Region name used to overwrite domain in the catalog.
method: str
['gridpoint', 'bbox', shape', 'sel']
tile_buffer: float, optional
Multiplier to apply to the model resolution. Only used if spatial_subset==True.
kwargs
Arguments specific to the method used.
See Also
--------
xarray.Dataset.mean, xarray.Dataset.interp, xesmf.SpatialAverager
"""
kwargs = kwargs or {}
if method == "mean":
warnings.warn(
"xs.spatial_mean with method=='mean' is deprecated and will be abandoned in a future release. "
"Use method=='cos-lat' instead for a more robust but similar method.",
category=FutureWarning,
)
elif method == "interp_coord":
warnings.warn(
"xs.spatial_mean with method=='interp_coord' is deprecated. Use method=='interp_centroid' instead.",
category=FutureWarning,
)
method = "interp_centroid"
if call_clisops:
warnings.warn(
"call_clisops has been renamed and is deprecated. Use spatial_subset instead.",
category=FutureWarning,
)
spatial_subset = call_clisops
if region == "global":
region = {
"name": "global",
"method": "bbox",
"lat_bnds": [-90 + 1e-5, 90 - 1e-5],
}
# `spatial_subset` won't wrap coords on the bbox, we need to fit the system used on ds.
if ds.cf["longitude"].min() >= 0:
region["lon_bnds"] = [0, 360]
else:
region["lon_bnds"] = [-180, 180]
if (
(region is not None)
and (region["method"] in region)
and (isinstance(region[region["method"]], dict))
):
warnings.warn(
"You seem to be using a deprecated version of region. Please use the new formatting.",
category=FutureWarning,
)
region = deepcopy(region)
if "buffer" in region:
region["tile_buffer"] = region.pop("buffer")
_kwargs = region.pop(region["method"])
region.update(_kwargs)
if (
(region is not None)
and (spatial_subset is None)
and (method in ["mean", "cos-lat"])
):
logger.info("Automatically turning spatial_subset to True based on inputs.")
spatial_subset = True
# If requested, call xscen.spatial.subset prior to averaging
if spatial_subset:
ds = subset(ds, **region)
if method == "cos-lat":
if "latitude" not in ds.cf.coordinates:
raise ValueError(
"Could not determine the latitude name using CF conventions. "
"Use kwargs = {lat: str} to specify the coordinate name."
)
if "units" not in ds.cf["latitude"].attrs:
logger.warning(
f"{ds.attrs.get('cat:id', '')}: Latitude does not appear to have units. Make sure that the computation is right."
)
elif ds.cf["latitude"].attrs["units"] != "degrees_north":
logger.warning(
f"{ds.attrs.get('cat:id', '')}: Latitude units is '{ds.cf['latitude'].attrs['units']}', expected 'degrees_north'. "
f"Make sure that the computation is right."
)
if ((ds.cf["longitude"].min() < -160) & (ds.cf["longitude"].max() > 160)) or (
(ds.cf["longitude"].min() < 20) & (ds.cf["longitude"].max() > 340)
):
logger.warning(
"The region appears to be crossing the -180/180° meridian. Bounds computation is currently bugged in cf_xarray. "
"Make sure that the computation is right."
)
weights = np.cos(np.deg2rad(ds.cf["latitude"]))
if ds.cf["longitude"].ndim == 1:
dims = ds.cf["longitude"].dims + ds.cf["latitude"].dims
else:
if "longitude" not in ds.cf.bounds:
ds = ds.cf.add_bounds(["longitude", "latitude"])
# Weights the weights by the cell area (in °²)
weights = weights * xr.DataArray(
shapely.area(
shapely.polygons(shapely.linearrings(ds.lon_bounds, ds.lat_bounds))
),
dims=ds.cf["longitude"].dims,
coords=ds.cf["longitude"].coords,
)
dims = ds.cf["longitude"].dims
ds_agg = ds.weighted(weights).mean(dims, keep_attrs=True)
# Prepare the History field
new_history = (
f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] "
f"weighted mean(dim={[d for d in ds.cf.axes['X'] + ds.cf.axes['Y']]}) using a 'cos-lat' approximation of areacella (in deg2)"
)
# This simply calls .mean() over the spatial dimensions
elif method == "mean":
if "dim" not in kwargs:
kwargs["dim"] = ds.cf.axes["X"] + ds.cf.axes["Y"]
ds_agg = ds.mean(keep_attrs=True, **kwargs)
# Prepare the History field
new_history = (
f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] "
f"xarray.mean(dim={kwargs['dim']}) - xarray v{xr.__version__}"
)
# This calls .interp() to a pair of coordinates
elif method == "interp_centroid":
# Find the centroid
if region is None:
if ds.cf.axes["X"][0] not in kwargs:
kwargs[ds.cf.axes["X"][0]] = ds[ds.cf.axes["X"][0]].mean().values
if ds.cf.axes["Y"][0] not in kwargs:
kwargs[ds.cf.axes["Y"][0]] = ds[ds.cf.axes["Y"][0]].mean().values
else:
if region["method"] == "gridpoint":
if len(region["lon"] != 1):
raise ValueError(
"Only a single location should be used with interp_centroid."
)
centroid = {
"lon": region["lon"],
"lat": region["lat"],
}
elif region["method"] == "bbox":
centroid = {
"lon": np.mean(region["lon_bnds"]),
"lat": np.mean(region["lat_bnds"]),
}
elif region["method"] == "shape":
if not isinstance(region["shape"], gpd.GeoDataFrame):
s = gpd.read_file(region["shape"])
else:
s = region["shape"]
if len(s != 1):
raise ValueError(
"Only a single polygon should be used with interp_centroid."
)
centroid = {"lon": s.centroid[0].x, "lat": s.centroid[0].y}
else:
raise ValueError("'method' not understood.")
kwargs.update(centroid)
ds_agg = ds.interp(**kwargs)
new_history = (
f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] "
f"xarray.interp(**{kwargs}) - xarray v{xr.__version__}"
)
# Uses xesmf.SpatialAverager
elif method == "xesmf":
if xe is None:
raise ImportError(
"Method xesmf requires xESMF to work, please install that package."
)
# If the region is a bounding box, call shapely and geopandas to transform it into an input compatible with xesmf
if region["method"] == "bbox":
polygon_geom = shapely.box(
region["lon_bnds"][0],
region["lat_bnds"][0],
region["lon_bnds"][1],
region["lat_bnds"][1],
)
polygon = gpd.GeoDataFrame(index=[0], geometry=[polygon_geom])
# Prepare the History field
new_history = (
f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] "
f"xesmf.SpatialAverager over {region['lon_bnds']}{region['lat_bnds']} - xESMF v{xe.__version__}"
)
# If the region is a shapefile, open with geopandas
elif region["method"] == "shape":
if not isinstance(region["shape"], gpd.GeoDataFrame):
polygon = gpd.read_file(region["shape"])
name = Path(region["shape"]).name
else:
polygon = region["shape"]
name = f"{len(polygon)} polygons"
# Simplify the geometries to a given tolerance, if needed.
# The simpler the polygons, the faster the averaging, but it will lose some precision.
if simplify_tolerance is not None:
polygon["geometry"] = polygon.simplify(
tolerance=simplify_tolerance, preserve_topology=True
)
# Prepare the History field
new_history = (
f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] "
f"xesmf.SpatialAverager over {name} - xESMF v{xe.__version__}"
)
else:
raise ValueError("'method' should be one of [bbox, shape].")
kwargs_copy = deepcopy(kwargs)
call_kwargs = {"skipna": kwargs_copy.pop("skipna", False)}
if "output_chunks" in kwargs:
call_kwargs["output_chunks"] = kwargs_copy.pop("output_chunks")
# Pre-emptive segmentization. Same threshold as xESMF, but there's not strong analysis behind this choice
geoms = shapely.segmentize(polygon.geometry, 1)
if (
ds.cf["longitude"].ndim == 2
and "longitude" not in ds.cf.bounds
and "rotated_pole" in ds
):
from .regrid import create_bounds_rotated_pole
ds = ds.update(create_bounds_rotated_pole(ds))
savg = xe.SpatialAverager(ds, geoms, **kwargs_copy)
ds_agg = savg(ds, keep_attrs=True, **call_kwargs)
extra_coords = {
col: xr.DataArray(polygon[col], dims=("geom",))
for col in polygon.columns
if col != "geometry"
}
extra_coords["geom"] = xr.DataArray(polygon.index, dims=("geom",))
ds_agg = ds_agg.assign_coords(**extra_coords)
if len(polygon) == 1:
ds_agg = ds_agg.squeeze("geom")
else:
raise ValueError(
"Subsetting method should be ['cos-lat', 'interp_centroid', 'xesmf']"
)
# History
history = (
new_history + " \n " + ds_agg.attrs["history"]
if "history" in ds_agg.attrs
else new_history
)
ds_agg.attrs["history"] = history
# Attrs
if to_domain is not None:
ds_agg.attrs["cat:domain"] = to_domain
if to_level is not None:
ds_agg.attrs["cat:processing_level"] = to_level
return ds_agg
[docs]
@parse_config
def produce_horizon( # noqa: C901
ds: xr.Dataset,
indicators: Union[
str,
os.PathLike,
Sequence[Indicator],
Sequence[tuple[str, Indicator]],
ModuleType,
],
*,
periods: Optional[Union[list[str], list[list[str]]]] = None,
warminglevels: Optional[dict] = None,
to_level: Optional[str] = "horizons",
period: Optional[list] = None,
) -> xr.Dataset:
"""
Compute indicators, then the climatological mean, and finally unstack dates in order
to have a single dataset with all indicators of different frequencies.
Once this is done, the function drops 'time' in favor of 'horizon'.
This function computes the indicators and does an interannual mean.
It stacks the season and month in different dimensions and adds a dimension `horizon` for the period or the warming level, if given.
Parameters
----------
ds : xr.Dataset
Input dataset with a time dimension.
indicators : Union[str, os.PathLike, Sequence[Indicator], Sequence[Tuple[str, Indicator]], ModuleType]
Indicators to compute. It will be passed to the `indicators` argument of `xs.compute_indicators`.
periods : list of str or list of lists of str, optional
Either [start, end] or list of [start_year, end_year] for the period(s) to be evaluated.
If both periods and warminglevels are None, the full time series will be used.
warminglevels : dict, optional
Dictionary of arguments to pass to `py:func:xscen.subset_warming_level`.
If 'wl' is a list, the function will be called for each value and produce multiple horizons.
If both periods and warminglevels are None, the full time series will be used.
to_level : str, optional
The processing level to assign to the output.
If there is only one horizon, you can use "{wl}", "{period0}" and "{period1}" in the string to dynamically
include that information in the processing level.
Returns
-------
xr.Dataset
Horizon dataset.
"""
if "warminglevel" in ds and len(ds.warminglevel) != 1:
raise ValueError(
"Input dataset should only have `warminglevel` dimension of length 1. "
"If you want to use produce_horizon for multiple warming levels, "
"extract the full time series and use the `warminglevels` argument instead."
)
if period is not None:
warnings.warn(
"The 'period' argument is deprecated and will be removed in a future version. Use 'periods' instead.",
category=FutureWarning,
)
periods = [standardize_periods(period, multiple=False)]
all_periods = []
if periods is not None:
all_periods.extend(standardize_periods(periods))
if warminglevels is not None:
if isinstance(warminglevels["wl"], (int, float)):
all_periods.append(warminglevels)
elif isinstance(warminglevels["wl"], list):
template = deepcopy(warminglevels)
for wl in warminglevels["wl"]:
all_periods.append({**template, "wl": wl})
else:
raise ValueError(
f"Could not understand the format of warminglevels['wl']: {warminglevels['wl']}"
)
if len(all_periods) == 0:
all_periods = standardize_periods(
[[int(ds.time.dt.year[0]), int(ds.time.dt.year[-1])]]
)
out = []
for period in all_periods:
if isinstance(period, list):
if ds.time.dt.year[0] <= int(period[0]) and ds.time.dt.year[-1] >= int(
period[1]
):
ds_sub = ds.sel(time=slice(period[0], period[1]))
else:
warnings.warn(
f"The requested period {period} is not fully covered by the input dataset. "
"The requested period will be skipped."
)
ds_sub = None
else:
ds_sub = subset_warming_level(ds, **period)
if ds_sub is not None:
# compute indicators
ind_dict = compute_indicators(
ds=(
ds_sub.squeeze(dim="warminglevel")
if "warminglevel" in ds_sub.dims
else ds_sub
),
indicators=indicators,
)
# Compute the window-year mean
ds_merge = xr.Dataset()
for freq, ds_ind in ind_dict.items():
if freq != "fx":
ds_mean = climatological_op(
ds_ind,
op="mean", # ToDo: make op an argument of produce_horizon
rename_variables=False,
horizons_as_dim=True,
).drop_vars("time")
else:
ds_mean = ds_ind.expand_dims(
dim={
"horizon": [
f"{ds.time.dt.year[0].item()}-{ds.time.dt.year[-1].item()}"
]
}
)
ds_mean["horizon"] = ds_mean["horizon"].astype(str)
if "warminglevel" in ds_mean.coords:
wl = np.array([ds_mean["warminglevel"].item()])
wl_attrs = ds_mean["warminglevel"].attrs
ds_mean = ds_mean.drop_vars("warminglevel")
ds_mean["horizon"] = wl
ds_mean["horizon"].attrs.update(wl_attrs)
# put all indicators in one dataset
for var in ds_mean.data_vars:
ds_merge[var] = ds_mean[var]
ds_merge.attrs.update(ds_mean.attrs)
out.append(ds_merge)
# If automatic attributes are not the same for all indicators, warn the user.
if len(out) > 0:
for v in out[0].data_vars:
if not all(
[
all(
[
out[0][v].attrs[attr] == out[i][v].attrs[attr]
for i in range(1, len(out))
]
)
for attr in ["long_name", "description"]
]
):
warnings.warn(
f"The attributes for variable {v} are not the same for all horizons, "
"probably because the periods were not of the same length. "
"Attributes will be kept from the first horizon, but this might not be the most appropriate."
)
out = xr.concat(out, dim="horizon")
out.attrs["cat:xrfreq"] = "fx"
out.attrs["cat:frequency"] = "fx"
if to_level:
if len(all_periods) == 1:
if (isinstance(all_periods[0], dict)) or (
"warminglevel" in ds.dims and warminglevels is None
):
to_level = to_level.format(
wl=(
ds_sub.warminglevel.values[0]
if isinstance(all_periods[0], dict)
else ds.warminglevel.values[0]
)
)
else:
to_level = to_level.format(
period0=all_periods[0][0], period1=all_periods[0][1]
)
out.attrs["cat:processing_level"] = to_level
return out
else:
raise ValueError("No horizon could be computed. Check your inputs.")