Source code for vivarium.cluster_tools.psimulate.cluster.interface

"""
=================
Cluster Interface
=================

"""

from __future__ import annotations

import os
import re
import subprocess
from pathlib import Path
from typing import Any, NamedTuple

from loguru import logger

from vivarium.cluster_tools.psimulate.environment import ENV_VARIABLES


def validate_cluster_environment() -> None:
    """Check that the current host is an IHME SLURM node but not a submit host.

    Raises
    ------
    RuntimeError
        If the hostname does not contain ``"slurm"``, or if it contains the
        submit-host marker ``"slogin"``.
    """
    hostname = ENV_VARIABLES.HOSTNAME.value
    on_cluster = "slurm" in hostname
    if not on_cluster:
        raise RuntimeError("This tool must be run from the IHME cluster.")
    # Submit hosts carry "slogin" in their hostname and must not run the tool.
    if "slogin" in hostname:
        raise RuntimeError("This tool must not be run from a submit host.")
class NativeSpecification(NamedTuple):
    """Resource request for a job, convertible to Jobmon compute resources.

    Attributes
    ----------
    job_name
        Name of the job.
    project
        Project passed to Jobmon as ``project``.
    queue
        Queue passed to Jobmon as ``queue``.
    peak_memory
        Requested memory in GB.
    max_runtime
        Maximum runtime as a SLURM time string (e.g. ``HH:MM:SS``).
    hardware
        SLURM feature names to constrain the job to; empty for no constraint.
    NUM_THREADS
        Number of cores requested (``1`` by default).
    """

    job_name: str
    project: str
    queue: str
    peak_memory: float  # Memory in GB
    max_runtime: str
    hardware: list[str]
    # Intended as a class constant. Because it is annotated, however,
    # ``NamedTuple`` treats it as a seventh field with default ``1``: it shows
    # up in ``_fields``, ``repr`` and equality, and can be overridden at
    # construction time. Left as-is so the tuple's shape is unchanged for
    # existing callers.
    NUM_THREADS: int = 1

    def to_jobmon_spec(self, worker_logging_root: Path) -> dict[str, Any]:
        """Build the Jobmon compute resources dict from this NativeSpecification.

        Parameters
        ----------
        worker_logging_root
            Root directory for worker logs.

        Returns
        -------
            Dictionary of compute resources for Jobmon.

        Notes
        -----
        * ``memory`` is passed in **GB** because the Jobmon SLURM plugin
          performs its own GB → MB conversion internally.
        * ``runtime`` is ``max_runtime`` converted to seconds.
        * ``constraints`` is a pipe-separated string of SLURM feature names
          (e.g. ``"r650|r650v2"``), included only when hardware is requested.
        * ``stdout`` and ``stderr`` route SLURM stdout/stderr to
          ``worker_logging_root``. The Jobmon SLURM plugin appends the task
          name and SLURM job ID to these paths automatically.
        """
        resources: dict[str, Any] = {
            "queue": self.queue,
            "project": self.project,
            "memory": self.peak_memory,  # GB – Jobmon converts to MB
            "runtime": self._runtime_to_seconds(self.max_runtime),
            "cores": self.NUM_THREADS,
            "stdout": str(worker_logging_root),
            "stderr": str(worker_logging_root),
        }
        # Only request node features when some were asked for.
        if self.hardware:
            resources["constraints"] = "|".join(self.hardware)
        return resources

    @staticmethod
    def _runtime_to_seconds(runtime_str: str) -> int:
        """Convert HH:MM:SS runtime string to seconds.

        Parameters
        ----------
        runtime_str
            Runtime in HH:MM:SS format (any format accepted by
            ``_parse_slurm_time`` also works).

        Returns
        -------
            Runtime in seconds.

        Raises
        ------
        ValueError
            If ``runtime_str`` is not a recognized SLURM time format.
        """
        return _parse_slurm_time(runtime_str)
# Buffer in seconds to subtract from the remaining SLURM time so the jobmon
# workflow shuts down cleanly before SLURM kills the runner node.
_SLURM_TIMEOUT_BUFFER_SECONDS = 120

# Value ``squeue -o %L`` prints for a job that has no time limit.
_SLURM_UNLIMITED_TIME = "UNLIMITED"


def get_workflow_timeout_seconds() -> int | None:
    """Get jobmon workflow's timeout in seconds.

    The timeout is the time remaining on the current SLURM allocation (as
    reported by ``squeue``) minus a small buffer, so that the workflow can
    shut down gracefully before SLURM terminates the runner node.

    Returns
    -------
        Remaining time in seconds (with buffer subtracted), or ``None`` in
        either of these cases (Jobmon's built-in default timeout is then used):

        * there is no SLURM allocation (``SLURM_JOB_ID`` is unset);
        * the allocation has no time limit (``squeue`` reports ``UNLIMITED``).

    Raises
    ------
    RuntimeError
        If ``squeue`` cannot be run, exits non-zero, returns no output, or
        returns a time that cannot be parsed, or if the remaining time is not
        greater than the safety buffer.
    """
    job_id = os.environ.get("SLURM_JOB_ID")
    if job_id is None:
        logger.info(
            "SLURM_JOB_ID is unset. The workflow is likely being run "
            "from a SLURM-capable host without an explicit resource allocation "
            "e.g. Jenkins. Deferring to Jobmon's default timeout."
        )
        return None

    try:
        # squeue -h -j <job_id> -o %L prints the remaining time (D-HH:MM:SS,
        # HH:MM:SS or MM:SS), or UNLIMITED when the job has no time limit.
        result = subprocess.run(
            ["squeue", "-h", "-j", job_id, "-o", "%L"],
            capture_output=True,
            text=True,
            timeout=10,
        )
    except Exception as e:
        # Any failure to launch or complete squeue (missing binary, timeout,
        # ...) is surfaced as RuntimeError per this function's contract.
        raise RuntimeError(
            f"Could not determine remaining SLURM time for job {job_id}: {e}"
        ) from e

    if result.returncode != 0:
        raise RuntimeError(
            f"squeue failed for SLURM job {job_id} "
            f"(exit {result.returncode}): {result.stderr.strip()}"
        )

    stderr_output = result.stderr.strip()
    if stderr_output:
        logger.warning(f"squeue stderr for job {job_id}: {stderr_output}")

    remaining_str = result.stdout.strip()
    if not remaining_str:
        raise RuntimeError(
            f"squeue returned no output for SLURM job {job_id}. "
            "Cannot determine remaining allocation time."
        )

    # A job without a time limit behaves like "no allocation": there is no
    # deadline to fit inside, so defer to Jobmon's default timeout.
    if remaining_str == _SLURM_UNLIMITED_TIME:
        logger.info(
            f"SLURM job {job_id} has no time limit (squeue reported "
            f"{remaining_str}). Deferring to Jobmon's default timeout."
        )
        return None

    try:
        remaining_seconds = _parse_slurm_time(remaining_str)
    except ValueError as e:
        # e.g. NOT_SET or an unexpected format; keep the RuntimeError contract.
        raise RuntimeError(
            f"Could not parse remaining SLURM time '{remaining_str}' "
            f"reported by squeue for job {job_id}."
        ) from e

    workflow_seconds = remaining_seconds - _SLURM_TIMEOUT_BUFFER_SECONDS
    if workflow_seconds <= 0:
        raise RuntimeError(
            f"SLURM allocation has {remaining_str} ({remaining_seconds}s) remaining, "
            f"which is less than the {_SLURM_TIMEOUT_BUFFER_SECONDS}s safety buffer. "
            "Not enough time to run a workflow."
        )

    logger.info(
        f"Detected SLURM allocation with {remaining_str} ({remaining_seconds}s) remaining. "
        f"Setting workflow timeout to {workflow_seconds}s ({_SLURM_TIMEOUT_BUFFER_SECONDS}s buffer)."
    )
    return workflow_seconds
def _parse_slurm_time(time_str: str) -> int: """Parse a SLURM time string into seconds. Handles the formats returned by ``squeue -o %L``: ``SS``, ``MM:SS``, ``HH:MM:SS``, ``D-HH:MM:SS``. Parameters ---------- time_str A SLURM time string. Returns ------- Total seconds represented by the time string. Raises ------ ValueError If ``time_str`` does not match a recognized SLURM time format. """ # When a day prefix is present, squeue always uses D-HH:MM:SS. # Without a day prefix the format is HH:MM:SS, MM:SS, or SS. if not re.fullmatch(r"(\d+-\d+:\d+:\d+|\d+(:\d+){0,2})", time_str): raise ValueError( f"Unrecognized SLURM time format: '{time_str}'. " "Expected D-HH:MM:SS, HH:MM:SS, MM:SS, or SS." ) if "-" in time_str: days, hms = time_str.split("-", 1) else: days = "0" hms = time_str hms_parts = hms.split(":") if len(hms_parts) == 3: h, m, s = hms_parts elif len(hms_parts) == 2: m, s = hms_parts h = "0" else: s = hms_parts[0] h = "0" m = "0" return int(days) * 86400 + int(h) * 3600 + int(m) * 60 + int(s)