diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java index e276ea24fed18..f15f6d67766f1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java @@ -116,10 +116,8 @@ protected void initChannel(SocketChannel ch) throws Exception { } else { ch.pipeline().addLast(TLS_HANDLER, sslCtxRefresher.get().newHandler(ch.alloc())); } - ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER); - } else { - ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); } + ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.getEncoder(this.enableTls)); if (pulsar.getConfiguration().isHaProxyProtocolEnabled()) { ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java index ed34f7d41c130..dff423d19fbef 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -147,11 +148,12 @@ public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("consolidation", new FlushConsolidationHandler(1024, true)); // Setup channel except for the SsHandler for TLS enabled connections - ch.pipeline().addLast("ByteBufPairEncoder", tlsEnabled ? ByteBufPair.COPYING_ENCODER : ByteBufPair.ENCODER); + ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.getEncoder(tlsEnabled)); ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder( Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4)); - ch.pipeline().addLast("handler", clientCnxSupplier.get()); + ChannelHandler clientCnx = clientCnxSupplier.get(); + ch.pipeline().addLast("handler", clientCnx); } /** diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java index cfd89d3bb28ab..6c4f42fcf88b9 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java @@ -107,9 +107,39 @@ public ReferenceCounted touch(Object hint) { return this; } + /** + * Encoder that writes a {@link ByteBufPair} to the socket. + * Use {@link #getEncoder(boolean)} to get the appropriate encoder instead of referencing this. + */ + @Deprecated public static final Encoder ENCODER = new Encoder(); + + private static final boolean COPY_ENCODER_REQUIRED_FOR_TLS; + static { + boolean copyEncoderRequiredForTls = false; + try { + // io.netty.handler.ssl.SslHandlerCoalescingBufferQueue is only available in netty 4.1.111 and later + // when the class is available, there's no need to use the CopyingEncoder when TLS is enabled + ByteBuf.class.getClassLoader().loadClass("io.netty.handler.ssl.SslHandlerCoalescingBufferQueue"); + } catch (ClassNotFoundException e) { + copyEncoderRequiredForTls = true; + } + COPY_ENCODER_REQUIRED_FOR_TLS = copyEncoderRequiredForTls; + } + + /** + * Encoder that makes a copy of the ByteBufs before writing them to the socket. + * This is needed with Netty <4.1.111.Final when TLS is enabled, because the SslHandler will modify the input + * ByteBufs. + * Use {@link #getEncoder(boolean)} to get the appropriate encoder instead of referencing this. + */ + @Deprecated public static final CopyingEncoder COPYING_ENCODER = new CopyingEncoder(); + public static ChannelOutboundHandlerAdapter getEncoder(boolean tlsEnabled) { + return tlsEnabled && COPY_ENCODER_REQUIRED_FOR_TLS ? COPYING_ENCODER : ENCODER; + } + @Sharable @SuppressWarnings("checkstyle:JavadocType") public static class Encoder extends ChannelOutboundHandlerAdapter {