Source code for vivarium.cluster_tools.vipin.perf_report

"""
=====================
Performance Reporting
=====================

Tools for summarizing and reporting performance information.

"""

import json
import re
from pathlib import Path
from typing import Generator

import pandas as pd
from loguru import logger

BASE_PERF_INDEX_COLS = ["host", "job_number", "task_number", "draw", "seed"]

# The number of scenario columns beyond which we shorten the scenarios to a single string
COMPOUND_SCENARIO_COL_COUNT = 2


class PerformanceSummary:
    """Reader for the telemetry entries in a directory of worker performance logs.

    Given a Path, a PerformanceSummary provides a generator over each
    telemetry entry found in the workers' performance logs, a method to
    collect all entries in a single pd.DataFrame, and a method to delete the
    performance logs afterwards.

    This class is intended as a singleton to provide data about a single
    Vivarium simulation run.

    Attributes
    ----------
    log_dir
        Path of log_dir
    errors
        Number of errors encountered while parsing logs. The count is
        cumulative: it is only zeroed at construction, so parsing the logs
        more than once counts the same bad rows again.
    """

    # A telemetry message is a JSON object whose text opens with a "host" key
    # and later contains a "job_number" key; any other log message is ignored.
    TELEMETRY_PATTERN = re.compile(r"^{\"host\".+\"job_number\".+}$")
    # File names of the performance logs, e.g. ``perf.0123456789abcdef.log``.
    PERF_LOG_PATTERN = re.compile(r"^perf\.[0-9a-f]{16}\.log$")

    def __init__(self, log_dir: Path):
        self.log_dir: Path = log_dir
        self.errors: int = 0

    def _perf_logs(self) -> list[Path]:
        """Return the performance log files directly inside ``log_dir``, sorted by path."""
        # Materialize (and sort) the listing up front: callers may delete the
        # files while looping, and a stable order makes the output reproducible.
        return sorted(
            f for f in self.log_dir.iterdir() if self.PERF_LOG_PATTERN.fullmatch(f.name)
        )

    def get_summaries(self) -> Generator[pd.DataFrame, None, None]:
        """Generator to get all performance summary log messages in PerformanceSummary.

        Each line of each performance log is expected to be a JSON document
        holding the logged text under ``["record"]["message"]``. Messages
        matching ``TELEMETRY_PATTERN`` are parsed as JSON and flattened with
        ``pd.json_normalize`` (nested keys joined with ``"_"``).

        Lines that cannot be parsed are logged, counted in ``self.errors``
        and skipped; they never abort the iteration.

        Yields
        ------
        pd.DataFrame
            A one-row frame per telemetry message.
        """
        for log in self._perf_logs():
            with log.open("r") as f:
                # Stream the file instead of loading every line into memory.
                for count, line in enumerate(f, start=1):
                    try:
                        message = json.loads(line)["record"]["message"]
                    except Exception as e:
                        # Deliberately broad: any unreadable row is best-effort skipped.
                        logger.warning(
                            f"Exception: {e}. Malformed message in {log} line {count}, skipping..."
                        )
                        self.errors += 1
                        continue

                    if not self.TELEMETRY_PATTERN.fullmatch(str(message)):
                        continue

                    # The pattern only checks the overall shape of the text, so
                    # the message can still be invalid JSON; treat it like any
                    # other malformed row rather than crashing the generator.
                    try:
                        telemetry = json.loads(message)
                    except ValueError as e:
                        logger.warning(
                            f"Exception: {e}. Malformed message in {log} line {count}, skipping..."
                        )
                        self.errors += 1
                        continue

                    yield pd.json_normalize(telemetry, sep="_")

    def to_df(self) -> pd.DataFrame:
        """Collect every telemetry entry into a single DataFrame.

        Returns
        -------
        pd.DataFrame
            The concatenation of all frames from ``get_summaries``, or an
            empty DataFrame when there are none. Columns whose name starts
            with ``"event_"`` are converted from Unix seconds to datetimes.
        """
        perf_data: list[pd.DataFrame] = list(self.get_summaries())
        if not perf_data:
            return pd.DataFrame()
        perf_df = pd.concat(perf_data)

        # Convert the Unix timestamps to datetimes
        for col in [col for col in perf_df.columns if col.startswith("event_")]:
            perf_df[col] = pd.to_datetime(perf_df[col], unit="s")

        # Strip the literal "_scenario" from the normalized labels. Note that
        # this removes every occurrence in a label, not only a trailing one.
        perf_df.columns = perf_df.columns.str.replace("_scenario", "", regex=False)
        return perf_df

    def clean_perf_logs(self) -> None:
        """Remove all performance logs from the log_dir (after to_df has been called)"""
        for log in self._perf_logs():
            log.unlink()
def set_index_scenario_cols(perf_df: pd.DataFrame) -> tuple[pd.DataFrame, list[str]]:
    """Index performance data by the base columns plus any scenario columns.

    Parameters
    ----------
    perf_df
        Performance data containing every column in ``BASE_PERF_INDEX_COLS``.

    Returns
    -------
    tuple[pd.DataFrame, list[str]]
        The re-indexed frame, and the names of the columns that start with
        ``"scenario_"`` (in their original column order).
    """
    scenario_cols = []
    for name in perf_df.columns:
        if name.startswith("scenario_"):
            scenario_cols.append(name)

    # Base columns first, scenario columns after them.
    indexed_df = perf_df.set_index([*BASE_PERF_INDEX_COLS, *scenario_cols])
    return indexed_df, scenario_cols
def report_performance(
    input_directory: Path | str,
    output_directory: Path | str,
    output_hdf: bool,
    verbose: int,
) -> pd.DataFrame | None:
    """Main method for vipin reporting.

    Reads the performance logs, writes the combined data to a summary file,
    deletes the logs, and logs a short report.

    Parameters
    ----------
    input_directory
        Directory containing the performance logs.
    output_directory
        Directory in which ``log_summary.hdf`` or ``log_summary.csv`` is written.
    output_hdf
        Write an HDF file (key ``"worker_data"``) if true, otherwise a CSV.
    verbose
        If truthy, also print the statistics report.

    Returns
    -------
    pd.DataFrame | None
        The indexed performance data, or None when no performance data was
        found (in which case nothing is written and no logs are removed).
    """
    input_directory = Path(input_directory)
    output_directory = Path(output_directory)

    perf_summary = PerformanceSummary(input_directory)
    perf_df = perf_summary.to_df()

    # Nothing to report: bail out before touching the output directory.
    if perf_df.shape[0] == 0:
        logger.warning(f"No performance data found in {input_directory}.")
        return None

    # Index by the base columns plus the branch configuration/scenario columns.
    perf_df, scenario_cols = set_index_scenario_cols(perf_df)

    # Write the summary next to the requested output directory.
    suffix = ".hdf" if output_hdf else ".csv"
    out_file = (output_directory / "log_summary").with_suffix(suffix)
    if output_hdf:
        perf_df.to_hdf(out_file, key="worker_data")
    else:
        perf_df.to_csv(out_file)

    # The raw logs are only removed once the summary has been written.
    perf_summary.clean_perf_logs()

    if verbose:
        print_stat_report(perf_df, scenario_cols)

    if perf_summary.errors > 0:
        logger.warning(
            f'{perf_summary.errors} log row{"s were" if perf_summary.errors > 1 else " was"} unreadable.'
        )

    logger.info(
        f'Performance summary {"hdf" if output_hdf else "csv"} can be found at {out_file}, with '
        f'{perf_df.shape[0]} row{"s" if perf_df.shape[0] > 1 else ""}.'
    )
    return perf_df