Source code for vivarium_cluster_tools.vipin.perf_report
"""
=====================
Performance Reporting
=====================
Tools for summarizing and reporting performance information.
"""
import json
import re
from pathlib import Path
from typing import Generator
import pandas as pd
from loguru import logger
BASE_PERF_INDEX_COLS = ["host", "job_number", "task_number", "draw", "seed"]
# The number of scenario columns beyond which we shorten the scenarios to a single string
COMPOUND_SCENARIO_COL_COUNT = 2
[docs]
class PerformanceSummary:
"""A class to implement a getter for data in the workers' performance logs.
Given a Path, a PerformanceSummary class provides a generator to get at each
entry in the workers' performance logs. The class also provides a method
to get all entries in a pd.DataFrame. This class is intended as a singleton
to provide data about a single Vivarium simulation run.
Attributes
----------
log_dir
Path of log_dir
errors
Number of errors encountered while parsing logs
"""
def __init__(self, log_dir: Path):
self.log_dir: Path = log_dir
self.errors: int = 0
[docs]
def get_summaries(self) -> Generator[pd.DataFrame, None, None]:
"""Generator to get all performance summary log messages in PerformanceSummary"""
for log in [
f for f in self.log_dir.iterdir() if self.PERF_LOG_PATTERN.fullmatch(f.name)
]:
with log.open("r") as f:
count: int = 0
for line in f.readlines():
count += 1
try:
message = json.loads(line)["record"]["message"]
except Exception as e:
logger.warning(
f"Exception: {e}. Malformed message in {log} line {count}, skipping..."
)
self.errors += 1
continue
m = self.TELEMETRY_PATTERN.fullmatch(str(message))
if m:
yield pd.json_normalize(json.loads(message), sep="_")
[docs]
def to_df(self) -> pd.DataFrame:
perf_data: list[pd.DataFrame] = []
for item in self.get_summaries():
perf_data.append(item)
if len(perf_data) < 1:
return pd.DataFrame()
perf_df = pd.concat(perf_data)
# Convert the Unix timestamps to datetimes
for col in [col for col in perf_df.columns if col.startswith("event_")]:
perf_df[col] = pd.to_datetime(perf_df[col], unit="s")
# Remove trailing "_scenario" from normalized label
perf_df.columns = perf_df.columns.str.replace("_scenario", "", regex=False)
return perf_df
TELEMETRY_PATTERN = re.compile(r"^{\"host\".+\"job_number\".+}$")
PERF_LOG_PATTERN = re.compile(r"^perf\.[0-9a-f]{16}\.log$")
[docs]
def clean_perf_logs(self) -> None:
"""Remove all performance logs from the log_dir (after to_df has been called)"""
for log in [
f for f in self.log_dir.iterdir() if self.PERF_LOG_PATTERN.fullmatch(f.name)
]:
log.unlink()
[docs]
def set_index_scenario_cols(perf_df: pd.DataFrame) -> tuple[pd.DataFrame, list[str]]:
"""Get the columns useful to index performance data by."""
scenario_cols = [col for col in perf_df.columns if col.startswith("scenario_")]
index_cols = list(BASE_PERF_INDEX_COLS) + scenario_cols
perf_df = perf_df.set_index(index_cols)
return perf_df, scenario_cols
[docs]
def print_stat_report(perf_df: pd.DataFrame, scenario_cols: list[str]) -> None:
"""Print some helpful stats from the performance data.
The stats are grouped by scenario_cols.
"""
pd.set_option("display.max_rows", None)
pd.set_option("display.max_columns", None)
pd.options.display.float_format = "{:.2f}".format
do_compound = len(scenario_cols) > COMPOUND_SCENARIO_COL_COUNT
perf_df = perf_df.reset_index()
if do_compound:
logger.info(
f"compound scenario:\n({'/'.join([s.replace('scenario_', '') for s in scenario_cols])}):"
)
perf_df["compound_scenario"] = (
perf_df[scenario_cols]
.to_csv(header=False, index=False, sep="/")
.strip("\n")
.split("\n")
)
# Print execution times stats by scenario
idx = ["compound_scenario"] if do_compound else list(scenario_cols)
cols = idx + ["measure", "value"]
if idx:
temp = perf_df.set_index(idx).filter(like="exec_time_").stack().reset_index()
else:
temp = perf_df.filter(like="exec_time_").stack().reset_index()
temp = temp.drop(columns=["level_0"], errors="ignore")
temp.columns = cols
cols = [c for c in cols if c != "value"]
report_df = temp.groupby(cols).describe()
report_df.columns = report_df.columns.droplevel()
report_df = report_df.drop(["count", "25%", "50%", "75%"], axis=1)
report_df = report_df.reset_index()
# Abbreviate execution time measures for printing
report_df["measure"] = report_df["measure"].replace("^exec_time_", "", regex=True)
report_df["measure"] = report_df["measure"].replace(
"^simulant_initialization", "sim_init", regex=True
)
report_df["measure"] = report_df["measure"].replace("minutes$", "min", regex=True)
report_df["measure"] = report_df["measure"].replace("seconds", "s", regex=True)
report_df = report_df.set_index(cols).sort_index()
logger.info(f"\n{report_df}")
[docs]
def report_performance(
input_directory: Path | str,
output_directory: Path | str,
output_hdf: bool,
verbose: int,
) -> pd.DataFrame | None:
"""Main method for vipin reporting.
Gets job performance data, outputs to a file, and logs a report.
"""
input_directory, output_directory = Path(input_directory), Path(output_directory)
perf_summary = PerformanceSummary(input_directory)
perf_df = perf_summary.to_df()
if len(perf_df) < 1:
logger.warning(f"No performance data found in {input_directory}.")
return None # nothing left to do
# Set index to include branch configuration/scenario columns
perf_df, scenario_cols = set_index_scenario_cols(perf_df)
# Write to file
out_file = output_directory / "log_summary"
if output_hdf:
out_file = out_file.with_suffix(".hdf")
perf_df.to_hdf(out_file, key="worker_data")
else:
out_file = out_file.with_suffix(".csv")
perf_df.to_csv(out_file)
# Clean up performance logs
perf_summary.clean_perf_logs()
if verbose:
print_stat_report(perf_df, scenario_cols)
if perf_summary.errors > 0:
logger.warning(
f'{perf_summary.errors} log row{"s were" if perf_summary.errors > 1 else " was"} unreadable.'
)
logger.info(
f'Performance summary {"hdf" if output_hdf else "csv"} can be found at {out_file}, with '
f'{perf_df.shape[0]} row{"s" if perf_df.shape[0] > 1 else ""}.'
)
return perf_df