Skip to content

Commit 8f8716c

Browse files
committed
Fetch topic route when registering message queue changed listener
1 parent 8d6c708 commit 8f8716c

File tree

2 files changed

+7
-4
lines changed

2 files changed

+7
-4
lines changed

java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PullConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public interface PullConsumer extends Closeable {
3737
* @param topic the topic that needs to be monitored.
3838
* @param listener the callback to detect the message queue changes.
3939
*/
40-
void registerMessageQueueChangeListenerByTopic(String topic, TopicMessageQueueChangeListener listener);
40+
void registerMessageQueueChangeListenerByTopic(String topic, TopicMessageQueueChangeListener listener) throws ClientException;
4141

4242
/**
4343
* Fetch message queues of the topic.

java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PullConsumerImpl.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,9 @@ public String getConsumerGroup() {
164164
}
165165

166166
@Override
167-
public synchronized void registerMessageQueueChangeListenerByTopic(String topic,
168-
TopicMessageQueueChangeListener listener) {
167+
public void registerMessageQueueChangeListenerByTopic(String topic, TopicMessageQueueChangeListener listener)
168+
throws ClientException {
169+
// TODO: add lock?
169170
checkNotNull(topic, "topic should not be null");
170171
checkNotNull(listener, "listener should not be null");
171172
if (!this.isRunning()) {
@@ -178,6 +179,7 @@ public synchronized void registerMessageQueueChangeListenerByTopic(String topic,
178179
topic);
179180
}
180181
topicMessageQueueChangeListenerMap.put(topic, listener);
182+
fetchMessageQueues(topic);
181183
}
182184

183185
public int getMaxCacheMessageCountEachQueue() {
@@ -537,7 +539,8 @@ private List<MessageQueueImpl> transformTopicRouteData(TopicRouteData topicRoute
537539
.collect(Collectors.toList());
538540
}
539541

540-
public synchronized void onTopicRouteDataUpdate0(String topic, TopicRouteData topicRouteData) {
542+
public void onTopicRouteDataUpdate0(String topic, TopicRouteData topicRouteData) {
543+
// TODO: add lock?
541544
final List<MessageQueueImpl> newMqs = transformTopicRouteData(topicRouteData);
542545
Set<MessageQueue> newMqSet = new HashSet<>(newMqs);
543546
final List<MessageQueueImpl> oldMqs = topicMessageQueuesCache.get(topic);

0 commit comments

Comments
 (0)