Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public class ManagedLedgerConfig {
private int minimumBacklogEntriesForCaching = 1000;
private int maxBacklogBetweenCursorsForCaching = 1000;
private boolean triggerOffloadOnTopicLoad = false;
private boolean isUseBookkeeperV2WireProtocol = false;

@Getter
@Setter
Expand Down Expand Up @@ -769,5 +770,13 @@ public String getShadowSource() {
return MapUtils.getString(properties, PROPERTY_SOURCE_TOPIC_KEY);
}

/**
 * Records whether the BookKeeper client is configured to use the V2 wire protocol.
 *
 * <p>The value is only stored here; it is exposed again through
 * {@link #isUseBookkeeperV2WireProtocol()}.
 *
 * @param isUseBookkeeperV2WireProtocol {@code true} if the V2 wire protocol is in use
 */
public void setUseBookkeeperV2WireProtocol(boolean isUseBookkeeperV2WireProtocol) {
this.isUseBookkeeperV2WireProtocol = isUseBookkeeperV2WireProtocol;
}

/**
 * Returns the flag previously stored by {@code setUseBookkeeperV2WireProtocol}.
 *
 * @return {@code true} if the V2 wire protocol flag has been set; the field defaults to {@code false}
 */
public boolean isUseBookkeeperV2WireProtocol() {
return isUseBookkeeperV2WireProtocol;
}

public static final String PROPERTY_SOURCE_TOPIC_KEY = "PULSAR.SHADOW_SOURCE";
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@
public class EntryCacheDisabled implements EntryCache {
private final ManagedLedgerImpl ml;
private final ManagedLedgerInterceptor interceptor;
private final boolean useBookkeeperV2WireProtocol;

/**
 * Creates the entry cache for the given managed ledger.
 *
 * <p>The interceptor and the V2-wire-protocol flag are read from {@code ml} once, here, and kept in
 * final fields, so later changes to the managed ledger's configuration are not observed by this instance.
 *
 * @param ml the managed ledger this cache reads entries for
 */
public EntryCacheDisabled(ManagedLedgerImpl ml) {
this.ml = ml;
this.interceptor = ml.getManagedLedgerInterceptor();
// Snapshot of the config flag; it is passed to ReadEntryUtils.readAsync on every read.
this.useBookkeeperV2WireProtocol = ml.getConfig().isUseBookkeeperV2WireProtocol();
}

@Override
Expand Down Expand Up @@ -79,8 +81,8 @@ public void invalidateEntriesBeforeTimestamp(long timestamp) {
@Override
public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry).thenAcceptAsync(
ledgerEntries -> {
ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry, useBookkeeperV2WireProtocol)
.thenAcceptAsync(ledgerEntries -> {
List<Entry> entries = new ArrayList<>();
long totalSize = 0;
try {
Expand All @@ -98,17 +100,18 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole
ml.getMbean().addReadEntriesSample(entries.size(), totalSize);

callback.readEntriesComplete(entries, ctx);
}, ml.getExecutor()).exceptionally(exception -> {
callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
return null;
});
}, ml.getExecutor())
.exceptionally(exception -> {
callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
return null;
});
}

@Override
public void asyncReadEntry(ReadHandle lh, Position position, AsyncCallbacks.ReadEntryCallback callback,
Object ctx) {
ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId()).whenCompleteAsync(
(ledgerEntries, exception) -> {
ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId(), useBookkeeperV2WireProtocol)
.whenCompleteAsync((ledgerEntries, exception) -> {
if (exception != null) {
ml.invalidateLedgerHandle(lh);
callback.readEntryFailed(createManagedLedgerException(exception), ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class RangeEntryCacheImpl implements EntryCache {
private final RangeCache<Position, EntryImpl> entries;
private final boolean copyEntries;
private final PendingReadsManager pendingReadsManager;
private final boolean useBookkeeperV2WireProtocol;

private volatile long estimatedEntrySize = 10 * 1024;

Expand All @@ -80,6 +81,7 @@ public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl
this.readEntryTimeoutMillis = getManagedLedgerConfig().getReadEntryTimeoutSeconds();
this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp);
this.copyEntries = copyEntries;
this.useBookkeeperV2WireProtocol = ml.getConfig().isUseBookkeeperV2WireProtocol();

if (log.isDebugEnabled()) {
log.debug("[{}] Initialized managed-ledger entry cache", ml.getName());
Expand Down Expand Up @@ -249,8 +251,8 @@ private void asyncReadEntry0(ReadHandle lh, Position position, final ReadEntryCa
manager.mlFactoryMBean.recordCacheHit(cachedEntry.getLength());
callback.readEntryComplete(cachedEntry, ctx);
} else {
ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId()).thenAcceptAsync(
ledgerEntries -> {
ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId(), useBookkeeperV2WireProtocol)
.thenAcceptAsync(ledgerEntries -> {
try {
Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
if (iterator.hasNext()) {
Expand All @@ -264,17 +266,18 @@ private void asyncReadEntry0(ReadHandle lh, Position position, final ReadEntryCa
} else {
// got an empty sequence
callback.readEntryFailed(new ManagedLedgerException("Could not read given position"),
ctx);
ctx);
}
} finally {
ledgerEntries.close();
}
}, ml.getExecutor()).exceptionally(exception -> {
}, ml.getExecutor())
.exceptionally(exception -> {
ml.invalidateLedgerHandle(lh);
pendingReadsManager.invalidateLedger(lh.getId());
callback.readEntryFailed(createManagedLedgerException(exception), ctx);
return null;
});
});
}
}

Expand Down Expand Up @@ -429,7 +432,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh,
long firstEntry, long lastEntry, boolean shouldCacheEntry) {
final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
CompletableFuture<List<EntryImpl>> readResult = ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry)
CompletableFuture<List<EntryImpl>> readResult = ReadEntryUtils
.readAsync(ml, lh, firstEntry, lastEntry, useBookkeeperV2WireProtocol)
.thenApply(
ledgerEntries -> {
requireNonNull(ml.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,41 @@
*/
package org.apache.bookkeeper.mledger.impl.cache;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ReadEntryUtils {
@VisibleForTesting
public class ReadEntryUtils {

static CompletableFuture<LedgerEntries> readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry,
long lastEntry) {
private static final Logger log = LoggerFactory.getLogger(ReadEntryUtils.class);

@VisibleForTesting
public static CompletableFuture<LedgerEntries> readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry,
long lastEntry, boolean useBookkeeperV2WireProtocol) {
int entriesToRead = (int) (lastEntry - firstEntry + 1);
boolean useBatchRead = useBatchRead(entriesToRead, handle, useBookkeeperV2WireProtocol);
if (ml.getOptionalLedgerInfo(handle.getId()).isEmpty()) {
// The read handle comes from another managed ledger, in this case, we can only compare the entry range with
// the LAC of that read handle. Specifically, it happens when this method is called by a
// ReadOnlyManagedLedgerImpl object.
return handle.readAsync(firstEntry, lastEntry);
if (!useBatchRead) {
return handle.readAsync(firstEntry, lastEntry);
}
return handle.batchReadAsync(firstEntry, entriesToRead, 0);
}
// Compare the entry range with the lastConfirmedEntry maintained by the managed ledger because the entry cache
// of `ShadowManagedLedgerImpl` reads entries via `ReadOnlyLedgerHandle`, which never updates `lastAddConfirmed`
Expand All @@ -49,6 +69,38 @@ static CompletableFuture<LedgerEntries> readAsync(ManagedLedger ml, ReadHandle h
return CompletableFuture.failedFuture(new ManagedLedgerException("LastConfirmedEntry is "
+ lastConfirmedEntry + " when reading entry " + lastEntry));
}

if (useBatchRead && handle instanceof LedgerHandle lh) {
return asyncBatchReadUnconfirmedEntries(lh, firstEntry, entriesToRead);
}
return handle.readUnconfirmedAsync(firstEntry, lastEntry);
}

/**
 * Decides whether a read of {@code entriesToRead} entries should go through the batch-read path.
 *
 * <p>Batch read is chosen only when all of the following hold: more than one entry is requested,
 * the V2 wire protocol flag is set, and the ledger is not striped (ensemble size equals write
 * quorum size).
 *
 * @param entriesToRead number of entries in the requested range
 * @param handle read handle whose ledger metadata is inspected
 * @param useBookkeeperV2WireProtocol whether the V2 wire protocol is in use
 * @return {@code true} if the batch-read path should be used
 */
private static boolean useBatchRead(int entriesToRead, ReadHandle handle, boolean useBookkeeperV2WireProtocol) {
    // The metadata is looked up unconditionally, before any of the checks below.
    final LedgerMetadata metadata = handle.getLedgerMetadata();
    // Batch read is not supported for striped ledgers.
    final boolean striped = metadata.getEnsembleSize() != metadata.getWriteQuorumSize();

    if (entriesToRead <= 1) {
        return false;
    }
    if (!useBookkeeperV2WireProtocol) {
        return false;
    }
    return !striped;
}

/**
 * Adapts the callback-based {@code LedgerHandle#asyncBatchReadUnconfirmedEntries} to a
 * {@link CompletableFuture} of {@link LedgerEntries}.
 *
 * <p>On a non-OK return code the failure is logged and the future completes exceptionally with the
 * matching {@code BKException}. On success every entry of the returned enumeration is converted to a
 * {@link LedgerEntryImpl} and the future completes with the collected entries.
 *
 * @param lh ledger handle to read from
 * @param firstEntry id of the first entry to read
 * @param numEntries maximum number of entries to read; the max-size argument is passed as {@code 0}
 * @return future completed from the BookKeeper read callback
 */
private static CompletableFuture<LedgerEntries> asyncBatchReadUnconfirmedEntries(LedgerHandle lh, long firstEntry,
        int numEntries) {
    final CompletableFuture<LedgerEntries> result = new CompletableFuture<>();
    lh.asyncBatchReadUnconfirmedEntries(firstEntry, numEntries, 0, (rc, ledger, enumeration, ctx) -> {
        if (rc == BKException.Code.OK) {
            // Convert the legacy enumeration into the new-API entry container.
            final List<org.apache.bookkeeper.client.api.LedgerEntry> converted = new ArrayList<>(numEntries);
            while (enumeration.hasMoreElements()) {
                final LedgerEntry legacy = enumeration.nextElement();
                converted.add(LedgerEntryImpl.create(legacy.getLedgerId(), legacy.getEntryId(),
                        legacy.getLength(), legacy.getEntryBuffer()));
            }
            result.complete(LedgerEntriesImpl.create(converted));
        } else {
            log.error("Failed to batch read entries from ledger {} : {}", ledger.getId(),
                    BKException.getMessage(rc));
            result.completeExceptionally(org.apache.bookkeeper.client.BKException.create(rc));
        }
    }, null);
    return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@
package org.apache.bookkeeper.mledger.impl;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
Expand Down Expand Up @@ -85,15 +81,9 @@
import lombok.Cleanup;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.*;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
Expand Down Expand Up @@ -128,6 +118,7 @@
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.mledger.impl.cache.ReadEntryUtils;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
Expand All @@ -148,7 +139,9 @@
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.stubbing.OngoingStubbing;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -4372,4 +4365,36 @@ public void testDeleteCurrentLedgerWhenItIsClosed(boolean closeLedgerByAddEntry)
assertEquals(ml.currentLedgerEntries, 0);
});
}

/**
 * Writes twenty 1 MB entries and reads them back while {@code ReadEntryUtils.readAsync} is statically
 * stubbed to route every read through {@code ReadHandle#batchReadAsync}.
 *
 * <p>The stub receives the entry ids as {@code long} values (they are matched with {@code anyLong()}),
 * so they must be read back as {@code long}. Assigning {@code inv.getArgument(..)} to an {@code int}
 * makes the compiler infer {@code Integer} for the generic return type, and the boxed {@code Long}
 * then fails with a {@code ClassCastException} when the answer runs.
 */
@Test
public void testBatchReadExceedMaxFrameLength() throws Exception {
    ManagedLedgerConfig config = new ManagedLedgerConfig();
    config.setMaxEntriesPerLedger(10);
    config.setUseBookkeeperV2WireProtocol(true);
    ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("testBatchReadExceedMaxFrameLength", config);
    ml = Mockito.spy(ml);
    // Report no ledger info for any ledger id on the spied managed ledger.
    Mockito.doReturn(Optional.empty()).when(ml).getOptionalLedgerInfo(Mockito.anyLong());
    ManagedCursor cursor = ml.openCursor("c1");
    // 1 MB per entry.
    byte[] body = new byte[1024 * 1024];
    for (int i = 0; i < 20; i++) {
        ml.addEntry(body);
    }

    @Cleanup
    MockedStatic<ReadEntryUtils> mockStatic = Mockito.mockStatic(ReadEntryUtils.class);
    mockStatic.when(() ->
                    ReadEntryUtils.readAsync(any(), any(), anyLong(), anyLong(),
                            anyBoolean()))
            .thenAnswer(inv -> {
                ReadHandle handle = inv.getArgument(1);
                // Entry ids are longs; only the derived count is narrowed to int.
                long firstEntry = inv.getArgument(2);
                long lastEntry = inv.getArgument(3);
                int entriesToRead = (int) (lastEntry - firstEntry + 1);
                return handle.batchReadAsync(firstEntry, entriesToRead, 0);
            });

    List<Entry> entries = cursor.readEntries(20);
    Assert.assertEquals(entries.size(), 20);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,12 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
return CompletableFuture.completedFuture(LedgerEntriesImpl.create(readEntries));
}

/**
 * Batch read implemented on top of {@code readAsync}: reads from {@code startEntry} up to
 * {@code maxCount} entries, never going past the last add confirmed entry.
 *
 * @param startEntry id of the first entry to read
 * @param maxCount maximum number of entries to return
 * @param maxSize not used by this implementation
 * @return the future returned by {@code readAsync} for the clamped range
 */
@Override
public CompletableFuture<LedgerEntries> batchReadAsync(long startEntry, int maxCount, long maxSize) {
    final long requestedLast = startEntry + maxCount - 1;
    final long lastAddConfirmed = getLastAddConfirmed();
    // Clamp the upper bound of the range to the last add confirmed entry.
    final long boundedLast = requestedLast <= lastAddConfirmed ? requestedLast : lastAddConfirmed;
    return readAsync(startEntry, boundedLast);
}

@Override
public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
return readAsync(firstEntry, lastEntry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1941,6 +1941,14 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private boolean bookkeeperClientSeparatedIoThreadsEnabled = false;

// Batch-read switch for the BookKeeper client; off by default. The value is handed to the
// BookKeeper ClientConfiguration via setBatchReadEnabled(...) when the client configuration is created.
@FieldContext(
category = CATEGORY_STORAGE_BK,
doc = "Enable Bookkeeper client to read entries in batch mode. Default is false. Note: "
+ "this feature only works when bookkeeperUseV2WireProtocol is enabled and ensemble "
+ "size equals to write quorum size."
)
private boolean bookkeeperEnableBatchRead = false;

/**** --- Managed Ledger. --- ****/
@FieldContext(
minValue = 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ ClientConfiguration createBkClientConfiguration(MetadataStoreExtended store, Ser
bkConf.setDiskWeightBasedPlacementEnabled(conf.isBookkeeperDiskWeightBasedPlacementEnabled());
bkConf.setMetadataServiceUri(conf.getBookkeeperMetadataStoreUrl());
bkConf.setLimitStatsLogging(conf.isBookkeeperClientLimitStatsLogging());
bkConf.setBatchReadEnabled(conf.isBookkeeperEnableBatchRead());

if (!conf.isBookkeeperMetadataStoreSeparated()) {
// If we're connecting to the same metadata service, with same config, then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2010,6 +2010,7 @@ private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull T
serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching());
managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching(
serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching());
managedLedgerConfig.setUseBookkeeperV2WireProtocol(serviceConfig.isBookkeeperUseV2WireProtocol());

OffloadPoliciesImpl nsLevelOffloadPolicies =
(OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,4 +340,24 @@ public void testBookKeeperLimitStatsLoggingConfiguration() throws Exception {
(ClientConfiguration) FieldUtils.readField(builder, "conf", true);
assertFalse(clientConfiguration.getLimitStatsLogging());
}

/**
 * Checks that {@code bookkeeperEnableBatchRead} in the service configuration is carried over to the
 * {@code ClientConfiguration} held by the BookKeeper builder, for both the enabled and disabled values.
 */
@Test
public void testBookkeeperBatchReadConfig() throws Exception {
    final BookKeeperClientFactoryImpl factory = new BookKeeperClientFactoryImpl();
    final ServiceConfiguration conf = new ServiceConfiguration();
    final EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);

    // Enabled first, then disabled, reusing the same factory, configuration and event loop group.
    for (boolean batchReadEnabled : new boolean[] {true, false}) {
        conf.setBookkeeperEnableBatchRead(batchReadEnabled);

        final StatsLogger statsLogger = mock(StatsLogger.class);
        final ClientConfiguration bkConf =
                factory.createBkClientConfiguration(mock(MetadataStoreExtended.class), conf);
        final BookKeeper.Builder builder =
                factory.getBookKeeperBuilder(conf, eventLoopGroup, statsLogger, bkConf);

        // The builder keeps its configuration in a private "conf" field.
        final ClientConfiguration clientConfiguration =
                (ClientConfiguration) FieldUtils.readField(builder, "conf", true);
        if (batchReadEnabled) {
            assertTrue(clientConfiguration.isBatchReadEnabled());
        } else {
            assertFalse(clientConfiguration.isBatchReadEnabled());
        }
    }
}
}
Loading