diff --git a/src/main/java/io/termd/core/http/HttpTtyConnection.java b/src/main/java/io/termd/core/http/HttpTtyConnection.java index 4578184..3350699 100644 --- a/src/main/java/io/termd/core/http/HttpTtyConnection.java +++ b/src/main/java/io/termd/core/http/HttpTtyConnection.java @@ -20,14 +20,14 @@ import io.termd.core.function.BiConsumer; import io.termd.core.function.Consumer; import io.termd.core.io.BinaryDecoder; -import io.termd.core.io.BinaryEncoder; -import io.termd.core.tty.TtyConnectionSupport; -import io.termd.core.tty.TtyEvent; -import io.termd.core.tty.TtyEventDecoder; -import io.termd.core.tty.TtyOutputMode; +import io.termd.core.io.BufferBinaryEncoder; +import io.termd.core.tty.*; +import io.termd.core.util.Helper; import io.termd.core.util.Vector; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.IntBuffer; import java.nio.charset.Charset; import java.util.Map; @@ -52,6 +52,7 @@ * * @author Julien Viet * @author Matej Lazar + * @author gongdewei 2020/05/20 */ public abstract class HttpTtyConnection extends TtyConnectionSupport { @@ -62,10 +63,12 @@ public abstract class HttpTtyConnection extends TtyConnectionSupport { private Consumer sizeHandler; private final TtyEventDecoder eventDecoder; private final BinaryDecoder decoder; - private final Consumer stdout; + private final Consumer stdout; + private final Consumer stdoutWrapper; private Consumer closeHandler; private Consumer termHandler; private long lastAccessedTime = System.currentTimeMillis(); + private final IntBuffer codePointBuf = IntBuffer.allocate(8192); public HttpTtyConnection() { this(Charset.forName("UTF-8"), DEFAULT_SIZE); @@ -76,12 +79,38 @@ public HttpTtyConnection(Charset charset, Vector size) { this.size = size; this.eventDecoder = new TtyEventDecoder(3, 26, 4); this.decoder = new BinaryDecoder(512, charset, eventDecoder); - this.stdout = new TtyOutputMode(new BinaryEncoder(charset, new Consumer() { + this.stdout = new BufferTtyOutputMode(new BufferBinaryEncoder(charset, new Consumer() { @Override - public void accept(byte[] bytes) { - write(bytes); + public void accept(ByteBuffer data) { + write(data.array(), data.position(), data.remaining()); } })); + this.stdoutWrapper = new Consumer() { + @Override + public void accept(int[] data) { + stdout.accept(IntBuffer.wrap(data)); + } + }; + } + + @Override + public TtyConnection write(String s) { + synchronized (this) { + int count = Helper.codePointCount(s); + IntBuffer buffer = null; + if (count <= codePointBuf.capacity()) { + buffer = codePointBuf; + } else { + buffer = IntBuffer.allocate(count); + } + + buffer.clear(); + Helper.toCodePoints(s, buffer); + buffer.flip(); + + stdout.accept(buffer); + return this; + } } @Override @@ -104,7 +133,11 @@ public String terminalType() { return "vt100"; } - protected abstract void write(byte[] buffer); + protected void write(byte[] buffer) { + this.write(buffer, 0, buffer.length); + } + + protected abstract void write(byte[] buffer, int offset, int length); /** * Special case to handle tty events. @@ -195,7 +228,8 @@ public void setStdinHandler(Consumer handler) { } public Consumer stdoutHandler() { - return stdout; + //TODO replace with Consumer + return stdoutWrapper; } @Override diff --git a/src/main/java/io/termd/core/http/netty/TtyServerInitializer.java b/src/main/java/io/termd/core/http/netty/TtyServerInitializer.java index 95fbbe5..05f81fe 100644 --- a/src/main/java/io/termd/core/http/netty/TtyServerInitializer.java +++ b/src/main/java/io/termd/core/http/netty/TtyServerInitializer.java @@ -62,6 +62,6 @@ protected void initChannel(SocketChannel ch) throws Exception { pipeline.addLast(httpRequestHandler); pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); - pipeline.addLast(new TtyWebSocketFrameHandler(group, handler)); + pipeline.addLast(new TtyWebSocketFrameHandler(group, handler, HttpRequestHandler.class)); } } diff --git a/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java b/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java index 570ae2d..37d9d72 100644 --- a/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java +++ b/src/main/java/io/termd/core/http/netty/TtyWebSocketFrameHandler.java @@ -17,7 +17,8 @@ package io.termd.core.http.netty; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; @@ -26,6 +27,7 @@ import io.termd.core.function.Consumer; import io.termd.core.http.HttpTtyConnection; import io.termd.core.tty.TtyConnection; +import io.termd.core.util.ByteBufPool; import java.util.concurrent.TimeUnit; @@ -38,10 +40,21 @@ public class TtyWebSocketFrameHandler extends SimpleChannelInboundHandler handler; private ChannelHandlerContext context; private HttpTtyConnection conn; + private Class removingHandlerClass; + private final ByteBufPool byteBufPool; - public TtyWebSocketFrameHandler(ChannelGroup group, Consumer handler) { + /** + * Create TtyWebSocketFrameHandler + * + * @param group + * @param handler tty connection handler + * @param removingHandlerClass removing specify handler class after protocol upgrade + */ + public TtyWebSocketFrameHandler(ChannelGroup group, Consumer handler, Class removingHandlerClass) { this.group = group; this.handler = handler; + this.removingHandlerClass = removingHandlerClass; + this.byteBufPool = new ByteBufPool(); } @Override @@ -53,16 +66,50 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) { - ctx.pipeline().remove(HttpRequestHandler.class); + if (removingHandlerClass != null) { + ctx.pipeline().remove(removingHandlerClass); + } group.add(ctx.channel()); conn = new HttpTtyConnection() { @Override - protected void write(byte[] buffer) { - ByteBuf byteBuf = Unpooled.buffer(); - byteBuf.writeBytes(buffer); - if (context != null) { - context.writeAndFlush(new TextWebSocketFrame(byteBuf)); + protected void write(byte[] buffer, int offset, int length) { + + int start = offset; + int remain = length; + while (remain > 0) { + if (context == null) { + break; + } + + //ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.buffer(remain<=32?32: (remain<=64?64: byteBufSize)); + final ByteBuf byteBuf = byteBufPool.get(50, TimeUnit.MILLISECONDS); + boolean done = false; + int size = 0; + + try { + //write segment + size = Math.min(remain, byteBuf.writableBytes()); + byteBuf.writeBytes(buffer, start, size); + if (context != null) { + context.writeAndFlush(new TextWebSocketFrame(byteBuf)).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + byteBufPool.put(byteBuf); + } + }); + done = true; + } + } finally { + if (!done) { + //discard + byteBufPool.discard(byteBuf); + } + } + + start += size; + remain -= size; } + } @Override @@ -84,6 +131,7 @@ public void close() { if (context != null) { context.close(); } + byteBufPool.release(); } }; handler.accept(conn); diff --git a/src/main/java/io/termd/core/io/BufferBinaryEncoder.java b/src/main/java/io/termd/core/io/BufferBinaryEncoder.java new file mode 100644 index 0000000..3a41585 --- /dev/null +++ b/src/main/java/io/termd/core/io/BufferBinaryEncoder.java @@ -0,0 +1,127 @@ +/* + * Copyright 2015 Julien Viet + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.termd.core.io; + +import io.termd.core.function.Consumer; + +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.IntBuffer; +import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.CodingErrorAction; + +/** + * @author Julien Viet + * @author gongdewei 2020/05/19 + */ +public class BufferBinaryEncoder implements Consumer { + + private CharsetEncoder charsetEncoder; + private volatile Charset charset; + private final Consumer onByte; + private final char[] charsBuf = new char[2]; + private int capacity = 8192; + private ByteBuffer cachedByteBuffer; + private CharBuffer cachedCharBuffer; + + public BufferBinaryEncoder(Charset charset, Consumer onByte) { + this.setCharset(charset); + this.onByte = onByte; + } + + /** + * Set a new charset on the encoder. + * + * @param charset the new charset + */ + public void setCharset(Charset charset) { + this.charset = charset; + charsetEncoder = charset.newEncoder() + .onMalformedInput(CodingErrorAction.REPLACE) + .onUnmappableCharacter(CodingErrorAction.REPLACE); + + //check buffer + ensureBuffer(); + } + + @Override + public void accept(IntBuffer codePoints) { + int[] array = codePoints.array(); + int offset = codePoints.position(); + int limit = codePoints.limit(); + + int capacity = 0; + for (int i = offset; i < limit; i++) { + capacity += Character.charCount(array[i]); + } + + //charsetEncoder/charsBuf/cachedBuffer are not thread-safe + synchronized (this) { + //convert code points to chars + CharBuffer charBuffer = getCharBuffer(capacity); + for (int i = offset; i < limit; i++) { + int size = Character.toChars(array[i], charsBuf, 0); + charBuffer.put(charsBuf, 0, size); + } + charBuffer.flip(); + + //encode chars to bytes + ByteBuffer byteBuffer = getByteBuffer(getByteCapacity(capacity)); + charsetEncoder.encode(charBuffer, byteBuffer, true); + byteBuffer.flip(); + + onByte.accept(byteBuffer); + } + } + + private CharBuffer getCharBuffer(int capacity) { + CharBuffer charBuffer = null; + if (capacity <= cachedCharBuffer.capacity()) { + charBuffer = cachedCharBuffer; + charBuffer.clear(); + } else { + charBuffer = CharBuffer.allocate(capacity); + } + return charBuffer; + } + + private ByteBuffer getByteBuffer(int capacity) { + ByteBuffer byteBuffer = null; + if (capacity <= cachedByteBuffer.capacity()) { + byteBuffer = cachedByteBuffer; + byteBuffer.clear(); + } else { + byteBuffer = ByteBuffer.allocate(capacity); + } + return byteBuffer; + } + + private void ensureBuffer() { + if (cachedCharBuffer ==null || cachedCharBuffer.limit() != capacity) { + cachedCharBuffer = CharBuffer.allocate(capacity); + } + int byteBufCapacity = getByteCapacity(capacity); + if (cachedByteBuffer ==null || cachedByteBuffer.limit() != byteBufCapacity) { + cachedByteBuffer = ByteBuffer.allocate(byteBufCapacity); + } + } + + private int getByteCapacity(int capacity) { + return (int) (capacity *charsetEncoder.averageBytesPerChar()); + } +} diff --git a/src/main/java/io/termd/core/telnet/TelnetConnection.java b/src/main/java/io/termd/core/telnet/TelnetConnection.java index b2b2f87..edb78cf 100644 --- a/src/main/java/io/termd/core/telnet/TelnetConnection.java +++ b/src/main/java/io/termd/core/telnet/TelnetConnection.java @@ -20,6 +20,7 @@ /** * @author Julien Viet +* @author gongdewei 2020/05/20 */ public abstract class TelnetConnection { @@ -83,23 +84,15 @@ public final void writeWillOption(Option option) { send(new byte[]{BYTE_IAC, BYTE_WILL, option.code}); } - private void rawWrite(byte[] data, int offset, int length) { - if (length > 0) { - if (offset == 0 && length == data.length) { - send(data); - } else { - byte[] chunk = new byte[length]; - System.arraycopy(data, offset, chunk, 0, chunk.length); - send(chunk); - } - } - } - protected abstract void execute(Runnable task); protected abstract void schedule(Runnable task, long delay, TimeUnit unit); - protected abstract void send(byte[] data); + protected abstract void send(byte[] data, int offset, int len); + + protected void send(byte[] data) { + this.send(data, 0, data.length); + } public void receive(byte[] data) { for (byte b : data) { @@ -108,7 +101,6 @@ public void receive(byte[] data) { flushDataIfNecessary(); } - /** * Write data to the client, escaping data if necessary or truncating it. The original buffer can * be mutated if incorrect data is provided. @@ -116,17 +108,30 @@ public void receive(byte[] data) { * @param data the data to write */ public final void write(byte[] data) { + this.write(data, 0, data.length); + } + + /** + * Write data to the client, escaping data if necessary or truncating it. The original buffer can + * be mutated if incorrect data is provided. + * + * @param data the data to write + * @param offset + * @param len + */ + public final void write(byte[] data, int offset, int len) { if (sendBinary) { // actually the logic here never get executed. int prev = 0; - for (int i = 0;i < data.length;i++) { + int end = offset+len; + for (int i = offset;i < end;i++) { if (data[i] == -1) { - rawWrite(data, prev, i - prev); + send(data, prev, i - prev); send(new byte[]{-1, -1}); prev = i + 1; } } - rawWrite(data, prev, data.length - prev); + send(data, prev, data.length - prev); } else { // Not fully understand the logic below, but // Chinese characters will be truncated by the following logic. @@ -136,7 +141,7 @@ public final void write(byte[] data) { // for (int i = 0;i < data.length;i++) { // data[i] = (byte)(data[i] & 0x7F); // } - send(data); + send(data, offset, len); } } diff --git a/src/main/java/io/termd/core/telnet/TelnetTtyConnection.java b/src/main/java/io/termd/core/telnet/TelnetTtyConnection.java index 720a0df..72df1be 100644 --- a/src/main/java/io/termd/core/telnet/TelnetTtyConnection.java +++ b/src/main/java/io/termd/core/telnet/TelnetTtyConnection.java @@ -18,17 +18,15 @@ import io.termd.core.function.BiConsumer; import io.termd.core.function.Consumer; -import io.termd.core.tty.ReadBuffer; -import io.termd.core.tty.TtyEvent; -import io.termd.core.tty.TtyEventDecoder; -import io.termd.core.tty.TtyOutputMode; +import io.termd.core.io.BufferBinaryEncoder; +import io.termd.core.tty.*; import io.termd.core.util.Helper; import io.termd.core.util.Vector; import io.termd.core.io.BinaryDecoder; -import io.termd.core.io.BinaryEncoder; import io.termd.core.io.TelnetCharset; -import io.termd.core.tty.TtyConnection; +import java.nio.ByteBuffer; +import java.nio.IntBuffer; import java.nio.charset.Charset; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -37,6 +35,7 @@ * A telnet handler that implements {@link io.termd.core.tty.TtyConnection}. * * @author Julien Viet + * @author gongdewei 2020/05/20 */ public final class TelnetTtyConnection extends TelnetHandler implements TtyConnection { @@ -63,10 +62,12 @@ public void execute(Runnable command) { }); private final BinaryDecoder decoder; - private final BinaryEncoder encoder; + private final BufferBinaryEncoder encoder; - private final Consumer stdout; + private final Consumer stdout; + private final Consumer stdoutWrapper; private final Consumer handler; + private final IntBuffer codePointBuf = IntBuffer.allocate(8192); private long lastAccessedTime = System.currentTimeMillis(); public TelnetTtyConnection(boolean inBinary, boolean outBinary, Charset charset, Consumer handler) { @@ -76,13 +77,19 @@ public TelnetTtyConnection(boolean inBinary, boolean outBinary, Charset charset, this.handler = handler; this.size = new Vector(); this.decoder = new BinaryDecoder(512, TelnetCharset.INSTANCE, readBuffer); - this.encoder = new BinaryEncoder(charset, new Consumer() { + this.encoder = new BufferBinaryEncoder(charset, new Consumer() { @Override - public void accept(byte[] data) { - conn.write(data); + public void accept(ByteBuffer data) { + conn.write(data.array(), data.position(), data.remaining()); } }); - this.stdout = new TtyOutputMode(encoder); + this.stdout = new BufferTtyOutputMode(encoder); + this.stdoutWrapper = new Consumer() { + @Override + public void accept(int[] data) { + stdout.accept(IntBuffer.wrap(data)); + } + }; } @Override @@ -243,7 +250,8 @@ public void setStdinHandler(Consumer handler) { @Override public Consumer stdoutHandler() { - return stdout; + //TODO replace with Consumer + return stdoutWrapper; } @Override @@ -275,8 +283,21 @@ public void close(int exit) { @Override public TtyConnection write(String s) { - int[] codePoints = Helper.toCodePoints(s); - stdoutHandler().accept(codePoints); - return this; + synchronized (this) { + int count = Helper.codePointCount(s); + IntBuffer buffer = null; + if (count <= codePointBuf.capacity()) { + buffer = codePointBuf; + } else { + buffer = IntBuffer.allocate(count); + } + + buffer.clear(); + Helper.toCodePoints(s, buffer); + buffer.flip(); + + stdout.accept(buffer); + return this; + } } } diff --git a/src/main/java/io/termd/core/telnet/netty/NettyTelnetConnection.java b/src/main/java/io/termd/core/telnet/netty/NettyTelnetConnection.java index 6246f0b..006530c 100644 --- a/src/main/java/io/termd/core/telnet/netty/NettyTelnetConnection.java +++ b/src/main/java/io/termd/core/telnet/netty/NettyTelnetConnection.java @@ -16,24 +16,31 @@ package io.termd.core.telnet.netty; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.termd.core.telnet.TelnetConnection; import io.termd.core.telnet.TelnetHandler; +import io.termd.core.util.ByteBufPool; import java.util.concurrent.TimeUnit; /** * @author Julien Viet + * @author gongdewei 2020/05/20 */ public class NettyTelnetConnection extends TelnetConnection { final ChannelHandlerContext context; + final ByteBufPool byteBufPool; + public NettyTelnetConnection(TelnetHandler handler, ChannelHandlerContext context) { super(handler); this.context = context; + byteBufPool = new ByteBufPool(); } @Override @@ -48,13 +55,42 @@ protected void schedule(Runnable task, long delay, TimeUnit unit) { // Not properly synchronized, but ok for now @Override - protected void send(byte[] data) { - context.writeAndFlush(Unpooled.buffer().writeBytes(data)); + protected void send(byte[] data, int offset, int len) { + + int start = offset; + int remain = len; + while (remain > 0) { +// ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.buffer(remain<=32?32: (remain<=64?64: byteBufSize)); + final ByteBuf byteBuf = byteBufPool.get(50, TimeUnit.MILLISECONDS); + boolean done = false; + int size = 0; + + try { + //write segment + size = Math.min(remain, byteBuf.writableBytes()); + byteBuf.writeBytes(data, start, size); + context.writeAndFlush(byteBuf).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + byteBufPool.put(byteBuf); + } + }); + done = true; + } finally { + if (!done) { + byteBufPool.discard(byteBuf); + } + } + + start += size; + remain -= size; + } } @Override protected void onClose() { super.onClose(); + byteBufPool.release(); } @Override diff --git a/src/main/java/io/termd/core/tty/BufferTtyOutputMode.java b/src/main/java/io/termd/core/tty/BufferTtyOutputMode.java new file mode 100644 index 0000000..0a21457 --- /dev/null +++ b/src/main/java/io/termd/core/tty/BufferTtyOutputMode.java @@ -0,0 +1,76 @@ +/* + * Copyright 2015 Julien Viet + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.termd.core.tty; + + +import io.termd.core.function.Consumer; + +import java.nio.IntBuffer; + +/** + * @author Julien Viet + * @author gongdewei 2020/05/19 + */ +public class BufferTtyOutputMode implements Consumer { + + private static final IntBuffer CRLF = IntBuffer.wrap(new int[]{'\r', '\n'}); + private final Consumer readHandler; + + public BufferTtyOutputMode(Consumer readHandler) { + this.readHandler = readHandler; + } + + @Override + public void accept(IntBuffer data) { + synchronized (this){ + if (readHandler != null && data.remaining() > 0) { + int[] array = data.array(); + int offset = data.position(); + int limit = data.limit(); + int prev = offset; + int ptr = offset; + while (ptr < limit) { + // Simple implementation that works only on system that uses /n as line terminator + // equivalent to 'stty onlcr' + int cp = array[ptr]; + if (cp == '\n') { + if (ptr > prev) { + sendChunk(data, prev, ptr); + } + //reset crlf read index + CRLF.position(0); + readHandler.accept(CRLF); + prev = ++ptr; + } else { + ptr++; + } + } + if (ptr > prev) { + sendChunk(data, prev, ptr); + } + } + } + } + + private void sendChunk(IntBuffer data, int prev, int ptr) { + //hotspot, reduce memory fragment + //change limit first, then position + data.limit(ptr); + data.position(prev); + readHandler.accept(data); + } +} diff --git a/src/main/java/io/termd/core/tty/TtyOutputMode.java b/src/main/java/io/termd/core/tty/TtyOutputMode.java index 4282dec..d181f1b 100644 --- a/src/main/java/io/termd/core/tty/TtyOutputMode.java +++ b/src/main/java/io/termd/core/tty/TtyOutputMode.java @@ -24,6 +24,7 @@ */ public class TtyOutputMode implements Consumer { + private static final int[] CRLF = {'\r', '\n'}; private final Consumer readHandler; public TtyOutputMode(Consumer readHandler) { @@ -43,7 +44,7 @@ public void accept(int[] data) { if (ptr > prev) { sendChunk(data, prev, ptr); } - readHandler.accept(new int[]{'\r','\n'}); + readHandler.accept(CRLF); prev = ++ptr; } else { ptr++; diff --git a/src/main/java/io/termd/core/util/ByteBufPool.java b/src/main/java/io/termd/core/util/ByteBufPool.java new file mode 100644 index 0000000..92e1d36 --- /dev/null +++ b/src/main/java/io/termd/core/util/ByteBufPool.java @@ -0,0 +1,124 @@ +package io.termd.core.util; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * 简单的ByteBuf池,比netty的PooledByteBufAllocator更轻量,内存碎片少 + *
+ *     //get buf
+ *     final ByteBuf byteBuf = byteBufPool.get(50, TimeUnit.MILLISECONDS);
+ *     boolean done = false;
+ *     try {
+ *       //write bytes
+ *       byteBuf.writeBytes(buffer, start, size);
+ *       if (context != null) {
+ *         context.writeAndFlush(new TextWebSocketFrame(byteBuf)).addListener(new ChannelFutureListener() {
+ *           public void operationComplete(ChannelFuture future) throws Exception {
+ *           	//give back
+ *             byteBufPool.put(byteBuf);
+ *           }
+ *         });
+ *         done = true;
+ *       }
+ *     } finally {
+ *       if (!done) {
+ *         //discard
+ *         byteBufPool.discard(byteBuf);
+ *       }
+ *     }
+ *
+ *     //onClose
+ *     protected void onClose() {
+ *       //...
+ *       byteBufPool.release();
+ *     }
+ * 
+ * @author gongdewei 2020/5/19 + */ +public class ByteBufPool { + + private int byteBufCapacity;// = 128; + + //pool size: 128 * 1024 = 128K + private int poolSize;// = 1024; + private AtomicInteger allocSize = new AtomicInteger(); + private BlockingQueue byteBufPool; + private AtomicBoolean closed = new AtomicBoolean(false); + + public ByteBufPool() { + this(1024, 128); + } + + public ByteBufPool(int poolSize, int byteBufCapacity) { + this.byteBufCapacity = byteBufCapacity; + this.poolSize = poolSize; + byteBufPool = new ArrayBlockingQueue(poolSize); + } + + /** + * Apply a fixed capacity ByteBuf + * @return + */ + public ByteBuf get(long timeout, TimeUnit unit) { + if (closed.get()) { + throw new IllegalStateException("ByteBufPool is closed"); + } + ByteBuf byteBuf = byteBufPool.poll(); + if (byteBuf == null && allocSize.get() >= poolSize) { + //wait if pool is full alloc + try { + byteBuf = byteBufPool.poll(timeout, unit); + } catch (InterruptedException e) { + //ignore + } + } + if (byteBuf == null) { + //heap buffer + byteBuf = Unpooled.buffer(byteBufCapacity); + allocSize.incrementAndGet(); + } + return byteBuf.retain(); + } + + /** + * Give back a ByteBuf + * @param byteBuf + */ + public void put(ByteBuf byteBuf) { + // give back + byteBuf.clear(); + if (byteBuf.capacity() != byteBufCapacity || !byteBufPool.offer(byteBuf)) { + //buf pool is full or capacity not match + byteBuf.release(); + allocSize.decrementAndGet(); + } + } + + /** + * Discard a ByteBuf, call it when there is a problem/exception and you don’t know how to recycle it safely. + * @param byteBuf + */ + public void discard(ByteBuf byteBuf) { + //just release refCnt + byteBuf.release(); + allocSize.decrementAndGet(); + } + + /** + * Release all cached ByteBuf, call it when shutdown/stop service + */ + public void release() { + closed.set(true); + while (!byteBufPool.isEmpty()) { + ByteBuf byteBuf = byteBufPool.poll(); + byteBuf.release(); + } + } +} diff --git a/src/main/java/io/termd/core/util/Helper.java b/src/main/java/io/termd/core/util/Helper.java index 7e85f1d..bc5da38 100644 --- a/src/main/java/io/termd/core/util/Helper.java +++ b/src/main/java/io/termd/core/util/Helper.java @@ -19,10 +19,10 @@ import io.termd.core.function.Consumer; import io.termd.core.function.IntConsumer; +import java.nio.IntBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.ServiceLoader; @@ -53,21 +53,55 @@ public static void noop() {} * @return the code points */ public static int[] toCodePoints(String s) { - List codePoints = new ArrayList(); - for (int offset = 0; offset < s.length();) { + int count = Character.codePointCount(s, 0, s.length()); + int[] codePoints = new int[count]; + for (int offset = 0, i = 0; i < count && offset < s.length();i++) { int cp = s.codePointAt(offset); - codePoints.add(cp); + codePoints[i] = cp; offset += Character.charCount(cp); } - return convert(codePoints); + return codePoints; + +// List codePoints = new ArrayList(); +// for (int offset = 0; offset < s.length();) { +// int cp = s.codePointAt(offset); +// codePoints.add(cp); +// offset += Character.charCount(cp); +// } +// return convert(codePoints); } /** - * Code point to string conversion. + * Convert the string to an array of code points. + * Notice: Ensure out buffer capacity >= Helper.codePointCount(s) * - * @param codePoints the code points - * @return the corresponding string + * @param s the string to convert + * @param out the buffer to write code points + */ + public static void toCodePoints(String s, IntBuffer out) { + //Assert out.remaining() >= codePointCount + for (int offset = 0; offset < s.length(); ) { + int cp = s.codePointAt(offset); + out.put(cp); + offset += Character.charCount(cp); + } + } + + /** + * Get code points count of string + * @param s + * @return */ + public static int codePointCount(String s) { + return Character.codePointCount(s, 0, s.length()); + } + + /** + * Code point to string conversion. + * + * @param codePoints the code points + * @return the corresponding string + */ public static String fromCodePoints(int[] codePoints) { return new String(codePoints, 0, codePoints.length); } diff --git a/src/test/java/io/termd/core/http/websocket/server/WebSocketTtyConnection.java b/src/test/java/io/termd/core/http/websocket/server/WebSocketTtyConnection.java index b54c080..5e18000 100644 --- a/src/test/java/io/termd/core/http/websocket/server/WebSocketTtyConnection.java +++ b/src/test/java/io/termd/core/http/websocket/server/WebSocketTtyConnection.java @@ -45,17 +45,17 @@ public class WebSocketTtyConnection extends HttpTtyConnection { private Set readonlyChannels = new HashSet(); @Override - protected void write(byte[] buffer) { + protected void write(byte[] buffer, int offset, int length) { if (isOpen()) { - sendBinary(buffer, webSocketChannel); + sendBinary(ByteBuffer.wrap(buffer, offset, length), webSocketChannel); } for (WebSocketChannel channel : readonlyChannels) { - sendBinary(buffer, channel); + sendBinary(ByteBuffer.wrap(buffer, offset, length), channel); } } - private void sendBinary(byte[] buffer, WebSocketChannel webSocketChannel) { - WebSockets.sendBinary(ByteBuffer.wrap(buffer), webSocketChannel, null); + private void sendBinary(ByteBuffer buffer, WebSocketChannel webSocketChannel) { + WebSockets.sendBinary(buffer, webSocketChannel, null); } @Override