Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
super.write(ctx, msg, promise);
super.flush(ctx);
} else {
waitingForAuth.add(msg);
addMsgAndPromiseToQueue(msg, promise);
}
} else if (msg instanceof BookieProtocol.Request) {
// let auth messages through, queue the rest
Expand All @@ -364,16 +364,26 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
super.write(ctx, msg, promise);
super.flush(ctx);
} else {
waitingForAuth.add(msg);
addMsgAndPromiseToQueue(msg, promise);
}
} else if (msg instanceof ByteBuf || msg instanceof ByteBufList) {
waitingForAuth.add(msg);
addMsgAndPromiseToQueue(msg, promise);
} else {
LOG.info("[{}] dropping write of message {}", ctx.channel(), msg);
}
}
}

// Add the message and the associated promise to the queue.
// The promise is added to the same queue as the message without an additional wrapper object so
// that object allocations can be avoided. A similar solution is used in Netty codebase.
private void addMsgAndPromiseToQueue(Object msg, ChannelPromise promise) {
waitingForAuth.add(msg);
if (promise != null && !promise.isVoid()) {
waitingForAuth.add(promise);
}
}

long newTxnId() {
return transactionIdGenerator.incrementAndGet();
}
Expand Down Expand Up @@ -433,10 +443,19 @@ public void operationComplete(int rc, Void v) {
if (rc == BKException.Code.OK) {
synchronized (this) {
authenticated = true;
Object msg = waitingForAuth.poll();
while (msg != null) {
NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, msg);
msg = waitingForAuth.poll();
while (true) {
Object msg = waitingForAuth.poll();
if (msg == null) {
break;
}
ChannelPromise promise;
// check if the message has an associated promise as the next element in the queue
if (waitingForAuth.peek() instanceof ChannelPromise) {
promise = (ChannelPromise) waitingForAuth.poll();
} else {
promise = ctx.voidPromise();
}
ctx.writeAndFlush(msg, promise);
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,18 +260,20 @@ public void writeLac(final BookieId addr, final long ledgerId, final byte[] mast

toSend.retain();
client.obtain((rc, pcbc) -> {
if (rc != BKException.Code.OK) {
try {
executor.executeOrdered(ledgerId,
() -> cb.writeLacComplete(rc, ledgerId, addr, ctx));
} catch (RejectedExecutionException re) {
cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx);
try {
if (rc != BKException.Code.OK) {
try {
executor.executeOrdered(ledgerId,
() -> cb.writeLacComplete(rc, ledgerId, addr, ctx));
} catch (RejectedExecutionException re) {
cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx);
}
} else {
pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx);
}
} else {
pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx);
} finally {
ReferenceCountUtil.release(toSend);
}

ReferenceCountUtil.release(toSend);
}, ledgerId, useV3Enforced);
}

Expand Down Expand Up @@ -392,14 +394,16 @@ static ChannelReadyForAddEntryCallback create(
@Override
public void operationComplete(final int rc,
PerChannelBookieClient pcbc) {
if (rc != BKException.Code.OK) {
bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
} else {
pcbc.addEntry(ledgerId, masterKey, entryId,
toSend, cb, ctx, options, allowFastFail, writeFlags);
try {
if (rc != BKException.Code.OK) {
bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
} else {
pcbc.addEntry(ledgerId, masterKey, entryId,
toSend, cb, ctx, options, allowFastFail, writeFlags);
}
} finally {
ReferenceCountUtil.release(toSend);
}

ReferenceCountUtil.release(toSend);
recycle();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information

* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.proto;

import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import org.apache.bookkeeper.util.ByteBufList;

public class ByteStringUtil {

/**
* Wrap the internal buffers of a ByteBufList into a single ByteString.
* The lifecycle of the wrapped ByteString is tied to the ByteBufList.
*
* @param bufList ByteBufList to wrap
* @return ByteString wrapping the internal buffers of the ByteBufList
*/
public static ByteString byteBufListToByteString(ByteBufList bufList) {
ByteString aggregated = null;
for (int i = 0; i < bufList.size(); i++) {
ByteBuf buffer = bufList.getBuffer(i);
if (buffer.readableBytes() > 0) {
aggregated = byteBufToByteString(aggregated, buffer);
}
}
return aggregated != null ? aggregated : ByteString.EMPTY;
}

/**
* Wrap the internal buffers of a ByteBuf into a single ByteString.
* The lifecycle of the wrapped ByteString is tied to the ByteBuf.
*
* @param byteBuf ByteBuf to wrap
* @return ByteString wrapping the internal buffers of the ByteBuf
*/
public static ByteString byteBufToByteString(ByteBuf byteBuf) {
return byteBufToByteString(null, byteBuf);
}

// internal method to aggregate a ByteBuf into a single aggregated ByteString
private static ByteString byteBufToByteString(ByteString aggregated, ByteBuf byteBuf) {
if (byteBuf.readableBytes() == 0) {
return ByteString.EMPTY;
}
if (byteBuf.nioBufferCount() > 1) {
for (ByteBuffer nioBuffer : byteBuf.nioBuffers()) {
ByteString piece = UnsafeByteOperations.unsafeWrap(nioBuffer);
aggregated = (aggregated == null) ? piece : aggregated.concat(piece);
}
} else {
ByteString piece;
if (byteBuf.hasArray()) {
piece = UnsafeByteOperations.unsafeWrap(byteBuf.array(), byteBuf.arrayOffset() + byteBuf.readerIndex(),
byteBuf.readableBytes());
} else {
piece = UnsafeByteOperations.unsafeWrap(byteBuf.nioBuffer());
}
aggregated = (aggregated == null) ? piece : aggregated.concat(piece);
}
return aggregated;
}
}
Loading