From 73d994287d9aecc69beae889d974e86c45c55a73 Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Tue, 3 Feb 2026 16:48:25 +0300 Subject: [PATCH 1/3] Revert "IGNITE-27652 Refactor RingMessageWorker#sendMessageToClients (#12663)" This reverts commit 1f175f97a8cc77a88c61aba45f729403c9f4e2c4. --- .../ignite/spi/discovery/tcp/ServerImpl.java | 63 ++++++++++++++++--- 1 file changed, 53 insertions(+), 10 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 0a24730a12ceb..0d0323bbc0999 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -100,6 +100,7 @@ import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; @@ -3241,19 +3242,46 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) { if (spi.ensured(msg)) msgHist.add(msg); + byte[] msgBytes = null; + for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) { + if (msgBytes == null) { + try { + msgBytes = clientMsgWorker.ses.serializeMessage(msg); + } + catch (IgniteCheckedException | IOException e) { + U.error(log, "Failed to serialize message to a client: " + msg + ", recepient " + + "client id: " + clientMsgWorker.clientNodeId, e); + + break; + } + } + + TcpDiscoveryAbstractMessage msg0 = msg; + byte[] msgBytes0 = msgBytes; + if (msg instanceof TcpDiscoveryNodeAddedMessage) { TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg; - if (clientMsgWorker.clientNodeId.equals(nodeAddedMsg.node().id())) { - msg = new TcpDiscoveryNodeAddedMessage(nodeAddedMsg); + TcpDiscoveryNode node = nodeAddedMsg.node(); + + if (clientMsgWorker.clientNodeId.equals(node.id())) { + try { + // TODO: https://issues.apache.org/jira/browse/IGNITE-27556 refactor serialization. + msg0 = U.unmarshal(spi.marshaller(), msgBytes, + U.resolveClassLoader(spi.ignite().configuration())); + + prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null); - prepareNodeAddedMessage(msg, clientMsgWorker.clientNodeId, null); + msgBytes0 = null; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to create message copy: " + msg, e); + } } } - // TODO Investigate possible optimizations: https://issues.apache.org/jira/browse/IGNITE-27722 - clientMsgWorker.addMessage(msg); + clientMsgWorker.addMessage(msg0, msgBytes0); } } } @@ -7582,7 +7610,7 @@ private class StatisticsPrinter extends IgniteSpiThread { } /** */ - private class ClientMessageWorker extends MessageWorker { + private class ClientMessageWorker extends MessageWorker> { /** Node ID. */ private final UUID clientNodeId; @@ -7649,10 +7677,20 @@ void metrics(ClusterMetrics metrics) { * @param msg Message. */ void addMessage(TcpDiscoveryAbstractMessage msg) { + addMessage(msg, null); + } + + /** + * @param msg Message. + * @param msgBytes Optional message bytes. + */ + void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[] msgBytes) { + T2 t = new T2<>(msg, msgBytes); + if (msg.highPriority()) - queue.addFirst(msg); + queue.addFirst(t); else - queue.add(msg); + queue.add(t); DebugLogger log = messageLogger(msg); @@ -7661,13 +7699,18 @@ void addMessage(TcpDiscoveryAbstractMessage msg) { } /** {@inheritDoc} */ - @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) { + @Override protected void processMessage(T2 msgT) { boolean success = false; + TcpDiscoveryAbstractMessage msg = msgT.get1(); + try { assert msg.verified() : msg; - byte[] msgBytes = ses.serializeMessage(msg); + byte[] msgBytes = msgT.get2(); + + if (msgBytes == null) + msgBytes = ses.serializeMessage(msg); DebugLogger msgLog = messageLogger(msg); From 1b81771d55cd0973a5a1bec63a16fd95f806581c Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Tue, 3 Feb 2026 18:43:26 +0300 Subject: [PATCH 2/3] WIP --- .../ignite/spi/discovery/tcp/ServerImpl.java | 39 ++++--- .../tcp/TcpDiscoveryIoSerializer.java | 108 ++++++++++++++++++ .../discovery/tcp/TcpDiscoveryIoSession.java | 73 +----------- 3 files changed, 131 insertions(+), 89 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSerializer.java diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 0d0323bbc0999..865b8754d3294 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -3244,10 +3244,12 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) { byte[] msgBytes = null; + TcpDiscoveryIoSerializer ser = ses != null ? ses : new TcpDiscoveryIoSerializer(spi); + for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) { if (msgBytes == null) { try { - msgBytes = clientMsgWorker.ses.serializeMessage(msg); + msgBytes = ser.serializeMessage(msg); } catch (IgniteCheckedException | IOException e) { U.error(log, "Failed to serialize message to a client: " + msg + ", recepient " + @@ -3266,18 +3268,11 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) { TcpDiscoveryNode node = nodeAddedMsg.node(); if (clientMsgWorker.clientNodeId.equals(node.id())) { - try { - // TODO: https://issues.apache.org/jira/browse/IGNITE-27556 refactor serialization. - msg0 = U.unmarshal(spi.marshaller(), msgBytes, - U.resolveClassLoader(spi.ignite().configuration())); + msg0 = new TcpDiscoveryNodeAddedMessage(nodeAddedMsg); - prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null); + prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null); - msgBytes0 = null; - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to create message copy: " + msg, e); - } + msgBytes0 = null; } } @@ -7707,11 +7702,6 @@ void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[] msgBytes) { try { assert msg.verified() : msg; - byte[] msgBytes = msgT.get2(); - - if (msgBytes == null) - msgBytes = ses.serializeMessage(msg); - DebugLogger msgLog = messageLogger(msg); if (msg instanceof TcpDiscoveryClientAckResponse) { @@ -7733,8 +7723,8 @@ else if (msgLog.isDebugEnabled()) { + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); } - spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ? - spi.clientFailureDetectionTimeout() : spi.getSocketTimeout()); + writeToSocket(msgT, spi.failureDetectionTimeoutEnabled() ? spi.clientFailureDetectionTimeout() : + spi.getSocketTimeout()); } } else { @@ -7745,7 +7735,7 @@ else if (msgLog.isDebugEnabled()) { assert topologyInitialized(msg) : msg; - spi.writeToSocket(sock, msg, msgBytes, spi.getEffectiveSocketTimeout(false)); + writeToSocket(msgT, spi.getEffectiveSocketTimeout(false)); } boolean clientFailed = msg instanceof TcpDiscoveryNodeFailedMessage && @@ -7774,6 +7764,17 @@ else if (msgLog.isDebugEnabled()) { } } + /** + * @param msgT Message tuple. + * @param timeout Timeout. + */ + private void writeToSocket(T2 msgT, long timeout) + throws IgniteCheckedException, IOException { + byte[] msgBytes = msgT.get2() == null ? ses.serializeMessage(msgT.get1()) : msgT.get2(); + + spi.writeToSocket(sock, msgT.get1(), msgBytes, timeout); + } + /** * @param msg Message. * @return {@code True} if topology initialized. diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSerializer.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSerializer.java new file mode 100644 index 0000000000000..d8cc0564f0f74 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSerializer.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.direct.DirectMessageWriter; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageSerializer; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; + +/** + * Serializer of messages. Converts discovery messages into bytes. + */ +public class TcpDiscoveryIoSerializer { + /** Size for an intermediate buffer for serializing discovery messages. */ + static final int MSG_BUFFER_SIZE = 100; + + /** */ + final TcpDiscoverySpi spi; + + /** Loads discovery messages classes during java deserialization. */ + final ClassLoader clsLdr; + + /** */ + final DirectMessageWriter msgWriter; + + /** Intermediate buffer for serializing discovery messages. */ + final ByteBuffer msgBuf; + + /** + * @param spi Discovery SPI instance. + */ + public TcpDiscoveryIoSerializer(TcpDiscoverySpi spi) { + this.spi = spi; + + clsLdr = U.resolveClassLoader(spi.ignite().configuration()); + + msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE); + + msgWriter = new DirectMessageWriter(spi.messageFactory()); + } + + /** + * Serializes a discovery message into a byte array. + * + * @param msg Discovery message to serialize. + * @return Serialized byte array containing the message data. + * @throws IgniteCheckedException If serialization fails. + * @throws IOException If serialization fails. + */ + byte[] serializeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException, IOException { + if (!(msg instanceof Message)) + return U.marshal(spi.marshaller(), msg); + + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + serializeMessage((Message)msg, out); + + return out.toByteArray(); + } + } + + + /** + * Serializes a discovery message into given output stream. + * + * @param m Discovery message to serialize. + * @param out Output stream to write serialized message. + * @throws IOException If serialization fails. + */ + void serializeMessage(Message m, OutputStream out) throws IOException { + MessageSerializer msgSer = spi.messageFactory().serializer(m.directType()); + + msgWriter.reset(); + msgWriter.setBuffer(msgBuf); + + boolean finished; + + do { + // Should be cleared before first operation. + msgBuf.clear(); + + finished = msgSer.writeTo(m, msgWriter); + + out.write(msgBuf.array(), 0, msgBuf.position()); + } + while (!finished); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index fa8d71e40f171..131c228c0af57 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -20,21 +20,18 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.StreamCorruptedException; import java.net.Socket; -import java.nio.ByteBuffer; import java.security.cert.Certificate; import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSocket; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.internal.direct.DirectMessageReader; -import org.apache.ignite.internal.direct.DirectMessageWriter; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.plugin.extensions.communication.Message; @@ -56,13 +53,10 @@ * * A leading byte is used to distinguish between the modes. The byte will be removed in future. */ -public class TcpDiscoveryIoSession { +public class TcpDiscoveryIoSession extends TcpDiscoveryIoSerializer { /** Default size of buffer used for buffering socket in/out. */ private static final int DFLT_SOCK_BUFFER_SIZE = 8192; - /** Size for an intermediate buffer for serializing discovery messages. */ - private static final int MSG_BUFFER_SIZE = 100; - /** Leading byte for messages use {@link JdkMarshaller} for serialization. */ // TODO: remove these flags after refactoring all discovery messages. static final byte JAVA_SERIALIZATION = (byte)1; @@ -70,18 +64,9 @@ public class TcpDiscoveryIoSession { /** Leading byte for messages use {@link MessageSerializer} for serialization. */ static final byte MESSAGE_SERIALIZATION = (byte)2; - /** */ - private final TcpDiscoverySpi spi; - - /** Loads discovery messages classes during java deserialization. */ - private final ClassLoader clsLdr; - /** */ private final Socket sock; - /** */ - private final DirectMessageWriter msgWriter; - /** */ private final DirectMessageReader msgReader; @@ -91,9 +76,6 @@ public class TcpDiscoveryIoSession { /** Buffered socket input stream. */ private final CompositeInputStream in; - /** Intermediate buffer for serializing discovery messages. */ - private final ByteBuffer msgBuf; - /** * Creates a new discovery I/O session bound to the given socket. * @@ -102,14 +84,10 @@ public class TcpDiscoveryIoSession { * @throws IgniteException If an I/O error occurs while initializing buffers. */ TcpDiscoveryIoSession(Socket sock, TcpDiscoverySpi spi) { - this.sock = sock; - this.spi = spi; + super(spi); - clsLdr = U.resolveClassLoader(spi.ignite().configuration()); - - msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE); + this.sock = sock; - msgWriter = new DirectMessageWriter(spi.messageFactory()); msgReader = new DirectMessageReader(spi.messageFactory(), null); try { @@ -238,56 +216,11 @@ T readMessage() throws IgniteCheckedException, IOException { } } - /** - * Serializes a discovery message into a byte array. - * - * @param msg Discovery message to serialize. - * @return Serialized byte array containing the message data. - * @throws IgniteCheckedException If serialization fails. - * @throws IOException If serialization fails. - */ - byte[] serializeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException, IOException { - if (!(msg instanceof Message)) - return U.marshal(spi.marshaller(), msg); - - try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - serializeMessage((Message)msg, out); - - return out.toByteArray(); - } - } - /** @return Socket. */ public Socket socket() { return sock; } - /** - * Serializes a discovery message into given output stream. - * - * @param m Discovery message to serialize. - * @param out Output stream to write serialized message. - * @throws IOException If serialization fails. - */ - private void serializeMessage(Message m, OutputStream out) throws IOException { - MessageSerializer msgSer = spi.messageFactory().serializer(m.directType()); - - msgWriter.reset(); - msgWriter.setBuffer(msgBuf); - - boolean finished; - - do { - // Should be cleared before first operation. - msgBuf.clear(); - - finished = msgSer.writeTo(m, msgWriter); - - out.write(msgBuf.array(), 0, msgBuf.position()); - } - while (!finished); - } - /** * Checks wheter input stream contains SSL alert. * See handling {@code StreamCorruptedException} in {@link #readMessage()}. From ead042fabd4b115cba5bc7eb5cc76b0c264833e2 Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Tue, 3 Feb 2026 22:01:03 +0300 Subject: [PATCH 3/3] WIP --- .../ignite/spi/discovery/tcp/ServerImpl.java | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 865b8754d3294..cf5182fca332c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -3242,32 +3242,30 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) { if (spi.ensured(msg)) msgHist.add(msg); - byte[] msgBytes = null; + if (clientMsgWorkers.isEmpty()) + return; + + byte[] msgBytes; TcpDiscoveryIoSerializer ser = ses != null ? ses : new TcpDiscoveryIoSerializer(spi); - for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) { - if (msgBytes == null) { - try { - msgBytes = ser.serializeMessage(msg); - } - catch (IgniteCheckedException | IOException e) { - U.error(log, "Failed to serialize message to a client: " + msg + ", recepient " + - "client id: " + clientMsgWorker.clientNodeId, e); + try { + msgBytes = ser.serializeMessage(msg); + } + catch (IgniteCheckedException | IOException e) { + U.error(log, "Failed to serialize message: " + msg, e); - break; - } - } + return; + } + for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) { TcpDiscoveryAbstractMessage msg0 = msg; byte[] msgBytes0 = msgBytes; if (msg instanceof TcpDiscoveryNodeAddedMessage) { TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg; - TcpDiscoveryNode node = nodeAddedMsg.node(); - - if (clientMsgWorker.clientNodeId.equals(node.id())) { + if (clientMsgWorker.clientNodeId.equals(nodeAddedMsg.node().id())) { msg0 = new TcpDiscoveryNodeAddedMessage(nodeAddedMsg); prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null);