"""
=============
dagger Runner
=============
The main process loop for ``dagger run`` and ``dagger restart`` invocations:
(re)load and persist the workflow configuration, build the Jobmon workflow,
bind and run (or resume) it, and notify on completion.
"""
from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import yaml
from loguru import logger
from vivarium.cluster_tools.core.cluster.interface import get_workflow_timeout_seconds
from vivarium.cluster_tools.core.jobmon import client
from vivarium.cluster_tools.core.notifications import send_slack_notification
from vivarium.cluster_tools.dagger.config.builder import build_workflow_from_config
from vivarium.cluster_tools.dagger.config.config import WorkflowConfig
from vivarium.cluster_tools.dagger.config.parsing import load_workflow_config
from vivarium.cluster_tools.dagger.config.serialization import workflow_config_to_dict
from vivarium.cluster_tools.dagger.config.utilities import (
CONFIGURATION_FILENAME,
WORKFLOW_ARGS_FILENAME,
)
from vivarium.cluster_tools.utilities import hash_output_path
[docs]
def run_workflow(
    workflow_config: WorkflowConfig,
    verbose: int = 0,
    slack_channel: str | None = None,
    slack_tag: str | None = None,
    mute_slack: bool = False,
) -> None:
    """Launch a brand-new workflow; this backs the ``dagger run`` subcommand.

    A fresh Jobmon ``workflow_args`` identifier is built from the workflow
    name, a hash of the output directory, and the current UTC time (to
    one-second resolution), and the workflow is executed with
    ``resume=False``.

    Parameters
    ----------
    workflow_config
        The parsed and validated workflow configuration (with CLI overrides applied).
    verbose
        Verbosity level. Accepted for CLI compatibility; not referenced by
        this function.
    slack_channel
        Optional Slack channel to post a successful-run notification to instead
        of DMing the launching user.
    slack_tag
        Optional username to @-mention in the channel notification on success.
    mute_slack
        If ``True``, suppress the completion notification entirely.
    """
    logger.info(f"Starting workflow: {workflow_config.name}")
    destination = workflow_config.output_directory

    # The UTC launch stamp plus the output-path digest give each launch its
    # own identifier, which is later persisted for ``dagger restart``.
    launch_stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    path_digest = hash_output_path(destination)
    fresh_id = f"workflow_{workflow_config.name}_{path_digest}_{launch_stamp}"

    notification_options = dict(
        slack_channel=slack_channel,
        slack_tag=slack_tag,
        mute_slack=mute_slack,
    )
    _execute_workflow(
        workflow_config,
        workflow_id=fresh_id,
        resume=False,
        command_label="dagger run",
        **notification_options,
    )
[docs]
def restart_workflow(
    results_directory: Path | str,
    *,
    project: str | None = None,
    queue: str | None = None,
    max_attempts: int | None = None,
    verbose: int = 0,
    slack_channel: str | None = None,
    slack_tag: str | None = None,
    mute_slack: bool = False,
) -> None:
    """Resume a previously started ``dagger`` workflow from its output directory.

    Reloads the ``configuration.yaml`` and persisted Jobmon ``workflow_args``
    written by the original ``dagger run`` invocation, applies any CLI
    overrides, forces the output directory to ``results_directory``, and resumes
    the Jobmon workflow, skipping completed tasks.

    Parameters
    ----------
    results_directory
        Output directory from a previous ``dagger run``. The workflow's output
        directory is forced to this path. A plain string is accepted and
        converted to a :class:`~pathlib.Path`.
    project
        Override for the workflow project.
    queue
        Override for the workflow queue.
    max_attempts
        Override for the maximum number of Jobmon task attempts.
    verbose
        Verbosity level. Accepted for CLI compatibility; not referenced by
        this function.
    slack_channel
        Optional Slack channel to post a successful-run notification to instead
        of DMing the launching user.
    slack_tag
        Optional username to @-mention in the channel notification on success.
    mute_slack
        If ``True``, suppress the completion notification entirely.

    Raises
    ------
    FileNotFoundError
        If ``results_directory`` is not a resumable ``dagger run`` output: it
        lacks a saved configuration file, lacks the persisted workflow args
        file, or the workflow args file is empty.
    """
    # Accept str as well as Path; the ``/`` joins below require a Path.
    results_directory = Path(results_directory)
    logger.info(f"Restarting workflow from {results_directory}")

    # is_file() rather than exists(): a directory with the expected name is
    # not a usable saved configuration / args file.
    config_path = results_directory / CONFIGURATION_FILENAME
    if not config_path.is_file():
        raise FileNotFoundError(
            f"{results_directory} is not a resumable dagger run output: "
            f"missing {CONFIGURATION_FILENAME}."
        )
    workflow_args_path = results_directory / WORKFLOW_ARGS_FILENAME
    if not workflow_args_path.is_file():
        raise FileNotFoundError(
            f"{results_directory} is not a resumable dagger run output: "
            f"missing {WORKFLOW_ARGS_FILENAME} (persisted workflow_args)."
        )

    workflow_id = workflow_args_path.read_text().strip()
    if not workflow_id:
        # An empty identifier cannot name the workflow to resume; fail here
        # with the same "not resumable" contract instead of handing "" on to
        # the build/resume step.
        raise FileNotFoundError(
            f"{results_directory} is not a resumable dagger run output: "
            f"{WORKFLOW_ARGS_FILENAME} is empty (no persisted workflow_args)."
        )
    logger.info(f"Resuming workflow with args: {workflow_id}")

    # Reload the saved config, forcing the output directory to the results dir
    workflow_config = load_workflow_config(
        config_path,
        project=project,
        queue=queue,
        max_attempts=max_attempts,
        output_directory=results_directory,
    )
    _execute_workflow(
        workflow_config,
        workflow_id=workflow_id,
        resume=True,
        command_label="dagger restart",
        slack_channel=slack_channel,
        slack_tag=slack_tag,
        mute_slack=mute_slack,
    )
def _execute_workflow(
    workflow_config: WorkflowConfig,
    *,
    workflow_id: str,
    resume: bool,
    command_label: str,
    slack_channel: str | None = None,
    slack_tag: str | None = None,
    mute_slack: bool = False,
) -> None:
    """Shared run/restart driver: persist state, build, bind, run, and notify.

    The configuration and ``workflow_id`` (passed to the builder as Jobmon's
    ``workflow_args``) are written into the output directory *before* the
    workflow is built. As a result, a run that fails early can still be picked
    up by ``dagger restart``.

    Parameters
    ----------
    workflow_config
        Configuration whose ``output_directory`` receives all persisted files.
    workflow_id
        Identifier persisted to disk and forwarded as ``workflow_args``.
    resume
        Forwarded to both the workflow builder and the bind/run call.
    command_label
        Label included in the Slack notification (e.g. ``"dagger run"``).
    slack_channel, slack_tag, mute_slack
        Forwarded unchanged to ``send_slack_notification``.

    Raises
    ------
    RuntimeError
        If the final workflow status is not ``client.JOBMON_STATUS_DONE``.
        The notification call has already been made at that point.
    """
    results_root = workflow_config.output_directory
    results_root.mkdir(parents=True, exist_ok=True)

    # Persist restart state up front; see the docstring for why.
    _write_workflow_configuration(results_root, workflow_config)
    args_file = results_root / WORKFLOW_ARGS_FILENAME
    args_file.write_text(workflow_id)

    logger.debug("Building workflow.")
    jobmon_workflow = build_workflow_from_config(
        workflow_config,
        workflow_args=workflow_id,
        resume=resume,
    )
    timeout_seconds = get_workflow_timeout_seconds()
    final_status, monitoring_url = client.bind_and_run_workflow(
        jobmon_workflow,
        results_root,
        resume=resume,
        seconds_until_timeout=timeout_seconds,
    )

    send_slack_notification(
        workflow_name=workflow_config.name,
        status=final_status,
        command_label=command_label,
        monitoring_url=monitoring_url,
        results_dir=str(results_root),
        slack_channel=slack_channel,
        slack_tag=slack_tag,
        mute_slack=mute_slack,
    )

    if final_status == client.JOBMON_STATUS_DONE:
        logger.info(f"Workflow completed successfully. Results in {results_root}")
        return
    raise RuntimeError(
        f"Workflow finished with status '{final_status}' "
        f"(expected '{client.JOBMON_STATUS_DONE}' for DONE)."
    )
def _write_workflow_configuration(output_root: Path, workflow_config: WorkflowConfig) -> None:
    """Write workflow configuration to a YAML file in the output directory.

    Creates a ``configuration.yaml`` that ``dagger restart`` reloads to resume
    the workflow (and that can also be reused directly with ``dagger run -c``).

    The file is replaced atomically. The YAML is written to a sibling
    ``.tmp`` file and then renamed over the target, so a failure mid-write
    (e.g. quota or disk-full) leaves any previous configuration intact rather
    than truncated. This matters because ``dagger restart`` rewrites this file
    in a directory whose existing copy is the only way to restart.

    Parameters
    ----------
    output_root
        The root output directory for the workflow.
    workflow_config
        The parsed and validated workflow configuration.
    """
    config: dict[str, Any] = {"workflow": workflow_config_to_dict(workflow_config)}
    # Serialize before touching disk, so a serialization error writes nothing.
    rendered = yaml.dump(config, default_flow_style=False, sort_keys=False)

    config_file = output_root / CONFIGURATION_FILENAME
    # Same directory as the target, so the rename stays on one filesystem.
    # A stale temp file from an earlier failure is simply overwritten.
    staging_file = config_file.with_name(config_file.name + ".tmp")
    staging_file.write_text(rendered)
    staging_file.replace(config_file)
    logger.info(f"Run configuration written to {config_file}")