# Source code for vivarium.cluster_tools.dagger.config.builder

"""
================
Workflow Builder
================

Build Jobmon workflows from workflow configuration.

"""

from __future__ import annotations

from typing import TYPE_CHECKING, Callable

from vivarium.cluster_tools.core.jobmon import client
from vivarium.cluster_tools.dagger.config.config import WorkflowConfig
from vivarium.cluster_tools.dagger.config.interface import (
    get_bash_step_tasks,
    get_notebook_step_tasks,
    get_pytest_step_tasks,
    get_python_step_tasks,
    get_simulation_step_tasks,
)

if TYPE_CHECKING:
    from jobmon.client.task import Task
    from jobmon.client.workflow import Workflow


STEP_TYPE_API_FNS: dict[str, Callable[..., list[Task]]] = {
    "bash": get_bash_step_tasks,
    "simulation": get_simulation_step_tasks,
    "pytest": get_pytest_step_tasks,
    "python": get_python_step_tasks,
    "notebook": get_notebook_step_tasks,
}
"""Dispatch table keyed by the YAML ``step_type`` string.

Each value is the interface API function that builds the tasks for that kind
of step. Paired with the dispatch tables in
:mod:`vivarium.cluster_tools.dagger.config.parsing`.
"""


def build_workflow_from_config(
    config: WorkflowConfig, workflow_args: str, *, resume: bool = False
) -> Workflow:
    """Build a complete Jobmon workflow from a workflow configuration.

    For each step in the workflow, dispatches to the matching interface API
    function (one of ``get_*_step_tasks``) and wires dependencies so that
    steps execute in sequential order (all tasks from step *N* must complete
    before any task in step *N+1* starts).

    A step that yields no tasks does not break the chain: the next step that
    does yield tasks depends on the most recent non-empty step instead.

    Parameters
    ----------
    config
        The validated workflow configuration to build.
    workflow_args
        Deterministic string that Jobmon uses to identify the workflow.
        Must be identical across runs for resume to work.
    resume
        Whether this build is a resume (``dagger restart``). Forwarded
        only to the simulation step, the one step type whose output
        layout varies on resume.

    Returns
    -------
        The workflow, with the tasks of every step added and the
        step-to-step dependencies wired.

    Raises
    ------
    KeyError
        If a step's ``step_type`` has no entry in ``STEP_TYPE_API_FNS``.
    """
    tool = client.make_tool()
    workflow = client.make_workflow(
        tool,
        workflow_args=workflow_args,
        name=config.name,
        max_attempts=config.max_attempts,
    )

    # Tasks of the most recent step that actually produced tasks; these are
    # the upstreams for the next step.
    previous_step_tasks: list[Task] = []
    all_tasks: list[Task] = []

    for parsed_step in config.steps:
        api_fn = STEP_TYPE_API_FNS[parsed_step.step_type]

        # Step-level environment wins; otherwise the workflow default is
        # substituted. Build a new dict so the cached api_kwargs is not mutated.
        kwargs = parsed_step.api_kwargs
        if kwargs.get("environment") is None:
            kwargs = {**kwargs, "environment": config.default_environment}

        # Only the simulation step varies its behavior on resume.
        if parsed_step.step_type == "simulation":
            step_tasks = api_fn(**kwargs, tool=tool, is_resume=resume)
        else:
            step_tasks = api_fn(**kwargs, tool=tool)

        # Wire sequential dependencies: every task in this step
        # depends on every task from the previous step.
        for task in step_tasks:
            for prev_task in previous_step_tasks:
                client.add_upstream(task, prev_task)

        all_tasks.extend(step_tasks)

        # Only advance the upstream set when this step produced tasks.
        # Otherwise an empty step would leave the following step with no
        # upstreams at all, letting it start before earlier steps finish.
        if step_tasks:
            previous_step_tasks = step_tasks

    client.add_tasks(workflow, all_tasks)
    return workflow