Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
7de1758
Improve for skipCondition.
dao-jun Jan 4, 2024
302ab0d
Merge branch 'master' into dev/ml_parallel_read
dao-jun Jan 19, 2024
7236265
code improve
dao-jun Jan 20, 2024
eb505c0
Merge branch 'refs/heads/master' into dev/ml_parallel_read
dao-jun Apr 21, 2024
6a4f740
Fix code
dao-jun Apr 23, 2024
b0a1f9b
Fix code
dao-jun Apr 23, 2024
a622263
Fix code
dao-jun Apr 23, 2024
a04d788
Fix code
dao-jun Apr 24, 2024
3949101
Fix code
dao-jun Apr 25, 2024
9eec2ba
Fix code
dao-jun Apr 25, 2024
e656c45
Fix code
dao-jun Apr 25, 2024
b71b022
Fix code
dao-jun Apr 25, 2024
cbd449a
fix codestyle
dao-jun Apr 25, 2024
0cbe85a
fix tests
dao-jun Apr 25, 2024
d675bae
improve code
dao-jun Apr 26, 2024
5a30448
Merge branch 'refs/heads/master' into dev/ml_parallel_read
dao-jun Apr 27, 2024
f438312
Merge branch 'refs/heads/master' into dev/ml_parallel_read
dao-jun May 3, 2024
ec248c0
Merge branch 'refs/heads/master' into dev/ml_parallel_read
dao-jun May 8, 2024
bba345a
Merge branch 'refs/heads/master' into dev/ml_parallel_read
dao-jun May 13, 2024
60e090d
Merge branch 'refs/heads/master' into dev/ml_parallel_read
dao-jun Oct 17, 2024
ad47126
rebase master
dao-jun Oct 17, 2024
ae51bbc
Merge remote-tracking branch 'origin/master' into dev/ml_parallel_read
lhotari Oct 22, 2024
47fca1d
Merge branch 'master' into dev/ml_parallel_read
dao-jun May 26, 2025
8786644
fix code
dao-jun May 26, 2025
8c700ec
address comment
dao-jun May 26, 2025
9f0a50d
address comment
dao-jun Jun 3, 2025
b826594
Address review comment
dao-jun Jun 11, 2025
9f1245a
Address review comment
dao-jun Jun 11, 2025
11824bc
Address review comment
dao-jun Jun 11, 2025
cf51802
Address review comment
dao-jun Jun 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions managed-ledger/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public static Position create(long ledgerId, long entryId) {
return new ImmutablePositionImpl(ledgerId, entryId);
}


/**
* Create a new position.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@

@SuppressWarnings("checkstyle:javadoctype")
public class ManagedCursorImpl implements ManagedCursor {
private static final Comparator<Entry> ENTRY_COMPARATOR = (e1, e2) -> {
static final Comparator<Entry> ENTRY_COMPARATOR = (e1, e2) -> {
if (e1.getLedgerId() != e2.getLedgerId()) {
return e1.getLedgerId() < e2.getLedgerId() ? -1 : 1;
}
Expand Down Expand Up @@ -3680,14 +3680,17 @@ public long[] getBatchPositionAckSet(Position position) {
public Position getNextAvailablePosition(Position position) {
lock.readLock().lock();
try {
if (individualDeletedMessages.isEmpty()) {
return ledger.getNextValidPosition(position);
}
Range<Position> range = individualDeletedMessages.rangeContaining(position.getLedgerId(),
position.getEntryId());
if (range != null) {
Position nextPosition = range.upperEndpoint().getNext();
return (nextPosition != null && nextPosition.compareTo(position) > 0)
? nextPosition : position.getNext();
}
return position.getNext();
return ledger.getNextValidPosition(position);
} finally {
lock.readLock().unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import it.unimi.dsi.fastutil.longs.LongAVLTreeSet;
import it.unimi.dsi.fastutil.longs.LongLongPair;
import it.unimi.dsi.fastutil.longs.LongSortedSet;
import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
Expand Down Expand Up @@ -2208,42 +2211,151 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)

long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger);

// Filer out and skip unnecessary read entry
if (opReadEntry.skipCondition != null) {
long firstValidEntry = -1L;
long lastValidEntry = -1L;
long entryId = firstEntry;
for (; entryId <= lastEntry; entryId++) {
if (opReadEntry.skipCondition.test(PositionFactory.create(ledger.getId(), entryId))) {
if (firstValidEntry != -1L) {
break;
}
} else {
if (firstValidEntry == -1L) {
firstValidEntry = entryId;
}
Predicate<Position> skipCondition = opReadEntry.skipCondition;
if (skipCondition == null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry,
lastEntry);
}
asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(),
opReadEntry, opReadEntry.ctx);
return;
}

lastValidEntry = entryId;
}
// Skip entries that don't match the predicate
LongSortedSet entryIds = new LongAVLTreeSet();
MutablePositionImpl position = new MutablePositionImpl();
for (long entryId = firstEntry; entryId <= lastEntry; entryId++) {
position.changePositionTo(ledger.getId(), entryId);
if (skipCondition.test(position)) {
continue;
}
entryIds.add(entryId);
}

// If all messages in [firstEntry...lastEntry] are filter out,
// then manual call internalReadEntriesComplete to advance read position.
if (firstValidEntry == -1L) {
opReadEntry.internalReadEntriesComplete(Collections.emptyList(), opReadEntry.ctx,
PositionFactory.create(ledger.getId(), lastEntry));
return;
Position lastReadPosition = PositionFactory.create(ledger.getId(), lastEntry);
if (entryIds.isEmpty()) {
// Move `readPosition` of `cursor`.
opReadEntry.internalReadEntriesComplete(Collections.emptyList(), opReadEntry.ctx, lastReadPosition);
return;
}

List<LongLongPair> ranges = toRanges(entryIds);
ReadEntriesCallback callback = new BatchReadEntriesCallback(entryIds, opReadEntry, lastReadPosition);
for (LongLongPair pair : ranges) {
long start = pair.firstLong();
long end = pair.secondLong();
asyncReadEntry(ledger, start, end, opReadEntry.cursor.isCacheReadEntry(), callback, opReadEntry.ctx);
}
}

@VisibleForTesting
public static List<LongLongPair> toRanges(LongSortedSet entryIds) {
List<LongLongPair> ranges = new ArrayList<>();
long start = entryIds.firstLong();
long end = start;
for (long entryId : entryIds) {
if (entryId - end > 1) {
ranges.add(LongLongPair.of(start, end));
start = entryId;
end = start;
} else {
end = entryId;
}
}
ranges.add(LongLongPair.of(start, end));
return ranges;
}

firstEntry = firstValidEntry;
lastEntry = lastValidEntry;
@VisibleForTesting
public static class BatchReadEntriesCallback implements ReadEntriesCallback {
private final LongSortedSet entryIds;
private final List<Entry> entries;
private final OpReadEntry callback;
private volatile boolean completed = false;
private final Position lastReadPosition;

@VisibleForTesting
public BatchReadEntriesCallback(LongSortedSet entryIdSet, OpReadEntry callback,
Position lastReadPosition) {
this.entryIds = entryIdSet;
this.entries = new ArrayList<>(entryIdSet.size());
this.callback = callback;
this.lastReadPosition = lastReadPosition;
}

if (log.isDebugEnabled()) {
log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry,
lastEntry);
@Override
public synchronized void readEntriesComplete(List<Entry> entries0, Object ctx) {
if (completed) {
for (Entry entry : entries0) {
entry.release();
}
return;
}
entries.addAll(entries0);
if (entries.size() < entryIds.size()) {
return;
}
completed = true;
// Make sure the entries are in the correct order
entries.sort(ManagedCursorImpl.ENTRY_COMPARATOR);
// If we want to read [1, 2, 3, 4, 5], but we only read [1, 2, 3], [4,5] are filtered, so we need to pass
// the `lastReadPosition([5])` to make sure the cursor read position is correct.
callback.internalReadEntriesComplete(entries, ctx, lastReadPosition);
}

@Override
public synchronized void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
if (completed) {
return;
}
completed = true;
// If there are entries been read success, try to let the read operation success as possible.
List<Entry> entries = filterEntries();
if (!entries.isEmpty()) {
// Move the read position of the cursor to the next position of the last read entry,
// or we will deliver the same entry to the consumer more than once.
Entry entry = entries.get(entries.size() - 1);
Position position = PositionFactory.create(entry.getLedgerId(), entry.getEntryId());
Position nextReadPosition = callback.cursor.getNextAvailablePosition(position);
callback.updateReadPosition(nextReadPosition);
}
callback.internalReadEntriesFailed(entries, exception, ctx);
}

/**
* Filter the entries that have been read success.
* <p>
* If we want to read [1, 2, 3, 4, 5], but only read [1, 2, 4, 5] successfully, [3] is read failed,
* only return [1,2] to the caller, to make sure the read operation success as possible
* and keep the ordering guarantee.
*
* @return filtered entries
*/
private List<Entry> filterEntries() {
if (entries.isEmpty()) {
return Collections.emptyList();
}
entries.sort(ManagedCursorImpl.ENTRY_COMPARATOR);
List<Entry> entries0 = new ArrayList<>();
for (long entryId : entryIds) {
if (this.entries.isEmpty()) {
break;
}
Entry entry = this.entries.remove(0);
if (entry.getEntryId() == entryId) {
entries0.add(entry);
} else {
entry.release();
break;
}
}
// Release the entries that are not in the result.
for (Entry entry : entries) {
entry.release();
}
return entries0;
}
asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx);
}

protected void asyncReadEntry(ReadHandle ledger, Position position, ReadEntryCallback callback, Object ctx) {
Expand All @@ -2261,20 +2373,18 @@ protected void asyncReadEntry(ReadHandle ledger, Position position, ReadEntryCal
}
}

protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, OpReadEntry opReadEntry,
Object ctx) {
protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, boolean shouldCacheEntries,
ReadEntriesCallback callback, Object ctx) {
if (config.getReadEntryTimeoutSeconds() > 0) {
// set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
long createdTime = System.nanoTime();
ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
opReadEntry, readOpCount, createdTime, ctx);
callback, readOpCount, createdTime, ctx);
lastReadCallback = readCallback;
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(),
readCallback, readOpCount);
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, shouldCacheEntries, readCallback, readOpCount);
} else {
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), opReadEntry,
ctx);
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, shouldCacheEntries, callback, ctx);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;

import org.apache.bookkeeper.mledger.Position;

final class MutablePositionImpl implements Position {

private volatile long ledgerId;
private volatile long entryId;

MutablePositionImpl(long ledgerId, long entryId) {
this.ledgerId = ledgerId;
this.entryId = entryId;
}

MutablePositionImpl() {
this.ledgerId = -1;
this.entryId = -1;
}

/**
* Change the ledgerId and entryId.
*
* @param ledgerId
* @param entryId
*/
public void changePositionTo(long ledgerId, long entryId) {
this.ledgerId = ledgerId;
this.entryId = entryId;
}

@Override
public long getLedgerId() {
return ledgerId;
}

@Override
public long getEntryId() {
return entryId;
}

/**
* String representation of virtual cursor - LedgerId:EntryId.
*/
@Override
public String toString() {
return ledgerId + ":" + entryId;
}

@Override
public int hashCode() {
return hashCodeForPosition();
}

@Override
public boolean equals(Object obj) {
return obj instanceof Position && compareTo((Position) obj) == 0;
}

}
Loading
Loading