Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -842,10 +842,10 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen
cb, ctx, ledgerId, entryId));
final Channel c = channel;
if (c == null) {
// usually checked in writeAndFlush, but we have extra check
// because we need to release toSend.
// Manually release the binary data(variable "request") that we manually created when it can not be sent out
// because the channel is switching.
errorOut(completionKey);
ReferenceCountUtil.release(toSend);
ReferenceCountUtil.release(request);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this also result in the correct behavior when useV2WireProtocol == false?

It doesn't look consistent how the V3 protocol is currently handled since there's no retained duplicate and the backing Netty ByteBuf isn't released.
In the case of V3 protocol, the body might consist of com.google.protobuf.NioByteString instances that wrap the Nio ByteBuffer instances backed by Netty ByteBuf instances which also control the lifecycle of the Nio ByteBuffer instances.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the concern about the NioBuffer lifecycle was raised by @sijie already in the original PR review: #791 (comment) .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added release at these locations

Copy link
Contributor Author

@poorbarcode poorbarcode Apr 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this also result in the correct behavior when useV2WireProtocol == false?

No memory leak when useV2WireProtocol is false, but the error io.netty.util.IllegalReferenceCountException: refCnt: 0, increment: 1 described in the Motivation will also occur.

return;
} else {
// addEntry times out on backpressure
Expand Down Expand Up @@ -1180,6 +1180,7 @@ private void writeAndFlush(final Channel channel,
if (channel == null) {
LOG.warn("Operation {} failed: channel == null", StringUtils.requestToString(request));
errorOut(key);
ReferenceCountUtil.release(request);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's the wrong thing to do generally in this method. I'll share a new PR where I explain an alternative approach that handles both the V3 protocol and the UnsafeByteOperations issue and the issue that was found.

Copy link
Contributor Author

@poorbarcode poorbarcode Apr 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make the context easier to trace, I moved the second comment here.

I think it's the wrong thing to do generally in this method. I'll share a new PR where I explain an alternative approach that handles both the V3 protocol and the UnsafeByteOperations issue and the issue that was found.

I don't think that the current changes in this PR are ok. I created #4293 to show what I think that should be done.

I added a section named Explanation to the Motivation; it explains why the current change is correct. Thanks for mentioning me and for the great suggestions ❤️

return;
}

Expand All @@ -1194,6 +1195,7 @@ private void writeAndFlush(final Channel channel,
StringUtils.requestToString(request));

errorOut(key, BKException.Code.TooManyRequestsException);
ReferenceCountUtil.release(request);
return;
}

Expand All @@ -1215,6 +1217,9 @@ private void writeAndFlush(final Channel channel,
} catch (Throwable e) {
LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
errorOut(key);
// If the request goes into the writeAndFlush, it should be handled well by Netty. So all the exceptions we
// get here, we can release the request.
ReferenceCountUtil.release(request);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,31 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.MockUncleanShutdownDetection;
import org.apache.bookkeeper.bookie.TestBookieImpl;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperClientStats;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.client.api.WriteFlag;
Expand All @@ -57,27 +63,32 @@
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookieClientImpl;
import org.apache.bookkeeper.proto.BookieProtoEncoding;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.BookkeeperProtocol;
import org.apache.bookkeeper.proto.DataFormats;
import org.apache.bookkeeper.proto.PerChannelBookieClient;
import org.apache.bookkeeper.proto.PerChannelBookieClientPool;
import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.IOUtils;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
* Test the bookie client.
*/
@Slf4j
public class BookieClientTest {
BookieServer bs;
File tmpDir;
Expand Down Expand Up @@ -745,4 +756,164 @@ public void testBatchedReadWithMaxSizeLimitCase2() throws Exception {
assertTrue(Arrays.equals(kbData, bytes));
}
}

/**
 * Repeatedly adds an entry while another thread closes the channel in use, then verifies that the
 * reference counts of the packaged request and of the original payload end up at the expected value.
 *
 * <p>Explanation of the call stack of "BookieClientImpl.addEntry":
 * 1.`BookieClientImpl.addEntry`.
 *   a.Retain the `ByteBuf` before getting the `PerChannelBookieClient`. This `ByteBuf` is called `toSend` in
 *     the following sections. `toSend.refCnt` is `2` now.
 * 2.`Get PerChannelBookieClient`.
 * 3.`ChannelReadyForAddEntryCallback.operationComplete`
 *   a.`PerChannelBookieClient.addEntry`
 *     a-1.Build a new ByteBuf for the request command. This new `ByteBuf` is called `request` in the
 *       following sections.
 *     a-2.`channel.writeAndFlush(request)`, or release the ByteBuf when the `channel` is switching.
 *       Note the callback will be called immediately if the channel is switching.
 *   b.Release the `ByteBuf` since it has been retained at `step 1`. `toSend.refCnt` should be `1` now.
 *
 * @param useV2WireProtocol whether the client and the digest manager use the V2 wire protocol
 * @param smallPayload if true a 1-byte payload is written, otherwise a payload of
 *                     {@code BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD} bytes
 * @param withDelayReconnect whether the thread that closes the channel sleeps before closing it
 * @param withDelayAddEntry whether this thread sleeps before calling addEntry
 * @param tryTimes how many add-entry/close-channel rounds to run
 * @throws Exception if building the client fails or an assertion does not hold in time
 */
public void testDataRefCnfWhenReconnect(boolean useV2WireProtocol, boolean smallPayload,
                                        boolean withDelayReconnect, boolean withDelayAddEntry,
                                        int tryTimes) throws Exception {
    final long ledgerId = 1;
    final BookieId addr = bs.getBookieId();
    // Build passwd.
    byte[] passwd = new byte[20];
    Arrays.fill(passwd, (byte) 'a');
    // Build digest manager.
    DigestManager digestManager = DigestManager.instantiate(1, passwd,
            BookKeeper.DigestType.toProtoDigestType(BookKeeper.DigestType.DUMMY),
            PooledByteBufAllocator.DEFAULT, useV2WireProtocol);
    // Build client.
    ClientConfiguration clientConf = new ClientConfiguration();
    clientConf.setUseV2WireProtocol(useV2WireProtocol);
    BookieClientImpl client = new BookieClientImpl(clientConf, eventLoopGroup,
            UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE,
            BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
    // The original payload is only checked and released by this test for a large payload on the
    // non-V2 protocol.
    final boolean checkOriginalPayload = !useV2WireProtocol && !smallPayload;

    // Close the client even when an assertion fails, so a failing round does not leak it.
    try {
        // Inject a reconnect event.
        // 1. Get the channel that will be used.
        // 2. Call add entry.
        // 3. Another thread close the channel that is using.
        for (int i = 0; i < tryTimes; i++) {
            long entryId = i + 1;
            long lac = i;
            // Build payload.
            int payloadLen;
            ByteBuf payload;
            if (smallPayload) {
                payloadLen = 1;
                payload = PooledByteBufAllocator.DEFAULT.buffer(1);
                payload.writeByte(1);
            } else {
                payloadLen = BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD;
                payload = PooledByteBufAllocator.DEFAULT.buffer();
                // Named so that it does not shadow the BookieServer field "bs".
                byte[] zeroFilledBody = new byte[payloadLen];
                payload.writeBytes(zeroFilledBody);
            }

            // Digest.
            ReferenceCounted bb = digestManager.computeDigestAndPackageForSending(entryId, lac,
                    payloadLen * entryId, payload, passwd, BookieProtocol.FLAG_NONE);
            log.info("Before send. bb.refCnf: {}", bb.refCnt());

            // Step: get the channel that will be used.
            PerChannelBookieClientPool perChannelBookieClientPool = client.lookupClient(addr);
            AtomicReference<PerChannelBookieClient> perChannelBookieClient = new AtomicReference<>();
            perChannelBookieClientPool.obtain((rc, result) -> perChannelBookieClient.set(result), ledgerId);
            Awaitility.await().untilAsserted(() -> {
                assertNotNull(perChannelBookieClient.get());
            });

            // Step: Inject a reconnect event.
            final int delayMillis = i;
            new Thread(() -> {
                if (withDelayReconnect) {
                    sleep(delayMillis);
                }
                Channel channel = WhiteboxImpl.getInternalState(perChannelBookieClient.get(), "channel");
                if (channel != null) {
                    channel.close();
                }
            }).start();
            if (withDelayAddEntry) {
                sleep(delayMillis);
            }

            // Step: add entry.
            AtomicBoolean callbackExecuted = new AtomicBoolean();
            WriteCallback callback = (rc, lId, eId, socketAddr, ctx) -> {
                log.info("Writing is finished. rc: {}, withDelayReconnect: {}, withDelayAddEntry: {}, ledgerId: {},"
                                + " entryId: {}, socketAddr: {}, ctx: {}",
                        rc, withDelayReconnect, withDelayAddEntry, lId, eId, socketAddr, ctx);
                callbackExecuted.set(true);
            };
            client.addEntry(addr, ledgerId, passwd, entryId, bb, callback, i, BookieProtocol.FLAG_NONE, false,
                    WriteFlag.NONE);
            // Wait until adding the entry has finished.
            Awaitility.await().untilAsserted(() -> assertTrue(callbackExecuted.get()));
            // The steps are explained in the method description.
            // Step "3-a-2" always runs before step "3-b", so "callbackExecuted" is set before step "3-b"
            // has run. Sleep to give step "3-b" time to finish.
            Thread.sleep(100);
            // Check the ref count.
            Awaitility.await().atMost(Duration.ofSeconds(60)).untilAsserted(() -> {
                assertEquals(1, bb.refCnt());
                // V2 will release this original data if it is a small.
                if (checkOriginalPayload) {
                    assertEquals(1, payload.refCnt());
                }
            });
            bb.release();
            // V2 will release this original data if it is a small.
            if (checkOriginalPayload) {
                payload.release();
            }
        }
    } finally {
        // cleanup.
        client.close();
    }
}

/**
 * Pauses the calling thread for the requested number of milliseconds.
 *
 * <p>A non-positive value returns immediately. If the thread is interrupted while sleeping, the
 * interruption is logged and the interrupt flag is restored so that callers can still observe it.
 *
 * @param milliSeconds how long to sleep, in milliseconds; values {@code <= 0} mean "do not sleep"
 */
private void sleep(int milliSeconds) {
    if (milliSeconds <= 0) {
        return;
    }
    try {
        // Honour the requested delay; it was previously ignored in favour of a fixed 1 ms.
        Thread.sleep(milliSeconds);
    } catch (InterruptedException e) {
        log.warn("Error occurs", e);
        // Do not swallow the interruption: re-assert the flag for the caller.
        Thread.currentThread().interrupt();
    }
}

/**
 * Runs the reconnect ref-count scenario over the V2 wire protocol, first with a large payload and
 * then with a small one, each with three delay combinations.
 * Relate to https://github.com/apache/bookkeeper/pull/4289.
 */
@Test
public void testDataRefCnfWhenReconnectV2() throws Exception {
    final boolean useV2WireProtocol = true;
    final int tryTimes = 10;
    // Each row is {withDelayReconnect, withDelayAddEntry}.
    final boolean[][] delayCombinations = {
        {false, false},
        {true, false},
        {false, true},
    };
    // Large payload (smallPayload == false) comes first.
    // Run this test may not reproduce the issue, you can reproduce the issue this way:
    // 1. Add two break points.
    //   a. At the line "Channel c = channel" in the method PerChannelBookieClient.addEntry.
    //   b. At the line "channel = null" in the method "PerChannelBookieClient.channelInactive".
    // 2. Make the break point b to run earlier than the break point a during debugging.
    //
    // Small payload (smallPayload == true) comes second.
    // There is no issue without https://github.com/apache/bookkeeper/pull/4289, just add a test for this scenario.
    final boolean[] smallPayloadOptions = {false, true};
    for (boolean smallPayload : smallPayloadOptions) {
        for (boolean[] delays : delayCombinations) {
            testDataRefCnfWhenReconnect(useV2WireProtocol, smallPayload, delays[0], delays[1], tryTimes);
        }
    }
}

/**
 * Runs the reconnect ref-count scenario with the V2 wire protocol disabled, using a small payload and
 * three delay combinations.
 * Please see the comment of the scenario "Large payload" in the {@link #testDataRefCnfWhenReconnectV2()} if you
 * can not reproduce the issue when running this test.
 * Relate to https://github.com/apache/bookkeeper/pull/4289.
 */
@Test
public void testDataRefCnfWhenReconnectV3() throws Exception {
    final boolean useV2WireProtocol = false;
    final boolean smallPayload = true;
    final int tryTimes = 10;
    // Each row is {withDelayReconnect, withDelayAddEntry}.
    final boolean[][] delayCombinations = {
        {false, false},
        {true, false},
        {false, true},
    };
    for (boolean[] delays : delayCombinations) {
        testDataRefCnfWhenReconnect(useV2WireProtocol, smallPayload, delays[0], delays[1], tryTimes);
    }
}
}