Open
Labels
api: pubsub (Issues related to the googleapis/python-pubsub API.)
Description
Environment details
- OS type and version: Docker image python:3.12-bookworm
- Python version: 3.12.1
- pip version: 23.2.1
- google-cloud-pubsub version: 2.31.1
Steps to reproduce
- Subscribe to a PubSub subscription (which returns a future)
- Call cancel() and result() on the future to initiate a shutdown sequence
Example code
Below are simplified code samples extracted from my application. They are meant as illustration and can't be run as is.
from concurrent.futures import ThreadPoolExecutor
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

# Message callback
def callback(message):
    try:
        # ... do something useful ...
        message.ack()
    except Exception:
        message.nack()

# Subscribe to Pubsub subscription
client = pubsub_v1.SubscriberClient()
flow_control = pubsub_v1.types.FlowControl(
    max_bytes=10 * 1024 * 1024,
    max_messages=100,
    max_lease_duration=240 * 60 * 60
)
scheduler = ThreadScheduler(ThreadPoolExecutor(max_workers=10))
future = client.subscribe("subscription_name", callback, flow_control=flow_control, scheduler=scheduler)

# Later, initiate shutdown sequence
future.cancel()
future.result()
Investigation
There are 2 exceptions from 2 concurrent threads that are logged at the same time.
One is:
Exception in thread Thread-RegularStreamShutdown:
Traceback (most recent call last):
File "/usr/local/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.12/threading.py", line 1010, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.12/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 994, in _shutdown
msg.nack()
AttributeError: 'tuple' object has no attribute 'nack'
The other is:
Exception in thread Thread-OnRpcTerminated:
Traceback (most recent call last):
File "/usr/local/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.12/threading.py", line 1010, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.12/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 967, in _shutdown
assert self._scheduler is not None
AssertionError
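Both tracebacks are printed by Python's default handler for uncaught exceptions in threads. For anyone trying to capture them in application logs together with the thread name, here is a minimal sketch using the standard-library threading.excepthook. The names record_thread_exception, failing_target and Thread-Demo are hypothetical and only serve the illustration; the failing call is a stand-in, not the library code.

```python
import threading

captured = []  # (thread name, exception type name) pairs


def record_thread_exception(args):
    # args is a threading.ExceptHookArgs: exc_type, exc_value, exc_traceback, thread.
    name = args.thread.name if args.thread is not None else "<unknown>"
    captured.append((name, args.exc_type.__name__))


# Replace the default hook, which only prints the traceback to stderr.
threading.excepthook = record_thread_exception


def failing_target():
    # Stand-in for the failing call: a tuple has no nack() method,
    # so this raises AttributeError like the first traceback.
    ("not", "a message").nack()


worker = threading.Thread(target=failing_target, name="Thread-Demo")
worker.start()
worker.join()
print(captured)
```

In a real application the hook would forward to a logger instead of appending to a list.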
This looks very much like another reported bug that is supposed to be fixed as of release 2.23.1, yet I'm running 2.31.1. Note that the earlier report mentioned a similar exception on msg.nack(), except that the object was a NoneType instead of a tuple.
What seems to happen is that:
- first, thread Thread-RegularStreamShutdown calls StreamingPullManager._shutdown()
  - acquires lock StreamingPullManager._closing
  - asserts that self._scheduler is not None => OK ✅
  - stops self._scheduler and resets it to None
  - tries to msg.nack() but fails because msg is unexpectedly a tuple => raises an exception, releases lock StreamingPullManager._closing
- then, thread Thread-OnRpcTerminated also calls StreamingPullManager._shutdown() (or, possibly, was already called earlier but waited for the lock to be released)
  - acquires lock StreamingPullManager._closing
  - asserts that self._scheduler is not None => failure ❌
Thus, the root cause seems to be msg being a tuple. That variable comes from private fields within the library, hence I suspect an issue in the library code rather than in the user code, but I might be wrong if references to that variable leak to user code.
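To make the suspected sequence concrete, here is a toy model of it. This is not the library's code: ToyManager, run_shutdown and the leftover_messages argument are hypothetical names, and the class only mimics the steps I described (lock, assert, reset scheduler, nack leftover messages).

```python
import threading


class ToyManager:
    """Hypothetical stand-in for the shutdown logic described above."""

    def __init__(self, leftover_messages):
        self._closing = threading.Lock()
        self._scheduler = object()  # placeholder for a running scheduler
        self._leftover = leftover_messages

    def shutdown(self):
        with self._closing:
            # The second caller fails here: the first one already reset the scheduler.
            assert self._scheduler is not None
            self._scheduler = None
            for msg in self._leftover:
                # Raises AttributeError when msg is a tuple instead of a message object.
                msg.nack()


def run_shutdown(manager, errors):
    # Record the exception type instead of letting the thread die with a traceback.
    try:
        manager.shutdown()
    except Exception as exc:
        errors.append(type(exc).__name__)


manager = ToyManager(leftover_messages=[("not", "a message")])
errors = []
threads = [
    threading.Thread(target=run_shutdown, args=(manager, errors))
    for _ in range(2)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(errors))  # ['AssertionError', 'AttributeError']
```

Whichever thread takes the lock first hits the AttributeError, and the other one then trips the assertion, which matches the pair of tracebacks above. If this model is right, the AssertionError is only a side effect and goes away once msg is no longer a tuple.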