diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/spi/discovery/TcpDiscoveryDifferentClusterVersionsTest.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/spi/discovery/TcpDiscoveryDifferentClusterVersionsTest.java new file mode 100644 index 0000000000000..77e091d458c61 --- /dev/null +++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/spi/discovery/TcpDiscoveryDifferentClusterVersionsTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compatibility.spi.discovery; + +import org.apache.ignite.compatibility.IgniteReleasedVersion; +import org.apache.ignite.compatibility.testframework.junits.IgniteCompatibilityAbstractTest; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.apache.ignite.testframework.LogListener; +import org.junit.Test; + +/** */ +public class TcpDiscoveryDifferentClusterVersionsTest extends IgniteCompatibilityAbstractTest { + /** */ + private static final String LEGACY_PROTOCOL_MSG = "Remote node uses legacy discovery protocol"; + + /** */ + private static final IgniteReleasedVersion OLD_VERSION = IgniteReleasedVersion.VER_2_17_0; + + /** */ + private ListeningTestLogger listeningLog; + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + if (listeningLog != null) + cfg.setGridLogger(listeningLog); + + return cfg; + } + + /** Tests that connection from client of old version is properly refused. */ + @Test + public void testOldNodeRejected() throws Exception { + setLoggerDebugLevel(); + + listeningLog = new ListeningTestLogger(log); + + LogListener logListener = LogListener.matches(LEGACY_PROTOCOL_MSG).build(); + + listeningLog.registerListener(logListener); + + startGrid(0); + + GridTestUtils.assertThrows( + log, + () -> startGrid("old-node", OLD_VERSION.toString(), new ConfigurationClosure()), + AssertionError.class, + null + ); + + assertTrue("Expected log about different protocol.", logListener.check(getTestTimeout())); + } + + /** Setup node closure. */ + private static class ConfigurationClosure implements IgniteInClosure { + /** {@inheritDoc} */ + @Override public void apply(IgniteConfiguration cfg) { + cfg.setLocalHost("127.0.0.1"); + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + disco.setIpFinder(LOCAL_IP_FINDER); + + cfg.setDiscoverySpi(disco); + } + } +} diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java index d615559ef065a..f244737024c92 100644 --- a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java +++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java @@ -30,6 +30,7 @@ import org.apache.ignite.compatibility.persistence.MoveBinaryMetadataCompatibility; import org.apache.ignite.compatibility.persistence.PersistenceBasicCompatibilityTest; import org.apache.ignite.compatibility.persistence.SnapshotCompatibilityTest; +import org.apache.ignite.compatibility.spi.discovery.TcpDiscoveryDifferentClusterVersionsTest; import org.apache.ignite.compatibility.testframework.util.MavenUtilsTest; import org.junit.runner.RunWith; import org.junit.runners.Suite; @@ -52,7 +53,8 @@ JavaThinCompatibilityStoragePathTest.class, IgnitePKIndexesMigrationToUnwrapPkTest.class, CompoundIndexCompatibilityTest.class, - SnapshotCompatibilityTest.class + SnapshotCompatibilityTest.class, + TcpDiscoveryDifferentClusterVersionsTest.class }) public class IgniteCompatibilityBasicTestSuite { } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 65408e6ec0be9..d0b7923b33f5a 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -228,6 +228,7 @@ import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.thread.IgniteThread; import org.apache.ignite.thread.IgniteThreadFactory; @@ -349,8 +350,19 @@ public abstract class IgniteUtils extends CommonUtils { public static final String JMX_DOMAIN = IgniteUtils.class.getName().substring(0, IgniteUtils.class.getName(). indexOf('.', IgniteUtils.class.getName().indexOf('.') + 1)); - /** Network packet header. */ - public static final byte[] IGNITE_HEADER = intToBytes(0x00004747); + /** + * Network packet header for discovery protocol V1 (legacy). + * V1 discovery messages use Java serialization (JdkMarshaller). + */ + public static final byte[] IGNITE_HEADER_V1 = intToBytes(0x00004747); + + /** + * Network packet header for discovery protocol V2. + * Used in discovery handshake to support rolling upgrade compatibility. + * V2 packets include a leading serialization mode byte ({@code serMode}) which defines how the payload is serialized. + * See {@link TcpDiscoveryIoSession#JAVA_SERIALIZATION} and {@link TcpDiscoveryIoSession#MESSAGE_SERIALIZATION}. + */ + public static final byte[] IGNITE_HEADER_V2 = intToBytes(0x0049474E); /** Default buffer size = 4K. */ private static final int BUF_SIZE = 4096; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java index 3829aaa6aa0e5..1bd4c3e0f7c4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java @@ -28,9 +28,9 @@ /** * Verifies that first bytes received in accepted (incoming) - * NIO session are equal to {@link U#IGNITE_HEADER}. + * NIO session are equal to {@link U#IGNITE_HEADER_V1}. *

- * First {@code U.IGNITE_HEADER.length} bytes are consumed by this filter + * First {@code U.IGNITE_HEADER_V1.length} bytes are consumed by this filter * and all other bytes are forwarded through chain without any modification. */ public class GridConnectionBytesVerifyFilter extends GridNioFilterAdapter { @@ -99,27 +99,27 @@ public GridConnectionBytesVerifyFilter(IgniteLogger log) { Integer magic = ses.meta(MAGIC_META_KEY); - if (magic == null || magic < U.IGNITE_HEADER.length) { + if (magic == null || magic < U.IGNITE_HEADER_V1.length) { byte[] magicBuf = ses.meta(MAGIC_BUF_KEY); if (magicBuf == null) - magicBuf = new byte[U.IGNITE_HEADER.length]; + magicBuf = new byte[U.IGNITE_HEADER_V1.length]; int magicRead = magic == null ? 0 : magic; int cnt = buf.remaining(); - buf.get(magicBuf, magicRead, Math.min(U.IGNITE_HEADER.length - magicRead, cnt)); + buf.get(magicBuf, magicRead, Math.min(U.IGNITE_HEADER_V1.length - magicRead, cnt)); - if (cnt + magicRead < U.IGNITE_HEADER.length) { + if (cnt + magicRead < U.IGNITE_HEADER_V1.length) { // Magic bytes are not fully read. ses.addMeta(MAGIC_META_KEY, cnt + magicRead); ses.addMeta(MAGIC_BUF_KEY, magicBuf); } - else if (U.bytesEqual(magicBuf, 0, U.IGNITE_HEADER, 0, U.IGNITE_HEADER.length)) { - // Magic bytes read and equal to IGNITE_HEADER. + else if (U.bytesEqual(magicBuf, 0, U.IGNITE_HEADER_V1, 0, U.IGNITE_HEADER_V1.length)) { + // Magic bytes read and equal to IGNITE_HEADER_V1. ses.removeMeta(MAGIC_BUF_KEY); - ses.addMeta(MAGIC_META_KEY, U.IGNITE_HEADER.length); + ses.addMeta(MAGIC_META_KEY, U.IGNITE_HEADER_V1.length); proceedMessageReceived(ses, buf); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpHandshakeExecutor.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpHandshakeExecutor.java index fd86c7f455fb1..f5265de4b286f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpHandshakeExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpHandshakeExecutor.java @@ -186,9 +186,9 @@ private abstract static class BlockingTransport { * @throws IgniteCheckedException If failed. */ void sendHandshake(HandshakeMessage msg) throws IgniteCheckedException { - ByteBuffer buf = ByteBuffer.allocate(msg.getMessageSize() + U.IGNITE_HEADER.length) + ByteBuffer buf = ByteBuffer.allocate(msg.getMessageSize() + U.IGNITE_HEADER_V1.length) .order(ByteOrder.LITTLE_ENDIAN) - .put(U.IGNITE_HEADER); + .put(U.IGNITE_HEADER_V1); writer.setBuffer(buf); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 0a24730a12ceb..0cc7825e76ec8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -6610,7 +6610,16 @@ private class SocketReader extends IgniteSpiThread { } } - if (!Arrays.equals(buf, U.IGNITE_HEADER)) { + if (!Arrays.equals(buf, U.IGNITE_HEADER_V2)) { + if (Arrays.equals(buf, U.IGNITE_HEADER_V1)) { + if (log.isDebugEnabled()) + log.debug("Remote node uses legacy discovery protocol (V1) (before the rolling upgrade compatibility). " + + "Local node expects V2. Verify that Ignite versions are compatible. " + + "[rmtAddr=" + rmtAddr + ", locAddr=" + sock.getLocalSocketAddress() + "]"); + + return; + } + if (log.isDebugEnabled()) log.debug("Unknown connection detected (is some other software connecting to " + "this Ignite port?" + diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index c748fee183243..0e71d371a8dff 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1580,7 +1580,7 @@ protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeou } /** - * Connects to remote address sending {@code U.IGNITE_HEADER} when connection is established. + * Connects to remote address sending {@code U.IGNITE_HEADER_V2} when connection is established. * * @param sock Socket bound to a local host address. * @param remAddr Remote address. @@ -1604,7 +1604,7 @@ protected Socket openSocket(Socket sock, InetSocketAddress remAddr, IgniteSpiOpe sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout)); - writeToSocket(sock, null, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout)); + writeToSocket(sock, null, U.IGNITE_HEADER_V2, timeoutHelper.nextTimeoutChunk(sockTimeout)); return sock; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java index a2c913160b21b..6feff8613cb21 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java @@ -93,7 +93,7 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { public static final int DFLT_ADDR_REQ_ATTEMPTS = 2; /** Address request message data. */ - private static final byte[] MSG_ADDR_REQ_DATA = U.IGNITE_HEADER; + private static final byte[] MSG_ADDR_REQ_DATA = U.IGNITE_HEADER_V2; /** */ private Marshaller marsh; @@ -621,7 +621,12 @@ private T2, Boolean> requestAddresses(InetAddress byte[] data = resPckt.getData(); - if (!U.bytesEqual(U.IGNITE_HEADER, 0, data, 0, U.IGNITE_HEADER.length)) { + if (!U.bytesEqual(U.IGNITE_HEADER_V2, 0, data, 0, U.IGNITE_HEADER_V2.length)) { + if (U.bytesEqual(U.IGNITE_HEADER_V1, 0, data, 0, U.IGNITE_HEADER_V1.length)) { + log.warning("Received message with the old header."); + continue; + } + U.error(log, "Failed to verify message header."); continue; @@ -738,12 +743,12 @@ private AddressResponse(Collection addrs, byte[] data) { /** */ private static AddressResponse of(Marshaller marsh, Collection addrs) throws IgniteCheckedException { byte[] addrsData = U.marshal(marsh, addrs); - byte[] data = new byte[U.IGNITE_HEADER.length + addrsData.length]; + byte[] data = new byte[U.IGNITE_HEADER_V2.length + addrsData.length]; if (data.length > MAX_DATA_LENGTH) throw new IgniteCheckedException("Too long data packet [size=" + data.length + ", max=" + MAX_DATA_LENGTH + "]"); - System.arraycopy(U.IGNITE_HEADER, 0, data, 0, U.IGNITE_HEADER.length); + System.arraycopy(U.IGNITE_HEADER_V2, 0, data, 0, U.IGNITE_HEADER_V2.length); System.arraycopy(addrsData, 0, data, 4, addrsData.length); return new AddressResponse(addrs, data); @@ -751,9 +756,11 @@ private static AddressResponse of(Marshaller marsh, Collection addrs = U.unmarshal(marsh, Arrays.copyOfRange(data, U.IGNITE_HEADER.length, data.length), null); + Collection addrs = U.unmarshal(marsh, + Arrays.copyOfRange(data, U.IGNITE_HEADER_V2.length, data.length), + null); return new AddressResponse(addrs, data); } @@ -901,7 +908,12 @@ private MulticastSocket createSocket() throws IOException { sock.receive(pckt); - if (!U.bytesEqual(U.IGNITE_HEADER, 0, reqData, 0, U.IGNITE_HEADER.length)) { + if (!U.bytesEqual(U.IGNITE_HEADER_V2, 0, reqData, 0, U.IGNITE_HEADER_V2.length)) { + if (U.bytesEqual(U.IGNITE_HEADER_V1, 0, reqData, 0, U.IGNITE_HEADER_V1.length)) { + U.warn(log, "Received message with old header."); + continue; + } + U.error(log, "Failed to verify message header."); continue; diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryClientSocketTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryClientSocketTest.java index 0a4907822588e..e9a8490ff1888 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryClientSocketTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryClientSocketTest.java @@ -82,7 +82,7 @@ public void sslSocketTest() throws Exception { readHandshake(connection); - connection.getOutputStream().write(U.IGNITE_HEADER); + connection.getOutputStream().write(U.IGNITE_HEADER_V2); clientFut.get(20_000); } @@ -120,7 +120,7 @@ private void readHandshake(Socket connection) throws IOException { fail("Failed to read from socket."); } - assertEquals("Handshake did not pass, read bytes: " + read, Arrays.asList(U.IGNITE_HEADER), Arrays.asList(U.IGNITE_HEADER)); + assertEquals("Handshake did not pass, read bytes: " + read, Arrays.asList(U.IGNITE_HEADER_V2), Arrays.asList(U.IGNITE_HEADER_V2)); } /** @@ -138,7 +138,7 @@ public void startSslClient() { long handshakeStartTime = System.currentTimeMillis(); //need to send message in order to ssl handshake passed. - clientSock.getOutputStream().write(U.IGNITE_HEADER); + clientSock.getOutputStream().write(U.IGNITE_HEADER_V2); readHandshake(clientSock); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java index b8a82d52cc90b..fffc6fab93c2a 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java @@ -174,7 +174,7 @@ private void attack(byte[] data) throws IOException { Socket sock = new Socket(addr, 47500); OutputStream oos = new BufferedOutputStream(sock.getOutputStream()) ) { - oos.write(U.IGNITE_HEADER); + oos.write(U.IGNITE_HEADER_V2); oos.write((byte)1); // Flag for java serialization. oos.write(data); }