"""Testing utilities for xscen."""
import importlib.metadata
import os
import re
from io import StringIO
from pathlib import Path
from typing import TextIO
import cartopy.crs as ccrs
import numpy as np
import pandas as pd
import xarray as xr
from xclim.testing.helpers import test_timeseries as timeseries
from xclim.testing.utils import show_versions as _show_versions
from xscen.spatial import get_crs
__all__ = ["datablock_3d", "fake_data", "publish_release_notes", "show_versions"]
[docs]
def datablock_3d(
values: np.ndarray,
variable: str,
x: str,
x_start: float,
y: str,
y_start: float,
x_step: float = 0.1,
y_step: float = 0.1,
start: str = "7/1/2000",
freq: str = "D",
units: str | None = None,
as_dataset: bool = False,
) -> xr.DataArray | xr.Dataset:
"""
Create a generic timeseries object based on pre-defined dictionaries of existing variables.
Parameters
----------
values : np.ndarray
The values to be assigned to the variable. Dimensions are interpreted [T, Y, X].
variable : str
The variable name.
x : str
The name of the x coordinate.
x_start : float
The starting value of the x coordinate.
y : str
The name of the y coordinate.
y_start : float
The starting value of the y coordinate.
x_step : float
The step between x values.
y_step : float
The step between y values.
start : str
The starting date of the time coordinate.
freq : str
The frequency of the time coordinate.
units : str, optional
The units of the variable. If None, the units are inferred from the variable name.
as_dataset : bool
If True, return a Dataset, else a DataArray.
Returns
-------
xr.DataArray or xr.Dataset
The created DataArray or Dataset.
"""
attrs = {
"lat": {
"units": "degrees_north",
"description": "Latitude.",
"standard_name": "latitude",
},
"lon": {
"units": "degrees_east",
"description": "Longitude.",
"standard_name": "longitude",
},
"rlon": {
"units": "degrees",
"description": "Rotated longitude.",
"standard_name": "grid_longitude",
},
"rlat": {
"units": "degrees",
"description": "Rotated latitude.",
"standard_name": "grid_latitude",
},
"x": {
"units": "m",
"description": "Projection x coordinate.",
"standard_name": "projection_x_coordinate",
},
"y": {
"units": "m",
"description": "Projection y coordinate.",
"standard_name": "projection_y_coordinate",
},
}
dims = {
"time": xr.DataArray(pd.date_range(start, periods=values.shape[0], freq=freq), dims="time"),
y: xr.DataArray(
np.arange(y_start, y_start + values.shape[1] * y_step, y_step),
dims=y,
attrs=attrs[y],
)[0 : values.shape[1]], # np.arange sometimes creates an extra value
x: xr.DataArray(
np.arange(x_start, x_start + values.shape[2] * x_step, x_step),
dims=x,
attrs=attrs[x],
)[0 : values.shape[2]], # np.arange sometimes creates an extra value
}
# Get the attributes using xclim, then create the DataArray
ts_1d = timeseries(values[:, 0, 0], variable, start, units=units, freq=freq)
da = xr.DataArray(values, coords=dims, dims=dims, name=variable, attrs=ts_1d.attrs)
# Add the axis information
da[x].attrs["axis"] = "X"
da[y].attrs["axis"] = "Y"
da["time"].attrs["axis"] = "T"
# Support for rotated pole and oblique mercator grids
if x != "lon" and y != "lat":
if x == "rlon": # rotated pole
GM = xr.DataArray(
"",
attrs={
"grid_mapping_name": "rotated_latitude_longitude",
"grid_north_pole_latitude": 42.5,
"grid_north_pole_longitude": 83.0,
"north_pole_grid_longitude": 0.0,
"earth_radius": 6370997,
},
)
da.attrs["grid_mapping"] = "rotated_pole"
CRS = get_crs(GM)
else:
GM = xr.DataArray(
"",
attrs={
"grid_mapping_name": "oblique_mercator",
"azimuth_of_central_line": 90.0,
"latitude_of_projection_origin": 46.0,
"longitude_of_projection_origin": 263.0,
"scale_factor_at_projection_origin": 1.0,
"false_easting": 0.0,
"false_northing": 0.0,
},
)
da.attrs["grid_mapping"] = "oblique_mercator"
CRS = get_crs(GM)
PC = ccrs.PlateCarree(globe=CRS.globe)
YY, XX = xr.broadcast(da[y], da[x])
pts = PC.transform_points(CRS, XX.values, YY.values)
da["lon"] = xr.DataArray(pts[..., 0], dims=XX.dims, attrs=attrs["lon"])
da["lat"] = xr.DataArray(pts[..., 1], dims=YY.dims, attrs=attrs["lat"])
if as_dataset:
if "grid_mapping" in da.attrs:
da = da.to_dataset()
if da[variable].attrs["grid_mapping"] == "rotated_pole":
da = da.assign_coords(rotated_pole=GM)
else:
da = da.assign_coords(oblique_mercator=GM)
return da
else:
return da.to_dataset()
else:
return da
[docs]
def fake_data(
nyears: int,
nx: int,
ny: int,
rand_type: str = "random",
seed: int = 0,
amplitude: float = 1.0,
offset: float = 0.0,
) -> np.ndarray:
"""
Generate fake data for testing.
Parameters
----------
nyears : int
Number of years (365 days) to generate.
nx : int
Number of x points.
ny : int
Number of y points.
rand_type : str
Type of random data to generate. Options are:
- "random": random data with no structure.
- "tas": temperature-like data with a yearly half-sine cycle.
seed : int
Random seed.
amplitude : float
Amplitude of the random data.
offset : float
Offset of the random data.
Returns
-------
np.ndarray
Fake data.
"""
if rand_type not in ["random", "tas"]:
raise NotImplementedError(f"rand_type={rand_type} not implemented.")
np.random.seed(seed)
data = np.reshape(np.random.random(365 * nyears * (nx * ny)) * amplitude, (365 * nyears, ny, nx))
if rand_type == "tas":
# add an annual cycle (repeating half-sine)
data += np.tile(
np.sin(np.linspace(0, np.pi, 365))[:, None, None] * amplitude,
(nyears, 1, 1),
)
# convert to Kelvin and offset the half-sine
data = data + 273.15 - amplitude
# add trend (polynomial 3rd)
np.random.seed(seed)
base_warming_rate = 0.02 + np.random.random() * 0.01
data += np.tile(np.linspace(0, base_warming_rate * nyears, 365 * nyears) ** 3, (nx, ny, 1)).T
# add a semi-random offset
np.random.seed(seed)
data = data + offset - (np.random.random() * amplitude - amplitude / 2)
return data
[docs]
def publish_release_notes(
style: str = "md",
file: os.PathLike | StringIO | TextIO | None = None,
changes: str | os.PathLike | None = None,
latest: bool = True,
) -> str | None:
"""
Format release history in Markdown or ReStructuredText.
Parameters
----------
style : {"rst", "md"}
Use ReStructuredText (`rst`) or Markdown (`md`) formatting. Default: Markdown.
file : {os.PathLike, StringIO, TextIO, None}
If provided, prints to the given file-like object. Otherwise, returns a string.
changes : {str, os.PathLike}, optional
If provided, manually points to the file where the changelog can be found.
Assumes a relative path otherwise.
latest : bool
Whether to return the release notes of the latest version or all the content of the changelog.
Returns
-------
str, optional
A string containing the formatted release notes if `file` is None. Otherwise, None.
Notes
-----
This function exists solely for development purposes. Adapted from xclim.testing.utils.publish_release_notes.
"""
if isinstance(changes, str | Path):
changes_file = Path(changes).absolute()
else:
changes_file = Path(__file__).absolute().parents[2].joinpath("CHANGELOG.rst")
if not changes_file.exists():
raise FileNotFoundError("Changes file not found in xscen file tree.")
with Path(changes_file).open(encoding="utf-8") as f:
changes = f.read()
if style == "rst":
hyperlink_replacements = {
r":issue:`([0-9]+)`": r"`GH/\1 <https://github.com/Ouranosinc/xscen/issues/\1>`_",
r":pull:`([0-9]+)`": r"`PR/\1 <https://github.com/Ouranosinc/xscen/pull/\>`_",
r":user:`([a-zA-Z0-9_.-]+)`": r"`@\1 <https://github.com/\1>`_",
}
elif style == "md":
hyperlink_replacements = {
r":issue:`([0-9]+)`": r"[GH/\1](https://github.com/Ouranosinc/xscen/issues/\1)",
r":pull:`([0-9]+)`": r"[PR/\1](https://github.com/Ouranosinc/xscen/pull/\1)",
r":user:`([a-zA-Z0-9_.-]+)`": r"[@\1](https://github.com/\1)",
}
else:
raise NotImplementedError()
for search, replacement in hyperlink_replacements.items():
changes = re.sub(search, replacement, changes)
if latest:
changes_split = changes.split("\n\nv0.")
changes = changes_split[0] + "\n\nv0." + changes_split[1]
if style == "md":
changes = changes.replace("=========\nChangelog\n=========", "# Changelog")
titles = {r"\n(.*?)\n([\-]{1,})": "-", r"\n(.*?)\n([\^]{1,})": "^"}
for title_expression, level in titles.items():
found = re.findall(title_expression, changes)
for grouping in found:
fixed_grouping = str(grouping[0]).replace("(", r"\(").replace(")", r"\)")
search = rf"({fixed_grouping})\n([\{level}]{'{' + str(len(grouping[1])) + '}'})"
replacement = f"{'##' if level == '-' else '###'} {grouping[0]}"
changes = re.sub(search, replacement, changes)
link_expressions = r"[\`]{1}([\w\s]+)\s<(.+)>`\_"
found = re.findall(link_expressions, changes)
for grouping in found:
search = rf"`{grouping[0]} <.+>`\_"
replacement = f"[{str(grouping[0]).strip()}]({grouping[1]})"
changes = re.sub(search, replacement, changes)
if not file:
return changes
if isinstance(file, Path | os.PathLike):
file = Path(file).open("w")
print(changes, file=file)
[docs]
def show_versions(
file: os.PathLike | StringIO | TextIO | None = None,
deps: list | None = None,
) -> str | None:
"""
Print the versions of xscen and its dependencies.
Parameters
----------
file : {os.PathLike, StringIO, TextIO}, optional
If provided, prints to the given file-like object. Otherwise, returns a string.
deps : list, optional
A list of dependencies to gather and print version information from. Otherwise, prints `xscen` dependencies.
Returns
-------
str or None
A string containing the versions of xscen and its dependencies, or None if printed to a file.
"""
def _get_xscen_dependencies():
xscen_metadata = importlib.metadata.metadata("xscen")
requires = xscen_metadata.get_all("Requires-Dist")
requires = [req.split("[")[0].split(";")[0].split(">")[0].split("<")[0].split("=")[0].split("!")[0] for req in requires]
return ["xscen"] + requires
if deps is None:
deps = _get_xscen_dependencies()
return _show_versions(file=file, deps=deps)