"""
================================
vivarium.cluster_tools Utilities
================================
Making directories is hard.
"""
import functools
import hashlib
import os
import socket
import time
import warnings
from collections.abc import Callable
from pathlib import Path
from typing import ParamSpec, TypeVar
# NOTE(review): not referenced in the visible part of this module; presumably the
# maximum number of rows written to a single central log file -- confirm against
# the code that imports it.
NUM_ROWS_PER_CENTRAL_LOG_FILE = 100_000
# Typing placeholders used by ``backoff_and_retry`` so the wrapper it returns is
# annotated with the decorated function's own parameters (``P``) and return type (``T``).
P = ParamSpec("P")
T = TypeVar("T")
[docs]
def get_cluster_name() -> str:
    """Return the host name of the current machine.

    Returns
    -------
        The value reported by :func:`socket.gethostname`.
    """
    hostname = socket.gethostname()
    return hostname
[docs]
def hash_output_path(output_path: str | Path, length: int = 8) -> str:
"""Return a short, stable hash of an output path.
Used to disambiguate Jobmon workflow identifiers between concurrent
pipelines that share the same name and timestamp but write to different
output directories.
Parameters
----------
output_path
The output path to hash.
length
Number of leading hex characters of the digest to return.
Returns
-------
The first ``length`` characters of the MD5 hex digest of ``output_path``.
"""
return hashlib.md5(str(output_path).encode()).hexdigest()[:length]
[docs]
def mkdir(
path: str | Path, umask: int = 0o002, exists_ok: bool = False, parents: bool = False
) -> None:
"""Utility method to create a directory with specified permissions.
Parameters
----------
path
Path of the directory to create.
umask
Umask specifying the desired permissions - defaults to 0o002.
exists_ok
If False, raises FileExistsError if the directory already exists.
parents
If False, raises FileNotFoundError if the directory's parent doesn't exist.
"""
path = Path(path)
old_umask = os.umask(umask)
try:
path.mkdir(exist_ok=exists_ok, parents=parents)
finally:
os.umask(old_umask)
[docs]
def backoff_and_retry(
backoff_seconds: int | float = 30,
num_retries: int = 3,
log_function: Callable[[str], None] = warnings.warn,
) -> Callable[[Callable[P, T]], Callable[P, T]]:
"""Adds a retry handler to the decorated function.
Parameters
----------
backoff_seconds
Number of seconds to wait until retrying the operation.
num_retries
Number of times to retry before failing.
log_function
A callable used to emit retry messages. Defaults to emitting
a standard library warning.
Returns
-------
A function that retries the decorated function a specified number of times.
"""
def _wrap(func: Callable[P, T]) -> Callable[P, T]:
@functools.wraps(func)
def _wrapped(*args: P.args, **kwargs: P.kwargs) -> T:
retries = num_retries
while retries:
try:
result = func(*args, **kwargs)
break
except Exception as e:
log_function(
f"Error trying to write results, retries remaining {retries}"
)
time.sleep(backoff_seconds)
retries -= 1
if not retries:
log_function(f"Retries exhausted.")
raise e
return result
return _wrapped
return _wrap