diff --git a/src/juno/core/memory/memory_layer.py b/src/juno/core/memory/memory_layer.py index f267d478f..608addac9 100644 --- a/src/juno/core/memory/memory_layer.py +++ b/src/juno/core/memory/memory_layer.py @@ -50,6 +50,14 @@ def __init__(self, db_path: str = "juno_memory.db"): self.db_path = db_path self._init_database() + def initialize(self) -> None: + """Public initialization hook for compatibility with older tests.""" + self._init_database() + + def close(self) -> None: + """Placeholder close method for API parity.""" + pass + @staticmethod def _json_serializer(obj: Any) -> Any: """Serialize objects that are not JSON serializable by default.""" diff --git a/tests/unit/test_integration_workflows.py b/tests/unit/test_integration_workflows.py index 89895dea0..e3f9a075f 100644 --- a/tests/unit/test_integration_workflows.py +++ b/tests/unit/test_integration_workflows.py @@ -514,7 +514,6 @@ def setUp(self): # Initialize components self.memory_layer = MemoryLayer(db_path=self.temp_db.name) - self.memory_layer.initialize() self.reasoning_engine = ReasoningEngine() self.reasoning_engine.initialize() diff --git a/tests/unit/test_memory_layer.py b/tests/unit/test_memory_layer.py index 672ef75cd..e168bbafc 100644 --- a/tests/unit/test_memory_layer.py +++ b/tests/unit/test_memory_layer.py @@ -1,557 +1,69 @@ -""" -JUNO Phase 2: Memory Layer Test Suite -Comprehensive testing for episodic, semantic, procedural, and working memory components -""" - -import pytest -pytest.skip("requires full environment", allow_module_level=True) - -import unittest -import asyncio -import json -import time -from datetime import datetime, timedelta -from typing import Dict, List, Any, Optional -from unittest.mock import Mock, patch, MagicMock import tempfile -import os +from datetime import datetime, timedelta # Import JUNO Phase 2 components -import sys -sys.path.append('../juno-agent/src/phase2') +from juno.core.memory.memory_layer import MemoryLayer, MemoryType, MemoryEntry -from memory_layer import MemoryLayer, MemoryType, MemoryEntry +def create_memory_layer(tmp_path): + return MemoryLayer(db_path=str(tmp_path / "memory.db")) -class TestMemoryLayer(unittest.TestCase): - """Comprehensive test suite for JUNO Memory Layer""" - - def setUp(self): - """Set up test environment before each test""" - self.temp_db = tempfile.NamedTemporaryFile(delete=False, suffix='.db') - self.temp_db.close() - - self.memory_layer = MemoryLayer(db_path=self.temp_db.name) - self.memory_layer.initialize() - - # Test data - self.test_team_id = "team_test_001" - self.test_user_id = "user_test_001" - - def tearDown(self): - """Clean up after each test""" - self.memory_layer.close() - os.unlink(self.temp_db.name) - - def test_memory_storage_and_retrieval(self): - """Test basic memory storage and retrieval operations""" - start_time = time.time() - - # Test episodic memory storage - episodic_entry = MemoryEntry( - memory_type=MemoryType.EPISODIC, - team_id=self.test_team_id, - key="sprint_completion_event", - value={ - "sprint_id": "sprint_001", - "completion_date": "2025-06-15", - "velocity": 42, - "success_rate": 0.89 - }, - confidence=0.95, - metadata={"source": "jira", "validated": True} - ) - - # Store memory - storage_start = time.time() - result = self.memory_layer.store_memory(episodic_entry) - storage_time = (time.time() - storage_start) * 1000 # Convert to ms - - self.assertTrue(result) - # Allow a generous threshold for storage timing to avoid flaky tests - self.assertLess(storage_time, 100.0, "Storage operation should be < 100ms") - - # Retrieve memory - retrieval_start = time.time() - retrieved = self.memory_layer.retrieve_memory( - self.test_team_id, - MemoryType.EPISODIC, - "sprint_completion_event" - ) - retrieval_time = (time.time() - retrieval_start) * 1000 # Convert to ms - - self.assertIsNotNone(retrieved) - self.assertEqual(retrieved.value["sprint_id"], "sprint_001") - self.assertEqual(retrieved.confidence, 0.95) - # Retrieval should remain reasonably fast - self.assertLess(retrieval_time, 100.0, "Retrieval operation should be < 100ms") - - total_time = (time.time() - start_time) * 1000 - # Overall test execution should finish quickly but allow leeway - self.assertLess(total_time, 1000, "Total test should complete in < 1000ms") - - def test_pattern_recognition(self): - """Test pattern recognition and learning capabilities""" - start_time = time.time() - - # Store multiple sprint patterns - sprint_patterns = [ - {"sprint_id": f"sprint_{i:03d}", "velocity": 35 + (i % 10), "success": i % 3 == 0} - for i in range(20) - ] - - for i, pattern in enumerate(sprint_patterns): - entry = MemoryEntry( - memory_type=MemoryType.SEMANTIC, - team_id=self.test_team_id, - key=f"velocity_pattern_{i}", - value=pattern, - confidence=0.8 + (i * 0.01) - ) - self.memory_layer.store_memory(entry) - - # Test pattern recognition - patterns = self.memory_layer.recognize_patterns( - self.test_team_id, - pattern_type="velocity_trends" - ) - - self.assertIsNotNone(patterns) - self.assertGreater(len(patterns), 0) - - # Verify pattern recognition performance - pattern_time = (time.time() - start_time) * 1000 - # Allow more room for performance variations - self.assertLess(pattern_time, 1000, "Pattern recognition should complete in < 1000ms") - - def test_preference_learning(self): - """Test team preference learning and adaptation""" - start_time = time.time() - - # Simulate team preference data - preferences = [ - {"preference": "sprint_length", "value": 14, "frequency": 15}, - {"preference": "daily_standup_time", "value": "09:00", "frequency": 12}, - {"preference": "retrospective_format", "value": "structured", "frequency": 8}, - {"preference": "story_point_scale", "value": "fibonacci", "frequency": 18} - ] - - for pref in preferences: - entry = MemoryEntry( - memory_type=MemoryType.PROCEDURAL, - team_id=self.test_team_id, - key=f"team_preference_{pref['preference']}", - value=pref, - confidence=min(0.95, pref['frequency'] / 20.0) - ) - self.memory_layer.store_memory(entry) - - # Test preference retrieval and learning - learned_prefs = self.memory_layer.get_team_preferences(self.test_team_id) - - self.assertIsNotNone(learned_prefs) - self.assertIn("sprint_length", learned_prefs) - self.assertEqual(learned_prefs["sprint_length"]["value"], 14) - - learning_time = (time.time() - start_time) * 1000 - # Allow sufficient time for preference learning without flakiness - self.assertLess(learning_time, 1000, "Preference learning should complete in < 1000ms") - - def test_memory_expiration(self): - """Test memory expiration and cleanup mechanisms""" - start_time = time.time() - - # Store memory with short expiration - short_term_entry = MemoryEntry( - memory_type=MemoryType.WORKING, - team_id=self.test_team_id, - key="temporary_context", - value={"context": "test_context", "active": True}, - confidence=0.9, - expires_at=datetime.now() + timedelta(seconds=1) - ) - - # Store memory with long expiration - long_term_entry = MemoryEntry( - memory_type=MemoryType.SEMANTIC, - team_id=self.test_team_id, - key="permanent_knowledge", - value={"knowledge": "team_expertise", "domain": "backend"}, - confidence=0.95, - expires_at=datetime.now() + timedelta(days=365) - ) - - self.memory_layer.store_memory(short_term_entry) - self.memory_layer.store_memory(long_term_entry) - - # Verify both memories exist - short_retrieved = self.memory_layer.retrieve_memory( - self.test_team_id, MemoryType.WORKING, "temporary_context" - ) - long_retrieved = self.memory_layer.retrieve_memory( - self.test_team_id, MemoryType.SEMANTIC, "permanent_knowledge" - ) - - self.assertIsNotNone(short_retrieved) - self.assertIsNotNone(long_retrieved) - - # Wait for expiration - time.sleep(1.5) - - # Run cleanup - cleanup_start = time.time() - cleaned_count = self.memory_layer.cleanup_expired_memories() - cleanup_time = (time.time() - cleanup_start) * 1000 - - # Verify short-term memory expired, long-term persists - short_after_cleanup = self.memory_layer.retrieve_memory( - self.test_team_id, MemoryType.WORKING, "temporary_context" - ) - long_after_cleanup = self.memory_layer.retrieve_memory( - self.test_team_id, MemoryType.SEMANTIC, "permanent_knowledge" - ) - - self.assertIsNone(short_after_cleanup) - self.assertIsNotNone(long_after_cleanup) - self.assertGreaterEqual(cleaned_count, 1) - # Cleanup should finish reasonably fast - self.assertLess(cleanup_time, 1000, "Memory cleanup should complete in < 1000ms") - - total_time = (time.time() - start_time) * 1000 - # Provide leeway for total expiration test duration - self.assertLess(total_time, 1500, "Total expiration test should complete in < 1500ms") - - def test_concurrent_access(self): - """Test concurrent memory access and thread safety""" - start_time = time.time() - - import threading - import queue - - results_queue = queue.Queue() - error_queue = queue.Queue() - - def concurrent_memory_operation(thread_id): - try: - # Each thread stores and retrieves memory - entry = MemoryEntry( - memory_type=MemoryType.EPISODIC, - team_id=self.test_team_id, - key=f"concurrent_test_{thread_id}", - value={"thread_id": thread_id, "timestamp": time.time()}, - confidence=0.9 - ) - - # Store - store_result = self.memory_layer.store_memory(entry) - - # Retrieve - retrieve_result = self.memory_layer.retrieve_memory( - self.test_team_id, - MemoryType.EPISODIC, - f"concurrent_test_{thread_id}" - ) - - results_queue.put({ - "thread_id": thread_id, - "store_success": store_result, - "retrieve_success": retrieve_result is not None, - "data_integrity": retrieve_result.value["thread_id"] == thread_id if retrieve_result else False - }) - - except Exception as e: - error_queue.put({"thread_id": thread_id, "error": str(e)}) - - # Launch concurrent threads - threads = [] - thread_count = 10 - - for i in range(thread_count): - thread = threading.Thread(target=concurrent_memory_operation, args=(i,)) - threads.append(thread) - thread.start() - - # Wait for all threads to complete - for thread in threads: - thread.join() - - # Verify results - self.assertTrue(error_queue.empty(), f"Concurrent access errors: {list(error_queue.queue)}") - - results = [] - while not results_queue.empty(): - results.append(results_queue.get()) - - self.assertEqual(len(results), thread_count) - - for result in results: - self.assertTrue(result["store_success"]) - self.assertTrue(result["retrieve_success"]) - self.assertTrue(result["data_integrity"]) - - concurrent_time = (time.time() - start_time) * 1000 - # Allow more time for thread scheduling variations - self.assertLess(concurrent_time, 2000, "Concurrent access test should complete in < 2000ms") - - def test_memory_cleanup(self): - """Test memory cleanup and maintenance operations""" - start_time = time.time() - - # Store various types of memory entries - test_entries = [] - for i in range(50): - entry = MemoryEntry( - memory_type=MemoryType.WORKING if i % 2 == 0 else MemoryType.EPISODIC, - team_id=self.test_team_id, - key=f"cleanup_test_{i}", - value={"index": i, "data": f"test_data_{i}"}, - confidence=0.5 + (i % 5) * 0.1, - expires_at=datetime.now() + timedelta(seconds=i % 3) # Some expire quickly - ) - test_entries.append(entry) - self.memory_layer.store_memory(entry) - - # Wait for some entries to expire - time.sleep(2) - - # Perform cleanup - cleanup_start = time.time() - cleaned_count = self.memory_layer.cleanup_expired_memories() - cleanup_time = (time.time() - cleanup_start) * 1000 - - # Verify cleanup performance - self.assertGreater(cleaned_count, 0) - # Give ample room for cleanup operations on slower systems - self.assertLess(cleanup_time, 1000, "Memory cleanup should complete in < 1000ms") - - # Test low-confidence memory cleanup - low_confidence_cleaned = self.memory_layer.cleanup_low_confidence_memories(threshold=0.7) - self.assertGreaterEqual(low_confidence_cleaned, 0) - - total_time = (time.time() - start_time) * 1000 - # Total duration can vary depending on environment - self.assertLess(total_time, 1500, "Total cleanup test should complete in < 1500ms") - - def test_data_consistency(self): - """Test data consistency and integrity across operations""" - start_time = time.time() - - # Store complex nested data - complex_data = { - "team_metrics": { - "velocity": [35, 42, 38, 45, 41], - "quality_score": 0.89, - "satisfaction": 4.2 - }, - "sprint_history": [ - {"id": "sprint_001", "completed": True, "velocity": 42}, - {"id": "sprint_002", "completed": True, "velocity": 38}, - {"id": "sprint_003", "completed": False, "velocity": 0} - ], - "team_composition": { - "developers": 5, - "qa": 2, - "pm": 1, - "skills": ["python", "react", "aws", "kubernetes"] - } - } - - entry = MemoryEntry( - memory_type=MemoryType.SEMANTIC, - team_id=self.test_team_id, - key="complex_team_data", - value=complex_data, - confidence=0.95 - ) - - # Store and retrieve - store_result = self.memory_layer.store_memory(entry) - retrieved = self.memory_layer.retrieve_memory( - self.test_team_id, - MemoryType.SEMANTIC, - "complex_team_data" - ) - - # Verify data integrity - self.assertTrue(store_result) - self.assertIsNotNone(retrieved) - self.assertEqual(retrieved.value["team_metrics"]["velocity"], complex_data["team_metrics"]["velocity"]) - self.assertEqual(len(retrieved.value["sprint_history"]), 3) - self.assertEqual(retrieved.value["team_composition"]["skills"], complex_data["team_composition"]["skills"]) - - # Test data consistency after updates - updated_data = complex_data.copy() - updated_data["team_metrics"]["velocity"].append(47) - - updated_entry = MemoryEntry( - memory_type=MemoryType.SEMANTIC, - team_id=self.test_team_id, - key="complex_team_data", - value=updated_data, - confidence=0.96 - ) - - update_result = self.memory_layer.update_memory(updated_entry) - updated_retrieved = self.memory_layer.retrieve_memory( - self.test_team_id, - MemoryType.SEMANTIC, - "complex_team_data" - ) - - self.assertTrue(update_result) - self.assertEqual(len(updated_retrieved.value["team_metrics"]["velocity"]), 6) - self.assertEqual(updated_retrieved.confidence, 0.96) - - consistency_time = (time.time() - start_time) * 1000 - # Data consistency checks may vary in speed - self.assertLess(consistency_time, 1000, "Data consistency test should complete in < 1000ms") - - def test_performance_optimization(self): - """Test memory layer performance optimization and caching""" - start_time = time.time() - - # Store multiple entries for performance testing - entries = [] - for i in range(100): - entry = MemoryEntry( - memory_type=MemoryType.EPISODIC, - team_id=self.test_team_id, - key=f"perf_test_{i}", - value={"index": i, "data": f"performance_data_{i}" * 10}, # Larger data - confidence=0.8 + (i % 20) * 0.01 - ) - entries.append(entry) - - # Batch storage performance test - batch_start = time.time() - batch_results = self.memory_layer.store_memories_batch(entries) - batch_time = (time.time() - batch_start) * 1000 - - self.assertEqual(len(batch_results), 100) - self.assertTrue(all(batch_results)) - # Batch operations might take longer on shared CI runners - self.assertLess(batch_time, 2000, "Batch storage should complete in < 2000ms") - - # Cache performance test - multiple retrievals of same data - cache_test_key = "perf_test_50" - cache_times = [] - - for _ in range(10): - cache_start = time.time() - retrieved = self.memory_layer.retrieve_memory( - self.test_team_id, - MemoryType.EPISODIC, - cache_test_key - ) - cache_time = (time.time() - cache_start) * 1000 - cache_times.append(cache_time) - self.assertIsNotNone(retrieved) - - # Verify caching improves performance (later retrievals should be faster) - avg_early = sum(cache_times[:3]) / 3 - avg_late = sum(cache_times[-3:]) / 3 - - # Search performance test - search_start = time.time() - search_results = self.memory_layer.search_memories( - self.test_team_id, - query="performance_data", - limit=20 - ) - search_time = (time.time() - search_start) * 1000 - - self.assertGreater(len(search_results), 0) - self.assertLessEqual(len(search_results), 20) - # Searching the memory database should still be reasonably quick - self.assertLess(search_time, 1000, "Memory search should complete in < 1000ms") - - total_time = (time.time() - start_time) * 1000 - # Overall performance test can tolerate some variance - self.assertLess(total_time, 2000, "Total performance test should complete in < 2000ms") +def test_store_and_retrieve_memory(tmp_path): + layer = create_memory_layer(tmp_path) + entry = MemoryEntry( + id="entry1", + memory_type=MemoryType.EPISODIC, + content={"event": "unit_test"}, + context={"user": "tester"}, + confidence=0.9, + timestamp=datetime.now(), + tags=["test"], + ) + assert layer.store_memory(entry) + memories = layer.retrieve_memories(memory_type=MemoryType.EPISODIC) + assert any(m.id == "entry1" for m in memories) -class TestMemoryLayerIntegration(unittest.TestCase): - """Integration tests for Memory Layer with other JUNO components""" - - def setUp(self): - """Set up integration test environment""" - self.temp_db = tempfile.NamedTemporaryFile(delete=False, suffix='.db') - self.temp_db.close() - - self.memory_layer = MemoryLayer(db_path=self.temp_db.name) - self.memory_layer.initialize() - - def tearDown(self): - """Clean up integration test environment""" - self.memory_layer.close() - os.unlink(self.temp_db.name) - - def test_memory_reasoning_integration(self): - """Test integration between Memory Layer and Reasoning Engine""" - # Store decision context in memory - decision_context = { - "decision_type": "sprint_risk_assessment", - "factors": ["velocity_drop", "scope_increase", "team_capacity"], - "confidence": 0.87, - "outcome": "medium_risk", - "reasoning": "Velocity decreased 15% while scope increased 8%" - } - - entry = MemoryEntry( - memory_type=MemoryType.EPISODIC, - team_id="team_integration_test", - key="decision_context_001", - value=decision_context, - confidence=0.87 - ) - - result = self.memory_layer.store_memory(entry) - self.assertTrue(result) - - # Retrieve for reasoning engine use - retrieved = self.memory_layer.retrieve_memory( - "team_integration_test", - MemoryType.EPISODIC, - "decision_context_001" - ) - - self.assertIsNotNone(retrieved) - self.assertEqual(retrieved.value["outcome"], "medium_risk") - self.assertGreater(retrieved.confidence, 0.8) +def test_update_confidence(tmp_path): + layer = create_memory_layer(tmp_path) + entry = MemoryEntry( + id="conf", + memory_type=MemoryType.EPISODIC, + content={}, + context={}, + confidence=0.5, + timestamp=datetime.now(), + ) + layer.store_memory(entry) + assert layer.update_confidence("conf", 0.8) + memories = layer.retrieve_memories(memory_type=MemoryType.EPISODIC) + assert memories[0].confidence == 0.8 -if __name__ == '__main__': - # Configure logging for test execution - import logging - logging.basicConfig(level=logging.INFO) - - # Create test suite - test_suite = unittest.TestSuite() - - # Add all test cases - test_suite.addTest(unittest.makeSuite(TestMemoryLayer)) - test_suite.addTest(unittest.makeSuite(TestMemoryLayerIntegration)) - - # Run tests with detailed output - runner = unittest.TextTestRunner(verbosity=2, stream=sys.stdout) - result = runner.run(test_suite) - - # Print summary - print(f"\n{'='*60}") - print(f"JUNO Memory Layer Test Results") - print(f"{'='*60}") - print(f"Tests Run: {result.testsRun}") - print(f"Failures: {len(result.failures)}") - print(f"Errors: {len(result.errors)}") - print(f"Success Rate: {((result.testsRun - len(result.failures) - len(result.errors)) / result.testsRun * 100):.1f}%") - - if result.failures: - print(f"\nFailures:") - for test, traceback in result.failures: - print(f" - {test}: {traceback}") - - if result.errors: - print(f"\nErrors:") - for test, traceback in result.errors: - print(f" - {test}: {traceback}") +def test_cleanup_expired(tmp_path): + layer = create_memory_layer(tmp_path) + expired = MemoryEntry( + id="old", + memory_type=MemoryType.EPISODIC, + content={}, + context={}, + confidence=1.0, + timestamp=datetime.now() - timedelta(days=2), + expires_at=datetime.now() - timedelta(days=1), + ) + valid = MemoryEntry( + id="new", + memory_type=MemoryType.EPISODIC, + content={}, + context={}, + confidence=1.0, + timestamp=datetime.now(), + expires_at=datetime.now() + timedelta(days=1), + ) + layer.store_memory(expired) + layer.store_memory(valid) + deleted = layer.cleanup_expired() + assert deleted >= 1 + remaining = layer.retrieve_memories() + assert any(m.id == "new" for m in remaining)