Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -829,44 +829,62 @@ private CompletableFuture<Void> internalRemovePartitionsTopicAsync(int numPartit

private CompletableFuture<Void> internalRemovePartitionsTopicNoAutoCreationDisableAsync(int numPartitions,
boolean force) {
return FutureUtil.waitForAll(IntStream.range(0, numPartitions)
.mapToObj(i -> {
TopicName topicNamePartition = topicName.getPartition(i);
return FutureUtil.waitForAll(IntStream.range(0, numPartitions).mapToObj(i -> {
TopicName topicNamePartition = topicName.getPartition(i);
CompletableFuture<Void> future = new CompletableFuture<>();
// This method is reused by non-persistent topic, which doesn't have /managed-ledgers/** path
// Non-persistent partitioned topic gc process won't call delete partition topic admin api,
// so it is safe to directly call admin api when topic is non-persistent partitioned.
CompletableFuture<Boolean> needAdminClientCall = topicName.isPersistent()
? pulsar().getPulsarResources().getTopicResources().persistentTopicExists(topicNamePartition) :
CompletableFuture.completedFuture(true);
needAdminClientCall.thenCompose(deleteThroughAdminClient -> {
if (deleteThroughAdminClient) {
try {
CompletableFuture<Void> future = new CompletableFuture<>();
pulsar().getAdminClient().topics()
.deleteAsync(topicNamePartition.toString(), force)
.whenComplete((r, ex) -> {
if (ex != null) {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
if (realCause instanceof NotFoundException){
// if the sub-topic is not found, the client might not have called
// create producer or it might have been deleted earlier,
// so we ignore the 404 error.
// For all other exception,
// we fail the delete partition method even if a single
// partition is failed to be deleted
if (log.isDebugEnabled()) {
log.debug("[{}] Partition not found: {}", clientAppId(),
topicNamePartition);
}
future.complete(null);
} else {
log.error("[{}] Failed to delete partition {}", clientAppId(),
topicNamePartition, realCause);
future.completeExceptionally(realCause);
}
} else {
future.complete(null);
}
});
return future;
return pulsar().getAdminClient().topics().deleteAsync(topicNamePartition.toString(), force);
} catch (PulsarServerException ex) {
log.error("[{}] Failed to get admin client while delete partition {}",
clientAppId(), topicNamePartition, ex);
log.error("[{}] Failed to get admin client while delete partition {}", clientAppId(),
topicNamePartition, ex);
return FutureUtil.failedFuture(ex);
}
}).collect(Collectors.toList()));
} else {
// If the sub-topic is not found, so we ignore this partition. This use case is mainly for
// partitioned-topic auto deletion.
// Call chain: 1.gc process(invoked on broker0)
// 2.delete partitioned-topic admin api(invoked on broker0)
// 3.delete topic-partition-0 admin api(invoked on broker0)
// If brokerClient's maxConnection is 1, then broker0 wait itself to release the
// connection, the result is timeout. So we check topic-partition-0 existence first to
// avoid connection pool deadlock.
// See issue: https://github.com/apache/pulsar/issues/24879
if (log.isDebugEnabled()) {
log.debug("[{}] Persistent topic partition not found: {}", clientAppId(), topicNamePartition);
}
return CompletableFuture.completedFuture(null);
}
}).whenComplete((r, ex) -> {
if (ex != null) {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
if (realCause instanceof NotFoundException) {
// Here, we check the exception again to avoid some race conditions
// If the sub-topic is not found, the client might not have called create producer or
// it might have been deleted earlier, so we ignore the 404 error.
// For all other exception, we fail the delete partition method even if a single
// partition is failed to be deleted
if (log.isDebugEnabled()) {
log.debug("[{}] Delete topic partition not found: {}", clientAppId(), topicNamePartition);
}
future.complete(null);
} else {
log.error("[{}] Failed to delete partition {}", clientAppId(), topicNamePartition, realCause);
future.completeExceptionally(realCause);
}
} else {
future.complete(null);
}
});
return future;
}).collect(Collectors.toList()));
}

private CompletableFuture<Void> internalRemovePartitionsAuthenticationPoliciesAsync() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {

@BeforeMethod
protected void setup() throws Exception {
//No-op
// configure pulsarAdmin connectionsPerBroker to 1, to reproduce issue 24879
conf.getProperties().put("brokerClient_connectionsPerBroker", 1);
}

@AfterMethod(alwaysRun = true)
Expand Down