From 7e824613f03ac163aeff2a22c9e5e37631c99a16 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 16 Apr 2024 11:53:59 +0800 Subject: [PATCH 1/7] [fix] ByteBuf release/retain incorrect --- .../proto/PerChannelBookieClient.java | 2 +- .../bookkeeper/test/BookieClientTest.java | 40 +++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 95eef54111d..28927cf8dcd 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -845,7 +845,7 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen // usually checked in writeAndFlush, but we have extra check // because we need to release toSend. errorOut(completionKey); - ReferenceCountUtil.release(toSend); + ReferenceCountUtil.release(request); return; } else { // addEntry times out on backpressure diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java index 60f89159a04..2c2bf0897a7 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java @@ -29,6 +29,7 @@ import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.ReferenceCounted; @@ -64,6 +65,8 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.proto.BookkeeperProtocol; import 
org.apache.bookkeeper.proto.DataFormats; +import org.apache.bookkeeper.proto.PerChannelBookieClient; +import org.apache.bookkeeper.proto.PerChannelBookieClientPool; import org.apache.bookkeeper.proto.checksum.DigestManager; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger; @@ -71,6 +74,7 @@ import org.apache.bookkeeper.util.ByteBufList; import org.apache.bookkeeper.util.IOUtils; import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -745,4 +749,40 @@ public void testBatchedReadWithMaxSizeLimitCase2() throws Exception { assertTrue(Arrays.equals(kbData, bytes)); } } + + @Test + public void testDataRefCnfWhenReconnect() throws Exception { + long ledgerId = 1; + byte[] passwd = new byte[20]; + Arrays.fill(passwd, (byte) 'a'); + BookieId addr = bs.getBookieId(); + ResultStruct arc = new ResultStruct(); + + BookieClientImpl client = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup, + UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE, + BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + ByteBufList bb = createByteBuffer(1, 1, 1); + for (int i = 0; i < 30; i++) { + // Inject a reconnect event. + // 1. Get the channel will be used. + // 2. Call add entry. + // 3. Another thread close the channel that is using. 
+ PerChannelBookieClientPool perChannelBookieClientPool = client.lookupClient(addr); + AtomicReference perChannelBookieClient = new AtomicReference<>(); + perChannelBookieClientPool.obtain((rc, result) -> perChannelBookieClient.set(result), ledgerId); + Awaitility.await().untilAsserted(() -> { + assertNotNull(perChannelBookieClient.get()); + }); + new Thread(() -> { + Channel channel = WhiteboxImpl.getInternalState(perChannelBookieClient.get(), "channel"); + channel.close(); + }).start(); + client.addEntry(addr, ledgerId, passwd, 1, bb, wrcb, arc, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE); + Awaitility.await().untilAsserted(() -> { + assertEquals(1, bb.refCnt()); + }); + } + // cleanup. + bb.release(); + } } From 37deb384d08e1813de3c099668dd145e51b159fe Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 16 Apr 2024 15:32:43 +0800 Subject: [PATCH 2/7] improve the code comment --- .../org/apache/bookkeeper/proto/PerChannelBookieClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 28927cf8dcd..3d71ee7f415 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -842,8 +842,8 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen cb, ctx, ledgerId, entryId)); final Channel c = channel; if (c == null) { - // usually checked in writeAndFlush, but we have extra check - // because we need to release toSend. + // Manually release the binary data(variable "request") that we manually created when can not be sent out + // because the channel is switching. 
errorOut(completionKey); ReferenceCountUtil.release(request); return; From 64fc771cb42fb30829414781e8fda37ea74253fe Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 16 Apr 2024 16:10:16 +0800 Subject: [PATCH 3/7] fix other cases --- .../proto/PerChannelBookieClient.java | 7 +++ .../bookkeeper/test/BookieClientTest.java | 47 ++++++++++++++++--- 2 files changed, 47 insertions(+), 7 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 3d71ee7f415..273ffa324b4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -1180,6 +1180,7 @@ private void writeAndFlush(final Channel channel, if (channel == null) { LOG.warn("Operation {} failed: channel == null", StringUtils.requestToString(request)); errorOut(key); + ReferenceCountUtil.release(request); return; } @@ -1194,9 +1195,11 @@ private void writeAndFlush(final Channel channel, StringUtils.requestToString(request)); errorOut(key, BKException.Code.TooManyRequestsException); + ReferenceCountUtil.release(request); return; } + boolean calledWrite = false; try { final long startTime = MathUtils.nowInNano(); @@ -1211,10 +1214,14 @@ private void writeAndFlush(final Channel channel, nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); } }); + calledWrite = true; channel.writeAndFlush(request, promise); } catch (Throwable e) { LOG.warn("Operation {} failed", StringUtils.requestToString(request), e); errorOut(key); + if (!calledWrite) { + ReferenceCountUtil.release(request); + } } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java index 2c2bf0897a7..d8681f35c6f 100644 
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java @@ -27,6 +27,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; @@ -750,18 +751,22 @@ public void testBatchedReadWithMaxSizeLimitCase2() throws Exception { } } - @Test - public void testDataRefCnfWhenReconnect() throws Exception { + public void testDataRefCnfWhenReconnect(BookieClientImpl client) throws Exception { long ledgerId = 1; + long entryId = 1; byte[] passwd = new byte[20]; Arrays.fill(passwd, (byte) 'a'); BookieId addr = bs.getBookieId(); ResultStruct arc = new ResultStruct(); - BookieClientImpl client = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup, - UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE, - BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); - ByteBufList bb = createByteBuffer(1, 1, 1); + + ByteBuf internalBB = PooledByteBufAllocator.DEFAULT.buffer(4 + 24); + internalBB.writeLong(ledgerId); + internalBB.writeLong(entryId); + internalBB.writeLong(entryId - 1); + internalBB.writeInt(1); + ByteBufList bb = ByteBufList.get(internalBB); + for (int i = 0; i < 30; i++) { // Inject a reconnect event. // 1. Get the channel will be used. 
@@ -777,12 +782,40 @@ public void testDataRefCnfWhenReconnect() throws Exception { Channel channel = WhiteboxImpl.getInternalState(perChannelBookieClient.get(), "channel"); channel.close(); }).start(); - client.addEntry(addr, ledgerId, passwd, 1, bb, wrcb, arc, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE); + client.addEntry(addr, ledgerId, passwd, entryId, bb, wrcb, arc, BookieProtocol.FLAG_NONE, false, + WriteFlag.NONE); Awaitility.await().untilAsserted(() -> { assertEquals(1, bb.refCnt()); + assertEquals(1, internalBB.refCnt()); }); } // cleanup. bb.release(); } + + @Test + public void testDataRefCnfWhenReconnectV2() throws Exception { + // test. + ClientConfiguration clientConf = new ClientConfiguration(); + clientConf.setUseV2WireProtocol(true); + BookieClientImpl client = new BookieClientImpl(clientConf, eventLoopGroup, + UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE, + BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + testDataRefCnfWhenReconnect(client); + // cleanup. + client.close(); + } + + @Test + public void testDataRefCnfWhenReconnectV3() throws Exception { + // test. + ClientConfiguration clientConf = new ClientConfiguration(); + clientConf.setUseV2WireProtocol(false); + BookieClientImpl client = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup, + UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE, + BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + testDataRefCnfWhenReconnect(client); + // cleanup. 
+ client.close(); + } } From 7a06a97f09acc67885058ef2fe3361eee890c102 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 16 Apr 2024 16:14:18 +0800 Subject: [PATCH 4/7] modify the code comment --- .../org/apache/bookkeeper/proto/PerChannelBookieClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 273ffa324b4..ddc7d548b55 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -842,7 +842,7 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen cb, ctx, ledgerId, entryId)); final Channel c = channel; if (c == null) { - // Manually release the binary data(variable "request") that we manually created when can not be sent out + // Manually release the binary data(variable "request") that we manually created when it can not be sent out // because the channel is switching. 
errorOut(completionKey); ReferenceCountUtil.release(request); From 30c92df7b987b3957d5d5a16943f488f3eca3bbc Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 16 Apr 2024 16:43:28 +0800 Subject: [PATCH 5/7] improve the code --- .../apache/bookkeeper/proto/PerChannelBookieClient.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index ddc7d548b55..8485d21b74f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -1199,7 +1199,6 @@ private void writeAndFlush(final Channel channel, return; } - boolean calledWrite = false; try { final long startTime = MathUtils.nowInNano(); @@ -1214,14 +1213,13 @@ private void writeAndFlush(final Channel channel, nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); } }); - calledWrite = true; channel.writeAndFlush(request, promise); } catch (Throwable e) { LOG.warn("Operation {} failed", StringUtils.requestToString(request), e); errorOut(key); - if (!calledWrite) { - ReferenceCountUtil.release(request); - } + // If the request goes into the writeAndFlush, it should be handled well by Netty. So all the exceptions we + // get here, we can release the request. 
+ ReferenceCountUtil.release(request); } } From 02422ac1f7f743e68a430ea8526c71af26bab76d Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 16 Apr 2024 20:14:22 +0800 Subject: [PATCH 6/7] improve the test --- .../bookkeeper/test/BookieClientTest.java | 138 +++++++++++++----- 1 file changed, 99 insertions(+), 39 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java index d8681f35c6f..5dc4e1ef8f5 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java @@ -38,16 +38,20 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.bookie.MockUncleanShutdownDetection; import org.apache.bookkeeper.bookie.TestBookieImpl; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BKException.Code; +import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeperClientStats; import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; import org.apache.bookkeeper.client.api.WriteFlag; @@ -59,6 +63,7 @@ import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookieClient; import org.apache.bookkeeper.proto.BookieClientImpl; +import org.apache.bookkeeper.proto.BookieProtoEncoding; import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookieServer; import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback; @@ -83,6 +88,7 @@ /** * Test the bookie client. */ +@Slf4j public class BookieClientTest { BookieServer bs; File tmpDir; @@ -751,71 +757,125 @@ public void testBatchedReadWithMaxSizeLimitCase2() throws Exception { } } - public void testDataRefCnfWhenReconnect(BookieClientImpl client) throws Exception { - long ledgerId = 1; - long entryId = 1; + public void testDataRefCnfWhenReconnect(boolean useV2WireProtocol, boolean smallPayload, + boolean withDelayReconnect, boolean withDelayAddEntry, + int tryTimes) throws Exception { + final long ledgerId = 1; + final BookieId addr = bs.getBookieId(); + // Build passwd. byte[] passwd = new byte[20]; Arrays.fill(passwd, (byte) 'a'); - BookieId addr = bs.getBookieId(); - ResultStruct arc = new ResultStruct(); + // Build digest manager. + DigestManager digestManager = DigestManager.instantiate(1, passwd, + BookKeeper.DigestType.toProtoDigestType(BookKeeper.DigestType.DUMMY), + PooledByteBufAllocator.DEFAULT, useV2WireProtocol); + // Build client. + ClientConfiguration clientConf = new ClientConfiguration(); + clientConf.setUseV2WireProtocol(useV2WireProtocol); + BookieClientImpl client = new BookieClientImpl(clientConf, eventLoopGroup, + UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE, + BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + // Inject a reconnect event. + // 1. Get the channel that will be used. + // 2. Call add entry. + // 3. Another thread close the channel that is using. + for (int i = 0; i < tryTimes; i++) { + long entryId = i + 1; + long lac = i; + // Build payload. 
+ int payloadLen; + ByteBuf payload; + if (smallPayload) { + payloadLen = 1; + payload = PooledByteBufAllocator.DEFAULT.buffer(1); + payload.writeByte(1); + } else { + payloadLen = BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD; + payload = PooledByteBufAllocator.DEFAULT.buffer(); + byte[] bs = new byte[payloadLen]; + payload.writeBytes(bs); + } - ByteBuf internalBB = PooledByteBufAllocator.DEFAULT.buffer(4 + 24); - internalBB.writeLong(ledgerId); - internalBB.writeLong(entryId); - internalBB.writeLong(entryId - 1); - internalBB.writeInt(1); - ByteBufList bb = ByteBufList.get(internalBB); + // Digest. + ReferenceCounted bb = digestManager.computeDigestAndPackageForSending(entryId, lac, + payloadLen * entryId, payload, passwd, BookieProtocol.FLAG_NONE); + log.info("Before send. bb.refCnf: {}", bb.refCnt()); - for (int i = 0; i < 30; i++) { - // Inject a reconnect event. - // 1. Get the channel will be used. - // 2. Call add entry. - // 3. Another thread close the channel that is using. + // Step: get the channel that will be used. PerChannelBookieClientPool perChannelBookieClientPool = client.lookupClient(addr); AtomicReference perChannelBookieClient = new AtomicReference<>(); perChannelBookieClientPool.obtain((rc, result) -> perChannelBookieClient.set(result), ledgerId); Awaitility.await().untilAsserted(() -> { assertNotNull(perChannelBookieClient.get()); }); + + // Step: Inject a reconnect event. + final int delayMillis = i; new Thread(() -> { + if (withDelayReconnect) { + sleep(delayMillis); + } Channel channel = WhiteboxImpl.getInternalState(perChannelBookieClient.get(), "channel"); - channel.close(); + if (channel != null) { + channel.close(); + } }).start(); - client.addEntry(addr, ledgerId, passwd, entryId, bb, wrcb, arc, BookieProtocol.FLAG_NONE, false, + if (withDelayAddEntry) { + sleep(delayMillis); + } + + // Step: add entry. 
+ AtomicBoolean callbackExecuted = new AtomicBoolean(); + WriteCallback callback = (rc, lId, eId, socketAddr, ctx) -> { + log.info("Writing is finished. rc: {}, withDelayReconnect: {}, withDelayAddEntry: {}, ledgerId: {}," + + " entryId: {}, socketAddr: {}, ctx: {}", + rc, withDelayReconnect, withDelayAddEntry, lId, eId, socketAddr, ctx); + callbackExecuted.set(true); + }; + client.addEntry(addr, ledgerId, passwd, entryId, bb, callback, i, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE); - Awaitility.await().untilAsserted(() -> { + // Wait for adding entry is finish. + Awaitility.await().untilAsserted(() -> assertTrue(callbackExecuted.get())); + // Check the ref count. + Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> { assertEquals(1, bb.refCnt()); - assertEquals(1, internalBB.refCnt()); + // V2 will release this original data if it is a small. + if (!useV2WireProtocol && !smallPayload) { + assertEquals(1, payload.refCnt()); + } }); + bb.release(); + // V2 will release this original data if it is a small. + if (!useV2WireProtocol && !smallPayload) { + payload.release(); + } } // cleanup. - bb.release(); + client.close(); + } + + private void sleep(int milliSeconds) { + try { + if (milliSeconds > 0) { + Thread.sleep(1); + } + } catch (InterruptedException e) { + log.warn("Error occurs", e); + } } @Test public void testDataRefCnfWhenReconnectV2() throws Exception { - // test. - ClientConfiguration clientConf = new ClientConfiguration(); - clientConf.setUseV2WireProtocol(true); - BookieClientImpl client = new BookieClientImpl(clientConf, eventLoopGroup, - UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE, - BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); - testDataRefCnfWhenReconnect(client); - // cleanup. 
- client.close(); + testDataRefCnfWhenReconnect(true, false, false, false, 10); + testDataRefCnfWhenReconnect(true, false, true, false, 10); + testDataRefCnfWhenReconnect(true, false, false, true, 10); } @Test public void testDataRefCnfWhenReconnectV3() throws Exception { - // test. - ClientConfiguration clientConf = new ClientConfiguration(); - clientConf.setUseV2WireProtocol(false); - BookieClientImpl client = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup, - UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE, - BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); - testDataRefCnfWhenReconnect(client); - // cleanup. - client.close(); + testDataRefCnfWhenReconnect(false, true,false, false, 10); + testDataRefCnfWhenReconnect(false, true, true, false, 10); + testDataRefCnfWhenReconnect(false, true, false, true, 10); } } From 475f25912eae86194e1b08621c2f1f9e9ad8f7ae Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 17 Apr 2024 03:58:39 +0800 Subject: [PATCH 7/7] add description --- .../bookkeeper/test/BookieClientTest.java | 42 ++++++++++++++++++- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java index 5dc4e1ef8f5..2fadbbd2c25 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java @@ -757,6 +757,20 @@ public void testBatchedReadWithMaxSizeLimitCase2() throws Exception { } } + /** + * Explanation of the call flow of "BookieClientImpl.addEntry": + * 1.`BookieClientImpl.addEntry`. + * a.Retain the `ByteBuf` before getting `PerChannelBookieClient`. We call this `ByteBuf` `toSend` + * in the following sections. `toSend.refCnt` is `2` now. + * 2.`Get PerChannelBookieClient`. 
+ * 3.`ChannelReadyForAddEntryCallback.operationComplete` + * a.`PerChannelBookieClient.addEntry` + * a-1.Build a new ByteBuf for the request command. We call this new `ByteBuf` `request` in the + * following sections. + * a-2.`channel.writeAndFlush(request)` or release the ByteBuf when `channel` is switching. + * Note that the callback will be called immediately if the channel is switching. + * b.Release the `ByteBuf` since it has been retained at `step 1`. `toSend.refCnt` should be `1` now. + */ public void testDataRefCnfWhenReconnect(boolean useV2WireProtocol, boolean smallPayload, boolean withDelayReconnect, boolean withDelayAddEntry, int tryTimes) throws Exception { @@ -837,8 +851,12 @@ public void testDataRefCnfWhenReconnect(boolean useV2WireProtocol, boolean small WriteFlag.NONE); // Wait for adding entry is finish. Awaitility.await().untilAsserted(() -> assertTrue(callbackExecuted.get())); + // The steps are explained in the method description. + // Since the step "3-a-2" always runs before the step "3-b", the "callbackExecuted" will be finished + // before the step "3-b". Add a sleep to wait until the step "3-a-2" is finished. + Thread.sleep(100); // Check the ref count. - Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> { + Awaitility.await().atMost(Duration.ofSeconds(60)).untilAsserted(() -> { assertEquals(1, bb.refCnt()); // V2 will release this original data if it is a small. if (!useV2WireProtocol && !smallPayload) { @@ -865,16 +883,36 @@ private void sleep(int milliSeconds) { } } + /** + * Related to https://github.com/apache/bookkeeper/pull/4289. + */ @Test public void testDataRefCnfWhenReconnectV2() throws Exception { + // Large payload. + // Running this test may not reproduce the issue; you can reproduce it this way: + // 1. Add two breakpoints. + // a. At the line "Channel c = channel" in the method PerChannelBookieClient.addEntry. + // b. At the line "channel = null" in the method "PerChannelBookieClient.channelInactive". 
+ // 2. Make breakpoint b run earlier than breakpoint a during debugging. testDataRefCnfWhenReconnect(true, false, false, false, 10); testDataRefCnfWhenReconnect(true, false, true, false, 10); testDataRefCnfWhenReconnect(true, false, false, true, 10); + + // Small payload. + // No issue without https://github.com/apache/bookkeeper/pull/4289; this just adds a test for the scenario. + testDataRefCnfWhenReconnect(true, true, false, false, 10); + testDataRefCnfWhenReconnect(true, true, true, false, 10); + testDataRefCnfWhenReconnect(true, true, false, true, 10); } + /** + * Please see the comment of the scenario "Large payload" in the {@link #testDataRefCnfWhenReconnectV2()} if you + * cannot reproduce the issue when running this test. + * Related to https://github.com/apache/bookkeeper/pull/4289. + */ @Test public void testDataRefCnfWhenReconnectV3() throws Exception { - testDataRefCnfWhenReconnect(false, true,false, false, 10); + testDataRefCnfWhenReconnect(false, true, false, false, 10); testDataRefCnfWhenReconnect(false, true, true, false, 10); testDataRefCnfWhenReconnect(false, true, false, true, 10); }