Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryStatusCheckMessageSerializer;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage;
Expand All @@ -67,6 +68,7 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage;

/** Message factory for discovery messages. */
public class DiscoveryMessageFactory implements MessageFactoryProvider {
Expand Down Expand Up @@ -98,5 +100,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
factory.register((short)15, TcpDiscoveryClientAckResponse::new, new TcpDiscoveryClientAckResponseSerializer());
factory.register((short)16, TcpDiscoveryNodeLeftMessage::new, new TcpDiscoveryNodeLeftMessageSerializer());
factory.register((short)17, TcpDiscoveryNodeFailedMessage::new, new TcpDiscoveryNodeFailedMessageSerializer());
factory.register((short)18, TcpDiscoveryStatusCheckMessage::new, new TcpDiscoveryStatusCheckMessageSerializer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5717,7 +5717,7 @@ private void processStatusCheckMessage(final TcpDiscoveryStatusCheckMessage msg)

if (F.contains(msg.failedNodes(), msg.creatorNodeId())) {
msg0 = createTcpDiscoveryStatusCheckMessage(
msg.creatorNode(),
null,
msg.creatorNodeId(),
msg.failedNodeId());

Expand All @@ -5736,7 +5736,7 @@ private void processStatusCheckMessage(final TcpDiscoveryStatusCheckMessage msg)
}

try {
trySendMessageDirectly(msg0.creatorNodeAddrs(), msg0.creatorNodeId(), msg0);
trySendMessageDirectly(msg0.creatorNodeAddresses(), msg0.creatorNodeId(), msg0);

if (log.isDebugEnabled())
log.debug("Responded to status check message " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import java.util.Collection;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;

/**
Expand All @@ -33,7 +36,7 @@
* If a failed node id is specified then the message is sent across the ring up to the sender node
* to ensure that the failed node is actually failed.
*/
public class TcpDiscoveryStatusCheckMessage extends TcpDiscoveryAbstractMessage {
public class TcpDiscoveryStatusCheckMessage extends TcpDiscoveryAbstractMessage implements Message {
/** */
private static final long serialVersionUID = 0L;

Expand All @@ -43,60 +46,94 @@ public class TcpDiscoveryStatusCheckMessage extends TcpDiscoveryAbstractMessage
/** Status RECONNECT. */
public static final int STATUS_RECON = 2;

/** Creator node. */
private final TcpDiscoveryNode creatorNode;

/** Creator node addresses. */
private final Collection<InetSocketAddress> creatorNodeAddrs;
@Order(value = 5, method = "creatorNodeAddressesMessages")
@Nullable private Collection<InetSocketAddressMessage> creatorNodeAddrsMsgs;

/** Failed node id. */
private final UUID failedNodeId;
@Order(6)
@Nullable private UUID failedNodeId;

/** Creator node status (initialized by coordinator). */
@Order(7)
private int status;

/** Empty constructor for {@link DiscoveryMessageFactory}. */
public TcpDiscoveryStatusCheckMessage() {
// No-op.
}

/**
* Constructor.
*
* @param creatorNodeAddrs Addresses of creator node, used to be able not to serialize node in message.
* @param creatorNodeId Creator node ID.
* @param failedNodeId Failed node id.
*/
public TcpDiscoveryStatusCheckMessage(UUID creatorNodeId, Collection<InetSocketAddress> creatorNodeAddrs, UUID failedNodeId) {
public TcpDiscoveryStatusCheckMessage(
UUID creatorNodeId,
@Nullable Collection<InetSocketAddress> creatorNodeAddrs,
@Nullable UUID failedNodeId
) {
super(creatorNodeId);

this.creatorNodeAddrs = creatorNodeAddrs;
this.creatorNode = null;
if (creatorNodeAddrs != null) {
creatorNodeAddrsMsgs = creatorNodeAddrs.stream().map(a -> new InetSocketAddressMessage(a.getAddress(), a.getPort()))
.collect(Collectors.toList());
}

this.failedNodeId = failedNodeId;
}

/**
* Gets creator node.
* Gets creator node addresses.
*
* @return Creator node.
* @return Creator node addresses.
*/
public @Nullable TcpDiscoveryNode creatorNode() {
return creatorNode;
public @Nullable Collection<InetSocketAddress> creatorNodeAddresses() {
if (creatorNodeAddrsMsgs == null)
return null;

return creatorNodeAddrsMsgs.stream().map(m -> new InetSocketAddress(m.address(), m.port()))
.collect(Collectors.toList());
}

/**
* Gets creator node addresses.
*
* @return Creator node addresses.
*/
public Collection<InetSocketAddress> creatorNodeAddrs() {
return creatorNodeAddrs;
public @Nullable Collection<InetSocketAddressMessage> creatorNodeAddressesMessages() {
return creatorNodeAddrsMsgs;
}

/**
* Sets creator node addresses.
*
* @param creatorNodeAddrsMsgs Creator node addresses.
*/
public void creatorNodeAddressesMessages(@Nullable Collection<InetSocketAddressMessage> creatorNodeAddrsMsgs) {
this.creatorNodeAddrsMsgs = creatorNodeAddrsMsgs;
}

/**
* Gets failed node id.
*
* @return Failed node id.
*/
public UUID failedNodeId() {
public @Nullable UUID failedNodeId() {
return failedNodeId;
}

/**
* Sets failed node id.
*
* @param failedNodeId Failed node id.
*/
public void failedNodeId(@Nullable UUID failedNodeId) {
this.failedNodeId = failedNodeId;
}

/**
* Gets creator status.
*
Expand All @@ -115,6 +152,11 @@ public void status(int status) {
this.status = status;
}

/** {@inheritDoc} */
@Override public short directType() {
return 18;
}

/** {@inheritDoc} */
@Override public boolean equals(Object obj) {
// NOTE!
Expand Down
Loading