Skip to content
111 changes: 111 additions & 0 deletions agent-framework/prometheus_swarm/utils/duplicate_evidence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import logging
from typing import List, Dict, Any, Optional

class DuplicateEvidenceError(Exception):
    """Signal that duplicate evidence entries were detected."""

def validate_unique_evidence(evidence_list: List[Dict[Any, Any]],
                             unique_key: str = 'id') -> None:
    """
    Validate that evidence entries are unique based on a specified key.

    Entries that do not contain ``unique_key`` are skipped with a warning and
    never count as duplicates. The whole list is scanned before raising, so a
    single exception reports every duplicated key.

    Args:
        evidence_list (List[Dict[Any, Any]]): List of evidence dictionaries
        unique_key (str, optional): Key used to determine uniqueness. Defaults to 'id'.

    Raises:
        DuplicateEvidenceError: If duplicate evidence is detected. The message
            lists each duplicated key value once, in the order its first
            repeat was encountered.
    """
    logger = logging.getLogger(__name__)

    seen_keys = set()
    duplicates = []
    # Tracks which keys are already in ``duplicates`` so that a key occurring
    # three or more times is not listed repeatedly in the error message.
    reported_keys = set()

    for item in evidence_list:
        if unique_key not in item:
            logger.warning(
                "Evidence item missing unique key '%s': %s", unique_key, item
            )
            continue

        current_key = item[unique_key]

        if current_key not in seen_keys:
            seen_keys.add(current_key)
            continue

        # Every repeated occurrence is still logged individually.
        logger.error(
            "Duplicate evidence found with %s: %s", unique_key, current_key
        )
        if current_key not in reported_keys:
            reported_keys.add(current_key)
            duplicates.append(current_key)

    if duplicates:
        raise DuplicateEvidenceError(
            f"Duplicate evidence detected for {unique_key}s: {duplicates}"
        )

def log_evidence_summary(evidence_list: List[Dict[Any, Any]],
                         log_level: str = 'INFO') -> None:
    """
    Log a summary of evidence entries with configurable log level.

    Two records are emitted on this module's logger: the number of entries,
    and the keys of the first entry ('N/A' when the list is empty).

    Args:
        evidence_list (List[Dict[Any, Any]]): List of evidence dictionaries
        log_level (str, optional): Name of a logging level, case-insensitive
            (e.g. 'debug', 'WARNING'). Names that are not a known level fall
            back to INFO. Defaults to 'INFO'.
    """
    logger = logging.getLogger(__name__)

    # Resolve the name through the logging level registry rather than
    # getattr(logger, name): strings such as 'name' or 'disabled' are Logger
    # attributes but not logging methods, and calling them raises TypeError.
    level = logging.getLevelName(log_level.upper())
    if not isinstance(level, int) or level == logging.NOTSET:
        # Unknown names yield a "Level X" string; NOTSET would be filtered
        # out by any configured logger. Both fall back to INFO.
        level = logging.INFO

    logger.log(level, "Total evidence entries: %d", len(evidence_list))

    first_entry_keys = list(evidence_list[0].keys()) if evidence_list else 'N/A'
    logger.log(level, "Evidence keys: %s", first_entry_keys)

def filter_duplicates(evidence_list: List[Dict[Any, Any]],
                      unique_key: str = 'id',
                      keep: str = 'first') -> List[Dict[Any, Any]]:
    """
    Filter out duplicate evidence entries while preserving desired entries.

    Entries that contain ``unique_key`` come first in the result, in their
    original relative order, with one entry per distinct key value. Entries
    that lack ``unique_key`` are never treated as duplicates; they are all
    kept and placed after the keyed entries, in their original relative order.
    The input list is not modified.

    Args:
        evidence_list (List[Dict[Any, Any]]): List of evidence dictionaries
        unique_key (str, optional): Key used to determine uniqueness. Defaults to 'id'.
        keep (str, optional): Strategy for keeping duplicates.
            'first' keeps first occurrence, 'last' keeps last.
            Defaults to 'first'.

    Returns:
        List[Dict[Any, Any]]: Filtered list of evidence without duplicates

    Raises:
        ValueError: If ``keep`` is neither 'first' nor 'last'.
    """
    if keep not in ('first', 'last'):
        raise ValueError("'keep' must be either 'first' or 'last'")

    keep_last = keep == 'last'
    # Scanning back-to-front turns "keep the last occurrence" into "keep the
    # first occurrence seen", so one loop serves both strategies.
    scan_order = reversed(evidence_list) if keep_last else evidence_list

    seen_keys = set()
    filtered_evidence = []
    keyless_items = []

    for item in scan_order:
        if unique_key not in item:
            keyless_items.append(item)
            continue

        current_key = item[unique_key]
        if current_key not in seen_keys:
            seen_keys.add(current_key)
            filtered_evidence.append(item)

    if keep_last:
        # A single O(n) reverse restores the original ordering; inserting at
        # index 0 for every item would make the whole pass quadratic.
        filtered_evidence.reverse()
        keyless_items.reverse()

    # Entries without the unique key always go after the keyed entries.
    filtered_evidence.extend(keyless_items)

    return filtered_evidence
114 changes: 114 additions & 0 deletions agent-framework/tests/unit/test_duplicate_evidence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import pytest
import logging
from typing import List, Dict
from prometheus_swarm.utils.duplicate_evidence import (
validate_unique_evidence,
DuplicateEvidenceError,
log_evidence_summary,
filter_duplicates
)

def test_validate_unique_evidence_no_duplicates():
    """Test validation of unique evidence passes."""
    evidence = [
        {'id': 1, 'data': 'first'},
        {'id': 2, 'data': 'second'},
        {'id': 3, 'data': 'third'}
    ]

    # Call directly: an unexpected DuplicateEvidenceError propagates and fails
    # the test with its full traceback, which a try/except + pytest.fail
    # wrapper would hide.
    assert validate_unique_evidence(evidence) is None

def test_validate_unique_evidence_with_duplicates():
    """A repeated 'id' value must make validation raise DuplicateEvidenceError."""
    original_entry = {'id': 1, 'data': 'first'}
    unrelated_entry = {'id': 2, 'data': 'second'}
    repeated_entry = {'id': 1, 'data': 'duplicate'}
    entries = [original_entry, unrelated_entry, repeated_entry]

    expected_message = "Duplicate evidence detected"
    with pytest.raises(DuplicateEvidenceError, match=expected_message):
        validate_unique_evidence(entries)

def test_validate_unique_evidence_missing_key(caplog):
    """Test behavior with evidence missing unique key."""
    evidence = [
        {'id': 1, 'data': 'first'},
        {'data': 'no id'},
        {'id': 2, 'data': 'second'}
    ]

    # Should not raise an error, just log a warning
    with caplog.at_level(logging.WARNING):
        validate_unique_evidence(evidence)

    # Verify the warning is actually emitted, not merely that nothing raised.
    assert "Evidence item missing unique key 'id'" in caplog.text
    assert any(record.levelno == logging.WARNING for record in caplog.records)

def test_log_evidence_summary(caplog):
    """Test logging of evidence summary."""
    evidence = [
        {'id': 1, 'data': 'first'},
        {'id': 2, 'data': 'second'}
    ]

    with caplog.at_level(logging.INFO):
        log_evidence_summary(evidence)

    assert "Total evidence entries: 2" in caplog.text
    # The summary also reports the keys of the first entry.
    assert "Evidence keys: ['id', 'data']" in caplog.text

def test_filter_duplicates_first_occurrence():
    """With the default strategy, the earliest entry for each id survives."""
    earliest = {'id': 1, 'data': 'first'}
    unrelated = {'id': 2, 'data': 'second'}
    repeat = {'id': 1, 'data': 'duplicate'}

    result = filter_duplicates([earliest, unrelated, repeat])

    expected = [
        {'id': 1, 'data': 'first'},
        {'id': 2, 'data': 'second'},
    ]
    assert len(result) == 2
    assert result == expected

def test_filter_duplicates_last_occurrence():
    """With keep='last', the latest entry for each id survives."""
    earliest = {'id': 1, 'data': 'first'}
    unrelated = {'id': 2, 'data': 'second'}
    repeat = {'id': 1, 'data': 'duplicate'}

    result = filter_duplicates([earliest, unrelated, repeat], keep='last')

    expected = [
        {'id': 2, 'data': 'second'},
        {'id': 1, 'data': 'duplicate'},
    ]
    assert len(result) == 2
    assert result == expected

def test_filter_duplicates_invalid_keep_strategy():
    """An unsupported 'keep' value must be rejected with ValueError."""
    entries = [
        {'id': 1, 'data': 'first'},
        {'id': 2, 'data': 'second'},
    ]
    expected_message = "'keep' must be either 'first' or 'last'"

    with pytest.raises(ValueError, match=expected_message):
        filter_duplicates(entries, keep='invalid')

def test_filter_duplicates_missing_key():
    """Entries lacking the unique key are kept while keyed duplicates are dropped."""
    keyed = {'id': 1, 'data': 'first'}
    keyless = {'data': 'no id'}
    keyed_repeat = {'id': 1, 'data': 'duplicate'}

    result = filter_duplicates([keyed, keyless, keyed_repeat])

    expected = [
        {'id': 1, 'data': 'first'},
        {'data': 'no id'},
    ]
    assert len(result) == 2
    assert result == expected