Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 250 additions & 2 deletions common/CI/package_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from ruamel.yaml.compat import StringIO
from typing import Any, Callable, Dict, List, Optional, TextIO, Tuple, Union
from urllib import request
from urllib.parse import urlparse, ParseResult
from xml.etree import ElementTree

"""Package is either a Package YML file or Pspec XML file."""
Expand Down Expand Up @@ -77,6 +78,50 @@
return [str(element.text) for element in self._xml.findall('.//Path')]


class MonitoringYAML:
"""Represents a Monitoring YAML file."""

def __init__(self, stream: Any):
yaml = YAML(typ='safe', pure=True)
yaml.default_flow_style = False
self._data = dict(yaml.load(stream))

@property
def releases(self) -> Optional[dict]:

Check failure on line 90 in common/CI/package_checks.py

View workflow job for this annotation

GitHub Actions / Python Linting

Missing type parameters for generic type "dict" [type-arg]
return self._data.get('releases')

@property
def release_id(self) -> Optional[int]:

Check failure on line 94 in common/CI/package_checks.py

View workflow job for this annotation

GitHub Actions / Python Linting

Missing return statement [return]
releases = self.releases
if releases:
return releases.get('id')

@property
def release_ignore(self) -> Optional[List[str]]:

Check failure on line 100 in common/CI/package_checks.py

View workflow job for this annotation

GitHub Actions / Python Linting

Missing return statement [return]
releases = self.releases
if releases and releases.get('ignore'):
return releases.get('ignore')

@property
def security(self) -> Optional[dict]:

Check failure on line 106 in common/CI/package_checks.py

View workflow job for this annotation

GitHub Actions / Python Linting

Missing type parameters for generic type "dict" [type-arg]
return self._data.get('security')

@property
def cpe(self) -> Optional[List[dict]]:

Check failure on line 110 in common/CI/package_checks.py

View workflow job for this annotation

GitHub Actions / Python Linting

Missing type parameters for generic type "dict" [type-arg]

Check failure on line 110 in common/CI/package_checks.py

View workflow job for this annotation

GitHub Actions / Python Linting

Missing return statement [return]
security = self.security
if security:
return security.get('cpe')

@property
def security_ignore(self) -> Optional[List[str]]:

Check failure on line 116 in common/CI/package_checks.py

View workflow job for this annotation

GitHub Actions / Python Linting

Missing return statement [return]
security = self.security
if security and security.get('ignore'):
return security.get('ignore')

def get(self, key: str, default: Any = None) -> Any:
return self._data.get(key, default)


@dataclass
class FreezeConfig:
start: Optional[datetime]
Expand Down Expand Up @@ -263,6 +308,7 @@

class PullRequestCheck:
_package_files = ['package.yml']
_monitoring_files = ['monitoring.yaml']
_two_letter_dirs = ['py']
_config: Optional[Config] = None

Expand All @@ -287,6 +333,10 @@
def package_files(self) -> List[str]:
return self.filter_files(*self._package_files)

@property
def monitoring_files(self) -> List[str]:
return self.filter_files(*self._monitoring_files)

def filter_files(self, *allowed: str) -> List[str]:
return [f for f in self.files
if os.path.basename(f) in allowed]
Expand Down Expand Up @@ -317,6 +367,13 @@
def load_pspec_xml_from_commit(self, ref: str, file: str) -> PspecXML:
return PspecXML(self.git.file_from_commit(ref, file))

def load_monitoring_yml(self, file: str) -> MonitoringYAML:
with self._open(file) as f:
return MonitoringYAML(f)

def load_monitoring_yml_from_commit(self, ref: str, file: str) -> MonitoringYAML:
return MonitoringYAML(self.git.file_from_commit(ref, file))

def file_line(self, file: str, expr: str) -> Optional[int]:
with self._open(file) as f:
for i, line in enumerate(f.read().splitlines()):
Expand Down Expand Up @@ -442,7 +499,7 @@
return results


class Monitoring(PullRequestCheck):
class MonitoringExists(PullRequestCheck):
_error = '`monitoring.yaml` is missing'
_level = Level.WARNING

Expand All @@ -455,6 +512,196 @@
return self._exists(os.path.join(os.path.dirname(file), 'monitoring.yaml'))


class MonitoringFormat(PullRequestCheck):
_error_required_sections = 'monitoring.yaml must contain required sections: releases and security'
_error_cpe_format = "security.cpe must be a list or null (~)"
_error_cpe_entry = "Each CPE entry must have both 'vendor' and 'product' fields with non-null values"
_level = Level.ERROR

def _yml_file(self, file: str) -> MonitoringYAML:
return self.load_monitoring_yml(file)

def _is_valid_url(self, url: str) -> bool:
"""Check if a string is a valid URL."""
try:
result = urlparse(url)
# Check if scheme and netloc are present
return all([result.scheme, result.netloc])
except ParseResult:

Check failure on line 530 in common/CI/package_checks.py

View workflow job for this annotation

GitHub Actions / Python Linting

Exception type must be derived from BaseException (or be a tuple of exception classes) [misc]
return False

def run(self) -> List[Result]:
package_files = self.monitoring_files
results = []

for file in package_files:
monitoring = self._yml_file(file)

# Check required sections
results.extend(self._check_required_sections(file, monitoring))

# Check security section
results.extend(self._check_security_section(file, monitoring))

# Check releases section
results.extend(self._check_releases_section(file, monitoring))

return results

def _check_required_sections(self, file: str, monitoring: MonitoringYAML) -> List[Result]:
results = []
if not isinstance(monitoring.releases, dict) and not isinstance(monitoring.security, dict):
results.append(Result(
message=self._error_required_sections,
file=file,
level=self._level
))
return results

def _check_security_section(self, file: str, monitoring: MonitoringYAML) -> List[Result]:
results = []

# Ensure the 'cpe' key exists in the security section
if not isinstance(monitoring.cpe, list) and monitoring.cpe is not None:
results.append(Result(
message=self._error_cpe_format,
file=file,
level=self._level,
line=self.file_line(file, r'^security\s*:')
))

# If cpe is a list (not null), validate each entry
if isinstance(monitoring.cpe, list):
results.extend(self._check_cpe_entries(file, monitoring.cpe))
results.extend(self._check_security_ignore_patterns(file, monitoring.security_ignore))

return results

def _check_security_ignore_patterns(self, file: str, ignore_patterns: Optional[List[str]]) -> List[Result]:
results = []

Check failure on line 581 in common/CI/package_checks.py

View workflow job for this annotation

GitHub Actions / Python Linting

Need type annotation for "results" (hint: "results: list[<type>] = ...") [var-annotated]
if not ignore_patterns:
return results

if not all(isinstance(pattern, str) for pattern in ignore_patterns):
results.append(Result(
message="security.ignore must contain string patterns",
file=file,
level=self._level,
line=self.file_line(file, r'^ ignore\s*:')
))
else:
# Check that all patterns begin with CVE-
invalid_patterns = [pattern for pattern in ignore_patterns
if not pattern.startswith("CVE-")]
if invalid_patterns:
results.append(Result(
message=f"security.ignore patterns must begin with 'CVE-': {', '.join(invalid_patterns)}",
file=file,
level=self._level,
line=self.file_line(file, r'^ security\.ignore\s*:')
))
return results

def _check_cpe_entries(self, file: str, cpe_entries: List[dict]) -> List[Result]:

Check failure on line 605 in common/CI/package_checks.py

View workflow job for this annotation

GitHub Actions / Python Linting

Missing type parameters for generic type "dict" [type-arg]
results = []
for i, item in enumerate(cpe_entries):
if not isinstance(item, dict):
results.append(Result(
message="Each CPE entry must be a dictionary",
file=file,
level=self._level,
line=self.file_line(file, r'^ cpe\s*:')
))
elif 'vendor' not in item or 'product' not in item or item['vendor'] is None or item['product'] is None:
results.append(Result(
message=self._error_cpe_entry,
file=file,
level=self._level,
line=self.file_line(file, r'^ cpe\s*:')
))
return results

def _check_releases_section(self, file: str, monitoring: MonitoringYAML) -> List[Result]:
results = []

# Check releases.id validity
if 'releases' in monitoring._data:
releases_dict = monitoring._data.get('releases', {})
if isinstance(releases_dict, dict):
results.extend(self._check_releases_id(file, releases_dict))
results.extend(self._check_releases_rss(file, releases_dict))
results.extend(self._check_releases_ignore_patterns(file, monitoring.release_ignore))

return results

def _check_releases_ignore_patterns(self, file: str, ignore_patterns: Optional[List[str]]) -> List[Result]:
results = []
if ignore_patterns and not all(isinstance(pattern, str) for pattern in ignore_patterns):
results.append(Result(
message="releases.ignore must contain string patterns",
file=file,
level=self._level,
line=self.file_line(file, r'^ ignore\s*:')
))
return results

def _check_releases_rss(self, file: str, releases_dict: dict) -> List[Result]:
results = []
if 'rss' not in releases_dict:
results.append(Result(
message="releases section must contain an `rss` key",
file=file,
level=self._level,
line=self.file_line(file, r'^releases\s*:')
))
elif releases_dict.get('rss') is None:
# The key exists but has a null value
results.append(Result(
message="releases.rss is set to null, it should point to a rss feed",
file=file,
level=Level.WARNING,
line=self.file_line(file, r'^\s+rss\s*:')
))
elif releases_dict.get('rss') is not None and not self._is_valid_url(releases_dict.get('rss')):
results.append(Result(
message="releases.rss must contain a valid URL",
file=file,
level=self._level,
line=self.file_line(file, r'^\s+rss\s*:')
))
return results

def _check_releases_id(self, file: str, releases_dict: dict) -> List[Result]:
results = []
if 'id' not in releases_dict:
results.append(Result(
message="releases section must contain an `id` key",
file=file,
level=self._level,
line=self.file_line(file, r'^releases\s*:')
))
elif releases_dict.get('id') is None:
# The key exists but has a null value
results.append(Result(
message="releases.id is set to null, it should have a numeric value",
file=file,
level=Level.WARNING,
line=self.file_line(file, r'^\s+id\s*:')
))
elif releases_dict.get('id') is not None:
# The key exists with a non-null value, check if it's an integer
try:
int(releases_dict.get('id'))
except (ValueError, TypeError):
results.append(Result(
message="releases.id must be a number",
file=file,
level=self._level,
line=self.file_line(file, r'^\s+id\s*:')
))
return results


class PackageBumped(PullRequestCheck):
_msg = 'Package release is not incremented by 1'
_msg_new = 'Package release is not 1'
Expand Down Expand Up @@ -843,7 +1090,8 @@
CommitMessage,
FrozenPackage,
Homepage,
Monitoring,
MonitoringExists,
MonitoringFormat,
PackageBumped,
PackageDependenciesOrder,
PackageDirectory,
Expand Down