From 10aa5eeb7756372f1365c31355919364d0217cfa Mon Sep 17 00:00:00 2001 From: oneby-wang Date: Tue, 28 Oct 2025 10:11:45 +0800 Subject: [PATCH 1/5] [fix][admin] Fix partitioned topic auto deletion http request timeout --- .../admin/impl/PersistentTopicsBase.java | 93 +++++++++++-------- .../service/InactiveTopicDeleteTest.java | 6 ++ 2 files changed, 61 insertions(+), 38 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index ec61d58d2afe4..49eab02e5337d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -829,44 +829,61 @@ private CompletableFuture internalRemovePartitionsTopicAsync(int numPartit private CompletableFuture internalRemovePartitionsTopicNoAutoCreationDisableAsync(int numPartitions, boolean force) { - return FutureUtil.waitForAll(IntStream.range(0, numPartitions) - .mapToObj(i -> { - TopicName topicNamePartition = topicName.getPartition(i); - try { - CompletableFuture future = new CompletableFuture<>(); - pulsar().getAdminClient().topics() - .deleteAsync(topicNamePartition.toString(), force) - .whenComplete((r, ex) -> { - if (ex != null) { - Throwable realCause = FutureUtil.unwrapCompletionException(ex); - if (realCause instanceof NotFoundException){ - // if the sub-topic is not found, the client might not have called - // create producer or it might have been deleted earlier, - // so we ignore the 404 error. - // For all other exception, - // we fail the delete partition method even if a single - // partition is failed to be deleted - if (log.isDebugEnabled()) { - log.debug("[{}] Partition not found: {}", clientAppId(), - topicNamePartition); - } - future.complete(null); - } else { - log.error("[{}] Failed to delete partition {}", clientAppId(), - topicNamePartition, realCause); - future.completeExceptionally(realCause); - } - } else { - future.complete(null); - } - }); - return future; - } catch (PulsarServerException ex) { - log.error("[{}] Failed to get admin client while delete partition {}", - clientAppId(), topicNamePartition, ex); - return FutureUtil.failedFuture(ex); - } - }).collect(Collectors.toList())); + return FutureUtil.waitForAll(IntStream.range(0, numPartitions).mapToObj(i -> { + TopicName topicNamePartition = topicName.getPartition(i); + CompletableFuture future = new CompletableFuture<>(); + pulsar().getPulsarResources().getTopicResources().persistentTopicExists(topicNamePartition) + .thenCompose(exists -> { + if (exists) { + try { + return pulsar().getAdminClient().topics() + .deleteAsync(topicNamePartition.toString(), force); + } catch (PulsarServerException ex) { + log.error("[{}] Failed to get admin client while delete partition {}", clientAppId(), + topicNamePartition, ex); + return FutureUtil.failedFuture(ex); + } + } else { + // If the sub-topic is not found, so we ignore this partition. This use case is mainly for + // partitioned-topic auto deletion. + // Call chain: 1.gc process(invoked on broker0) + // 2.delete partitioned-topic admin api(invoked on broker0) + // 3.delete topic-partition-0 admin api(invoked on broker0). + // If brokerClient's maxConnection is 1, then broker0 wait itself to release the + // connection, the result is timeout. So we check topic-partition-0 existence first to + // avoid connection pool deadlock. + // See issue: https://github.com/apache/pulsar/issues/24879 + if (log.isDebugEnabled()) { + log.debug("[{}] Persistent topic partition not found: {}", clientAppId(), + topicNamePartition); + } + return CompletableFuture.completedFuture(null); + } + }).whenComplete((r, ex) -> { + if (ex != null) { + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + if (realCause instanceof NotFoundException) { + // Here, we check the exception again to avoid some race conditions + // If the sub-topic is not found, the client might not have called create producer or + // it might have been deleted earlier, so we ignore the 404 error. + // For all other exception, we fail the delete partition method even if a single + // partition is failed to be deleted + if (log.isDebugEnabled()) { + log.debug("[{}] Delete topic partition not found: {}", clientAppId(), + topicNamePartition); + } + future.complete(null); + } else { + log.error("[{}] Failed to delete partition {}", clientAppId(), topicNamePartition, + realCause); + future.completeExceptionally(realCause); + } + } else { + future.complete(null); + } + }); + return future; + }).collect(Collectors.toList())); } private CompletableFuture internalRemovePartitionsAuthenticationPoliciesAsync() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java index 33e38d97a9043..e5ff84937d3cb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; @@ -60,6 +61,11 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + @Override + protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) { + pulsarAdminBuilder.maxConnectionsPerHost(1); + } + @Test public void testDeleteWhenNoSubscriptions() throws Exception { conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions); From 3b80518870f8552da0500c571a9c159d9140d5e0 Mon Sep 17 00:00:00 2001 From: oneby-wang Date: Tue, 28 Oct 2025 13:20:23 +0800 Subject: [PATCH 2/5] Considering non-persistent partitioned topic deletion --- .../admin/impl/PersistentTopicsBase.java | 99 ++++++++++--------- 1 file changed, 50 insertions(+), 49 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 49eab02e5337d..2bb9ebc641f01 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -832,56 +832,57 @@ private CompletableFuture internalRemovePartitionsTopicNoAutoCreationDisab return FutureUtil.waitForAll(IntStream.range(0, numPartitions).mapToObj(i -> { TopicName topicNamePartition = topicName.getPartition(i); CompletableFuture future = new CompletableFuture<>(); - pulsar().getPulsarResources().getTopicResources().persistentTopicExists(topicNamePartition) - .thenCompose(exists -> { - if (exists) { - try { - return pulsar().getAdminClient().topics() - .deleteAsync(topicNamePartition.toString(), force); - } catch (PulsarServerException ex) { - log.error("[{}] Failed to get admin client while delete partition {}", clientAppId(), - topicNamePartition, ex); - return FutureUtil.failedFuture(ex); - } - } else { - // If the sub-topic is not found, so we ignore this partition. This use case is mainly for - // partitioned-topic auto deletion. - // Call chain: 1.gc process(invoked on broker0) - // 2.delete partitioned-topic admin api(invoked on broker0) - // 3.delete topic-partition-0 admin api(invoked on broker0). - // If brokerClient's maxConnection is 1, then broker0 wait itself to release the - // connection, the result is timeout. So we check topic-partition-0 existence first to - // avoid connection pool deadlock. - // See issue: https://github.com/apache/pulsar/issues/24879 - if (log.isDebugEnabled()) { - log.debug("[{}] Persistent topic partition not found: {}", clientAppId(), - topicNamePartition); - } - return CompletableFuture.completedFuture(null); - } - }).whenComplete((r, ex) -> { - if (ex != null) { - Throwable realCause = FutureUtil.unwrapCompletionException(ex); - if (realCause instanceof NotFoundException) { - // Here, we check the exception again to avoid some race conditions - // If the sub-topic is not found, the client might not have called create producer or - // it might have been deleted earlier, so we ignore the 404 error. - // For all other exception, we fail the delete partition method even if a single - // partition is failed to be deleted - if (log.isDebugEnabled()) { - log.debug("[{}] Delete topic partition not found: {}", clientAppId(), - topicNamePartition); - } - future.complete(null); - } else { - log.error("[{}] Failed to delete partition {}", clientAppId(), topicNamePartition, - realCause); - future.completeExceptionally(realCause); - } - } else { - future.complete(null); + // This method is reused by non-persistent topic, which doesn't have /managed-ledgers/** path + // Non-persistent partitioned topic gc process won't call delete partition topic admin api, + // so it is safe to directly call admin api when topic is non-persistent partitioned. + CompletableFuture needAdminClientCall = topicName.isPersistent() ? + pulsar().getPulsarResources().getTopicResources().persistentTopicExists(topicNamePartition) : + CompletableFuture.completedFuture(true); + needAdminClientCall.thenCompose(deleteThroughAdminClient -> { + if (deleteThroughAdminClient) { + try { + return pulsar().getAdminClient().topics().deleteAsync(topicNamePartition.toString(), force); + } catch (PulsarServerException ex) { + log.error("[{}] Failed to get admin client while delete partition {}", clientAppId(), + topicNamePartition, ex); + return FutureUtil.failedFuture(ex); + } + } else { + // If the sub-topic is not found, so we ignore this partition. This use case is mainly for + // partitioned-topic auto deletion. + // Call chain: 1.gc process(invoked on broker0) + // 2.delete partitioned-topic admin api(invoked on broker0) + // 3.delete topic-partition-0 admin api(invoked on broker0) + // If brokerClient's maxConnection is 1, then broker0 wait itself to release the + // connection, the result is timeout. So we check topic-partition-0 existence first to + // avoid connection pool deadlock. + // See issue: https://github.com/apache/pulsar/issues/24879 + if (log.isDebugEnabled()) { + log.debug("[{}] Persistent topic partition not found: {}", clientAppId(), topicNamePartition); + } + return CompletableFuture.completedFuture(null); + } + }).whenComplete((r, ex) -> { + if (ex != null) { + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + if (realCause instanceof NotFoundException) { + // Here, we check the exception again to avoid some race conditions + // If the sub-topic is not found, the client might not have called create producer or + // it might have been deleted earlier, so we ignore the 404 error. + // For all other exception, we fail the delete partition method even if a single + // partition is failed to be deleted + if (log.isDebugEnabled()) { + log.debug("[{}] Delete topic partition not found: {}", clientAppId(), topicNamePartition); } - }); + future.complete(null); + } else { + log.error("[{}] Failed to delete partition {}", clientAppId(), topicNamePartition, realCause); + future.completeExceptionally(realCause); + } + } else { + future.complete(null); + } + }); return future; }).collect(Collectors.toList())); } From a6cf33b1b7c631b556b6acbfb7b615abb79ff484 Mon Sep 17 00:00:00 2001 From: oneby-wang Date: Tue, 28 Oct 2025 13:34:31 +0800 Subject: [PATCH 3/5] fix checkstyle violation --- .../apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 2bb9ebc641f01..5049e4eaf5289 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -835,8 +835,8 @@ private CompletableFuture internalRemovePartitionsTopicNoAutoCreationDisab // This method is reused by non-persistent topic, which doesn't have /managed-ledgers/** path // Non-persistent partitioned topic gc process won't call delete partition topic admin api, // so it is safe to directly call admin api when topic is non-persistent partitioned. - CompletableFuture needAdminClientCall = topicName.isPersistent() ? - pulsar().getPulsarResources().getTopicResources().persistentTopicExists(topicNamePartition) : + CompletableFuture needAdminClientCall = topicName.isPersistent() + ? pulsar().getPulsarResources().getTopicResources().persistentTopicExists(topicNamePartition) : CompletableFuture.completedFuture(true); needAdminClientCall.thenCompose(deleteThroughAdminClient -> { if (deleteThroughAdminClient) { From edaff51ceba81462b5f95b5e593e94ccfc57b96c Mon Sep 17 00:00:00 2001 From: oneby-wang Date: Wed, 3 Dec 2025 16:47:16 +0800 Subject: [PATCH 4/5] config pulsarService connectionsPerBroker to 1 --- .../pulsar/broker/service/InactiveTopicDeleteTest.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java index e5ff84937d3cb..3b599c063b3d2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java @@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.persistent.PersistentTopic; -import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; @@ -53,7 +52,8 @@ public class InactiveTopicDeleteTest extends BrokerTestBase { @BeforeMethod protected void setup() throws Exception { - //No-op + // configure pulsarAdmin connectionsPerBroker to 1, to reproduce issue 24879 + conf.getProperties().put("brokerClient_connectionsPerBroker", 1); } @AfterMethod(alwaysRun = true) @@ -61,11 +61,6 @@ protected void cleanup() throws Exception { super.internalCleanup(); } - @Override - protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) { - pulsarAdminBuilder.maxConnectionsPerHost(1); - } - @Test public void testDeleteWhenNoSubscriptions() throws Exception { conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions); From 9b6366d1614b90bd98c1380c6925a2eee97af977 Mon Sep 17 00:00:00 2001 From: oneby-wang Date: Wed, 3 Dec 2025 17:41:17 +0800 Subject: [PATCH 5/5] Trigger GitHub Actions