Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
Expand Down Expand Up @@ -3241,19 +3242,39 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) {
if (spi.ensured(msg))
msgHist.add(msg);

if (clientMsgWorkers.isEmpty())
return;

byte[] msgBytes;

TcpDiscoveryIoSerializer ser = ses != null ? ses : new TcpDiscoveryIoSerializer(spi);

try {
msgBytes = ser.serializeMessage(msg);
}
catch (IgniteCheckedException | IOException e) {
U.error(log, "Failed to serialize message: " + msg, e);

return;
}

for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
TcpDiscoveryAbstractMessage msg0 = msg;
byte[] msgBytes0 = msgBytes;

if (msg instanceof TcpDiscoveryNodeAddedMessage) {
TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg;

if (clientMsgWorker.clientNodeId.equals(nodeAddedMsg.node().id())) {
msg = new TcpDiscoveryNodeAddedMessage(nodeAddedMsg);
msg0 = new TcpDiscoveryNodeAddedMessage(nodeAddedMsg);

prepareNodeAddedMessage(msg, clientMsgWorker.clientNodeId, null);
prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null);

msgBytes0 = null;
}
}

// TODO Investigate possible optimizations: https://issues.apache.org/jira/browse/IGNITE-27722
clientMsgWorker.addMessage(msg);
clientMsgWorker.addMessage(msg0, msgBytes0);
}
}
}
Expand Down Expand Up @@ -7582,7 +7603,7 @@ private class StatisticsPrinter extends IgniteSpiThread {
}

/** */
private class ClientMessageWorker extends MessageWorker<TcpDiscoveryAbstractMessage> {
private class ClientMessageWorker extends MessageWorker<T2<TcpDiscoveryAbstractMessage, byte[]>> {
/** Node ID. */
private final UUID clientNodeId;

Expand Down Expand Up @@ -7649,10 +7670,20 @@ void metrics(ClusterMetrics metrics) {
* @param msg Message.
*/
void addMessage(TcpDiscoveryAbstractMessage msg) {
addMessage(msg, null);
}

/**
* @param msg Message.
* @param msgBytes Optional message bytes.
*/
void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[] msgBytes) {
T2<TcpDiscoveryAbstractMessage, byte[]> t = new T2<>(msg, msgBytes);

if (msg.highPriority())
queue.addFirst(msg);
queue.addFirst(t);
else
queue.add(msg);
queue.add(t);

DebugLogger log = messageLogger(msg);

Expand All @@ -7661,14 +7692,14 @@ void addMessage(TcpDiscoveryAbstractMessage msg) {
}

/** {@inheritDoc} */
@Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
@Override protected void processMessage(T2<TcpDiscoveryAbstractMessage, byte[]> msgT) {
boolean success = false;

TcpDiscoveryAbstractMessage msg = msgT.get1();

try {
assert msg.verified() : msg;

byte[] msgBytes = ses.serializeMessage(msg);

DebugLogger msgLog = messageLogger(msg);

if (msg instanceof TcpDiscoveryClientAckResponse) {
Expand All @@ -7690,8 +7721,8 @@ else if (msgLog.isDebugEnabled()) {
+ getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
}

spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ?
spi.clientFailureDetectionTimeout() : spi.getSocketTimeout());
writeToSocket(msgT, spi.failureDetectionTimeoutEnabled() ? spi.clientFailureDetectionTimeout() :
spi.getSocketTimeout());
}
}
else {
Expand All @@ -7702,7 +7733,7 @@ else if (msgLog.isDebugEnabled()) {

assert topologyInitialized(msg) : msg;

spi.writeToSocket(sock, msg, msgBytes, spi.getEffectiveSocketTimeout(false));
writeToSocket(msgT, spi.getEffectiveSocketTimeout(false));
}

boolean clientFailed = msg instanceof TcpDiscoveryNodeFailedMessage &&
Expand Down Expand Up @@ -7731,6 +7762,17 @@ else if (msgLog.isDebugEnabled()) {
}
}

/**
* @param msgT Message tuple.
* @param timeout Timeout.
*/
private void writeToSocket(T2<TcpDiscoveryAbstractMessage, byte[]> msgT, long timeout)
throws IgniteCheckedException, IOException {
byte[] msgBytes = msgT.get2() == null ? ses.serializeMessage(msgT.get1()) : msgT.get2();

spi.writeToSocket(sock, msgT.get1(), msgBytes, timeout);
}

/**
* @param msg Message.
* @return {@code True} if topology initialized.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.spi.discovery.tcp;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.direct.DirectMessageWriter;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;

/**
* Serializer of messages. Converts discovery messages into bytes.
*/
public class TcpDiscoveryIoSerializer {
/** Size for an intermediate buffer for serializing discovery messages. */
static final int MSG_BUFFER_SIZE = 100;

/** */
final TcpDiscoverySpi spi;

/** Loads discovery messages classes during java deserialization. */
final ClassLoader clsLdr;

/** */
final DirectMessageWriter msgWriter;

/** Intermediate buffer for serializing discovery messages. */
final ByteBuffer msgBuf;

/**
* @param spi Discovery SPI instance.
*/
public TcpDiscoveryIoSerializer(TcpDiscoverySpi spi) {
this.spi = spi;

clsLdr = U.resolveClassLoader(spi.ignite().configuration());

msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE);

msgWriter = new DirectMessageWriter(spi.messageFactory());
}

/**
* Serializes a discovery message into a byte array.
*
* @param msg Discovery message to serialize.
* @return Serialized byte array containing the message data.
* @throws IgniteCheckedException If serialization fails.
* @throws IOException If serialization fails.
*/
byte[] serializeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException, IOException {
if (!(msg instanceof Message))
return U.marshal(spi.marshaller(), msg);

try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
serializeMessage((Message)msg, out);

return out.toByteArray();
}
}


/**
* Serializes a discovery message into given output stream.
*
* @param m Discovery message to serialize.
* @param out Output stream to write serialized message.
* @throws IOException If serialization fails.
*/
void serializeMessage(Message m, OutputStream out) throws IOException {
MessageSerializer msgSer = spi.messageFactory().serializer(m.directType());

msgWriter.reset();
msgWriter.setBuffer(msgBuf);

boolean finished;

do {
// Should be cleared before first operation.
msgBuf.clear();

finished = msgSer.writeTo(m, msgWriter);

out.write(msgBuf.array(), 0, msgBuf.position());
}
while (!finished);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,18 @@
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.StreamCorruptedException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.security.cert.Certificate;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSocket;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.direct.DirectMessageReader;
import org.apache.ignite.internal.direct.DirectMessageWriter;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
Expand All @@ -56,32 +53,20 @@
* </ul>
* A leading byte is used to distinguish between the modes. The byte will be removed in future.
*/
public class TcpDiscoveryIoSession {
public class TcpDiscoveryIoSession extends TcpDiscoveryIoSerializer {
/** Default size of buffer used for buffering socket in/out. */
private static final int DFLT_SOCK_BUFFER_SIZE = 8192;

/** Size for an intermediate buffer for serializing discovery messages. */
private static final int MSG_BUFFER_SIZE = 100;

/** Leading byte for messages use {@link JdkMarshaller} for serialization. */
// TODO: remove these flags after refactoring all discovery messages.
static final byte JAVA_SERIALIZATION = (byte)1;

/** Leading byte for messages use {@link MessageSerializer} for serialization. */
static final byte MESSAGE_SERIALIZATION = (byte)2;

/** */
private final TcpDiscoverySpi spi;

/** Loads discovery messages classes during java deserialization. */
private final ClassLoader clsLdr;

/** */
private final Socket sock;

/** */
private final DirectMessageWriter msgWriter;

/** */
private final DirectMessageReader msgReader;

Expand All @@ -91,9 +76,6 @@ public class TcpDiscoveryIoSession {
/** Buffered socket input stream. */
private final CompositeInputStream in;

/** Intermediate buffer for serializing discovery messages. */
private final ByteBuffer msgBuf;

/**
* Creates a new discovery I/O session bound to the given socket.
*
Expand All @@ -102,14 +84,10 @@ public class TcpDiscoveryIoSession {
* @throws IgniteException If an I/O error occurs while initializing buffers.
*/
TcpDiscoveryIoSession(Socket sock, TcpDiscoverySpi spi) {
this.sock = sock;
this.spi = spi;
super(spi);

clsLdr = U.resolveClassLoader(spi.ignite().configuration());

msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE);
this.sock = sock;

msgWriter = new DirectMessageWriter(spi.messageFactory());
msgReader = new DirectMessageReader(spi.messageFactory(), null);

try {
Expand Down Expand Up @@ -238,56 +216,11 @@ <T> T readMessage() throws IgniteCheckedException, IOException {
}
}

/**
* Serializes a discovery message into a byte array.
*
* @param msg Discovery message to serialize.
* @return Serialized byte array containing the message data.
* @throws IgniteCheckedException If serialization fails.
* @throws IOException If serialization fails.
*/
byte[] serializeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException, IOException {
if (!(msg instanceof Message))
return U.marshal(spi.marshaller(), msg);

try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
serializeMessage((Message)msg, out);

return out.toByteArray();
}
}

/** @return Socket. */
public Socket socket() {
return sock;
}

/**
* Serializes a discovery message into given output stream.
*
* @param m Discovery message to serialize.
* @param out Output stream to write serialized message.
* @throws IOException If serialization fails.
*/
private void serializeMessage(Message m, OutputStream out) throws IOException {
MessageSerializer msgSer = spi.messageFactory().serializer(m.directType());

msgWriter.reset();
msgWriter.setBuffer(msgBuf);

boolean finished;

do {
// Should be cleared before first operation.
msgBuf.clear();

finished = msgSer.writeTo(m, msgWriter);

out.write(msgBuf.array(), 0, msgBuf.position());
}
while (!finished);
}

/**
* Checks wheter input stream contains SSL alert.
* See handling {@code StreamCorruptedException} in {@link #readMessage()}.
Expand Down
Loading