From edd2debb8089d7b9176c25cde47853aa0dcbb794 Mon Sep 17 00:00:00 2001 From: Joey Riches Date: Tue, 3 Feb 2026 21:41:59 +0000 Subject: [PATCH] common/CI: Add monitoring checks **Summary** - Adds more expansive monitoring.yaml checks --- common/CI/package_checks.py | 252 +++++++++++++++++++++++++++++++++++- 1 file changed, 250 insertions(+), 2 deletions(-) diff --git a/common/CI/package_checks.py b/common/CI/package_checks.py index 697700ecaa2f..c6fea7136d0d 100755 --- a/common/CI/package_checks.py +++ b/common/CI/package_checks.py @@ -15,6 +15,7 @@ from ruamel.yaml.compat import StringIO from typing import Any, Callable, Dict, List, Optional, TextIO, Tuple, Union from urllib import request +from urllib.parse import urlparse, ParseResult from xml.etree import ElementTree """Package is either a Package YML file or Pspec XML file.""" @@ -77,6 +78,50 @@ def files(self) -> List[str]: return [str(element.text) for element in self._xml.findall('.//Path')] +class MonitoringYAML: + """Represents a Monitoring YAML file.""" + + def __init__(self, stream: Any): + yaml = YAML(typ='safe', pure=True) + yaml.default_flow_style = False + self._data = dict(yaml.load(stream)) + + @property + def releases(self) -> Optional[dict]: + return self._data.get('releases') + + @property + def release_id(self) -> Optional[int]: + releases = self.releases + if releases: + return releases.get('id') + + @property + def release_ignore(self) -> Optional[List[str]]: + releases = self.releases + if releases and releases.get('ignore'): + return releases.get('ignore') + + @property + def security(self) -> Optional[dict]: + return self._data.get('security') + + @property + def cpe(self) -> Optional[List[dict]]: + security = self.security + if security: + return security.get('cpe') + + @property + def security_ignore(self) -> Optional[List[str]]: + security = self.security + if security and security.get('ignore'): + return security.get('ignore') + + def get(self, key: str, default: Any = None) -> Any: + return self._data.get(key, default) + + @dataclass class FreezeConfig: start: Optional[datetime] @@ -263,6 +308,7 @@ def _record(self) -> logging.LogRecord: class PullRequestCheck: _package_files = ['package.yml'] + _monitoring_files = ['monitoring.yaml'] _two_letter_dirs = ['py'] _config: Optional[Config] = None @@ -287,6 +333,10 @@ def config(self) -> Config: def package_files(self) -> List[str]: return self.filter_files(*self._package_files) + @property + def monitoring_files(self) -> List[str]: + return self.filter_files(*self._monitoring_files) + def filter_files(self, *allowed: str) -> List[str]: return [f for f in self.files if os.path.basename(f) in allowed] @@ -317,6 +367,13 @@ def load_pspec_xml(self, file: str) -> PspecXML: def load_pspec_xml_from_commit(self, ref: str, file: str) -> PspecXML: return PspecXML(self.git.file_from_commit(ref, file)) + def load_monitoring_yml(self, file: str) -> MonitoringYAML: + with self._open(file) as f: + return MonitoringYAML(f) + + def load_monitoring_yml_from_commit(self, ref: str, file: str) -> MonitoringYAML: + return MonitoringYAML(self.git.file_from_commit(ref, file)) + def file_line(self, file: str, expr: str) -> Optional[int]: with self._open(file) as f: for i, line in enumerate(f.read().splitlines()): @@ -442,7 +499,7 @@ def run(self) -> List[Result]: return results -class Monitoring(PullRequestCheck): +class MonitoringExists(PullRequestCheck): _error = '`monitoring.yaml` is missing' _level = Level.WARNING @@ -455,6 +512,196 @@ def _has_monitoring_yml(self, file: str) -> bool: return self._exists(os.path.join(os.path.dirname(file), 'monitoring.yaml')) +class MonitoringFormat(PullRequestCheck): + _error_required_sections = 'monitoring.yaml must contain required sections: releases and security' + _error_cpe_format = "security.cpe must be a list or null (~)" + _error_cpe_entry = "Each CPE entry must have both 'vendor' and 'product' fields with non-null values" + _level = Level.ERROR + + def _yml_file(self, file: str) -> MonitoringYAML: + return self.load_monitoring_yml(file) + + def _is_valid_url(self, url: str) -> bool: + """Check if a string is a valid URL.""" + try: + result = urlparse(url) + # Check if scheme and netloc are present + return all([result.scheme, result.netloc]) + except ParseResult: + return False + + def run(self) -> List[Result]: + package_files = self.monitoring_files + results = [] + + for file in package_files: + monitoring = self._yml_file(file) + + # Check required sections + results.extend(self._check_required_sections(file, monitoring)) + + # Check security section + results.extend(self._check_security_section(file, monitoring)) + + # Check releases section + results.extend(self._check_releases_section(file, monitoring)) + + return results + + def _check_required_sections(self, file: str, monitoring: MonitoringYAML) -> List[Result]: + results = [] + if not isinstance(monitoring.releases, dict) and not isinstance(monitoring.security, dict): + results.append(Result( + message=self._error_required_sections, + file=file, + level=self._level + )) + return results + + def _check_security_section(self, file: str, monitoring: MonitoringYAML) -> List[Result]: + results = [] + + # Ensure the 'cpe' key exists in the security section + if not isinstance(monitoring.cpe, list) and monitoring.cpe is not None: + results.append(Result( + message=self._error_cpe_format, + file=file, + level=self._level, + line=self.file_line(file, r'^security\s*:') + )) + + # If cpe is a list (not null), validate each entry + if isinstance(monitoring.cpe, list): + results.extend(self._check_cpe_entries(file, monitoring.cpe)) + results.extend(self._check_security_ignore_patterns(file, monitoring.security_ignore)) + + return results + + def _check_security_ignore_patterns(self, file: str, ignore_patterns: Optional[List[str]]) -> List[Result]: + results = [] + if not ignore_patterns: + return results + + if not all(isinstance(pattern, str) for pattern in ignore_patterns): + results.append(Result( + message="security.ignore must contain string patterns", + file=file, + level=self._level, + line=self.file_line(file, r'^ ignore\s*:') + )) + else: + # Check that all patterns begin with CVE- + invalid_patterns = [pattern for pattern in ignore_patterns + if not pattern.startswith("CVE-")] + if invalid_patterns: + results.append(Result( + message=f"security.ignore patterns must begin with 'CVE-': {', '.join(invalid_patterns)}", + file=file, + level=self._level, + line=self.file_line(file, r'^ security\.ignore\s*:') + )) + return results + + def _check_cpe_entries(self, file: str, cpe_entries: List[dict]) -> List[Result]: + results = [] + for i, item in enumerate(cpe_entries): + if not isinstance(item, dict): + results.append(Result( + message="Each CPE entry must be a dictionary", + file=file, + level=self._level, + line=self.file_line(file, r'^ cpe\s*:') + )) + elif 'vendor' not in item or 'product' not in item or item['vendor'] is None or item['product'] is None: + results.append(Result( + message=self._error_cpe_entry, + file=file, + level=self._level, + line=self.file_line(file, r'^ cpe\s*:') + )) + return results + + def _check_releases_section(self, file: str, monitoring: MonitoringYAML) -> List[Result]: + results = [] + + # Check releases.id validity + if 'releases' in monitoring._data: + releases_dict = monitoring._data.get('releases', {}) + if isinstance(releases_dict, dict): + results.extend(self._check_releases_id(file, releases_dict)) + results.extend(self._check_releases_rss(file, releases_dict)) + results.extend(self._check_releases_ignore_patterns(file, monitoring.release_ignore)) + + return results + + def _check_releases_ignore_patterns(self, file: str, ignore_patterns: Optional[List[str]]) -> List[Result]: + results = [] + if ignore_patterns and not all(isinstance(pattern, str) for pattern in ignore_patterns): + results.append(Result( + message="releases.ignore must contain string patterns", + file=file, + level=self._level, + line=self.file_line(file, r'^ ignore\s*:') + )) + return results + + def _check_releases_rss(self, file: str, releases_dict: dict) -> List[Result]: + results = [] + if 'rss' not in releases_dict: + results.append(Result( + message="releases section must contain an `rss` key", + file=file, + level=self._level, + line=self.file_line(file, r'^releases\s*:') + )) + elif releases_dict.get('rss') is None: + # The key exists but has a null value + results.append(Result( + message="releases.rss is set to null, it should point to a rss feed", + file=file, + level=Level.WARNING, + line=self.file_line(file, r'^\s+rss\s*:') + )) + elif releases_dict.get('rss') is not None and not self._is_valid_url(releases_dict.get('rss')): + results.append(Result( + message="releases.rss must contain a valid URL", + file=file, + level=self._level, + line=self.file_line(file, r'^\s+rss\s*:') + )) + return results + + def _check_releases_id(self, file: str, releases_dict: dict) -> List[Result]: + results = [] + if 'id' not in releases_dict: + results.append(Result( + message="releases section must contain an `id` key", + file=file, + level=self._level, + line=self.file_line(file, r'^releases\s*:') + )) + elif releases_dict.get('id') is None: + # The key exists but has a null value + results.append(Result( + message="releases.id is set to null, it should have a numeric value", + file=file, + level=Level.WARNING, + line=self.file_line(file, r'^\s+id\s*:') + )) + elif releases_dict.get('id') is not None: + # The key exists with a non-null value, check if it's an integer + try: + int(releases_dict.get('id')) + except (ValueError, TypeError): + results.append(Result( + message="releases.id must be a number", + file=file, + level=self._level, + line=self.file_line(file, r'^\s+id\s*:') + )) + return results + + class PackageBumped(PullRequestCheck): _msg = 'Package release is not incremented by 1' _msg_new = 'Package release is not 1' @@ -843,7 +1090,8 @@ class Checker: CommitMessage, FrozenPackage, Homepage, - Monitoring, + MonitoringExists, + MonitoringFormat, PackageBumped, PackageDependenciesOrder, PackageDirectory,