-
Notifications
You must be signed in to change notification settings - Fork 1
3.0.0 #45
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughAdds error-handling abstractions and handler registration for queue push/pop and listener on_message failures, migrates per-listener queue access to per-connection send/fetch methods, updates repository interfaces and storage to index connections by connection_id, introduces DeadLetterQueue pattern, and adds tests, mocks, example, and docs updates. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant Producer
participant Channel
participant Connection
participant Queue
participant QHandlers as QueuingErrorHandlers
rect rgba(230,240,255,0.5)
note over Producer,Channel: Dispatch -> Connection.send_message
Producer->>Channel: dispatch(msg)
Channel->>Connection: send_message(msg)
alt push OK
Connection->>Queue: push(msg)
Queue-->>Connection: ack
Connection-->>Channel: success
else push fails
Connection->>Queue: push(msg)
Queue-->>Connection: Exception e
Connection->>QHandlers: handle_push_error(msg, e)
Connection-->>Channel: re-raise e
end
end
sequenceDiagram
autonumber
participant Worker
participant Channel
participant Connection
participant Queue
participant Listener
participant LHandlers as ListenerErrorHandlers
participant QHandlers as QueuingErrorHandlers
rect rgba(230,255,230,0.5)
note over Worker,Channel: Delivery -> Connection.fetch_message
Worker->>Channel: deliver_next(listener_id)
Channel->>Connection: fetch_message()
alt pop OK
Connection->>Queue: pop()
Queue-->>Connection: msg
Connection-->>Channel: msg
else pop fails
Connection->>Queue: pop()
Queue-->>Connection: Exception e
Connection->>QHandlers: handle_pop_error(e)
Connection-->>Channel: re-raise e
end
opt deliver to listener
Channel->>Listener: on_message(msg)
alt listener OK
Listener-->>Channel: ack
else listener fails
Listener-->>Channel: Exception e2
Channel->>LHandlers: handle_on_message_error(msg, e2)
Channel-->>Worker: re-raise e2
end
end
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✨ Finishing touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 13
🧹 Nitpick comments (12)
update_docs.sh (2)
8-8: Use git add -A on the directory; avoid glob pitfalls
git add docs_archive/"$CURRENT_VERSION"/*fails on empty dirs and relies on shell globbing. Prefer adding the directory.-git add docs_archive/"$CURRENT_VERSION"/* +git add -A "docs_archive/${CURRENT_VERSION}"
2-2: Fail fast and avoid disabling SC2164Instead of disabling SC2164 globally, enable strict mode and guard cd.
-# shellcheck disable=SC2164 +set -euo pipefailAnd change later (outside this hunk):
cd docs || exit 1eric_sse/handlers.py (2)
4-5: Remove unused logger to satisfy linters
loggerisn’t used in this module.-from eric_sse import get_logger -logger = get_logger()
9-12: Add explicit return types for handler methodsImproves readability and static checking.
- def handle_push_error(self, msg: MessageContract, exception: Exception): + def handle_push_error(self, msg: MessageContract, exception: Exception) -> None: pass - def handle_pop_error(self, exception: Exception): + def handle_pop_error(self, exception: Exception) -> None: pass ... - def handle_on_message_error(self, msg: MessageContract, exception: Exception): + def handle_on_message_error(self, msg: MessageContract, exception: Exception) -> None: passAlso applies to: 16-17
eric_sse/interfaces.py (1)
66-67: Docstring no longer matches signatureNow only
connection_idis required.- """Loads a connection given the connection and channel id it belongs to.""" + """Loads a connection given its id."""eric_sse/patterns.py (1)
12-16: Use logger.exception and include the original exceptionAddresses lints (ARG002, TRY400) and preserves context about the initial push failure.
- def handle_push_error(self, msg: MessageContract, exception: Exception): + def handle_push_error(self, msg: MessageContract, exception: Exception) -> None: try: self.__queue.push(msg) - except Exception as e: - logger.error(e) + except Exception: + logger.exception("Dead-letter requeue failed; original error: %r", exception)eric_sse/connection.py (2)
52-53: Minor: fix extra space in signature.- def register_queuing_error_handler(self, handler: QueuingErrorHandler): + def register_queuing_error_handler(self, handler: QueuingErrorHandler):
21-21: Consider preserving registration order and avoiding hashability constraints.Sets make handler call order non-deterministic and require hashability (mocks/custom classes can surprise). A list with de-dupe is simpler.
- self.__queues_error_handlers: set[QueuingErrorHandler] = set() + self.__queues_error_handlers: list[QueuingErrorHandler] = [] @@ - def register_queuing_error_handler(self, handler: QueuingErrorHandler): - self.__queues_error_handlers.add(handler) + def register_queuing_error_handler(self, handler: QueuingErrorHandler): + if handler not in self.__queues_error_handlers: + self.__queues_error_handlers.append(handler)Also applies to: 52-53
test/test_repository.py (1)
72-84: Stabilize delete path by stubbingfetch_oneresult.Prevents MagicMock leaking into key formatting; strengthens the assertion semantics.
def test_deletions_are_delegated_to_composites(self): connection = Connection( listener=MessageQueueListener(), queue=InMemoryQueue(), ) - self.sut.persist(channel_id='fake_channel', connection=connection) + self.sut.persist(channel_id='fake_channel', connection=connection) + # ensure repository can resolve channel_id during delete + self.storage.fetch_one.return_value = {'ch_id': 'fake_channel'} self.sut.delete(connection_id=connection.id) self.listeners_repository.delete.assert_called_once_with(connection_id=connection.id) self.queues_repository.delete.assert_called_once_with(connection_id=connection.id) self.storage.delete.assert_called()eric_sse/entities.py (3)
39-44: Standardize InvalidListenerException construction.
Use the same pattern as remove_listener: pass listener_id and suppress context.- except KeyError: - raise InvalidListenerException(f"Invalid listener {listener_id}") from None + except KeyError: + raise InvalidListenerException(listener_id) from None
184-189: Improve exception handling and logging in dispatch.
Avoid blindexcept Exception, uselogger.exception, and re-raise with bareraise.- try: - self._get_connection(listener_id).send_message(msg) - except Exception as e: - logger.error(repr(e)) - raise e from None + try: + self._get_connection(listener_id).send_message(msg) + except Exception: + logger.exception(f"Dispatch to {listener_id} failed for {msg!r}") + raise
33-37: Minor: align with B904 and include id in exception.
Consistent with other methods and clearer error reporting.- except KeyError: - raise InvalidListenerException + except KeyError: + raise InvalidListenerException(listener_id) from None
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (14)
eric_sse/connection.py(3 hunks)eric_sse/entities.py(4 hunks)eric_sse/handlers.py(1 hunks)eric_sse/interfaces.py(3 hunks)eric_sse/patterns.py(1 hunks)eric_sse/prefabs.py(2 hunks)eric_sse/repository.py(5 hunks)examples/producer_consumer.py(1 hunks)test/mock/connection.py(1 hunks)test/test_error_handlers.py(1 hunks)test/test_patterns.py(1 hunks)test/test_repository.py(5 hunks)test/test_sse_channel_repository.py(1 hunks)update_docs.sh(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (12)
eric_sse/handlers.py (2)
eric_sse/message.py (1)
MessageContract(4-22)eric_sse/__init__.py (1)
get_logger(5-16)
test/test_error_handlers.py (5)
eric_sse/handlers.py (4)
ListenerErrorHandler(14-17)QueuingErrorHandler(7-12)handle_pop_error(11-12)handle_on_message_error(16-17)eric_sse/message.py (1)
Message(25-39)test/mock/channel.py (1)
FakeChannel(10-12)test/mock/connection.py (3)
BrokenListener(8-11)BrokenQueue(14-26)BrokenConnectionFactory(29-47)eric_sse/entities.py (6)
add_listener(133-137)dispatch(181-190)id(82-84)deliver_next(159-176)register_listener_error_handler(153-154)register_listener(140-143)
eric_sse/prefabs.py (3)
eric_sse/entities.py (2)
_get_connection(178-179)id(82-84)eric_sse/connection.py (3)
listener(24-25)id(32-33)fetch_message(44-50)eric_sse/repository.py (2)
connections_factory(166-167)connections_repository(83-84)
eric_sse/patterns.py (6)
eric_sse/handlers.py (1)
QueuingErrorHandler(7-12)eric_sse/message.py (1)
MessageContract(4-22)eric_sse/queues.py (1)
Queue(8-22)eric_sse/__init__.py (1)
get_logger(5-16)eric_sse/connection.py (1)
queue(28-29)test/mock/connection.py (1)
push(24-26)
eric_sse/connection.py (3)
eric_sse/message.py (1)
MessageContract(4-22)eric_sse/queues.py (6)
Queue(8-22)InMemoryQueue(24-42)push(21-22)push(38-42)pop(12-18)pop(31-36)eric_sse/handlers.py (4)
QueuingErrorHandler(7-12)ListenerErrorHandler(14-17)handle_push_error(9-10)handle_pop_error(11-12)
test/test_patterns.py (4)
eric_sse/connection.py (4)
Connection(9-53)listener(24-25)queue(28-29)register_queuing_error_handler(52-53)eric_sse/patterns.py (1)
DeadLetterQueueHandler(8-16)test/mock/channel.py (1)
FakeChannel(10-12)test/mock/connection.py (2)
BrokenQueue(14-26)push(24-26)
test/test_sse_channel_repository.py (3)
eric_sse/repository.py (2)
queues_repository(158-159)connections_factory(166-167)eric_sse/interfaces.py (2)
queues_repository(52-53)connections_factory(47-48)eric_sse/connection.py (1)
InMemoryConnectionsFactory(67-72)
eric_sse/interfaces.py (3)
eric_sse/repository.py (7)
connections_factory(166-167)load_one(107-111)load_one(180-181)delete(35-37)delete(63-66)delete(126-133)delete(191-198)eric_sse/connection.py (2)
ConnectionsFactory(56-64)Connection(9-53)test/test_sse_channel_repository.py (2)
delete(20-21)delete(33-34)
eric_sse/entities.py (5)
eric_sse/handlers.py (2)
ListenerErrorHandler(14-17)handle_on_message_error(16-17)eric_sse/__init__.py (1)
get_logger(5-16)eric_sse/listener.py (2)
MessageQueueListener(7-29)on_message(18-20)eric_sse/connection.py (5)
Connection(9-53)listener(24-25)id(32-33)fetch_message(44-50)send_message(35-41)eric_sse/exception.py (2)
InvalidListenerException(7-8)NoMessagesException(11-13)
test/mock/connection.py (5)
eric_sse/connection.py (5)
ConnectionsFactory(56-64)Connection(9-53)listener(24-25)queue(28-29)register_queuing_error_handler(52-53)eric_sse/handlers.py (2)
ListenerErrorHandler(14-17)QueuingErrorHandler(7-12)eric_sse/listener.py (1)
MessageQueueListener(7-29)eric_sse/message.py (2)
MessageContract(4-22)Message(25-39)eric_sse/queues.py (1)
Queue(8-22)
test/test_repository.py (4)
eric_sse/connection.py (4)
Connection(9-53)ConnectionsFactory(56-64)listener(24-25)id(32-33)eric_sse/repository.py (14)
ConnectionRepository(135-198)KvStorage(11-37)InMemoryStorage(40-66)connections_factory(166-167)listeners_repository(162-163)queues_repository(158-159)upsert(25-27)upsert(54-55)delete(35-37)delete(63-66)delete(126-133)delete(191-198)load_one(107-111)load_one(180-181)eric_sse/interfaces.py (11)
ListenerRepositoryInterface(27-41)QueueRepositoryInterface(9-24)connections_factory(47-48)listeners_repository(57-58)queues_repository(52-53)delete(22-24)delete(39-41)delete(76-78)delete(106-108)load_one(66-68)load_one(96-98)eric_sse/exception.py (2)
ItemNotFound(29-32)InvalidChannelException(1-2)
eric_sse/repository.py (3)
eric_sse/exception.py (2)
ItemNotFound(29-32)InvalidChannelException(1-2)eric_sse/interfaces.py (19)
ConnectionRepositoryInterface(44-78)delete(22-24)delete(39-41)delete(76-78)delete(106-108)load_one(66-68)load_one(96-98)load_all(61-63)load_all(91-93)queues_repository(52-53)QueueRepositoryInterface(9-24)connections_factory(47-48)listeners_repository(57-58)load(12-14)load(29-31)persist(17-19)persist(34-36)persist(71-73)persist(101-103)eric_sse/connection.py (5)
id(32-33)ConnectionsFactory(56-64)Connection(9-53)listener(24-25)queue(28-29)
🪛 Ruff (0.12.2)
test/test_error_handlers.py
30-30: Do not catch blind exception: Exception
(BLE001)
49-49: Do not catch blind exception: Exception
(BLE001)
68-68: Do not catch blind exception: Exception
(BLE001)
eric_sse/patterns.py
12-12: Unused method argument: exception
(ARG002)
15-15: Do not catch blind exception: Exception
(BLE001)
16-16: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
eric_sse/connection.py
41-41: Use raise without specifying exception name
Remove exception name
(TRY201)
50-50: Use raise without specifying exception name
Remove exception name
(TRY201)
eric_sse/entities.py
37-37: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
43-43: Avoid specifying long messages outside the exception class
(TRY003)
173-173: Use raise without specifying exception name
Remove exception name
(TRY201)
186-186: Do not catch blind exception: Exception
(BLE001)
187-187: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
test/mock/connection.py
10-10: Unused method argument: msg
(ARG002)
11-11: Create your own exception
(TRY002)
21-21: Create your own exception
(TRY002)
24-24: Unused method argument: message
(ARG002)
26-26: Create your own exception
(TRY002)
34-34: Do not perform function call BrokenListener in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
34-34: Do not perform function call BrokenQueue in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
44-44: Unused method argument: listener
(ARG002)
🔇 Additional comments (11)
eric_sse/interfaces.py (1)
45-49: No action needed: implementations align with updated API
ConnectionRepository defines the connections_factory property and the load_one/delete methods per the interface, and FakeConnectionRepository inherits them.test/test_sse_channel_repository.py (1)
61-63: Wiring looks correctInjecting
queues_repositoryandconnections_factoryvia theFakeConnectionRepositoryaligns with the new interfaces.eric_sse/prefabs.py (2)
83-83: Switched to connection.fetch_message(): goodThis matches the new connection-centric API and centralizes queue error handling.
180-180: Repository now supplies connections_factory: goodKeeps construction concerns in the repository; no change needed here.
eric_sse/entities.py (3)
11-11: LGTM: import and docstring updates are consistent with the new connections model.Also applies to: 16-16
77-79: LGTM: listener error-handling surface.
Set-based registry and registration method look good.Also applies to: 153-155
178-180: LGTM: private connection accessor.
Matches the new manager API.eric_sse/repository.py (4)
6-6: Import surface OK.
No action.
77-78: LGTM: Channel repo now depends only on the connections repository.
Matches the refactored design.
124-124: LGTM: updated delete calls to new signature.
Consistent with ConnectionRepository.delete(connection_id).Also applies to: 132-132
142-153: LGTM: constructor and surfaced connections_factory comply with the interface.
Dependencies injected explicitly; load_one by connection_id matches the new API.Also applies to: 166-168, 180-182
eric_sse/connection.py
Outdated
| except Exception as e: | ||
| for handler in self.__queues_error_handlers: | ||
| handler.handle_push_error(msg=msg, exception=e) | ||
| raise e |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Preserve traceback: use bare raise, not raise e.
Re-raising with the exception variable discards the original traceback. Also flagged by Ruff TRY201.
except Exception as e:
for handler in self.__queues_error_handlers:
handler.handle_push_error(msg=msg, exception=e)
- raise e
+ raise
@@
except Exception as e:
for handler in self.__queues_error_handlers:
handler.handle_pop_error(exception=e)
- raise e
+ raiseAlso applies to: 47-50
🧰 Tools
🪛 Ruff (0.12.2)
41-41: Use raise without specifying exception name
Remove exception name
(TRY201)
🤖 Prompt for AI Agents
In eric_sse/connection.py around lines 38-41 and 47-50, the except blocks
re-raise the caught exception using "raise e", which discards the original
traceback; change those re-raises to a bare "raise" so the original traceback is
preserved (i.e., after calling the handlers, use "raise" instead of "raise e").
test/test_error_handlers.py
Outdated
| failed = False | ||
| try: | ||
| channel.dispatch(listener_id=my_listener.id, msg=Message(msg_type='test')) | ||
| channel.deliver_next(listener_id=my_listener.id) | ||
| except Exception as e: | ||
| failed = True | ||
| self.queues_handler_mock.handle_pop_error.assert_called_once_with(exception=e) | ||
|
|
||
| if not failed: | ||
| self.fail('Exception was not raised') | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Use assertRaises context (pop error case).
- failed = False
- try:
- channel.dispatch(listener_id=my_listener.id, msg=Message(msg_type='test'))
- channel.deliver_next(listener_id=my_listener.id)
- except Exception as e:
- failed = True
- self.queues_handler_mock.handle_pop_error.assert_called_once_with(exception=e)
-
- if not failed:
- self.fail('Exception was not raised')
+ channel.dispatch(listener_id=my_listener.id, msg=Message(msg_type='test'))
+ with self.assertRaises(Exception) as cm:
+ channel.deliver_next(listener_id=my_listener.id)
+ self.queues_handler_mock.handle_pop_error.assert_called_once_with(exception=cm.exception)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| failed = False | |
| try: | |
| channel.dispatch(listener_id=my_listener.id, msg=Message(msg_type='test')) | |
| channel.deliver_next(listener_id=my_listener.id) | |
| except Exception as e: | |
| failed = True | |
| self.queues_handler_mock.handle_pop_error.assert_called_once_with(exception=e) | |
| if not failed: | |
| self.fail('Exception was not raised') | |
| channel.dispatch(listener_id=my_listener.id, msg=Message(msg_type='test')) | |
| with self.assertRaises(Exception) as cm: | |
| channel.deliver_next(listener_id=my_listener.id) | |
| self.queues_handler_mock.handle_pop_error.assert_called_once_with(exception=cm.exception) |
🧰 Tools
🪛 Ruff (0.12.2)
49-49: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
In test/test_error_handlers.py around lines 45 to 55, replace the try/except
manual failure pattern with unittest's assertRaises context manager: wrap the
calls that should raise (channel.dispatch and channel.deliver_next) in a with
self.assertRaises(Exception) as cm: block, remove the failed flag and self.fail
check, and then assert
self.queues_handler_mock.handle_pop_error.assert_called_once_with(exception=cm.exception)
to verify the handler was invoked with the caught exception.
| failed: bool = False | ||
| try: | ||
| channel.dispatch(listener_id=my_listener.id, msg=msg) | ||
| channel.deliver_next(listener_id=my_listener.id) | ||
| except Exception as e: | ||
| failed = True | ||
| self.listeners_handler_mock.handle_on_message_error.assert_called_once_with(msg=msg, exception=e) | ||
|
|
||
| if not failed: | ||
| self.fail('Exception was not raised') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Use assertRaises context (listener error case).
- failed: bool = False
- try:
- channel.dispatch(listener_id=my_listener.id, msg=msg)
- channel.deliver_next(listener_id=my_listener.id)
- except Exception as e:
- failed = True
- self.listeners_handler_mock.handle_on_message_error.assert_called_once_with(msg=msg, exception=e)
-
- if not failed:
- self.fail('Exception was not raised')
+ with self.assertRaises(Exception) as cm:
+ channel.dispatch(listener_id=my_listener.id, msg=msg)
+ channel.deliver_next(listener_id=my_listener.id)
+ self.listeners_handler_mock.handle_on_message_error.assert_called_once_with(msg=msg, exception=cm.exception)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| failed: bool = False | |
| try: | |
| channel.dispatch(listener_id=my_listener.id, msg=msg) | |
| channel.deliver_next(listener_id=my_listener.id) | |
| except Exception as e: | |
| failed = True | |
| self.listeners_handler_mock.handle_on_message_error.assert_called_once_with(msg=msg, exception=e) | |
| if not failed: | |
| self.fail('Exception was not raised') | |
| with self.assertRaises(Exception) as cm: | |
| channel.dispatch(listener_id=my_listener.id, msg=msg) | |
| channel.deliver_next(listener_id=my_listener.id) | |
| self.listeners_handler_mock.handle_on_message_error.assert_called_once_with( | |
| msg=msg, | |
| exception=cm.exception | |
| ) |
🧰 Tools
🪛 Ruff (0.12.2)
68-68: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
In test/test_error_handlers.py around lines 64 to 73, replace the manual
try/except/fail pattern with unittest's assertRaises context manager: wrap the
channel.dispatch(...) and channel.deliver_next(...) calls in "with
self.assertRaises(Exception) as cm:" then after the block assert that
listeners_handler_mock.handle_on_message_error was called once with msg=msg and
exception=cm.exception; remove the failed flag and the self.fail('Exception was
not raised') branch.
test/test_patterns.py
Outdated
| with pytest.raises(Exception) as e: | ||
| message = Message("test") | ||
| channel.broadcast(message) | ||
| self.logger.error.assert_called_once_with(e.value) | ||
| self.dead_letter_queue.push.assert_called_with(message) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assertions inside pytest.raises won’t execute
Move assertions outside the context and define message before entering it.
- with pytest.raises(Exception) as e:
- message = Message("test")
- channel.broadcast(message)
- self.logger.error.assert_called_once_with(e.value)
- self.dead_letter_queue.push.assert_called_with(message)
+ message = Message("test")
+ with pytest.raises(Exception) as e:
+ channel.broadcast(message)
+ self.logger.error.assert_called_once_with(e.value)
+ self.dead_letter_queue.push.assert_called_with(message)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| with pytest.raises(Exception) as e: | |
| message = Message("test") | |
| channel.broadcast(message) | |
| self.logger.error.assert_called_once_with(e.value) | |
| self.dead_letter_queue.push.assert_called_with(message) | |
| message = Message("test") | |
| with pytest.raises(Exception) as e: | |
| channel.broadcast(message) | |
| self.logger.error.assert_called_once_with(e.value) | |
| self.dead_letter_queue.push.assert_called_with(message) |
🤖 Prompt for AI Agents
In test/test_patterns.py around lines 31 to 35, the assertions are placed inside
the pytest.raises context and the message is created inside it, so they never
run; define message = Message("test") before entering the with
pytest.raises(Exception) as e: block, put only the call that is expected to
raise (channel.broadcast(message)) inside that block, and move the assertions
(self.logger.error.assert_called_once_with(e.value) and
self.dead_letter_queue.push.assert_called_with(message)) to after the with
block.
|
@coderabbitai pause |
✅ Actions performedReviews paused. |
* add doc archiving support in update_docs.sh * DLQ Pattern added * BREAKING: Moved connections_factory attribute from ChannelRepositoryInterface to ConnectionRepositoryInterface * BREAKING updated interfaces signature * introduced error handlers
|
@coderabbitai resume |
✅ Actions performedReviews resumed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
♻️ Duplicate comments (4)
test/test_patterns.py (1)
30-34: Fix pytest.raises scope and unused variable (F841).Move message creation outside the context and drop the unused
as e.- with pytest.raises(Exception) as e: - message = Message("test") - channel.broadcast(message) + message = Message("test") + with pytest.raises(Exception): + channel.broadcast(message)eric_sse/connection.py (1)
45-51: Preserve traceback: use bareraise, notraise e.This matches prior feedback on this file.
try: return self.__queue.pop() except Exception as e: for handler in self.__queues_error_handlers: handler.handle_pop_error(exception=e) - raise e + raisetest/test_error_handlers.py (1)
60-77: Use assertRaises context for listener error case (BLE001).Replace manual try/except + flag with a context manager.
def test_listeners_handler(self): channel = FakeChannel() channel.register_listener_error_handler(self.listeners_handler_mock) my_listener = BrokenListener() my_listener.start() channel.register_listener(my_listener) msg = Message(msg_type='test') - failed: bool = False - try: - channel.dispatch(listener_id=my_listener.id, msg=msg) - channel.deliver_next(listener_id=my_listener.id) - except Exception as e: - failed = True - self.listeners_handler_mock.handle_on_message_error.assert_called_once_with(msg=msg, exception=e) - - if not failed: - self.fail('Exception was not raised') + with self.assertRaises(Exception) as cm: + channel.dispatch(listener_id=my_listener.id, msg=msg) + channel.deliver_next(listener_id=my_listener.id) + self.listeners_handler_mock.handle_on_message_error.assert_called_once_with(msg=msg, exception=cm.exception)eric_sse/entities.py (1)
167-173: Robustness: don’t let a buggy error handler mask the original failure.
Great fix from the earlier review: all handlers run and the original traceback is preserved with a bare raise. Add a guard so exceptions in handlers don’t abort the loop.- try: - listener.on_message(msg) - except Exception as e: - for handler in self.__listeners_error_handlers: - handler.handle_on_message_error(msg=msg, exception=e) - raise + try: + listener.on_message(msg) + except Exception as e: + for handler in self.__listeners_error_handlers: + try: + handler.handle_on_message_error(msg=msg, exception=e) + except Exception: + logger.exception( + "ListenerErrorHandler failed during on_message error handling (listener_id=%s)", + listener.id, + ) + raise
🧹 Nitpick comments (7)
eric_sse/interfaces.py (2)
66-69: Docstring no longer matches signature.Adjust to reflect
connection_id-only API.- def load_one(self, connection_id: str) -> Connection: - """Loads a connection given the connection and channel id it belongs to.""" + def load_one(self, connection_id: str) -> Connection: + """Loads a connection given its id."""
96-108: Fix typos: “its it” → “its id”.Minor text quality fix.
def load_one(self, channel_id: str) -> AbstractChannel: - """Loads a channel given its it""" + """Loads a channel given its id""" @@ def delete(self, channel_id: str): - """Deletes a channel given its it""" + """Deletes a channel given its id"""test/mock/connection.py (1)
10-27: Prefer specific exceptions in test doubles.Custom exceptions make assertions clearer and avoid generic
Exception.+class BrokenPushError(Exception): + pass + +class BrokenPopError(Exception): + pass @@ def pop(self) -> MessageContract: if self.broken_pop: - raise Exception() + raise BrokenPopError() return Message(msg_type='test') @@ def push(self, message: MessageContract) -> None: if self.broken_push: - raise Exception() + raise BrokenPushError()eric_sse/entities.py (4)
16-16: Docstring wording nit.
Slightly clearer phrasing.- """Maintains relationships between listeners and connections.""" + """Maintains relationships between listeners and their connections."""
39-44: Standardize exception raising in get_connection (and align with TRY003).
Prefer passing the id to the exception class; avoid custom messages here.def get_connection(self, listener_id: str) -> Connection: try: return self.__connections[listener_id] except KeyError: - raise InvalidListenerException(f"Invalid listener {listener_id}") from None + raise InvalidListenerException(listener_id) from None
77-79: Handler registry: consider duplicate prevention.
Current list allows the same handler to be registered multiple times.Addressed below in the register method to avoid duplicates.
153-155: Avoid duplicate registrations; add return type.
Prevents multiple invocations of the same handler.- def register_listener_error_handler(self, handler: ListenerErrorHandler): - self.__listeners_error_handlers.append(handler) + def register_listener_error_handler(self, handler: ListenerErrorHandler) -> None: + # Ensure a handler is registered at most once. + if handler not in self.__listeners_error_handlers: + self.__listeners_error_handlers.append(handler)If useful, add an unregister API elsewhere:
def unregister_listener_error_handler(self, handler: ListenerErrorHandler) -> None: try: self.__listeners_error_handlers.remove(handler) except ValueError: ...
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (16)
README.md(1 hunks)eric_sse/connection.py(3 hunks)eric_sse/entities.py(4 hunks)eric_sse/handlers.py(1 hunks)eric_sse/interfaces.py(3 hunks)eric_sse/patterns.py(1 hunks)eric_sse/prefabs.py(2 hunks)eric_sse/repository.py(5 hunks)examples/producer_consumer.py(2 hunks)pyproject.toml(1 hunks)test/mock/connection.py(1 hunks)test/test_error_handlers.py(1 hunks)test/test_patterns.py(1 hunks)test/test_repository.py(6 hunks)test/test_sse_channel_repository.py(1 hunks)update_docs.sh(1 hunks)
✅ Files skipped from review due to trivial changes (2)
- pyproject.toml
- README.md
🚧 Files skipped from review as they are similar to previous changes (6)
- eric_sse/handlers.py
- eric_sse/prefabs.py
- examples/producer_consumer.py
- test/test_sse_channel_repository.py
- update_docs.sh
- eric_sse/repository.py
🧰 Additional context used
🪛 Ruff (0.12.2)
eric_sse/entities.py
37-37: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
43-43: Avoid specifying long messages outside the exception class
(TRY003)
186-186: Do not catch blind exception: Exception
(BLE001)
187-187: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
test/mock/connection.py
10-10: Unused method argument: msg
(ARG002)
11-11: Create your own exception
(TRY002)
21-21: Create your own exception
(TRY002)
24-24: Unused method argument: message
(ARG002)
26-26: Create your own exception
(TRY002)
eric_sse/connection.py
50-50: Use raise without specifying exception name
Remove exception name
(TRY201)
eric_sse/patterns.py
12-12: Unused method argument: exception
(ARG002)
15-15: Do not catch blind exception: Exception
(BLE001)
16-16: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
test/test_error_handlers.py
71-71: Do not catch blind exception: Exception
(BLE001)
test/test_patterns.py
30-30: Local variable e is assigned to but never used
Remove assignment to unused variable e
(F841)
🔇 Additional comments (3)
test/test_repository.py (1)
145-185: Stray triple-quoted block disables tests below. Confirm intent.If accidental, remove the block comment to enable
FullPathTestCase.-""" -MessageQueueListenerMock - - - -class FullPathTestCase(IsolatedAsyncioTestCase): - def setUp(self): - self.listeners_repository = MagicMock(spec=ListenerRepositoryInterface) - self.queues_repository = MagicMock(spec=QueueRepositoryInterface) - self.connections_factory = FakeConnectionsFactory() - - def create_sut(self): - connections_repository = ConnectionRepository( - listeners_repository=self.listeners_repository, - queues_repository=self.queues_repository, - connections_factory=self.connections_factory, - storage=InMemoryStorage(), - ) - return FakeChannelRepository( - storage=InMemoryStorage(), - connections_repository=connections_repository - ) - async def test_one(self): - sut = self.create_sut() - channel = FakeChannel() - - listener = channel.add_listener() - message = Message(msg_type='test') - - channel.dispatch(listener.id, message) - channel.broadcast(message) - - sut.persist(channel=channel) - channel_clone = sut.load_one(channel_id=channel.id) - listener_clone = channel_clone.get_listener(listener.id) - - async for received_message in channel_clone.message_stream(listener_clone): - self.assertEqual(received_message.msg_type, 'test') - -"""eric_sse/entities.py (2)
11-11: LGTM: new error-handler import is appropriate.
No concerns with the import or its placement.
178-180: LGTM: private connection helper improves readability.
Simple and clear indirection.
eric_sse/patterns.py
Outdated
| def handle_push_error(self, msg: MessageContract, exception: Exception): | ||
| try: | ||
| self.__queue.push(msg) | ||
| except Exception as e: | ||
| logger.error(e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log DLQ failures with traceback; use the original exception to fix ARG002.
Use logger.exception(...) and include exception in the log context.
def handle_push_error(self, msg: MessageContract, exception: Exception):
try:
self.__queue.push(msg)
- except Exception as e:
- logger.error(e)
+ except Exception:
+ logger.exception("Dead-letter push failed (msg=%r); original_error=%r", msg, exception)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def handle_push_error(self, msg: MessageContract, exception: Exception): | |
| try: | |
| self.__queue.push(msg) | |
| except Exception as e: | |
| logger.error(e) | |
| def handle_push_error(self, msg: MessageContract, exception: Exception): | |
| try: | |
| self.__queue.push(msg) | |
| except Exception: | |
| logger.exception("Dead-letter push failed (msg=%r); original_error=%r", msg, exception) |
🧰 Tools
🪛 Ruff (0.12.2)
12-12: Unused method argument: exception
(ARG002)
15-15: Do not catch blind exception: Exception
(BLE001)
16-16: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
🤖 Prompt for AI Agents
eric_sse/patterns.py around lines 12 to 16: the except block currently calls
logger.error(e) which neither logs the traceback of the DLQ push failure nor
includes the original exception passed into the handler; replace that call with
logger.exception("Failed to push message to DLQ", extra={"original_exception":
exception"}) so the traceback from the push failure is recorded and the original
exception is preserved in the log context.
| channel = FakeChannel( | ||
| connections_factory=BrokenConnectionFactory( | ||
| q_handlers=[self.queues_handler_mock, self.queues_handler_mock2], | ||
| queue=self.queues_handler_mock | ||
| ) | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrong object passed as queue; should be a Queue, not a handler mock.
Use a broken queue to simulate push failure.
channel = FakeChannel(
connections_factory=BrokenConnectionFactory(
q_handlers=[self.queues_handler_mock, self.queues_handler_mock2],
- queue=self.queues_handler_mock
+ queue=BrokenQueue() # broken push by default
)
)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| channel = FakeChannel( | |
| connections_factory=BrokenConnectionFactory( | |
| q_handlers=[self.queues_handler_mock, self.queues_handler_mock2], | |
| queue=self.queues_handler_mock | |
| ) | |
| ) | |
| channel = FakeChannel( | |
| connections_factory=BrokenConnectionFactory( | |
| q_handlers=[self.queues_handler_mock, self.queues_handler_mock2], | |
| queue=BrokenQueue() # broken push by default | |
| ) | |
| ) |
🤖 Prompt for AI Agents
In test/test_error_handlers.py around lines 21 to 26, the
BrokenConnectionFactory is being constructed with queue=self.queues_handler_mock
which is incorrect — the queue parameter must be a Queue instance that simulates
push failure, not a handler mock. Replace that argument with a broken queue
object (e.g. an instance of the test BrokenQueue/MockQueue that raises on push)
or construct a BrokenQueue and pass it as queue=BrokenQueue(...) so the channel
simulates a queue push failure correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
eric_sse/entities.py (4)
98-102: Remove redundant pre-check; let the stream yield an 'error' event.This early re-raise prevents
event_generatorfrom handlingInvalidListenerExceptionand contradicts the docstring that promises an'error'message. Let the loop handle it.- try: - self.__connection_manager.get_listener(listener.id) - except InvalidListenerException: - raise + # Let event_generator handle invalid listeners and yield an 'error' event.
134-134: Typo in docstring.“correspondant” → “corresponding”.
- """Shortcut that creates a connection and returns correspondant listener""" + """Shortcut that creates a connection and returns corresponding listener"""
146-151: Grammar: “an existing connection”.Minor doc polish.
def register_connection(self, connection: Connection): """ - Register and existing connection. + Register an existing connection. **Warning**: Listener and queue should belong to the same classes returned by connection factory to avoid compatibility issues with persistence layer """
203-206: Make AbstractChannel.watch an async generator (and update callers).Returning the generator from an
async defforces callers toawait; yield events directly so callers canasync for channel.watch().- async def watch(self) -> AsyncIterable[Any]: - listener = self.add_listener() - listener.start() - return self.message_stream(listener) + async def watch(self) -> AsyncIterable[Any]: + listener = self.add_listener() + listener.start() + async for event in self.message_stream(listener): + yield eventUpdate callers: change eric_sse/servers.py line 182 from
async for m in await channel.watch():
to
async for m in channel.watch():
♻️ Duplicate comments (1)
test/mock/connection.py (1)
40-44: Handlelistener=Noneto avoid constructing an invalidConnection.
Connectionrequires a concrete listener; default to a simple one when not provided.- def create(self, listener: MessageQueueListener | None = None) -> Connection: - connection = Connection(listener=listener, queue=self.queue) + def create(self, listener: MessageQueueListener | None = None) -> Connection: + connection = Connection(listener=listener or MessageQueueListener(), queue=self.queue) for handler in self.q_handlers: connection.register_queuing_error_handler(handler) return connection
🧹 Nitpick comments (5)
test/mock/connection.py (3)
10-11: Use explicit test exception and silence unused arg.Avoid bare
Exceptionand the ARG002 warning. Raise a dedicated mock error and mark the param unused.-class BrokenListener(MessageQueueListener): - - def on_message(self, msg: MessageContract) -> None: - raise Exception() +class BrokenListener(MessageQueueListener): + + def on_message(self, _msg: MessageContract) -> None: + raise MockListenerError()Add these helpers near the top of the file:
class MockListenerError(Exception): ... class MockPushError(Exception): ... class MockPopError(Exception): ...
19-27: Model queue failures with specific exceptions; avoid bareException.Improves clarity in tests and aligns with TRY002.
def pop(self) -> MessageContract: if self.broken_pop: - raise Exception() + raise MockPopError() return Message(msg_type='test') def push(self, message: MessageContract) -> None: if self.broken_push: - raise Exception() + raise MockPushError()
31-38: Type: annotate optional correctly.
queuecan beNone; reflect that in the signature.- q_handlers: list[QueuingErrorHandler], - queue: Queue = None, + q_handlers: list[QueuingErrorHandler], + queue: Queue | None = None,eric_sse/entities.py (2)
39-44: Unify exception style and message.Use the same constructor form as
get_listenerand suppress chaining; avoids TRY003 inconsistency.- def get_connection(self, listener_id: str) -> Connection: + def get_connection(self, listener_id: str) -> Connection: try: return self.__connections[listener_id] except KeyError: - raise InvalidListenerException(f"Invalid listener {listener_id}") from None + raise InvalidListenerException(listener_id) from None
126-128: Log with stack once; avoid duplicate trace logging.Use
logger.exception(...)to capture the traceback instead of a separatedebug(traceback.format_exc())anderror(e).- logger.debug(traceback.format_exc()) - logger.error(e) + logger.exception("Unhandled error in message_stream for listener_id=%s", listener.id)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
eric_sse/entities.py(4 hunks)test/mock/connection.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
eric_sse/entities.py (5)
eric_sse/handlers.py (2)
ListenerErrorHandler(14-17)handle_on_message_error(16-17)eric_sse/listener.py (2)
MessageQueueListener(7-29)on_message(18-20)eric_sse/connection.py (5)
Connection(9-53)listener(24-25)id(32-33)fetch_message(44-50)send_message(35-41)eric_sse/message.py (2)
id(54-56)MessageContract(4-22)eric_sse/exception.py (2)
InvalidListenerException(7-8)NoMessagesException(11-13)
test/mock/connection.py (5)
eric_sse/connection.py (7)
ConnectionsFactory(56-64)Connection(9-53)listener(24-25)queue(28-29)create(58-64)create(69-72)register_queuing_error_handler(52-53)eric_sse/handlers.py (1)
QueuingErrorHandler(7-12)eric_sse/listener.py (1)
MessageQueueListener(7-29)eric_sse/message.py (2)
MessageContract(4-22)Message(25-39)eric_sse/queues.py (1)
Queue(8-22)
🪛 Ruff (0.12.2)
eric_sse/entities.py
43-43: Avoid specifying long messages outside the exception class
(TRY003)
test/mock/connection.py
10-10: Unused method argument: msg
(ARG002)
11-11: Create your own exception
(TRY002)
21-21: Create your own exception
(TRY002)
24-24: Unused method argument: message
(ARG002)
26-26: Create your own exception
(TRY002)
🔇 Additional comments (2)
eric_sse/entities.py (2)
153-155: LGTM: listener error-handler registration is straightforward.
165-174: LGTM: deliver/handle pattern looks correct.Message fetched, delivered, handlers invoked on failure, then bare re-raise preserves traceback.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (1)
eric_sse/patterns.py (1)
12-17: Remove artificial raise and fix logging; include original exception; make handler type-safe.
The forcedraise Exception('blah')breaks the build and makes the DLQ unusable. Also, use parameterized logging and include the original error to address ARG002/TRY401 and prior feedback.- def handle_push_error(self, msg: MessageContract, exception: Exception): - try: - raise Exception('blah') - self.__queue.push(msg) - except Exception as e: - logger.exception(f"Dead-letter push failed. msg type: {msg.type} payload {msg.payload} {repr(e)}") + def handle_push_error(self, msg: MessageContract, exception: Exception) -> None: + try: + # Attempt to push the failed message to the DLQ. + self._queue.push(msg) + except Exception: # noqa: BLE001 - must guard against any DLQ push failure + # Log the DLQ push failure with traceback, preserving the original error context. + logger.exception( + "Dead-letter push failed (type=%s, payload=%r); original_error=%r", + getattr(msg, "type", None), + getattr(msg, "payload", None), + exception, + )
🧹 Nitpick comments (2)
eric_sse/patterns.py (2)
9-11: Avoid dunder attribute; prefer single underscore for testability and clarity.
Name-mangling isn’t needed; it complicates tests/mocks.- def __init__(self, queue: Queue): - self.__queue = queue + def __init__(self, queue: Queue): + self._queue = queue
12-17: Optional: add a docstring and consider metrics for observability.
Briefly document the handler and consider emitting a counter/gauge on DLQ failures to aid alerting.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
eric_sse/patterns.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
eric_sse/patterns.py (5)
eric_sse/handlers.py (2)
QueuingErrorHandler(7-12)handle_push_error(9-10)eric_sse/message.py (1)
MessageContract(4-22)eric_sse/queues.py (1)
Queue(8-22)eric_sse/__init__.py (1)
get_logger(5-16)test/mock/connection.py (1)
push(24-26)
🪛 Ruff (0.12.2)
eric_sse/patterns.py
12-12: Unused method argument: exception
(ARG002)
14-14: Abstract raise to an inner function
(TRY301)
14-14: Create your own exception
(TRY002)
17-17: Use explicit conversion flag
Replace with conversion flag
(RUF010)
17-17: Redundant exception object included in logging.exception call
(TRY401)
🪛 GitHub Check: build (3.11)
eric_sse/patterns.py
[failure] 14-14:
blah
🪛 GitHub Check: build (3.10)
eric_sse/patterns.py
[failure] 14-14:
blah
🪛 GitHub Actions: Python package
eric_sse/patterns.py
[error] 14-17: Dead-letter push failed. msg type: test payload None Exception('blah')
🔇 Additional comments (1)
eric_sse/patterns.py (1)
1-7: Imports and logger setup look good.
No issues here.
| def handle_push_error(self, msg: MessageContract, exception: Exception): | ||
| try: | ||
| raise Exception('blah') | ||
| self.__queue.push(msg) | ||
| except Exception as e: | ||
| logger.exception(f"Dead-letter push failed. msg type: {msg.type} payload {msg.payload} {repr(e)}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Verification agent
🧩 Analysis chain
Quick verification helper.
Run to confirm the artificial raise is gone and logging is parameterized.
🏁 Script executed:
#!/bin/bash
rg -nC2 'class DeadLetterQueueHandler' eric_sse/patterns.py
rg -n "raise Exception\('blah'\)" eric_sse/patterns.py || echo "OK: artificial raise removed"
rg -nP "logger\.exception\(" eric_sse/patterns.pyLength of output: 304
Remove artificial raise and parameterize logging in eric_sse/patterns.py
Artificial raise still present (eric_sse/patterns.py:14); logger.exception uses an f-string (line 17).
- Remove
raise Exception('blah')(line 14). - Replace the f-string with parameterized logging, e.g.:
logger.exception("Dead-letter push failed. msg_type=%s payload=%s", msg.type, msg.payload)
🧰 Tools
🪛 Ruff (0.12.2)
12-12: Unused method argument: exception
(ARG002)
14-14: Abstract raise to an inner function
(TRY301)
14-14: Create your own exception
(TRY002)
17-17: Use explicit conversion flag
Replace with conversion flag
(RUF010)
17-17: Redundant exception object included in logging.exception call
(TRY401)
🪛 GitHub Check: build (3.11)
[failure] 14-14:
blah
🪛 GitHub Check: build (3.10)
[failure] 14-14:
blah
🪛 GitHub Actions: Python package
[error] 14-17: Dead-letter push failed. msg type: test payload None Exception('blah')
🤖 Prompt for AI Agents
In eric_sse/patterns.py around lines 12 to 17, remove the artificial raise
Exception('blah') so the method actually attempts to push the message inside the
try block, and replace the f-string logger.exception call with a parameterized
logging call that passes msg.type and msg.payload as separate arguments (e.g.
logger.exception("Dead-letter push failed. msg_type=%s payload=%s", msg.type,
msg.payload)) so logging is properly parameterized and the exception is still
recorded.
|
@coderabbitai pause |
✅ Actions performedReviews paused. |
|
@coderabbitai resume |
✅ Actions performedReviews resumed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
♻️ Duplicate comments (4)
eric_sse/patterns.py (1)
12-16: Parameterize logging; include original exception; fix lints (ARG002, RUF010, TRY401).Use logger.exception with format args, drop repr(e), and log the original exception passed to the handler.
- def handle_push_error(self, msg: MessageContract, exception: Exception): + def handle_push_error(self, msg: MessageContract, exception: Exception): try: self.__queue.push(msg) - except Exception as e: - logger.exception(f"Dead-letter push failed. msg type: {msg.type} payload {msg.payload} {repr(e)}") + except Exception: + logger.exception( + "Dead-letter push failed (msg_type=%s payload=%r); original_error=%r", + msg.type, + msg.payload, + exception, + )test/mock/connection.py (1)
40-44: Honor factory contract when listener is None.Creating a Connection with listener=None may break callers that expect a default listener (matches InMemoryConnectionsFactory behavior).
- def create(self, listener: MessageQueueListener | None = None) -> Connection: - connection = Connection(listener=listener, queue=self.queue) + def create(self, listener: MessageQueueListener | None = None) -> Connection: + listener = listener or MessageQueueListener() + connection = Connection(listener=listener, queue=self.queue) for handler in self.q_handlers: connection.register_queuing_error_handler(handler) return connectioneric_sse/repository.py (2)
154-156: Prefix constants fix: LGTM and resolves the double-colon bug.Trailing colon removed; formatted key usage now yields ch_cn:<ch_id>:<cn_id> and cn_ch:<cn_id>.
181-183: Harden against stale index entries in load_all.Indexes can drift; skip entries whose cn_id no longer exists.
Apply:
- for connection_data in self.__storage.fetch_by_prefix(f'{self.CONNECTIONS_BY_CHANNEL_PREFIX}:{channel_id}:'): - yield self._load_connection(connection_id=connection_data['cn_id']) + for connection_data in self.__storage.fetch_by_prefix(f'{self.CONNECTIONS_BY_CHANNEL_PREFIX}:{channel_id}:'): + try: + yield self._load_connection(connection_id=connection_data['cn_id']) + except ItemNotFound: + continueIf any old ch_cn:: keys were ever persisted, consider temporary dual-read here during the migration window.
🧹 Nitpick comments (12)
test/mock/connection.py (2)
24-26: Silence unused param in test double.Rename to underscore to avoid ARG002 noise.
- def push(self, message: MessageContract) -> None: + def push(self, _message: MessageContract) -> None: if self.broken_push: raise Exception()
10-11: Prefer specific or custom exceptions in test doubles.Optional: replace bare Exception() with a small custom error to improve assertions and intent.
Also applies to: 20-21, 25-26
eric_sse/entities.py (3)
121-128: Use logger.exception once; avoid duplicate traceback logging and include context.This simplifies logs and preserves stack automatically.
- except (InvalidListenerException, InvalidChannelException) as e: - yield self.adapt(Message(msg_type='error', msg_payload=e)) - except Exception as e: - logger.debug(traceback.format_exc()) - logger.error(e) - yield self.adapt(Message(msg_type='error')) + except (InvalidListenerException, InvalidChannelException) as e: + yield self.adapt(Message(msg_type='error', msg_payload=e)) + except Exception: + logger.exception("Unhandled error in message_stream for listener_id=%s", listener.id) + yield self.adapt(Message(msg_type='error'))
189-189: Parameterize debug log to avoid f-string overhead.- logger.debug(f"Dispatched {msg} to {listener_id}") + logger.debug("Dispatched %r to %s", msg, listener_id)
146-151: Docstring mentions queues; update to “connections” for consistency.- **Warning**: Listener and queue should belong to the same classes returned by connection factory to avoid compatibility issues with persistence layer + **Warning**: Listener and connection components should match the types produced by the connections factory to avoid persistence-layer compatibility issues.eric_sse/repository.py (7)
31-31: Docstring grammar nit.Apply:
- """Return value corresponding to key""" + """Return the value corresponding to the key."""
116-125: Avoid O(N) heavy loads just to compute IDs.AbstractChannelRepository.persist calls load_all, instantiating full Connection objects and hitting listener/queue repos unnecessarily. Provide an ID-only path.
Apply:
- persisted_connections_ids = {c.id for c in self.__connections_repository.load_all(channel_id=channel.id)} + persisted_connections_ids = set(self.__connections_repository.list_ids(channel_id=channel.id))Add this helper to ConnectionRepository (outside this hunk):
def list_ids(self, channel_id: str) -> Iterable[str]: for connection_data in self.__storage.fetch_by_prefix(f'{self.CONNECTIONS_BY_CHANNEL_PREFIX}:{channel_id}:'): yield connection_data['cn_id']
140-141: Docstring wording nit.Apply:
- Relies on :class:`~eric_sse.repository.KvStorage` abstraction for final writes of connections data, and on - corresponding repositories for related objects ones. + Relies on :class:`~eric_sse.repository.KvStorage` for persisting connection metadata and on the + respective repositories for related objects.
146-153: Type-hint spacing.PEP 8: one space after colon in annotations.
Apply:
- connections_factory:ConnectionsFactory + connections_factory: ConnectionsFactory
170-174: Minor cleanup in existence check.Drop the dummy assignment; keep the traceback-suppression behavior.
Apply:
- try: - _ = self.__storage.fetch_one(f'{self.CONNECTIONS_PREFIX}:{connection_id}') + try: + self.__storage.fetch_one(f'{self.CONNECTIONS_PREFIX}:{connection_id}') except ItemNotFound as e: raise e from None
196-208: Delete ordering to reduce reader exposure.Consider removing the per-channel index first so load_all stops seeing the connection immediately; then clean listener, queue, and the per-connection key.
Apply:
- self.__listeners_repository.delete(connection_id=connection_id) - self.__queues_repository.delete(connection_id=connection_id) - self.__storage.delete(key=f'{self.CONNECTIONS_PREFIX}:{connection_id}') - self.__storage.delete(key=f'{self.CONNECTIONS_BY_CHANNEL_PREFIX}:{channel_id}:{connection_id}') + # Hide from channel-scoped scans first + self.__storage.delete(key=f'{self.CONNECTIONS_BY_CHANNEL_PREFIX}:{channel_id}:{connection_id}') + self.__listeners_repository.delete(connection_id=connection_id) + self.__queues_repository.delete(connection_id=connection_id) + self.__storage.delete(key=f'{self.CONNECTIONS_PREFIX}:{connection_id}')Please confirm listeners_repository.delete and queues_repository.delete are idempotent and do not raise on missing entries.
63-67: Tiny simplification in InMemoryStorage.delete.Apply:
- if key not in self.items: - return - del self.items[key] + self.items.pop(key, None)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
eric_sse/entities.py(5 hunks)eric_sse/patterns.py(1 hunks)eric_sse/repository.py(5 hunks)test/mock/connection.py(1 hunks)test/test_entities.py(1 hunks)
🧰 Additional context used
🪛 Ruff (0.12.2)
eric_sse/patterns.py
12-12: Unused method argument: exception
(ARG002)
16-16: Use explicit conversion flag
Replace with conversion flag
(RUF010)
16-16: Redundant exception object included in logging.exception call
(TRY401)
test/mock/connection.py
10-10: Unused method argument: msg
(ARG002)
11-11: Create your own exception
(TRY002)
21-21: Create your own exception
(TRY002)
24-24: Unused method argument: message
(ARG002)
26-26: Create your own exception
(TRY002)
🔇 Additional comments (7)
eric_sse/entities.py (2)
166-173: Listener error fan-out and re-raise look good.
180-188: Good switch to parameterized logger.exception with bare re-raise.eric_sse/repository.py (5)
6-6: No action on new import.Importing InvalidChannelException here is fine if used by subclasses; otherwise your linter will flag it.
74-81: Constructor surface simplification: LGTM.Switching to a connections_repository dep tightens responsibilities and improves testability.
165-168: Exposing connections_factory: LGTM.Keeps prefabs and other modules from re-plumbing factories.
185-187: load_one delegation: LGTM.
192-194: Dual indexing on persist: LGTM.Order (cn_ch first, then ch_cn) is sensible for readers of per-connection data.
| def delete(self, channel_id: str): | ||
| try: | ||
| channel = self.load_one(channel_id) | ||
| except ItemNotFound: | ||
| return | ||
| for connection in self.__connections_repository.load_all(channel_id=channel.id): | ||
| self.__connections_repository.delete(channel_id=channel_id, connection_id=connection.id) | ||
| self.__connections_repository.delete(connection_id=connection.id) | ||
| self.__storage.delete(channel_id) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Verification agent
🧩 Analysis chain
Don’t build the Channel just to delete it.
load_one constructs the channel and re-loads connections, which can fail on stale entries and is wasted work. Check existence, then delete by IDs.
Apply:
- try:
- channel = self.load_one(channel_id)
- except ItemNotFound:
- return
- for connection in self.__connections_repository.load_all(channel_id=channel.id):
- self.__connections_repository.delete(connection_id=connection.id)
+ try:
+ self.__storage.fetch_one(channel_id)
+ except ItemNotFound:
+ return
+ for connection_id in getattr(self.__connections_repository, "list_ids", lambda channel_id: (c.id for c in self.__connections_repository.load_all(channel_id)))(channel_id=channel_id):
+ self.__connections_repository.delete(connection_id=connection_id)If channel and connection repos share the same KvStorage instance, fetch_all in load_all() may mix index values with channel documents. Please confirm storages are distinct or channel keys are prefixed. To verify:
🏁 Script executed:
#!/bin/bash
# Inspect repo wiring for shared storage between ChannelRepository and ConnectionRepository
rg -nP -C3 '(ConnectionRepository\(|ChannelRepository\(|SSEChannelRepository\()'Length of output: 5540
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "===== eric_sse/repository.py (full) ====="
sed -n '1,400p' eric_sse/repository.py || true
echo
echo "===== eric_sse/prefabs.py (full) ====="
sed -n '1,400p' eric_sse/prefabs.py || true
echo
echo "===== Search for ConnectionRepository/ChannelRepository constructors/usages ====="
rg -n --hidden --no-ignore -S "class ConnectionRepository|class AbstractChannelRepository|class AbstractChannelRepository|class AbstractChannelRepository|class ChannelRepository|def __init__\(|connections_repository=|storage=" || true
echo
echo "===== Show ConnectionRepository implementation area ====="
rg -n -n -C3 --hidden --no-ignore "class ConnectionRepository" || trueLength of output: 19332
Don't build the Channel just to delete it; check existence and delete connections by ID.
- Replace delete(...) in eric_sse/repository.py to call self.__storage.fetch_one(channel_id) (avoid self.load_one) and delete by connection IDs (prefer a lighter list_ids API to avoid constructing Connection objects):
- try:
- channel = self.load_one(channel_id)
- except ItemNotFound:
- return
- for connection in self.__connections_repository.load_all(channel_id=channel.id):
- self.__connections_repository.delete(connection_id=connection.id)
- self.__storage.delete(channel_id)
+ try:
+ self.__storage.fetch_one(channel_id)
+ except ItemNotFound:
+ return
+ for connection_id in getattr(self.__connections_repository, "list_ids", lambda channel_id: (c.id for c in self.__connections_repository.load_all(channel_id=channel_id)))(channel_id=channel_id):
+ self.__connections_repository.delete(connection_id=connection_id)
+ self.__storage.delete(channel_id)-
Add ConnectionRepository.list_ids(channel_id) (eric_sse/repository.py) that yields cn_id from self.__storage.fetch_by_prefix(f'{self.CONNECTIONS_BY_CHANNEL_PREFIX}:{channel_id}:') so deletes can avoid loading listeners/queues.
-
Critical: AbstractChannelRepository.load_all() uses self.__storage.fetch_all() — if ChannelRepository and ConnectionRepository share the same KvStorage, fetch_all() will return connection entries too and create() may be invoked on them. Either require distinct KvStorage instances in wiring (tests use separate InMemoryStorage) or persist channels under a dedicated prefix and change load_all() to use fetch_by_prefix('channels_prefix:'). (Files: eric_sse/repository.py — load_all, delete; ConnectionRepository — add list_ids.)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def delete(self, channel_id: str): | |
| try: | |
| channel = self.load_one(channel_id) | |
| except ItemNotFound: | |
| return | |
| for connection in self.__connections_repository.load_all(channel_id=channel.id): | |
| self.__connections_repository.delete(channel_id=channel_id, connection_id=connection.id) | |
| self.__connections_repository.delete(connection_id=connection.id) | |
| self.__storage.delete(channel_id) | |
| def delete(self, channel_id: str): | |
| try: | |
| self.__storage.fetch_one(channel_id) | |
| except ItemNotFound: | |
| return | |
| for connection_id in getattr(self.__connections_repository, "list_ids", lambda channel_id: (c.id for c in self.__connections_repository.load_all(channel_id=channel_id)))(channel_id=channel_id): | |
| self.__connections_repository.delete(connection_id=connection_id) | |
| self.__storage.delete(channel_id) |
🤖 Prompt for AI Agents
In eric_sse/repository.py around lines 126-134, the delete method currently
loads a full Channel object and iterates over Connection objects; change delete
to call self.__storage.fetch_one(channel_id) to only check existence, then
iterate over connection IDs (not full objects) and call
self.__connections_repository.delete(connection_id=cn_id). Add
ConnectionRepository.list_ids(channel_id) that yields connection IDs by calling
self.__storage.fetch_by_prefix(f'{self.CONNECTIONS_BY_CHANNEL_PREFIX}:{channel_id}:')
and extracting the id for each entry so deletes avoid constructing Connection
instances. Also fix AbstractChannelRepository.load_all() (and ChannelRepository
wiring) so channels are persisted/queried under a dedicated prefix (use
fetch_by_prefix('channels_prefix:') in load_all) or require distinct KvStorage
instances for channels vs connections to prevent fetch_all from returning
connection entries and accidentally invoking create() on them.
| msgs = [] | ||
| with pytest.raises(InvalidListenerException): | ||
| async for msg in self.sut.message_stream(listener): | ||
| msgs.append(msg) | ||
| async for _ in self.sut.message_stream(listener): | ||
| pass | ||
| self.assertEqual(1, len(msgs)) | ||
| self.assertEqual('error', msgs[0].type) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix test: remove pytest.raises and actually assert the streamed error.
Current test mixes “expect raise” with “collect error message” and will fail. Start the listener, stream once, assert error.
- msgs = []
- with pytest.raises(InvalidListenerException):
- async for msg in self.sut.message_stream(listener):
- msgs.append(msg)
- async for _ in self.sut.message_stream(listener):
- pass
- self.assertEqual(1, len(msgs))
- self.assertEqual('error', msgs[0].type)
+ listener.start()
+ msgs = []
+ async for msg in self.sut.message_stream(listener):
+ msgs.append(msg)
+ break # consume one event and exit
+ self.assertEqual(1, len(msgs))
+ self.assertEqual('error', msgs[0].type)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| msgs = [] | |
| with pytest.raises(InvalidListenerException): | |
| async for msg in self.sut.message_stream(listener): | |
| msgs.append(msg) | |
| async for _ in self.sut.message_stream(listener): | |
| pass | |
| self.assertEqual(1, len(msgs)) | |
| self.assertEqual('error', msgs[0].type) | |
| listener.start() | |
| msgs = [] | |
| async for msg in self.sut.message_stream(listener): | |
| msgs.append(msg) | |
| break # consume one event and exit | |
| self.assertEqual(1, len(msgs)) | |
| self.assertEqual('error', msgs[0].type) |
🤖 Prompt for AI Agents
In test/test_entities.py around lines 145-152 the test incorrectly wraps the
async stream consumption in pytest.raises(InvalidListenerException) while also
trying to collect a streamed error message; remove the pytest.raises usage,
start the listener stream normally, consume exactly one message (or iterate the
async generator once), collect it into msgs, then assert that msgs length is 1
and msgs[0].type == 'error' (ensuring the async iteration is awaited properly
and not expecting an exception).
Summary by CodeRabbit
New Features
Refactor
Tests
Chores