Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ class ClientImpl extends TcpDiscoveryImpl {

msgWorker = new MessageWorker(log);

serde = new TcpDiscoveryIoSessionSerializer(spi);

new IgniteSpiThread(msgWorker.igniteInstanceName(), msgWorker.name(), log) {
@Override protected void body() {
msgWorker.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
Expand Down Expand Up @@ -438,6 +439,8 @@ class ServerImpl extends TcpDiscoveryImpl {
msgWorkerThread = new MessageWorkerDiscoveryThread(msgWorker, log);
msgWorkerThread.start();

serde = new TcpDiscoveryIoSessionSerializer(spi);

if (tcpSrvr == null)
tcpSrvr = new TcpServer(log);

Expand Down Expand Up @@ -3241,19 +3244,37 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) {
if (spi.ensured(msg))
msgHist.add(msg);

byte[] msgBytes = null;

for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
if (msgBytes == null) {
try {
msgBytes = serde.serializeMessage(msg);
}
catch (IgniteCheckedException | IOException e) {
U.error(log, "Failed to serialize message to a client: " + msg + ", recepient " +
"client id: " + clientMsgWorker.clientNodeId, e);

break;
}
}

TcpDiscoveryAbstractMessage msg0 = msg;
byte[] msgBytes0 = msgBytes;

if (msg instanceof TcpDiscoveryNodeAddedMessage) {
TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg;

if (clientMsgWorker.clientNodeId.equals(nodeAddedMsg.node().id())) {
msg = new TcpDiscoveryNodeAddedMessage(nodeAddedMsg);
msg0 = new TcpDiscoveryNodeAddedMessage(nodeAddedMsg);

prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null);

prepareNodeAddedMessage(msg, clientMsgWorker.clientNodeId, null);
msgBytes0 = null;
}
}

// TODO Investigate possible optimizations: https://issues.apache.org/jira/browse/IGNITE-27722
clientMsgWorker.addMessage(msg);
clientMsgWorker.addMessage(msg0, msgBytes0);
}
}
}
Expand Down Expand Up @@ -7582,7 +7603,7 @@ private class StatisticsPrinter extends IgniteSpiThread {
}

/** */
private class ClientMessageWorker extends MessageWorker<TcpDiscoveryAbstractMessage> {
private class ClientMessageWorker extends MessageWorker<T2<TcpDiscoveryAbstractMessage, byte[]>> {
/** Node ID. */
private final UUID clientNodeId;

Expand Down Expand Up @@ -7649,10 +7670,20 @@ void metrics(ClusterMetrics metrics) {
* @param msg Message.
*/
void addMessage(TcpDiscoveryAbstractMessage msg) {
addMessage(msg, null);
}

/**
* @param msg Message.
* @param msgBytes Optional message bytes.
*/
void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[] msgBytes) {
T2<TcpDiscoveryAbstractMessage, byte[]> t = new T2<>(msg, msgBytes);

if (msg.highPriority())
queue.addFirst(msg);
queue.addFirst(t);
else
queue.add(msg);
queue.add(t);

DebugLogger log = messageLogger(msg);

Expand All @@ -7661,14 +7692,14 @@ void addMessage(TcpDiscoveryAbstractMessage msg) {
}

/** {@inheritDoc} */
@Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
@Override protected void processMessage(T2<TcpDiscoveryAbstractMessage, byte[]> msgT) {
boolean success = false;

TcpDiscoveryAbstractMessage msg = msgT.get1();

try {
assert msg.verified() : msg;

byte[] msgBytes = ses.serializeMessage(msg);

DebugLogger msgLog = messageLogger(msg);

if (msg instanceof TcpDiscoveryClientAckResponse) {
Expand All @@ -7690,8 +7721,8 @@ else if (msgLog.isDebugEnabled()) {
+ getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
}

spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ?
spi.clientFailureDetectionTimeout() : spi.getSocketTimeout());
writeToSocket(msgT, spi.failureDetectionTimeoutEnabled() ? spi.clientFailureDetectionTimeout() :
spi.getSocketTimeout());
}
}
else {
Expand All @@ -7702,7 +7733,7 @@ else if (msgLog.isDebugEnabled()) {

assert topologyInitialized(msg) : msg;

spi.writeToSocket(sock, msg, msgBytes, spi.getEffectiveSocketTimeout(false));
writeToSocket(msgT, spi.getEffectiveSocketTimeout(false));
}

boolean clientFailed = msg instanceof TcpDiscoveryNodeFailedMessage &&
Expand Down Expand Up @@ -7731,6 +7762,16 @@ else if (msgLog.isDebugEnabled()) {
}
}

/**
* @param msgT Message tuple.
* @param timeout Timeout.
*/
private void writeToSocket(T2<TcpDiscoveryAbstractMessage, byte[]> msgT, long timeout) throws IgniteCheckedException, IOException {
byte[] msgBytes = msgT.get2() == null ? serde.serializeMessage(msgT.get1()) : msgT.get2();

spi.writeToSocket(sock, msgT.get1(), msgBytes, timeout);
}

/**
* @param msg Message.
* @return {@code True} if topology initialized.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ abstract class TcpDiscoveryImpl {
/** */
protected final TcpDiscoverySpi spi;

/** Session serializer. */
protected TcpDiscoveryIoSessionSerializer serde;

/** */
protected final IgniteLogger log;

Expand Down Expand Up @@ -484,7 +487,7 @@ protected static List<String> toOrderedList(Collection<InetSocketAddress> addrs)
* @return IO session for writing and reading {@link TcpDiscoveryAbstractMessage}.
*/
TcpDiscoveryIoSession createSession(Socket sock) {
return new TcpDiscoveryIoSession(sock, spi);
return new TcpDiscoveryIoSession(sock, spi, serde);
}

/**
Expand Down
Loading
Loading