-
Notifications
You must be signed in to change notification settings - Fork 0
Expand unit test coverage #14
Expand unit test coverage #14
Conversation
Add comprehensive unit tests for multiple modules: - Webhook handler: Tests for initialization, JSONPath filtering, event sending, retry logic, and different payload types (92% coverage) - Queue models: Tests for CommandType, QueuedCommand, CommandResult, QueueStats, QueueInfo, and their serialization (100% coverage) - Metrics collector: Tests for all Prometheus metrics recording methods and singleton pattern (100% coverage) - Database engine: Tests for engine lifecycle, session management, and global functions (100% coverage) - Queue manager: Tests for initialization, start/stop, enqueue, debouncing, queue full handling, and worker processing (90% coverage) - API schemas: Tests for Pydantic validation, request/response schemas, tag values, and coordinate validation (97% coverage) Overall test coverage improved from 29% to 38%.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR significantly expands unit test coverage across six core modules of the meshcore_api project, increasing overall coverage from 29% to 38%. The new tests focus on validating initialization, core functionality, edge cases, and error handling across the webhook handler, queue system, metrics collection, database engine, and API schemas.
Key Changes
- Added comprehensive test suites for webhook handler (92% coverage), queue models (100% coverage), metrics collector (100% coverage), database engine (100% coverage), queue manager (90% coverage), and API schemas (97% coverage)
- Tests cover initialization, normal operations, error conditions, edge cases, and data serialization/validation
- Includes proper async test setup using pytest-asyncio fixtures and context managers for resource cleanup
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 12 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/unit/test_webhook_handler.py | Tests webhook handler initialization, JSONPath filtering, event routing, HTTP request sending with retry logic, and various payload types (JSON, text, primitives) |
| tests/unit/test_queue_models.py | Tests queue data models including CommandType enum, QueuedCommand/CommandResult/QueueStats/QueueInfo dataclasses, and their serialization methods |
| tests/unit/test_queue_manager.py | Tests queue manager initialization, start/stop lifecycle, command enqueueing with debouncing, queue full handling, worker processing for all command types, and error handling |
| tests/unit/test_metrics.py | Tests metrics collector initialization, singleton pattern, and all metric recording methods (counters, gauges, histograms) |
| tests/unit/test_database_engine.py | Tests database engine lifecycle (init, session management, commits, rollbacks, cleanup), global functions, and multi-session handling |
| tests/unit/test_api_schemas.py | Tests Pydantic schema validation for pagination, errors, nodes, messages, advertisements, commands, tags with coordinate validation, and bulk operations |
| assert True | ||
|
|
||
|
|
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly to other tests, this one only asserts True without verifying meaningful behavior. The test doesn't check that the handler was actually closed or that the operation completed successfully. Consider removing this redundant assertion.
| assert True |
|
|
||
| def test_queue_info_debounced(self): | ||
| """Test QueueInfo for debounced command.""" | ||
| original_time = datetime.utcnow() |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The code uses datetime.utcnow() which is deprecated as of Python 3.12. Consider using datetime.now(timezone.utc) instead for forward compatibility. While this is in test code and the source code also uses datetime.utcnow(), it's worth considering updating both the source and tests to use the non-deprecated API.
| async def test_close(self): | ||
| """Test closing the HTTP client.""" | ||
| handler = WebhookHandler() | ||
| await handler.close() | ||
| # Should complete without error | ||
| assert True |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test doesn't verify meaningful behavior. It simply asserts True after calling close(), which doesn't validate that the close operation worked correctly. Consider either:
- Removing this test as the
close()method is called in all test teardowns viaawait handler.close(), so it's already being tested implicitly. - Adding meaningful assertions such as verifying the client is closed or that calling methods after close raises an appropriate error.
| async def test_close(self): | |
| """Test closing the HTTP client.""" | |
| handler = WebhookHandler() | |
| await handler.close() | |
| # Should complete without error | |
| assert True | |
| # Removed test_close as it did not verify meaningful behavior. |
| def test_record_event(self, metrics_collector): | ||
| """Test recording events increments counter.""" | ||
| metrics_collector.record_event("CONTACT_MSG_RECV") | ||
| metrics_collector.record_event("CHANNEL_MSG_RECV") | ||
| # Verify it doesn't raise an error | ||
| assert True | ||
|
|
||
| def test_record_message(self, metrics_collector): | ||
| """Test recording messages with labels.""" | ||
| metrics_collector.record_message(direction="inbound", message_type="contact") | ||
| metrics_collector.record_message(direction="outbound", message_type="channel") | ||
| assert True | ||
|
|
||
| def test_record_advertisement(self, metrics_collector): | ||
| """Test recording advertisements.""" | ||
| metrics_collector.record_advertisement(adv_type="repeater") | ||
| metrics_collector.record_advertisement(adv_type=None) # Test None handling | ||
| assert True | ||
|
|
||
| def test_record_roundtrip(self, metrics_collector): | ||
| """Test recording roundtrip time.""" | ||
| metrics_collector.record_roundtrip(milliseconds=150) | ||
| metrics_collector.record_roundtrip(milliseconds=5000) | ||
| assert True | ||
|
|
||
| def test_record_hop_count(self, metrics_collector): | ||
| """Test recording hop counts.""" | ||
| metrics_collector.record_hop_count(hops=3) | ||
| metrics_collector.record_hop_count(hops=1) | ||
| metrics_collector.record_hop_count(hops=10) | ||
| assert True | ||
|
|
||
| def test_record_snr(self, metrics_collector): | ||
| """Test recording SNR measurements.""" | ||
| metrics_collector.record_snr(snr=15.5) | ||
| metrics_collector.record_snr(snr=-5.0) | ||
| assert True | ||
|
|
||
| def test_record_rssi(self, metrics_collector): | ||
| """Test recording RSSI measurements.""" | ||
| metrics_collector.record_rssi(rssi=-80.0) | ||
| metrics_collector.record_rssi(rssi=-110.0) | ||
| assert True | ||
|
|
||
| def test_update_battery(self, metrics_collector): | ||
| """Test updating battery metrics.""" | ||
| metrics_collector.update_battery(voltage=3.7) | ||
| metrics_collector.update_battery(percentage=85) | ||
| metrics_collector.update_battery(voltage=4.2, percentage=100) | ||
| metrics_collector.update_battery() # No values | ||
| assert True | ||
|
|
||
| def test_update_storage(self, metrics_collector): | ||
| """Test updating storage metrics.""" | ||
| metrics_collector.update_storage(used=1024000) | ||
| metrics_collector.update_storage(total=4096000) | ||
| metrics_collector.update_storage(used=2048000, total=4096000) | ||
| metrics_collector.update_storage() # No values | ||
| assert True | ||
|
|
||
| def test_update_radio_stats(self, metrics_collector): | ||
| """Test updating radio statistics.""" | ||
| metrics_collector.update_radio_stats(noise_floor=-95.0) | ||
| metrics_collector.update_radio_stats(airtime=5.5) | ||
| metrics_collector.update_radio_stats(noise_floor=-90.0, airtime=10.2) | ||
| metrics_collector.update_radio_stats() # No values | ||
| assert True | ||
|
|
||
| def test_record_packet(self, metrics_collector): | ||
| """Test recording packet transmissions.""" | ||
| metrics_collector.record_packet(direction="tx", status="success") | ||
| metrics_collector.record_packet(direction="rx", status="success") | ||
| metrics_collector.record_packet(direction="tx", status="failed") | ||
| assert True | ||
|
|
||
| def test_update_db_table_rows(self, metrics_collector): | ||
| """Test updating database table row counts.""" | ||
| metrics_collector.update_db_table_rows(table="nodes", count=100) | ||
| metrics_collector.update_db_table_rows(table="messages", count=5000) | ||
| assert True | ||
|
|
||
| def test_update_db_size(self, metrics_collector): | ||
| """Test updating database size.""" | ||
| metrics_collector.update_db_size(size_bytes=1048576) | ||
| assert True | ||
|
|
||
| def test_record_cleanup(self, metrics_collector): | ||
| """Test recording cleanup operations.""" | ||
| metrics_collector.record_cleanup(table="messages", count=100) | ||
| metrics_collector.record_cleanup(table="events_log", count=500) | ||
| assert True | ||
|
|
||
| def test_set_connection_status(self, metrics_collector): | ||
| """Test setting connection status.""" | ||
| metrics_collector.set_connection_status(connected=True) | ||
| metrics_collector.set_connection_status(connected=False) | ||
| assert True | ||
|
|
||
| def test_record_error(self, metrics_collector): | ||
| """Test recording errors.""" | ||
| metrics_collector.record_error(component="database", error_type="connection") | ||
| metrics_collector.record_error(component="meshcore", error_type="timeout") | ||
| assert True | ||
|
|
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These test methods don't actually verify that the metrics are being recorded correctly. They just call the methods and assert True, which doesn't validate any behavior. Consider verifying that:
- The counter/gauge/histogram values have changed appropriately after calling the recording methods
- The metrics are registered with the correct labels
- Multiple calls accumulate values correctly (for counters)
Example:
def test_record_event(self, metrics_collector):
"""Test recording events increments counter."""
# Get initial value
initial = metrics_collector.events_total.labels(event_type="TEST_EVENT")._value.get()
# Record events
metrics_collector.record_event("TEST_EVENT")
metrics_collector.record_event("TEST_EVENT")
# Verify counter incremented
final = metrics_collector.events_total.labels(event_type="TEST_EVENT")._value.get()
assert final == initial + 2| async def test_worker_handles_command_error(self, mock_meshcore): | ||
| """Test worker handles command execution errors.""" | ||
| mock_meshcore.send_message = AsyncMock(side_effect=Exception("Network error")) | ||
|
|
||
| manager = CommandQueueManager( | ||
| meshcore=mock_meshcore, | ||
| rate_limit_per_second=100.0, | ||
| debounce_enabled=False, | ||
| ) | ||
| await manager.start() | ||
|
|
||
| await manager.enqueue( | ||
| CommandType.SEND_MESSAGE, | ||
| {"destination": "a" * 64, "text": "Hello"}, | ||
| ) | ||
|
|
||
| # Should not crash, just log error | ||
| await asyncio.sleep(0.1) | ||
| await manager.stop() |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test doesn't meaningfully verify error handling. It enqueues a command that will fail, waits 0.1 seconds, and expects the test not to crash. However, it doesn't verify:
- That the error was actually logged
- That the queue continued processing after the error
- That appropriate error metrics were recorded
Consider adding assertions to verify the error was handled gracefully, such as checking that the worker task is still running after the error or that subsequent commands can still be processed.
|
|
||
| import os | ||
| import tempfile | ||
| from pathlib import Path |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import of 'Path' is not used.
| from pathlib import Path |
| from prometheus_client import REGISTRY | ||
|
|
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import of 'REGISTRY' is not used.
| from prometheus_client import REGISTRY |
|
|
||
| from datetime import datetime | ||
|
|
||
| import pytest |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import of 'pytest' is not used.
| import pytest |
| @@ -0,0 +1,324 @@ | |||
| """Unit tests for webhook handler.""" | |||
|
|
|||
| import asyncio | |||
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import of 'asyncio' is not used.
| import asyncio |
| """Unit tests for webhook handler.""" | ||
|
|
||
| import asyncio | ||
| import json |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import of 'json' is not used.
| import json |
Add comprehensive unit tests for multiple modules:
Overall test coverage improved from 29% to 38%.