From 73c20f1a85dc4dbaeaf7f3ccb1a6e5f80b741814 Mon Sep 17 00:00:00 2001 From: Pablo Gonzalez Date: Fri, 23 Jan 2026 12:10:37 -0500 Subject: [PATCH 01/12] Add submission checker base code --- mlpstorage/submission_checker/__init__.py | 0 .../submission_checker/checks/__init__.py | 0 mlpstorage/submission_checker/checks/base.py | 51 +++++++++ .../configuration/__init__.py | 0 .../configuration/configuration.py | 4 + mlpstorage/submission_checker/constants.py | 9 ++ mlpstorage/submission_checker/loader.py | 86 ++++++++++++++ mlpstorage/submission_checker/main.py | 80 +++++++++++++ .../submission_checker/parsers/__init__.py | 0 mlpstorage/submission_checker/results.py | 65 +++++++++++ mlpstorage/submission_checker/utils.py | 108 ++++++++++++++++++ 11 files changed, 403 insertions(+) create mode 100644 mlpstorage/submission_checker/__init__.py create mode 100644 mlpstorage/submission_checker/checks/__init__.py create mode 100644 mlpstorage/submission_checker/checks/base.py create mode 100644 mlpstorage/submission_checker/configuration/__init__.py create mode 100644 mlpstorage/submission_checker/configuration/configuration.py create mode 100644 mlpstorage/submission_checker/constants.py create mode 100644 mlpstorage/submission_checker/loader.py create mode 100644 mlpstorage/submission_checker/main.py create mode 100644 mlpstorage/submission_checker/parsers/__init__.py create mode 100644 mlpstorage/submission_checker/results.py create mode 100644 mlpstorage/submission_checker/utils.py diff --git a/mlpstorage/submission_checker/__init__.py b/mlpstorage/submission_checker/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mlpstorage/submission_checker/checks/__init__.py b/mlpstorage/submission_checker/checks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mlpstorage/submission_checker/checks/base.py b/mlpstorage/submission_checker/checks/base.py new file mode 100644 index 00000000..00bebee0 --- /dev/null +++ b/mlpstorage/submission_checker/checks/base.py @@ -0,0 +1,51 @@ +from abc import ABC, abstractmethod + + +class BaseCheck(ABC): + """ + A generic check class meant to be inherited by concrete check implementations. + Subclasses must register their check methods into `self.checks`. + """ + + def __init__(self, log, path): + self.checks = [] + self.log = log + self.path = path + self.name = "base checks" + pass + + def run_checks(self): + """ + Execute all registered checks. Returns True if all checks pass, False otherwise. 
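+
+        Example (illustrative; `ExampleCheck` is a hypothetical subclass and `log` an existing logger):
+
+            class ExampleCheck(BaseCheck):
+                def __init__(self, log, path):
+                    super().__init__(log, path)
+                    # Register the check methods this subclass provides.
+                    self.checks = [self.always_pass]
+
+                def always_pass(self):
+                    return True
+
+            passed = ExampleCheck(log, "/some/path").run_checks()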
+ """ + valid = True + errors = [] + for check in self.checks: + try: + v = self.execute(check) + valid &= v + except BaseException: + valid &= False + self.log.error( + "Exception occurred in %s while running %s in %s", + self.path, + check.__name__, + self.__class__.__name__) + return valid + + def execute(self, check): + """Custom execution of a single check method.""" + return check() + + def __call__(self): + """Allows the check instance to be called like a function.""" + self.log.info("Starting %s for: %s", self.name, self.path) + valid = self.run_checks() + if valid: + self.log.info("All %s checks passed for: %s", self.name, self.path) + else: + self.log.error( + "Some %s Checks failed for: %s", + self.name, + self.path) + return valid diff --git a/mlpstorage/submission_checker/configuration/__init__.py b/mlpstorage/submission_checker/configuration/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mlpstorage/submission_checker/configuration/configuration.py b/mlpstorage/submission_checker/configuration/configuration.py new file mode 100644 index 00000000..b51a5051 --- /dev/null +++ b/mlpstorage/submission_checker/configuration/configuration.py @@ -0,0 +1,4 @@ + +class Config: + def __init__(self): + pass \ No newline at end of file diff --git a/mlpstorage/submission_checker/constants.py b/mlpstorage/submission_checker/constants.py new file mode 100644 index 00000000..028401e1 --- /dev/null +++ b/mlpstorage/submission_checker/constants.py @@ -0,0 +1,9 @@ + +VERSIONS = ["v2.0", "v3.0"] +VALID_DIVISIONS = ["open", "closed"] + +SYSTEM_PATH = { + "v2.0": "{division}/{submitter}/systems/{system}.json", + "v3.0": "{division}/{submitter}/systems/{system}.json", + "default": "{division}/{submitter}/systems/{system}.json", +} \ No newline at end of file diff --git a/mlpstorage/submission_checker/loader.py b/mlpstorage/submission_checker/loader.py new file mode 100644 index 00000000..e6687aeb --- /dev/null +++ b/mlpstorage/submission_checker/loader.py @@ -0,0 +1,86 @@ + +import os +from typing import Generator, Literal +from .utils import * +from .constants import * +import logging +from dataclasses import dataclass + +@dataclass +class SubmissionLogs: + """Container for parsed submission log artifacts and metadata. + + The `SubmissionLogs` class holds references to parsed log files and + associated metadata for a single submission. It serves as a data + transfer object passed between loading and validation phases. + """ + datagen_files: dict + run_files: dict + system_file: dict + +class Loader: + """Loads and parses submission artifacts from the filesystem. + + The `Loader` class traverses the submission directory structure, + identifies valid submissions, and parses their log files and metadata. + It yields `SubmissionLogs` objects for each valid submission found, + handling version-specific path formats and optional artifacts. + """ + def __init__(self, root, version) -> None: + """Initialize the submission loader. + + Sets up path templates based on the MLPerf version and root + directory. + + Args: + root (str): Root directory containing submissions. + version (str): MLPerf version for path resolution. 
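+
+        Example (illustrative usage; the submission path is hypothetical):
+            loader = Loader("/path/to/submission_root", "v2.0")
+            for submission_logs in loader.load():
+                ...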
+ """ + self.root = root + self.version = version + self.logger = logging.getLogger("Loader") + self.system_log_path = os.path.join( + self.root, SYSTEM_PATH.get( + version, SYSTEM_PATH["default"])) + + def load_single_log(self, path, log_type): + pass + + def load_datagen_files(self): + pass + + def load_run_files(self): + pass + + def load(self) -> Generator[SubmissionLogs, None, None]: + # Iterate over submission folder. + # Division -> submitter -> system -> benchmark -> runs + for division in list_dir(self.root): + if division not in VALID_DIVISIONS: + continue + division_path = os.path.join(self.root, division) + for submitter in list_dir(division_path): + results_path = os.path.join( + division_path, submitter, "results") + for system in list_dir(results_path): + system_path = os.path.join(results_path, system) + system_file_path = self.system_log_path.format(division = division, submitter = submitter, system = system) + system_file = self.load_single_log(system_file_path, "System") + for benchmark in list_dir(system_path): + datagen_path = os.path.join(system_path, "datagen") + run_path = os.path.join(system_path, "run") + datagen_files_agg = {} + run_files_agg = {} + for timestamp in datagen_path: + timestamp_path = os.path.join(datagen_path, timestamp) + datagen_files = self.load_datagen_files() + + for timestamp in run_path: + run_path = os.path.join(datagen_path, timestamp) + run_files = self.load_run_files() + + yield SubmissionLogs(datagen_files_agg, run_files_agg, system_file) + + + + diff --git a/mlpstorage/submission_checker/main.py b/mlpstorage/submission_checker/main.py new file mode 100644 index 00000000..62660243 --- /dev/null +++ b/mlpstorage/submission_checker/main.py @@ -0,0 +1,80 @@ +import argparse +import logging +import os +import sys + +# Constants +from .constants import * + +# Import config +from .configuration.configuration import Config + +# Import loader +from .loader import Loader + +# Import checkers +from checks.base import BaseCheck + +# Import result exporter +from .results import ResultExporter + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger("main") + +def get_args(): + """Parse command-line arguments for the submission checker. + + Sets up an ArgumentParser with options for input directory, version, + filtering, output files, and various skip flags for different checks. + + Returns: + argparse.Namespace: Parsed command-line arguments. + """ + parser = argparse.ArgumentParser() + parser.add_argument("--input", required=True, help="submission directory") + parser.add_argument( + "--version", + default="v5.1", + choices=list(VERSIONS), + help="mlperf version", + ) + args = parser.parse_args() + return args + +def main(): + """Run the MLPerf submission checker on the provided directory. + + Parses arguments, initializes configuration and loader, iterates + through all submissions, runs validation checks (performance, + accuracy, system, measurements, power), collects results, and + exports summaries. Logs pass/fail status and statistics. + + Returns: + int: 0 if all submissions pass checks, 1 if any errors found. 
+ """ + args = get_args() + + config = Config() + + loader = Loader(args.input, args.version) + exporter = ResultExporter(args.csv, config) + + results = {} + systems = {} + # Main loop over all the submissions + for logs in loader.load(): + # TODO: Initialize checkers + + # TODO: Run checks + valid = True + + # TODO: Add results to summary + if valid: + exporter.add_result(logs) + + # Export results + exporter.export() + + # TODO: Output result summary to console + + diff --git a/mlpstorage/submission_checker/parsers/__init__.py b/mlpstorage/submission_checker/parsers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mlpstorage/submission_checker/results.py b/mlpstorage/submission_checker/results.py new file mode 100644 index 00000000..ec670c44 --- /dev/null +++ b/mlpstorage/submission_checker/results.py @@ -0,0 +1,65 @@ +from .loader import SubmissionLogs + + +class ResultExporter: + """Exports submission validation results to CSV format. + + The `ResultExporter` class collects validated submission data and + exports it to a CSV file with standardized columns for MLPerf + submission summaries. It handles both performance and power results, + duplicating rows for power submissions with power-specific metrics. + """ + def __init__(self, csv_path, config) -> None: + """Initialize the result exporter. + + Sets up the CSV header columns and prepares for result collection. + + Args: + csv_path (str): Path to the output CSV file. + config (Config): Configuration helper for model mappings. + """ + self.head = [] + self.rows = [] + self.csv_path = csv_path + self.config = config + + def add_result(self, submission_logs: SubmissionLogs): + """Add a validated submission result to the export queue. + + Extracts relevant fields from submission logs and system JSON, + formats them into a CSV row, and appends to the rows list. For + power submissions, adds an additional row with power metrics. + + Args: + submission_logs (SubmissionLogs): Validated submission data + and metadata. + """ + row = {key: "" for key in self.head} + # TODO: extract values from submission logs + self.rows.append(row.copy()) + + def export_row(self, row: dict): + """Write a single result row to the CSV file. + + Formats the row dictionary into a quoted CSV line and appends it + to the output file. + + Args: + row (dict): Result row data keyed by column headers. + """ + values = [f'"{row.get(key, "")}"' for key in self.head] + csv_row = ",".join(values) + "\n" + with open(self.csv_path, "+a") as csv: + csv.write(csv_row) + + def export(self): + """Export all accumulated results to the CSV file. + + Writes the header row first, then iterates through all collected + rows, exporting each one. 
+ """ + csv_header = ",".join(self.head) + "\n" + with open(self.csv_path, "w") as csv: + csv.write(csv_header) + for row in self.rows: + self.export_row(row) \ No newline at end of file diff --git a/mlpstorage/submission_checker/utils.py b/mlpstorage/submission_checker/utils.py new file mode 100644 index 00000000..e881b279 --- /dev/null +++ b/mlpstorage/submission_checker/utils.py @@ -0,0 +1,108 @@ +import os + + +def list_dir(*path): + path = os.path.join(*path) + return sorted([f for f in os.listdir( + path) if os.path.isdir(os.path.join(path, f))]) + + +def list_files(*path): + path = os.path.join(*path) + return sorted([f for f in os.listdir( + path) if os.path.isfile(os.path.join(path, f))]) + + +def list_empty_dirs_recursively(*path): + path = os.path.join(*path) + return [dirpath for dirpath, dirs, files in os.walk( + path) if not dirs and not files] + + +def list_dirs_recursively(*path): + path = os.path.join(*path) + return [dirpath for dirpath, dirs, files in os.walk(path)] + + +def list_files_recursively(*path): + path = os.path.join(*path) + return [ + os.path.join(dirpath, file) + for dirpath, dirs, files in os.walk(path) + for file in files + ] + + +def files_diff(list1, list2, optional=None): + """returns a list of files that are missing or added.""" + if not optional: + optional = [] + optional = optional + ["mlperf_log_trace.json", "results.json", ".gitkeep"] + return set(list1).symmetric_difference(set(list2)) - set(optional) + + +def check_extra_files(path, target_files): + missing_files = [] + check_pass = True + folders = list_dir(path) + for dir in target_files.keys(): + if dir not in folders: + check_pass = False + missing_files.append(os.path.join(path, dir)) + else: + files = [f.split(".")[0] + for f in list_files(os.path.join(path, dir))] + for target_file in target_files[dir]: + if target_file not in files: + check_pass = False + missing_files.append( + f"{os.path.join(path, dir, target_file)}.png") + if "captions" not in files: + missing_files.append( + f"{os.path.join(path, dir, 'captions.txt')}") + return check_pass, missing_files + + +def split_path(m): + return m.replace("\\", "/").split("/") + + +def get_boolean(s): + if s is None: + return False + elif isinstance(s, bool): + return s + elif isinstance(s, str): + return s.lower() == "true" + elif isinstance(s, int): + return bool(s) + else: + raise TypeError( + f"Variable should be bool, string or int, got {type(s)} instead" + ) + + +def merge_two_dict(x, y): + z = x.copy() + for key in y: + if key not in z: + z[key] = y[key] + else: + z[key] += y[key] + return z + + +def sum_dict_values(x): + count = 0 + for key in x: + count += x[key] + return count + + +def is_number(s): + try: + float(s) + return True + except ValueError: + return False + From b9ee299da64f5c0f827536ecbb497d488b1254ac Mon Sep 17 00:00:00 2001 From: Pablo Gonzalez Date: Fri, 6 Feb 2026 13:54:04 -0500 Subject: [PATCH 02/12] Add parsers --- .../submission_checker/parsers/json_parser.py | 55 ++++++++++++++++ .../submission_checker/parsers/yaml_parser.py | 62 +++++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 mlpstorage/submission_checker/parsers/json_parser.py create mode 100644 mlpstorage/submission_checker/parsers/yaml_parser.py diff --git a/mlpstorage/submission_checker/parsers/json_parser.py b/mlpstorage/submission_checker/parsers/json_parser.py new file mode 100644 index 00000000..466c208c --- /dev/null +++ b/mlpstorage/submission_checker/parsers/json_parser.py @@ -0,0 +1,55 @@ +import json +import logging +from 
typing import Literal + + +class JSONParser: + """Parse a JSON summary file and provide dict-like access. + + Example: + p = SummaryParser("summary.json") + value = p["some_key"] + """ + + def __init__( + self, + path, + name: Literal["Summary", "Metadata", "System"] = "Summary", + ): + self.path = path + self.name = name + self.logger = logging.getLogger(self.name) + self.d = {} + try: + with open(path, "r", encoding="utf-8") as f: + self.d = json.load(f) + except FileNotFoundError: + self.logger.error("Summary file not found: %s", path) + except json.JSONDecodeError as exc: + self.logger.error("Invalid JSON in summary file %s: %s", path, exc) + if not isinstance(self.d, dict): + # normalize to dict for consistent API + self.d = {"summary": self.d} + self.keys = set(self.d.keys()) + + def __getitem__(self, key): + """Return the value for `key` or None if missing.""" + return self.d.get(key) + + def get(self, key, default=None): + """Return the value for `key`, or `default` if not present.""" + return self.d.get(key, default) + + def get_dict(self): + """Return the full parsed JSON as a dict.""" + return self.d + + def get_keys(self): + """Return a set of top-level keys in the summary.""" + return self.keys + + def __contains__(self, key): + return key in self.messages + + def __repr__(self): + return f"" diff --git a/mlpstorage/submission_checker/parsers/yaml_parser.py b/mlpstorage/submission_checker/parsers/yaml_parser.py new file mode 100644 index 00000000..8bba55b9 --- /dev/null +++ b/mlpstorage/submission_checker/parsers/yaml_parser.py @@ -0,0 +1,62 @@ +import logging +from typing import Literal +import yaml + + + +class YamlParser: + """Parse a YAML summary file and provide dict-like access. + + Example: + p = YamlParser("summary.yaml") + value = p["some_key"] + """ + + def __init__( + self, + path, + name: Literal["Summary", "Metadata", "System"] = "Summary", + ): + self.path = path + self.name = name + self.logger = logging.getLogger(self.name) + self.d = {} + + if yaml is None: + self.logger.error("PyYAML is not installed; cannot parse YAML: %s", path) + else: + try: + with open(path, "r", encoding="utf-8") as f: + self.d = yaml.safe_load(f) + except FileNotFoundError: + self.logger.error("YAML file not found: %s", path) + except Exception as exc: + # yaml.YAMLError and others + self.logger.error("Invalid YAML in file %s: %s", path, exc) + + if not isinstance(self.d, dict): + # normalize to dict for consistent API + self.d = {"summary": self.d} + self.keys = set(self.d.keys()) + + def __getitem__(self, key): + """Return the value for `key` or None if missing.""" + return self.d.get(key) + + def get(self, key, default=None): + """Return the value for `key`, or `default` if not present.""" + return self.d.get(key, default) + + def get_dict(self): + """Return the full parsed YAML as a dict.""" + return self.d + + def get_keys(self): + """Return a set of top-level keys in the summary.""" + return self.keys + + def __contains__(self, key): + return key in self.d + + def __repr__(self): + return f"" From b0bd2fdb9e4bdff8e8709c6d63da6c975299eb72 Mon Sep 17 00:00:00 2001 From: Pablo Gonzalez Date: Fri, 6 Feb 2026 13:54:39 -0500 Subject: [PATCH 03/12] Add directory checks --- .../checks/directory_checks.py | 473 ++++++++++++++++++ 1 file changed, 473 insertions(+) create mode 100644 mlpstorage/submission_checker/checks/directory_checks.py diff --git a/mlpstorage/submission_checker/checks/directory_checks.py b/mlpstorage/submission_checker/checks/directory_checks.py new file mode 100644 
index 00000000..1afd0057 --- /dev/null +++ b/mlpstorage/submission_checker/checks/directory_checks.py @@ -0,0 +1,473 @@ + +from .base import BaseCheck +from ..constants import * +from ..configuration.configuration import Config +from ..loader import SubmissionLogs +from ..utils import * + +import os +import re +from datetime import datetime + + +class DirectoryCheck(BaseCheck): + """ + A check class for validating directory structure and related properties. + Inherits from BaseCheck and receives a config and loader instance. + """ + + def __init__(self, log, config: Config, submissions_logs: SubmissionLogs): + """ + Initialize DirectoryChecks with configuration and loader. + + Args: + config: A Config instance containing submission configuration. + loader: A SubmissionLogs instance for accessing submission logs. + """ + # Call parent constructor with the loader's log and submission path + super().__init__(log=log, path=submissions_logs.loader_metadata.folder) + self.config = config + self.submissions_logs = submissions_logs + self.name = "directory checks" + self.datagen_path = os.path.join(self.path, "datagen") + self.run_path = os.path.join(self.path, "run") + self.checkpointing_path = os.path.join(self.path, "checkpointing") + self.init_checks() + + def init_checks(self): + self.checks = [] + mode = getattr(self.submissions_logs.loader_metadata, 'mode', 'training') + if mode == "training": + # Training mode checks + self.checks.extend([ + self.datagen_files_check, + self.datagen_dlio_config_check, + self.run_results_json_check, + self.run_files_check, + self.run_files_timestamp_check, + self.run_dlio_config_check, + self.run_duration_valid_check, + ]) + else: + # Checkpointing mode checks + self.checks.extend([ + self.checkpointing_results_json_check, + self.checkpointing_timestamps_check, + self.checkpointing_timestamp_gap_check, + self.checkpointing_files_check, + self.checkpointing_dlio_config_check, + ]) + + + def datagen_files_check(self): + """ + Check that each datagen timestamp directory contains: + - training_datagen.stdout.log + - training_datagen.stderr.log + - *output.json + - *per_epoch_stats.json + - *summary.json + - dlio.log + - dlio_config/ (subdirectory) + """ + valid = True + for _, _, timestamp in self.submissions_logs.datagen_files: + timestamp_path = os.path.join(self.datagen_path, timestamp) + files = list_files(timestamp_path) + for required_file in self.config.get_datagen_required_files(): + if not regex_matches_any(required_file, files): + self.log.error("%s not found in %s", required_file, timestamp_path) + valid = False + + # Check for dlio_config directory + for required_folder in self.config.get_datagen_required_folders(): + if required_folder not in list_dir(timestamp_path): + self.log.error("%s directory not found in %s", required_folder, timestamp_path) + valid = False + + return valid + + def datagen_dlio_config_check(self): + """ + Check that the dlio_config subdirectory in each datagen timestamp directory + contains exactly: config.yaml, hydra.yaml, and overrides.yaml (case-sensitive). 
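+
+        Expected layout (timestamp name is illustrative):
+            datagen/20260206_135500/dlio_config/config.yaml
+            datagen/20260206_135500/dlio_config/hydra.yaml
+            datagen/20260206_135500/dlio_config/overrides.yaml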
+ """ + valid = True + required_files = {"config.yaml", "hydra.yaml", "overrides.yaml"} + + for _, _, timestamp in self.submissions_logs.datagen_files: + dlio_config_path = os.path.join(self.datagen_path, timestamp, "dlio_config") + + if not os.path.exists(dlio_config_path): + self.log.error("dlio_config directory not found in %s", dlio_config_path) + valid = False + continue + + files = set(list_files(dlio_config_path)) + + # Check for exact match + if files != required_files: + self.log.error( + "dlio_config in %s has incorrect files. Expected %s, got %s", + dlio_config_path, + required_files, + files + ) + valid = False + + return valid + + def run_results_json_check(self): + """ + Check that there is exactly one results.json file in the run phase directory. + """ + valid = True + results_files = list_files(self.run_path) + results_json_count = sum(1 for f in results_files if f == "results.json") + + if results_json_count != 1: + self.log.error( + "Expected exactly 1 results.json file in %s, found %d", + self.run_path, + results_json_count + ) + valid = False + + return valid + + def run_files_check(self): + """ + Check that each run timestamp directory contains: + - training_run.stdout.log + - training_run.stderr.log + - *output.json + - *per_epoch_stats.json + - *summary.json + - dlio.log + - dlio_config/ (subdirectory) + """ + valid = True + for _, _, timestamp in self.submissions_logs.run_files: + timestamp_path = os.path.join(self.run_path, timestamp) + files = list_files(timestamp_path) + for required_file in self.config.get_run_required_files(): + if not regex_matches_any(required_file, files): + self.log.error("%s not found in %s", required_file, timestamp_path) + valid = False + + # Check for dlio_config directory + for required_folder in self.config.get_run_required_folders(): + if required_folder not in list_dir(timestamp_path): + self.log.error("%s directory not found in %s", required_folder, timestamp_path) + valid = False + + return valid + + def run_files_timestamp_check(self): + """ + Check that all run_files have timestamps matching format "YYYYMMDD_HHmmss" + and that there are exactly 6 of them. + """ + valid = True + timestamp_pattern = r"^\d{8}_\d{6}$" + timestamps = [] + + for _, _, timestamp in self.submissions_logs.run_files: + timestamps.append(timestamp) + if not re.match(timestamp_pattern, timestamp): + self.log.error( + "Invalid timestamp format '%s'. Expected format: YYYYMMDD_HHmmss", + timestamp + ) + valid = False + + if len(timestamps) != 6: + self.log.error( + "Expected 6 run files, but found %d. Timestamps: %s", + len(timestamps), + timestamps + ) + valid = False + + return valid + + def run_dlio_config_check(self): + """ + Check that the dlio_config subdirectory in each run timestamp directory + contains exactly: config.yaml, hydra.yaml, and overrides.yaml (case-sensitive). + """ + valid = True + required_files = {"config.yaml", "hydra.yaml", "overrides.yaml"} + + for _, _, timestamp in self.submissions_logs.run_files: + dlio_config_path = os.path.join(self.run_path, timestamp, "dlio_config") + + if not os.path.exists(dlio_config_path): + self.log.error("dlio_config directory not found in %s", dlio_config_path) + valid = False + continue + + files = set(list_files(dlio_config_path)) + + # Check for exact match + if files != required_files: + self.log.error( + "dlio_config in %s has incorrect files. 
Expected %s, got %s", + dlio_config_path, + required_files, + files + ) + valid = False + + return valid + + def run_duration_valid_check(self): + """ + Check that the gap between consecutive timestamp directories is less than + the duration of a single run. The gap must be short enough to ensure there + was no benchmark activity between consecutive runs. + + Compares the time delta between consecutive run directory names with the + duration of each individual run (from start to end time). + """ + valid = True + + # Parse all run data: (run_dict, _, timestamp_dir_name) + run_dir_time = [] + max_gap = float("inf") + for run_dict, _, timestamp_dir in self.submissions_logs.run_files: + try: + # Parse timestamps from run_dict + start_time = datetime.fromisoformat(run_dict["start"]) + end_time = datetime.fromisoformat(run_dict["end"]) + + # Parse the directory timestamp (YYYYMMDD_HHmmss format) + dir_time = datetime.strptime(timestamp_dir, "%Y%m%d_%H%M%S") + + run_duration = end_time - start_time + if run_duration < max_gap: + max_gap = run_duration + + run_dir_time.append(dir_time) + except (ValueError, KeyError, TypeError) as e: + self.log.error( + "Failed to parse timestamp data for %s: %s", + timestamp_dir, + str(e) + ) + valid = False + continue + + # Check gaps between consecutive runs + for i in range(len(run_dir_time) - 1): + current_run = run_dir_time[i] + next_run = run_dir_time[i + 1] + + # Calculate gap between end of current run and start of next run + gap = next_run - current_run + + # Gap should be less than the max gap + if gap >= max_gap: + self.log.error( + "Gap between runs is %s, which is >= the run duration %s. " + "Benchmark activity between runs can't be discarted.", + gap, + max_gap + ) + valid = False + + return valid + + def checkpointing_results_json_check(self): + """ + Check that there is exactly one results.json file in each workload directory + within the checkpointing directory hierarchy. + """ + valid = True + + if not hasattr(self.submissions_logs, 'checkpointing_files') or not self.submissions_logs.checkpointing_files: + self.log.warning("No checkpointing files found in submission logs") + return valid + + # Get workload directories + workload_dirs = list_dir(self.checkpointing_path) + + for workload_dir in workload_dirs: + workload_path = os.path.join(self.checkpointing_path, workload_dir) + results_files = list_files(workload_path) + results_json_count = sum(1 for f in results_files if f == "results.json") + + if results_json_count != 1: + self.log.error( + "Expected exactly 1 results.json in %s, found %d", + workload_path, + results_json_count + ) + valid = False + + return valid + + def checkpointing_timestamps_check(self): + """ + Check that there are exactly 10 timestamp directories in YYYYMMDD_HHmmss format + within the workload directories in the checkpointing hierarchy. + """ + valid = True + timestamp_pattern = r"^\d{8}_\d{6}$" + + if not hasattr(self.submissions_logs, 'checkpointing_files') or not self.submissions_logs.checkpointing_files: + self.log.warning("No checkpointing files found in submission logs") + return valid + + # Get workload directories + workload_dirs = list_dir(self.checkpointing_path) + + for workload_dir in workload_dirs: + workload_path = os.path.join(self.checkpointing_path, workload_dir) + timestamp_dirs = list_dir(workload_path) + + # Validate format of each timestamp directory + for timestamp_dir in timestamp_dirs: + if not re.match(timestamp_pattern, timestamp_dir): + self.log.error( + "Invalid timestamp format '%s' in %s. 
Expected format: YYYYMMDD_HHmmss", + timestamp_dir, + workload_path + ) + valid = False + + # Check count + if len(timestamp_dirs) != 10: + self.log.error( + "Expected 10 timestamp directories in %s, found %d", + workload_path, + len(timestamp_dirs) + ) + valid = False + + return valid + + def checkpointing_timestamp_gap_check(self): + """ + Check that the gap between consecutive timestamp directories is less than + the duration of a single checkpoint run. + """ + valid = True + + if not hasattr(self.submissions_logs, 'checkpointing_files') or not self.submissions_logs.checkpointing_files: + self.log.warning("No checkpointing files found in submission logs") + return valid + + # Parse all checkpoint run data + checkpoint_run_data = [] + max_gap = float("inf") + + for checkpoint_dict, _, timestamp_dir in self.submissions_logs.checkpointing_files: + try: + # Parse timestamps from checkpoint_dict + start_time = datetime.fromisoformat(checkpoint_dict["start"]) + end_time = datetime.fromisoformat(checkpoint_dict["end"]) + + # Parse the directory timestamp (YYYYMMDD_HHmmss format) + dir_time = datetime.strptime(timestamp_dir, "%Y%m%d_%H%M%S") + + run_duration = end_time - start_time + if run_duration < max_gap: + max_gap = run_duration + + checkpoint_run_data.append(dir_time) + except (ValueError, KeyError, TypeError) as e: + self.log.error( + "Failed to parse timestamp data for checkpointing %s: %s", + timestamp_dir, + str(e) + ) + valid = False + continue + + # Sort timestamps to check gaps + checkpoint_run_data.sort() + + # Check gaps between consecutive checkpoints + for i in range(len(checkpoint_run_data) - 1): + gap = checkpoint_run_data[i + 1] - checkpoint_run_data[i] + + if gap >= max_gap: + self.log.error( + "Gap between checkpoints is %s, which is >= the checkpoint duration %s. " + "Benchmark activity between checkpoints can't be discarded.", + gap, + max_gap + ) + valid = False + + return valid + + def checkpointing_files_check(self): + """ + Check that each checkpointing timestamp directory contains: + - checkpointing_run.stdout.log + - checkpointing_run.stderr.log + - *output.json + - *per_epoch_stats.json + - *summary.json + - dlio.log + - dlio_config/ (subdirectory) + """ + valid = True + + if not hasattr(self.submissions_logs, 'checkpointing_files') or not self.submissions_logs.checkpointing_files: + self.log.warning("No checkpointing files found in submission logs") + return valid + + for _, _, timestamp in self.submissions_logs.checkpointing_files: + timestamp_path = os.path.join(self.checkpointing_path, timestamp) + files = list_files(timestamp_path) + dirs = list_dir(timestamp_path) + + for required_file in self.config.get_checkpoint_required_files(): + if not regex_matches_any(required_file, files): + self.log.error("%s not found in %s", required_file, timestamp_path) + valid = False + + # Check for dlio_config directory + for required_folder in self.config.get_checkpoint_required_folders(): + if required_folder not in dirs: + self.log.error("%s directory not found in %s", required_folder, timestamp_path) + valid = False + + return valid + + def checkpointing_dlio_config_check(self): + """ + Check that the dlio_config subdirectory in each checkpointing timestamp directory + contains exactly: config.yaml, hydra.yaml, and overrides.yaml (case-sensitive). 
+ """ + valid = True + required_files = {"config.yaml", "hydra.yaml", "overrides.yaml"} + + if not hasattr(self.submissions_logs, 'checkpointing_files') or not self.submissions_logs.checkpointing_files: + self.log.warning("No checkpointing files found in submission logs") + return valid + + for _, _, timestamp in self.submissions_logs.checkpointing_files: + dlio_config_path = os.path.join(self.checkpointing_path, timestamp, "dlio_config") + + if not os.path.exists(dlio_config_path): + self.log.error("dlio_config directory not found in %s", dlio_config_path) + valid = False + continue + + files = set(list_files(dlio_config_path)) + + # Check for exact match + if files != required_files: + self.log.error( + "dlio_config in %s has incorrect files. Expected %s, got %s", + dlio_config_path, + required_files, + files + ) + valid = False + + return valid From d37b35fa2f1a5efd04e432339449f498c6c041ee Mon Sep 17 00:00:00 2001 From: Pablo Gonzalez Date: Fri, 6 Feb 2026 13:55:01 -0500 Subject: [PATCH 04/12] Add training checks --- .../checks/training_checks.py | 410 ++++++++++++++++++ 1 file changed, 410 insertions(+) create mode 100644 mlpstorage/submission_checker/checks/training_checks.py diff --git a/mlpstorage/submission_checker/checks/training_checks.py b/mlpstorage/submission_checker/checks/training_checks.py new file mode 100644 index 00000000..09090283 --- /dev/null +++ b/mlpstorage/submission_checker/checks/training_checks.py @@ -0,0 +1,410 @@ + +from .base import BaseCheck +from ..constants import * +from ..configuration.configuration import Config +from ..loader import SubmissionLogs + +import os +import hashlib +import re + + +class TrainingCheck(BaseCheck): + """ + A check class for validating training parameters and related properties. + Inherits from BaseCheck and receives a config and loader instance. + """ + + def __init__(self, log, config: Config, submissions_logs: SubmissionLogs): + """ + Initialize TrainingChecks with configuration and loader. + + Args: + config: A Config instance containing submission configuration. + loader: A SubmissionLogs instance for accessing submission logs. + """ + # Call parent constructor with the loader's log and submission path + super().__init__(log=log, path=submissions_logs.loader_metadata.folder) + self.config = config + self.submissions_logs = submissions_logs + self.name = "training checks" + self.datagen_path = os.path.join(self.path, "datagen") + self.run_path = os.path.join(self.path, "run") + self.checks = self.init_checks() + + def init_checks(self): + self.checks = [] + self.checks.extend([ + self.verify_datasize_usage, + self.recalculate_dataset_size, + self.datagen_minimum_size, + self.run_data_matches_datasize, + self.accelerator_utilization_check, + self.single_host_simulated_accelerators, + self.identical_accelerators_per_node, + self.closed_submission_checksum, + self.closed_submission_parameters, + self.open_submission_parameters, + self.mlpstorage_path_args, + self.mlpstorage_filesystem_check, + ]) + + def verify_datasize_usage(self): + """ + Verify that the datasize option was used by finding it in the run metadata. 
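+
+        Expected metadata shape (illustrative; keys mirror the lookups below):
+            {"args": {...}, "combined_params": {"dataset": {...}}}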
+ """ + valid = True + + for summary, metadata, _ in self.submissions_logs.run_files: + # Check if datasize-related parameters are in the metadata + params = metadata.get("args", {}) + combined_params = metadata.get("combined_params", {}) + + if not params and not combined_params: + self.log.error("No parameters found in metadata to verify datasize usage") + valid = False + continue + + # Check if dataset-related params are present + dataset_params = combined_params.get("dataset", {}) + if not dataset_params: + self.log.error("Dataset parameters not found in metadata") + valid = False + + return valid + + def recalculate_dataset_size(self): + """ + Recalculate minimum dataset size and verify it matches the run's logfile. + """ + valid = True + HOST_MEMORY_MULTIPLIER = 5 + MIN_STEPS_PER_EPOCH = 500 + + for summary, metadata, _ in self.submissions_logs.run_files: + try: + # Get parameters + combined_params = metadata.get("combined_params", {}) + dataset_params = combined_params.get("dataset", {}) + reader_params = combined_params.get("reader", {}) + + num_files_train = int(dataset_params.get("num_files_train", 0)) + num_samples_per_file = int(dataset_params.get("num_samples_per_file", 1)) + record_length = float(dataset_params.get("record_length_bytes", 0)) + batch_size = int(reader_params.get("batch_size", 1)) + + # From summary + num_accelerators = summary.get("num_accelerators", 1) + num_hosts = summary.get("num_hosts", 1) + host_memory_gb = summary.get("host_memory_GB", [0])[0] + + if record_length == 0: + self.log.error("Record length is 0, cannot calculate dataset size") + valid = False + continue + + # Calculate min samples from steps per epoch + num_steps_per_epoch = max(MIN_STEPS_PER_EPOCH, + num_files_train * num_samples_per_file // (batch_size * num_accelerators)) + min_samples_steps = num_steps_per_epoch * batch_size * num_accelerators + + # Calculate min samples from host memory + total_host_memory = num_hosts * host_memory_gb + min_samples_memory = (total_host_memory * HOST_MEMORY_MULTIPLIER * + 1024 * 1024 * 1024 / record_length) + + # Take max of both constraints + min_samples = max(min_samples_steps, min_samples_memory) + min_total_files = min_samples / num_samples_per_file + min_files_size_gb = min_samples * record_length / 1024 / 1024 / 1024 + + # Verify actual matches expected + actual_num_files = num_files_train + if actual_num_files < min_total_files: + self.log.error( + "Dataset size mismatch: actual files %d < minimum required %d", + actual_num_files, + int(min_total_files) + ) + valid = False + + except (KeyError, ValueError, TypeError) as e: + self.log.error("Failed to calculate dataset size: %s", str(e)) + valid = False + + return valid + + def datagen_minimum_size(self): + """ + Verify that datagen data generated >= datasize calculated. 
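+
+        The compared size is derived as (mirrors the calculation below):
+            size_GB = num_files_train * num_samples_per_file * record_length_bytes / 1024**3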
+ """ + valid = True + + if not self.submissions_logs.datagen_files: + self.log.warning("No datagen files found") + return valid + + # Get expected size from run + expected_size = None + for summary, metadata, _ in self.submissions_logs.run_files: + dataset_params = metadata.get("combined_params", {}).get("dataset", {}) + num_files = int(dataset_params.get("num_files_train", 0)) + record_length = float(dataset_params.get("record_length_bytes", 0)) + num_samples_per_file = int(dataset_params.get("num_samples_per_file", 1)) + expected_size = num_files * num_samples_per_file * record_length / 1024 / 1024 / 1024 + break + + # Check datagen produced at least that much + for summary, metadata, _ in self.submissions_logs.datagen_files: + dataset_params = metadata.get("combined_params", {}).get("dataset", {}) + num_files = int(dataset_params.get("num_files_train", 0)) + record_length = float(dataset_params.get("record_length_bytes", 0)) + num_samples_per_file = int(dataset_params.get("num_samples_per_file", 1)) + datagen_size = num_files * num_samples_per_file * record_length / 1024 / 1024 / 1024 + + if expected_size and datagen_size < expected_size: + self.log.error( + "Datagen size %.2f GB is less than required %.2f GB", + datagen_size, + expected_size + ) + valid = False + + return valid + + def run_data_matches_datasize(self): + """ + Verify that run data matches the calculated datasize exactly. + """ + valid = True + + for summary, metadata, _ in self.submissions_logs.run_files: + dataset_params = metadata.get("combined_params", {}).get("dataset", {}) + + # Verify dataset parameters are set correctly + num_files_train = dataset_params.get("num_files_train") + num_subfolders_train = dataset_params.get("num_subfolders_train", 0) + + if not num_files_train: + self.log.error("num_files_train not set in run parameters") + valid = False + + if num_subfolders_train == 0: + self.log.error("num_subfolders_train should match actual subfolders in dataset") + valid = False + + return valid + + def accelerator_utilization_check(self): + """ + Check that AU (Accelerator Utilization) meets minimum requirements. + """ + valid = True + + for summary, metadata, _ in self.submissions_logs.run_files: + metrics = summary.get("metric", {}) + au_mean = metrics.get("train_au_mean_percentage", 0) + au_expectation = metrics.get("train_au_meet_expectation", "") + + if au_expectation != "success": + self.log.error( + "AU check failed: expected 'success', got '%s' (AU: %.2f%%)", + au_expectation, + au_mean + ) + valid = False + + return valid + + def single_host_simulated_accelerators(self): + """ + For single-host submissions, verify sufficient simulated accelerators. + """ + valid = True + + for summary, metadata, _ in self.submissions_logs.run_files: + num_hosts = summary.get("num_hosts", 1) + num_accelerators = summary.get("num_accelerators", 1) + + if num_hosts == 1 and num_accelerators < 4: + self.log.warning( + "Single-host submission has only %d accelerators. Consider increasing via --num-accelerators", + num_accelerators + ) + + return valid + + def single_host_client_limit(self): + """ + For single-host submissions, fail if more than one client node used. 
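+
+        Example of a rejected configuration (host names are illustrative):
+            num_hosts == 1 while args["hosts"] == ["node1", "node2"]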
+ """ + valid = True + + for summary, metadata, _ in self.submissions_logs.run_files: + num_hosts = summary.get("num_hosts", 1) + + if num_hosts == 1: + args = metadata.get("args", {}) + hosts = args.get("hosts", []) + + if len(hosts) > 1: + self.log.error( + "Single-host submission but %d client nodes specified: %s", + len(hosts), + hosts + ) + valid = False + + return valid + + def identical_accelerators_per_node(self): + """ + For distributed submissions, verify all nodes have identical accelerator count. + """ + valid = True + + for summary, metadata, _ in self.submissions_logs.run_files: + num_hosts = summary.get("num_hosts", 1) + num_accelerators = summary.get("num_accelerators", 1) + + if num_hosts > 1: + # For distributed runs, accelerators should be divisible by hosts + if num_accelerators % num_hosts != 0: + self.log.error( + "Distributed submission: %d accelerators not evenly divisible by %d hosts", + num_accelerators, + num_hosts + ) + valid = False + + return valid + + def closed_submission_checksum(self): + """ + For CLOSED submissions, verify code directory MD5 checksum. + """ + # TODO + return True + + def closed_submission_parameters(self): + """ + For CLOSED submissions, verify only allowed parameters are modified. + """ + valid = True + + # Allowed parameters for CLOSED + allowed_params = { + "dataset.num_files_train", + "dataset.num_subfolders_train", + "dataset.data_folder", + "reader.read_threads", + "reader.computation_threads", + "reader.transfer_size", + "reader.prefetch_size", + "reader.odirect", + "storage.storage_root", + "storage.storage_type" + } + + for summary, metadata, _ in self.submissions_logs.run_files: + verification = metadata.get("verification", "open") + + if verification == "closed": + params_dict = metadata.get("params_dict", {}) + + for param_key in params_dict.keys(): + if param_key not in allowed_params: + self.log.error( + "CLOSED submission modifies disallowed parameter: %s", + param_key + ) + valid = False + + return valid + + def open_submission_parameters(self): + """ + For OPEN submissions, verify only allowed parameters are modified. + """ + valid = True + + # Additional allowed parameters for OPEN (beyond CLOSED) + open_allowed_params = { + "framework", + "dataset.format", + "dataset.num_samples_per_file", + "reader.data_loader" + } + + # All CLOSED params are also allowed in OPEN + closed_params = { + "dataset.num_files_train", + "dataset.num_subfolders_train", + "dataset.data_folder", + "reader.read_threads", + "reader.computation_threads", + "reader.transfer_size", + "reader.prefetch_size", + "reader.odirect", + "storage.storage_root", + "storage.storage_type" + } + + allowed_params = closed_params | open_allowed_params + + for summary, metadata, _ in self.submissions_logs.run_files: + verification = metadata.get("verification", "open") + + if verification == "open": + params_dict = metadata.get("params_dict", {}) + + for param_key in params_dict.keys(): + if param_key not in allowed_params: + self.log.error( + "OPEN submission modifies disallowed parameter: %s", + param_key + ) + valid = False + + return valid + + def mlpstorage_path_args(self): + """ + Verify dataset and output paths are set and different. 
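+
+        Example (paths are illustrative):
+            {"data_dir": "/mnt/data", "results_dir": "/mnt/results"}   # accepted
+            {"data_dir": "/mnt/data", "results_dir": "/mnt/data"}      # rejected, paths identical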
+ """ + valid = True + + for summary, metadata, _ in self.submissions_logs.run_files: + args = metadata.get("args", {}) + data_dir = args.get("data_dir") + results_dir = args.get("results_dir") + + if not data_dir: + self.log.error("data_dir not set in arguments") + valid = False + + if not results_dir: + self.log.error("results_dir not set in arguments") + valid = False + + if data_dir and results_dir and data_dir == results_dir: + self.log.error( + "data_dir and results_dir must be different: both are %s", + data_dir + ) + valid = False + + return valid + + def mlpstorage_filesystem_check(self): + """ + Verify dataset and output are on different filesystems. + This would require checking 'df' output in the logfiles. + """ + valid = True + + # In a real implementation, would parse logfiles for 'df' output + # and verify the filesystems are different + self.log.info("Filesystem check (implementation pending - requires log parsing)") + + return valid From 3c30e5e594d8b22a0775e3325f7fc0358ad03e17 Mon Sep 17 00:00:00 2001 From: Pablo Gonzalez Date: Fri, 6 Feb 2026 13:55:20 -0500 Subject: [PATCH 05/12] Add checkpointing checks --- .../checks/checkpointing_checks.py | 282 ++++++++++++++++++ 1 file changed, 282 insertions(+) create mode 100644 mlpstorage/submission_checker/checks/checkpointing_checks.py diff --git a/mlpstorage/submission_checker/checks/checkpointing_checks.py b/mlpstorage/submission_checker/checks/checkpointing_checks.py new file mode 100644 index 00000000..f3678ac2 --- /dev/null +++ b/mlpstorage/submission_checker/checks/checkpointing_checks.py @@ -0,0 +1,282 @@ + +from .base import BaseCheck +from ..constants import * +from ..configuration.configuration import Config +from ..loader import SubmissionLogs + +import os +import re + + +class CheckpointingCheck(BaseCheck): + """ + A check class for validating checkpointing parameters and related properties. + Inherits from BaseCheck and receives a config and loader instance. + """ + + def __init__(self, log, config: Config, submissions_logs: SubmissionLogs): + """ + Initialize CheckpointingChecks with configuration and loader. + + Args: + config: A Config instance containing submission configuration. + loader: A SubmissionLogs instance for accessing submission logs. + """ + # Call parent constructor with the loader's log and submission path + super().__init__(log=log, path=submissions_logs.loader_metadata.folder) + self.config = config + self.submissions_logs = submissions_logs.checkpoint_files + self.name = "checkpointing checks" + self.checks = [] + self.checkpointing_path = self.path + self.init_checks() + + def init_checks(self): + """Initialize the list of checks to run.""" + self.checks = [ + self.checkpoint_data_size_ratio, + self.fsync_verification, + self.model_configuration_req, + self.closed_mpi_processes, + self.closed_accelerators_per_host, + self.aggregate_accelerator_memory, + self.closed_checkpoint_parameters, + self.checkpoint_path_args, + self.subset_run_validation, + ] + + def checkpoint_data_size_ratio(self): + """ + Verify that checkpoint data written per node > 3x node memory. 
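+
+        Condition checked per run (mirrors the calculation below):
+            checkpoint_size_GB / num_hosts >= 3 * host_memory_GB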
+ """ + valid = True + + for summary, metadata, _ in self.submissions_logs: + checkpoint_size_gb = summary.get("metric", {}).get("checkpoint_size_GB", 0) + host_memory_gb = summary.get("host_memory_GB", [0])[0] + num_hosts = summary.get("num_hosts", 1) + + if checkpoint_size_gb == 0 or host_memory_gb == 0: + continue + + # Data written per node + data_per_node = checkpoint_size_gb / num_hosts + min_required = 3 * host_memory_gb + + if data_per_node < min_required: + self.log.warning( + "Checkpoint data per node %.2f GB < 3x memory %.2f GB. " + "Cache flush may be needed.", + data_per_node, + min_required + ) + + return valid + + def fsync_verification(self): + """ + Verify that fsync is enabled in checkpoint configuration. + """ + valid = True + + for summary, metadata, _ in self.submissions_logs: + combined_params = metadata.get("combined_params", {}) + checkpoint_params = combined_params.get("checkpoint", {}) + fsync_enabled = checkpoint_params.get("fsync", False) + + if not fsync_enabled: + self.log.error("Checkpoint fsync is not enabled in configuration") + valid = False + + return valid + + def model_configuration_req(self): + """ + Verify benchmark uses one of the four supported models. + """ + valid = True + allowed_models = {"8b", "70b", "405b", "1t"} + + for summary, metadata, _ in self.submissions_logs: + model_name = metadata.get("args", {}).get("model", "").lower() + + # Extract just the size part (8b, 70b, etc.) + model_size = re.search(r"(8b|70b|405b|1t)", model_name) + + if not model_size or model_size.group(1) not in allowed_models: + self.log.error( + "Invalid model '%s'. Must be one of: %s", + model_name, + allowed_models + ) + valid = False + + return valid + + def closed_mpi_processes(self): + """ + For CLOSED submissions, verify MPI processes match requirements per model. + """ + valid = True + + model_process_requirements = { + "8b": 8, + "70b": 64, + "405b": 512, + "1t": 1024 + } + + for summary, metadata, _ in self.submissions_logs: + verification = metadata.get("verification", "open") + + if verification == "closed": + model_name = metadata.get("args", {}).get("model", "").lower() + num_processes = metadata.get("args", {}).get("num_processes", 0) + + model_size = re.search(r"(8b|70b|405b|1t)", model_name) + if model_size: + model_key = model_size.group(1) + required_processes = model_process_requirements.get(model_key) + + if required_processes and num_processes != required_processes: + self.log.error( + "CLOSED submission with model %s requires %d processes, got %d", + model_key, + required_processes, + num_processes + ) + valid = False + + return valid + + def closed_accelerators_per_host(self): + """ + For CLOSED submissions, verify accelerators per host > 4 and total matches requirement. + """ + valid = True + + for summary, metadata, _ in self.submissions_logs: + verification = metadata.get("verification", "open") + + if verification == "closed": + num_accelerators = summary.get("num_accelerators", 0) + num_hosts = summary.get("num_hosts", 1) + + accelerators_per_host = num_accelerators / num_hosts if num_hosts > 0 else 0 + + if accelerators_per_host <= 4: + self.log.error( + "CLOSED submission: accelerators per host %.2f must be > 4", + accelerators_per_host + ) + valid = False + + return valid + + def aggregate_accelerator_memory(self): + """ + Verify total accelerator memory >= checkpoint size. + H100 has 80GB per accelerator. 
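+
+        Condition checked (80 GB per accelerator assumed, per H100):
+            num_accelerators * 80 >= checkpoint_size_GB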
+ """ + valid = True + ACCELERATOR_MEMORY_GB = 80 # H100 + + for summary, metadata, _ in self.submissions_logs: + checkpoint_size_gb = summary.get("metric", {}).get("checkpoint_size_GB", 0) + num_accelerators = summary.get("num_accelerators", 0) + + total_accelerator_memory = num_accelerators * ACCELERATOR_MEMORY_GB + + if total_accelerator_memory < checkpoint_size_gb: + self.log.error( + "Aggregate accelerator memory %.2f GB < checkpoint size %.2f GB", + total_accelerator_memory, + checkpoint_size_gb + ) + valid = False + + return valid + + def closed_checkpoint_parameters(self): + """ + For CLOSED submissions, verify only allowed parameters are modified. + """ + valid = True + + allowed_params = { + "checkpoint.checkpoint_folder" + } + + for summary, metadata, _ in self.submissions_logs: + verification = metadata.get("verification", "open") + + if verification == "closed": + params_dict = metadata.get("params_dict", {}) + + for param_key in params_dict.keys(): + if param_key not in allowed_params: + self.log.error( + "CLOSED submission modifies disallowed parameter: %s", + param_key + ) + valid = False + + return valid + + def checkpoint_path_args(self): + """ + Verify checkpoint folder and output paths are set and different. + """ + valid = True + + for summary, metadata, _ in self.submissions_logs: + args = metadata.get("args", {}) + checkpoint_folder = args.get("checkpoint_folder") + results_dir = args.get("results_dir") + + if not checkpoint_folder: + self.log.error("checkpoint_folder not set in arguments") + valid = False + + if not results_dir: + self.log.error("results_dir not set in arguments") + valid = False + + if checkpoint_folder and results_dir and checkpoint_folder == results_dir: + self.log.error( + "checkpoint_folder and results_dir must be different: both are %s", + checkpoint_folder + ) + valid = False + + return valid + + def subset_run_validation(self): + """ + For subset runs, verify exactly 8 accelerators and not 8B model. 
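+
+        Example of a rejected subset run (values are illustrative):
+            checkpoint.mode == "subset" with num_accelerators == 16, or with an "8b" model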
+ """ + valid = True + + for summary, metadata, _ in self.submissions_logs: + params_dict = metadata.get("params_dict", {}) + checkpoint_mode = params_dict.get("checkpoint.mode", "") + + if checkpoint_mode == "subset": + num_accelerators = summary.get("num_accelerators", 0) + model_name = metadata.get("args", {}).get("model", "").lower() + + if num_accelerators != 8: + self.log.error( + "Subset run requires exactly 8 accelerators, got %d", + num_accelerators + ) + valid = False + + if "8b" in model_name: + self.log.error( + "Subset run cannot use 8B model: %s", + model_name + ) + valid = False + + return valid From d18a468d136fde60b5825eb7bf6ce3067060e263 Mon Sep 17 00:00:00 2001 From: Pablo Gonzalez Date: Fri, 6 Feb 2026 13:56:20 -0500 Subject: [PATCH 06/12] Add constants and configuration --- .../configuration/configuration.py | 29 ++++++++++- mlpstorage/submission_checker/constants.py | 51 +++++++++++++++++-- 2 files changed, 75 insertions(+), 5 deletions(-) diff --git a/mlpstorage/submission_checker/configuration/configuration.py b/mlpstorage/submission_checker/configuration/configuration.py index b51a5051..218b10f9 100644 --- a/mlpstorage/submission_checker/configuration/configuration.py +++ b/mlpstorage/submission_checker/configuration/configuration.py @@ -1,4 +1,29 @@ +from ..constants import * class Config: - def __init__(self): - pass \ No newline at end of file + def __init__(self, version, submitters): + self.version = version + self.submitters = submitters + + def check_submitter(self, submitter): + if self.submitters is None: + return True + return submitter in self.submitters + + def get_datagen_required_files(self): + return DATAGEN_REQUIRED_FILES[self.version] + + def get_run_required_files(self): + return RUN_REQUIRED_FILES[self.version] + + def get_checkpoint_required_files(self): + return CHECKPOINT_REQUIRED_FILES[self.version] + + def get_datagen_required_folders(self): + return DATAGEN_REQUIRED_FOLDERS[self.version] + + def get_run_required_folders(self): + return RUN_REQUIRED_FOLDERS[self.version] + + def get_checkpoint_required_folders(self): + return CHECKPOINT_REQUIRED_FOLDERS[self.version] \ No newline at end of file diff --git a/mlpstorage/submission_checker/constants.py b/mlpstorage/submission_checker/constants.py index 028401e1..92797c73 100644 --- a/mlpstorage/submission_checker/constants.py +++ b/mlpstorage/submission_checker/constants.py @@ -1,9 +1,54 @@ +from .parsers.json_parser import JSONParser +from .parsers.yaml_parser import YamlParser VERSIONS = ["v2.0", "v3.0"] VALID_DIVISIONS = ["open", "closed"] SYSTEM_PATH = { - "v2.0": "{division}/{submitter}/systems/{system}.json", - "v3.0": "{division}/{submitter}/systems/{system}.json", - "default": "{division}/{submitter}/systems/{system}.json", + "v2.0": "{division}/{submitter}/systems/{system}.yaml", + "v3.0": "{division}/{submitter}/systems/{system}.yaml", + "default": "{division}/{submitter}/systems/{system}.yaml", +} + +PARSER_MAP = { + "System": YamlParser, + "Summary": JSONParser, + "Metadata": JSONParser, + "default": JSONParser +} + +DATAGEN_REQUIRED_FILES = { + "v2.0": ["training_datagen.stdout.log", "training_datagen.stderr.log", "*output.json", "*per_epoch_stats.json", "*summary.json", "dlio.log"], + "v3.0": ["training_datagen.stdout.log", "training_datagen.stderr.log", "*output.json", "*per_epoch_stats.json", "*summary.json", "dlio.log"], + "default": ["training_datagen.stdout.log", "training_datagen.stderr.log", "*output.json", "*per_epoch_stats.json", "*summary.json", "dlio.log"], +} + 
+DATAGEN_REQUIRED_FOLDERS = { + "v2.0": ["dlio_config"], + "v3.0": ["dlio_config"], + "default": ["dlio_config"], +} + +RUN_REQUIRED_FILES = { + "v2.0": ["training_run.stdout.log", "training_run.stderr.log", "*output.json", "*per_epoch_stats.json", "*summary.json", "dlio.log"], + "v3.0": ["training_run.stdout.log", "training_run.stderr.log", "*output.json", "*per_epoch_stats.json", "*summary.json", "dlio.log"], + "default": ["training_run.stdout.log", "training_datagen.stderr.log", "*output.json", "*per_epoch_stats.json", "*summary.json", "dlio.log"], +} + +RUN_REQUIRED_FOLDERS = { + "v2.0": ["dlio_config"], + "v3.0": ["dlio_config"], + "default": ["dlio_config"], +} + +CHECKPOINT_REQUIRED_FILES = { + "v2.0": ["training_run.stdout.log", "training_run.stderr.log", "*output.json", "*per_epoch_stats.json", "*summary.json", "dlio.log"], + "v3.0": ["training_run.stdout.log", "training_run.stderr.log", "*output.json", "*per_epoch_stats.json", "*summary.json", "dlio.log"], + "default": ["training_run.stdout.log", "training_datagen.stderr.log", "*output.json", "*per_epoch_stats.json", "*summary.json", "dlio.log"], +} + +CHECKPOINT_REQUIRED_FOLDERS = { + "v2.0": ["dlio_config"], + "v3.0": ["dlio_config"], + "default": ["dlio_config"], } \ No newline at end of file From d0a21f74b40a11130a8ce047b7b38a9fed8f38e8 Mon Sep 17 00:00:00 2001 From: Pablo Gonzalez Date: Fri, 6 Feb 2026 13:56:53 -0500 Subject: [PATCH 07/12] Add utils functions --- mlpstorage/submission_checker/utils.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/mlpstorage/submission_checker/utils.py b/mlpstorage/submission_checker/utils.py index e881b279..21eadd61 100644 --- a/mlpstorage/submission_checker/utils.py +++ b/mlpstorage/submission_checker/utils.py @@ -1,4 +1,23 @@ import os +import re + + +def regex_matches_any(pattern: str, strings: list) -> bool: + """ + Check if a regex pattern matches any value in a list of strings. + + Args: + pattern: A regex pattern string. + strings: A list of strings to match against. + + Returns: + True if the pattern matches at least one string, False otherwise. + """ + try: + compiled_pattern = re.compile(pattern) + return any(compiled_pattern.search(s) for s in strings) + except re.error as e: + raise ValueError(f"Invalid regex pattern: {e}") def list_dir(*path): From 1aedd7d950fdb87c5e9b44037cf6b12ad72922cd Mon Sep 17 00:00:00 2001 From: Pablo Gonzalez Date: Fri, 6 Feb 2026 13:57:35 -0500 Subject: [PATCH 08/12] Add functions for main loop --- mlpstorage/submission_checker/loader.py | 105 ++++++++++++++++++------ mlpstorage/submission_checker/main.py | 38 +++++++-- 2 files changed, 109 insertions(+), 34 deletions(-) diff --git a/mlpstorage/submission_checker/loader.py b/mlpstorage/submission_checker/loader.py index e6687aeb..72f821f4 100644 --- a/mlpstorage/submission_checker/loader.py +++ b/mlpstorage/submission_checker/loader.py @@ -5,6 +5,17 @@ from .constants import * import logging from dataclasses import dataclass +from .parsers.json_parser import JSONParser +from .configuration.configuration import Config + +@dataclass +class LoaderMetadata: + division: str = None + submitter: str = None + system: str = None + mode: str = None + benchmark: str = None + folder: str = None @dataclass class SubmissionLogs: @@ -14,9 +25,12 @@ class SubmissionLogs: associated metadata for a single submission. It serves as a data transfer object passed between loading and validation phases. 
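+
+    Example (illustrative; entries are (summary, metadata, timestamp) tuples):
+        logs = SubmissionLogs(run_files=[(summary, metadata, "20260206_135500")])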
""" - datagen_files: dict - run_files: dict - system_file: dict + datagen_files: list = None + run_files: list = None + checkpoint_files: list = None + system_file: dict = None + loader_metadata: LoaderMetadata = None + class Loader: """Loads and parses submission artifacts from the filesystem. @@ -26,7 +40,7 @@ class Loader: It yields `SubmissionLogs` objects for each valid submission found, handling version-specific path formats and optional artifacts. """ - def __init__(self, root, version) -> None: + def __init__(self, root, version, config: Config) -> None: """Initialize the submission loader. Sets up path templates based on the MLPerf version and root @@ -42,15 +56,29 @@ def __init__(self, root, version) -> None: self.system_log_path = os.path.join( self.root, SYSTEM_PATH.get( version, SYSTEM_PATH["default"])) + self.parser_map = PARSER_MAP + self.config = config def load_single_log(self, path, log_type): - pass - - def load_datagen_files(self): - pass - - def load_run_files(self): - pass + log = None + if os.path.exists(path): + self.logger.info("Loading %s log from %s", log_type, path) + log = self.parser_map.get(log_type, self.parser_map["default"])(path, log_type).get_dict() + else: + self.logger.warning( + "Could not load %s log from %s, path does not exists", + log_type, + path) + return log + + def find_metadata_path(self, path): + files = [f for f in list_files(path) if "metadata" in f] + if len(files) == 0: + self.logger.warning("Could not find metadata file at %s", path) + return os.path.join(path, "metadata.json") + elif len(files) > 1: + self.logger.warning("More than one metadata file found at %s", path) + return os.path.join(path, files[0]) def load(self) -> Generator[SubmissionLogs, None, None]: # Iterate over submission folder. 
@@ -60,27 +88,52 @@ def load(self) -> Generator[SubmissionLogs, None, None]:
                 continue
             division_path = os.path.join(self.root, division)
             for submitter in list_dir(division_path):
+                if not self.config.check_submitter(submitter):
+                    continue
                 results_path = os.path.join(
                     division_path, submitter, "results")
                 for system in list_dir(results_path):
                     system_path = os.path.join(results_path, system)
                     system_file_path = self.system_log_path.format(division = division, submitter = submitter, system = system)
                     system_file = self.load_single_log(system_file_path, "System")
-                    for benchmark in list_dir(system_path):
-                        datagen_path = os.path.join(system_path, "datagen")
-                        run_path = os.path.join(system_path, "run")
-                        datagen_files_agg = {}
-                        run_files_agg = {}
-                        for timestamp in datagen_path:
-                            timestamp_path = os.path.join(datagen_path, timestamp)
-                            datagen_files = self.load_datagen_files()
-
-                        for timestamp in run_path:
-                            run_path = os.path.join(datagen_path, timestamp)
-                            run_files = self.load_run_files()
-
-                        yield SubmissionLogs(datagen_files_agg, run_files_agg, system_file)
-
+                    for mode in list_dir(system_path):
+                        mode_path = os.path.join(system_path, mode)
+                        for benchmark in list_dir(mode_path):
+                            benchmark_path = os.path.join(mode_path, benchmark)
+                            loader_metadata = LoaderMetadata(division=division, submitter=submitter, system=system, mode=mode, benchmark=benchmark, folder=benchmark_path)
+                            if mode == "training":
+                                datagen_path = os.path.join(benchmark_path, "datagen")
+                                run_path = os.path.join(benchmark_path, "run")
+                                datagen_files = []
+                                run_files = []
+                                for timestamp in list_dir(datagen_path):
+                                    timestamp_path = os.path.join(datagen_path, timestamp)
+                                    summary_path = os.path.join(timestamp_path, "summary.json")
+                                    metadata_path = self.find_metadata_path(timestamp_path)
+                                    metadata_file = self.load_single_log(metadata_path, "Metadata")
+                                    datagen_file = self.load_single_log(summary_path, "Summary")
+                                    datagen_files.append((datagen_file, metadata_file, timestamp))
+
+                                for timestamp in list_dir(run_path):
+                                    timestamp_path = os.path.join(run_path, timestamp)
+                                    summary_path = os.path.join(timestamp_path, "summary.json")
+                                    metadata_path = self.find_metadata_path(timestamp_path)
+                                    metadata_file = self.load_single_log(metadata_path, "Metadata")
+                                    run_file = self.load_single_log(summary_path, "Summary")
+                                    run_files.append((run_file, metadata_file, timestamp))
+
+                                yield SubmissionLogs(datagen_files, run_files, system_file=system_file, loader_metadata=loader_metadata)
+                            else:
+                                checkpoint_path = os.path.join(mode_path, benchmark)
+                                checkpoint_files = []
+                                for timestamp in list_dir(checkpoint_path):
+                                    timestamp_path = os.path.join(checkpoint_path, timestamp)
+                                    summary_path = os.path.join(timestamp_path, "summary.json")
+                                    checkpoint_file = self.load_single_log(summary_path, "Summary")
+                                    checkpoint_files.append((checkpoint_file, timestamp))
+                                yield SubmissionLogs(checkpoint_files=checkpoint_files, system_file=system_file, loader_metadata=loader_metadata)
+
+
+
diff --git a/mlpstorage/submission_checker/main.py b/mlpstorage/submission_checker/main.py
index 62660243..294f9c16 100644
--- a/mlpstorage/submission_checker/main.py
+++ b/mlpstorage/submission_checker/main.py
@@ -13,7 +13,10 @@
 from .loader import Loader
 
 # Import checkers
-from checks.base import BaseCheck
+from .checks.checkpointing_checks import CheckpointingCheck
+from .checks.directory_checks import DirectoryCheck
+from .checks.training_checks import TrainingCheck
+
 
 # Import result exporter
 from .results import ResultExporter
@@ -32,12 +35,17 @@ def get_args():
     """
     parser = argparse.ArgumentParser()
parser.add_argument("--input", required=True, help="submission directory") + parser.add_argument("--submitters", help="Comma separated submitters to run the checker") parser.add_argument( "--version", default="v5.1", choices=list(VERSIONS), help="mlperf version", ) + parser.add_argument( + "--csv", + default="summary.csv", + help="csv file with results") args = parser.parse_args() return args @@ -53,28 +61,42 @@ def main(): int: 0 if all submissions pass checks, 1 if any errors found. """ args = get_args() - - config = Config() - loader = Loader(args.input, args.version) + submitters = str(args.submitters).split(",") + config = Config(version=args.version, submitters=submitters) + + loader = Loader(args.input, args.version, config) exporter = ResultExporter(args.csv, config) + results = {} systems = {} + errors = [] + checkers = [DirectoryCheck, TrainingCheck, CheckpointingCheck] # Main loop over all the submissions for logs in loader.load(): # TODO: Initialize checkers - - # TODO: Run checks + checkers_pipe = [] valid = True + #TODO: Run checks + for checker in checkers: + valid &= checker(log, config, logs)() # TODO: Add results to summary if valid: exporter.add_result(logs) + else: + errors.append(logs.loader_metadata.folder) # Export results exporter.export() - # TODO: Output result summary to console - + if len(errors) > 0: + log.error("SUMMARY: submission has errors") + return 1 + else: + log.info("SUMMARY: submission looks OK") + return 0 +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file From be6e9ab6d9796ee7ba647e931f09f8588b34009f Mon Sep 17 00:00:00 2001 From: Pablo Gonzalez Date: Fri, 6 Feb 2026 13:59:11 -0500 Subject: [PATCH 09/12] Quick fix: correct checkpointing path --- mlpstorage/submission_checker/checks/directory_checks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mlpstorage/submission_checker/checks/directory_checks.py b/mlpstorage/submission_checker/checks/directory_checks.py index 1afd0057..3f42ef15 100644 --- a/mlpstorage/submission_checker/checks/directory_checks.py +++ b/mlpstorage/submission_checker/checks/directory_checks.py @@ -31,7 +31,7 @@ def __init__(self, log, config: Config, submissions_logs: SubmissionLogs): self.name = "directory checks" self.datagen_path = os.path.join(self.path, "datagen") self.run_path = os.path.join(self.path, "run") - self.checkpointing_path = os.path.join(self.path, "checkpointing") + self.checkpointing_path = self.path self.init_checks() def init_checks(self): From 4664df172eae5e4c5ee26013e7145612e359f801 Mon Sep 17 00:00:00 2001 From: Pablo Gonzalez Date: Fri, 6 Feb 2026 14:27:04 -0500 Subject: [PATCH 10/12] Quick fix: initialize checks correctly --- mlpstorage/submission_checker/checks/training_checks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mlpstorage/submission_checker/checks/training_checks.py b/mlpstorage/submission_checker/checks/training_checks.py index 09090283..e9ee3fd1 100644 --- a/mlpstorage/submission_checker/checks/training_checks.py +++ b/mlpstorage/submission_checker/checks/training_checks.py @@ -30,7 +30,7 @@ def __init__(self, log, config: Config, submissions_logs: SubmissionLogs): self.name = "training checks" self.datagen_path = os.path.join(self.path, "datagen") self.run_path = os.path.join(self.path, "run") - self.checks = self.init_checks() + self.init_checks() def init_checks(self): self.checks = [] From 9b84fe9b3fa88135406c58f1e03bb7e8d6e373b2 Mon Sep 17 00:00:00 2001 From: Pablo Gonzalez Date: Fri, 20 Feb 2026 14:24:36 -0500 
Subject: [PATCH 11/12] Submission checker fixes --- .../checks/checkpointing_checks.py | 27 +++++++- .../checks/directory_checks.py | 8 ++- .../checks/training_checks.py | 64 +++++++++++++------ .../configuration/configuration.py | 8 ++- mlpstorage/submission_checker/constants.py | 43 ++++++++++--- mlpstorage/submission_checker/loader.py | 4 +- mlpstorage/submission_checker/main.py | 5 +- mlpstorage/submission_checker/results.py | 32 +++++++++- 8 files changed, 155 insertions(+), 36 deletions(-) diff --git a/mlpstorage/submission_checker/checks/checkpointing_checks.py b/mlpstorage/submission_checker/checks/checkpointing_checks.py index f3678ac2..75d32c39 100644 --- a/mlpstorage/submission_checker/checks/checkpointing_checks.py +++ b/mlpstorage/submission_checker/checks/checkpointing_checks.py @@ -27,6 +27,7 @@ def __init__(self, log, config: Config, submissions_logs: SubmissionLogs): self.config = config self.submissions_logs = submissions_logs.checkpoint_files self.name = "checkpointing checks" + self.mode = submissions_logs.loader_metadata.mode self.checks = [] self.checkpointing_path = self.path self.init_checks() @@ -50,6 +51,8 @@ def checkpoint_data_size_ratio(self): Verify that checkpoint data written per node > 3x node memory. """ valid = True + if self.mode != "checkpointing": + return valid for summary, metadata, _ in self.submissions_logs: checkpoint_size_gb = summary.get("metric", {}).get("checkpoint_size_GB", 0) @@ -78,6 +81,8 @@ def fsync_verification(self): Verify that fsync is enabled in checkpoint configuration. """ valid = True + if self.mode != "checkpointing": + return valid for summary, metadata, _ in self.submissions_logs: combined_params = metadata.get("combined_params", {}) @@ -95,6 +100,9 @@ def model_configuration_req(self): Verify benchmark uses one of the four supported models. """ valid = True + if self.mode != "checkpointing": + return valid + allowed_models = {"8b", "70b", "405b", "1t"} for summary, metadata, _ in self.submissions_logs: @@ -117,7 +125,10 @@ def closed_mpi_processes(self): """ For CLOSED submissions, verify MPI processes match requirements per model. """ + # Question: is this num_processes? Reference does not have all matching valid = True + if self.mode != "checkpointing": + return valid model_process_requirements = { "8b": 8, @@ -127,7 +138,7 @@ def closed_mpi_processes(self): } for summary, metadata, _ in self.submissions_logs: - verification = metadata.get("verification", "open") + verification = metadata.get("verification", "closed") if verification == "closed": model_name = metadata.get("args", {}).get("model", "").lower() @@ -154,7 +165,9 @@ def closed_accelerators_per_host(self): For CLOSED submissions, verify accelerators per host > 4 and total matches requirement. """ valid = True - + if self.mode != "checkpointing": + return valid + for summary, metadata, _ in self.submissions_logs: verification = metadata.get("verification", "open") @@ -179,6 +192,9 @@ def aggregate_accelerator_memory(self): H100 has 80GB per accelerator. """ valid = True + if self.mode != "checkpointing": + return valid + ACCELERATOR_MEMORY_GB = 80 # H100 for summary, metadata, _ in self.submissions_logs: @@ -201,7 +217,10 @@ def closed_checkpoint_parameters(self): """ For CLOSED submissions, verify only allowed parameters are modified. 
""" + # Question: what are the default values of the other parameters that need to be checked valid = True + if self.mode != "checkpointing": + return valid allowed_params = { "checkpoint.checkpoint_folder" @@ -228,6 +247,8 @@ def checkpoint_path_args(self): Verify checkpoint folder and output paths are set and different. """ valid = True + if self.mode != "checkpointing": + return valid for summary, metadata, _ in self.submissions_logs: args = metadata.get("args", {}) @@ -256,6 +277,8 @@ def subset_run_validation(self): For subset runs, verify exactly 8 accelerators and not 8B model. """ valid = True + if self.mode != "checkpointing": + return valid for summary, metadata, _ in self.submissions_logs: params_dict = metadata.get("params_dict", {}) diff --git a/mlpstorage/submission_checker/checks/directory_checks.py b/mlpstorage/submission_checker/checks/directory_checks.py index 3f42ef15..d2794836 100644 --- a/mlpstorage/submission_checker/checks/directory_checks.py +++ b/mlpstorage/submission_checker/checks/directory_checks.py @@ -70,6 +70,7 @@ def datagen_files_check(self): - dlio.log - dlio_config/ (subdirectory) """ + # Question: output.json is missing from reference valid = True for _, _, timestamp in self.submissions_logs.datagen_files: timestamp_path = os.path.join(self.datagen_path, timestamp) @@ -146,6 +147,7 @@ def run_files_check(self): - dlio.log - dlio_config/ (subdirectory) """ + # Question: output.json is missing from regerence valid = True for _, _, timestamp in self.submissions_logs.run_files: timestamp_path = os.path.join(self.run_path, timestamp) @@ -168,6 +170,7 @@ def run_files_timestamp_check(self): Check that all run_files have timestamps matching format "YYYYMMDD_HHmmss" and that there are exactly 6 of them. """ + # Question: Not enough runs in reference valid = True timestamp_pattern = r"^\d{8}_\d{6}$" timestamps = [] @@ -235,6 +238,7 @@ def run_duration_valid_check(self): # Parse all run data: (run_dict, _, timestamp_dir_name) run_dir_time = [] max_gap = float("inf") + time_factor = 2 for run_dict, _, timestamp_dir in self.submissions_logs.run_files: try: # Parse timestamps from run_dict @@ -244,7 +248,7 @@ def run_duration_valid_check(self): # Parse the directory timestamp (YYYYMMDD_HHmmss format) dir_time = datetime.strptime(timestamp_dir, "%Y%m%d_%H%M%S") - run_duration = end_time - start_time + run_duration = (end_time - start_time).total_seconds() * time_factor if run_duration < max_gap: max_gap = run_duration @@ -264,7 +268,7 @@ def run_duration_valid_check(self): next_run = run_dir_time[i + 1] # Calculate gap between end of current run and start of next run - gap = next_run - current_run + gap = (next_run - current_run).total_seconds() # Gap should be less than the max gap if gap >= max_gap: diff --git a/mlpstorage/submission_checker/checks/training_checks.py b/mlpstorage/submission_checker/checks/training_checks.py index e9ee3fd1..8c343972 100644 --- a/mlpstorage/submission_checker/checks/training_checks.py +++ b/mlpstorage/submission_checker/checks/training_checks.py @@ -27,6 +27,8 @@ def __init__(self, log, config: Config, submissions_logs: SubmissionLogs): super().__init__(log=log, path=submissions_logs.loader_metadata.folder) self.config = config self.submissions_logs = submissions_logs + self.mode = self.submissions_logs.loader_metadata.mode + self.model = self.submissions_logs.loader_metadata.benchmark self.name = "training checks" self.datagen_path = os.path.join(self.path, "datagen") self.run_path = os.path.join(self.path, "run") @@ -54,6 +56,8 @@ 
def verify_datasize_usage(self): Verify that the datasize option was used by finding it in the run metadata. """ valid = True + if self.mode != "training": + return valid for summary, metadata, _ in self.submissions_logs.run_files: # Check if datasize-related parameters are in the metadata @@ -78,6 +82,8 @@ def recalculate_dataset_size(self): Recalculate minimum dataset size and verify it matches the run's logfile. """ valid = True + if self.mode != "training": + return valid HOST_MEMORY_MULTIPLIER = 5 MIN_STEPS_PER_EPOCH = 500 @@ -139,7 +145,8 @@ def datagen_minimum_size(self): Verify that datagen data generated >= datasize calculated. """ valid = True - + if self.mode != "training": + return valid if not self.submissions_logs.datagen_files: self.log.warning("No datagen files found") return valid @@ -176,21 +183,30 @@ def run_data_matches_datasize(self): """ Verify that run data matches the calculated datasize exactly. """ + # Question: Subfolders? + # What are the true values of the datase valid = True + if self.mode != "training": + return valid for summary, metadata, _ in self.submissions_logs.run_files: - dataset_params = metadata.get("combined_params", {}).get("dataset", {}) - - # Verify dataset parameters are set correctly - num_files_train = dataset_params.get("num_files_train") - num_subfolders_train = dataset_params.get("num_subfolders_train", 0) + num_files_train = summary.get("num_files_train", None) + num_files_eval = summary.get("num_files_eval", None) - if not num_files_train: - self.log.error("num_files_train not set in run parameters") + if num_files_train is None: + self.log.error("num_files_train not set") valid = False - if num_subfolders_train == 0: - self.log.error("num_subfolders_train should match actual subfolders in dataset") + if num_files_train > self.config.get_num_train_files(self.model): + self.log.error("num_files_train should be lower than in dataset") + valid = False + + if num_files_eval is None: + self.log.error("num_files_eval not set") + valid = False + + if num_files_eval > self.config.get_num_eval_files(self.model): + self.log.error("num_files_eval should be lower than in dataset") valid = False return valid @@ -200,7 +216,8 @@ def accelerator_utilization_check(self): Check that AU (Accelerator Utilization) meets minimum requirements. """ valid = True - + if self.mode != "training": + return valid for summary, metadata, _ in self.submissions_logs.run_files: metrics = summary.get("metric", {}) au_mean = metrics.get("train_au_mean_percentage", 0) @@ -221,7 +238,8 @@ def single_host_simulated_accelerators(self): For single-host submissions, verify sufficient simulated accelerators. """ valid = True - + if self.mode != "training": + return valid for summary, metadata, _ in self.submissions_logs.run_files: num_hosts = summary.get("num_hosts", 1) num_accelerators = summary.get("num_accelerators", 1) @@ -239,7 +257,8 @@ def single_host_client_limit(self): For single-host submissions, fail if more than one client node used. """ valid = True - + if self.mode != "training": + return valid for summary, metadata, _ in self.submissions_logs.run_files: num_hosts = summary.get("num_hosts", 1) @@ -262,6 +281,8 @@ def identical_accelerators_per_node(self): For distributed submissions, verify all nodes have identical accelerator count. 
""" valid = True + if self.mode != "training": + return valid for summary, metadata, _ in self.submissions_logs.run_files: num_hosts = summary.get("num_hosts", 1) @@ -291,6 +312,8 @@ def closed_submission_parameters(self): For CLOSED submissions, verify only allowed parameters are modified. """ valid = True + if self.mode != "training": + return valid # Allowed parameters for CLOSED allowed_params = { @@ -327,7 +350,9 @@ def open_submission_parameters(self): For OPEN submissions, verify only allowed parameters are modified. """ valid = True - + if self.mode != "training": + return valid + # Additional allowed parameters for OPEN (beyond CLOSED) open_allowed_params = { "framework", @@ -373,6 +398,8 @@ def mlpstorage_path_args(self): Verify dataset and output paths are set and different. """ valid = True + if self.mode != "training": + return valid for summary, metadata, _ in self.submissions_logs.run_files: args = metadata.get("args", {}) @@ -402,9 +429,8 @@ def mlpstorage_filesystem_check(self): This would require checking 'df' output in the logfiles. """ valid = True - - # In a real implementation, would parse logfiles for 'df' output - # and verify the filesystems are different - self.log.info("Filesystem check (implementation pending - requires log parsing)") - + # Question: where to look for this? + if self.mode != "training": + return valid + # TODO return valid diff --git a/mlpstorage/submission_checker/configuration/configuration.py b/mlpstorage/submission_checker/configuration/configuration.py index 218b10f9..1e1a966e 100644 --- a/mlpstorage/submission_checker/configuration/configuration.py +++ b/mlpstorage/submission_checker/configuration/configuration.py @@ -26,4 +26,10 @@ def get_run_required_folders(self): return RUN_REQUIRED_FOLDERS[self.version] def get_checkpoint_required_folders(self): - return CHECKPOINT_REQUIRED_FOLDERS[self.version] \ No newline at end of file + return CHECKPOINT_REQUIRED_FOLDERS[self.version] + + def get_num_train_files(self, model): + return NUM_DATASET_TRAIN_FILES[model] + + def get_num_eval_files(self, model): + return NUM_DATASET_EVAL_FILES[model] \ No newline at end of file diff --git a/mlpstorage/submission_checker/constants.py b/mlpstorage/submission_checker/constants.py index 92797c73..90541336 100644 --- a/mlpstorage/submission_checker/constants.py +++ b/mlpstorage/submission_checker/constants.py @@ -18,9 +18,9 @@ } DATAGEN_REQUIRED_FILES = { - "v2.0": ["training_datagen.stdout.log", "training_datagen.stderr.log", "*output.json", "*per_epoch_stats.json", "*summary.json", "dlio.log"], - "v3.0": ["training_datagen.stdout.log", "training_datagen.stderr.log", "*output.json", "*per_epoch_stats.json", "*summary.json", "dlio.log"], - "default": ["training_datagen.stdout.log", "training_datagen.stderr.log", "*output.json", "*per_epoch_stats.json", "*summary.json", "dlio.log"], + "v2.0": [r"training_datagen\.stdout.log", r"training_datagen.stderr\.log", r".*output\.json$", r".*per_epoch_stats\.json$", r".*summary\.json$", r"dlio\.log"], + "v3.0": [r"training_datagen\.stdout.log", r"training_datagen.stderr\.log", r".*output\.json$", r".*per_epoch_stats\.json$", r",*summary\.json$", r"dlio\.log"], + "default": [r"training_datagen\.stdout.log", r"training_datagen.stderr\.log", r".*output\.json$", r".*per_epoch_stats\.json$", r".*summary\.json$", r"dlio\.log"], } DATAGEN_REQUIRED_FOLDERS = { @@ -30,9 +30,9 @@ } RUN_REQUIRED_FILES = { - "v2.0": ["training_run.stdout.log", "training_run.stderr.log", "*output.json", "*per_epoch_stats.json", "*summary.json", 
"dlio.log"], - "v3.0": ["training_run.stdout.log", "training_run.stderr.log", "*output.json", "*per_epoch_stats.json", "*summary.json", "dlio.log"], - "default": ["training_run.stdout.log", "training_datagen.stderr.log", "*output.json", "*per_epoch_stats.json", "*summary.json", "dlio.log"], + "v2.0": [r"training_run\.stdout.log", r"training_run\.stderr.log", r".*output\.json", r".*per_epoch_stats\.json", r".*summary\.json", r"dlio\.log"], + "v3.0": [r"training_run\.stdout.log", r"training_run\.stderr.log", r".*output\.json", r".*per_epoch_stats\.json", r".*summary\.json", r"dlio\.log"], + "default": [r"training_run\.stdout.log", r"training_run\.stderr.log", r".*output\.json", r".*per_epoch_stats\.json", r".*summary\.json", r"dlio\.log"], } RUN_REQUIRED_FOLDERS = { @@ -42,13 +42,38 @@ } CHECKPOINT_REQUIRED_FILES = { - "v2.0": ["training_run.stdout.log", "training_run.stderr.log", "*output.json", "*per_epoch_stats.json", "*summary.json", "dlio.log"], - "v3.0": ["training_run.stdout.log", "training_run.stderr.log", "*output.json", "*per_epoch_stats.json", "*summary.json", "dlio.log"], - "default": ["training_run.stdout.log", "training_datagen.stderr.log", "*output.json", "*per_epoch_stats.json", "*summary.json", "dlio.log"], + "v2.0": [r"training_run\.stdout.log", r"training_run\.stderr.log", r".*output\.json", r".*per_epoch_stats\.json", r".*summary\.json", r"dlio\.log"], + "v3.0": [r"training_run\.stdout.log", r"training_run\.stderr.log", r".*output\.json", r".*per_epoch_stats\.json", r".*summary\.json", r"dlio\.log"], + "default": [r"training_run\.stdout.log", r"training_run\.stderr.log", r".*output\.json", r".*per_epoch_stats\.json", r".*summary\.json", r"dlio\.log"], } CHECKPOINT_REQUIRED_FOLDERS = { "v2.0": ["dlio_config"], "v3.0": ["dlio_config"], "default": ["dlio_config"], +} + +# TODO: Ask for correct values +NUM_DATASET_TRAIN_FILES = { + "cosmoflow": 524288, + "resnet50": 10391, + "unet3d": 14000 +} + +NUM_DATASET_EVAL_FILES = { + "cosmoflow": 0, + "resnet50": 0, + "unet3d": 0 +} + +NUM_DATASET_TRAIN_FOLDERS = { + "cosmoflow": 0, + "resnet50": 0, + "unet3d": 0 +} + +NUM_DATASET_EVAL_FOLDERS = { + "cosmoflow": 0, + "resnet50": 0, + "unet3d": 0 } \ No newline at end of file diff --git a/mlpstorage/submission_checker/loader.py b/mlpstorage/submission_checker/loader.py index 72f821f4..316081ee 100644 --- a/mlpstorage/submission_checker/loader.py +++ b/mlpstorage/submission_checker/loader.py @@ -128,8 +128,10 @@ def load(self) -> Generator[SubmissionLogs, None, None]: for timestamp in list_dir(checkpoint_path): timestamp_path = os.path.join(checkpoint_path, timestamp) summary_path = os.path.join(timestamp_path, "summary.json") + metadata_path = self.find_metadata_path(timestamp_path) + metadata_file = self.load_single_log(metadata_path, "Metadata") checkpoint_file = self.load_single_log(summary_path, "Summary") - checkpoint_files.append((checkpoint_file, timestamp)) + checkpoint_files.append((checkpoint_file, metadata_file, timestamp)) yield SubmissionLogs(checkpoint_files=checkpoint_files, system_file=system_file, loader_metadata=loader_metadata) diff --git a/mlpstorage/submission_checker/main.py b/mlpstorage/submission_checker/main.py index 294f9c16..8a3676c9 100644 --- a/mlpstorage/submission_checker/main.py +++ b/mlpstorage/submission_checker/main.py @@ -21,7 +21,10 @@ # Import result exporter from .results import ResultExporter -logging.basicConfig(level=logging.INFO) +logging.basicConfig( + level=logging.INFO, + format="[%(asctime)s %(filename)s:%(lineno)d %(levelname)s] 
%(message)s" +) log = logging.getLogger("main") def get_args(): diff --git a/mlpstorage/submission_checker/results.py b/mlpstorage/submission_checker/results.py index ec670c44..17250c0b 100644 --- a/mlpstorage/submission_checker/results.py +++ b/mlpstorage/submission_checker/results.py @@ -18,7 +18,37 @@ def __init__(self, csv_path, config) -> None: csv_path (str): Path to the output CSV file. config (Config): Configuration helper for model mappings. """ - self.head = [] + # Question: Is the final table defined? + self.head = [ + "Public ID", + "Organization", + "Submission Name", + "Description", + "Type", + "Access Protocol", + "Availability", + "RUs", + "Integrated Client Storage", + "Accelerator Type", + "# Client Nodes", + # TODO: Avoid hardcoding this + # Training + "3D-Unet - # Accel", + "3D-Unet - Read B/W (GB/s)", + "ResNet-50 - # Accel", + "ResNet-50 - Read B/W (GB/s)", + "CosmoFlow - # Accel", + "CosmoFlow - Read B/W (GB/s)", + # Checkpointing + "8B - Write B/W (GB/s)", + "8B - Read B/W (GB/s)", + "70B - Write B/W (GB/s)", + "70B - Read B/W (GB/s)", + "405B - Write B/W (GB/s)", + "405B - Read B/W (GB/s)", + "1T - Write B/W (GB/s)", + "1T - Read B/W (GB/s)", + ] self.rows = [] self.csv_path = csv_path self.config = config From fbf307387f3a05c8360d12245caf67d8deaec6be Mon Sep 17 00:00:00 2001 From: Pablo Gonzalez Date: Fri, 20 Feb 2026 14:32:00 -0500 Subject: [PATCH 12/12] Add submission checker readme --- mlpstorage/submission_checker/README.md | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 mlpstorage/submission_checker/README.md diff --git a/mlpstorage/submission_checker/README.md b/mlpstorage/submission_checker/README.md new file mode 100644 index 00000000..8d6e8143 --- /dev/null +++ b/mlpstorage/submission_checker/README.md @@ -0,0 +1,23 @@ +# Submission Checker + +## Overview +The submission checker is a tool designed to validate submissions against specified criteria in [Rules.md](). +**TODO:** Add this link + + +## Running the Submission Checker + +To run the submission checker, use the following command: + +``` +python -m storage.mlpstorage.submission_checker.main --input \ + [--version v2.0] \ + [--submitters Micron] +``` + +### Parameters +- `--input`: Path to the submission directory +- `--version`: MLPerf storage version +- `--submitters`: Only check the submitters provided. Comma separated. + +