From a7486426dfff27dddb0d61a5bd37373c97486dcf Mon Sep 17 00:00:00 2001 From: gongdewei Date: Wed, 20 May 2020 10:42:52 +0800 Subject: [PATCH 1/9] reduce memory of toCodePoints --- src/main/java/io/termd/core/util/Helper.java | 50 ++++++++++++++++---- 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/termd/core/util/Helper.java b/src/main/java/io/termd/core/util/Helper.java index 7e85f1d..bc5da38 100644 --- a/src/main/java/io/termd/core/util/Helper.java +++ b/src/main/java/io/termd/core/util/Helper.java @@ -19,10 +19,10 @@ import io.termd.core.function.Consumer; import io.termd.core.function.IntConsumer; +import java.nio.IntBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.ServiceLoader; @@ -53,21 +53,55 @@ public static void noop() {} * @return the code points */ public static int[] toCodePoints(String s) { - List codePoints = new ArrayList(); - for (int offset = 0; offset < s.length();) { + int count = Character.codePointCount(s, 0, s.length()); + int[] codePoints = new int[count]; + for (int offset = 0, i = 0; i < count && offset < s.length();i++) { int cp = s.codePointAt(offset); - codePoints.add(cp); + codePoints[i] = cp; offset += Character.charCount(cp); } - return convert(codePoints); + return codePoints; + +// List codePoints = new ArrayList(); +// for (int offset = 0; offset < s.length();) { +// int cp = s.codePointAt(offset); +// codePoints.add(cp); +// offset += Character.charCount(cp); +// } +// return convert(codePoints); } /** - * Code point to string conversion. + * Convert the string to an array of code points. + * Notice: Ensure out buffer capacity >= Helper.codePointCount(s) * - * @param codePoints the code points - * @return the corresponding string + * @param s the string to convert + * @param out the buffer to write code points + */ + public static void toCodePoints(String s, IntBuffer out) { + //Assert out.remaining() >= codePointCount + for (int offset = 0; offset < s.length(); ) { + int cp = s.codePointAt(offset); + out.put(cp); + offset += Character.charCount(cp); + } + } + + /** + * Get code points count of string + * @param s + * @return */ + public static int codePointCount(String s) { + return Character.codePointCount(s, 0, s.length()); + } + + /** + * Code point to string conversion. + * + * @param codePoints the code points + * @return the corresponding string + */ public static String fromCodePoints(int[] codePoints) { return new String(codePoints, 0, codePoints.length); } From f6d6f9ad3b7180e2cb182d58ff29cd0dbf6319f0 Mon Sep 17 00:00:00 2001 From: gongdewei Date: Wed, 20 May 2020 10:53:01 +0800 Subject: [PATCH 2/9] change stdout to Consumer, reduce memory fragments --- .../io/termd/core/http/HttpTtyConnection.java | 56 ++++++-- .../core/http/netty/TtyServerInitializer.java | 2 +- .../http/netty/TtyWebSocketFrameHandler.java | 48 +++++-- .../io/termd/core/io/BufferBinaryEncoder.java | 124 ++++++++++++++++++ .../termd/core/telnet/TelnetConnection.java | 41 +++--- .../core/telnet/TelnetTtyConnection.java | 53 +++++--- .../telnet/netty/NettyTelnetConnection.java | 30 ++++- .../termd/core/tty/BufferTtyOutputMode.java | 76 +++++++++++ .../java/io/termd/core/tty/TtyOutputMode.java | 3 +- .../java/io/termd/core/util/ByteBufPool.java | 61 +++++++++ .../server/WebSocketTtyConnection.java | 10 +- 11 files changed, 442 insertions(+), 62 deletions(-) create mode 100644 src/main/java/io/termd/core/io/BufferBinaryEncoder.java create mode 100644 src/main/java/io/termd/core/tty/BufferTtyOutputMode.java create mode 100644 src/main/java/io/termd/core/util/ByteBufPool.java diff --git a/src/main/java/io/termd/core/http/HttpTtyConnection.java b/src/main/java/io/termd/core/http/HttpTtyConnection.java index 4578184..e0e6371 100644 --- a/src/main/java/io/termd/core/http/HttpTtyConnection.java +++ b/src/main/java/io/termd/core/http/HttpTtyConnection.java @@ -20,14 +20,14 @@ import io.termd.core.function.BiConsumer; import io.termd.core.function.Consumer; import io.termd.core.io.BinaryDecoder; -import io.termd.core.io.BinaryEncoder; -import io.termd.core.tty.TtyConnectionSupport; -import io.termd.core.tty.TtyEvent; -import io.termd.core.tty.TtyEventDecoder; -import io.termd.core.tty.TtyOutputMode; +import io.termd.core.io.BufferBinaryEncoder; +import io.termd.core.tty.*; +import io.termd.core.util.Helper; import io.termd.core.util.Vector; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.IntBuffer; import java.nio.charset.Charset; import java.util.Map; @@ -52,6 +52,7 @@ * * @author Julien Viet * @author Matej Lazar + * @author gongdewei 2020/05/20 */ public abstract class HttpTtyConnection extends TtyConnectionSupport { @@ -62,10 +63,12 @@ public abstract class HttpTtyConnection extends TtyConnectionSupport { private Consumer sizeHandler; private final TtyEventDecoder eventDecoder; private final BinaryDecoder decoder; - private final Consumer stdout; + private final Consumer stdout; + private final Consumer stdoutWrapper; private Consumer closeHandler; private Consumer termHandler; private long lastAccessedTime = System.currentTimeMillis(); + private final IntBuffer codePointBuf = IntBuffer.allocate(8192); public HttpTtyConnection() { this(Charset.forName("UTF-8"), DEFAULT_SIZE); @@ -76,12 +79,38 @@ public HttpTtyConnection(Charset charset, Vector size) { this.size = size; this.eventDecoder = new TtyEventDecoder(3, 26, 4); this.decoder = new BinaryDecoder(512, charset, eventDecoder); - this.stdout = new TtyOutputMode(new BinaryEncoder(charset, new Consumer() { + this.stdout = new BufferTtyOutputMode(new BufferBinaryEncoder(charset, new Consumer() { @Override - public void accept(byte[] bytes) { - write(bytes); + public void accept(ByteBuffer data) { + write(data.array(), data.position(), data.remaining()); } })); + this.stdoutWrapper = new Consumer() { + @Override + public void accept(int[] data) { + stdout.accept(IntBuffer.wrap(data)); + } + }; + } + + @Override + public TtyConnection write(String s) { + synchronized (this) { + int count = Helper.codePointCount(s); + IntBuffer buffer = null; + if (count < codePointBuf.capacity()) { + buffer = codePointBuf; + } else { + buffer = IntBuffer.allocate(count); + } + + buffer.clear(); + Helper.toCodePoints(s, buffer); + buffer.flip(); + + stdout.accept(buffer); + return this; + } } @Override @@ -104,7 +133,11 @@ public String terminalType() { return "vt100"; } - protected abstract void write(byte[] buffer); + protected void write(byte[] buffer) { + this.write(buffer, 0, buffer.length); + } + + protected abstract void write(byte[] buffer, int offset, int length); /** * Special case to handle tty events. @@ -195,7 +228,8 @@ public void setStdinHandler(Consumer handler) { } public Consumer stdoutHandler() { - return stdout; + //TODO replace with Consumer + return stdoutWrapper; } @Override diff --git a/src/main/java/io/termd/core/http/netty/TtyServerInitializer.java b/src/main/java/io/termd/core/http/netty/TtyServerInitializer.java index 95fbbe5..05f81fe 100644 --- a/src/main/java/io/termd/core/http/netty/TtyServerInitializer.java +++ b/src/main/java/io/termd/core/http/netty/TtyServerInitializer.java @@ -62,6 +62,6 @@ protected void initChannel(SocketChannel ch) throws Exception { pipeline.addLast(httpRequestHandler); pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); - pipeline.addLast(new TtyWebSocketFrameHandler(group, handler)); + pipeline.addLast(new TtyWebSocketFrameHandler(group, handler, HttpRequestHandler.class)); } } diff --git a/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java b/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java index 570ae2d..aa4e511 100644 --- a/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java +++ b/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java @@ -17,7 +17,8 @@ package io.termd.core.http.netty; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; @@ -26,6 +27,7 @@ import io.termd.core.function.Consumer; import io.termd.core.http.HttpTtyConnection; import io.termd.core.tty.TtyConnection; +import io.termd.core.util.ByteBufPool; import java.util.concurrent.TimeUnit; @@ -38,10 +40,14 @@ public class TtyWebSocketFrameHandler extends SimpleChannelInboundHandler handler; private ChannelHandlerContext context; private HttpTtyConnection conn; + private final ByteBufPool byteBufPool; + private Class[] removingHandlerClasses; - public TtyWebSocketFrameHandler(ChannelGroup group, Consumer handler) { + public TtyWebSocketFrameHandler(ChannelGroup group, Consumer handler, Class... removingHandlerClasses) { this.group = group; this.handler = handler; + this.removingHandlerClasses = removingHandlerClasses; + byteBufPool = new ByteBufPool(); } @Override @@ -53,16 +59,42 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) { - ctx.pipeline().remove(HttpRequestHandler.class); + if (removingHandlerClasses != null) { + for (Class handlerClass : removingHandlerClasses) { + ctx.pipeline().remove(handlerClass); + } + } group.add(ctx.channel()); conn = new HttpTtyConnection() { @Override - protected void write(byte[] buffer) { - ByteBuf byteBuf = Unpooled.buffer(); - byteBuf.writeBytes(buffer); - if (context != null) { - context.writeAndFlush(new TextWebSocketFrame(byteBuf)); + protected void write(byte[] buffer, int offset, int length) { + + int start = offset; + int remain = length; + while (remain > 0) { + if (context == null) { + break; + } + + //ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.buffer(remain<=32?32: (remain<=64?64: byteBufSize)); + final ByteBuf byteBuf = byteBufPool.get(); + + //write segment + int size = Math.min(remain, byteBuf.writableBytes()); + byteBuf.writeBytes(buffer, start, size); + if (context != null) { + context.writeAndFlush(new TextWebSocketFrame(byteBuf)).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + byteBufPool.put(byteBuf); + } + }); + } + + start += size; + remain -= size; } + } @Override diff --git a/src/main/java/io/termd/core/io/BufferBinaryEncoder.java b/src/main/java/io/termd/core/io/BufferBinaryEncoder.java new file mode 100644 index 0000000..1ba189b --- /dev/null +++ b/src/main/java/io/termd/core/io/BufferBinaryEncoder.java @@ -0,0 +1,124 @@ +/* + * Copyright 2015 Julien Viet + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.termd.core.io; + +import io.termd.core.function.Consumer; + +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.IntBuffer; +import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.CodingErrorAction; + +/** + * @author Julien Viet + * @author gongdewei 2020/05/19 + */ +public class BufferBinaryEncoder implements Consumer { + + private CharsetEncoder charsetEncoder; + private volatile Charset charset; + private final Consumer onByte; + private final char[] charsBuf = new char[2]; + private int capacity = 8192; + private ByteBuffer cachedByteBuffer; + private CharBuffer cachedCharBuffer; + + public BufferBinaryEncoder(Charset charset, Consumer onByte) { + this.setCharset(charset); + this.onByte = onByte; + } + + /** + * Set a new charset on the encoder. + * + * @param charset the new charset + */ + public void setCharset(Charset charset) { + this.charset = charset; + charsetEncoder = charset.newEncoder() + .onMalformedInput(CodingErrorAction.REPLACE) + .onUnmappableCharacter(CodingErrorAction.REPLACE); + + //check buffer + ensureBuffer(); + } + + @Override + public void accept(IntBuffer codePoints) { + int[] array = codePoints.array(); + int offset = codePoints.position(); + int limit = codePoints.limit(); + + int capacity = 0; + for (int i = offset; i < limit; i++) { + capacity += Character.charCount(array[i]); + } + + //convert code points to chars + CharBuffer charBuffer = getCharBuffer(capacity); + for (int i = offset; i < limit; i++) { + int size = Character.toChars(array[i], charsBuf, 0); + charBuffer.put(charsBuf, 0, size); + } + charBuffer.flip(); + + //encode chars to bytes + ByteBuffer byteBuffer = getByteBuffer(getByteCapacity(capacity)); + charsetEncoder.encode(charBuffer, byteBuffer, true); + byteBuffer.flip(); + + onByte.accept(byteBuffer); + } + + private CharBuffer getCharBuffer(int capacity) { + CharBuffer charBuffer = null; + if (capacity <= cachedCharBuffer.capacity()) { + charBuffer = cachedCharBuffer; + charBuffer.clear(); + } else { + charBuffer = CharBuffer.allocate(capacity); + } + return charBuffer; + } + + private ByteBuffer getByteBuffer(int capacity) { + ByteBuffer byteBuffer = null; + if (capacity <= cachedByteBuffer.capacity()) { + byteBuffer = cachedByteBuffer; + byteBuffer.clear(); + } else { + byteBuffer = ByteBuffer.allocate(capacity); + } + return byteBuffer; + } + + private void ensureBuffer() { + if (cachedCharBuffer ==null || cachedCharBuffer.limit() != capacity) { + cachedCharBuffer = CharBuffer.allocate(capacity); + } + int byteBufCapacity = getByteCapacity(capacity); + if (cachedByteBuffer ==null || cachedByteBuffer.limit() != byteBufCapacity) { + cachedByteBuffer = ByteBuffer.allocate(byteBufCapacity); + } + } + + private int getByteCapacity(int capacity) { + return (int) (capacity *charsetEncoder.averageBytesPerChar()); + } +} diff --git a/src/main/java/io/termd/core/telnet/TelnetConnection.java b/src/main/java/io/termd/core/telnet/TelnetConnection.java index b2b2f87..edb78cf 100644 --- a/src/main/java/io/termd/core/telnet/TelnetConnection.java +++ b/src/main/java/io/termd/core/telnet/TelnetConnection.java @@ -20,6 +20,7 @@ /** * @author Julien Viet +* @author gongdewei 2020/05/20 */ public abstract class TelnetConnection { @@ -83,23 +84,15 @@ public final void writeWillOption(Option option) { send(new byte[]{BYTE_IAC, BYTE_WILL, option.code}); } - private void rawWrite(byte[] data, int offset, int length) { - if (length > 0) { - if (offset == 0 && length == data.length) { - send(data); - } else { - byte[] chunk = new byte[length]; - System.arraycopy(data, offset, chunk, 0, chunk.length); - send(chunk); - } - } - } - protected abstract void execute(Runnable task); protected abstract void schedule(Runnable task, long delay, TimeUnit unit); - protected abstract void send(byte[] data); + protected abstract void send(byte[] data, int offset, int len); + + protected void send(byte[] data) { + this.send(data, 0, data.length); + } public void receive(byte[] data) { for (byte b : data) { @@ -108,7 +101,6 @@ public void receive(byte[] data) { flushDataIfNecessary(); } - /** * Write data to the client, escaping data if necessary or truncating it. The original buffer can * be mutated if incorrect data is provided. @@ -116,17 +108,30 @@ public void receive(byte[] data) { * @param data the data to write */ public final void write(byte[] data) { + this.write(data, 0, data.length); + } + + /** + * Write data to the client, escaping data if necessary or truncating it. The original buffer can + * be mutated if incorrect data is provided. + * + * @param data the data to write + * @param offset + * @param len + */ + public final void write(byte[] data, int offset, int len) { if (sendBinary) { // actually the logic here never get executed. int prev = 0; - for (int i = 0;i < data.length;i++) { + int end = offset+len; + for (int i = offset;i < end;i++) { if (data[i] == -1) { - rawWrite(data, prev, i - prev); + send(data, prev, i - prev); send(new byte[]{-1, -1}); prev = i + 1; } } - rawWrite(data, prev, data.length - prev); + send(data, prev, data.length - prev); } else { // Not fully understand the logic below, but // Chinese characters will be truncated by the following logic. @@ -136,7 +141,7 @@ public final void write(byte[] data) { // for (int i = 0;i < data.length;i++) { // data[i] = (byte)(data[i] & 0x7F); // } - send(data); + send(data, offset, len); } } diff --git a/src/main/java/io/termd/core/telnet/TelnetTtyConnection.java b/src/main/java/io/termd/core/telnet/TelnetTtyConnection.java index 720a0df..1d19baa 100644 --- a/src/main/java/io/termd/core/telnet/TelnetTtyConnection.java +++ b/src/main/java/io/termd/core/telnet/TelnetTtyConnection.java @@ -18,17 +18,15 @@ import io.termd.core.function.BiConsumer; import io.termd.core.function.Consumer; -import io.termd.core.tty.ReadBuffer; -import io.termd.core.tty.TtyEvent; -import io.termd.core.tty.TtyEventDecoder; -import io.termd.core.tty.TtyOutputMode; +import io.termd.core.io.BufferBinaryEncoder; +import io.termd.core.tty.*; import io.termd.core.util.Helper; import io.termd.core.util.Vector; import io.termd.core.io.BinaryDecoder; -import io.termd.core.io.BinaryEncoder; import io.termd.core.io.TelnetCharset; -import io.termd.core.tty.TtyConnection; +import java.nio.ByteBuffer; +import java.nio.IntBuffer; import java.nio.charset.Charset; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -37,6 +35,7 @@ * A telnet handler that implements {@link io.termd.core.tty.TtyConnection}. * * @author Julien Viet + * @author gongdewei 2020/05/20 */ public final class TelnetTtyConnection extends TelnetHandler implements TtyConnection { @@ -63,10 +62,12 @@ public void execute(Runnable command) { }); private final BinaryDecoder decoder; - private final BinaryEncoder encoder; + private final BufferBinaryEncoder encoder; - private final Consumer stdout; + private final Consumer stdout; + private final Consumer stdoutWrapper; private final Consumer handler; + private final IntBuffer codePointBuf = IntBuffer.allocate(8192); private long lastAccessedTime = System.currentTimeMillis(); public TelnetTtyConnection(boolean inBinary, boolean outBinary, Charset charset, Consumer handler) { @@ -76,13 +77,19 @@ public TelnetTtyConnection(boolean inBinary, boolean outBinary, Charset charset, this.handler = handler; this.size = new Vector(); this.decoder = new BinaryDecoder(512, TelnetCharset.INSTANCE, readBuffer); - this.encoder = new BinaryEncoder(charset, new Consumer() { + this.encoder = new BufferBinaryEncoder(charset, new Consumer() { @Override - public void accept(byte[] data) { - conn.write(data); + public void accept(ByteBuffer data) { + conn.write(data.array(), data.position(), data.remaining()); } }); - this.stdout = new TtyOutputMode(encoder); + this.stdout = new BufferTtyOutputMode(encoder); + this.stdoutWrapper = new Consumer() { + @Override + public void accept(int[] data) { + stdout.accept(IntBuffer.wrap(data)); + } + }; } @Override @@ -243,7 +250,8 @@ public void setStdinHandler(Consumer handler) { @Override public Consumer stdoutHandler() { - return stdout; + //TODO replace with Consumer + return stdoutWrapper; } @Override @@ -275,8 +283,21 @@ public void close(int exit) { @Override public TtyConnection write(String s) { - int[] codePoints = Helper.toCodePoints(s); - stdoutHandler().accept(codePoints); - return this; + synchronized (this) { + int count = Helper.codePointCount(s); + IntBuffer buffer = null; + if (count < codePointBuf.capacity()) { + buffer = codePointBuf; + } else { + buffer = IntBuffer.allocate(count); + } + + buffer.clear(); + Helper.toCodePoints(s, buffer); + buffer.flip(); + + stdout.accept(buffer); + return this; + } } } diff --git a/src/main/java/io/termd/core/telnet/netty/NettyTelnetConnection.java b/src/main/java/io/termd/core/telnet/netty/NettyTelnetConnection.java index 6246f0b..314417d 100644 --- a/src/main/java/io/termd/core/telnet/netty/NettyTelnetConnection.java +++ b/src/main/java/io/termd/core/telnet/netty/NettyTelnetConnection.java @@ -16,24 +16,31 @@ package io.termd.core.telnet.netty; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.termd.core.telnet.TelnetConnection; import io.termd.core.telnet.TelnetHandler; +import io.termd.core.util.ByteBufPool; import java.util.concurrent.TimeUnit; /** * @author Julien Viet + * @author gongdewei 2020/05/20 */ public class NettyTelnetConnection extends TelnetConnection { final ChannelHandlerContext context; + final ByteBufPool byteBufPool; + public NettyTelnetConnection(TelnetHandler handler, ChannelHandlerContext context) { super(handler); this.context = context; + byteBufPool = new ByteBufPool(); } @Override @@ -48,8 +55,27 @@ protected void schedule(Runnable task, long delay, TimeUnit unit) { // Not properly synchronized, but ok for now @Override - protected void send(byte[] data) { - context.writeAndFlush(Unpooled.buffer().writeBytes(data)); + protected void send(byte[] data, int offset, int len) { + + int start = offset; + int remain = len; + while (remain > 0) { +// ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.buffer(remain<=32?32: (remain<=64?64: byteBufSize)); + final ByteBuf byteBuf = byteBufPool.get(); + + //write segment + int size = Math.min(remain, byteBuf.writableBytes()); + byteBuf.writeBytes(data, start, size); + context.writeAndFlush(byteBuf).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + byteBufPool.put(byteBuf); + } + }); + + start += size; + remain -= size; + } } @Override diff --git a/src/main/java/io/termd/core/tty/BufferTtyOutputMode.java b/src/main/java/io/termd/core/tty/BufferTtyOutputMode.java new file mode 100644 index 0000000..0a21457 --- /dev/null +++ b/src/main/java/io/termd/core/tty/BufferTtyOutputMode.java @@ -0,0 +1,76 @@ +/* + * Copyright 2015 Julien Viet + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.termd.core.tty; + + +import io.termd.core.function.Consumer; + +import java.nio.IntBuffer; + +/** + * @author Julien Viet + * @author gongdewei 2020/05/19 + */ +public class BufferTtyOutputMode implements Consumer { + + private static final IntBuffer CRLF = IntBuffer.wrap(new int[]{'\r', '\n'}); + private final Consumer readHandler; + + public BufferTtyOutputMode(Consumer readHandler) { + this.readHandler = readHandler; + } + + @Override + public void accept(IntBuffer data) { + synchronized (this){ + if (readHandler != null && data.remaining() > 0) { + int[] array = data.array(); + int offset = data.position(); + int limit = data.limit(); + int prev = offset; + int ptr = offset; + while (ptr < limit) { + // Simple implementation that works only on system that uses /n as line terminator + // equivalent to 'stty onlcr' + int cp = array[ptr]; + if (cp == '\n') { + if (ptr > prev) { + sendChunk(data, prev, ptr); + } + //reset crlf read index + CRLF.position(0); + readHandler.accept(CRLF); + prev = ++ptr; + } else { + ptr++; + } + } + if (ptr > prev) { + sendChunk(data, prev, ptr); + } + } + } + } + + private void sendChunk(IntBuffer data, int prev, int ptr) { + //hotspot, reduce memory fragment + //change limit first, then position + data.limit(ptr); + data.position(prev); + readHandler.accept(data); + } +} diff --git a/src/main/java/io/termd/core/tty/TtyOutputMode.java b/src/main/java/io/termd/core/tty/TtyOutputMode.java index 4282dec..d181f1b 100644 --- a/src/main/java/io/termd/core/tty/TtyOutputMode.java +++ b/src/main/java/io/termd/core/tty/TtyOutputMode.java @@ -24,6 +24,7 @@ */ public class TtyOutputMode implements Consumer { + private static final int[] CRLF = {'\r', '\n'}; private final Consumer readHandler; public TtyOutputMode(Consumer readHandler) { @@ -43,7 +44,7 @@ public void accept(int[] data) { if (ptr > prev) { sendChunk(data, prev, ptr); } - readHandler.accept(new int[]{'\r','\n'}); + readHandler.accept(CRLF); prev = ++ptr; } else { ptr++; diff --git a/src/main/java/io/termd/core/util/ByteBufPool.java b/src/main/java/io/termd/core/util/ByteBufPool.java new file mode 100644 index 0000000..eeaadc7 --- /dev/null +++ b/src/main/java/io/termd/core/util/ByteBufPool.java @@ -0,0 +1,61 @@ +package io.termd.core.util; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * 简单的ByteBuf池,比netty的PooledByteBufAllocator更轻量,内存碎片少 + * @author gongdewei 2020/5/19 + */ +public class ByteBufPool { + + private int byteBufCapacity;// = 128; + + //pool size: 128 * 1024 = 128K + private int poolSize;// = 1024; + private AtomicInteger allocSize = new AtomicInteger(); + private BlockingQueue byteBufPool; + + public ByteBufPool() { + this(1024, 128); + } + + public ByteBufPool(int poolSize, int byteBufCapacity) { + this.byteBufCapacity = byteBufCapacity; + this.poolSize = poolSize; + byteBufPool = new ArrayBlockingQueue(poolSize); + } + + public ByteBuf get() { + ByteBuf byteBuf = null; + if (allocSize.get() >= poolSize) { + //wait if pool is full alloc + try { + byteBuf = byteBufPool.poll(50, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + //ignore + } + } else { + byteBuf = byteBufPool.poll(); + } + if (byteBuf == null) { + byteBuf = Unpooled.buffer(byteBufCapacity); + allocSize.incrementAndGet(); + } + return byteBuf.retain(); + } + + public void put(ByteBuf byteBuf) { + // give back + byteBuf.clear(); + if(!(byteBuf.capacity() == byteBufCapacity && byteBufPool.offer(byteBuf))){ + //buf pool is full + byteBuf.release(); + } + } +} diff --git a/src/test/java/io/termd/core/http/websocket/server/WebSocketTtyConnection.java b/src/test/java/io/termd/core/http/websocket/server/WebSocketTtyConnection.java index b54c080..5e18000 100644 --- a/src/test/java/io/termd/core/http/websocket/server/WebSocketTtyConnection.java +++ b/src/test/java/io/termd/core/http/websocket/server/WebSocketTtyConnection.java @@ -45,17 +45,17 @@ public class WebSocketTtyConnection extends HttpTtyConnection { private Set readonlyChannels = new HashSet(); @Override - protected void write(byte[] buffer) { + protected void write(byte[] buffer, int offset, int length) { if (isOpen()) { - sendBinary(buffer, webSocketChannel); + sendBinary(ByteBuffer.wrap(buffer, offset, length), webSocketChannel); } for (WebSocketChannel channel : readonlyChannels) { - sendBinary(buffer, channel); + sendBinary(ByteBuffer.wrap(buffer, offset, length), channel); } } - private void sendBinary(byte[] buffer, WebSocketChannel webSocketChannel) { - WebSockets.sendBinary(ByteBuffer.wrap(buffer), webSocketChannel, null); + private void sendBinary(ByteBuffer buffer, WebSocketChannel webSocketChannel) { + WebSockets.sendBinary(buffer, webSocketChannel, null); } @Override From 97515671b3790a59b97ff170885df751d6e73eab Mon Sep 17 00:00:00 2001 From: gongdewei Date: Wed, 20 May 2020 16:54:27 +0800 Subject: [PATCH 3/9] support discard and release --- .../java/io/termd/core/util/ByteBufPool.java | 79 +++++++++++++++++-- 1 file changed, 71 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/termd/core/util/ByteBufPool.java b/src/main/java/io/termd/core/util/ByteBufPool.java index eeaadc7..92e1d36 100644 --- a/src/main/java/io/termd/core/util/ByteBufPool.java +++ b/src/main/java/io/termd/core/util/ByteBufPool.java @@ -6,10 +6,40 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** * 简单的ByteBuf池,比netty的PooledByteBufAllocator更轻量,内存碎片少 + *
+ *     //get buf
+ *     final ByteBuf byteBuf = byteBufPool.get(50, TimeUnit.MILLISECONDS);
+ *     boolean done = false;
+ *     try {
+ *       //write bytes
+ *       byteBuf.writeBytes(buffer, start, size);
+ *       if (context != null) {
+ *         context.writeAndFlush(new TextWebSocketFrame(byteBuf)).addListener(new ChannelFutureListener() {
+ *           public void operationComplete(ChannelFuture future) throws Exception {
+ *           	//give back
+ *             byteBufPool.put(byteBuf);
+ *           }
+ *         });
+ *         done = true;
+ *       }
+ *     } finally {
+ *       if (!done) {
+ *         //discard
+ *         byteBufPool.discard(byteBuf);
+ *       }
+ *     }
+ *
+ *     //onClose
+ *     protected void onClose() {
+ *       //...
+ *       byteBufPool.release();
+ *     }
+ * 
* @author gongdewei 2020/5/19 */ public class ByteBufPool { @@ -20,6 +50,7 @@ public class ByteBufPool { private int poolSize;// = 1024; private AtomicInteger allocSize = new AtomicInteger(); private BlockingQueue byteBufPool; + private AtomicBoolean closed = new AtomicBoolean(false); public ByteBufPool() { this(1024, 128); @@ -31,30 +62,62 @@ public ByteBufPool(int poolSize, int byteBufCapacity) { byteBufPool = new ArrayBlockingQueue(poolSize); } - public ByteBuf get() { - ByteBuf byteBuf = null; - if (allocSize.get() >= poolSize) { + /** + * Apply a fixed capacity ByteBuf + * @return + */ + public ByteBuf get(long timeout, TimeUnit unit) { + if (closed.get()) { + throw new IllegalStateException("ByteBufPool is closed"); + } + ByteBuf byteBuf = byteBufPool.poll(); + if (byteBuf == null && allocSize.get() >= poolSize) { //wait if pool is full alloc try { - byteBuf = byteBufPool.poll(50, TimeUnit.MILLISECONDS); + byteBuf = byteBufPool.poll(timeout, unit); } catch (InterruptedException e) { //ignore } - } else { - byteBuf = byteBufPool.poll(); } if (byteBuf == null) { + //heap buffer byteBuf = Unpooled.buffer(byteBufCapacity); allocSize.incrementAndGet(); } return byteBuf.retain(); } + /** + * Give back a ByteBuf + * @param byteBuf + */ public void put(ByteBuf byteBuf) { // give back byteBuf.clear(); - if(!(byteBuf.capacity() == byteBufCapacity && byteBufPool.offer(byteBuf))){ - //buf pool is full + if (byteBuf.capacity() != byteBufCapacity || !byteBufPool.offer(byteBuf)) { + //buf pool is full or capacity not match + byteBuf.release(); + allocSize.decrementAndGet(); + } + } + + /** + * Discard a ByteBuf, call it when there is a problem/exception and you don’t know how to recycle it safely. + * @param byteBuf + */ + public void discard(ByteBuf byteBuf) { + //just release refCnt + byteBuf.release(); + allocSize.decrementAndGet(); + } + + /** + * Release all cached ByteBuf, call it when shutdown/stop service + */ + public void release() { + closed.set(true); + while (!byteBufPool.isEmpty()) { + ByteBuf byteBuf = byteBufPool.poll(); byteBuf.release(); } } From 16385390a45526667e3353b8527fc4d629ec59a4 Mon Sep 17 00:00:00 2001 From: gongdewei Date: Wed, 20 May 2020 16:56:05 +0800 Subject: [PATCH 4/9] discard ByteBuf when write data error --- .../http/netty/TtyWebSocketFrameHandler.java | 35 ++++++++++++------- .../telnet/netty/NettyTelnetConnection.java | 28 ++++++++++----- 2 files changed, 42 insertions(+), 21 deletions(-) diff --git a/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java b/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java index aa4e511..16b28a0 100644 --- a/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java +++ b/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java @@ -77,18 +77,28 @@ protected void write(byte[] buffer, int offset, int length) { } //ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.buffer(remain<=32?32: (remain<=64?64: byteBufSize)); - final ByteBuf byteBuf = byteBufPool.get(); - - //write segment - int size = Math.min(remain, byteBuf.writableBytes()); - byteBuf.writeBytes(buffer, start, size); - if (context != null) { - context.writeAndFlush(new TextWebSocketFrame(byteBuf)).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - byteBufPool.put(byteBuf); - } - }); + final ByteBuf byteBuf = byteBufPool.get(50, TimeUnit.MILLISECONDS); + boolean done = false; + int size = 0; + + try { + //write segment + size = Math.min(remain, byteBuf.writableBytes()); + byteBuf.writeBytes(buffer, start, size); + if (context != null) { + context.writeAndFlush(new TextWebSocketFrame(byteBuf)).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + byteBufPool.put(byteBuf); + } + }); + done = true; + } + } finally { + if (!done) { + //discard + byteBufPool.discard(byteBuf); + } } start += size; @@ -116,6 +126,7 @@ public void close() { if (context != null) { context.close(); } + byteBufPool.release(); } }; handler.accept(conn); diff --git a/src/main/java/io/termd/core/telnet/netty/NettyTelnetConnection.java b/src/main/java/io/termd/core/telnet/netty/NettyTelnetConnection.java index 314417d..006530c 100644 --- a/src/main/java/io/termd/core/telnet/netty/NettyTelnetConnection.java +++ b/src/main/java/io/termd/core/telnet/netty/NettyTelnetConnection.java @@ -61,17 +61,26 @@ protected void send(byte[] data, int offset, int len) { int remain = len; while (remain > 0) { // ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.buffer(remain<=32?32: (remain<=64?64: byteBufSize)); - final ByteBuf byteBuf = byteBufPool.get(); + final ByteBuf byteBuf = byteBufPool.get(50, TimeUnit.MILLISECONDS); + boolean done = false; + int size = 0; - //write segment - int size = Math.min(remain, byteBuf.writableBytes()); - byteBuf.writeBytes(data, start, size); - context.writeAndFlush(byteBuf).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - byteBufPool.put(byteBuf); + try { + //write segment + size = Math.min(remain, byteBuf.writableBytes()); + byteBuf.writeBytes(data, start, size); + context.writeAndFlush(byteBuf).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + byteBufPool.put(byteBuf); + } + }); + done = true; + } finally { + if (!done) { + byteBufPool.discard(byteBuf); } - }); + } start += size; remain -= size; @@ -81,6 +90,7 @@ public void operationComplete(ChannelFuture future) throws Exception { @Override protected void onClose() { super.onClose(); + byteBufPool.release(); } @Override From 26b837ff0c5603457be0c3f33dfc455142172663 Mon Sep 17 00:00:00 2001 From: gongdewei Date: Wed, 20 May 2020 17:42:29 +0800 Subject: [PATCH 5/9] correct code points capacity check condition --- src/main/java/io/termd/core/http/HttpTtyConnection.java | 2 +- src/main/java/io/termd/core/telnet/TelnetTtyConnection.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/termd/core/http/HttpTtyConnection.java b/src/main/java/io/termd/core/http/HttpTtyConnection.java index e0e6371..3350699 100644 --- a/src/main/java/io/termd/core/http/HttpTtyConnection.java +++ b/src/main/java/io/termd/core/http/HttpTtyConnection.java @@ -98,7 +98,7 @@ public TtyConnection write(String s) { synchronized (this) { int count = Helper.codePointCount(s); IntBuffer buffer = null; - if (count < codePointBuf.capacity()) { + if (count <= codePointBuf.capacity()) { buffer = codePointBuf; } else { buffer = IntBuffer.allocate(count); diff --git a/src/main/java/io/termd/core/telnet/TelnetTtyConnection.java b/src/main/java/io/termd/core/telnet/TelnetTtyConnection.java index 1d19baa..72df1be 100644 --- a/src/main/java/io/termd/core/telnet/TelnetTtyConnection.java +++ b/src/main/java/io/termd/core/telnet/TelnetTtyConnection.java @@ -286,7 +286,7 @@ public TtyConnection write(String s) { synchronized (this) { int count = Helper.codePointCount(s); IntBuffer buffer = null; - if (count < codePointBuf.capacity()) { + if (count <= codePointBuf.capacity()) { buffer = codePointBuf; } else { buffer = IntBuffer.allocate(count); From fde19ff06cd2edd46f22dd178be9aac016e9a8ae Mon Sep 17 00:00:00 2001 From: gongdewei Date: Thu, 21 May 2020 14:11:33 +0800 Subject: [PATCH 6/9] Resolve buffer and encoder thread safety issues --- .../io/termd/core/io/BufferBinaryEncoder.java | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/src/main/java/io/termd/core/io/BufferBinaryEncoder.java b/src/main/java/io/termd/core/io/BufferBinaryEncoder.java index 1ba189b..3a41585 100644 --- a/src/main/java/io/termd/core/io/BufferBinaryEncoder.java +++ b/src/main/java/io/termd/core/io/BufferBinaryEncoder.java @@ -70,20 +70,23 @@ public void accept(IntBuffer codePoints) { capacity += Character.charCount(array[i]); } - //convert code points to chars - CharBuffer charBuffer = getCharBuffer(capacity); - for (int i = offset; i < limit; i++) { - int size = Character.toChars(array[i], charsBuf, 0); - charBuffer.put(charsBuf, 0, size); + //charsetEncoder/charsBuf/cachedBuffer are not thread-safe + synchronized (this) { + //convert code points to chars + CharBuffer charBuffer = getCharBuffer(capacity); + for (int i = offset; i < limit; i++) { + int size = Character.toChars(array[i], charsBuf, 0); + charBuffer.put(charsBuf, 0, size); + } + charBuffer.flip(); + + //encode chars to bytes + ByteBuffer byteBuffer = getByteBuffer(getByteCapacity(capacity)); + charsetEncoder.encode(charBuffer, byteBuffer, true); + byteBuffer.flip(); + + onByte.accept(byteBuffer); } - charBuffer.flip(); - - //encode chars to bytes - ByteBuffer byteBuffer = getByteBuffer(getByteCapacity(capacity)); - charsetEncoder.encode(charBuffer, byteBuffer, true); - byteBuffer.flip(); - - onByte.accept(byteBuffer); } private CharBuffer getCharBuffer(int capacity) { From 5e6cff9ad77947e9a3bb5f6d6646480d314dacd6 Mon Sep 17 00:00:00 2001 From: gongdewei Date: Thu, 21 May 2020 14:36:44 +0800 Subject: [PATCH 7/9] websocket frame handler compatibility --- .../io/termd/core/http/netty/TtyServerInitializer.java | 2 +- .../core/http/netty/TtyWebSocketFrameHandler.java | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/termd/core/http/netty/TtyServerInitializer.java b/src/main/java/io/termd/core/http/netty/TtyServerInitializer.java index 95fbbe5..05f81fe 100644 --- a/src/main/java/io/termd/core/http/netty/TtyServerInitializer.java +++ b/src/main/java/io/termd/core/http/netty/TtyServerInitializer.java @@ -62,6 +62,6 @@ protected void initChannel(SocketChannel ch) throws Exception { pipeline.addLast(httpRequestHandler); pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); - pipeline.addLast(new TtyWebSocketFrameHandler(group, handler)); + pipeline.addLast(new TtyWebSocketFrameHandler(group, handler, HttpRequestHandler.class)); } } diff --git a/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java b/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java index 570ae2d..a74413e 100644 --- a/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java +++ b/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java @@ -38,10 +38,12 @@ public class TtyWebSocketFrameHandler extends SimpleChannelInboundHandler handler; private ChannelHandlerContext context; private HttpTtyConnection conn; + private Class[] removingHandlerClasses; - public TtyWebSocketFrameHandler(ChannelGroup group, Consumer handler) { + public TtyWebSocketFrameHandler(ChannelGroup group, Consumer handler, Class... removingHandlerClasses) { this.group = group; this.handler = handler; + this.removingHandlerClasses = removingHandlerClasses; } @Override @@ -53,7 +55,11 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) { - ctx.pipeline().remove(HttpRequestHandler.class); + if (removingHandlerClasses != null) { + for (Class handlerClass : removingHandlerClasses) { + ctx.pipeline().remove(handlerClass); + } + } group.add(ctx.channel()); conn = new HttpTtyConnection() { @Override From 1c3cea84169fbd3b482ba308b19ba739271bbf35 Mon Sep 17 00:00:00 2001 From: gongdewei Date: Thu, 21 May 2020 15:17:03 +0800 Subject: [PATCH 8/9] change parameter 'removingHandlerClasses' type --- .../core/http/netty/TtyWebSocketFrameHandler.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java b/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java index a74413e..136caac 100644 --- a/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java +++ b/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java @@ -38,9 +38,16 @@ public class TtyWebSocketFrameHandler extends SimpleChannelInboundHandler handler; private ChannelHandlerContext context; private HttpTtyConnection conn; - private Class[] removingHandlerClasses; + private Class removingHandlerClasses; - public TtyWebSocketFrameHandler(ChannelGroup group, Consumer handler, Class... removingHandlerClasses) { + /** + * Create TtyWebSocketFrameHandler + * + * @param group + * @param handler tty connection handler + * @param removingHandlerClasses removing handler classes after protocol upgrade + */ + public TtyWebSocketFrameHandler(ChannelGroup group, Consumer handler, Class removingHandlerClasses) { this.group = group; this.handler = handler; this.removingHandlerClasses = removingHandlerClasses; @@ -56,9 +63,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) { if (removingHandlerClasses != null) { - for (Class handlerClass : removingHandlerClasses) { - ctx.pipeline().remove(handlerClass); - } + ctx.pipeline().remove(removingHandlerClasses); } group.add(ctx.channel()); conn = new HttpTtyConnection() { From d7ed6edc3941dc7150ee611fa26dcc5f15932f3f Mon Sep 17 00:00:00 2001 From: gongdewei Date: Thu, 21 May 2020 15:18:52 +0800 Subject: [PATCH 9/9] fix typo --- .../core/http/netty/TtyWebSocketFrameHandler.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java b/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java index 136caac..aa828dd 100644 --- a/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java +++ b/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java @@ -38,19 +38,19 @@ public class TtyWebSocketFrameHandler extends SimpleChannelInboundHandler handler; private ChannelHandlerContext context; private HttpTtyConnection conn; - private Class removingHandlerClasses; + private Class removingHandlerClass; /** * Create TtyWebSocketFrameHandler * * @param group * @param handler tty connection handler - * @param removingHandlerClasses removing handler classes after protocol upgrade + * @param removingHandlerClass removing specify handler class after protocol upgrade */ - public TtyWebSocketFrameHandler(ChannelGroup group, Consumer handler, Class removingHandlerClasses) { + public TtyWebSocketFrameHandler(ChannelGroup group, Consumer handler, Class removingHandlerClass) { this.group = group; this.handler = handler; - this.removingHandlerClasses = removingHandlerClasses; + this.removingHandlerClass = removingHandlerClass; } @Override @@ -62,8 +62,8 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) { - if (removingHandlerClasses != null) { - ctx.pipeline().remove(removingHandlerClasses); + if (removingHandlerClass != null) { + ctx.pipeline().remove(removingHandlerClass); } group.add(ctx.channel()); conn = new HttpTtyConnection() {