Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.Serializable;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -93,9 +92,13 @@ public interface DiscoveryCustomMessage extends Serializable {
public boolean isMutable();

/**
* See {@link DiscoverySpiCustomMessage#stopProcess()}.
* Called on discovery coordinator node after listener is notified. If returns {@code true}
* then message is not passed to others nodes, if after this method {@link #ackMessage()} returns non-null ack
* message, it is sent to all nodes.
*
* @return {@code True} if message should not be sent to others nodes after it was processed on coordinator.
* Note: this method is used then and only then the zookeeper discovery is configured.
*
* @return {@code True} if message should not be sent to all nodes.
*/
public default boolean stopProcess() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.managers.discovery;

import org.apache.ignite.internal.codegen.CacheStatisticsModeChangeMessageSerializer;
import org.apache.ignite.internal.codegen.InetAddressMessageSerializer;
import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryAuthFailedMessageSerializer;
Expand All @@ -28,6 +29,7 @@
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryConnectionCheckMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryCustomEventMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryDiscardMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryDuplicateIdMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer;
Expand All @@ -41,6 +43,8 @@
import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryServerOnlyCustomEventMessageSerializer;
import org.apache.ignite.internal.processors.cache.CacheStatisticsModeChangeMessage;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage;
Expand All @@ -54,6 +58,7 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDiscardMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest;
Expand All @@ -67,6 +72,7 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryServerOnlyCustomEventMessage;

/** Message factory for discovery messages. */
public class DiscoveryMessageFactory implements MessageFactoryProvider {
Expand All @@ -80,6 +86,7 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
factory.register((short)-101, InetSocketAddressMessage::new, new InetSocketAddressMessageSerializer());
factory.register((short)-100, InetAddressMessage::new, new InetAddressMessageSerializer());

// TcpDiscoveryAbstractMessage
factory.register((short)0, TcpDiscoveryCheckFailedMessage::new, new TcpDiscoveryCheckFailedMessageSerializer());
factory.register((short)1, TcpDiscoveryPingRequest::new, new TcpDiscoveryPingRequestSerializer());
factory.register((short)2, TcpDiscoveryPingResponse::new, new TcpDiscoveryPingResponseSerializer());
Expand All @@ -98,5 +105,11 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
factory.register((short)15, TcpDiscoveryClientAckResponse::new, new TcpDiscoveryClientAckResponseSerializer());
factory.register((short)16, TcpDiscoveryNodeLeftMessage::new, new TcpDiscoveryNodeLeftMessageSerializer());
factory.register((short)17, TcpDiscoveryNodeFailedMessage::new, new TcpDiscoveryNodeFailedMessageSerializer());
factory.register((short)18, TcpDiscoveryCustomEventMessage::new, new TcpDiscoveryCustomEventMessageSerializer());
factory.register((short)19, TcpDiscoveryServerOnlyCustomEventMessage::new,
new TcpDiscoveryServerOnlyCustomEventMessageSerializer());

// DiscoveryCustomMessage
factory.register((short)500, CacheStatisticsModeChangeMessage::new, new CacheStatisticsModeChangeMessageSerializer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@
import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
import org.apache.ignite.spi.discovery.DiscoveryNotification;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
Expand Down Expand Up @@ -592,8 +591,11 @@ private void onDiscovery0(DiscoveryNotification notification) {
ClusterNode node = notification.getNode();
long topVer = notification.getTopVer();

DiscoveryCustomMessage customMsg = notification.getCustomMsgData() == null ? null
: ((CustomMessageWrapper)notification.getCustomMsgData()).delegate();
DiscoveryCustomMessage customMsg0 = notification.getCustomMsgData() == null ? null :
notification.getCustomMsgData();

DiscoveryCustomMessage customMsg = customMsg0 instanceof SecurityAwareCustomMessageWrapper ?
((SecurityAwareCustomMessageWrapper)customMsg0).delegate() : customMsg0;

if (skipMessage(notification.type(), customMsg))
return;
Expand Down Expand Up @@ -932,7 +934,7 @@ public SecurityAwareNotificationTask(DiscoveryNotification notification) {

/** */
@Override public void run() {
DiscoverySpiCustomMessage customMsg = notification.getCustomMsgData();
DiscoveryCustomMessage customMsg = notification.getCustomMsgData();

if (customMsg instanceof SecurityAwareCustomMessageWrapper) {
UUID secSubjId = ((SecurityAwareCustomMessageWrapper)customMsg).securitySubjectId();
Expand Down Expand Up @@ -2301,7 +2303,7 @@ public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteCheckedExce

getSpi().sendCustomEvent(security.enabled()
? new SecurityAwareCustomMessageWrapper(msg, security.securityContext().subject().id())
: new CustomMessageWrapper(msg));
: msg);
}
catch (IgniteClientDisconnectedException e) {
IgniteFuture<?> reconnectFut = ctx.cluster().clientReconnectFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,27 @@
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.Nullable;

/** Extends {@link CustomMessageWrapper} with ID of security subject that initiated the current message. */
public class SecurityAwareCustomMessageWrapper extends CustomMessageWrapper {
/** Custom message wrapper with ID of security subject that initiated the current message. */
public class SecurityAwareCustomMessageWrapper extends DiscoverySpiCustomMessage {
/** */
private static final long serialVersionUID = 0L;

/** Security subject ID. */
private final UUID secSubjId;
private UUID secSubjId;

/** Original message. */
private DiscoveryCustomMessage delegate;

/**
* Default constructor.
*/
public SecurityAwareCustomMessageWrapper() {
// No-op.
}

/** */
public SecurityAwareCustomMessageWrapper(DiscoveryCustomMessage delegate, UUID secSubjId) {
super(delegate);

this.delegate = delegate;
this.secSubjId = secSubjId;
}

Expand All @@ -42,8 +51,25 @@ public UUID securitySubjectId() {
}

/** {@inheritDoc} */
@Override public @Nullable DiscoverySpiCustomMessage ackMessage() {
DiscoveryCustomMessage ack = delegate().ackMessage();
@Override public boolean isMutable() {
return delegate.isMutable();
}

/** {@inheritDoc} */
@Override public boolean stopProcess() {
return delegate.stopProcess();
}

/**
* @return Delegate.
*/
public DiscoveryCustomMessage delegate() {
return delegate;
}

/** {@inheritDoc} */
@Override public @Nullable DiscoveryCustomMessage ackMessage() {
DiscoveryCustomMessage ack = delegate.ackMessage();

return ack == null ? null : new SecurityAwareCustomMessageWrapper(ack, secSubjId);
}
Expand Down
Loading
Loading