diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index ec61d58d2afe4..5049e4eaf5289 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -829,44 +829,62 @@ private CompletableFuture internalRemovePartitionsTopicAsync(int numPartit private CompletableFuture internalRemovePartitionsTopicNoAutoCreationDisableAsync(int numPartitions, boolean force) { - return FutureUtil.waitForAll(IntStream.range(0, numPartitions) - .mapToObj(i -> { - TopicName topicNamePartition = topicName.getPartition(i); + return FutureUtil.waitForAll(IntStream.range(0, numPartitions).mapToObj(i -> { + TopicName topicNamePartition = topicName.getPartition(i); + CompletableFuture future = new CompletableFuture<>(); + // This method is reused by non-persistent topic, which doesn't have /managed-ledgers/** path + // Non-persistent partitioned topic gc process won't call delete partition topic admin api, + // so it is safe to directly call admin api when topic is non-persistent partitioned. + CompletableFuture needAdminClientCall = topicName.isPersistent() + ? pulsar().getPulsarResources().getTopicResources().persistentTopicExists(topicNamePartition) : + CompletableFuture.completedFuture(true); + needAdminClientCall.thenCompose(deleteThroughAdminClient -> { + if (deleteThroughAdminClient) { try { - CompletableFuture future = new CompletableFuture<>(); - pulsar().getAdminClient().topics() - .deleteAsync(topicNamePartition.toString(), force) - .whenComplete((r, ex) -> { - if (ex != null) { - Throwable realCause = FutureUtil.unwrapCompletionException(ex); - if (realCause instanceof NotFoundException){ - // if the sub-topic is not found, the client might not have called - // create producer or it might have been deleted earlier, - // so we ignore the 404 error. - // For all other exception, - // we fail the delete partition method even if a single - // partition is failed to be deleted - if (log.isDebugEnabled()) { - log.debug("[{}] Partition not found: {}", clientAppId(), - topicNamePartition); - } - future.complete(null); - } else { - log.error("[{}] Failed to delete partition {}", clientAppId(), - topicNamePartition, realCause); - future.completeExceptionally(realCause); - } - } else { - future.complete(null); - } - }); - return future; + return pulsar().getAdminClient().topics().deleteAsync(topicNamePartition.toString(), force); } catch (PulsarServerException ex) { - log.error("[{}] Failed to get admin client while delete partition {}", - clientAppId(), topicNamePartition, ex); + log.error("[{}] Failed to get admin client while delete partition {}", clientAppId(), + topicNamePartition, ex); return FutureUtil.failedFuture(ex); } - }).collect(Collectors.toList())); + } else { + // If the sub-topic is not found, so we ignore this partition. This use case is mainly for + // partitioned-topic auto deletion. + // Call chain: 1.gc process(invoked on broker0) + // 2.delete partitioned-topic admin api(invoked on broker0) + // 3.delete topic-partition-0 admin api(invoked on broker0) + // If brokerClient's maxConnection is 1, then broker0 wait itself to release the + // connection, the result is timeout. So we check topic-partition-0 existence first to + // avoid connection pool deadlock. + // See issue: https://github.com/apache/pulsar/issues/24879 + if (log.isDebugEnabled()) { + log.debug("[{}] Persistent topic partition not found: {}", clientAppId(), topicNamePartition); + } + return CompletableFuture.completedFuture(null); + } + }).whenComplete((r, ex) -> { + if (ex != null) { + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + if (realCause instanceof NotFoundException) { + // Here, we check the exception again to avoid some race conditions + // If the sub-topic is not found, the client might not have called create producer or + // it might have been deleted earlier, so we ignore the 404 error. + // For all other exception, we fail the delete partition method even if a single + // partition is failed to be deleted + if (log.isDebugEnabled()) { + log.debug("[{}] Delete topic partition not found: {}", clientAppId(), topicNamePartition); + } + future.complete(null); + } else { + log.error("[{}] Failed to delete partition {}", clientAppId(), topicNamePartition, realCause); + future.completeExceptionally(realCause); + } + } else { + future.complete(null); + } + }); + return future; + }).collect(Collectors.toList())); } private CompletableFuture internalRemovePartitionsAuthenticationPoliciesAsync() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java index 33e38d97a9043..3b599c063b3d2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java @@ -52,7 +52,8 @@ public class InactiveTopicDeleteTest extends BrokerTestBase { @BeforeMethod protected void setup() throws Exception { - //No-op + // configure pulsarAdmin connectionsPerBroker to 1, to reproduce issue 24879 + conf.getProperties().put("brokerClient_connectionsPerBroker", 1); } @AfterMethod(alwaysRun = true)