"""
==============
psimulate Jobs
==============
"""
import hashlib
import json
from collections import defaultdict
from copy import deepcopy
from pathlib import Path
from typing import Any, NamedTuple
import numpy as np
import pandas as pd
from vivarium.engine.framework.utilities import collapse_nested_dict
from vivarium.cluster_tools.psimulate import branches
[docs]
def generate_task_id(
input_draw: int, random_seed: int, branch_configuration: dict[str, Any]
) -> str:
"""Generate a deterministic task ID from job-specific parameters.
Uses SHA-256 hash of canonical JSON serialization of the job-specific
parameters (input_draw, random_seed, branch_configuration).
Parameters
----------
input_draw
The input draw number.
random_seed
The random seed.
branch_configuration
The branch configuration dictionary.
Returns
-------
A hex string of the first 8 bytes (16 hex chars) of the SHA-256 hash.
"""
canonical = json.dumps(
{
"input_draw": input_draw,
"random_seed": random_seed,
"branch_configuration": branch_configuration,
},
sort_keys=True,
separators=(",", ":"),
)
return hashlib.sha256(canonical.encode()).hexdigest()[:16]
[docs]
class JobParameters(NamedTuple):
"""Parameters for a single distributed simulation job."""
model_specification: str
branch_configuration: dict[str, Any]
input_draw: int
random_seed: int
results_path: str
worker_logging_root: str
backup_configuration: dict[str, Any]
extras: dict[str, Any]
@property
def task_id(self) -> str:
"""Deterministic task ID derived from job-specific parameters."""
return generate_task_id(self.input_draw, self.random_seed, self.branch_configuration)
@property
def shared(self) -> dict[str, Any]:
"""Parameters shared by all jobs in a psimulate run."""
return {
"model_specification": self.model_specification,
"results_path": self.results_path,
"worker_logging_root": self.worker_logging_root,
"backup_configuration": self.backup_configuration,
}
@property
def job_specific(self) -> dict[str, Any]:
"""Parameters that vary by job in a psimulate run."""
return {
**self.branch_configuration,
"input_draw": self.input_draw,
"random_seed": self.random_seed,
}
@property
def sim_config(self) -> dict[str, Any]:
"""Parameters for the simulation configuration."""
config = defaultdict(dict, deepcopy(self.branch_configuration))
config["randomness"]["random_seed"] = self.random_seed
config["input_data"]["input_draw_number"] = self.input_draw
return dict(config)
[docs]
def to_dict(self) -> dict[str, Any]:
# I will never understand why this is a private
# method of named tuples.
return self._asdict()
def __repr__(self) -> str:
return (
f"({self.input_draw}, {self.random_seed}, "
f"{self.model_specification}, {self.branch_configuration})"
)
[docs]
def build_job_list(
model_specification_path: Path,
output_root: Path,
keyspace: branches.Keyspace,
finished_sim_metadata: pd.DataFrame,
backup_freq: int | None,
backup_dir: Path,
backup_metadata_path: Path,
worker_logging_root: Path,
extras: dict[str, Any],
) -> tuple[list[JobParameters], int]:
jobs: list[JobParameters] = []
number_already_completed = 0
for input_draw, random_seed, branch_config in keyspace:
parameters = JobParameters(
model_specification=str(model_specification_path),
branch_configuration=branch_config,
input_draw=int(input_draw),
random_seed=int(random_seed),
results_path=str(output_root),
worker_logging_root=str(worker_logging_root),
backup_configuration={
"backup_dir": backup_dir,
"backup_freq": backup_freq,
"backup_metadata_path": backup_metadata_path,
},
extras=extras,
)
if already_complete(parameters, finished_sim_metadata):
number_already_completed += 1
else:
jobs.append(parameters)
np.random.shuffle(jobs) # type: ignore [arg-type]
return jobs, number_already_completed
[docs]
def already_complete(
job_parameters: JobParameters, finished_sim_metadata: pd.DataFrame
) -> bool:
if finished_sim_metadata.empty:
return False
job_parameter_list = collapse_nested_dict(job_parameters.job_specific)
mask = pd.Series(True, index=finished_sim_metadata.index)
for k, v in job_parameter_list:
if isinstance(v, float):
mask &= np.isclose(finished_sim_metadata[k], v)
else:
mask &= finished_sim_metadata[k] == v
return bool(np.any(mask))