Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
} else {
ch.pipeline().addLast(TLS_HANDLER, sslCtxRefresher.get().newHandler(ch.alloc()));
}
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
} else {
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
}
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.getEncoder(this.enableTls));

if (pulsar.getConfiguration().isHaProxyProtocolEnabled()) {
ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
Expand Down Expand Up @@ -147,11 +148,12 @@ public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("consolidation", new FlushConsolidationHandler(1024, true));

// Setup channel except for the SsHandler for TLS enabled connections
ch.pipeline().addLast("ByteBufPairEncoder", tlsEnabled ? ByteBufPair.COPYING_ENCODER : ByteBufPair.ENCODER);
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.getEncoder(tlsEnabled));

ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
ch.pipeline().addLast("handler", clientCnxSupplier.get());
ChannelHandler clientCnx = clientCnxSupplier.get();
ch.pipeline().addLast("handler", clientCnx);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,39 @@ public ReferenceCounted touch(Object hint) {
return this;
}

/**
* Encoder that writes a {@link ByteBufPair} to the socket.
* Use {@link #getEncoder(boolean)} to get the appropriate encoder instead of referencing this.
*/
@Deprecated
public static final Encoder ENCODER = new Encoder();

private static final boolean COPY_ENCODER_REQUIRED_FOR_TLS;
static {
boolean copyEncoderRequiredForTls = false;
try {
// io.netty.handler.ssl.SslHandlerCoalescingBufferQueue is only available in netty 4.1.111 and later
// when the class is available, there's no need to use the CopyingEncoder when TLS is enabled
ByteBuf.class.getClassLoader().loadClass("io.netty.handler.ssl.SslHandlerCoalescingBufferQueue");
} catch (ClassNotFoundException e) {
copyEncoderRequiredForTls = true;
}
COPY_ENCODER_REQUIRED_FOR_TLS = copyEncoderRequiredForTls;
}

/**
* Encoder that makes a copy of the ByteBufs before writing them to the socket.
* This is needed with Netty <4.1.111.Final when TLS is enabled, because the SslHandler will modify the input
* ByteBufs.
* Use {@link #getEncoder(boolean)} to get the appropriate encoder instead of referencing this.
*/
@Deprecated
public static final CopyingEncoder COPYING_ENCODER = new CopyingEncoder();

public static ChannelOutboundHandlerAdapter getEncoder(boolean tlsEnabled) {
return tlsEnabled && COPY_ENCODER_REQUIRED_FOR_TLS ? COPYING_ENCODER : ENCODER;
}

@Sharable
@SuppressWarnings("checkstyle:JavadocType")
public static class Encoder extends ChannelOutboundHandlerAdapter {
Expand Down