Source code for xscen.spatial

"""Spatial tools."""

import datetime
import itertools
import logging
import warnings
from collections.abc import Sequence
from pathlib import Path
from typing import TypeVar

import cartopy.crs


try:
    import clisops  # For version info
    import clisops.core as cl
except KeyError as e:
    if e.args[0] == "Author":
        warnings.warn(
            "The clisops package could not be imported due to a known KeyError bug that occurs with some "
            "older versions of ESMF and specific execution setups (such as debugging on a Windows machine). "
            "As a workaround, try installing 'importlib-metadata <8.0.0' and/or updating ESMF. If you do not "
            "need 'clisops.core' functionalities (e.g. spatial subsetting), you can ignore this warning.",
            stacklevel=2,
        )
    else:
        raise e
    cl = None
    clisops = None
import dask
import geopandas as gpd
import numpy as np
import shapely as shp
import sparse as sp
import xarray as xr
import xclim as xc
from pyproj.crs import CRS

from .config import parse_config


logger = logging.getLogger(__name__)

__all__ = [
    "Region",
    "creep_fill",
    "creep_weights",
    "dataset_extent",
    "get_crs",
    "get_grid_mapping",
    "merge_duplicated_stations",
    "rotate_vectors",
    "subset",
    "voronoi_weights",
]


Region = TypeVar("Region", bound=dict)
"""
A region specification, a dictionary with the following valid entries:

- ``name`` : region name for the `domain` column
- ``method`` : the method used for subsetting the region from a dataset:
    - ``"shape"`` : The region is defined by a polygon in entry "shape".
    - ``"bbox"`` : The region is defined by bounds in entries "lat_bnds" and "lon_bnds".
    - ``"sel"`` : The region is defined by bounds in entries with the same name as the spatial coordinates.
    - ``"gridpoint"`` : The region is a list of points defined in entries "lon" and "lat"

- ``tile_buffer`` : For methods "bbox" and "shape", the actual subset is enlarged by this many grid cell sizes (approximated).
  This differs from clisops\' ``buffer`` argument in :py:func:`~clisops.core.subset.subset_shape`.
- ``shape`` : For method "shape" only. A shapely Polygon, geopandas GeoDataFrame or GeoSeries, or a path to a file that GeoPandas can open.
  When multiplie shapes are present, the subsetting is done with the unary union of the geometries.
  When the CRS information is missing, EPSG 4326 is assumed.
- ``lat_bnds`` and ``lon_bnds`` : For method "bbox" only. Tuples of the minimum and maximum latitude or longitude values.
- ``lon`` and ``lat`` : For method "gridpoint", 1D arrays of the longitude and latitude coordinates of the points to extract.
- ``<coordinate name>`` : For method "sel", any other entry is understood as a tuple of the bounds to extract for that spatial dimension.
"""


[docs] @parse_config def creep_weights(mask: xr.DataArray, n: int = 1, steps: int = 1, mode: str = "clip") -> xr.DataArray: """ Compute weights for the creep fill. The output is a sparse matrix with the same dimensions as `mask`, twice. If `steps` is larger than 1, the output has a "step" dimension of that length. Parameters ---------- mask : DataArray A boolean DataArray. False values are candidates to the filling. Usually they represent missing values (`mask = da.notnull()`). All dimensions are creep filled. n : int The order of neighbouring to use. 1 means only the adjacent grid cells are used. steps : int Apply the algorithm this number of times, creeping `n` neighbours at each step. mode : {'clip', 'wrap'} If a cell is on the edge of the domain, `mode='wrap'` will wrap around to find neighbours. Returns ------- DataArray Weights. The dot product must be taken over the last N dimensions, in sequence for each step. """ if mode not in ["clip", "wrap"]: raise ValueError("mode must be either 'clip' or 'wrap'") if steps > 1: da = xr.ones_like(mask).where(mask) weights = [] for i in range(steps): w = creep_weights(da.notnull()) weights.append(w) if i < steps - 1: # no need to do it one the last step da = creep_fill(da, w) # TODO: If we treated the nan differently we could maybe collapse all weights with dot instead of having to apply them iteratively return xr.concat(weights, "step") da = mask mask = da.values neighbors = np.array(list(itertools.product(*[np.arange(-n, n + 1) for j in range(mask.ndim)]))).T src = [] dst = [] w = [] it = np.nditer(mask, flags=["f_index", "multi_index"], order="C") for i in it: if not i: neigh_idx_2d = np.atleast_2d(it.multi_index).T + neighbors neigh_idx_1d = np.ravel_multi_index(neigh_idx_2d, mask.shape, order="C", mode=mode) if mode == "clip": neigh_idx = np.unravel_index(np.unique(neigh_idx_1d), mask.shape, order="C") elif mode == "wrap": neigh_idx = np.unravel_index(neigh_idx_1d, mask.shape, order="C") neigh = mask[neigh_idx] N = (neigh).sum() if N > 0: src.extend([it.multi_index] * N) dst.extend(np.stack(neigh_idx)[:, neigh].T) w.extend([1 / N] * N) else: src.extend([it.multi_index]) dst.extend([it.multi_index]) w.extend([np.nan]) else: src.extend([it.multi_index]) dst.extend([it.multi_index]) w.extend([1]) crds = np.concatenate((np.array(src).T, np.array(dst).T), axis=0) return xr.DataArray( sp.COO(crds, w, (*da.shape, *da.shape)), dims=[f"{d}_out" for d in da.dims] + list(da.dims), coords=da.coords, name="creep_fill_weights", )
[docs] @parse_config def creep_fill(da: xr.DataArray, w: xr.DataArray) -> xr.DataArray: """ Creep fill using pre-computed weights. Parameters ---------- da : xr.DataArray A DataArray sharing the dimensions with the one used to compute the weights. It can have other dimensions. Dask is supported as long as there are no chunks over the creeped dims. w : xr.DataArray The result of `creep_weights`. Returns ------- xr.DataArray Same shape as `da`, but values filled according to `w`. Examples -------- >>> w = creep_weights(da.isel(time=0).notnull(), n=1) >>> da_filled = creep_fill(da, w) """ if "step" in w.dims: out = da for step in w.step: out = creep_fill(out, w.sel(step=step)) return out def _dot(arr, wei): N = wei.ndim // 2 extra_dim = arr.ndim - N return np.tensordot(arr, wei, axes=(np.arange(N) + extra_dim, np.arange(N) + N)) N = w.ndim // 2 return xr.apply_ufunc( _dot, da, w, input_core_dims=[w.dims[N:], w.dims], output_core_dims=[w.dims[N:]], dask="parallelized", output_dtypes=["float64"], )
[docs] def rotate_vectors( uu: xr.DataArray, vv: xr.DataArray, crs: cartopy.crs.Projection | None = None, reverse: bool = False, ) -> tuple[xr.DataArray, xr.DataArray]: """ Rotate a vector field defined in its grid space to the real south-north / west-east axes. Parameters ---------- uu : xr.DataArray The u component of the vector (along the X grid axis). Must be CF-compliant, so its coordinates are correctly found. vv : xr.DataArray The v component of the vector (along the Y grid axis) Must be CF-compliant, so its coordinates are correctly found. crs : pyproj.CRS, optional The projection of the UU and VV grid. If not given, the grid mapping attribute from UU is read. reverse : bool If True, the opposite rotation is performed : inputs are understood as in the real S-N, W-E axes and are rotated back to the grid axes. Returns ------- uuc : xr.DataArray The west-east component of the vector. Attributes from uu are copied. vvc : xr.DataArray The south-north component of the vector. Attributes from vv are copied. """ if crs is None: crs = get_crs(uu.rename("uu").to_dataset()) lalo = cartopy.crs.PlateCarree(globe=crs.globe) geod = lalo.get_geod() X = uu.cf["X"] Y = uu.cf["Y"] # ensure everything has the same dim order YY, XX = xr.broadcast(Y, X) XXarr = XX.values YYarr = YY.values # a very small increment along the y axis if X.dims == Y.dims: # we got a list of points, can't infer the grid resolution dy = 1e-5 else: dy = (Y[1] - Y[0]).item() / 1000 # Get lat lon of center points crds_c = lalo.transform_points(crs, XXarr, YYarr) # Get lat lon of points just slightly above along the y axis crds_y = lalo.transform_points(crs, XXarr, YYarr + dy) # The azimuth of this short vector az, _, _ = geod.inv(crds_c[..., 0], crds_c[..., 1], crds_y[..., 0], crds_y[..., 1]) if reverse: az = -az # The rotation angle : opposite of the azimuth, in rad th = xr.DataArray(-np.deg2rad(az), dims=YY.dims, coords=YY.coords) # Rotation matrix like in Algèbre linéaire et géométrie vectorielle c = np.cos(th) s = np.sin(th) uuc = uu * c - vv * s vvc = uu * s + vv * c return uuc.assign_attrs(uu.attrs), vvc.assign_attrs(vv.attrs)
[docs] def subset( ds: xr.Dataset, method: str, *, name: str | None = None, tile_buffer: float = 0, **kwargs, ) -> xr.Dataset: r""" Subset the data to a region. Either creates a slice and uses the .sel() method, or customizes a call to clisops.subset() that allows for an automatic buffer around the region. Parameters ---------- ds : xr.Dataset Dataset to be subsetted. method : {'gridpoint', 'bbox', shape', 'sel'} If the method is 'sel', this is not a call to clisops but only a subsetting with the xarray `.sel()` function. name : str, optional Used to rename the 'cat:domain' attribute. tile_buffer : float For ['bbox', shape'], uses an approximation of the grid cell size to add a buffer around the requested region. This differs from clisops' 'buffer' argument in subset_shape(). **kwargs : dict Arguments to be sent to clisops. See relevant function for details. Depending on the method, required kwargs are: - gridpoint: lon, lat - bbox: lon_bnds, lat_bnds - shape: shape - sel: slices for each dimension Returns ------- xr.Dataset Subsetted Dataset. See Also -------- clisops.core.subset.subset_gridpoint : Used for the 'gridpoint' method. clisops.core.subset.subset_bbox : Used for the 'bbox' method. clisops.core.subset.subset_shape : Used for the 'shape' method. """ if cl is None and method in ["gridpoint", "bbox", "shape"]: raise ImportError("The clisops package is required for the 'gridpoint', 'bbox' and 'shape' methods.") if tile_buffer > 0 and method in ["gridpoint", "sel"]: warnings.warn( f"tile_buffer is not used for the '{method}' method. Ignoring the argument.", UserWarning, stacklevel=2, ) if "latitude" not in ds.cf or "longitude" not in ds.cf: ds = ds.cf.guess_coord_axis() if method == "gridpoint": ds_subset = _subset_gridpoint(ds, name=name, **kwargs) elif method == "bbox": ds_subset = _subset_bbox(ds, name=name, tile_buffer=tile_buffer, **kwargs) elif method == "shape": ds_subset = _subset_shape(ds, name=name, tile_buffer=tile_buffer, **kwargs) elif method == "sel": ds_subset = _subset_sel(ds, name=name, **kwargs) else: raise ValueError("Subsetting type not recognized. Use 'gridpoint', 'bbox', 'shape' or 'sel'.") return ds_subset
def _subset_gridpoint( ds: xr.Dataset, lon: float | Sequence[float] | xr.DataArray, lat: float | Sequence[float] | xr.DataArray, *, name: str | None = None, **kwargs, ) -> xr.Dataset: r""" Subset the data to a gridpoint. Parameters ---------- ds : xr.Dataset Dataset to be subsetted. lon : float or Sequence[float] or xr.DataArray Longitude coordinate(s). Must be of the same length as lat. lat : float or Sequence[float] or xr.DataArray Latitude coordinate(s). Must be of the same length as lon. name: str, optional Used to rename the 'cat:domain' attribute. **kwargs : dict Other arguments to be sent to clisops. Possible kwargs are: - start_date (str): Start date for the subset in the format 'YYYY-MM-DD'. - end_date (str): End date for the subset in the format 'YYYY-MM-DD'. - first_level (int or float): First level of the subset. - last_level (int or float): Last level of the subset. - tolerance (float): Masks values if the distance to the nearest gridpoint is larger than tolerance in meters. - add_distance (bool): If True, adds a variable with the distance to the nearest gridpoint. Returns ------- xr.Dataset Subsetted Dataset. """ ds = _load_lon_lat(ds) if not hasattr(lon, "__iter__"): lon = [lon] if not hasattr(lat, "__iter__"): lat = [lat] if isinstance(lon, xr.DataArray): dim = lon.dims[0] lon = lon.rename({dim: "site"}) lat = lat.rename({dim: "site"}) else: dim = "site" lon = xr.DataArray(lon, dims=("site",), name="lon") lat = xr.DataArray(lat, dims=("site",), name="lon") try: crs = get_crs(ds) except (NotImplementedError, KeyError): crs = None if ( not set(kwargs.values()) - {None} # No kwargs have been set differently from the default and crs is not None # the actual crs of the data is known and "X" in ds.cf.axes and "Y" in ds.cf.axes # the coords (in the actual crs) are known and ds.cf["X"].name in ds.indexes # for the loc case and ds.cf["Y"].name in ds.indexes # the coords are indexable (required for nearest-sel) and (lon.size * ds.cf["Y"].size * ds.cf["X"].size) > 1000 # the work to do is relatively large ): # Fast-track : use xarray's nearest-sel on 1D coordinate by converting the request to the actual CRS PC = cartopy.crs.PlateCarree() if isinstance(crs, cartopy.crs.PlateCarree): X = lon Y = lat else: crds = crs.transform_points(PC, lon, lat) X = xr.DataArray(crds[:, 0], dims=lon.dims, coords=lon.coords, name=ds.cf["X"].name) Y = xr.DataArray(crds[:, 1], dims=lat.dims, coords=lat.coords, name=ds.cf["Y"].name) ds_subset = ds.cf.sel(X=X, Y=Y, method="nearest") if ds_subset.site.size == 1: ds_subset = ds_subset.squeeze(dim) else: ds_subset = cl.subset_gridpoint(ds, lon=lon, lat=lat, **kwargs) if dim != "site": ds_subset = ds_subset.rename(site=dim) new_history = ( f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] " f"gridpoint spatial subsetting on {len(lon)} coordinates - clisops v{clisops.__version__}" ) return update_history_and_name(ds_subset, new_history, name) def _subset_bbox( ds: xr.Dataset, lon_bnds: tuple[float, float] | list[float], lat_bnds: tuple[float, float] | list[float], *, name: str | None = None, tile_buffer: float = 0, **kwargs, ) -> xr.Dataset: r""" Subset the data to a bounding box. Parameters ---------- ds : xr.Dataset Dataset to be subsetted. lon_bnds : tuple or list of two floats Longitude boundaries of the bounding box. lat_bnds : tuple or list of two floats Latitude boundaries of the bounding box. name: str, optional Used to rename the 'cat:domain' attribute. tile_buffer: float Uses an approximation of the grid cell size to add a dynamic buffer around the requested region. **kwargs : dict Other arguments to be sent to clisops. Possible kwargs are: - start_date (str): Start date for the subset in the format 'YYYY-MM-DD'. - end_date (str): End date for the subset in the format 'YYYY-MM-DD'. - first_level (int or float): First level of the subset. - last_level (int or float): Last level of the subset. - time_values (Sequence[str]): A list of datetime strings to subset. - level_values (Sequence[int or float]): A list of levels to subset. Returns ------- xr.Dataset Subsetted Dataset. """ ds = _load_lon_lat(ds) if tile_buffer > 0: lon_res, lat_res = _estimate_grid_resolution(ds) lon_bnds = ( lon_bnds[0] - lon_res * tile_buffer, lon_bnds[1] + lon_res * tile_buffer, ) lat_bnds = ( lat_bnds[0] - lat_res * tile_buffer, lat_bnds[1] + lat_res * tile_buffer, ) ds_subset = cl.subset_bbox(ds, lon_bnds=lon_bnds, lat_bnds=lat_bnds, **kwargs) new_history = ( f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] " f"bbox spatial subsetting with {'buffer=' + str(tile_buffer) if tile_buffer > 0 else 'no buffer'}" f", lon_bnds={np.array(lon_bnds)}, lat_bnds={np.array(lat_bnds)}" f" - clisops v{clisops.__version__}" ) return update_history_and_name(ds_subset, new_history, name) def _subset_shape( ds: xr.Dataset, shape: str | Path | gpd.GeoDataFrame | shp.Polygon, *, name: str | None = None, tile_buffer: float = 0, **kwargs, ) -> xr.Dataset: r""" Subset the data to a shape. Parameters ---------- ds : xr.Dataset Dataset to be subsetted. shape : str or gpd.GeoDataFrame or shp.Polygon Path to the shapefile or GeoDataFrame or Polygon (which is assumed to have the ESPG:4326 CRS). name: str, optional Used to rename the 'cat:domain' attribute. tile_buffer: float Uses an approximation of the grid cell size to add a buffer around the requested region. **kwargs : dict Other arguments to be sent to clisops. Possible kwargs are: - raster_crs (str or int): EPSG number or PROJ4 string. - shape_crs (str or int): EPSG number or PROJ4 string. - buffer (float): Buffer size to add around the shape. Units are based on the shape degrees/metres. - start_date (str): Start date for the subset in the format 'YYYY-MM-DD'. - end_date (str): End date for the subset in the format 'YYYY-MM-DD'. - first_level (int or float): First level of the subset. - last_level (int or float): Last level of the subset. Returns ------- xr.Dataset Subsetted Dataset. """ ds = _load_lon_lat(ds) if isinstance(shape, str | Path): shape = gpd.read_file(shape) elif isinstance(shape, shp.Polygon): shape = gpd.GeoDataFrame(geometry=[shape], crs=CRS(4326)) if tile_buffer > 0: if kwargs.get("buffer") is not None: raise ValueError("Both tile_buffer and clisops' buffer were requested. Use only one.") lon_res, lat_res = _estimate_grid_resolution(ds) # The buffer argument needs to be in the same units as the shapefile, so it's simpler to always project the shapefile to WGS84. try: shape_crs = shape.crs except AttributeError: shape_crs = None if shape_crs is None: warnings.warn( "Shapefile does not have a CRS. Compatibility with the dataset is not guaranteed.", stacklevel=2, category=UserWarning, ) elif shape_crs != CRS(4326): # WGS84 warnings.warn( "Shapefile is not in EPSG:4326. Reprojecting to this CRS.", UserWarning, stacklevel=2, ) shape = shape.to_crs(4326) kwargs["buffer"] = np.max([lon_res, lat_res]) * tile_buffer ds_subset = cl.subset_shape(ds, shape=shape, **kwargs) new_history = ( f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] " f"shape spatial subsetting with {'buffer=' + str(tile_buffer) if tile_buffer > 0 else 'no buffer'}" f", shape={Path(shape).name if isinstance(shape, str | Path) else 'gpd.GeoDataFrame'}" f" - clisops v{clisops.__version__}" ) return update_history_and_name(ds_subset, new_history, name) def _subset_sel(ds: xr.Dataset, *, name: str | None = None, **kwargs) -> xr.Dataset: r""" Subset the data using the .sel() method. Parameters ---------- ds : xr.Dataset Dataset to be subsetted. name: str, optional Used to rename the 'cat:domain' attribute. **kwargs : dict The keys are the dimensions to subset and the values are turned into a slice. Returns ------- xr.Dataset Subsetted Dataset. """ # Create a dictionary with slices for each dimension arg_sel = {dim: slice(*map(float, bounds)) for dim, bounds in kwargs.items()} # Subset the dataset ds_subset = ds.sel(**arg_sel) # Update the history attribute new_history = f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] sel subsetting with arguments {arg_sel}" return update_history_and_name(ds_subset, new_history, name) def _load_lon_lat(ds: xr.Dataset) -> xr.Dataset: """Load longitude and latitude for more efficient subsetting.""" if xc.core.utils.uses_dask(ds.cf["longitude"]): logger.info("Loading longitude for more efficient subsetting.") (ds[ds.cf["longitude"].name],) = dask.compute(ds[ds.cf["longitude"].name]) if xc.core.utils.uses_dask(ds.cf["latitude"]): logger.info("Loading latitude for more efficient subsetting.") (ds[ds.cf["latitude"].name],) = dask.compute(ds[ds.cf["latitude"].name]) return ds
[docs] def get_grid_mapping(ds: xr.Dataset) -> str: """ Get the grid_mapping attribute from the dataset. Parameters ---------- ds : xr.Dataset The dataset to get the grid mapping from. Returns ------- str The name of the grid mapping variable, or an empty string if not found. """ gridmap = [ds[v].attrs["grid_mapping"] for v in ds.data_vars if "grid_mapping" in ds[v].attrs] gridmap += [c for c in ds.variables if ds[c].attrs.get("grid_mapping_name", None)] gridmap = list(np.unique(gridmap)) if len(gridmap) > 1: warnings.warn(f"There are conflicting grid_mapping attributes in the dataset. Assuming {gridmap[0]}.", stacklevel=2) return gridmap[0] if gridmap else ""
[docs] def get_crs(gridmap: xr.Dataset | xr.DataArray) -> cartopy.crs.Projection: """ Get the cartopy CRS. Parameters ---------- gridmap : xr.Dataset or xr.DataArray Either a dataset or dataraay that has a grid mapping variable or that grid mapping variable directly. Returns ------- cartopy.crs.Projection The cartopy crs. Only RotatedPole and ObliqueMercator are supported. """ # if not passing the grid mapping var itself, we need to get it if "grid_mapping_name" not in gridmap.attrs: if isinstance(gridmap, xr.Dataset): gridmap_name = get_grid_mapping(gridmap) else: # DataArray gridmap_name = gridmap.attrs["grid_mapping"] if gridmap_name == "" and ( "longitude" in gridmap.cf and "X" in gridmap.cf and gridmap.cf["longitude"].name == gridmap.cf["X"].name and gridmap.cf["X"].ndim == 1 and "latitude" in gridmap.cf and "Y" in gridmap.cf and gridmap.cf["latitude"].name == gridmap.cf["Y"].name and gridmap.cf["Y"].ndim == 1 ): gridmap = xr.DataArray(attrs={"grid_mapping_name": "latitude_longitude"}) else: gridmap = gridmap[gridmap_name] cf_params = gridmap.attrs # RotPole is a spherical-only projection, so having a WGS84 ellipse makes it confusing # Moreover, climate models usually use spherical models of the earth, so we use a spherical globe by default # This is different from cartopy, especially with PROJ >= 9.8 # 6370997 m is the conventional of the Earth as a sphere globe = cartopy.crs.Globe( datum=cf_params.get("horizontal_datum_name"), ellipse=cf_params.get("reference_ellipsoid_name", "sphere"), semimajor_axis=(cf_params.get("earth_radius") or cf_params.get("semi_major_axis") or 6370997), semiminor_axis=cf_params.get("semi_minor_axis"), inverse_flattening=cf_params.get("inverse_flattening"), towgs84=cf_params.get("towgs84"), ) if cf_params["grid_mapping_name"] == "latitude_longitude": crs = cartopy.crs.PlateCarree(globe=globe) elif cf_params["grid_mapping_name"] == "rotated_latitude_longitude": crs = cartopy.crs.RotatedPole( pole_longitude=cf_params["grid_north_pole_longitude"], pole_latitude=cf_params["grid_north_pole_latitude"], central_rotated_longitude=cf_params.get("north_pole_grid_longitude", 0), globe=globe, ) elif cf_params["grid_mapping_name"] == "oblique_mercator": crs = cartopy.crs.ObliqueMercator( central_longitude=cf_params["longitude_of_projection_origin"], central_latitude=cf_params["latitude_of_projection_origin"], scale_factor=cf_params["scale_factor_at_projection_origin"], azimuth=cf_params["azimuth_of_central_line"], false_easting=cf_params.get("false_easting", 0), false_northing=cf_params.get("false_northing", 0), globe=globe, ) elif cf_params["grid_mapping_name"] == "lambert_conformal_conic": crs = cartopy.crs.LambertConformal( standard_parallels=(cf_params["standard_parallel"],) if np.isscalar(cf_params["standard_parallel"]) else tuple(cf_params["standard_parallel"]), central_longitude=cf_params["longitude_of_central_meridian"], central_latitude=cf_params["latitude_of_projection_origin"], false_easting=cf_params.get("false_easting", 0), false_northing=cf_params.get("false_northing", 0), globe=globe, ) else: raise NotImplementedError(f"Grid mapping {cf_params['grid_mapping_name']} not implemented.") return crs
def _estimate_grid_resolution(ds: xr.Dataset) -> tuple[float, float]: # Since this is to compute a buffer, we take the maximum difference as an approximation. # Estimate the grid resolution if len(ds.lon.dims) == 1: # 1D lat-lon lon_res = np.abs(ds.lon.diff("lon").max().values) lat_res = np.abs(ds.lat.diff("lat").max().values) else: lon_res = np.abs(ds.lon.diff(ds.cf["X"].name).max().values) lat_res = np.abs(ds.lat.diff(ds.cf["Y"].name).max().values) return lon_res, lat_res def update_history_and_name(ds_subset, new_history, name): """ Update the history attribute and optionally the domain name of a dataset. Parameters ---------- ds_subset : xarray.Dataset The dataset to update. new_history : str The new history entry. name : str, optional The new domain name. Returns ------- xarray.Dataset The updated dataset. """ history = new_history + " \n " + ds_subset.attrs["history"] if "history" in ds_subset.attrs else new_history ds_subset.attrs["history"] = history if name is not None: ds_subset.attrs["cat:domain"] = name return ds_subset
[docs] def dataset_extent(ds: xr.Dataset, method: str = "shape", name: str | None = None) -> Region: r""" The dataset's spatial extent expressed as a xscen Region spec. Parameters ---------- ds : Dataset A dataset containing a grid with latitude and longitude coordinates and either the bounds or a grid mapping understood by :py:func:`~xscen.regrid.create_bounds_gridmapping`. method : {'shape', 'bbox'} One of the two methods recognized by xscen to specify an area. "shape" will compute a shapely polygon in latitude longitude coordinates from the cell bounds. "bbox" will return the total extent of that polygon. name : str, optional A name to give to the region. Returns ------- Region A :py:data:`~xscen.spatial.Region` dictionary useful for subsetting other datasets. """ from .regrid import create_bounds_gridmapping if "lat_bounds" not in ds: if "lat" in ds and ds.lat.ndim == 1: ds = ds.cf.add_bounds(["lon", "lat"]) else: ds = create_bounds_gridmapping(ds) if ds["lat_bounds"].ndim == 2: lonb = ds.lon_bounds.isel(bounds=xr.DataArray([0, 0, 1, 1], dims=("bounds",))) latb = ds.lat_bounds.isel(bounds=xr.DataArray([0, 1, 1, 0], dims=("bounds",))) lonb, latb = xr.broadcast(lonb, latb) else: lonb, latb = ds.lon_bounds, ds.lat_bounds region = {"method": method} if name is not None: region["name"] = name # Tolerance 0 is to merge colinear segments, without degrading anything else p = shp.simplify( shp.unary_union(shp.polygons(shp.linearrings(lonb.transpose(..., "bounds"), latb.transpose(..., "bounds")))), tolerance=0, ) match method: case "shape": region["shape"] = p case "bbox": bnds = p.bounds region["lon_bnds"] = [bnds[0], bnds[2]] region["lat_bnds"] = [bnds[1], bnds[3]] case _ as err: raise ValueError(f"Method must be 'shape' or 'bbox'. Got {err}.") return region
[docs] def merge_duplicated_stations(ds: xr.Dataset, precision: float | None = None) -> xr.Dataset: """ Merge identical locations of a dataset. Identical locations are merged by combination of float values (priority from order of the location dimension). Non-float values are taken from the first element. Parameters ---------- ds : Dataset Dataset with `lon` and `lat` coordinates. Both must be 1D and share the same dimension. precision : float, optional Round the coordinate up to this decimal before looking for duplicated points. Default (None) is not to round. Returns ------- Dataset A dataset with the same variables as the input but with duplicated locations merged. """ dim = ds.lat.dims[0] # Geopandas et des Points, c'est plus rapide que Pandas et des tuples if precision is not None: lon = ds.lon.round(precision) lat = ds.lat.round(precision) else: lon, lat = ds.lon, ds.lat df = gpd.GeoDataFrame(index=ds[dim], geometry=[shp.Point(lo, la) for lo, la in zip(lon, lat, strict=True)]) dups = df.duplicated("geometry") if dups.sum() == 0: # aucun dup return ds dss = [] N = 0 for _, grp in df.groupby("geometry"): if len(grp) == 1: dss.append(ds.sel({dim: [grp.index[0]]})) else: N += len(grp) - 1 dsi = ds.sel({dim: grp.index}) dss.append( dsi.map( lambda x, d: (x.bfill(d) if x.dtype == "f" else x).isel({d: [0]}), args=(dim,), ) ) h = f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Merged {N} co-located points along dimension {dim}." return update_history_and_name(xr.concat(dss, dim), h, None)
[docs] def voronoi_weights( ds: xr.Dataset, region: gpd.GeoDataFrame | shp.Polygon | None = None, maxfrac: float | None = None, minlocs: int = 0, _pts: gpd.GeoDataFrame | None = None, ) -> xr.DataArray: """ Compute a weight for each location in the dataset inversely proportionnate to the location density. The total extent of the dataset is divided in regions using the Voronoi partition and weights are computed from the area of these regions. This is meant for station data for example. Parameters ---------- ds : Dataset Dataset with `lon` and `lat` coordinates. Both must be 1D and share the same dimension. region : GeoDataFrame or Polygon, optional A polygon to constrain the total area to partition with Voronoi polygons. Can also be a GeoDataFrame, in which case the weights are done for each feature. Defaults to a convex hull around ds with a buffer of 1°. See :py:func:`dataset_extent`. maxfrac : float, optional Limits the maximal region area to this fraction of the total area. minlocs : int When `region` is a GeoDataFrame, features containing less than this number of locations are removed from the output. _pts : GeoDataFrame, optional A GeoDataFrame of points to use for the Voronoi partition. This is a shortcut for when iterating over regions. Returns ------- DataArray The weights along the same dimension as the locations on the input. If region was a GeoDataFrame, another dimension "geom" partitions the weights for each feature and other columns are added as auxiliary coordinates (as with :py:func:`xs.spatial_mean`). """ dim = ds.lon.dims[0] if _pts is None: pts = gpd.GeoDataFrame( index=ds[dim], geometry=[shp.Point(lo, la) for lo, la in zip(ds.lon, ds.lat, strict=True)], ) else: # Shortcut for when iterating over regions pts = _pts if region is None: region = pts.union_all().convex_hull.buffer(1) elif isinstance(region, gpd.GeoDataFrame): weights = [] for _, row in region.iterrows(): weights.append(voronoi_weights(ds, row.geometry, maxfrac=maxfrac, _pts=pts).expand_dims(geom=[row.name])) weight = xr.concat(weights, "geom") weight = weight.assign_coords(region.drop(columns=["geometry"]).to_xarray().rename({region.index.name or "index": "geom"})) if minlocs > 0: weight = weight.where((weight > 0).sum(ds.lon.dims) >= minlocs, drop=True) weight = weight.where((weight > 0).any("geom"), drop=True) return weight zones = gpd.GeoDataFrame(geometry=pts.voronoi_polygons(extend_to=region)).clip(region) zones["geometry"] = zones.buffer(0) zones["zone_area"] = zones.geometry.area # overlay aligns points with their intersecting polygon # reset_index, set_index, reindex dance to preserve indexing info and sort final DF as input area = (gpd.overlay(pts.reset_index(names="station"), zones).set_index("station").reindex(pts.index)["zone_area"]).fillna(0) if maxfrac is not None: maxarea = area.sum() * maxfrac area = area.clip(0, maxarea) return xr.DataArray( 0 if area.sum() == 0 else area / area.sum(), dims=(dim,), coords=ds[dim].coords, name="weights", )