From 66aa79249798e6d5d9dc4cb446f7c03152f36027 Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Mon, 2 Feb 2026 18:59:58 +0300 Subject: [PATCH 1/4] IGNITE-27722 Extract message serde logic from TcpDiscoveryIoSession --- .../ignite/spi/discovery/tcp/ServerImpl.java | 65 ++- .../spi/discovery/tcp/TcpDiscoveryImpl.java | 7 +- .../discovery/tcp/TcpDiscoveryIoSession.java | 346 +-------------- .../tcp/TcpDiscoveryIoSessionSerializer.java | 407 ++++++++++++++++++ .../spi/discovery/tcp/TcpDiscoverySpi.java | 3 +- 5 files changed, 476 insertions(+), 352 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSessionSerializer.java diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 0a24730a12ceb..87305a73b343a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -100,6 +100,7 @@ import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; @@ -3241,19 +3242,37 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) { if (spi.ensured(msg)) msgHist.add(msg); + byte[] msgBytes = null; + for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) { + if (msgBytes == null) { + try { + msgBytes = serde.serializeMessage(msg); + } + catch (IgniteCheckedException | IOException e) { + U.error(log, "Failed to serialize message to a client: " + msg + ", recepient " + + "client id: " + clientMsgWorker.clientNodeId, e); + + break; + } + } + + TcpDiscoveryAbstractMessage msg0 = msg; + byte[] msgBytes0 = msgBytes; + if (msg instanceof TcpDiscoveryNodeAddedMessage) { TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg; if (clientMsgWorker.clientNodeId.equals(nodeAddedMsg.node().id())) { - msg = new TcpDiscoveryNodeAddedMessage(nodeAddedMsg); + msg0 = new TcpDiscoveryNodeAddedMessage(nodeAddedMsg); + + prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null); - prepareNodeAddedMessage(msg, clientMsgWorker.clientNodeId, null); + msgBytes0 = null; } } - // TODO Investigate possible optimizations: https://issues.apache.org/jira/browse/IGNITE-27722 - clientMsgWorker.addMessage(msg); + clientMsgWorker.addMessage(msg0, msgBytes0); } } } @@ -7582,7 +7601,7 @@ private class StatisticsPrinter extends IgniteSpiThread { } /** */ - private class ClientMessageWorker extends MessageWorker { + private class ClientMessageWorker extends MessageWorker> { /** Node ID. */ private final UUID clientNodeId; @@ -7649,10 +7668,20 @@ void metrics(ClusterMetrics metrics) { * @param msg Message. */ void addMessage(TcpDiscoveryAbstractMessage msg) { + addMessage(msg, null); + } + + /** + * @param msg Message. + * @param msgBytes Optional message bytes. + */ + void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[] msgBytes) { + T2 t = new T2<>(msg, msgBytes); + if (msg.highPriority()) - queue.addFirst(msg); + queue.addFirst(t); else - queue.add(msg); + queue.add(t); DebugLogger log = messageLogger(msg); @@ -7661,14 +7690,14 @@ void addMessage(TcpDiscoveryAbstractMessage msg) { } /** {@inheritDoc} */ - @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) { + @Override protected void processMessage(T2 msgT) { boolean success = false; + TcpDiscoveryAbstractMessage msg = msgT.get1(); + try { assert msg.verified() : msg; - byte[] msgBytes = ses.serializeMessage(msg); - DebugLogger msgLog = messageLogger(msg); if (msg instanceof TcpDiscoveryClientAckResponse) { @@ -7690,8 +7719,8 @@ else if (msgLog.isDebugEnabled()) { + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); } - spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ? - spi.clientFailureDetectionTimeout() : spi.getSocketTimeout()); + writeToSocket(msgT, spi.failureDetectionTimeoutEnabled() ? spi.clientFailureDetectionTimeout() : + spi.getSocketTimeout()); } } else { @@ -7702,7 +7731,7 @@ else if (msgLog.isDebugEnabled()) { assert topologyInitialized(msg) : msg; - spi.writeToSocket(sock, msg, msgBytes, spi.getEffectiveSocketTimeout(false)); + writeToSocket(msgT, spi.getEffectiveSocketTimeout(false)); } boolean clientFailed = msg instanceof TcpDiscoveryNodeFailedMessage && @@ -7731,6 +7760,16 @@ else if (msgLog.isDebugEnabled()) { } } + /** + * @param msgT Message tuple. + * @param timeout Timeout. + */ + private void writeToSocket(T2 msgT, long timeout) throws IgniteCheckedException, IOException { + byte[] msgBytes = msgT.get2() == null ? serde.serializeMessage(msgT.get1()) : msgT.get2(); + + spi.writeToSocket(sock, msgT.get1(), msgBytes, timeout); + } + /** * @param msg Message. * @return {@code True} if topology initialized. diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index e115a3cca03b1..07c227bc09eaa 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -90,6 +90,9 @@ abstract class TcpDiscoveryImpl { /** */ protected final TcpDiscoverySpi spi; + /** Session serializer. */ + protected final TcpDiscoveryIoSessionSerializer serde; + /** */ protected final IgniteLogger log; @@ -158,6 +161,8 @@ protected static Collection upcast(Collection c) { TcpDiscoveryImpl(TcpDiscoverySpi spi) { this.spi = spi; + serde = new TcpDiscoveryIoSessionSerializer(spi); + log = spi.log; if (spi.ignite() instanceof IgniteEx) @@ -484,7 +489,7 @@ protected static List toOrderedList(Collection addrs) * @return IO session for writing and reading {@link TcpDiscoveryAbstractMessage}. */ TcpDiscoveryIoSession createSession(Socket sock) { - return new TcpDiscoveryIoSession(sock, spi); + return new TcpDiscoveryIoSession(sock, spi, serde); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index fa8d71e40f171..e85e83db07d13 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -19,98 +19,55 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.EOFException; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; -import java.io.StreamCorruptedException; import java.net.Socket; -import java.nio.ByteBuffer; import java.security.cert.Certificate; import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSocket; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.direct.DirectMessageReader; -import org.apache.ignite.internal.direct.DirectMessageWriter; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.marshaller.jdk.JdkMarshaller; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageSerializer; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSessionSerializer.CompositeInputStream; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMessageType; - /** * Handles I/O operations between discovery nodes in the cluster. This class encapsulates the socket connection used * by the {@link TcpDiscoverySpi} to exchange discovery protocol messages between nodes. - *

- * Currently, there are two modes for message serialization: - *

    - *
  • Using {@link MessageSerializer} for messages implementing the {@link Message} interface.
  • - *
  • Deprecated: Using {@link JdkMarshaller} for messages that have not yet been refactored.
  • - *
- * A leading byte is used to distinguish between the modes. The byte will be removed in future. + * Ьessage serialization is delegated to a {@link TcpDiscoveryIoSessionSerializer}. */ public class TcpDiscoveryIoSession { /** Default size of buffer used for buffering socket in/out. */ private static final int DFLT_SOCK_BUFFER_SIZE = 8192; - /** Size for an intermediate buffer for serializing discovery messages. */ - private static final int MSG_BUFFER_SIZE = 100; - - /** Leading byte for messages use {@link JdkMarshaller} for serialization. */ - // TODO: remove these flags after refactoring all discovery messages. - static final byte JAVA_SERIALIZATION = (byte)1; - - /** Leading byte for messages use {@link MessageSerializer} for serialization. */ - static final byte MESSAGE_SERIALIZATION = (byte)2; - /** */ private final TcpDiscoverySpi spi; - /** Loads discovery messages classes during java deserialization. */ - private final ClassLoader clsLdr; - /** */ private final Socket sock; - /** */ - private final DirectMessageWriter msgWriter; - - /** */ - private final DirectMessageReader msgReader; - /** Buffered socket output stream. */ private final OutputStream out; /** Buffered socket input stream. */ private final CompositeInputStream in; - /** Intermediate buffer for serializing discovery messages. */ - private final ByteBuffer msgBuf; + /** */ + private final TcpDiscoveryIoSessionSerializer serde; /** * Creates a new discovery I/O session bound to the given socket. * * @param sock Socket connected to a remote discovery node. * @param spi Discovery SPI instance owning this session. + * @param serde Session serializer. * @throws IgniteException If an I/O error occurs while initializing buffers. */ - TcpDiscoveryIoSession(Socket sock, TcpDiscoverySpi spi) { + TcpDiscoveryIoSession(Socket sock, TcpDiscoverySpi spi, TcpDiscoveryIoSessionSerializer serde) { this.sock = sock; this.spi = spi; - - clsLdr = U.resolveClassLoader(spi.ignite().configuration()); - - msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE); - - msgWriter = new DirectMessageWriter(spi.messageFactory()); - msgReader = new DirectMessageReader(spi.messageFactory(), null); + this.serde = serde; try { int sendBufSize = sock.getSendBufferSize() > 0 ? sock.getSendBufferSize() : DFLT_SOCK_BUFFER_SIZE; @@ -131,28 +88,7 @@ public class TcpDiscoveryIoSession { * @throws IgniteCheckedException If serialization fails. */ void writeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException, IOException { - if (!(msg instanceof Message)) { - out.write(JAVA_SERIALIZATION); - - U.marshal(spi.marshaller(), msg, out); - - return; - } - - try { - out.write(MESSAGE_SERIALIZATION); - - serializeMessage((Message)msg, out); - - out.flush(); - } - catch (Exception e) { - // Keep logic similar to `U.marshal(...)`. - if (e instanceof IgniteCheckedException) - throw (IgniteCheckedException)e; - - throw new IgniteCheckedException(e); - } + serde.writeMessage(msg, out); } /** @@ -163,64 +99,7 @@ void writeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException * @throws IgniteCheckedException If deserialization fails. */ T readMessage() throws IgniteCheckedException, IOException { - byte serMode = (byte)in.read(); - - if (JAVA_SERIALIZATION == serMode) - return U.unmarshal(spi.marshaller(), in, clsLdr); - - try { - if (MESSAGE_SERIALIZATION != serMode) { - detectSslAlert(serMode, in); - - // IOException type is important for ServerImpl. It may search the cause (X.hasCause). - // The connection error processing behavior depends on it. - throw new IOException("Received unexpected byte while reading discovery message: " + serMode); - } - - Message msg = spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read())); - - msgReader.reset(); - msgReader.setBuffer(msgBuf); - - MessageSerializer msgSer = spi.messageFactory().serializer(msg.directType()); - - boolean finished; - - do { - msgBuf.clear(); - - int read = in.read(msgBuf.array(), msgBuf.position(), msgBuf.remaining()); - - if (read == -1) - throw new EOFException("Connection closed before message was fully read."); - - msgBuf.limit(read); - - finished = msgSer.readFrom(msg, msgReader); - - // Server Discovery only sends next message to next Server upon receiving a receipt for the previous one. - // This behaviour guarantees that we never read a next message from the buffer right after the end of - // the previous message. But it is not guaranteed with Client Discovery where messages aren't acknowledged. - // Thus, we have to keep the uprocessed bytes read from the socket. It won't return them again. - if (msgBuf.hasRemaining()) { - byte[] unprocessedReadTail = new byte[msgBuf.remaining()]; - - msgBuf.get(unprocessedReadTail, 0, msgBuf.remaining()); - - in.attachByteArray(unprocessedReadTail); - } - } - while (!finished); - - return (T)msg; - } - catch (Exception e) { - // Keep logic similar to `U.marshal(...)`. - if (e instanceof IgniteCheckedException) - throw (IgniteCheckedException)e; - - throw new IgniteCheckedException(e); - } + return serde.readMessage(in); } /** @return SSL certificate this session is established with. {@code null} if SSL is disabled or certificate validation failed. */ @@ -238,216 +117,9 @@ T readMessage() throws IgniteCheckedException, IOException { } } - /** - * Serializes a discovery message into a byte array. - * - * @param msg Discovery message to serialize. - * @return Serialized byte array containing the message data. - * @throws IgniteCheckedException If serialization fails. - * @throws IOException If serialization fails. - */ - byte[] serializeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException, IOException { - if (!(msg instanceof Message)) - return U.marshal(spi.marshaller(), msg); - - try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - serializeMessage((Message)msg, out); - - return out.toByteArray(); - } - } - /** @return Socket. */ public Socket socket() { return sock; } - - /** - * Serializes a discovery message into given output stream. - * - * @param m Discovery message to serialize. - * @param out Output stream to write serialized message. - * @throws IOException If serialization fails. - */ - private void serializeMessage(Message m, OutputStream out) throws IOException { - MessageSerializer msgSer = spi.messageFactory().serializer(m.directType()); - - msgWriter.reset(); - msgWriter.setBuffer(msgBuf); - - boolean finished; - - do { - // Should be cleared before first operation. - msgBuf.clear(); - - finished = msgSer.writeTo(m, msgWriter); - - out.write(msgBuf.array(), 0, msgBuf.position()); - } - while (!finished); - } - - /** - * Checks wheter input stream contains SSL alert. - * See handling {@code StreamCorruptedException} in {@link #readMessage()}. - * Keeps logic similar to {@link java.io.ObjectInputStream#readStreamHeader}. - */ - private void detectSslAlert(byte firstByte, InputStream in) throws IOException { - byte[] hdr = new byte[4]; - hdr[0] = firstByte; - int read = in.readNBytes(hdr, 1, 3); - - if (read < 3) - throw new EOFException(); - - String hex = String.format("%02x%02x%02x%02x", hdr[0], hdr[1], hdr[2], hdr[3]); - - if (hex.matches("15....00")) - throw new StreamCorruptedException("invalid stream header: " + hex); - } - - /** - * Input stream implementation that combines a byte array and a regular InputStream allowing to read bytes - * from the array first and then proceed with reading from InputStream. - * Supports only basic read methods. - */ - private static class CompositeInputStream extends BufferedInputStream { - /** Prefix data input stream to read before the original input stream. */ - @Nullable private ByteArrayInputStream attachedBytesIs; - - /** @param srcIs Original input stream to read when {@link #attachedBytesIs} is empty. */ - private CompositeInputStream(InputStream srcIs) { - super(srcIs); - } - - /** @param prefixData Prefix data to read before the original input stream. */ - private void attachByteArray(byte[] prefixData) { - assert prefixBytesLeft() == 0; - - attachedBytesIs = new ByteArrayInputStream(prefixData); - } - - /** {@inheritDoc} */ - @Override public int read() throws IOException { - if (prefixBytesLeft() > 0) { - int res = attachedBytesIs.read(); - - checkPrefixBufferExhausted(); - - return res; - } - - return super.read(); - } - - /** {@inheritDoc} */ - @Override public int read(@NotNull byte[] b, int off, int len) throws IOException { - int len0 = readPrefixBuffer(b, off, len); - - assert len0 <= len; - - if (len0 == len) - return len0; - - return len0 + super.read(b, off + len0, len - len0); - } - - /** {@inheritDoc} */ - @Override public int read(@NotNull byte[] b) throws IOException { - return read(b, 0, b.length); - } - - /** {@inheritDoc} */ - @Override public int readNBytes(byte[] b, int off, int len) throws IOException { - int len0 = readPrefixBuffer(b, off, len); - - return super.readNBytes(b, off + len0, len - len0); - } - - /** {@inheritDoc} */ - @Override public int available() throws IOException { - // Original input stream may return Integer#MAX_VALUE. - if (super.available() > Integer.MAX_VALUE - prefixBytesLeft()) - return super.available(); - - return super.available() + prefixBytesLeft(); - } - - /** {@inheritDoc} */ - @Override public void close() throws IOException { - if (attachedBytesIs != null) { - attachedBytesIs.close(); - - attachedBytesIs = null; - } - - super.close(); - } - - /** */ - private int readPrefixBuffer(byte[] b, int off, int len) { - int res = 0; - - int prefixBytesLeft = prefixBytesLeft(); - - if (prefixBytesLeft > 0) { - if (len > b.length - off) - len = b.length - off; - - res = attachedBytesIs.read(b, off, Math.min(len, prefixBytesLeft)); - - checkPrefixBufferExhausted(); - } - - return res; - } - - /** */ - private int prefixBytesLeft() { - return attachedBytesIs == null ? 0 : attachedBytesIs.available(); - } - - /** */ - private void checkPrefixBufferExhausted() { - if (attachedBytesIs != null && attachedBytesIs.available() == 0) - attachedBytesIs = null; - } - - /** {@inheritDoc} */ - @Override public void mark(int readlimit) { - throw new UnsupportedOperationException("mark() is not supported."); - } - - /** {@inheritDoc} */ - @Override public boolean markSupported() { - return false; - } - - /** {@inheritDoc} */ - @Override public void reset() { - throw new UnsupportedOperationException("reset() is not supported."); - } - - /** {@inheritDoc} */ - @Override public long skip(long n) { - throw new UnsupportedOperationException("skip() is not supported."); - } - - /** {@inheritDoc} */ - @Override public long transferTo(OutputStream out) { - throw new UnsupportedOperationException("transferTo() is not supported."); - } - - /** {@inheritDoc} */ - @Override public @NotNull byte[] readAllBytes() { - throw new UnsupportedOperationException("readAllBytes() is not supported."); - } - - /** {@inheritDoc} */ - @Override public @NotNull byte[] readNBytes(int len) { - throw new UnsupportedOperationException("readNBytes() is not supported."); - } - } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSessionSerializer.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSessionSerializer.java new file mode 100644 index 0000000000000..59c0abcaee8cc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSessionSerializer.java @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.StreamCorruptedException; +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.direct.DirectMessageReader; +import org.apache.ignite.internal.direct.DirectMessageWriter; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageSerializer; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMessageType; + +/** + * Provides serialization operations for {@link TcpDiscoveryIoSession}. + * Encapsulates {@link MessageWriter}, {@link MessageReader} and message buffer. + * NOTE: Is not thread-safe! Should not be shared between threads. + *

+ * Currently, there are two modes for message serialization: + *

    + *
  • Using {@link MessageSerializer} for messages implementing the {@link Message} interface.
  • + *
  • Deprecated: Using {@link JdkMarshaller} for messages that have not yet been refactored.
  • + *
+ * A leading byte is used to distinguish between the modes. The byte will be removed in the future. + */ +public class TcpDiscoveryIoSessionSerializer { + /** Size for an intermediate buffer for serializing discovery messages. */ + private static final int MSG_BUFFER_SIZE = 100; + + /** Leading byte for messages use {@link JdkMarshaller} for serialization. */ + // TODO: remove these flags after refactoring all discovery messages. + static final byte JAVA_SERIALIZATION = (byte)1; + + /** Leading byte for messages use {@link MessageSerializer} for serialization. */ + static final byte MESSAGE_SERIALIZATION = (byte)2; + + /** */ + private final TcpDiscoverySpi spi; + + /** Loads discovery messages classes during java deserialization. */ + private final ClassLoader clsLdr; + + /** */ + private final DirectMessageWriter msgWriter; + + /** */ + private final DirectMessageReader msgReader; + + /** Intermediate buffer for serializing discovery messages. */ + private final ByteBuffer msgBuf; + + /** + * Creates a new discovery I/O session bound to the given socket. + * + * @param spi Discovery SPI instance owning this session. + * @throws IgniteException If an I/O error occurs while initializing buffers. + */ + TcpDiscoveryIoSessionSerializer(TcpDiscoverySpi spi) { + this.spi = spi; + + clsLdr = U.resolveClassLoader(spi.ignite().configuration()); + + msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE); + + msgWriter = new DirectMessageWriter(spi.messageFactory()); + msgReader = new DirectMessageReader(spi.messageFactory(), null); + } + + /** + * Writes a discovery message to the specified output stream. + * + * @param msg Message to send to the remote node. + * @param out Output stream. + * @throws IgniteCheckedException If serialization fails. + */ + void writeMessage(TcpDiscoveryAbstractMessage msg, OutputStream out) throws IgniteCheckedException, IOException { + if (!(msg instanceof Message)) { + out.write(JAVA_SERIALIZATION); + + U.marshal(spi.marshaller(), msg, out); + + return; + } + + try { + out.write(MESSAGE_SERIALIZATION); + + serializeMessage((Message)msg, out); + + out.flush(); + } + catch (Exception e) { + // Keep logic similar to `U.marshal(...)`. + if (e instanceof IgniteCheckedException) + throw (IgniteCheckedException)e; + + throw new IgniteCheckedException(e); + } + } + + /** + * Reads the next discovery message from the socket input stream. + * + * @param Type of the expected message. + * @return Deserialized message instance. + * @throws IgniteCheckedException If deserialization fails. + */ + T readMessage(CompositeInputStream in) throws IgniteCheckedException, IOException { + byte serMode = (byte)in.read(); + + if (JAVA_SERIALIZATION == serMode) + return U.unmarshal(spi.marshaller(), in, clsLdr); + + try { + if (MESSAGE_SERIALIZATION != serMode) { + detectSslAlert(serMode, in); + + // IOException type is important for ServerImpl. It may search the cause (X.hasCause). + // The connection error processing behavior depends on it. + throw new IOException("Received unexpected byte while reading discovery message: " + serMode); + } + + Message msg = spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read())); + + msgReader.reset(); + msgReader.setBuffer(msgBuf); + + MessageSerializer msgSer = spi.messageFactory().serializer(msg.directType()); + + boolean finished; + + do { + msgBuf.clear(); + + int read = in.read(msgBuf.array(), msgBuf.position(), msgBuf.remaining()); + + if (read == -1) + throw new EOFException("Connection closed before message was fully read."); + + msgBuf.limit(read); + + finished = msgSer.readFrom(msg, msgReader); + + // Server Discovery only sends next message to next Server upon receiving a receipt for the previous one. + // This behaviour guarantees that we never read a next message from the buffer right after the end of + // the previous message. But it is not guaranteed with Client Discovery where messages aren't acknowledged. + // Thus, we have to keep the uprocessed bytes read from the socket. It won't return them again. + if (msgBuf.hasRemaining()) { + byte[] unprocessedReadTail = new byte[msgBuf.remaining()]; + + msgBuf.get(unprocessedReadTail, 0, msgBuf.remaining()); + + in.attachByteArray(unprocessedReadTail); + } + } + while (!finished); + + return (T)msg; + } + catch (Exception e) { + // Keep logic similar to `U.marshal(...)`. + if (e instanceof IgniteCheckedException) + throw (IgniteCheckedException)e; + + throw new IgniteCheckedException(e); + } + } + + /** + * Serializes a discovery message into a byte array. + * + * @param msg Discovery message to serialize. + * @return Serialized byte array containing the message data. + * @throws IgniteCheckedException If serialization fails. + * @throws IOException If serialization fails. + */ + byte[] serializeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException, IOException { + if (!(msg instanceof Message)) + return U.marshal(spi.marshaller(), msg); + + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + serializeMessage((Message)msg, out); + + return out.toByteArray(); + } + } + + /** + * Serializes a discovery message into given output stream. + * + * @param m Discovery message to serialize. + * @param out Output stream to write serialized message. + * @throws IOException If serialization fails. + */ + void serializeMessage(Message m, OutputStream out) throws IOException { + MessageSerializer msgSer = spi.messageFactory().serializer(m.directType()); + + msgWriter.reset(); + msgWriter.setBuffer(msgBuf); + + boolean finished; + + do { + // Should be cleared before first operation. + msgBuf.clear(); + + finished = msgSer.writeTo(m, msgWriter); + + out.write(msgBuf.array(), 0, msgBuf.position()); + } + while (!finished); + } + + /** + * Checks wheter input stream contains SSL alert. + * See handling {@code StreamCorruptedException} in {@link #readMessage()}. + * Keeps logic similar to {@link java.io.ObjectInputStream#readStreamHeader}. + */ + private void detectSslAlert(byte firstByte, InputStream in) throws IOException { + byte[] hdr = new byte[4]; + hdr[0] = firstByte; + int read = in.readNBytes(hdr, 1, 3); + + if (read < 3) + throw new EOFException(); + + String hex = String.format("%02x%02x%02x%02x", hdr[0], hdr[1], hdr[2], hdr[3]); + + if (hex.matches("15....00")) + throw new StreamCorruptedException("invalid stream header: " + hex); + } + + /** + * Input stream implementation that combines a byte array and a regular InputStream allowing to read bytes + * from the array first and then proceed with reading from InputStream. + * Supports only basic read methods. + */ + static class CompositeInputStream extends BufferedInputStream { + /** Prefix data input stream to read before the original input stream. */ + @Nullable private ByteArrayInputStream attachedBytesIs; + + /** @param srcIs Original input stream to read when {@link #attachedBytesIs} is empty. */ + CompositeInputStream(InputStream srcIs) { + super(srcIs); + } + + /** @param prefixData Prefix data to read before the original input stream. */ + private void attachByteArray(byte[] prefixData) { + assert prefixBytesLeft() == 0; + + attachedBytesIs = new ByteArrayInputStream(prefixData); + } + + /** {@inheritDoc} */ + @Override public int read() throws IOException { + if (prefixBytesLeft() > 0) { + int res = attachedBytesIs.read(); + + checkPrefixBufferExhausted(); + + return res; + } + + return super.read(); + } + + /** {@inheritDoc} */ + @Override public int read(@NotNull byte[] b, int off, int len) throws IOException { + int len0 = readPrefixBuffer(b, off, len); + + assert len0 <= len; + + if (len0 == len) + return len0; + + return len0 + super.read(b, off + len0, len - len0); + } + + /** {@inheritDoc} */ + @Override public int read(@NotNull byte[] b) throws IOException { + return read(b, 0, b.length); + } + + /** {@inheritDoc} */ + @Override public int readNBytes(byte[] b, int off, int len) throws IOException { + int len0 = readPrefixBuffer(b, off, len); + + return super.readNBytes(b, off + len0, len - len0); + } + + /** {@inheritDoc} */ + @Override public int available() throws IOException { + // Original input stream may return Integer#MAX_VALUE. + if (super.available() > Integer.MAX_VALUE - prefixBytesLeft()) + return super.available(); + + return super.available() + prefixBytesLeft(); + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + if (attachedBytesIs != null) { + attachedBytesIs.close(); + + attachedBytesIs = null; + } + + super.close(); + } + + /** */ + private int readPrefixBuffer(byte[] b, int off, int len) { + int res = 0; + + int prefixBytesLeft = prefixBytesLeft(); + + if (prefixBytesLeft > 0) { + if (len > b.length - off) + len = b.length - off; + + res = attachedBytesIs.read(b, off, Math.min(len, prefixBytesLeft)); + + checkPrefixBufferExhausted(); + } + + return res; + } + + /** */ + private int prefixBytesLeft() { + return attachedBytesIs == null ? 0 : attachedBytesIs.available(); + } + + /** */ + private void checkPrefixBufferExhausted() { + if (attachedBytesIs != null && attachedBytesIs.available() == 0) + attachedBytesIs = null; + } + + /** {@inheritDoc} */ + @Override public void mark(int readlimit) { + throw new UnsupportedOperationException("mark() is not supported."); + } + + /** {@inheritDoc} */ + @Override public boolean markSupported() { + return false; + } + + /** {@inheritDoc} */ + @Override public void reset() { + throw new UnsupportedOperationException("reset() is not supported."); + } + + /** {@inheritDoc} */ + @Override public long skip(long n) { + throw new UnsupportedOperationException("skip() is not supported."); + } + + /** {@inheritDoc} */ + @Override public long transferTo(OutputStream out) { + throw new UnsupportedOperationException("transferTo() is not supported."); + } + + /** {@inheritDoc} */ + @Override public @NotNull byte[] readAllBytes() { + throw new UnsupportedOperationException("readAllBytes() is not supported."); + } + + /** {@inheritDoc} */ + @Override public @NotNull byte[] readNBytes(int len) { + throw new UnsupportedOperationException("readNBytes() is not supported."); + } + } +} + diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index c748fee183243..820dc74eefeab 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1696,7 +1696,8 @@ protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[ // Write Ignite header without leading byte. if (msg != null) { - byte mode = msg instanceof Message ? TcpDiscoveryIoSession.MESSAGE_SERIALIZATION : TcpDiscoveryIoSession.JAVA_SERIALIZATION; + byte mode = msg instanceof Message ? TcpDiscoveryIoSessionSerializer.MESSAGE_SERIALIZATION : + TcpDiscoveryIoSessionSerializer.JAVA_SERIALIZATION; out.write(mode); } From 17c4287508fa81b872c89cce583a3321448ce342 Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Tue, 3 Feb 2026 16:16:40 +0300 Subject: [PATCH 2/4] Fix javadoc --- .../apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index e85e83db07d13..7f7502f903a5b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -35,7 +35,7 @@ /** * Handles I/O operations between discovery nodes in the cluster. This class encapsulates the socket connection used * by the {@link TcpDiscoverySpi} to exchange discovery protocol messages between nodes. - * Ьessage serialization is delegated to a {@link TcpDiscoveryIoSessionSerializer}. + * Message serialization is delegated to a {@link TcpDiscoveryIoSessionSerializer}. */ public class TcpDiscoveryIoSession { /** Default size of buffer used for buffering socket in/out. */ From 0382f28bd8398c844bafa3c52b2ac7c638589a72 Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Tue, 3 Feb 2026 16:17:32 +0300 Subject: [PATCH 3/4] Fix javadoc --- .../spi/discovery/tcp/TcpDiscoveryIoSessionSerializer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSessionSerializer.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSessionSerializer.java index 59c0abcaee8cc..346ce5f847119 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSessionSerializer.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSessionSerializer.java @@ -130,7 +130,7 @@ void writeMessage(TcpDiscoveryAbstractMessage msg, OutputStream out) throws Igni } /** - * Reads the next discovery message from the socket input stream. + * Reads the next discovery message from the specified input stream. * * @param Type of the expected message. * @return Deserialized message instance. From 677c0e2d71b7396dc595e6618c8f2e64ff326bd7 Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Tue, 3 Feb 2026 16:37:07 +0300 Subject: [PATCH 4/4] WIP --- .../java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java | 2 ++ .../java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java | 2 ++ .../org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java | 4 +--- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 151c0857ce433..44aed2521b2b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -305,6 +305,8 @@ class ClientImpl extends TcpDiscoveryImpl { msgWorker = new MessageWorker(log); + serde = new TcpDiscoveryIoSessionSerializer(spi); + new IgniteSpiThread(msgWorker.igniteInstanceName(), msgWorker.name(), log) { @Override protected void body() { msgWorker.run(); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 87305a73b343a..3c7b53c52905c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -439,6 +439,8 @@ class ServerImpl extends TcpDiscoveryImpl { msgWorkerThread = new MessageWorkerDiscoveryThread(msgWorker, log); msgWorkerThread.start(); + serde = new TcpDiscoveryIoSessionSerializer(spi); + if (tcpSrvr == null) tcpSrvr = new TcpServer(log); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index 07c227bc09eaa..0a3eaafee6976 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -91,7 +91,7 @@ abstract class TcpDiscoveryImpl { protected final TcpDiscoverySpi spi; /** Session serializer. */ - protected final TcpDiscoveryIoSessionSerializer serde; + protected TcpDiscoveryIoSessionSerializer serde; /** */ protected final IgniteLogger log; @@ -161,8 +161,6 @@ protected static Collection upcast(Collection c) { TcpDiscoveryImpl(TcpDiscoverySpi spi) { this.spi = spi; - serde = new TcpDiscoveryIoSessionSerializer(spi); - log = spi.log; if (spi.ignite() instanceof IgniteEx)