Source code for xscen.scripting

"""A collection of various convenience objects and functions to use in scripts."""

import logging
import mimetypes
import os
import shutil as sh
import signal
import smtplib
import sys
import time
import warnings
from contextlib import contextmanager
from distutils.dir_util import copy_tree
from email.message import EmailMessage
from io import BytesIO
from pathlib import Path
from traceback import format_exception
from typing import Optional, Union

import xarray as xr
from matplotlib.figure import Figure

from .catalog import ProjectCatalog
from .config import parse_config
from .utils import get_cat_attrs

logger = logging.getLogger(__name__)

__all__ = [
    "TimeoutException",
    "measure_time",
    "move_and_delete",
    "save_and_update",
    "send_mail",
    "send_mail_on_exit",
    "skippable",
    "timeout",
]


[docs] @parse_config def send_mail( *, subject: str, msg: str, to: Optional[str] = None, server: str = "127.0.0.1", port: int = 25, attachments: Optional[ list[Union[tuple[str, Union[Figure, os.PathLike]], Figure, os.PathLike]] ] = None, ) -> None: """Send email. Email a single address through a login-less SMTP server. The default values of server and port should work out-of-the-box on Ouranos's systems. Parameters ---------- subject: str Subject line. msg: str Main content of the email. Can be UTF-8 and multi-line. to: str, optional Email address to which send the email. If None (default), the email is sent to "{os.getlogin()}@{os.uname().nodename}". On unix systems simply put your real email address in `$HOME/.forward` to receive the emails sent to this local address. server : str SMTP server url. Defaults to 127.0.0.1, the local host. This function does not try to log-in. port: int Port of the SMTP service on the server. Defaults to 25, which is usually the default port on unix-like systems. attachments : list of paths or matplotlib figures or tuples of a string and a path or figure, optional List of files to attach to the email. Elements of the list can be paths, the mimetypes of those is guessed and the files are read and sent. Elements can also be matplotlib Figures which are send as png image (savefig) with names like "Figure00.png". Finally, elements can be tuples of a filename to use in the email and the attachment, handled as above. Returns ------- None """ # Inspired by https://betterprogramming.pub/how-to-send-emails-with-attachments-using-python-dd37c4b6a7fd email = EmailMessage() email["Subject"] = subject email["From"] = f"{os.getlogin()}@{os.uname().nodename}" email["To"] = to or f"{os.getlogin()}@{os.uname().nodename}" email.set_content(msg) for i, att in enumerate(attachments or []): fname = None if isinstance(att, tuple): fname, att = att if isinstance(att, Figure): data = BytesIO() att.savefig(data, format="png") data.seek(0) email.add_attachment( data.read(), maintype="image", subtype="png", filename=fname or f"Figure{i:02d}.png", ) else: # a path attpath = Path(att) ctype, encoding = mimetypes.guess_type(attpath) if ctype is None or encoding is not None: ctype = "application/octet-stream" maintype, subtype = ctype.split("/", 1) with attpath.open("rb") as fp: email.add_attachment( fp.read(), maintype=maintype, subtype=subtype, filename=fname or attpath.name, ) with smtplib.SMTP(host=server, port=port) as SMTP: SMTP.send_message(email)
class ExitWatcher: """An object that watches system exits and exceptions before the python exits.""" def __init__(self): self.code = None self.error = None self.hooked = False def hook(self): """Hooks the watcher to the system by monkeypatching `sys` with its own methods.""" if not self.hooked: self.orig_exit = sys.exit self.orig_excepthook = sys.excepthook sys.exit = self.exit sys.excepthook = self.err_handler self.hooked = True else: warnings.warn("Exit hooks have already been overrided.") def unhook(self): if self.hooked: sys.exit = self.orig_exit sys.excepthook = self.orig_excepthook else: raise ValueError("Exit hooks were not overriden, can't unhook.") def exit(self, code=0): self.code = code self.orig_exit(code) def err_handler(self, *exc_info): self.error = exc_info self.orig_excepthook(*exc_info) exit_watcher = ExitWatcher() exit_watcher.hook()
[docs] @parse_config def send_mail_on_exit( *, subject: Optional[str] = None, msg_ok: Optional[str] = None, msg_err: Optional[str] = None, on_error_only: bool = False, skip_ctrlc: bool = True, **mail_kwargs, ) -> None: """Send an email with content depending on how the system exited. This function is best used by registering it with `atexit`. Calls :py:func:`send_mail`. Parameters ---------- subject : str, optional Email subject. Will be appended by "Success", "No errors" or "Failure" depending on how the system exits. msg_ok : str, optional Content of the email if the system exists successfully. msg_err : str, optional Content of the email id the system exists with a non-zero code or with an error. The message will be appended by the exit code or with the error traceback. on_error_only : boolean Whether to only send an email on a non-zero/error exit. skip_ctrlc : boolean If True (default), exiting with a KeyboardInterrupt will not send an email. mail_kwargs Other arguments passed to :py:func:`send_mail`. The `to` argument is necessary for this function to work. Returns ------- None Example ------- Send an eamil titled "Woups" upon non-successful program exit. We assume the `to` field was given in the config. >>> import atexit >>> atexit.register(send_mail_on_exit, subject="Woups", on_error_only=True) """ subject = subject or "Workflow" msg_err = msg_err or "Workflow exited with some errors." if ( not on_error_only and exit_watcher.error is None and exit_watcher.code in [None, 0] ): send_mail( subject=subject + " - Success", msg=msg_ok or "Workflow exited successfully.", **mail_kwargs, ) elif exit_watcher.error is None and (exit_watcher.code or 0) > 0: send_mail( subject=subject + " - No errors", msg=f"{msg_err}\nSystem exited with code {exit_watcher.code}.", **mail_kwargs, ) elif exit_watcher.error is not None and ( exit_watcher.error[0] is not KeyboardInterrupt or not skip_ctrlc ): tb = "".join(format_exception(*exit_watcher.error)) msg_err = f"{msg_err}\n\n{tb}" send_mail(subject=subject + " - Failure", msg=msg_err, **mail_kwargs)
[docs] @parse_config class measure_time: """Context for timing a code block. Parameters ---------- name : str, optional A name to give to the block being timed, for meaningful logging. cpu : boolean If True, the CPU time is also measured and logged. logger : logging.Logger, optional The logger object to use when sending Info messages with the measured time. Defaults to a logger from this module. """ def __init__( self, name: Optional[str] = None, cpu: bool = False, logger: logging.Logger = logger, ): self.name = name or "" self.cpu = cpu self.logger = logger def __enter__(self): # noqa: D105 self.start = time.perf_counter() self.start_cpu = time.process_time() self.logger.info(f"Started process {self.name}.") return def __exit__(self, *args, **kwargs): # noqa: D105 elapsed = time.perf_counter() - self.start elapsed_cpu = time.process_time() - self.start_cpu occ = elapsed_cpu / elapsed s = f"Process {self.name} done in {elapsed:.02f} s" if self.cpu: s += f" and used {elapsed_cpu:.02f} of cpu time ({occ:.1%} % occupancy)." self.logger.info(s)
[docs] class TimeoutException(Exception): """An exception raised with a timeout occurs.""" def __init__(self, seconds: int, task: str = "", **kwargs): self.msg = f"Task {task} timed out after {seconds} seconds" super().__init__(self.msg, **kwargs)
[docs] @contextmanager def timeout(seconds: int, task: str = ""): """Timeout context manager. Only one can be used at a time, this is not multithread-safe : it cannot be used in another thread than the main one, but multithreading can be used in parallel. Parameters ---------- seconds : int Number of seconds after which the context exits with a TimeoutException. If None or negative, no timeout is set and this context does nothing. task : str, optional A name to give to the task, allowing a more meaningful exception. """ if seconds is None or seconds <= 0: yield else: def _timeout_handler(signum, frame): raise TimeoutException(seconds, task) old_handler = signal.signal(signal.SIGALRM, _timeout_handler) signal.alarm(seconds) try: yield finally: signal.alarm(0) signal.signal(signal.SIGALRM, old_handler)
[docs] @contextmanager def skippable( seconds: int = 2, task: str = "", logger: Optional[logging.Logger] = None ): """Skippable context manager. When CTRL-C (SIGINT, KeyboardInterrupt) is sent within the context, this catches it, prints to the log and gives a timeout during which a subsequent interruption will stop the script. Otherwise, the context exits normally. This is meant to be used within a loop so that we can skip some iterations: .. code-block:: python for i in iterable: with skippable(2, i): some_skippable_code() Parameters ---------- seconds: int Number of seconds to wait for a second CTRL-C. task : str A name for the skippable task, to have an explicit script. logger : logging.Logger, optional The logger to use when printing the messages. The interruption signal is notified with ERROR, while the skipping is notified with INFO. If not given (default), a brutal print is used. """ if logger: err = logger.error inf = logger.info else: err = inf = print try: yield except KeyboardInterrupt: err("Received SIGINT. Do it again to stop the script.") time.sleep(seconds) inf(f"Skipping {task}.")
[docs] def save_and_update( ds: xr.Dataset, pcat: ProjectCatalog, path: Optional[Union[str, os.PathLike]] = None, file_format: Optional[str] = None, build_path_kwargs: Optional[dict] = None, save_kwargs: Optional[dict] = None, update_kwargs: Optional[dict] = None, ): """ Construct the path, save and delete. This function can be used after each task of a workflow. Parameters ---------- ds: xr.Dataset Dataset to save. pcat: ProjectCatalog Catalog to update after saving the dataset. path: str or os.pathlike, optional Path where to save the dataset. If the string contains variables in curly bracket. They will be filled by catalog attributes. If None, the `catutils.build_path` fonction will be used to create a path. file_format: {'nc', 'zarr'} Format of the file. If None, look for the following in order: build_path_kwargs['format'], a suffix in path, ds.attrs['cat:format']. If nothing is found, it will default to zarr. build_path_kwargs: dict, optional Arguments to pass to `build_path`. save_kwargs: dict, optional Arguments to pass to `save_to_netcdf` or `save_to_zarr`. update_kwargs: dict, optional Arguments to pass to `update_from_ds`. """ build_path_kwargs = build_path_kwargs or {} save_kwargs = save_kwargs or {} update_kwargs = update_kwargs or {} # try to guess file format if not given. if file_format is None: if "format" in build_path_kwargs: file_format = build_path_kwargs.get("format") elif path is not None and Path(path).suffix: file_format = Path(path).suffix.split(".")[-1] else: file_format = ds.attrs.get("cat:format", "zarr") # get path if path is not None: path = str(path).format( **get_cat_attrs(ds, var_as_str=True) ) # fill path with attrs else: # if path is not given build it build_path_kwargs.setdefault("format", file_format) from .catutils import build_path path = build_path(ds, **build_path_kwargs) # save if file_format == "zarr": from .io import save_to_zarr save_to_zarr(ds, path, **save_kwargs) elif file_format == "nc": from .io import save_to_netcdf save_to_netcdf(ds, path, **save_kwargs) else: raise ValueError(f"file_format {file_format} is not valid. Use zarr or nc.") # update catalog pcat.update_from_ds(ds=ds, path=path, **update_kwargs) logger.info(f"File {path} has saved succesfully and the catalog was updated.")
[docs] def move_and_delete( moving: list[list[Union[str, os.PathLike]]], pcat: ProjectCatalog, deleting: Optional[list[Union[str, os.PathLike]]] = None, copy: bool = False, ): """ First, move files, then update the catalog with new locations. Finally, delete directories. This function can be used at the end of for loop in a workflow to clean temporary files. Parameters ---------- moving: list of lists of str or os.PathLike list of lists of path of files to move, following the format: [[source 1, destination1], [source 2, destination2],...] pcat: ProjectCatalog Catalog to update with new destinations deleting: list of str or os.PathLike, optional list of directories to be deleted including all contents and recreated empty. E.g. the working directory of a workflow. copy: bool, optional If True, copy directories instead of moving them. """ if isinstance(moving, list) and isinstance(moving[0], list): for files in moving: source, dest = files[0], files[1] if Path(source).exists(): if copy: logger.info(f"Copying {source} to {dest}.") copied_files = copy_tree(source, dest) for f in copied_files: # copied files don't include zarr files if f[-16:] == ".zarr/.zmetadata": ds = xr.open_dataset(f[:-11]) pcat.update_from_ds(ds=ds, path=f[:-11]) if f[-3:] == ".nc": ds = xr.open_dataset(f) pcat.update_from_ds(ds=ds, path=f) else: logger.info(f"Moving {source} to {dest}.") sh.move(source, dest) if Path(dest).suffix in [".zarr", ".nc"]: ds = xr.open_dataset(dest) pcat.update_from_ds(ds=ds, path=dest) else: logger.info(f"You are trying to move {source}, but it does not exist.") else: raise ValueError("`moving` should be a list of lists.") # erase workdir content if this is the last step if isinstance(deleting, list): for dir_to_delete in deleting: if Path(dir_to_delete).exists() and Path(dir_to_delete).is_dir(): logger.info(f"Deleting content inside {dir_to_delete}.") sh.rmtree(dir_to_delete) os.mkdir(dir_to_delete) elif deleting is None: pass else: raise ValueError("`deleting` should be a list.")