# Source code for vivarium.cluster_tools.psimulate.worker.task_runner

"""
==================
Jobmon Task Runner
==================

CLI entry point for Jobmon worker tasks. Loads the task's metadata JSON
and runs the appropriate work horse in-process. Invoked by every
psimulate command that submits simulations (``run`` / ``restart`` /
``expand`` / ``load_test``) as well as workflow simulation steps.

Logging is configured via ``_configure_dual_sink`` so loguru INFO+
messages land in the SLURM stdout file and WARNING+ messages land in the
SLURM stderr file (which the Jobmon GUI surfaces).

"""

import argparse
import json
import sys
from pathlib import Path

from loguru import logger

from vivarium.cluster_tools.psimulate import COMMANDS
from vivarium.cluster_tools.psimulate.jobs import JobParameters
from vivarium.cluster_tools.psimulate.results.writing import write_task_results
from vivarium.cluster_tools.psimulate.worker.load_test_work_horse import (
    work_horse as load_test_work_horse,
)
from vivarium.cluster_tools.psimulate.worker.vivarium_work_horse import work_horse


def _configure_dual_sink() -> None:
    """Split loguru output between the SLURM stdout and stderr files.

    Every previously registered loguru handler is dropped first (this
    includes loguru's default stderr handler), so INFO messages are not
    duplicated onto stderr. Two sinks are then installed, in order:

    * ``sys.stdout`` at level ``INFO`` (so it receives INFO and above,
      warnings/errors included), and
    * ``sys.stderr`` at level ``WARNING`` (warnings and errors only),
      which is the file the Jobmon GUI surfaces.
    """
    logger.remove()
    sink_levels = (
        (sys.stdout, "INFO"),
        (sys.stderr, "WARNING"),
    )
    for stream, threshold in sink_levels:
        logger.add(stream, level=threshold)

[docs] def parse_args(argv: list[str] | None = None) -> argparse.Namespace: parser = argparse.ArgumentParser(description="Run a single Jobmon worker task.") parser.add_argument( "--metadata-dir", type=Path, required=True, help="Directory containing task metadata JSON files.", ) parser.add_argument( "--task-id", type=str, required=True, help="The deterministic task ID.", ) parser.add_argument( "--results-dir", type=Path, required=True, help="Directory to write results to.", ) parser.add_argument( "--command", type=str, required=True, help="The psimulate command (e.g. run, restart, expand, load_test).", ) return parser.parse_args(argv)
def main(argv: list[str] | None = None) -> int:
    """Run one Jobmon worker task end to end.

    Steps:

    1. Configure the stdout/stderr loguru sinks.
    2. Read ``<metadata-dir>/<task-id>.json`` and build ``JobParameters``
       from it.
    3. Dispatch to the work horse that matches ``--command``.
    4. Write the results under ``--results-dir``.

    Parameters
    ----------
    argv
        Command-line arguments; ``None`` means argparse uses
        ``sys.argv[1:]``.

    Returns
    -------
    int
        ``0`` on success. The ``__main__`` guard uses it as the process
        exit code.

    Raises
    ------
    ValueError
        If ``--command`` is not ``run``/``restart``/``expand``/``load_test``
        (as defined by ``COMMANDS``).
    """
    _configure_dual_sink()
    args = parse_args(argv)

    metadata_path = args.metadata_dir / f"{args.task_id}.json"
    logger.info(f"Loading task metadata from {metadata_path}")
    # JSON is UTF-8 by specification (RFC 8259). Be explicit so decoding
    # does not depend on the cluster node's locale settings.
    with open(metadata_path, encoding="utf-8") as f:
        task_metadata = json.load(f)

    command = args.command
    job_parameters = JobParameters(**task_metadata)
    task_id = args.task_id

    logger.info(f"Running task {task_id} with command '{command}'")
    if command in (COMMANDS.run, COMMANDS.restart, COMMANDS.expand):
        results_dict = work_horse(job_parameters)
    elif command == COMMANDS.load_test:
        # The load-test work horse yields a single result; key it under
        # "load_test" so write_task_results receives a dict in both branches.
        results_df = load_test_work_horse(job_parameters)
        results_dict = {"load_test": results_df}
    else:
        raise ValueError(f"Unknown command: {command}")

    logger.info(f"Task {task_id} completed, writing results.")
    write_task_results(
        results_dir=args.results_dir,
        job_parameters=job_parameters,
        results_dict=results_dict,
    )
    logger.info(f"Task {task_id} results written successfully.")
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status
    # (equivalent to sys.exit(main())).
    raise SystemExit(main())