Source code for xscen.catutils

"""Catalog creation and path building tools."""

import json
import logging
import operator as op
import os
import queue
import string
import threading
import warnings
from collections.abc import Mapping, Sequence
from copy import deepcopy
from fnmatch import fnmatch
from functools import partial, reduce
from multiprocessing import Pool
from pathlib import Path
from typing import Any, Optional, Union

import cftime
import netCDF4
import numpy as np
import pandas as pd
import parse
import xarray as xr
import yaml
import zarr
from intake_esm import esm_datastore
from pandas import isna

from .catalog import COLUMNS, DataCatalog, generate_id
from .config import parse_config
from .io import get_engine
from .utils import CV, date_parser, ensure_new_xrfreq, get_cat_attrs

logger = logging.getLogger(__name__)


__all__ = ["build_path", "parse_directory", "parse_from_ds", "register_parse_type"]
# ## File finding and path parsing ## #


EXTRA_PARSE_TYPES = {}
"""Extra parse types to add to parse's default.

Add your own types with the :py:func:`register_parse_type` decorator.
"""


def register_parse_type(name: str, regex: str = r"([^\_\/\\]*)", group_count: int = 1):
    r"""Register a new parse type to be available in :py:func:`parse_directory` patterns.

    Functions decorated by this will be registered in :py:data:`EXTRA_PARSE_TYPES`.
    The decorated function must take a single string and should return a single string.
    If it returns a different type, it may interfere with the other steps of `parse_directory`.

    Parameters
    ----------
    name : str
        The type name. To make use of this type, put "{field:name}" in your pattern.
    regex : str
        A regex string to determine what can be matched by this type.
        The default matches anything but / \ and _, same as the default parse type.
    group_count : int
        The number of regex groups in the previous regex string.
    """

    def _register_parse_type(func):
        EXTRA_PARSE_TYPES[name] = parse.with_pattern(
            regex, regex_group_count=group_count
        )(func)
        return func

    return _register_parse_type
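
# Illustrative sketch (not part of the original module): registering a custom parse
# type. The "upper" type name and the example pattern are hypothetical; once registered,
# a pattern such as "{domain:upper}/{variable}_{DATES}.nc" passed to parse_directory
# would store the matched domain in upper case.
#
# >>> @register_parse_type("upper")
# ... def _parse_upper(text: str) -> str:
# ...     return text.upper()
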
@register_parse_type("no_", regex=r"([^\_\/\\]*)", group_count=1) def _parse_word(text: str) -> str: r"""Parse helper to match strings with anything except / \ or _.""" return text @register_parse_type("_", regex=r"([^\/\\]*)", group_count=1) def _parse_level(text: str) -> str: r"""Parse helper to match strings with anything except / or \.""" return text # Minimum 4 digits for a date (a single year). Maximum is, in theory, YYYYMMDDHHMMSS so 14. @register_parse_type( "datebounds", regex=r"(([\d]{4,15}(\-[\d]{4,15})?)|fx)", group_count=3 ) def _parse_datebounds( text: str, ) -> Union[list[str], tuple[None, None], tuple[str, str]]: """Parse helper to translate date bounds, used in the special DATES field.""" if "-" in text: return text.split("-") if text == "fx": return None, None return text, text def _find_assets( root: Union[str, os.PathLike], exts: set[str], lengths: set[int], dirglob: Optional[str] = None, ): """Walk recursively over files in a directory, filtering according to a glob pattern, path depth and extensions. Parameters ---------- root: str or Pathlike Path of the directory to walk through. exts: set of strings Set of file extensions to look for. lengths: set of ints Set of path depths to look for. dirglob: str, optional A glob pattern. If given, only parent folders matching this pattern are walked through. This pattern can not include the asset's basename. """ root = str(Path(root)) # to be sure for top, alldirs, files in os.walk(root): # Split zarr subdirectories from next iteration zarrs = [] for dr in deepcopy(alldirs): if dr.endswith(".zarr"): zarrs.append(dr) alldirs.remove(dr) if ( top != root and (os.path.relpath(top, root).count(os.path.sep) + 1) not in lengths ): continue if dirglob is not None and not fnmatch(top, dirglob): continue if ".zarr" in exts: for zr in zarrs: yield os.path.join(top, zr) if exts - {".zarr"}: # There are more exts than for file in files: if os.path.splitext(file)[-1] in exts: yield os.path.join(top, file) def _compile_pattern(pattern: str) -> parse.Parser: r"""Compile a parse pattern (if needed) for quicker evaluation. The `no_` default format spec is added where no format spec was given. The field prefix "?" is converted to "_" so the field name is a valid python variable name. """ if isinstance(pattern, parse.Parser): return pattern parts = [] for pre, field, fmt, _ in string.Formatter().parse(pattern): if not fmt: fmt = "no_" if field: if field.startswith("?"): field = "_" + field[1:] if field == "DATES": fmt = "datebounds" parts.extend([pre, "{", field, ":", fmt, "}"]) else: parts.append(pre) return parse.compile("".join(parts), EXTRA_PARSE_TYPES) def _name_parser( path: Union[os.PathLike, str], root: Union[os.PathLike, str], patterns: list[Union[str, parse.Parser]], read_from_file: Optional[Union[list[str], dict]] = None, attrs_map: Optional[dict] = None, xr_open_kwargs: Optional[dict] = None, ) -> Optional[dict]: """Extract metadata information from the file path. Parameters ---------- path : os.PathLike or str Full file path. root : os.PathLike or str Root directory. Only the part of the path relative to this directory is checked against the patterns. patterns : list of str or parse.Parser List of patterns to try in `parse.parse`. See :py:func:`parse_directory` for the pattern specification. read_from_file : list of string or dict, optional If not None, passed directly to :py:func:`parse_from_ds` as `names`. If None (default), only the path is parsed, the file is not opened. 
    attrs_map : dict, optional
        If `read_from_file` is not None, passed directly to :py:func:`parse_from_ds`.
    xr_open_kwargs : dict, optional
        If `read_from_file` is not None, passed directly to :py:func:`parse_from_ds`.

    Returns
    -------
    dict or None
        The metadata fields parsed from the path using the first matching pattern.
        If no pattern matched, None is returned.

    See Also
    --------
    parse.parse
    parse_directory
    parse_from_ds
    """
    abs_path = Path(path)
    path = abs_path.relative_to(Path(root))
    xr_open_kwargs = xr_open_kwargs or {}

    d = {}
    for pattern in map(_compile_pattern, patterns):
        res = pattern.parse(str(path))
        if res:
            d = res.named
            break
    else:
        return None

    d["path"] = abs_path
    d["format"] = path.suffix[1:]

    if "DATES" in d:
        d["date_start"], d["date_end"] = d.pop("DATES")

    if read_from_file:
        fromfile = parse_from_ds(
            abs_path, names=read_from_file, attrs_map=attrs_map, **xr_open_kwargs
        )
        d.update(fromfile)

    # Files with a single year/month
    if "date_end" not in d and "date_start" in d:
        d["date_end"] = d["date_start"]

    # Strip strings to clean off lost spaces and line jumps and
    # do not include wildcarded fields ("?" was transformed to "_" in _compile_pattern).
    return {
        k: v.strip() if isinstance(v, str) else v
        for k, v in d.items()
        if not k.startswith("_")
    }


def _parse_dir(  # noqa: C901
    root: Union[os.PathLike, str],
    patterns: list[str],
    dirglob: Optional[str] = None,
    checks: Optional[list[str]] = None,
    read_from_file: Optional[Union[list[str], dict]] = None,
    attrs_map: Optional[dict] = None,
    xr_open_kwargs: Optional[dict] = None,
    progress: bool = False,
):
    """Iterate over and parse files in a directory, filtering according to basic pattern properties and optional checks.

    Parameters
    ----------
    root : os.PathLike or str
        Path to walk through.
    patterns : list of str or compiled parsers
        Patterns that the files will be checked against.
        The extensions of the patterns are extracted and only paths with these are returned.
        Also, the depths of the patterns are calculated and only paths of this depth under the root are returned.
    dirglob : str, optional
        A glob pattern. If given, only parent folders matching this pattern are walked through.
        This pattern can not include the asset's basename.
    checks : list of str, optional
        A list of checks to perform, available values are:

        - "readable" : Check that the file is readable by the current user.
        - "writable" : Check that the file is writable by the current user.
        - "ncvalid" : For netCDF, check that it is valid (openable with netCDF4).

        All checks will slow down the parsing.
    read_from_file : list of str or dict, optional
        If not None, passed directly to :py:func:`parse_from_ds` as `names`.
        If None (default), only the path is parsed, the file is not opened.
    attrs_map : dict, optional
        If `read_from_file` is not None, passed directly to :py:func:`parse_from_ds`.
    xr_open_kwargs : dict, optional
        If `read_from_file` is not None, passed directly to :py:func:`parse_from_ds`.
    progress : bool
        If True, the number of found files is printed to stdout.

    Returns
    -------
    list of dict
        Metadata parsed from each found asset.
    """
    lengths = {patt.count(os.path.sep) for patt in patterns}
    exts = {os.path.splitext(patt)[-1] for patt in patterns}
    comp_patterns = list(map(_compile_pattern, patterns))
    checks = checks or []

    # Multithreading, communicating via FIFO queues:
    # - this thread walks the directory,
    # - another thread runs the checks,
    # - another thread parses the path and file.
    # In theory, for a local disk, walking a directory cannot be parallelized.
    # This is not as true for network-mounted drives.
    # Thus, we parallelize the parsing steps.
    # If the name-parsing step becomes blocking, we could try to increase the number of
    # threads (but netCDF4 can't multithread...). Usually, the walking is the bottleneck.
    q_found = queue.Queue()
    q_checked = queue.Queue()
    parsed = []

    def check_worker():
        # Worker that processes the checks.
        while True:
            path = q_found.get()
            valid = True
            if "readable" in checks and not os.access(path, os.R_OK):
                valid = False
            if "writable" in checks and not os.access(path, os.W_OK):
                valid = False
            if "ncvalid" in checks:
                try:
                    # Simple check that the file is openable
                    if get_engine(path) == "netcdf4":
                        # If get_engine returns "h5netcdf", it means h5py was able to recognize it.
                        # TODO: testing for zarr validity is not implemented
                        with netCDF4.Dataset(path):
                            pass
                except Exception:
                    valid = False
            if valid:
                q_checked.put(path)
            q_found.task_done()

    def parse_worker():
        # Worker that parses the paths.
        while True:
            path = q_checked.get()
            try:
                d = _name_parser(
                    path,
                    root,
                    comp_patterns,
                    read_from_file=read_from_file,
                    attrs_map=attrs_map,
                    xr_open_kwargs=xr_open_kwargs,
                )
            except Exception as err:
                logger.error(f"Parsing file {path} failed with {err}.")
            else:
                if d is not None:
                    parsed.append(d)
                    n = len(parsed)
                    # Print the number of files, but only on round numbers,
                    # to limit the calls to stdout for large collections.
                    if progress and all(
                        [(n < N or (n % N == 0)) for N in [10, 100, 1000]]
                    ):
                        print(f"Found {n:7d} files", end="\r")
                else:
                    logger.debug(f"File {path} didn't match any pattern.")
            q_checked.task_done()

    CW = threading.Thread(target=check_worker, daemon=True)
    CW.start()
    PW = threading.Thread(target=parse_worker, daemon=True)
    PW.start()

    # Skip the checks if none are requested (saves some overhead).
    q = q_found if checks else q_checked
    for path in _find_assets(Path(root), exts, lengths, dirglob):
        q.put(path)

    q_found.join()
    q_checked.join()
    return parsed


def _get_new_item(name, newval, repval, oldval, fromcol, is_list):
    if is_list:
        if name == fromcol:
            # We replace only the repval element of the list.
            return tuple(newval if v == repval else v for v in oldval)
        # We must return a tuple : replace the whole list with a single element.
        return (newval,)
    return newval  # Simple replacement


def _replace_in_row(oldrow: pd.Series, replacements: dict):
    """Replace values in a Series (row) according to a replacements mapping.

    Replacements can be simple mappings, but also mappings to other fields.
    List-like fields are handled.
    """
    row = oldrow.copy()
    list_cols = [col for col in oldrow.index if isinstance(oldrow[col], (tuple, list))]
    for col, reps in replacements.items():
        if col not in row:
            continue
        for repval, new in reps.items():
            # Either the field is a list containing the value to replace, or it is the value to replace.
            if (col in list_cols and repval in row[col]) or repval == row[col]:
                if isinstance(new, dict):
                    # Replacement is for multiple columns.
                    for name, newval in new.items():
                        row[name] = _get_new_item(
                            name, newval, repval, row[col], col, name in list_cols
                        )
                else:
                    row[col] = _get_new_item(
                        col, new, repval, row[col], col, col in list_cols
                    )
    # Special case for "variable" where we remove Nones.
if "variable" in row and "variable" in list_cols and None in row["variable"]: row["variable"] = tuple(v for v in row["variable"] if v is not None) return row def _parse_first_ds( grp: pd.DataFrame, cols: list[str], attrs_map: dict, xr_open_kwargs: dict ): """Parse attributes from one file per group, apply them to the whole group.""" fromfile = parse_from_ds(grp.path.iloc[0], cols, attrs_map, **xr_open_kwargs) logger.info(f"Got {len(fromfile)} fields, applying to {len(grp)} entries.") out = grp.copy() for col, val in fromfile.items(): for i in grp.index: # If val is an iterable we can't use loc. out.at[i, col] = val return out

@parse_config
def parse_directory(  # noqa: C901
    directories: list[Union[str, os.PathLike]],
    patterns: list[str],
    *,
    id_columns: Optional[list[str]] = None,
    read_from_file: Union[
        bool,
        Sequence[str],
        tuple[Sequence[str], Sequence[str]],
        Sequence[tuple[Sequence[str], Sequence[str]]],
    ] = False,
    homogenous_info: Optional[dict] = None,
    cvs: Optional[Union[str, os.PathLike, dict]] = None,
    dirglob: Optional[str] = None,
    xr_open_kwargs: Optional[Mapping[str, Any]] = None,
    only_official_columns: bool = True,
    progress: bool = False,
    parallel_dirs: Union[bool, int] = False,
    file_checks: Optional[list[str]] = None,
) -> pd.DataFrame:
    r"""Parse files in a directory and return them as a pd.DataFrame.

    Parameters
    ----------
    directories : list of os.PathLike or list of str
        List of directories to parse. The parse is recursive.
    patterns : list of str
        List of possible patterns to be used by :py:func:`parse.parse` to decode the file names. See Notes below.
    id_columns : list of str, optional
        List of column names on which to base the dataset definition. Empty columns will be skipped.
        If None (default), it uses :py:data:`ID_COLUMNS`.
    read_from_file : bool or set of str or tuple of 2 sets of str or list of tuples
        If True, if some fields were not parsed from their path, files are opened and
        missing fields are parsed from their metadata, if found.
        If a sequence of column names, only those fields are parsed from the file, if missing.
        If False (default), files are never opened.
        If a tuple of 2 lists of strings, only the first file of the groups defined by the first
        list of columns is read and the second list of columns is parsed from the file and applied
        to the whole group. For example, `(["source"], ["institution", "activity"])` will find a
        group with all the files that have the same source, open only one of the files to read the
        institution and activity, and write this information in the catalog for all files of the group.
        It can also be a list of those tuples.
    homogenous_info : dict, optional
        Using the {column_name: description} format, information to apply to all files.
        These are applied before the `cvs`.
    cvs : str or os.PathLike or dict, optional
        Dictionary with mappings from parsed terms to preferred terms (Controlled VocabularieS) for each column.
        May have an additional "attributes" entry which maps from attribute names in the files to
        official column names. The attribute translation is done before the rest.
        In the "variable" entry, if a name is mapped to None (null), that variable will not be listed in the catalog.
        A term can map to another mapping from field name to values, so that a value on one column
        triggers the filling of other columns. In the latter case, that other column must exist beforehand,
        whether it was in the pattern or in the homogenous_info.
    dirglob : str, optional
        A glob pattern for path matching to accelerate the parsing of a directory tree if only a subtree is needed.
        Only folders matching the pattern are parsed to find datasets.
    xr_open_kwargs : dict
        If needed, arguments to send to xr.open_dataset() when opening the file to read the attributes.
    only_official_columns : bool
        If True (default), this ensures the final catalog only has the columns defined in
        :py:data:`xscen.catalog.COLUMNS`. Other fields in the patterns will raise an error.
        If False, the columns are those used in the patterns and the homogenous info.
        In that case, the column order is not determined.
        Path, format and id are always present in the output.
    progress : bool
        If True, a counter is shown in stdout when finding files on disk. Does nothing if `parallel_dirs` is not False.
    parallel_dirs : bool or int
        If True, each directory is searched in parallel. If an int, it is the number of parallel searches.
        This should only be significantly useful if the directories are on different disks.
    file_checks : list of str, optional
        A list of file checks to run on the parsed files. Available values are:

        - "readable" : Check that the file is readable by the current user.
        - "writable" : Check that the file is writable by the current user.
        - "ncvalid" : For netCDF, check that it is valid (openable with netCDF4).

        Any check will slow down the parsing.

    Notes
    -----
    - Official column names are controlled and ordered by :py:data:`COLUMNS`:
      ["id", "type", "processing_level", "mip_era", "activity", "driving_institution", "driving_model",
      "institution", "source", "bias_adjust_institution", "bias_adjust_project", "experiment", "member",
      "xrfreq", "frequency", "variable", "domain", "date_start", "date_end", "version"]
    - Not all column names have to be present, but "xrfreq" (obtainable through "frequency"), "variable",
      "date_start" and "processing_level" are necessary for a workable catalog.
    - 'patterns' should highlight the columns with braces.
      This acts like the reverse operation of `format()`. It is a template string with `{field name:type}` elements.
      The default "type" will match alphanumeric parts of the path, excluding the "_", "/" and "\" characters.
      The "_" type will allow underscores. Field names prefixed by "?" will not be included in the output.
      See the documentation of :py:mod:`parse` for more type options.
      You can also add your own types using the :py:func:`register_parse_type` decorator.

      The "DATES" field is special as it will only match dates, either as a single date (YYYY, YYYYMM, YYYYMMDD)
      assigned to "{date_start}" (with "date_end" automatically inferred) or two dates of the same format as
      "{date_start}-{date_end}".

      Example: `"{source}/{?ignored project name}_{?:_}_{DATES}.nc"`
      Here, "source" will be the full folder name and it can't include underscores.
      The first section of the filename will be excluded from the output;
      it was given a name ("ignored project name") to make the pattern readable.
      The last section of the filename (the dates) will yield a "date_start" / "date_end" couple.
      All other sections in the middle will be ignored, as they match "{?:_}".

    Returns
    -------
    pd.DataFrame
        Parsed directory files
    """
    homogenous_info = homogenous_info or {}
    xr_open_kwargs = xr_open_kwargs or {}
    if only_official_columns:
        columns = set(COLUMNS) - homogenous_info.keys()
        pattern_fields = {
            f
            for f in set.union(
                *(set(patt.named_fields) for patt in map(_compile_pattern, patterns))
            )
            if not f.startswith("_")
        } - {"DATES"}
        unrecognized = pattern_fields - set(COLUMNS)
        if unrecognized:
            raise ValueError(
                f"Patterns include fields which are not recognized by xscen : {unrecognized}. "
                "If this is wanted, pass only_official_columns=False to remove the check."
            )

    read_file_groups = False  # Whether to read file per group or not.
    if not isinstance(read_from_file, bool) and not isinstance(read_from_file[0], str):
        # A tuple of 2 lists
        read_file_groups = True
        if isinstance(read_from_file[0][0], str):
            # Only one grouping
            read_from_file = [read_from_file]
    elif read_from_file is True:
        # True, but not a list of strings
        read_from_file = columns

    if cvs is not None:
        if not isinstance(cvs, dict):
            with open(cvs) as f:
                cvs = yaml.safe_load(f)
        attrs_map = cvs.pop("attributes", {})
    else:
        attrs_map = {}

    parse_kwargs = dict(
        patterns=patterns,
        dirglob=dirglob,
        read_from_file=read_from_file if not read_file_groups else None,
        attrs_map=attrs_map,
        xr_open_kwargs=xr_open_kwargs,
        checks=file_checks,
    )

    if parallel_dirs is True:
        parallel_dirs = len(directories)

    parsed = []
    if parallel_dirs > 1:
        with Pool(processes=parallel_dirs) as pool:
            results = []
            for directory in directories:
                results.append(
                    pool.apply_async(_parse_dir, (directory,), parse_kwargs)
                )
            for res in results:
                parsed.extend(res.get())
    else:
        for directory in directories:
            parsed.extend(_parse_dir(directory, progress=progress, **parse_kwargs))

    if not parsed:
        raise ValueError("No files found.")
    else:
        if progress:
            print()
        logger.info(f"Found and parsed {len(parsed)} files.")

    # Path has become NaN when some paths didn't fit any passed pattern.
    df = pd.DataFrame(parsed).dropna(axis=0, subset=["path"])

    if only_official_columns:
        # Add the missing official columns
        for col in set(COLUMNS) - set(df.columns):
            df[col] = None

    if read_file_groups:
        # Read fields from file, but only one per group.
        for group_cols, parse_cols in read_from_file:
            df = (
                df.groupby(group_cols)
                .apply(
                    _parse_first_ds,
                    cols=parse_cols,
                    attrs_map=attrs_map,
                    xr_open_kwargs=xr_open_kwargs,
                )
                .reset_index(drop=True)
            )

    # Everything below could be wrapped in a function to be applied to each row,
    # maybe allowing some basic parallelization with dask (or else).

    # Add homogeneous info
    for key, val in homogenous_info.items():
        df[key] = val

    # Replace entries by definitions found in the CV
    if cvs:
        df = df.apply(_replace_in_row, axis=1, replacements=cvs)

    # Fix potential legacy xrfreq
    if "xrfreq" in df.columns:
        df["xrfreq"] = df["xrfreq"].apply(ensure_new_xrfreq)

    # Translate xrfreq into frequencies and vice-versa
    if {"xrfreq", "frequency"}.issubset(df.columns):
        df.fillna(
            {"xrfreq": df["frequency"].apply(CV.frequency_to_xrfreq, default=pd.NA)},
            inplace=True,
        )
        df.fillna(
            {"frequency": df["xrfreq"].apply(CV.xrfreq_to_frequency, default=pd.NA)},
            inplace=True,
        )

    # Parse dates
    # If we don't do the to_numpy(na_value=np.datetime64('')).astype('<M8[ms]') trick,
    # the dtype will be "object" if any of the dates are out-of-bounds.
    # `na_value=np.datetime64('')` is needed because pandas' NaT does not translate to numpy's NaT, but to float.
    if "date_start" in df.columns:
        df["date_start"] = (
            df["date_start"]
            .apply(date_parser)
            .to_numpy(na_value=np.datetime64(""))
            .astype("<M8[ms]")
        )
    if "date_end" in df.columns:
        df["date_end"] = (
            df["date_end"]
            .apply(date_parser, end_of_period=True)
            .to_numpy(na_value=np.datetime64(""))
            .astype("<M8[ms]")
        )

    # Checks
    if {"date_start", "date_end", "xrfreq", "frequency"}.issubset(df.columns):
        # All NaN dates correspond to a fx frequency.
        invalid = df.date_start.isnull() & df.date_end.isnull() & (df.xrfreq != "fx")
        n = invalid.sum()
        if n > 0:
            warnings.warn(
                f"{n} invalid entries where the start and end dates are Null but the frequency is not 'fx'."
) logger.debug(f"Paths: {df.path[invalid].values}") df = df[~invalid] # Exact opposite invalid = df.date_start.notnull() & df.date_end.notnull() & (df.xrfreq == "fx") n = invalid.sum() if n > 0: warnings.warn( f"{n} invalid entries where the start and end dates are given but the frequency is 'fx'." ) logger.debug(f"Paths: {df.path[invalid].values}") df = df[~invalid] # Create id from user specifications df["id"] = generate_id(df, id_columns) # TODO: ensure variable is a tuple ? # ensure path is a string df["path"] = df.path.apply(str) # Sort columns and return if only_official_columns: return df.loc[:, COLUMNS] return df

def parse_from_ds(  # noqa: C901
    obj: Union[str, os.PathLike, xr.Dataset],
    names: Sequence[str],
    attrs_map: Optional[Mapping[str, str]] = None,
    **xrkwargs,
):
    """Parse a list of catalog fields from the file/dataset itself.

    If passed a path, this opens the file.

    Infers the variable from the variables.
    Infers xrfreq, frequency, date_start and date_end from the time coordinate if present.
    Infers other attributes from the coordinates or the global attributes. Attribute names
    can be translated using the `attrs_map` mapping (from file attribute name to name in `names`).

    If the obj is the path to a Zarr dataset and none of "frequency", "xrfreq", "date_start"
    or "date_end" are requested, :py:func:`parse_from_zarr` is used instead of opening the file.

    Parameters
    ----------
    obj : str or os.PathLike or xr.Dataset
        Dataset to parse.
    names : sequence of str
        List of attributes to be parsed from the dataset.
    attrs_map : dict, optional
        In the case of non-standard names in the file, this can be used to match entries
        in the files to specific 'names' in the requested list.
    xrkwargs : dict
        Arguments to be passed to open_dataset().
    """
    get_time = bool(
        {"frequency", "xrfreq", "date_start", "date_end"}.intersection(names)
    )
    if not isinstance(obj, xr.Dataset):
        obj = Path(obj)

    if isinstance(obj, Path) and obj.suffixes[-1] == ".zarr":
        logger.info(f"Parsing attributes from Zarr {obj}.")
        ds_attrs, variables, time = _parse_from_zarr(
            obj, get_vars="variable" in names, get_time=get_time
        )
    elif isinstance(obj, Path) and obj.suffixes[-1] == ".nc":
        logger.info(f"Parsing attributes with netCDF4 from {obj}.")
        ds_attrs, variables, time = _parse_from_nc(
            obj, get_vars="variable" in names, get_time=get_time
        )
    else:
        if isinstance(obj, Path):
            logger.info(f"Parsing attributes with xarray from {obj}.")
            obj = xr.open_dataset(obj, engine=get_engine(obj), **xrkwargs)
        ds_attrs = obj.attrs
        time = obj.indexes["time"] if "time" in obj else None
        variables = set(obj.data_vars.keys()).difference(
            [v for v in obj.data_vars if len(obj[v].dims) == 0]
        )

    rev_attrs_map = {v: k for k, v in (attrs_map or {}).items()}
    attrs = {}
    for name in names:
        if name == "variable":
            attrs["variable"] = tuple(sorted(variables))
        elif name in ("frequency", "xrfreq") and time is not None and time.size > 3:
            # Round to the minute to catch floating point imprecision
            freq = xr.infer_freq(time.round("min"))
            if freq:
                if "xrfreq" in names:
                    attrs["xrfreq"] = freq
                if "frequency" in names:
                    attrs["frequency"] = CV.xrfreq_to_frequency(freq)
            else:
                warnings.warn(
                    f"Couldn't infer frequency of dataset {obj if not isinstance(obj, xr.Dataset) else ''}"
                )
        elif name in ("frequency", "xrfreq") and time is None:
            attrs[name] = "fx"
        elif name == "date_start" and time is not None:
            attrs["date_start"] = time[0]
        elif name == "date_end" and time is not None:
            attrs["date_end"] = time[-1]
        elif name in rev_attrs_map and rev_attrs_map[name] in ds_attrs:
            attrs[name] = ds_attrs[rev_attrs_map[name]].strip()
        elif name in ds_attrs:
            attrs[name] = ds_attrs[name].strip()

    logger.debug(f"Got fields {attrs.keys()} from file.")
    return attrs
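
# Illustrative usage sketch (not part of the original module): parsing fields from an
# in-memory dataset. The dataset and the attribute mapping below are hypothetical.
#
# >>> ds = xr.Dataset(
# ...     {"tas": (("time",), np.arange(365.0))},
# ...     coords={"time": pd.date_range("2000-01-01", periods=365, freq="D")},
# ...     attrs={"experiment_id": "historical"},
# ... )
# >>> parse_from_ds(
# ...     ds, ["variable", "xrfreq", "experiment"], attrs_map={"experiment_id": "experiment"}
# ... )
# {'variable': ('tas',), 'xrfreq': 'D', 'experiment': 'historical'}
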

def _parse_from_zarr(
    path: Union[os.PathLike, str], get_vars: bool = True, get_time: bool = True
):
    """Obtain the list of variables, the time coordinate and the list of global attributes from a zarr dataset.

    Variables and attributes are obtained by reading the JSON files directly, the time coordinate
    by reading the data with zarr.

    Variables are those:

    - where .zattrs/_ARRAY_DIMENSIONS is not empty
    - where .zattrs/_ARRAY_DIMENSIONS does not contain the variable name
    - which do not appear in any "coordinates" attribute.

    Parameters
    ----------
    path : os.PathLike or str
        Path to the zarr dataset.
    get_vars : bool
        If True, return the list of variables.
    get_time : bool
        If True, return the time coordinate.
    """
    path = Path(path)

    if (path / ".zattrs").is_file():
        with (path / ".zattrs").open() as f:
            ds_attrs = json.load(f)
    else:
        ds_attrs = {}

    variables = []
    if get_vars:
        coords = []
        for varpath in path.iterdir():
            if varpath.is_dir() and (varpath / ".zattrs").is_file():
                with (varpath / ".zattrs").open() as f:
                    var_attrs = json.load(f)
                if (
                    varpath.name in var_attrs["_ARRAY_DIMENSIONS"]
                    or len(var_attrs["_ARRAY_DIMENSIONS"]) == 0
                ):
                    coords.append(varpath.name)
                if "coordinates" in var_attrs:
                    coords.extend(
                        list(map(str.strip, var_attrs["coordinates"].split(" ")))
                    )
        variables = [
            varpath.name
            for varpath in path.iterdir()
            if varpath.name not in coords and varpath.is_dir()
        ]
    time = None
    if get_time and (path / "time").is_dir():
        ds = zarr.open(path)
        time = xr.CFTimeIndex(
            cftime.num2date(
                ds.time[:],
                calendar=ds.time.attrs["calendar"],
                units=ds.time.attrs["units"],
            )
        )
    return ds_attrs, variables, time


def _parse_from_nc(
    path: Union[os.PathLike, str], get_vars: bool = True, get_time: bool = True
):
    """Obtain the list of variables, the time coordinate, and the list of global attributes from a netCDF dataset, using netCDF4.

    Parameters
    ----------
    path : os.PathLike or str
        Path to the netCDF dataset.
    get_vars : bool
        If True, return the list of variables.
    get_time : bool
        If True, return the time coordinate.
    """
    ds = netCDF4.Dataset(str(Path(path)))
    ds_attrs = {k: ds.getncattr(k) for k in ds.ncattrs()}

    variables = []
    if get_vars:
        coords = []
        for name, var in ds.variables.items():
            if "coordinates" in var.ncattrs():
                coords.extend(
                    list(map(str.strip, var.getncattr("coordinates").split(" ")))
                )
            if len(var.dimensions) == 0 or name in var.dimensions:
                coords.append(name)
        variables = [var for var in ds.variables.keys() if var not in coords]

    time = None
    if get_time and "time" in ds.variables:
        time = xr.CFTimeIndex(
            cftime.num2date(
                ds["time"][:], calendar=ds["time"].calendar, units=ds["time"].units
            ).data
        )
    ds.close()
    return ds_attrs, variables, time


# ## Path building ## #
def _schema_option(option: dict, facets: dict):
    """Parse an option element of the facet schema tree."""
    facet_value = facets.get(option["facet"])
    if "value" in option:
        if isinstance(option["value"], str):
            answer = facet_value == option["value"]
        else:  # A list
            answer = facet_value in option["value"]
    else:
        answer = not isna(facet_value)
    return answer


def _schema_level(schema: Union[dict, list[str], str], facets: dict):
    if isinstance(schema, str):
        if schema.startswith("(") and schema.endswith(")"):
            optional = True
            schema = schema[1:-1]
        else:
            optional = False
        if schema == "DATES":
            return _schema_dates(facets, optional=optional)

        # A single facet
        if isna(facets.get(schema)):
            if optional:
                return None
            raise ValueError(
                f"Facet {schema} is needed but None-like or missing in the data."
            )
        return facets[schema]

    if isinstance(schema, list):
        parts = []
        for element in schema:
            part = _schema_level(element, facets)
            if not isna(part):
                parts.append(part)
        return "_".join(parts)

    if isinstance(schema, dict) and "text" in schema:
        return schema["text"]

    raise ValueError(f"Invalid schema : {schema}")


def _schema_dates(facets: dict, optional: bool = False):
    if facets.get("xrfreq") == "fx":
        return "fx"

    if any([facets.get(f) is None for f in ["date_start", "date_end", "xrfreq"]]):
        if optional:
            return None
        raise ValueError(
            "Facets date_start, date_end and xrfreq are needed, but at least one is missing or None-like in the data."
        )

    start = date_parser(facets["date_start"])
    end = date_parser(facets["date_end"])
    freq = pd.Timedelta(CV.xrfreq_to_timedelta(facets["xrfreq"]))

    # Full years : starts on Jan 1st and is either annual or ends on Dec 31st (accepting Dec 30 for 360-day calendars)
    if (
        start.month == 1
        and start.day == 1
        and (
            freq >= pd.Timedelta(CV.xrfreq_to_timedelta("YS"))
            or (end.month == 12 and end.day > 29)
        )
    ):
        if start.year == end.year:
            return f"{start:%4Y}"
        return f"{start:%4Y}-{end:%4Y}"
    # Full months : starts on the 1st and is either monthly or ends on the last day
    if start.day == 1 and (
        freq >= pd.Timedelta(CV.xrfreq_to_timedelta("M")) or end.day > 27
    ):
        # Full months
        if (start.year, start.month) == (end.year, end.month):
            return f"{start:%4Y%m}"
        return f"{start:%4Y%m}-{end:%4Y%m}"
    # The full range
    return f"{start:%4Y%m%d}-{end:%4Y%m%d}"


def _schema_filename(schema: list, facets: dict) -> str:
    return "_".join(
        [
            facets.get(element) if element != "DATES" else _schema_dates(facets)
            for element in schema
            if element == "DATES" or not isna(facets.get(element))
        ]
    )


def _schema_folders(schema: list, facets: dict) -> list:
    folder_tree = list()
    for level in schema:
        part = _schema_level(level, facets)
        if not isna(part):
            folder_tree.append(part)
    return folder_tree


def _get_needed_fields(schema: dict):
    """Return the set of facets that is needed for a given schema."""
    needed = set()
    for level in schema["folders"]:
        if isinstance(level, str):
            if not (level.startswith("(") and level.endswith(")")):
                needed.add(level)
        elif isinstance(level, list):
            for lvl in level:
                needed.add(lvl)
        elif not (isinstance(level, dict) and list(level.keys()) == ["text"]):
            raise ValueError(
                f"Invalid schema with unknown {level} of type {type(level)}."
            )
    return needed


def _read_schemas(schemas):
    if isinstance(schemas, dict) and {"folders", "filename"}.issubset(schemas.keys()):
        # Single schema : remove any conditions (or rather, insert an empty one).
        schemas["with"] = []
        schemas = {"unnamed_schema": schemas}
    elif not isinstance(schemas, dict):
        if schemas is None:
            schemas = Path(__file__).parent / "data" / "file_schema.yml"
        with open(schemas) as f:
            schemas = yaml.safe_load(f)
    for name, schema in schemas.items():
        missing_fields = {"with", "folders", "filename"} - set(schema.keys())
        if missing_fields:
            raise ValueError(
                f"Invalid schema specification. Missing fields {missing_fields} in schema {name}."
            )
    return schemas


def _build_path(
    data: Union[dict, xr.Dataset, xr.DataArray, pd.Series],
    schemas: dict,
    root: Union[str, os.PathLike],
    get_type: bool = False,
    **extra_facets,
) -> Union[Path, tuple[Path, str]]:
    # Get all known metadata
    if isinstance(data, (xr.Dataset, xr.DataArray)):
        facets = (
            # Get non-attribute metadata
            parse_from_ds(
                data, ["frequency", "xrfreq", "date_start", "date_end", "variable"]
            )
            | data.attrs
            | get_cat_attrs(data)
        )
    elif isinstance(data, pd.Series):
        facets = dict(data)
    else:
        raise NotImplementedError(f"Can't build path with object of type {type(data)}")

    facets = facets | extra_facets

    # Scalar-ize the variable if needed
    if (
        "variable" in facets
        and not isinstance(facets["variable"], str)
        and len(facets["variable"]) == 1
    ):
        facets["variable"] = facets["variable"][0]

    # Find the first fitting schema
    for name, schema in schemas.items():
        if not schema["with"]:
            match = True
        else:
            match = reduce(
                op.and_, map(partial(_schema_option, facets=facets), schema["with"])
            )
        if match:
            # Checks
            needed_fields = _get_needed_fields(schema)
            if missing_fields := needed_fields - set(facets.keys()):
                raise ValueError(
                    f"Missing facets {missing_fields} are needed to build the path according to selected schema {name}."
                )
            if "variable" in needed_fields and not isinstance(facets["variable"], str):
                raise ValueError(
                    f"Selected schema {name} is meant to be used with single-variable datasets. "
                    f"Got multiple: {facets['variable']}. "
                    "You can override the facet by passing `variable='varname'` directly."
                )

            out = Path(*_schema_folders(schema["folders"], facets))
            out = out / _schema_filename(schema["filename"], facets)
            if root is not None:
                out = Path(root) / out
            if "format" in facets:
                # Add the extension. Can't use `with_suffix` in case there are dots in the name.
                out = out.parent / f"{out.name}.{facets['format']}"
            if get_type:
                return out, name
            return out

    raise ValueError(f"This file doesn't match any schema. Facets:\n{facets}")

@parse_config
def build_path(
    data: Union[dict, xr.Dataset, xr.DataArray, pd.Series, DataCatalog, pd.DataFrame],
    schemas: Optional[Union[str, os.PathLike, dict]] = None,
    root: Optional[Union[str, os.PathLike]] = None,
    **extra_facets,
) -> Union[Path, DataCatalog, pd.DataFrame]:
    r"""Parse the schema from a configuration and construct a path using a dictionary of facets.

    Parameters
    ----------
    data : dict or xr.Dataset or xr.DataArray or pd.Series or DataCatalog or pd.DataFrame
        Dict of facets, or xarray object to read the facets from. In the latter case, variable and
        time-dependent facets are read with :py:func:`parse_from_ds` and supplemented with all the
        object's attributes, giving priority to the "official" xscen attributes
        (prefixed with `cat:`, see :py:func:`xscen.utils.get_cat_attrs`).
        Can also be a catalog or a DataFrame, in which case a "new_path" column is generated for each item.
    schemas : Path or dict, optional
        Path to a YAML schematic of the database schema. If None, will use a default schema.
        See the comments in the `xscen/data/file_schema.yml` file for more details on its construction.
        A dict of dict schemas can be given (same as reading the yaml).
        Or a single schema dict (a single element of the yaml).
    root : str or Path, optional
        If given, the generated path(s) is put under this root directory.
    \*\*extra_facets
        Extra facets to supplement or override metadata missing from the first input.

    Returns
    -------
    Path or catalog
        Constructed path. If "format" is absent from the facets, it has no suffix.
        If `data` was a catalog, a copy with a "new_path" column is returned.
        Another "new_path_type" column is also added if `schemas` was a collection of schemas (like the default).

    Examples
    --------
    To rename a full catalog, the simplest way is to do:

    >>> import xscen as xs
    >>> import shutil as sh
    >>> new_cat = xs.catutils.build_path(old_cat)
    >>> for i, row in new_cat.iterrows():
    ...     sh.move(row.path, row.new_path)
    ...
    """
    if root:
        root = Path(root)
    schemas = _read_schemas(schemas)
    if isinstance(data, (esm_datastore, pd.DataFrame)):
        if isinstance(data, esm_datastore):
            df = data.df
        else:
            df = data
        df = df.copy()
        paths = df.apply(
            _build_path,
            axis=1,
            result_type="expand",
            schemas=schemas,
            root=root,
            get_type=True,
            **extra_facets,
        )
        df["new_path"] = paths[0].apply(str)
        if len(schemas) > 1:
            df["new_path_type"] = paths[1]
        return df
    return _build_path(data, schemas=schemas, root=root, get_type=False, **extra_facets)
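
# Illustrative usage sketch (not part of the original module): building a single path
# from a pd.Series of facets with a minimal, hypothetical schema (not the default
# xscen/data/file_schema.yml). All facet values below are made up.
#
# >>> schema = {
# ...     "folders": ["source", "processing_level", ["domain", "frequency"]],
# ...     "filename": ["variable", "domain", "frequency", "DATES"],
# ... }
# >>> facets = pd.Series(
# ...     {
# ...         "source": "CanESM5",
# ...         "processing_level": "raw",
# ...         "domain": "global",
# ...         "frequency": "mon",
# ...         "xrfreq": "MS",
# ...         "variable": "tas",
# ...         "date_start": "1950-01-01",
# ...         "date_end": "2014-12-31",
# ...         "format": "nc",
# ...     }
# ... )
# >>> build_path(facets, schemas=schema, root="/project/out")
# PosixPath('/project/out/CanESM5/raw/global_mon/tas_global_mon_1950-2014.nc')
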