"""Spatial tools."""
import datetime
import itertools
import logging
import warnings
from collections.abc import Sequence
from pathlib import Path
from typing import TypeVar
import cartopy.crs
try:
import clisops # For version info
import clisops.core as cl
except KeyError as e:
if e.args[0] == "Author":
warnings.warn(
"The clisops package could not be imported due to a known KeyError bug that occurs with some "
"older versions of ESMF and specific execution setups (such as debugging on a Windows machine). "
"As a workaround, try installing 'importlib-metadata <8.0.0' and/or updating ESMF. If you do not "
"need 'clisops.core' functionalities (e.g. spatial subsetting), you can ignore this warning.",
stacklevel=2,
)
else:
raise e
cl = None
clisops = None
import dask
import geopandas as gpd
import numpy as np
import shapely as shp
import sparse as sp
import xarray as xr
import xclim as xc
from pyproj.crs import CRS
from .config import parse_config
logger = logging.getLogger(__name__)
__all__ = [
"Region",
"creep_fill",
"creep_weights",
"dataset_extent",
"get_crs",
"get_grid_mapping",
"merge_duplicated_stations",
"rotate_vectors",
"subset",
"voronoi_weights",
]
Region = TypeVar("Region", bound=dict)
"""
A region specification, a dictionary with the following valid entries:
- ``name`` : region name for the `domain` column
- ``method`` : the method used for subsetting the region from a dataset:
- ``"shape"`` : The region is defined by a polygon in entry "shape".
- ``"bbox"`` : The region is defined by bounds in entries "lat_bnds" and "lon_bnds".
- ``"sel"`` : The region is defined by bounds in entries with the same name as the spatial coordinates.
- ``"gridpoint"`` : The region is a list of points defined in entries "lon" and "lat"
- ``tile_buffer`` : For methods "bbox" and "shape", the actual subset is enlarged by this many grid cell sizes (approximated).
This differs from clisops\' ``buffer`` argument in :py:func:`~clisops.core.subset.subset_shape`.
- ``shape`` : For method "shape" only. A shapely Polygon, geopandas GeoDataFrame or GeoSeries, or a path to a file that GeoPandas can open.
When multiplie shapes are present, the subsetting is done with the unary union of the geometries.
When the CRS information is missing, EPSG 4326 is assumed.
- ``lat_bnds`` and ``lon_bnds`` : For method "bbox" only. Tuples of the minimum and maximum latitude or longitude values.
- ``lon`` and ``lat`` : For method "gridpoint", 1D arrays of the longitude and latitude coordinates of the points to extract.
- ``<coordinate name>`` : For method "sel", any other entry is understood as a tuple of the bounds to extract for that spatial dimension.
"""
[docs]
@parse_config
def creep_weights(mask: xr.DataArray, n: int = 1, steps: int = 1, mode: str = "clip") -> xr.DataArray:
"""
Compute weights for the creep fill.
The output is a sparse matrix with the same dimensions as `mask`, twice.
If `steps` is larger than 1, the output has a "step" dimension of that length.
Parameters
----------
mask : DataArray
A boolean DataArray. False values are candidates to the filling.
Usually they represent missing values (`mask = da.notnull()`).
All dimensions are creep filled.
n : int
The order of neighbouring to use. 1 means only the adjacent grid cells are used.
steps : int
Apply the algorithm this number of times, creeping `n` neighbours at each step.
mode : {'clip', 'wrap'}
If a cell is on the edge of the domain, `mode='wrap'` will wrap around to find neighbours.
Returns
-------
DataArray
Weights. The dot product must be taken over the last N dimensions, in sequence for each step.
"""
if mode not in ["clip", "wrap"]:
raise ValueError("mode must be either 'clip' or 'wrap'")
if steps > 1:
da = xr.ones_like(mask).where(mask)
weights = []
for i in range(steps):
w = creep_weights(da.notnull())
weights.append(w)
if i < steps - 1: # no need to do it one the last step
da = creep_fill(da, w)
# TODO: If we treated the nan differently we could maybe collapse all weights with dot instead of having to apply them iteratively
return xr.concat(weights, "step")
da = mask
mask = da.values
neighbors = np.array(list(itertools.product(*[np.arange(-n, n + 1) for j in range(mask.ndim)]))).T
src = []
dst = []
w = []
it = np.nditer(mask, flags=["f_index", "multi_index"], order="C")
for i in it:
if not i:
neigh_idx_2d = np.atleast_2d(it.multi_index).T + neighbors
neigh_idx_1d = np.ravel_multi_index(neigh_idx_2d, mask.shape, order="C", mode=mode)
if mode == "clip":
neigh_idx = np.unravel_index(np.unique(neigh_idx_1d), mask.shape, order="C")
elif mode == "wrap":
neigh_idx = np.unravel_index(neigh_idx_1d, mask.shape, order="C")
neigh = mask[neigh_idx]
N = (neigh).sum()
if N > 0:
src.extend([it.multi_index] * N)
dst.extend(np.stack(neigh_idx)[:, neigh].T)
w.extend([1 / N] * N)
else:
src.extend([it.multi_index])
dst.extend([it.multi_index])
w.extend([np.nan])
else:
src.extend([it.multi_index])
dst.extend([it.multi_index])
w.extend([1])
crds = np.concatenate((np.array(src).T, np.array(dst).T), axis=0)
return xr.DataArray(
sp.COO(crds, w, (*da.shape, *da.shape)),
dims=[f"{d}_out" for d in da.dims] + list(da.dims),
coords=da.coords,
name="creep_fill_weights",
)
[docs]
@parse_config
def creep_fill(da: xr.DataArray, w: xr.DataArray) -> xr.DataArray:
"""
Creep fill using pre-computed weights.
Parameters
----------
da : xr.DataArray
A DataArray sharing the dimensions with the one used to compute the weights.
It can have other dimensions.
Dask is supported as long as there are no chunks over the creeped dims.
w : xr.DataArray
The result of `creep_weights`.
Returns
-------
xr.DataArray
Same shape as `da`, but values filled according to `w`.
Examples
--------
>>> w = creep_weights(da.isel(time=0).notnull(), n=1)
>>> da_filled = creep_fill(da, w)
"""
if "step" in w.dims:
out = da
for step in w.step:
out = creep_fill(out, w.sel(step=step))
return out
def _dot(arr, wei):
N = wei.ndim // 2
extra_dim = arr.ndim - N
return np.tensordot(arr, wei, axes=(np.arange(N) + extra_dim, np.arange(N) + N))
N = w.ndim // 2
return xr.apply_ufunc(
_dot,
da,
w,
input_core_dims=[w.dims[N:], w.dims],
output_core_dims=[w.dims[N:]],
dask="parallelized",
output_dtypes=["float64"],
)
[docs]
def rotate_vectors(
uu: xr.DataArray,
vv: xr.DataArray,
crs: cartopy.crs.Projection | None = None,
reverse: bool = False,
) -> tuple[xr.DataArray, xr.DataArray]:
"""
Rotate a vector field defined in its grid space to the real south-north / west-east axes.
Parameters
----------
uu : xr.DataArray
The u component of the vector (along the X grid axis).
Must be CF-compliant, so its coordinates are correctly found.
vv : xr.DataArray
The v component of the vector (along the Y grid axis)
Must be CF-compliant, so its coordinates are correctly found.
crs : pyproj.CRS, optional
The projection of the UU and VV grid. If not given, the grid mapping attribute from UU is read.
reverse : bool
If True, the opposite rotation is performed : inputs are understood as in the real S-N, W-E axes and are
rotated back to the grid axes.
Returns
-------
uuc : xr.DataArray
The west-east component of the vector. Attributes from uu are copied.
vvc : xr.DataArray
The south-north component of the vector. Attributes from vv are copied.
"""
if crs is None:
crs = get_crs(uu.rename("uu").to_dataset())
lalo = cartopy.crs.PlateCarree(globe=crs.globe)
geod = lalo.get_geod()
X = uu.cf["X"]
Y = uu.cf["Y"]
# ensure everything has the same dim order
YY, XX = xr.broadcast(Y, X)
XXarr = XX.values
YYarr = YY.values
# a very small increment along the y axis
if X.dims == Y.dims:
# we got a list of points, can't infer the grid resolution
dy = 1e-5
else:
dy = (Y[1] - Y[0]).item() / 1000
# Get lat lon of center points
crds_c = lalo.transform_points(crs, XXarr, YYarr)
# Get lat lon of points just slightly above along the y axis
crds_y = lalo.transform_points(crs, XXarr, YYarr + dy)
# The azimuth of this short vector
az, _, _ = geod.inv(crds_c[..., 0], crds_c[..., 1], crds_y[..., 0], crds_y[..., 1])
if reverse:
az = -az
# The rotation angle : opposite of the azimuth, in rad
th = xr.DataArray(-np.deg2rad(az), dims=YY.dims, coords=YY.coords)
# Rotation matrix like in Algèbre linéaire et géométrie vectorielle
c = np.cos(th)
s = np.sin(th)
uuc = uu * c - vv * s
vvc = uu * s + vv * c
return uuc.assign_attrs(uu.attrs), vvc.assign_attrs(vv.attrs)
[docs]
def subset(
ds: xr.Dataset,
method: str,
*,
name: str | None = None,
tile_buffer: float = 0,
**kwargs,
) -> xr.Dataset:
r"""
Subset the data to a region.
Either creates a slice and uses the .sel() method, or customizes a call to
clisops.subset() that allows for an automatic buffer around the region.
Parameters
----------
ds : xr.Dataset
Dataset to be subsetted.
method : {'gridpoint', 'bbox', shape', 'sel'}
If the method is 'sel', this is not a call to clisops but only a subsetting with the xarray `.sel()` function.
name : str, optional
Used to rename the 'cat:domain' attribute.
tile_buffer : float
For ['bbox', shape'], uses an approximation of the grid cell size to add a buffer around the requested region.
This differs from clisops' 'buffer' argument in subset_shape().
**kwargs : dict
Arguments to be sent to clisops. See relevant function for details. Depending on the method, required kwargs are:
- gridpoint: lon, lat
- bbox: lon_bnds, lat_bnds
- shape: shape
- sel: slices for each dimension
Returns
-------
xr.Dataset
Subsetted Dataset.
See Also
--------
clisops.core.subset.subset_gridpoint : Used for the 'gridpoint' method.
clisops.core.subset.subset_bbox : Used for the 'bbox' method.
clisops.core.subset.subset_shape : Used for the 'shape' method.
"""
if cl is None and method in ["gridpoint", "bbox", "shape"]:
raise ImportError("The clisops package is required for the 'gridpoint', 'bbox' and 'shape' methods.")
if tile_buffer > 0 and method in ["gridpoint", "sel"]:
warnings.warn(
f"tile_buffer is not used for the '{method}' method. Ignoring the argument.",
UserWarning,
stacklevel=2,
)
if "latitude" not in ds.cf or "longitude" not in ds.cf:
ds = ds.cf.guess_coord_axis()
if method == "gridpoint":
ds_subset = _subset_gridpoint(ds, name=name, **kwargs)
elif method == "bbox":
ds_subset = _subset_bbox(ds, name=name, tile_buffer=tile_buffer, **kwargs)
elif method == "shape":
ds_subset = _subset_shape(ds, name=name, tile_buffer=tile_buffer, **kwargs)
elif method == "sel":
ds_subset = _subset_sel(ds, name=name, **kwargs)
else:
raise ValueError("Subsetting type not recognized. Use 'gridpoint', 'bbox', 'shape' or 'sel'.")
return ds_subset
def _subset_gridpoint(
ds: xr.Dataset,
lon: float | Sequence[float] | xr.DataArray,
lat: float | Sequence[float] | xr.DataArray,
*,
name: str | None = None,
**kwargs,
) -> xr.Dataset:
r"""
Subset the data to a gridpoint.
Parameters
----------
ds : xr.Dataset
Dataset to be subsetted.
lon : float or Sequence[float] or xr.DataArray
Longitude coordinate(s). Must be of the same length as lat.
lat : float or Sequence[float] or xr.DataArray
Latitude coordinate(s). Must be of the same length as lon.
name: str, optional
Used to rename the 'cat:domain' attribute.
**kwargs : dict
Other arguments to be sent to clisops. Possible kwargs are:
- start_date (str): Start date for the subset in the format 'YYYY-MM-DD'.
- end_date (str): End date for the subset in the format 'YYYY-MM-DD'.
- first_level (int or float): First level of the subset.
- last_level (int or float): Last level of the subset.
- tolerance (float): Masks values if the distance to the nearest gridpoint is larger than tolerance in meters.
- add_distance (bool): If True, adds a variable with the distance to the nearest gridpoint.
Returns
-------
xr.Dataset
Subsetted Dataset.
"""
ds = _load_lon_lat(ds)
if not hasattr(lon, "__iter__"):
lon = [lon]
if not hasattr(lat, "__iter__"):
lat = [lat]
if isinstance(lon, xr.DataArray):
dim = lon.dims[0]
lon = lon.rename({dim: "site"})
lat = lat.rename({dim: "site"})
else:
dim = "site"
lon = xr.DataArray(lon, dims=("site",), name="lon")
lat = xr.DataArray(lat, dims=("site",), name="lon")
try:
crs = get_crs(ds)
except (NotImplementedError, KeyError):
crs = None
if (
not set(kwargs.values()) - {None} # No kwargs have been set differently from the default
and crs is not None # the actual crs of the data is known
and "X" in ds.cf.axes
and "Y" in ds.cf.axes # the coords (in the actual crs) are known
and ds.cf["X"].name in ds.indexes # for the loc case
and ds.cf["Y"].name in ds.indexes # the coords are indexable (required for nearest-sel)
and (lon.size * ds.cf["Y"].size * ds.cf["X"].size) > 1000 # the work to do is relatively large
):
# Fast-track : use xarray's nearest-sel on 1D coordinate by converting the request to the actual CRS
PC = cartopy.crs.PlateCarree()
if isinstance(crs, cartopy.crs.PlateCarree):
X = lon
Y = lat
else:
crds = crs.transform_points(PC, lon, lat)
X = xr.DataArray(crds[:, 0], dims=lon.dims, coords=lon.coords, name=ds.cf["X"].name)
Y = xr.DataArray(crds[:, 1], dims=lat.dims, coords=lat.coords, name=ds.cf["Y"].name)
ds_subset = ds.cf.sel(X=X, Y=Y, method="nearest")
if ds_subset.site.size == 1:
ds_subset = ds_subset.squeeze(dim)
else:
ds_subset = cl.subset_gridpoint(ds, lon=lon, lat=lat, **kwargs)
if dim != "site":
ds_subset = ds_subset.rename(site=dim)
new_history = (
f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] "
f"gridpoint spatial subsetting on {len(lon)} coordinates - clisops v{clisops.__version__}"
)
return update_history_and_name(ds_subset, new_history, name)
def _subset_bbox(
ds: xr.Dataset,
lon_bnds: tuple[float, float] | list[float],
lat_bnds: tuple[float, float] | list[float],
*,
name: str | None = None,
tile_buffer: float = 0,
**kwargs,
) -> xr.Dataset:
r"""
Subset the data to a bounding box.
Parameters
----------
ds : xr.Dataset
Dataset to be subsetted.
lon_bnds : tuple or list of two floats
Longitude boundaries of the bounding box.
lat_bnds : tuple or list of two floats
Latitude boundaries of the bounding box.
name: str, optional
Used to rename the 'cat:domain' attribute.
tile_buffer: float
Uses an approximation of the grid cell size to add a dynamic buffer around the requested region.
**kwargs : dict
Other arguments to be sent to clisops. Possible kwargs are:
- start_date (str): Start date for the subset in the format 'YYYY-MM-DD'.
- end_date (str): End date for the subset in the format 'YYYY-MM-DD'.
- first_level (int or float): First level of the subset.
- last_level (int or float): Last level of the subset.
- time_values (Sequence[str]): A list of datetime strings to subset.
- level_values (Sequence[int or float]): A list of levels to subset.
Returns
-------
xr.Dataset
Subsetted Dataset.
"""
ds = _load_lon_lat(ds)
if tile_buffer > 0:
lon_res, lat_res = _estimate_grid_resolution(ds)
lon_bnds = (
lon_bnds[0] - lon_res * tile_buffer,
lon_bnds[1] + lon_res * tile_buffer,
)
lat_bnds = (
lat_bnds[0] - lat_res * tile_buffer,
lat_bnds[1] + lat_res * tile_buffer,
)
ds_subset = cl.subset_bbox(ds, lon_bnds=lon_bnds, lat_bnds=lat_bnds, **kwargs)
new_history = (
f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] "
f"bbox spatial subsetting with {'buffer=' + str(tile_buffer) if tile_buffer > 0 else 'no buffer'}"
f", lon_bnds={np.array(lon_bnds)}, lat_bnds={np.array(lat_bnds)}"
f" - clisops v{clisops.__version__}"
)
return update_history_and_name(ds_subset, new_history, name)
def _subset_shape(
ds: xr.Dataset,
shape: str | Path | gpd.GeoDataFrame | shp.Polygon,
*,
name: str | None = None,
tile_buffer: float = 0,
**kwargs,
) -> xr.Dataset:
r"""
Subset the data to a shape.
Parameters
----------
ds : xr.Dataset
Dataset to be subsetted.
shape : str or gpd.GeoDataFrame or shp.Polygon
Path to the shapefile or GeoDataFrame or Polygon (which is assumed to have the ESPG:4326 CRS).
name: str, optional
Used to rename the 'cat:domain' attribute.
tile_buffer: float
Uses an approximation of the grid cell size to add a buffer around the requested region.
**kwargs : dict
Other arguments to be sent to clisops. Possible kwargs are:
- raster_crs (str or int): EPSG number or PROJ4 string.
- shape_crs (str or int): EPSG number or PROJ4 string.
- buffer (float): Buffer size to add around the shape. Units are based on the shape degrees/metres.
- start_date (str): Start date for the subset in the format 'YYYY-MM-DD'.
- end_date (str): End date for the subset in the format 'YYYY-MM-DD'.
- first_level (int or float): First level of the subset.
- last_level (int or float): Last level of the subset.
Returns
-------
xr.Dataset
Subsetted Dataset.
"""
ds = _load_lon_lat(ds)
if isinstance(shape, str | Path):
shape = gpd.read_file(shape)
elif isinstance(shape, shp.Polygon):
shape = gpd.GeoDataFrame(geometry=[shape], crs=CRS(4326))
if tile_buffer > 0:
if kwargs.get("buffer") is not None:
raise ValueError("Both tile_buffer and clisops' buffer were requested. Use only one.")
lon_res, lat_res = _estimate_grid_resolution(ds)
# The buffer argument needs to be in the same units as the shapefile, so it's simpler to always project the shapefile to WGS84.
try:
shape_crs = shape.crs
except AttributeError:
shape_crs = None
if shape_crs is None:
warnings.warn(
"Shapefile does not have a CRS. Compatibility with the dataset is not guaranteed.",
stacklevel=2,
category=UserWarning,
)
elif shape_crs != CRS(4326): # WGS84
warnings.warn(
"Shapefile is not in EPSG:4326. Reprojecting to this CRS.",
UserWarning,
stacklevel=2,
)
shape = shape.to_crs(4326)
kwargs["buffer"] = np.max([lon_res, lat_res]) * tile_buffer
ds_subset = cl.subset_shape(ds, shape=shape, **kwargs)
new_history = (
f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] "
f"shape spatial subsetting with {'buffer=' + str(tile_buffer) if tile_buffer > 0 else 'no buffer'}"
f", shape={Path(shape).name if isinstance(shape, str | Path) else 'gpd.GeoDataFrame'}"
f" - clisops v{clisops.__version__}"
)
return update_history_and_name(ds_subset, new_history, name)
def _subset_sel(ds: xr.Dataset, *, name: str | None = None, **kwargs) -> xr.Dataset:
r"""
Subset the data using the .sel() method.
Parameters
----------
ds : xr.Dataset
Dataset to be subsetted.
name: str, optional
Used to rename the 'cat:domain' attribute.
**kwargs : dict
The keys are the dimensions to subset and the values are turned into a slice.
Returns
-------
xr.Dataset
Subsetted Dataset.
"""
# Create a dictionary with slices for each dimension
arg_sel = {dim: slice(*map(float, bounds)) for dim, bounds in kwargs.items()}
# Subset the dataset
ds_subset = ds.sel(**arg_sel)
# Update the history attribute
new_history = f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] sel subsetting with arguments {arg_sel}"
return update_history_and_name(ds_subset, new_history, name)
def _load_lon_lat(ds: xr.Dataset) -> xr.Dataset:
"""Load longitude and latitude for more efficient subsetting."""
if xc.core.utils.uses_dask(ds.cf["longitude"]):
logger.info("Loading longitude for more efficient subsetting.")
(ds[ds.cf["longitude"].name],) = dask.compute(ds[ds.cf["longitude"].name])
if xc.core.utils.uses_dask(ds.cf["latitude"]):
logger.info("Loading latitude for more efficient subsetting.")
(ds[ds.cf["latitude"].name],) = dask.compute(ds[ds.cf["latitude"].name])
return ds
[docs]
def get_grid_mapping(ds: xr.Dataset) -> str:
"""
Get the grid_mapping attribute from the dataset.
Parameters
----------
ds : xr.Dataset
The dataset to get the grid mapping from.
Returns
-------
str
The name of the grid mapping variable, or an empty string if not found.
"""
gridmap = [ds[v].attrs["grid_mapping"] for v in ds.data_vars if "grid_mapping" in ds[v].attrs]
gridmap += [c for c in ds.variables if ds[c].attrs.get("grid_mapping_name", None)]
gridmap = list(np.unique(gridmap))
if len(gridmap) > 1:
warnings.warn(f"There are conflicting grid_mapping attributes in the dataset. Assuming {gridmap[0]}.", stacklevel=2)
return gridmap[0] if gridmap else ""
[docs]
def get_crs(gridmap: xr.Dataset | xr.DataArray) -> cartopy.crs.Projection:
"""
Get the cartopy CRS.
Parameters
----------
gridmap : xr.Dataset or xr.DataArray
Either a dataset or dataraay that has a grid mapping variable or that grid mapping variable directly.
Returns
-------
cartopy.crs.Projection
The cartopy crs. Only RotatedPole and ObliqueMercator are supported.
"""
# if not passing the grid mapping var itself, we need to get it
if "grid_mapping_name" not in gridmap.attrs:
if isinstance(gridmap, xr.Dataset):
gridmap_name = get_grid_mapping(gridmap)
else: # DataArray
gridmap_name = gridmap.attrs["grid_mapping"]
if gridmap_name == "" and (
"longitude" in gridmap.cf
and "X" in gridmap.cf
and gridmap.cf["longitude"].name == gridmap.cf["X"].name
and gridmap.cf["X"].ndim == 1
and "latitude" in gridmap.cf
and "Y" in gridmap.cf
and gridmap.cf["latitude"].name == gridmap.cf["Y"].name
and gridmap.cf["Y"].ndim == 1
):
gridmap = xr.DataArray(attrs={"grid_mapping_name": "latitude_longitude"})
else:
gridmap = gridmap[gridmap_name]
cf_params = gridmap.attrs
# RotPole is a spherical-only projection, so having a WGS84 ellipse makes it confusing
# Moreover, climate models usually use spherical models of the earth, so we use a spherical globe by default
# This is different from cartopy, especially with PROJ >= 9.8
# 6370997 m is the conventional of the Earth as a sphere
globe = cartopy.crs.Globe(
datum=cf_params.get("horizontal_datum_name"),
ellipse=cf_params.get("reference_ellipsoid_name", "sphere"),
semimajor_axis=(cf_params.get("earth_radius") or cf_params.get("semi_major_axis") or 6370997),
semiminor_axis=cf_params.get("semi_minor_axis"),
inverse_flattening=cf_params.get("inverse_flattening"),
towgs84=cf_params.get("towgs84"),
)
if cf_params["grid_mapping_name"] == "latitude_longitude":
crs = cartopy.crs.PlateCarree(globe=globe)
elif cf_params["grid_mapping_name"] == "rotated_latitude_longitude":
crs = cartopy.crs.RotatedPole(
pole_longitude=cf_params["grid_north_pole_longitude"],
pole_latitude=cf_params["grid_north_pole_latitude"],
central_rotated_longitude=cf_params.get("north_pole_grid_longitude", 0),
globe=globe,
)
elif cf_params["grid_mapping_name"] == "oblique_mercator":
crs = cartopy.crs.ObliqueMercator(
central_longitude=cf_params["longitude_of_projection_origin"],
central_latitude=cf_params["latitude_of_projection_origin"],
scale_factor=cf_params["scale_factor_at_projection_origin"],
azimuth=cf_params["azimuth_of_central_line"],
false_easting=cf_params.get("false_easting", 0),
false_northing=cf_params.get("false_northing", 0),
globe=globe,
)
elif cf_params["grid_mapping_name"] == "lambert_conformal_conic":
crs = cartopy.crs.LambertConformal(
standard_parallels=(cf_params["standard_parallel"],)
if np.isscalar(cf_params["standard_parallel"])
else tuple(cf_params["standard_parallel"]),
central_longitude=cf_params["longitude_of_central_meridian"],
central_latitude=cf_params["latitude_of_projection_origin"],
false_easting=cf_params.get("false_easting", 0),
false_northing=cf_params.get("false_northing", 0),
globe=globe,
)
else:
raise NotImplementedError(f"Grid mapping {cf_params['grid_mapping_name']} not implemented.")
return crs
def _estimate_grid_resolution(ds: xr.Dataset) -> tuple[float, float]:
# Since this is to compute a buffer, we take the maximum difference as an approximation.
# Estimate the grid resolution
if len(ds.lon.dims) == 1: # 1D lat-lon
lon_res = np.abs(ds.lon.diff("lon").max().values)
lat_res = np.abs(ds.lat.diff("lat").max().values)
else:
lon_res = np.abs(ds.lon.diff(ds.cf["X"].name).max().values)
lat_res = np.abs(ds.lat.diff(ds.cf["Y"].name).max().values)
return lon_res, lat_res
def update_history_and_name(ds_subset, new_history, name):
"""
Update the history attribute and optionally the domain name of a dataset.
Parameters
----------
ds_subset : xarray.Dataset
The dataset to update.
new_history : str
The new history entry.
name : str, optional
The new domain name.
Returns
-------
xarray.Dataset
The updated dataset.
"""
history = new_history + " \n " + ds_subset.attrs["history"] if "history" in ds_subset.attrs else new_history
ds_subset.attrs["history"] = history
if name is not None:
ds_subset.attrs["cat:domain"] = name
return ds_subset
[docs]
def dataset_extent(ds: xr.Dataset, method: str = "shape", name: str | None = None) -> Region:
r"""
The dataset's spatial extent expressed as a xscen Region spec.
Parameters
----------
ds : Dataset
A dataset containing a grid with latitude and longitude coordinates and either
the bounds or a grid mapping understood by :py:func:`~xscen.regrid.create_bounds_gridmapping`.
method : {'shape', 'bbox'}
One of the two methods recognized by xscen to specify an area.
"shape" will compute a shapely polygon in latitude longitude coordinates from
the cell bounds. "bbox" will return the total extent of that polygon.
name : str, optional
A name to give to the region.
Returns
-------
Region
A :py:data:`~xscen.spatial.Region` dictionary useful for subsetting other datasets.
"""
from .regrid import create_bounds_gridmapping
if "lat_bounds" not in ds:
if "lat" in ds and ds.lat.ndim == 1:
ds = ds.cf.add_bounds(["lon", "lat"])
else:
ds = create_bounds_gridmapping(ds)
if ds["lat_bounds"].ndim == 2:
lonb = ds.lon_bounds.isel(bounds=xr.DataArray([0, 0, 1, 1], dims=("bounds",)))
latb = ds.lat_bounds.isel(bounds=xr.DataArray([0, 1, 1, 0], dims=("bounds",)))
lonb, latb = xr.broadcast(lonb, latb)
else:
lonb, latb = ds.lon_bounds, ds.lat_bounds
region = {"method": method}
if name is not None:
region["name"] = name
# Tolerance 0 is to merge colinear segments, without degrading anything else
p = shp.simplify(
shp.unary_union(shp.polygons(shp.linearrings(lonb.transpose(..., "bounds"), latb.transpose(..., "bounds")))),
tolerance=0,
)
match method:
case "shape":
region["shape"] = p
case "bbox":
bnds = p.bounds
region["lon_bnds"] = [bnds[0], bnds[2]]
region["lat_bnds"] = [bnds[1], bnds[3]]
case _ as err:
raise ValueError(f"Method must be 'shape' or 'bbox'. Got {err}.")
return region
[docs]
def merge_duplicated_stations(ds: xr.Dataset, precision: float | None = None) -> xr.Dataset:
"""
Merge identical locations of a dataset.
Identical locations are merged by combination of float values (priority from order of the location dimension).
Non-float values are taken from the first element.
Parameters
----------
ds : Dataset
Dataset with `lon` and `lat` coordinates. Both must be 1D and share the same dimension.
precision : float, optional
Round the coordinate up to this decimal before looking for duplicated points.
Default (None) is not to round.
Returns
-------
Dataset
A dataset with the same variables as the input but with duplicated locations merged.
"""
dim = ds.lat.dims[0]
# Geopandas et des Points, c'est plus rapide que Pandas et des tuples
if precision is not None:
lon = ds.lon.round(precision)
lat = ds.lat.round(precision)
else:
lon, lat = ds.lon, ds.lat
df = gpd.GeoDataFrame(index=ds[dim], geometry=[shp.Point(lo, la) for lo, la in zip(lon, lat, strict=True)])
dups = df.duplicated("geometry")
if dups.sum() == 0:
# aucun dup
return ds
dss = []
N = 0
for _, grp in df.groupby("geometry"):
if len(grp) == 1:
dss.append(ds.sel({dim: [grp.index[0]]}))
else:
N += len(grp) - 1
dsi = ds.sel({dim: grp.index})
dss.append(
dsi.map(
lambda x, d: (x.bfill(d) if x.dtype == "f" else x).isel({d: [0]}),
args=(dim,),
)
)
h = f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Merged {N} co-located points along dimension {dim}."
return update_history_and_name(xr.concat(dss, dim), h, None)
[docs]
def voronoi_weights(
ds: xr.Dataset,
region: gpd.GeoDataFrame | shp.Polygon | None = None,
maxfrac: float | None = None,
minlocs: int = 0,
_pts: gpd.GeoDataFrame | None = None,
) -> xr.DataArray:
"""
Compute a weight for each location in the dataset inversely proportionnate to the location density.
The total extent of the dataset is divided in regions using the Voronoi partition and weights are
computed from the area of these regions. This is meant for station data for example.
Parameters
----------
ds : Dataset
Dataset with `lon` and `lat` coordinates. Both must be 1D and share the same dimension.
region : GeoDataFrame or Polygon, optional
A polygon to constrain the total area to partition with Voronoi polygons.
Can also be a GeoDataFrame, in which case the weights are done for each feature.
Defaults to a convex hull around ds with a buffer of 1°.
See :py:func:`dataset_extent`.
maxfrac : float, optional
Limits the maximal region area to this fraction of the total area.
minlocs : int
When `region` is a GeoDataFrame, features containing less than this number of locations are removed from the output.
_pts : GeoDataFrame, optional
A GeoDataFrame of points to use for the Voronoi partition.
This is a shortcut for when iterating over regions.
Returns
-------
DataArray
The weights along the same dimension as the locations on the input.
If region was a GeoDataFrame, another dimension "geom" partitions the weights
for each feature and other columns are added as auxiliary coordinates (as with :py:func:`xs.spatial_mean`).
"""
dim = ds.lon.dims[0]
if _pts is None:
pts = gpd.GeoDataFrame(
index=ds[dim],
geometry=[shp.Point(lo, la) for lo, la in zip(ds.lon, ds.lat, strict=True)],
)
else: # Shortcut for when iterating over regions
pts = _pts
if region is None:
region = pts.union_all().convex_hull.buffer(1)
elif isinstance(region, gpd.GeoDataFrame):
weights = []
for _, row in region.iterrows():
weights.append(voronoi_weights(ds, row.geometry, maxfrac=maxfrac, _pts=pts).expand_dims(geom=[row.name]))
weight = xr.concat(weights, "geom")
weight = weight.assign_coords(region.drop(columns=["geometry"]).to_xarray().rename({region.index.name or "index": "geom"}))
if minlocs > 0:
weight = weight.where((weight > 0).sum(ds.lon.dims) >= minlocs, drop=True)
weight = weight.where((weight > 0).any("geom"), drop=True)
return weight
zones = gpd.GeoDataFrame(geometry=pts.voronoi_polygons(extend_to=region)).clip(region)
zones["geometry"] = zones.buffer(0)
zones["zone_area"] = zones.geometry.area
# overlay aligns points with their intersecting polygon
# reset_index, set_index, reindex dance to preserve indexing info and sort final DF as input
area = (gpd.overlay(pts.reset_index(names="station"), zones).set_index("station").reindex(pts.index)["zone_area"]).fillna(0)
if maxfrac is not None:
maxarea = area.sum() * maxfrac
area = area.clip(0, maxarea)
return xr.DataArray(
0 if area.sum() == 0 else area / area.sum(),
dims=(dim,),
coords=ds[dim].coords,
name="weights",
)