From 7de17587952a17027a2b3284d61d20587e950f9e Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 4 Jan 2024 22:10:31 +0800 Subject: [PATCH 01/21] Improve for skipCondition. --- .../mledger/impl/ManagedLedgerImpl.java | 158 ++++++++++++++---- .../mledger/impl/ManagedLedgerTest.java | 2 +- 2 files changed, 124 insertions(+), 36 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 4084d7004a80d..c2ee80a5b6eb0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -27,6 +27,8 @@ import com.google.common.collect.Lists; import com.google.common.collect.Queues; import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; +import com.google.common.collect.TreeRangeSet; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.util.Recycler; @@ -35,6 +37,7 @@ import java.time.Clock; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -46,6 +49,8 @@ import java.util.Queue; import java.util.Random; import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -59,6 +64,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; @@ -2088,44 +2094,127 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) return; } - long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger); + Predicate skipCond = opReadEntry.skipCondition; + if (skipCond == null) { + long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger); + if (log.isDebugEnabled()) { + log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry, + lastEntry); + } + asyncReadEntry( + ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), opReadEntry, opReadEntry.ctx); + return; + } - // Filer out and skip unnecessary read entry - if (opReadEntry.skipCondition != null) { - long firstValidEntry = -1L; - long lastValidEntry = -1L; - long entryId = firstEntry; - for (; entryId <= lastEntry; entryId++) { - if (opReadEntry.skipCondition.test(PositionImpl.get(ledger.getId(), entryId))) { - if (firstValidEntry != -1L) { - break; - } - } else { - if (firstValidEntry == -1L) { - firstValidEntry = entryId; - } + // Try to read entries in the current ledger what we need through a single `entryIdSet` as much as possible. + long entryId = firstEntry; + int count = 0; + SortedSet entryIds = new TreeSet<>(); + int entriesToRead = opReadEntry.getNumberOfEntriesToRead(); + while (entryId <= lastEntryInLedger && count < entriesToRead) { + PositionImpl position = PositionImpl.get(ledger.getId(), entryId); + if (!skipCond.test(position)) { + entryIds.add(entryId); + count++; + } + entryId++; + } + if (entryIds.isEmpty()) { + // Move `readPosition` of `cursor`. + PositionImpl position = PositionImpl.get(ledger.getId(), entryId - 1); + opReadEntry.internalReadEntriesComplete(Collections.emptyList(), opReadEntry.ctx, position); + return; + } + asyncReadEntry(ledger, entryIds, opReadEntry, opReadEntry.ctx); + } - lastValidEntry = entryId; - } + + private void asyncReadEntry(ReadHandle ledger, SortedSet entryIds, OpReadEntry opReadEntry, Object ctx) { + checkArgument(!entryIds.isEmpty()); + Set> ranges = toRanges(entryIds); + ReadEntriesCallback callback = new BatchReadEntriesCallback(entryIds, opReadEntry); + for (Range range : ranges) { + long start = range.lowerEndpoint(); + long end = range.upperEndpoint(); + // TODO: should handle `lastReadCallback` timeout check??? + asyncReadEntry(ledger, start, end, opReadEntry.cursor.isCacheReadEntry(), callback, ctx); + } + } + + // Parse entryIds into ranges. + @VisibleForTesting + public static Set> toRanges(SortedSet entryIds) { + RangeSet set = TreeRangeSet.create(); + long start = entryIds.first(); + long end = start; + for (long entryId : entryIds) { + if (entryId - end > 1) { + set.add(Range.closed(start, end)); + start = entryId; + end = start; + } else { + end = entryId; } + } + set.add(Range.closed(start, end)); + return set.asRanges(); + } + + @VisibleForTesting + public static class BatchReadEntriesCallback implements ReadEntriesCallback { + private final Set entryIdSet; + private final SortedSet entrySet; + private final OpReadEntry callback; + private final AtomicBoolean failed = new AtomicBoolean(false); + + @VisibleForTesting + public BatchReadEntriesCallback(Set entryIdSet, OpReadEntry callback) { + this.entryIdSet = entryIdSet; + this.entrySet = new TreeSet<>(Comparator.comparing(Entry::getEntryId)); + this.callback = callback; + } - // If all messages in [firstEntry...lastEntry] are filter out, - // then manual call internalReadEntriesComplete to advance read position. - if (firstValidEntry == -1L) { - opReadEntry.internalReadEntriesComplete(Collections.emptyList(), opReadEntry.ctx, - PositionImpl.get(ledger.getId(), lastEntry)); + @Override + public synchronized void readEntriesComplete(List entries0, Object ctx) { + entrySet.addAll(entries0); + if (entrySet.size() < entryIdSet.size()) { return; } + callback.readEntriesComplete(entrySet.stream().toList(), ctx); + } - firstEntry = firstValidEntry; - lastEntry = lastValidEntry; + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + // Should fail AT_MOST ONCE + if (!failed.compareAndSet(false, true)) { + return; + } + // If there are entries been read success, try to let the read operation success as possible. + List entries = filterEntries(); + if (entries.isEmpty()) { + callback.readEntriesFailed(exception, ctx); + } else { + callback.readEntriesComplete(entries, ctx); + } } - if (log.isDebugEnabled()) { - log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry, - lastEntry); + private List filterEntries() { + if (entrySet.isEmpty()) { + return Collections.emptyList(); + } + // Make sure the `readPosition` of `cursor` could be moved correctly. + List entries = new ArrayList<>(); + for (long entryId : entryIdSet) { + Entry entry = entrySet.first(); + if (entry.getEntryId() == entryId) { + entries.add(entry); + entrySet.remove(entry); + } else { + break; + } + } + return entries; } - asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx); } protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntryCallback callback, Object ctx) { @@ -2142,20 +2231,19 @@ protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntr } } - protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, OpReadEntry opReadEntry, - Object ctx) { + @VisibleForTesting + public void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, boolean shouldCacheEntries, + ReadEntriesCallback callback, Object ctx) { if (config.getReadEntryTimeoutSeconds() > 0) { // set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this); long createdTime = System.nanoTime(); ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry, - opReadEntry, readOpCount, createdTime, ctx); + callback, readOpCount, createdTime, ctx); lastReadCallback = readCallback; - entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), - readCallback, readOpCount); + entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, shouldCacheEntries, readCallback, readOpCount); } else { - entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), opReadEntry, - ctx); + entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, shouldCacheEntries, callback, ctx); } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 8430afb4e4f82..79bece552c6a8 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -3159,7 +3159,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { }, null, PositionImpl.LATEST, null); ledger.asyncReadEntry(ledgerHandle, PositionImpl.EARLIEST.getEntryId(), PositionImpl.EARLIEST.getEntryId(), - opReadEntry, ctxStr); + opReadEntry.cursor.isCacheReadEntry(), opReadEntry, ctxStr); retryStrategically((test) -> { return responseException2.get() != null; }, 5, 1000); From 72362654024cbbdce4418c5e479cff8670745375 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sat, 20 Jan 2024 22:12:33 +0800 Subject: [PATCH 02/21] code improve --- .../mledger/impl/ManagedLedgerImpl.java | 49 +++++----- .../mledger/impl/ManagedCursorTest.java | 4 +- .../mledger/impl/ManagedLedgerTest.java | 97 +++++++++++++++++++ 3 files changed, 124 insertions(+), 26 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index a3873c749a1b1..bdcd8960f8131 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -27,8 +27,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Queues; import com.google.common.collect.Range; -import com.google.common.collect.RangeSet; -import com.google.common.collect.TreeRangeSet; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.util.Recycler; @@ -2131,56 +2129,56 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) private void asyncReadEntry(ReadHandle ledger, SortedSet entryIds, OpReadEntry opReadEntry, Object ctx) { checkArgument(!entryIds.isEmpty()); - Set> ranges = toRanges(entryIds); + List> ranges = toRanges(entryIds); ReadEntriesCallback callback = new BatchReadEntriesCallback(entryIds, opReadEntry); - for (Range range : ranges) { - long start = range.lowerEndpoint(); - long end = range.upperEndpoint(); + for (Pair pair : ranges) { + long start = pair.getLeft(); + long end = pair.getRight(); // TODO: should handle `lastReadCallback` timeout check??? asyncReadEntry(ledger, start, end, opReadEntry.cursor.isCacheReadEntry(), callback, ctx); } } - // Parse entryIds into ranges. + // Parse entryIds to ranges. @VisibleForTesting - public static Set> toRanges(SortedSet entryIds) { - RangeSet set = TreeRangeSet.create(); + public static List> toRanges(SortedSet entryIds) { + List> ranges = new ArrayList<>(); long start = entryIds.first(); long end = start; for (long entryId : entryIds) { if (entryId - end > 1) { - set.add(Range.closed(start, end)); + ranges.add(Pair.of(start, end)); start = entryId; end = start; } else { end = entryId; } } - set.add(Range.closed(start, end)); - return set.asRanges(); + ranges.add(Pair.of(start, end)); + return ranges; } @VisibleForTesting public static class BatchReadEntriesCallback implements ReadEntriesCallback { - private final Set entryIdSet; - private final SortedSet entrySet; + private final SortedSet entryIds; + private final SortedSet entries; private final OpReadEntry callback; private final AtomicBoolean failed = new AtomicBoolean(false); @VisibleForTesting - public BatchReadEntriesCallback(Set entryIdSet, OpReadEntry callback) { - this.entryIdSet = entryIdSet; - this.entrySet = new TreeSet<>(Comparator.comparing(Entry::getEntryId)); + public BatchReadEntriesCallback(SortedSet entryIdSet, OpReadEntry callback) { + this.entryIds = entryIdSet; + this.entries = new TreeSet<>(Comparator.comparing(Entry::getEntryId)); this.callback = callback; } @Override public synchronized void readEntriesComplete(List entries0, Object ctx) { - entrySet.addAll(entries0); - if (entrySet.size() < entryIdSet.size()) { + entries.addAll(entries0); + if (entries.size() < entryIds.size()) { return; } - callback.readEntriesComplete(entrySet.stream().toList(), ctx); + callback.readEntriesComplete(entries.stream().toList(), ctx); } @Override @@ -2199,16 +2197,19 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { } private List filterEntries() { - if (entrySet.isEmpty()) { + if (entries.isEmpty()) { return Collections.emptyList(); } // Make sure the `readPosition` of `cursor` could be moved correctly. List entries = new ArrayList<>(); - for (long entryId : entryIdSet) { - Entry entry = entrySet.first(); + for (long entryId : entryIds) { + if (this.entries.isEmpty()) { + break; + } + Entry entry = this.entries.first(); if (entry.getEntryId() == entryId) { entries.add(entry); - entrySet.remove(entry); + this.entries.remove(entry); } else { break; } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 644f53c3a522d..96eb60fc96256 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -4503,7 +4503,7 @@ public void testReadEntriesWithSkipDeletedEntries() throws Exception { return inv.callRealMethod(); }) .when(ledger) - .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(), Mockito.any()); + .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.anyBoolean(), Mockito.any(), Mockito.any()); @Cleanup ManagedCursor cursor = ledger.openCursor("c"); @@ -4608,7 +4608,7 @@ public void testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions() throws return inv.callRealMethod(); }) .when(ledger) - .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(), Mockito.any()); + .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.anyBoolean(), Mockito.any(), Mockito.any()); @Cleanup ManagedCursor cursor = ledger.openCursor("c"); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index aa8bb019a8540..fa7de56eef90d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -36,6 +36,7 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertSame; @@ -64,6 +65,8 @@ import java.util.Optional; import java.util.Queue; import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; @@ -4232,4 +4235,98 @@ public void testNoCleanupOffloadLedgerWhenMetadataExceptionHappens() throws Exce verify(ledgerOffloader, times(0)) .deleteOffloaded(eq(ledgerInfo.getLedgerId()), any(), anyMap()); } + + @Test + public void testToRanges() { + SortedSet set = new TreeSet<>(); + set.add(1L); + set.add(2L); + set.add(4L); + set.add(6L); + set.add(7L); + set.add(8L); + set.add(10L); + + List> ranges = ManagedLedgerImpl.toRanges(set); + assertEquals(ranges.size(), 4); + + Pair pair0 = ranges.get(0); + assertEquals(pair0.getLeft().longValue(), 1L); + assertEquals(pair0.getRight().longValue(), 2L); + + Pair pair1 = ranges.get(1); + assertEquals(pair1.getLeft().longValue(), 4L); + assertEquals(pair1.getRight().longValue(), 4L); + + Pair pair2 = ranges.get(2); + assertEquals(pair2.getLeft().longValue(), 6L); + assertEquals(pair2.getRight().longValue(), 8L); + + Pair pair3 = ranges.get(3); + assertEquals(pair3.getLeft().longValue(), 10L); + assertEquals(pair3.getRight().longValue(), 10L); + } + + + @Test + public void testBatchReadEntriesCallback() throws Exception { + @Cleanup + ManagedLedgerImpl ledger = + (ManagedLedgerImpl) factory.open("testBatchReadEntriesCallback"); + @Cleanup + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("test-cursor"); + for (int i = 0; i < 10; i++) { + ledger.addEntry(("dummy-entry-" + i).getBytes(Encoding)); + } + + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean failed = new AtomicBoolean(false); + List entries = new ArrayList<>(); + OpReadEntry opReadEntry = OpReadEntry.create(cursor, cursor.readPosition, 10, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries0, Object ctx) { + entries.addAll(entries0); + latch.countDown(); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + failed.set(true); + latch.countDown(); + } + }, null, ledger.lastConfirmedEntry, position -> position.getEntryId() % 2 == 0); + + SortedSet entryIds = new TreeSet<>(); + entryIds.add(1L); + entryIds.add(3L); + entryIds.add(5L); + entryIds.add(7L); + entryIds.add(9L); + ManagedLedgerImpl.BatchReadEntriesCallback callback = new ManagedLedgerImpl.BatchReadEntriesCallback(entryIds, opReadEntry); + long ledgerId = ledger.currentLedger.getId(); + + callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 1, new byte[1])), null); + callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 3, new byte[1])), null); + callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 5, new byte[1])), null); + callback.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("Invalid cursor position"), null); + + latch.await(); + // should not fail + assertFalse(failed.get()); + assertEquals(entries.size(), 5); + + // Manually trigger the callback + Entry entry1 = entries.get(0); + Entry entry3 = entries.get(1); + Entry entry5 = entries.get(2); + assertEquals(entry1.getData().length, 1); + assertEquals(entry3.getData().length, 1); + assertEquals(entry5.getData().length, 1); + + // Read from ledger. + Entry entry7 = entries.get(3); + Entry entry9 = entries.get(4); + assertNotEquals(entry7.getData().length, 1); + assertNotEquals(entry9.getData().length, 1); + } } From 6a4f74002a101d3ab6b4fa4bc00898a227944e34 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Tue, 23 Apr 2024 09:32:34 +0800 Subject: [PATCH 03/21] Fix code --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 01f4a2eb974f3..2196d5a8039f2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2160,7 +2160,7 @@ public static class BatchReadEntriesCallback implements ReadEntriesCallback { private final SortedSet entryIds; private final SortedSet entries; private final OpReadEntry callback; - private final AtomicBoolean failed = new AtomicBoolean(false); + private boolean completed = false; @VisibleForTesting public BatchReadEntriesCallback(SortedSet entryIdSet, OpReadEntry callback) { @@ -2171,19 +2171,23 @@ public BatchReadEntriesCallback(SortedSet entryIdSet, OpReadEntry callback @Override public synchronized void readEntriesComplete(List entries0, Object ctx) { + if (completed) { + return; + } entries.addAll(entries0); if (entries.size() < entryIds.size()) { return; } + completed = true; callback.readEntriesComplete(entries.stream().toList(), ctx); } @Override - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - // Should fail AT_MOST ONCE - if (!failed.compareAndSet(false, true)) { + public synchronized void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + if (completed) { return; } + completed = true; // If there are entries been read success, try to let the read operation success as possible. List entries = filterEntries(); if (entries.isEmpty()) { From b0a1f9bc981f7a7eb50af5e70c56f37a21933b8e Mon Sep 17 00:00:00 2001 From: dao-jun Date: Tue, 23 Apr 2024 09:52:05 +0800 Subject: [PATCH 04/21] Fix code --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 2196d5a8039f2..f98d995fb9bf3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -62,7 +62,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; From a62226307269a5b8a8ab5e40b5836c2620c809e8 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Tue, 23 Apr 2024 10:24:57 +0800 Subject: [PATCH 05/21] Fix code --- .../mledger/impl/ManagedLedgerImpl.java | 57 ++++++++++--------- .../mledger/impl/ManagedCursorTest.java | 4 +- .../mledger/impl/ManagedLedgerTest.java | 4 +- 3 files changed, 35 insertions(+), 30 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index f98d995fb9bf3..f7e0e5e08766d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2088,54 +2088,43 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) return; } - Predicate skipCond = opReadEntry.skipCondition; - if (skipCond == null) { - long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger); + long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger); + + Predicate skipCondition = opReadEntry.skipCondition; + if (skipCondition == null) { if (log.isDebugEnabled()) { log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry, lastEntry); } - asyncReadEntry( - ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), opReadEntry, opReadEntry.ctx); + asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx); return; } - // Try to read entries in the current ledger what we need through a single `entryIdSet` as much as possible. - long entryId = firstEntry; - int count = 0; + // Skip entries that don't match the predicate SortedSet entryIds = new TreeSet<>(); - int entriesToRead = opReadEntry.getNumberOfEntriesToRead(); - while (entryId <= lastEntryInLedger && count < entriesToRead) { - PositionImpl position = PositionImpl.get(ledger.getId(), entryId); - if (!skipCond.test(position)) { - entryIds.add(entryId); - count++; + for (long entryId = firstEntry; entryId <= lastEntry; entryId++) { + PositionImpl position = new PositionImpl(ledger.getId(), entryId); + if (skipCondition.test(position)) { + continue; } - entryId++; + entryIds.add(entryId); } if (entryIds.isEmpty()) { // Move `readPosition` of `cursor`. - PositionImpl position = PositionImpl.get(ledger.getId(), entryId - 1); + PositionImpl position = PositionImpl.get(ledger.getId(), lastEntry - 1); opReadEntry.internalReadEntriesComplete(Collections.emptyList(), opReadEntry.ctx, position); return; } - asyncReadEntry(ledger, entryIds, opReadEntry, opReadEntry.ctx); - } - - private void asyncReadEntry(ReadHandle ledger, SortedSet entryIds, OpReadEntry opReadEntry, Object ctx) { - checkArgument(!entryIds.isEmpty()); List> ranges = toRanges(entryIds); ReadEntriesCallback callback = new BatchReadEntriesCallback(entryIds, opReadEntry); for (Pair pair : ranges) { long start = pair.getLeft(); long end = pair.getRight(); - // TODO: should handle `lastReadCallback` timeout check??? - asyncReadEntry(ledger, start, end, opReadEntry.cursor.isCacheReadEntry(), callback, ctx); + asyncReadEntry(ledger, start, end, opReadEntry.cursor.isCacheReadEntry(), callback, opReadEntry.ctx); } } - // Parse entryIds to ranges. @VisibleForTesting public static List> toRanges(SortedSet entryIds) { List> ranges = new ArrayList<>(); @@ -2233,8 +2222,7 @@ protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntr } } - @VisibleForTesting - public void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, boolean shouldCacheEntries, + protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, boolean shouldCacheEntries, ReadEntriesCallback callback, Object ctx) { if (config.getReadEntryTimeoutSeconds() > 0) { // set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled @@ -2249,6 +2237,23 @@ public void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, b } } + protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, OpReadEntry opReadEntry, + Object ctx) { + if (config.getReadEntryTimeoutSeconds() > 0) { + // set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled + long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this); + long createdTime = System.nanoTime(); + ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry, + opReadEntry, readOpCount, createdTime, ctx); + lastReadCallback = readCallback; + entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), + readCallback, readOpCount); + } else { + entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), opReadEntry, + ctx); + } + } + static final class ReadEntryCallbackWrapper implements ReadEntryCallback, ReadEntriesCallback { volatile ReadEntryCallback readEntryCallback; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 70a9cf11aa248..c9bd64171c15a 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -4514,7 +4514,7 @@ public void testReadEntriesWithSkipDeletedEntries() throws Exception { return inv.callRealMethod(); }) .when(ledger) - .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.anyBoolean(), Mockito.any(), Mockito.any()); + .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(), Mockito.any()); @Cleanup ManagedCursor cursor = ledger.openCursor("c"); @@ -4619,7 +4619,7 @@ public void testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions() throws return inv.callRealMethod(); }) .when(ledger) - .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.anyBoolean(), Mockito.any(), Mockito.any()); + .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(), Mockito.any()); @Cleanup ManagedCursor cursor = ledger.openCursor("c"); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 14fae54b96821..76a7acf9f9631 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -3176,8 +3176,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { } }, null, PositionImpl.LATEST, null); - ledger.asyncReadEntry(ledgerHandle, PositionImpl.EARLIEST.getEntryId(), PositionImpl.EARLIEST.getEntryId(), - opReadEntry.cursor.isCacheReadEntry(), opReadEntry, ctxStr); + ledger.asyncReadEntry(ledgerHandle, PositionImpl.EARLIEST.getEntryId(), PositionImpl.EARLIEST.getEntryId() + , opReadEntry, ctxStr); retryStrategically((test) -> { return responseException2.get() != null; }, 5, 1000); From a04d7880c6802939429251e6f21f8544d9ed4d47 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Wed, 24 Apr 2024 20:55:28 +0800 Subject: [PATCH 06/21] Fix code --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 13 +++++++------ .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 3 ++- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index f7e0e5e08766d..bf34d37135b51 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2109,9 +2109,10 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) } entryIds.add(entryId); } + if (entryIds.isEmpty()) { // Move `readPosition` of `cursor`. - PositionImpl position = PositionImpl.get(ledger.getId(), lastEntry - 1); + PositionImpl position = PositionImpl.get(ledger.getId(), lastEntry); opReadEntry.internalReadEntriesComplete(Collections.emptyList(), opReadEntry.ctx, position); return; } @@ -2146,14 +2147,14 @@ public static List> toRanges(SortedSet entryIds) { @VisibleForTesting public static class BatchReadEntriesCallback implements ReadEntriesCallback { private final SortedSet entryIds; - private final SortedSet entries; + private final List entries; private final OpReadEntry callback; private boolean completed = false; @VisibleForTesting public BatchReadEntriesCallback(SortedSet entryIdSet, OpReadEntry callback) { this.entryIds = entryIdSet; - this.entries = new TreeSet<>(Comparator.comparing(Entry::getEntryId)); + this.entries = new ArrayList<>(entryIdSet.size()); this.callback = callback; } @@ -2167,7 +2168,8 @@ public synchronized void readEntriesComplete(List entries0, Object ctx) { return; } completed = true; - callback.readEntriesComplete(entries.stream().toList(), ctx); + entries.sort(Comparator.comparingLong(Entry::getEntryId)); + callback.readEntriesComplete(entries, ctx); } @Override @@ -2195,10 +2197,9 @@ private List filterEntries() { if (this.entries.isEmpty()) { break; } - Entry entry = this.entries.first(); + Entry entry = this.entries.remove(0); if (entry.getEntryId() == entryId) { entries.add(entry); - this.entries.remove(entry); } else { break; } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 76a7acf9f9631..cac41db2c7a0e 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -4390,7 +4390,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { entryIds.add(5L); entryIds.add(7L); entryIds.add(9L); - ManagedLedgerImpl.BatchReadEntriesCallback callback = new ManagedLedgerImpl.BatchReadEntriesCallback(entryIds, opReadEntry); + ManagedLedgerImpl.BatchReadEntriesCallback callback = new ManagedLedgerImpl + .BatchReadEntriesCallback(entryIds, opReadEntry); long ledgerId = ledger.currentLedger.getId(); callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 1, new byte[1])), null); From 3949101ebb992a8437b301cc6a7c5080780a0f43 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 25 Apr 2024 16:13:21 +0800 Subject: [PATCH 07/21] Fix code --- .../mledger/impl/ManagedCursorImpl.java | 2 +- .../mledger/impl/ManagedLedgerImpl.java | 15 +-- .../bookkeeper/mledger/impl/OpReadEntry.java | 10 +- .../mledger/impl/ManagedLedgerTest.java | 102 +++++++++++++++++- 4 files changed, 115 insertions(+), 14 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 69b130a98c869..a456ab6807094 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -3452,7 +3452,7 @@ public PositionImpl getNextAvailablePosition(PositionImpl position) { PositionImpl nextPosition = range.upperEndpoint().getNext(); return (nextPosition != null && nextPosition.compareTo(position) > 0) ? nextPosition : position.getNext(); } - return position.getNext(); + return ledger.getNextValidPosition(position); } public Position getNextLedgerPosition(long currentLedgerId) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index bf34d37135b51..fc28b03417008 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2110,15 +2110,15 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) entryIds.add(entryId); } + PositionImpl lastReadPosition = PositionImpl.get(ledger.getId(), lastEntry); if (entryIds.isEmpty()) { // Move `readPosition` of `cursor`. - PositionImpl position = PositionImpl.get(ledger.getId(), lastEntry); - opReadEntry.internalReadEntriesComplete(Collections.emptyList(), opReadEntry.ctx, position); + opReadEntry.internalReadEntriesComplete(Collections.emptyList(), opReadEntry.ctx, lastReadPosition); return; } List> ranges = toRanges(entryIds); - ReadEntriesCallback callback = new BatchReadEntriesCallback(entryIds, opReadEntry); + ReadEntriesCallback callback = new BatchReadEntriesCallback(entryIds, opReadEntry, lastReadPosition); for (Pair pair : ranges) { long start = pair.getLeft(); long end = pair.getRight(); @@ -2149,13 +2149,16 @@ public static class BatchReadEntriesCallback implements ReadEntriesCallback { private final SortedSet entryIds; private final List entries; private final OpReadEntry callback; - private boolean completed = false; + private volatile boolean completed = false; + private final PositionImpl lastReadPosition; @VisibleForTesting - public BatchReadEntriesCallback(SortedSet entryIdSet, OpReadEntry callback) { + public BatchReadEntriesCallback(SortedSet entryIdSet, OpReadEntry callback, + PositionImpl lastReadPosition) { this.entryIds = entryIdSet; this.entries = new ArrayList<>(entryIdSet.size()); this.callback = callback; + this.lastReadPosition = lastReadPosition; } @Override @@ -2169,7 +2172,7 @@ public synchronized void readEntriesComplete(List entries0, Object ctx) { } completed = true; entries.sort(Comparator.comparingLong(Entry::getEntryId)); - callback.readEntriesComplete(entries, ctx); + callback.internalReadEntriesComplete(entries, ctx, lastReadPosition); } @Override diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index a79ba3fb5e23b..0dbf5fa910b86 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -75,7 +75,7 @@ void internalReadEntriesComplete(List returnedEntries, Object ctx, Positi } cursor.updateReadStats(entriesCount, entriesSize); - if (entriesCount != 0) { + if (entriesCount != 0 && lastPosition == null) { lastPosition = (PositionImpl) returnedEntries.get(entriesCount - 1).getPosition(); } if (log.isDebugEnabled()) { @@ -83,16 +83,14 @@ void internalReadEntriesComplete(List returnedEntries, Object ctx, Positi cursor.ledger.getName(), cursor.getName(), returnedEntries.size(), entries.size(), count); } - List filteredEntries = Collections.emptyList(); + List filteredEntries; if (entriesCount != 0) { filteredEntries = cursor.filterReadEntries(returnedEntries); entries.addAll(filteredEntries); } - // if entries have been filtered out then try to skip reading of already deletedMessages in that range - final Position nexReadPosition = entriesCount != filteredEntries.size() - ? cursor.getNextAvailablePosition(lastPosition) : lastPosition.getNext(); - updateReadPosition(nexReadPosition); + final Position nextReadPosition = cursor.getNextAvailablePosition(lastPosition); + updateReadPosition(nextReadPosition); checkReadCompletion(); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index cac41db2c7a0e..773d085728094 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -149,6 +149,8 @@ import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -4391,7 +4393,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { entryIds.add(7L); entryIds.add(9L); ManagedLedgerImpl.BatchReadEntriesCallback callback = new ManagedLedgerImpl - .BatchReadEntriesCallback(entryIds, opReadEntry); + .BatchReadEntriesCallback(entryIds, opReadEntry, null); long ledgerId = ledger.currentLedger.getId(); callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 1, new byte[1])), null); @@ -4418,4 +4420,102 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { assertNotEquals(entry7.getData().length, 1); assertNotEquals(entry9.getData().length, 1); } + + @Test + public void testReadEntriesFromDifferentLedgersWithSkipCondition() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(5); + config.setMinimumRolloverTime(0, TimeUnit.SECONDS); + @Cleanup + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testReadEntriesWithSkipCondition", config); + ledger = Mockito.spy(ledger); + + AtomicInteger counter = new AtomicInteger(); + Mockito.doAnswer(inv -> { + counter.incrementAndGet(); + return inv.callRealMethod(); + }).when(ledger).asyncReadEntries(Mockito.any()); + @Cleanup + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("test-cursor"); + + Position lastPosition = null; + for (int i = 0; i < 12; i++) { + lastPosition = ledger.addEntry(("dummy-entry-" + i).getBytes(Encoding)); + } + + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean failed = new AtomicBoolean(false); + List entries = new ArrayList<>(); + cursor.asyncReadEntriesWithSkip(100, -1, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries0, Object ctx) { + entries.addAll(entries0); + latch.countDown(); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + failed.set(true); + latch.countDown(); + } + }, null, PositionImpl.LATEST, position -> position.getEntryId() % 2 == 0); + + latch.await(); + assertFalse(failed.get()); + assertEquals(entries.size(), 5); + // Read entries from 3 ledgers, the counter is 3. + assertEquals(counter.get(), 3); + Position readPosition = cursor.getReadPosition(); + assertTrue(readPosition.getLedgerId() == lastPosition.getLedgerId() + && readPosition.getEntryId() == lastPosition.getEntryId() + 1); + } + + @Test + public void testReadEntriesFromOneSameLedgerWithSkipCondition() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + @Cleanup + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testReadEntriesWithSkipCondition", config); + ledger = Mockito.spy(ledger); + + AtomicInteger counter = new AtomicInteger(); + Mockito.doAnswer(inv -> { + counter.incrementAndGet(); + return inv.callRealMethod(); + }).when(ledger).asyncReadEntries(Mockito.any()); + + @Cleanup + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("test-cursor"); + + Position lastPosition = null; + for (int i = 0; i < 10; i++) { + lastPosition = ledger.addEntry(("dummy-entry-" + i).getBytes(Encoding)); + } + + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean failed = new AtomicBoolean(false); + List entries = new ArrayList<>(); + cursor.asyncReadEntriesWithSkip(100, -1, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries0, Object ctx) { + entries.addAll(entries0); + latch.countDown(); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + failed.set(true); + latch.countDown(); + } + }, null, PositionImpl.LATEST, position -> position.getEntryId() % 2 == 0); + + latch.await(); + assertEquals(counter.get(), 1); + + assertFalse(failed.get()); + assertEquals(entries.size(), 5); + + Position readPosition = cursor.getReadPosition(); + assertTrue(readPosition.getLedgerId() == lastPosition.getLedgerId() + && readPosition.getEntryId() == lastPosition.getEntryId() + 1); + } } From 9eec2ba261d471d99f11199e3267a13e8a2e17fc Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 25 Apr 2024 16:15:21 +0800 Subject: [PATCH 08/21] Fix code --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 773d085728094..5d077f1442817 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -3178,8 +3178,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { } }, null, PositionImpl.LATEST, null); - ledger.asyncReadEntry(ledgerHandle, PositionImpl.EARLIEST.getEntryId(), PositionImpl.EARLIEST.getEntryId() - , opReadEntry, ctxStr); + ledger.asyncReadEntry(ledgerHandle, PositionImpl.EARLIEST.getEntryId(), PositionImpl.EARLIEST.getEntryId(), opReadEntry, ctxStr); retryStrategically((test) -> { return responseException2.get() != null; }, 5, 1000); From e656c459c5a094c1ab2e20bba7b28f55a0853c30 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 25 Apr 2024 16:15:58 +0800 Subject: [PATCH 09/21] Fix code --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 5d077f1442817..755d32c407b34 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -3178,7 +3178,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { } }, null, PositionImpl.LATEST, null); - ledger.asyncReadEntry(ledgerHandle, PositionImpl.EARLIEST.getEntryId(), PositionImpl.EARLIEST.getEntryId(), opReadEntry, ctxStr); + ledger.asyncReadEntry(ledgerHandle, PositionImpl.EARLIEST.getEntryId(), PositionImpl.EARLIEST.getEntryId(), + opReadEntry, ctxStr); retryStrategically((test) -> { return responseException2.get() != null; }, 5, 1000); From b71b022e0b702ca2982d6c166f272564adf4d9fc Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 25 Apr 2024 16:32:03 +0800 Subject: [PATCH 10/21] Fix code --- .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index a456ab6807094..7f526c372fdc3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -3446,6 +3446,9 @@ public long[] getBatchPositionAckSet(Position position) { * @return next available position */ public PositionImpl getNextAvailablePosition(PositionImpl position) { + if (individualDeletedMessages.isEmpty()) { + return ledger.getNextValidPosition(position); + } Range range = individualDeletedMessages.rangeContaining(position.getLedgerId(), position.getEntryId()); if (range != null) { From cbd449aa4db4e2e84e3f295fe8d2dc2ac110ed59 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Fri, 26 Apr 2024 00:55:15 +0800 Subject: [PATCH 11/21] fix codestyle --- .../java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java | 1 - .../org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 2 -- 2 files changed, 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index 0dbf5fa910b86..5bbac99941468 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -21,7 +21,6 @@ import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.function.Predicate; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 755d32c407b34..8f993d5dbce33 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -149,8 +149,6 @@ import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; From 0cbe85a97832996953bc8081a87f9552158e09eb Mon Sep 17 00:00:00 2001 From: dao-jun Date: Fri, 26 Apr 2024 02:18:59 +0800 Subject: [PATCH 12/21] fix tests --- .../apache/bookkeeper/mledger/impl/ManagedCursorTest.java | 6 ++++-- .../apache/pulsar/client/impl/MessageRedeliveryTest.java | 4 +++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index c9bd64171c15a..1932b1d90ae5b 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -4514,7 +4514,8 @@ public void testReadEntriesWithSkipDeletedEntries() throws Exception { return inv.callRealMethod(); }) .when(ledger) - .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(), Mockito.any()); + .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), + Mockito.anyBoolean(),Mockito.any(), Mockito.any()); @Cleanup ManagedCursor cursor = ledger.openCursor("c"); @@ -4619,7 +4620,8 @@ public void testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions() throws return inv.callRealMethod(); }) .when(ledger) - .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(), Mockito.any()); + .asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), + Mockito.anyBoolean(), Mockito.any(), Mockito.any()); @Cleanup ManagedCursor cursor = ledger.openCursor("c"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java index 29b06f68b64eb..d601542cd50ed 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java @@ -32,6 +32,7 @@ import lombok.Cleanup; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.BatchReceivePolicy; @@ -212,7 +213,8 @@ public void testRedelivery(boolean useOpenRangeSet) throws Exception { assertEquals(cursor.getIndividuallyDeletedMessagesSet().size(), 0); // markDelete position should be one position behind read position - assertEquals(cursor.getReadPosition(), cursor.getMarkDeletedPosition().getNext()); + assertEquals(cursor.getReadPosition(), + cursor.getNextAvailablePosition((PositionImpl) cursor.getMarkDeletedPosition())); producer.close(); consumer2.close(); From d675baed8cef1c8330c84282ff56ae9acb5ae9e2 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Fri, 26 Apr 2024 13:02:01 +0800 Subject: [PATCH 13/21] improve code --- .../mledger/impl/ManagedLedgerImpl.java | 31 ++++++++++++++----- .../bookkeeper/mledger/impl/OpReadEntry.java | 9 ++++++ .../mledger/impl/ManagedLedgerTest.java | 27 +++++++--------- 3 files changed, 43 insertions(+), 24 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index fc28b03417008..08c3509d58a8e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2171,7 +2171,10 @@ public synchronized void readEntriesComplete(List entries0, Object ctx) { return; } completed = true; + // Make sure the entries are in the correct order entries.sort(Comparator.comparingLong(Entry::getEntryId)); + // If we want to read [1, 2, 3, 4, 5], but we only read [1, 2, 3], [4,5] are filtered, so we need to pass + // the `lastReadPosition([5])` to make sure the cursor read position is correct. callback.internalReadEntriesComplete(entries, ctx, lastReadPosition); } @@ -2183,18 +2186,30 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj completed = true; // If there are entries been read success, try to let the read operation success as possible. List entries = filterEntries(); - if (entries.isEmpty()) { - callback.readEntriesFailed(exception, ctx); - } else { - callback.readEntriesComplete(entries, ctx); - } - } - + if (!entries.isEmpty()) { + // Move the read position of the cursor to the next position of the last read entry, + // or we will deliver the same entry to the consumer more than once. + Entry entry = entries.get(entries.size() - 1); + PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId()); + PositionImpl nextReadPosition = callback.cursor.getNextAvailablePosition(position); + callback.updateReadPosition(nextReadPosition); + } + callback.internalReadEntriesFailed(entries, exception, ctx); + } + + /** + * Filter the entries that have been read success. + *

+ * If we want to read [1, 2, 3, 4, 5], but only read [1, 2, 4, 5] successfully, [3] is read failed, + * only return [1,2] to the caller, to make sure the read operation success as possible + * and keep the ordering guarantee. + * + * @return filtered entries + */ private List filterEntries() { if (entries.isEmpty()) { return Collections.emptyList(); } - // Make sure the `readPosition` of `cursor` could be moved correctly. List entries = new ArrayList<>(); for (long entryId : entryIds) { if (this.entries.isEmpty()) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index 5bbac99941468..29039cd89576e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -21,6 +21,7 @@ import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.function.Predicate; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; @@ -29,6 +30,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.Position; +import org.apache.commons.collections4.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,7 +102,14 @@ public void readEntriesComplete(List returnedEntries, Object ctx) { @Override public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + internalReadEntriesFailed(null, exception, ctx); + } + + void internalReadEntriesFailed(Collection ret, ManagedLedgerException exception, Object ctx) { cursor.readOperationCompleted(); + if (CollectionUtils.isNotEmpty(ret)) { + entries.addAll(ret); + } if (!entries.isEmpty()) { // There were already some entries that were read before, we can return them diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 8f993d5dbce33..d5bca17152dc2 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -4395,28 +4395,23 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { long ledgerId = ledger.currentLedger.getId(); callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 1, new byte[1])), null); - callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 3, new byte[1])), null); - callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 5, new byte[1])), null); + callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 3, new byte[3])), null); + callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 7, new byte[7])), null); callback.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("Invalid cursor position"), null); + // After call readEntriesFailed, the following readEntriesComplete should be ignored. + callback.readEntriesComplete(List.of(EntryImpl.create(ledgerId, 5, new byte[5])), null); latch.await(); // should not fail assertFalse(failed.get()); - assertEquals(entries.size(), 5); + assertEquals(entries.size(), 2); + + // `entries` should be only the entries with entryId 1 and 3. + assertEquals(entries.get(0).getEntryId(), 1); + assertEquals(entries.get(1).getEntryId(), 3); - // Manually trigger the callback - Entry entry1 = entries.get(0); - Entry entry3 = entries.get(1); - Entry entry5 = entries.get(2); - assertEquals(entry1.getData().length, 1); - assertEquals(entry3.getData().length, 1); - assertEquals(entry5.getData().length, 1); - - // Read from ledger. - Entry entry7 = entries.get(3); - Entry entry9 = entries.get(4); - assertNotEquals(entry7.getData().length, 1); - assertNotEquals(entry9.getData().length, 1); + // ReadPosition should be updated to [4] + assertEquals(cursor.getReadPosition().getEntryId(), 4); } @Test From ad4712683072af4877530685bb753a0656d471ab Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 17 Oct 2024 23:00:30 +0800 Subject: [PATCH 14/21] rebase master --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 14 +++++++------- .../bookkeeper/mledger/impl/OpReadEntry.java | 2 +- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 4 ++-- .../pulsar/client/impl/MessageRedeliveryTest.java | 4 +--- 4 files changed, 11 insertions(+), 13 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index aed23d6cd5f84..8d44c9f8ba915 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2110,7 +2110,7 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger); - Predicate skipCondition = opReadEntry.skipCondition; + Predicate skipCondition = opReadEntry.skipCondition; if (skipCondition == null) { if (log.isDebugEnabled()) { log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry, @@ -2123,14 +2123,14 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) // Skip entries that don't match the predicate SortedSet entryIds = new TreeSet<>(); for (long entryId = firstEntry; entryId <= lastEntry; entryId++) { - PositionImpl position = new PositionImpl(ledger.getId(), entryId); + Position position = PositionFactory.create(ledger.getId(), entryId); if (skipCondition.test(position)) { continue; } entryIds.add(entryId); } - PositionImpl lastReadPosition = PositionImpl.get(ledger.getId(), lastEntry); + Position lastReadPosition = PositionFactory.create(ledger.getId(), lastEntry); if (entryIds.isEmpty()) { // Move `readPosition` of `cursor`. opReadEntry.internalReadEntriesComplete(Collections.emptyList(), opReadEntry.ctx, lastReadPosition); @@ -2170,11 +2170,11 @@ public static class BatchReadEntriesCallback implements ReadEntriesCallback { private final List entries; private final OpReadEntry callback; private volatile boolean completed = false; - private final PositionImpl lastReadPosition; + private final Position lastReadPosition; @VisibleForTesting public BatchReadEntriesCallback(SortedSet entryIdSet, OpReadEntry callback, - PositionImpl lastReadPosition) { + Position lastReadPosition) { this.entryIds = entryIdSet; this.entries = new ArrayList<>(entryIdSet.size()); this.callback = callback; @@ -2210,8 +2210,8 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj // Move the read position of the cursor to the next position of the last read entry, // or we will deliver the same entry to the consumer more than once. Entry entry = entries.get(entries.size() - 1); - PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId()); - PositionImpl nextReadPosition = callback.cursor.getNextAvailablePosition(position); + Position position = PositionFactory.create(entry.getLedgerId(), entry.getEntryId()); + Position nextReadPosition = callback.cursor.getNextAvailablePosition(position); callback.updateReadPosition(nextReadPosition); } callback.internalReadEntriesFailed(entries, exception, ctx); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index d258a12643ff3..830f3005a71f6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -21,7 +21,7 @@ import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import java.util.ArrayList; -import java.util.Collections; +import java.util.Collection; import java.util.List; import java.util.function.Predicate; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 4b34b3d832e9a..27d8b3863d6a3 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -4506,7 +4506,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { failed.set(true); latch.countDown(); } - }, null, PositionImpl.LATEST, position -> position.getEntryId() % 2 == 0); + }, null, PositionFactory.LATEST, position -> position.getEntryId() % 2 == 0); latch.await(); assertFalse(failed.get()); @@ -4554,7 +4554,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { failed.set(true); latch.countDown(); } - }, null, PositionImpl.LATEST, position -> position.getEntryId() % 2 == 0); + }, null, PositionFactory.LATEST, position -> position.getEntryId() % 2 == 0); latch.await(); assertEquals(counter.get(), 1); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java index b302df482d9a5..89c0c83796e7c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java @@ -37,7 +37,6 @@ import lombok.Cleanup; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; -import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.BatchReceivePolicy; @@ -218,8 +217,7 @@ public void testRedelivery(boolean useOpenRangeSet) throws Exception { assertEquals(cursor.getIndividuallyDeletedMessagesSet().size(), 0); // markDelete position should be one position behind read position - assertEquals(cursor.getReadPosition(), - cursor.getNextAvailablePosition((PositionImpl) cursor.getMarkDeletedPosition())); + assertEquals(cursor.getReadPosition(), cursor.getNextAvailablePosition(cursor.getMarkDeletedPosition())); producer.close(); consumer2.close(); From 8786644d593610c41886bfa9b5f51f54934f7967 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 26 May 2025 18:20:07 +0800 Subject: [PATCH 15/21] fix code --- .../apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index fe8b8922f3cbf..3d7fd027e1ee3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -3679,10 +3679,10 @@ public long[] getBatchPositionAckSet(Position position) { */ public Position getNextAvailablePosition(Position position) { lock.readLock().lock(); - if (individualDeletedMessages.isEmpty()) { - return ledger.getNextValidPosition(position); - } try { + if (individualDeletedMessages.isEmpty()) { + return ledger.getNextValidPosition(position); + } Range range = individualDeletedMessages.rangeContaining(position.getLedgerId(), position.getEntryId()); if (range != null) { From 8c700ecaf517aefd65bd8b610a15d6452d0c4db0 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Tue, 27 May 2025 01:57:44 +0800 Subject: [PATCH 16/21] address comment --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 1826605d0151a..5ac65fe53599f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2331,6 +2331,7 @@ private List filterEntries() { if (entries.isEmpty()) { return Collections.emptyList(); } + entries.sort(Comparator.comparingLong(Entry::getEntryId)); List entries = new ArrayList<>(); for (long entryId : entryIds) { if (this.entries.isEmpty()) { From 9f0a50d74e22807c554aa96dc0233ae8a6fe9a6e Mon Sep 17 00:00:00 2001 From: dao-jun Date: Tue, 3 Jun 2025 11:27:43 +0800 Subject: [PATCH 17/21] address comment --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 5ac65fe53599f..3eb9282ece4de 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2285,6 +2285,9 @@ public BatchReadEntriesCallback(SortedSet entryIdSet, OpReadEntry callback @Override public synchronized void readEntriesComplete(List entries0, Object ctx) { if (completed) { + for (Entry entry : entries0) { + entry.release(); + } return; } entries.addAll(entries0); @@ -2332,19 +2335,24 @@ private List filterEntries() { return Collections.emptyList(); } entries.sort(Comparator.comparingLong(Entry::getEntryId)); - List entries = new ArrayList<>(); + List entries0 = new ArrayList<>(); for (long entryId : entryIds) { if (this.entries.isEmpty()) { break; } Entry entry = this.entries.remove(0); if (entry.getEntryId() == entryId) { - entries.add(entry); + entries0.add(entry); } else { + entry.release(); break; } } - return entries; + // Release the entries that are not in the result. + for (Entry entry : entries) { + entry.release(); + } + return entries0; } } From b8265941525bb9435e6072cb61dd6021978bc46c Mon Sep 17 00:00:00 2001 From: dao-jun Date: Wed, 11 Jun 2025 22:30:05 +0800 Subject: [PATCH 18/21] Address review comment --- managed-ledger/pom.xml | 5 ++ .../bookkeeper/mledger/PositionFactory.java | 13 ++++ .../mledger/impl/ManagedCursorImpl.java | 2 +- .../mledger/impl/ManagedLedgerImpl.java | 37 ++++++----- .../mledger/impl/MutablePositionImpl.java | 66 +++++++++++++++++++ .../mledger/impl/ManagedLedgerTest.java | 35 +++++----- 6 files changed, 122 insertions(+), 36 deletions(-) create mode 100644 managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MutablePositionImpl.java diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml index 2b83fcb49177b..f7bb6e68ba055 100644 --- a/managed-ledger/pom.xml +++ b/managed-ledger/pom.xml @@ -141,6 +141,11 @@ slf4j-api + + it.unimi.dsi + fastutil + + diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/PositionFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/PositionFactory.java index 0b119844a6268..36c9f90854492 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/PositionFactory.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/PositionFactory.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.mledger; import org.apache.bookkeeper.mledger.impl.ImmutablePositionImpl; +import org.apache.bookkeeper.mledger.impl.MutablePositionImpl; /** * Factory for creating {@link Position} instances. @@ -47,6 +48,18 @@ public static Position create(long ledgerId, long entryId) { return new ImmutablePositionImpl(ledgerId, entryId); } + + /** + * Create a mutable position. + * + * @param ledgerId + * @param entryId + * @return + */ + public static MutablePositionImpl createMutable(long ledgerId, long entryId) { + return new MutablePositionImpl(ledgerId, entryId); + } + /** * Create a new position. * diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 3d7fd027e1ee3..91707907c1122 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -118,7 +118,7 @@ @SuppressWarnings("checkstyle:javadoctype") public class ManagedCursorImpl implements ManagedCursor { - private static final Comparator ENTRY_COMPARATOR = (e1, e2) -> { + static final Comparator ENTRY_COMPARATOR = (e1, e2) -> { if (e1.getLedgerId() != e2.getLedgerId()) { return e1.getLedgerId() < e2.getLedgerId() ? -1 : 1; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 3eb9282ece4de..b47e294479b8e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -31,12 +31,14 @@ import io.netty.buffer.Unpooled; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; +import it.unimi.dsi.fastutil.longs.LongAVLTreeSet; +import it.unimi.dsi.fastutil.longs.LongLongPair; +import it.unimi.dsi.fastutil.longs.LongSortedSet; import java.io.IOException; import java.time.Clock; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -48,8 +50,6 @@ import java.util.Queue; import java.util.Random; import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -2222,9 +2222,10 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) } // Skip entries that don't match the predicate - SortedSet entryIds = new TreeSet<>(); + LongSortedSet entryIds = new LongAVLTreeSet(); + MutablePositionImpl position = PositionFactory.createMutable(-1L, -1L); for (long entryId = firstEntry; entryId <= lastEntry; entryId++) { - Position position = PositionFactory.create(ledger.getId(), entryId); + position.transfer(ledger.getId(), entryId); if (skipCondition.test(position)) { continue; } @@ -2238,43 +2239,43 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) return; } - List> ranges = toRanges(entryIds); + List ranges = toRanges(entryIds); ReadEntriesCallback callback = new BatchReadEntriesCallback(entryIds, opReadEntry, lastReadPosition); - for (Pair pair : ranges) { - long start = pair.getLeft(); - long end = pair.getRight(); + for (LongLongPair pair : ranges) { + long start = pair.firstLong(); + long end = pair.secondLong(); asyncReadEntry(ledger, start, end, opReadEntry.cursor.isCacheReadEntry(), callback, opReadEntry.ctx); } } @VisibleForTesting - public static List> toRanges(SortedSet entryIds) { - List> ranges = new ArrayList<>(); - long start = entryIds.first(); + public static List toRanges(LongSortedSet entryIds) { + List ranges = new ArrayList<>(); + long start = entryIds.firstLong(); long end = start; for (long entryId : entryIds) { if (entryId - end > 1) { - ranges.add(Pair.of(start, end)); + ranges.add(LongLongPair.of(start, end)); start = entryId; end = start; } else { end = entryId; } } - ranges.add(Pair.of(start, end)); + ranges.add(LongLongPair.of(start, end)); return ranges; } @VisibleForTesting public static class BatchReadEntriesCallback implements ReadEntriesCallback { - private final SortedSet entryIds; + private final LongSortedSet entryIds; private final List entries; private final OpReadEntry callback; private volatile boolean completed = false; private final Position lastReadPosition; @VisibleForTesting - public BatchReadEntriesCallback(SortedSet entryIdSet, OpReadEntry callback, + public BatchReadEntriesCallback(LongSortedSet entryIdSet, OpReadEntry callback, Position lastReadPosition) { this.entryIds = entryIdSet; this.entries = new ArrayList<>(entryIdSet.size()); @@ -2296,7 +2297,7 @@ public synchronized void readEntriesComplete(List entries0, Object ctx) { } completed = true; // Make sure the entries are in the correct order - entries.sort(Comparator.comparingLong(Entry::getEntryId)); + entries.sort(ManagedCursorImpl.ENTRY_COMPARATOR); // If we want to read [1, 2, 3, 4, 5], but we only read [1, 2, 3], [4,5] are filtered, so we need to pass // the `lastReadPosition([5])` to make sure the cursor read position is correct. callback.internalReadEntriesComplete(entries, ctx, lastReadPosition); @@ -2334,7 +2335,7 @@ private List filterEntries() { if (entries.isEmpty()) { return Collections.emptyList(); } - entries.sort(Comparator.comparingLong(Entry::getEntryId)); + entries.sort(ManagedCursorImpl.ENTRY_COMPARATOR); List entries0 = new ArrayList<>(); for (long entryId : entryIds) { if (this.entries.isEmpty()) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MutablePositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MutablePositionImpl.java new file mode 100644 index 0000000000000..098d632f0b8d6 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MutablePositionImpl.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import org.apache.bookkeeper.mledger.Position; + +public final class MutablePositionImpl implements Position { + + private volatile long ledgerId; + private volatile long entryId; + + public MutablePositionImpl(long ledgerId, long entryId) { + this.ledgerId = ledgerId; + this.entryId = entryId; + } + + public void transfer(long ledgerId, long entryId) { + this.ledgerId = ledgerId; + this.entryId = entryId; + } + + @Override + public long getLedgerId() { + return ledgerId; + } + + @Override + public long getEntryId() { + return entryId; + } + + /** + * String representation of virtual cursor - LedgerId:EntryId. + */ + @Override + public String toString() { + return ledgerId + ":" + entryId; + } + + @Override + public int hashCode() { + return hashCodeForPosition(); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof Position && compareTo((Position) obj) == 0; + } + +} diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 9dc260421722f..60ab9659a8463 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -46,6 +46,9 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.util.concurrent.DefaultThreadFactory; +import it.unimi.dsi.fastutil.longs.LongAVLTreeSet; +import it.unimi.dsi.fastutil.longs.LongLongPair; +import it.unimi.dsi.fastutil.longs.LongSortedSet; import java.lang.reflect.Field; import java.nio.ReadOnlyBufferException; import java.nio.charset.Charset; @@ -65,8 +68,6 @@ import java.util.Optional; import java.util.Queue; import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; @@ -4481,7 +4482,7 @@ public void testRemoveLedgerProperty() throws Exception { @Test public void testToRanges() { - SortedSet set = new TreeSet<>(); + LongSortedSet set = new LongAVLTreeSet(); set.add(1L); set.add(2L); set.add(4L); @@ -4490,24 +4491,24 @@ public void testToRanges() { set.add(8L); set.add(10L); - List> ranges = ManagedLedgerImpl.toRanges(set); + List ranges = ManagedLedgerImpl.toRanges(set); assertEquals(ranges.size(), 4); - Pair pair0 = ranges.get(0); - assertEquals(pair0.getLeft().longValue(), 1L); - assertEquals(pair0.getRight().longValue(), 2L); + LongLongPair pair0 = ranges.get(0); + assertEquals(pair0.firstLong(), 1L); + assertEquals(pair0.secondLong(), 2L); - Pair pair1 = ranges.get(1); - assertEquals(pair1.getLeft().longValue(), 4L); - assertEquals(pair1.getRight().longValue(), 4L); + LongLongPair pair1 = ranges.get(1); + assertEquals(pair1.firstLong(), 4L); + assertEquals(pair1.secondLong(), 4L); - Pair pair2 = ranges.get(2); - assertEquals(pair2.getLeft().longValue(), 6L); - assertEquals(pair2.getRight().longValue(), 8L); + LongLongPair pair2 = ranges.get(2); + assertEquals(pair2.firstLong(), 6L); + assertEquals(pair2.secondLong(), 8L); - Pair pair3 = ranges.get(3); - assertEquals(pair3.getLeft().longValue(), 10L); - assertEquals(pair3.getRight().longValue(), 10L); + LongLongPair pair3 = ranges.get(3); + assertEquals(pair3.firstLong(), 10L); + assertEquals(pair3.secondLong(), 10L); } @@ -4539,7 +4540,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { } }, null, ledger.lastConfirmedEntry, position -> position.getEntryId() % 2 == 0); - SortedSet entryIds = new TreeSet<>(); + LongSortedSet entryIds = new LongAVLTreeSet(); entryIds.add(1L); entryIds.add(3L); entryIds.add(5L); From 9f1245ac2b8c740aa5f3bfabfefb39841c64c960 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Wed, 11 Jun 2025 22:42:17 +0800 Subject: [PATCH 19/21] Address review comment --- .../mledger/impl/ManagedLedgerImpl.java | 20 +++---------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index b47e294479b8e..cf52970acfb6d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2217,7 +2217,8 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry, lastEntry); } - asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx); + asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), + opReadEntry, opReadEntry.ctx); return; } @@ -2387,22 +2388,7 @@ protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry } } - protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, OpReadEntry opReadEntry, - Object ctx) { - if (config.getReadEntryTimeoutSeconds() > 0) { - // set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled - long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this); - long createdTime = System.nanoTime(); - ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry, - opReadEntry, readOpCount, createdTime, ctx); - lastReadCallback = readCallback; - entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), - readCallback, readOpCount); - } else { - entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), opReadEntry, - ctx); - } - } + static final class ReadEntryCallbackWrapper implements ReadEntryCallback, ReadEntriesCallback { From 11824bc38ff718f95587f5c4a21fccb00ae5cef6 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Wed, 11 Jun 2025 22:45:18 +0800 Subject: [PATCH 20/21] Address review comment --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index cf52970acfb6d..ce6af879d772a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2388,8 +2388,6 @@ protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry } } - - static final class ReadEntryCallbackWrapper implements ReadEntryCallback, ReadEntriesCallback { volatile ReadEntryCallback readEntryCallback; From cf518026496b84d87760fcb2bc3f8b9b673aa179 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 12 Jun 2025 10:59:28 +0800 Subject: [PATCH 21/21] Address review comment --- .../bookkeeper/mledger/PositionFactory.java | 12 ------------ .../mledger/impl/ManagedLedgerImpl.java | 4 ++-- .../mledger/impl/MutablePositionImpl.java | 17 ++++++++++++++--- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/PositionFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/PositionFactory.java index 36c9f90854492..b7111f567b64f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/PositionFactory.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/PositionFactory.java @@ -19,7 +19,6 @@ package org.apache.bookkeeper.mledger; import org.apache.bookkeeper.mledger.impl.ImmutablePositionImpl; -import org.apache.bookkeeper.mledger.impl.MutablePositionImpl; /** * Factory for creating {@link Position} instances. @@ -49,17 +48,6 @@ public static Position create(long ledgerId, long entryId) { } - /** - * Create a mutable position. - * - * @param ledgerId - * @param entryId - * @return - */ - public static MutablePositionImpl createMutable(long ledgerId, long entryId) { - return new MutablePositionImpl(ledgerId, entryId); - } - /** * Create a new position. * diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index ce6af879d772a..990090a4a158b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2224,9 +2224,9 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) // Skip entries that don't match the predicate LongSortedSet entryIds = new LongAVLTreeSet(); - MutablePositionImpl position = PositionFactory.createMutable(-1L, -1L); + MutablePositionImpl position = new MutablePositionImpl(); for (long entryId = firstEntry; entryId <= lastEntry; entryId++) { - position.transfer(ledger.getId(), entryId); + position.changePositionTo(ledger.getId(), entryId); if (skipCondition.test(position)) { continue; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MutablePositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MutablePositionImpl.java index 098d632f0b8d6..c4584da798c48 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MutablePositionImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MutablePositionImpl.java @@ -20,17 +20,28 @@ import org.apache.bookkeeper.mledger.Position; -public final class MutablePositionImpl implements Position { +final class MutablePositionImpl implements Position { private volatile long ledgerId; private volatile long entryId; - public MutablePositionImpl(long ledgerId, long entryId) { + MutablePositionImpl(long ledgerId, long entryId) { this.ledgerId = ledgerId; this.entryId = entryId; } - public void transfer(long ledgerId, long entryId) { + MutablePositionImpl() { + this.ledgerId = -1; + this.entryId = -1; + } + + /** + * Change the ledgerId and entryId. + * + * @param ledgerId + * @param entryId + */ + public void changePositionTo(long ledgerId, long entryId) { this.ledgerId = ledgerId; this.entryId = entryId; }