Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.compatibility.spi.discovery;

import org.apache.ignite.compatibility.IgniteReleasedVersion;
import org.apache.ignite.compatibility.testframework.junits.IgniteCompatibilityAbstractTest;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.junit.Test;

/** */
public class TcpDiscoveryDifferentClusterVersionsTest extends IgniteCompatibilityAbstractTest {
/** */
private static final String LEGACY_PROTOCOL_MSG = "Remote node uses legacy discovery protocol";

/** */
private static final IgniteReleasedVersion OLD_VERSION = IgniteReleasedVersion.VER_2_17_0;

/** */
private ListeningTestLogger listeningLog;

/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();

stopAllGrids();
}

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

if (listeningLog != null)
cfg.setGridLogger(listeningLog);

return cfg;
}

/** Tests that connection from client of old version is properly refused. */
@Test
public void testOldClientRejected() throws Exception {
setLoggerDebugLevel();

listeningLog = new ListeningTestLogger(log);

LogListener logListener = LogListener.matches(LEGACY_PROTOCOL_MSG).build();

listeningLog.registerListener(logListener);

startGrid(0);

GridTestUtils.assertThrows(
log,
() -> startGrid("old-client", OLD_VERSION.toString(), cfg -> cfg.setClientMode(true)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we check only client node?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Initial problem was related to connection between client and server node
  2. Probably it would be a good idea to check server node too, but there is race condition between two nodes to become an initiator. As a result, new node may try to connect to old one and we won't get anything from logs

AssertionError.class,
null
);

assertTrue("Expected log about different protocol.", logListener.check(getTestTimeout()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.ignite.compatibility.persistence.MoveBinaryMetadataCompatibility;
import org.apache.ignite.compatibility.persistence.PersistenceBasicCompatibilityTest;
import org.apache.ignite.compatibility.persistence.SnapshotCompatibilityTest;
import org.apache.ignite.compatibility.spi.discovery.TcpDiscoveryDifferentClusterVersionsTest;
import org.apache.ignite.compatibility.testframework.util.MavenUtilsTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
Expand All @@ -52,7 +53,8 @@
JavaThinCompatibilityStoragePathTest.class,
IgnitePKIndexesMigrationToUnwrapPkTest.class,
CompoundIndexCompatibilityTest.class,
SnapshotCompatibilityTest.class
SnapshotCompatibilityTest.class,
TcpDiscoveryDifferentClusterVersionsTest.class
})
public class IgniteCompatibilityBasicTestSuite {
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.thread.IgniteThreadFactory;
Expand Down Expand Up @@ -349,8 +350,19 @@ public abstract class IgniteUtils extends CommonUtils {
public static final String JMX_DOMAIN = IgniteUtils.class.getName().substring(0, IgniteUtils.class.getName().
indexOf('.', IgniteUtils.class.getName().indexOf('.') + 1));

/** Network packet header. */
public static final byte[] IGNITE_HEADER = intToBytes(0x00004747);
/**
* Network packet header for discovery protocol V1 (legacy).
* V1 discovery messages use Java serialization (JdkMarshaller).
*/
public static final byte[] IGNITE_HEADER_V1 = intToBytes(0x00004747);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

00GG


/**
* Network packet header for discovery protocol V2.
* Used in discovery handshake to support rolling upgrade compatibility.
* V2 packets include a leading serialization mode byte ({@code serMode}) which defines how the payload is serialized.
* See {@link TcpDiscoveryIoSession#JAVA_SERIALIZATION} and {@link TcpDiscoveryIoSession#MESSAGE_SERIALIZATION}.
*/
public static final byte[] IGNITE_HEADER_V2 = intToBytes(0x0049474E);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0IGN

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need any header? What it used for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still need a header as a simple magic prefix to quickly reject non-Ignite or wrong-endpoint connections before we try to parse any data. IGNITE_HEADER_v2 makes the new handshake path explicit and helps keep backward compatibility and clearer diagnostics for version mismatches.


/** Default buffer size = 4K. */
private static final int BUF_SIZE = 4096;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@

/**
* Verifies that first bytes received in accepted (incoming)
* NIO session are equal to {@link U#IGNITE_HEADER}.
* NIO session are equal to {@link U#IGNITE_HEADER_V1}.
* <p>
* First {@code U.IGNITE_HEADER.length} bytes are consumed by this filter
* First {@code U.IGNITE_HEADER_V1.length} bytes are consumed by this filter
* and all other bytes are forwarded through chain without any modification.
*/
public class GridConnectionBytesVerifyFilter extends GridNioFilterAdapter {
Expand Down Expand Up @@ -99,27 +99,27 @@ public GridConnectionBytesVerifyFilter(IgniteLogger log) {

Integer magic = ses.meta(MAGIC_META_KEY);

if (magic == null || magic < U.IGNITE_HEADER.length) {
if (magic == null || magic < U.IGNITE_HEADER_V1.length) {
byte[] magicBuf = ses.meta(MAGIC_BUF_KEY);

if (magicBuf == null)
magicBuf = new byte[U.IGNITE_HEADER.length];
magicBuf = new byte[U.IGNITE_HEADER_V1.length];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to use V2 header here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The suggestion is to keep IGNITE_HEADER unchanged in the communication SPI and update it only in discovery. As I understand, this is not related to the discovery SPI


int magicRead = magic == null ? 0 : magic;

int cnt = buf.remaining();

buf.get(magicBuf, magicRead, Math.min(U.IGNITE_HEADER.length - magicRead, cnt));
buf.get(magicBuf, magicRead, Math.min(U.IGNITE_HEADER_V1.length - magicRead, cnt));

if (cnt + magicRead < U.IGNITE_HEADER.length) {
if (cnt + magicRead < U.IGNITE_HEADER_V1.length) {
// Magic bytes are not fully read.
ses.addMeta(MAGIC_META_KEY, cnt + magicRead);
ses.addMeta(MAGIC_BUF_KEY, magicBuf);
}
else if (U.bytesEqual(magicBuf, 0, U.IGNITE_HEADER, 0, U.IGNITE_HEADER.length)) {
// Magic bytes read and equal to IGNITE_HEADER.
else if (U.bytesEqual(magicBuf, 0, U.IGNITE_HEADER_V1, 0, U.IGNITE_HEADER_V1.length)) {
// Magic bytes read and equal to IGNITE_HEADER_V1.
ses.removeMeta(MAGIC_BUF_KEY);
ses.addMeta(MAGIC_META_KEY, U.IGNITE_HEADER.length);
ses.addMeta(MAGIC_META_KEY, U.IGNITE_HEADER_V1.length);

proceedMessageReceived(ses, buf);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,9 @@ private abstract static class BlockingTransport {
* @throws IgniteCheckedException If failed.
*/
void sendHandshake(HandshakeMessage msg) throws IgniteCheckedException {
ByteBuffer buf = ByteBuffer.allocate(msg.getMessageSize() + U.IGNITE_HEADER.length)
ByteBuffer buf = ByteBuffer.allocate(msg.getMessageSize() + U.IGNITE_HEADER_V1.length)
.order(ByteOrder.LITTLE_ENDIAN)
.put(U.IGNITE_HEADER);
.put(U.IGNITE_HEADER_V1);

writer.setBuffer(buf);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6610,7 +6610,16 @@ private class SocketReader extends IgniteSpiThread {
}
}

if (!Arrays.equals(buf, U.IGNITE_HEADER)) {
if (!Arrays.equals(buf, U.IGNITE_HEADER_V2)) {
if (Arrays.equals(buf, U.IGNITE_HEADER_V1)) {
if (log.isDebugEnabled())
log.debug("Remote node uses legacy discovery protocol (V1) (before the rolling upgrade compatibility). " +
"Local node expects V2. Verify that Ignite versions are compatible. " +
"[rmtAddr=" + rmtAddr + ", locAddr=" + sock.getLocalSocketAddress() + "]");

Copy link
Contributor Author

@chesnokoff chesnokoff Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed to send message to old node with legacy protocol. I tried it but new node fails with
Failed to deserialize object with given class loader: jdk.internal.loader.ClassLoaders$AppClassLoader@311d617d at org/apache/ignite/spi/discovery/tcp/ServerImpl.java:6955

It is Caused by: org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractTraceableMessage; local class incompatible: stream classdesc serialVersionUID = 5764055654892291162, local class serialVersionUID = -2609852714099079963

It looks like deserialization is impossible because we changed structure of our messages after refactoring. So it would be challenging to support all legacy formats. As a quick solution, let's just log message about legacy protocol

return;
}

if (log.isDebugEnabled())
log.debug("Unknown connection detected (is some other software connecting to " +
"this Ignite port?" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1580,7 +1580,7 @@ protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeou
}

/**
* Connects to remote address sending {@code U.IGNITE_HEADER} when connection is established.
* Connects to remote address sending {@code U.IGNITE_HEADER_V2} when connection is established.
*
* @param sock Socket bound to a local host address.
* @param remAddr Remote address.
Expand All @@ -1604,7 +1604,7 @@ protected Socket openSocket(Socket sock, InetSocketAddress remAddr, IgniteSpiOpe

sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout));

writeToSocket(sock, null, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout));
writeToSocket(sock, null, U.IGNITE_HEADER_V2, timeoutHelper.nextTimeoutChunk(sockTimeout));

return sock;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
public static final int DFLT_ADDR_REQ_ATTEMPTS = 2;

/** Address request message data. */
private static final byte[] MSG_ADDR_REQ_DATA = U.IGNITE_HEADER;
private static final byte[] MSG_ADDR_REQ_DATA = U.IGNITE_HEADER_V2;

/** */
private Marshaller marsh;
Expand Down Expand Up @@ -621,7 +621,12 @@ private T2<Collection<InetSocketAddress>, Boolean> requestAddresses(InetAddress

byte[] data = resPckt.getData();

if (!U.bytesEqual(U.IGNITE_HEADER, 0, data, 0, U.IGNITE_HEADER.length)) {
if (!U.bytesEqual(U.IGNITE_HEADER_V2, 0, data, 0, U.IGNITE_HEADER_V2.length)) {
if (U.bytesEqual(U.IGNITE_HEADER_V1, 0, data, 0, U.IGNITE_HEADER_V1.length)) {
log.warning("Received message with the old header.");
continue;
}

U.error(log, "Failed to verify message header.");

continue;
Expand Down Expand Up @@ -738,22 +743,24 @@ private AddressResponse(Collection<InetSocketAddress> addrs, byte[] data) {
/** */
private static AddressResponse of(Marshaller marsh, Collection<InetSocketAddress> addrs) throws IgniteCheckedException {
byte[] addrsData = U.marshal(marsh, addrs);
byte[] data = new byte[U.IGNITE_HEADER.length + addrsData.length];
byte[] data = new byte[U.IGNITE_HEADER_V2.length + addrsData.length];

if (data.length > MAX_DATA_LENGTH)
throw new IgniteCheckedException("Too long data packet [size=" + data.length + ", max=" + MAX_DATA_LENGTH + "]");

System.arraycopy(U.IGNITE_HEADER, 0, data, 0, U.IGNITE_HEADER.length);
System.arraycopy(U.IGNITE_HEADER_V2, 0, data, 0, U.IGNITE_HEADER_V2.length);
System.arraycopy(addrsData, 0, data, 4, addrsData.length);

return new AddressResponse(addrs, data);
}

/** */
private static AddressResponse of(Marshaller marsh, byte[] data) throws IgniteCheckedException {
assert U.bytesEqual(U.IGNITE_HEADER, 0, data, 0, U.IGNITE_HEADER.length);
assert U.bytesEqual(U.IGNITE_HEADER_V2, 0, data, 0, U.IGNITE_HEADER_V2.length);

Collection<InetSocketAddress> addrs = U.unmarshal(marsh, Arrays.copyOfRange(data, U.IGNITE_HEADER.length, data.length), null);
Collection<InetSocketAddress> addrs = U.unmarshal(marsh,
Arrays.copyOfRange(data, U.IGNITE_HEADER_V2.length, data.length),
null);

return new AddressResponse(addrs, data);
}
Expand Down Expand Up @@ -901,7 +908,12 @@ private MulticastSocket createSocket() throws IOException {

sock.receive(pckt);

if (!U.bytesEqual(U.IGNITE_HEADER, 0, reqData, 0, U.IGNITE_HEADER.length)) {
if (!U.bytesEqual(U.IGNITE_HEADER_V2, 0, reqData, 0, U.IGNITE_HEADER_V2.length)) {
if (U.bytesEqual(U.IGNITE_HEADER_V1, 0, reqData, 0, U.IGNITE_HEADER_V1.length)) {
U.warn(log, "Received message with old header.");
continue;
}

U.error(log, "Failed to verify message header.");

continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void sslSocketTest() throws Exception {

readHandshake(connection);

connection.getOutputStream().write(U.IGNITE_HEADER);
connection.getOutputStream().write(U.IGNITE_HEADER_V2);

clientFut.get(20_000);
}
Expand Down Expand Up @@ -120,7 +120,7 @@ private void readHandshake(Socket connection) throws IOException {
fail("Failed to read from socket.");
}

assertEquals("Handshake did not pass, read bytes: " + read, Arrays.asList(U.IGNITE_HEADER), Arrays.asList(U.IGNITE_HEADER));
assertEquals("Handshake did not pass, read bytes: " + read, Arrays.asList(U.IGNITE_HEADER_V2), Arrays.asList(U.IGNITE_HEADER_V2));
}

/**
Expand All @@ -138,7 +138,7 @@ public void startSslClient() {
long handshakeStartTime = System.currentTimeMillis();

//need to send message in order to ssl handshake passed.
clientSock.getOutputStream().write(U.IGNITE_HEADER);
clientSock.getOutputStream().write(U.IGNITE_HEADER_V2);

readHandshake(clientSock);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private void attack(byte[] data) throws IOException {
Socket sock = new Socket(addr, 47500);
OutputStream oos = new BufferedOutputStream(sock.getOutputStream())
) {
oos.write(U.IGNITE_HEADER);
oos.write(U.IGNITE_HEADER_V2);
oos.write((byte)1); // Flag for java serialization.
oos.write(data);
}
Expand Down