entries = filterEntries();
+ if (!entries.isEmpty()) {
+ // Move the read position of the cursor to the next position of the last read entry,
+ // or we will deliver the same entry to the consumer more than once.
+ Entry entry = entries.get(entries.size() - 1);
+ Position position = PositionFactory.create(entry.getLedgerId(), entry.getEntryId());
+ Position nextReadPosition = callback.cursor.getNextAvailablePosition(position);
+ callback.updateReadPosition(nextReadPosition);
+ }
+ callback.internalReadEntriesFailed(entries, exception, ctx);
+ }
+
+ /**
+ * Filter the entries that have been read success.
+ *
+ * If we want to read [1, 2, 3, 4, 5], but only read [1, 2, 4, 5] successfully, [3] is read failed,
+ * only return [1,2] to the caller, to make sure the read operation success as possible
+ * and keep the ordering guarantee.
+ *
+ * @return filtered entries
+ */
+ private List filterEntries() {
+ if (entries.isEmpty()) {
+ return Collections.emptyList();
+ }
+ entries.sort(ManagedCursorImpl.ENTRY_COMPARATOR);
+ List entries0 = new ArrayList<>();
+ for (long entryId : entryIds) {
+ if (this.entries.isEmpty()) {
+ break;
+ }
+ Entry entry = this.entries.remove(0);
+ if (entry.getEntryId() == entryId) {
+ entries0.add(entry);
+ } else {
+ entry.release();
+ break;
+ }
+ }
+ // Release the entries that are not in the result.
+ for (Entry entry : entries) {
+ entry.release();
+ }
+ return entries0;
}
- asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx);
}
protected void asyncReadEntry(ReadHandle ledger, Position position, ReadEntryCallback callback, Object ctx) {
@@ -2261,20 +2373,18 @@ protected void asyncReadEntry(ReadHandle ledger, Position position, ReadEntryCal
}
}
- protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, OpReadEntry opReadEntry,
- Object ctx) {
+ protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, boolean shouldCacheEntries,
+ ReadEntriesCallback callback, Object ctx) {
if (config.getReadEntryTimeoutSeconds() > 0) {
// set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
long createdTime = System.nanoTime();
ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
- opReadEntry, readOpCount, createdTime, ctx);
+ callback, readOpCount, createdTime, ctx);
lastReadCallback = readCallback;
- entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(),
- readCallback, readOpCount);
+ entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, shouldCacheEntries, readCallback, readOpCount);
} else {
- entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), opReadEntry,
- ctx);
+ entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, shouldCacheEntries, callback, ctx);
}
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MutablePositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MutablePositionImpl.java
new file mode 100644
index 0000000000000..c4584da798c48
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MutablePositionImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import org.apache.bookkeeper.mledger.Position;
+
+final class MutablePositionImpl implements Position {
+
+ private volatile long ledgerId;
+ private volatile long entryId;
+
+ MutablePositionImpl(long ledgerId, long entryId) {
+ this.ledgerId = ledgerId;
+ this.entryId = entryId;
+ }
+
+ MutablePositionImpl() {
+ this.ledgerId = -1;
+ this.entryId = -1;
+ }
+
+ /**
+ * Change the ledgerId and entryId.
+ *
+ * @param ledgerId
+ * @param entryId
+ */
+ public void changePositionTo(long ledgerId, long entryId) {
+ this.ledgerId = ledgerId;
+ this.entryId = entryId;
+ }
+
+ @Override
+ public long getLedgerId() {
+ return ledgerId;
+ }
+
+ @Override
+ public long getEntryId() {
+ return entryId;
+ }
+
+ /**
+ * String representation of virtual cursor - LedgerId:EntryId.
+ */
+ @Override
+ public String toString() {
+ return ledgerId + ":" + entryId;
+ }
+
+ @Override
+ public int hashCode() {
+ return hashCodeForPosition();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof Position && compareTo((Position) obj) == 0;
+ }
+
+}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index a4928b44bd97d..0d4c490c4b151 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -21,7 +21,7 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
@@ -31,6 +31,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,7 +77,7 @@ void internalReadEntriesComplete(List returnedEntries, Object ctx, Positi
}
cursor.updateReadStats(entriesCount, entriesSize);
- if (entriesCount != 0) {
+ if (entriesCount != 0 && lastPosition == null) {
lastPosition = returnedEntries.get(entriesCount - 1).getPosition();
}
if (log.isDebugEnabled()) {
@@ -84,16 +85,14 @@ void internalReadEntriesComplete(List returnedEntries, Object ctx, Positi
cursor.ledger.getName(), cursor.getName(), returnedEntries.size(), entries.size(), count);
}
- List filteredEntries = Collections.emptyList();
+ List filteredEntries;
if (entriesCount != 0) {
filteredEntries = cursor.filterReadEntries(returnedEntries);
entries.addAll(filteredEntries);
}
- // if entries have been filtered out then try to skip reading of already deletedMessages in that range
- final Position nexReadPosition = entriesCount != filteredEntries.size()
- ? cursor.getNextAvailablePosition(lastPosition) : lastPosition.getNext();
- updateReadPosition(nexReadPosition);
+ final Position nextReadPosition = cursor.getNextAvailablePosition(lastPosition);
+ updateReadPosition(nextReadPosition);
checkReadCompletion();
}
@@ -104,7 +103,14 @@ public void readEntriesComplete(List returnedEntries, Object ctx) {
@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+ internalReadEntriesFailed(null, exception, ctx);
+ }
+
+ void internalReadEntriesFailed(Collection ret, ManagedLedgerException exception, Object ctx) {
cursor.readOperationCompleted();
+ if (CollectionUtils.isNotEmpty(ret)) {
+ entries.addAll(ret);
+ }
if (!entries.isEmpty()) {
// There were already some entries that were read before, we can return them
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index ce8b0334226cf..bdeba625f22ec 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -4612,7 +4612,8 @@ public void testReadEntriesWithSkipDeletedEntries() throws Exception {
return inv.callRealMethod();
})
.when(ledger)
- .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(), Mockito.any());
+ .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(),
+ Mockito.anyBoolean(),Mockito.any(), Mockito.any());
@Cleanup
ManagedCursor cursor = ledger.openCursor("c");
@@ -4717,7 +4718,8 @@ public void testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions() throws
return inv.callRealMethod();
})
.when(ledger)
- .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(), Mockito.any());
+ .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(),
+ Mockito.anyBoolean(), Mockito.any(), Mockito.any());
@Cleanup
ManagedCursor cursor = ledger.openCursor("c");
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index bb462e922e7e8..60ab9659a8463 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -46,6 +46,9 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.concurrent.DefaultThreadFactory;
+import it.unimi.dsi.fastutil.longs.LongAVLTreeSet;
+import it.unimi.dsi.fastutil.longs.LongLongPair;
+import it.unimi.dsi.fastutil.longs.LongSortedSet;
import java.lang.reflect.Field;
import java.nio.ReadOnlyBufferException;
import java.nio.charset.Charset;
@@ -4476,4 +4479,192 @@ public void testRemoveLedgerProperty() throws Exception {
Assert.assertEquals(ml.getLedgersInfo().get(firstLedger).getPropertiesCount(), 0);
Assert.assertEquals(ml.getLedgersInfo().get(lastLedger).getPropertiesCount(), 0);
}
+
+ @Test
+ public void testToRanges() {
+ LongSortedSet set = new LongAVLTreeSet();
+ set.add(1L);
+ set.add(2L);
+ set.add(4L);
+ set.add(6L);
+ set.add(7L);
+ set.add(8L);
+ set.add(10L);
+
+ List ranges = ManagedLedgerImpl.toRanges(set);
+ assertEquals(ranges.size(), 4);
+
+ LongLongPair pair0 = ranges.get(0);
+ assertEquals(pair0.firstLong(), 1L);
+ assertEquals(pair0.secondLong(), 2L);
+
+ LongLongPair pair1 = ranges.get(1);
+ assertEquals(pair1.firstLong(), 4L);
+ assertEquals(pair1.secondLong(), 4L);
+
+ LongLongPair pair2 = ranges.get(2);
+ assertEquals(pair2.firstLong(), 6L);
+ assertEquals(pair2.secondLong(), 8L);
+
+ LongLongPair pair3 = ranges.get(3);
+ assertEquals(pair3.firstLong(), 10L);
+ assertEquals(pair3.secondLong(), 10L);
+ }
+
+
+ @Test
+ public void testBatchReadEntriesCallback() throws Exception {
+ @Cleanup
+ ManagedLedgerImpl ledger =
+ (ManagedLedgerImpl) factory.open("testBatchReadEntriesCallback");
+ @Cleanup
+ ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("test-cursor");
+ for (int i = 0; i < 10; i++) {
+ ledger.addEntry(("dummy-entry-" + i).getBytes(Encoding));
+ }
+
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicBoolean failed = new AtomicBoolean(false);
+ List entries = new ArrayList<>();
+ OpReadEntry opReadEntry = OpReadEntry.create(cursor, cursor.readPosition, 10, new ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List entries0, Object ctx) {
+ entries.addAll(entries0);
+ latch.countDown();
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+ failed.set(true);
+ latch.countDown();
+ }
+ }, null, ledger.lastConfirmedEntry, position -> position.getEntryId() % 2 == 0);
+
+ LongSortedSet entryIds = new LongAVLTreeSet();
+ entryIds.add(1L);
+ entryIds.add(3L);
+ entryIds.add(5L);
+ entryIds.add(7L);
+ entryIds.add(9L);
+ ManagedLedgerImpl.BatchReadEntriesCallback callback = new ManagedLedgerImpl
+ .BatchReadEntriesCallback(entryIds, opReadEntry, null);
+ long ledgerId = ledger.currentLedger.getId();
+
+ callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 1, new byte[1])), null);
+ callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 3, new byte[3])), null);
+ callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 7, new byte[7])), null);
+ callback.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("Invalid cursor position"), null);
+ // After call readEntriesFailed, the following readEntriesComplete should be ignored.
+ callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 5, new byte[5])), null);
+
+ latch.await();
+ // should not fail
+ assertFalse(failed.get());
+ assertEquals(entries.size(), 2);
+
+ // `entries` should be only the entries with entryId 1 and 3.
+ assertEquals(entries.get(0).getEntryId(), 1);
+ assertEquals(entries.get(1).getEntryId(), 3);
+
+ // ReadPosition should be updated to [4]
+ assertEquals(cursor.getReadPosition().getEntryId(), 4);
+ }
+
+ @Test
+ public void testReadEntriesFromDifferentLedgersWithSkipCondition() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(5);
+ config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+ @Cleanup
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testReadEntriesWithSkipCondition", config);
+ ledger = Mockito.spy(ledger);
+
+ AtomicInteger counter = new AtomicInteger();
+ Mockito.doAnswer(inv -> {
+ counter.incrementAndGet();
+ return inv.callRealMethod();
+ }).when(ledger).asyncReadEntries(Mockito.any());
+ @Cleanup
+ ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("test-cursor");
+
+ Position lastPosition = null;
+ for (int i = 0; i < 12; i++) {
+ lastPosition = ledger.addEntry(("dummy-entry-" + i).getBytes(Encoding));
+ }
+
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicBoolean failed = new AtomicBoolean(false);
+ List entries = new ArrayList<>();
+ cursor.asyncReadEntriesWithSkip(100, -1, new ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List entries0, Object ctx) {
+ entries.addAll(entries0);
+ latch.countDown();
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+ failed.set(true);
+ latch.countDown();
+ }
+ }, null, PositionFactory.LATEST, position -> position.getEntryId() % 2 == 0);
+
+ latch.await();
+ assertFalse(failed.get());
+ assertEquals(entries.size(), 5);
+ // Read entries from 3 ledgers, the counter is 3.
+ assertEquals(counter.get(), 3);
+ Position readPosition = cursor.getReadPosition();
+ assertTrue(readPosition.getLedgerId() == lastPosition.getLedgerId()
+ && readPosition.getEntryId() == lastPosition.getEntryId() + 1);
+ }
+
+ @Test
+ public void testReadEntriesFromOneSameLedgerWithSkipCondition() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ @Cleanup
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testReadEntriesWithSkipCondition", config);
+ ledger = Mockito.spy(ledger);
+
+ AtomicInteger counter = new AtomicInteger();
+ Mockito.doAnswer(inv -> {
+ counter.incrementAndGet();
+ return inv.callRealMethod();
+ }).when(ledger).asyncReadEntries(Mockito.any());
+
+ @Cleanup
+ ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("test-cursor");
+
+ Position lastPosition = null;
+ for (int i = 0; i < 10; i++) {
+ lastPosition = ledger.addEntry(("dummy-entry-" + i).getBytes(Encoding));
+ }
+
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicBoolean failed = new AtomicBoolean(false);
+ List entries = new ArrayList<>();
+ cursor.asyncReadEntriesWithSkip(100, -1, new ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List entries0, Object ctx) {
+ entries.addAll(entries0);
+ latch.countDown();
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+ failed.set(true);
+ latch.countDown();
+ }
+ }, null, PositionFactory.LATEST, position -> position.getEntryId() % 2 == 0);
+
+ latch.await();
+ assertEquals(counter.get(), 1);
+
+ assertFalse(failed.get());
+ assertEquals(entries.size(), 5);
+
+ Position readPosition = cursor.getReadPosition();
+ assertTrue(readPosition.getLedgerId() == lastPosition.getLedgerId()
+ && readPosition.getEntryId() == lastPosition.getEntryId() + 1);
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java
index 7269df3b6b8b2..89c0c83796e7c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java
@@ -217,7 +217,7 @@ public void testRedelivery(boolean useOpenRangeSet) throws Exception {
assertEquals(cursor.getIndividuallyDeletedMessagesSet().size(), 0);
// markDelete position should be one position behind read position
- assertEquals(cursor.getReadPosition(), cursor.getMarkDeletedPosition().getNext());
+ assertEquals(cursor.getReadPosition(), cursor.getNextAvailablePosition(cursor.getMarkDeletedPosition()));
producer.close();
consumer2.close();