diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 06cdf3e25d3c5..90bfb8636be42 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryStatusCheckMessageSerializer; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage; @@ -67,6 +68,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage; /** Message factory for discovery messages. */ public class DiscoveryMessageFactory implements MessageFactoryProvider { @@ -98,5 +100,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)15, TcpDiscoveryClientAckResponse::new, new TcpDiscoveryClientAckResponseSerializer()); factory.register((short)16, TcpDiscoveryNodeLeftMessage::new, new TcpDiscoveryNodeLeftMessageSerializer()); factory.register((short)17, TcpDiscoveryNodeFailedMessage::new, new TcpDiscoveryNodeFailedMessageSerializer()); + factory.register((short)18, TcpDiscoveryStatusCheckMessage::new, new TcpDiscoveryStatusCheckMessageSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 0a24730a12ceb..08982a5f6aa87 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -5717,7 +5717,7 @@ private void processStatusCheckMessage(final TcpDiscoveryStatusCheckMessage msg) if (F.contains(msg.failedNodes(), msg.creatorNodeId())) { msg0 = createTcpDiscoveryStatusCheckMessage( - msg.creatorNode(), + null, msg.creatorNodeId(), msg.failedNodeId()); @@ -5736,7 +5736,7 @@ private void processStatusCheckMessage(final TcpDiscoveryStatusCheckMessage msg) } try { - trySendMessageDirectly(msg0.creatorNodeAddrs(), msg0.creatorNodeId(), msg0); + trySendMessageDirectly(msg0.creatorNodeAddresses(), msg0.creatorNodeId(), msg0); if (log.isDebugEnabled()) log.debug("Responded to status check message " + diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java index 35b854e3c787b..8d04b4f6d90d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java @@ -21,8 +21,11 @@ import java.util.Collection; import java.util.Objects; import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; +import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** @@ -33,7 +36,7 @@ * If a failed node id is specified then the message is sent across the ring up to the sender node * to ensure that the failed node is actually failed. */ -public class TcpDiscoveryStatusCheckMessage extends TcpDiscoveryAbstractMessage { +public class TcpDiscoveryStatusCheckMessage extends TcpDiscoveryAbstractMessage implements Message { /** */ private static final long serialVersionUID = 0L; @@ -43,18 +46,23 @@ public class TcpDiscoveryStatusCheckMessage extends TcpDiscoveryAbstractMessage /** Status RECONNECT. */ public static final int STATUS_RECON = 2; - /** Creator node. */ - private final TcpDiscoveryNode creatorNode; - /** Creator node addresses. */ - private final Collection creatorNodeAddrs; + @Order(value = 5, method = "creatorNodeAddressesMessages") + @Nullable private Collection creatorNodeAddrsMsgs; /** Failed node id. */ - private final UUID failedNodeId; + @Order(6) + @Nullable private UUID failedNodeId; /** Creator node status (initialized by coordinator). */ + @Order(7) private int status; + /** Empty constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryStatusCheckMessage() { + // No-op. + } + /** * Constructor. * @@ -62,21 +70,32 @@ public class TcpDiscoveryStatusCheckMessage extends TcpDiscoveryAbstractMessage * @param creatorNodeId Creator node ID. * @param failedNodeId Failed node id. */ - public TcpDiscoveryStatusCheckMessage(UUID creatorNodeId, Collection creatorNodeAddrs, UUID failedNodeId) { + public TcpDiscoveryStatusCheckMessage( + UUID creatorNodeId, + @Nullable Collection creatorNodeAddrs, + @Nullable UUID failedNodeId + ) { super(creatorNodeId); - this.creatorNodeAddrs = creatorNodeAddrs; - this.creatorNode = null; + if (creatorNodeAddrs != null) { + creatorNodeAddrsMsgs = creatorNodeAddrs.stream().map(a -> new InetSocketAddressMessage(a.getAddress(), a.getPort())) + .collect(Collectors.toList()); + } + this.failedNodeId = failedNodeId; } /** - * Gets creator node. + * Gets creator node addresses. * - * @return Creator node. + * @return Creator node addresses. */ - public @Nullable TcpDiscoveryNode creatorNode() { - return creatorNode; + public @Nullable Collection creatorNodeAddresses() { + if (creatorNodeAddrsMsgs == null) + return null; + + return creatorNodeAddrsMsgs.stream().map(m -> new InetSocketAddress(m.address(), m.port())) + .collect(Collectors.toList()); } /** @@ -84,8 +103,17 @@ public TcpDiscoveryStatusCheckMessage(UUID creatorNodeId, Collection creatorNodeAddrs() { - return creatorNodeAddrs; + public @Nullable Collection creatorNodeAddressesMessages() { + return creatorNodeAddrsMsgs; + } + + /** + * Sets creator node addresses. + * + * @param creatorNodeAddrsMsgs Creator node addresses. + */ + public void creatorNodeAddressesMessages(@Nullable Collection creatorNodeAddrsMsgs) { + this.creatorNodeAddrsMsgs = creatorNodeAddrsMsgs; } /** @@ -93,10 +121,19 @@ public Collection creatorNodeAddrs() { * * @return Failed node id. */ - public UUID failedNodeId() { + public @Nullable UUID failedNodeId() { return failedNodeId; } + /** + * Sets failed node id. + * + * @param failedNodeId Failed node id. + */ + public void failedNodeId(@Nullable UUID failedNodeId) { + this.failedNodeId = failedNodeId; + } + /** * Gets creator status. * @@ -115,6 +152,11 @@ public void status(int status) { this.status = status; } + /** {@inheritDoc} */ + @Override public short directType() { + return 18; + } + /** {@inheritDoc} */ @Override public boolean equals(Object obj) { // NOTE!