From 42095c73b02b5abd248db49e51951008f4a6f044 Mon Sep 17 00:00:00 2001 From: Sudipta Laha Date: Wed, 22 Oct 2025 06:23:24 -0700 Subject: [PATCH 1/3] Added a test for bulk reader multi-rack replica placement --- .../spark/data/partitioner/MurmurHash.java | 85 +++++++ .../distributed/impl/CassandraCluster.java | 14 +- .../testing/ClusterBuilderConfiguration.java | 17 ++ .../BulkReaderTokenRangeReplicasTest.java | 225 ++++++++++++++++++ 4 files changed, 337 insertions(+), 4 deletions(-) create mode 100644 cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTokenRangeReplicasTest.java diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/MurmurHash.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/MurmurHash.java index ae3722eb9..89eeb1ae3 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/MurmurHash.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/MurmurHash.java @@ -156,4 +156,89 @@ public static long[] hash(ByteBuffer key, int offset, int length, long seed) return new long[]{h1, h2}; } + + protected static long invRShiftXor(long value, int shift) + { + long output = 0; + long i = 0; + while (i * shift < 64) + { + long c = (0xffffffffffffffffL << (64 - shift)) >>> (shift * i); + long partOutput = value & c; + value ^= partOutput >>> shift; + output |= partOutput; + i += 1; + } + return output; + } + + protected static long invRotl64(long v, int n) + { + return ((v >>> n) | (v << (64 - n))); + } + + protected static long invFmix(long k) + { + k = invRShiftXor(k, 33); + k *= 0x9cb4b2f8129337dbL; + k = invRShiftXor(k, 33); + k *= 0x4f74430c22a54005L; + k = invRShiftXor(k, 33); + return k; + } + + public static long[] inv_hash3_x64_128(long[] result) + { + long c1 = 0xa98409e882ce4d7dL; + long c2 = 0xa81e14edd9de2c7fL; + + long k1 = 0; + long k2 = 0; + long h1 = result[0]; + long h2 = result[1]; + + //---------- + // reverse finalization + h2 -= h1; + h1 -= h2; + + h1 = invFmix(h1); + h2 = invFmix(h2); + + h2 -= h1; + h1 -= h2; + + h1 ^= 16; + h2 ^= 16; + + //---------- + // reverse body + h2 -= 0x38495ab5; + h2 *= 0xcccccccccccccccdL; + h2 -= h1; + h2 = invRotl64(h2, 31); + k2 = h2; + h2 = 0; + + k2 *= c1; + k2 = invRotl64(k2, 33); + k2 *= c2; + + h1 -= 0x52dce729; + h1 *= 0xcccccccccccccccdL; + //h1 -= h2; + h1 = invRotl64(h1, 27); + + k1 = h1; + + k1 *= c2; + k1 = invRotl64(k1, 31); + k1 *= c1; + + // note that while this works for body block reversing the tail reverse requires `invTailReverse` + k1 = Long.reverseBytes(k1); + k2 = Long.reverseBytes(k2); + + return new long[] {k1, k2}; + } } diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java index ab8524dc4..713bafdf8 100644 --- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java +++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java @@ -132,10 +132,16 @@ public AbstractCluster initializeCluster(String versionString, if (dcCount > 1) { - clusterBuilder.withNodeIdTopology(networkTopology(finalNodeCount, - (nodeId) -> nodeId % 2 != 0 ? 
- dcAndRack("datacenter1", "rack1") : - dcAndRack("datacenter2", "rack2"))); + if (configuration.dcAndRackSupplier != null) + { + clusterBuilder.withNodeIdTopology(networkTopology(finalNodeCount, configuration.dcAndRackSupplier)); + } else + { + clusterBuilder.withNodeIdTopology(networkTopology(finalNodeCount, + (nodeId) -> nodeId % 2 != 0 ? + dcAndRack("datacenter1", "rack1") : + dcAndRack("datacenter2", "rack2"))); + } } if (configuration.instanceInitializer != null) diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/ClusterBuilderConfiguration.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/ClusterBuilderConfiguration.java index a08fe9320..51b69504f 100644 --- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/ClusterBuilderConfiguration.java +++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/ClusterBuilderConfiguration.java @@ -22,10 +22,12 @@ import java.util.EnumSet; import java.util.Map; import java.util.function.BiConsumer; +import java.util.function.IntFunction; import com.google.common.base.Preconditions; import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.shared.NetworkTopology; /** * Defines the configuration to build the {@link IClusterExtension} cluster @@ -42,6 +44,7 @@ public class ClusterBuilderConfiguration public String partitioner; public Map additionalInstanceConfig = null; public int tokenCount = 1; + public IntFunction dcAndRackSupplier; /** * Adds a features to the list of default features. @@ -176,4 +179,18 @@ public ClusterBuilderConfiguration tokenCount(int tokenCount) this.tokenCount = tokenCount; return this; } + + + /** + * Sets a supplier function that provides datacenter and rack information for each node in the cluster. 
+ * + * @param dcAndRackSupplier a function that takes a node index and returns the corresponding + * datacenter and rack configuration for that node + * @return this configuration instance for method chaining + */ + public ClusterBuilderConfiguration dcAndRackSupplier(IntFunction dcAndRackSupplier) + { + this.dcAndRackSupplier = dcAndRackSupplier; + return this; + } } diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTokenRangeReplicasTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTokenRangeReplicasTest.java new file mode 100644 index 000000000..82ca58888 --- /dev/null +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTokenRangeReplicasTest.java @@ -0,0 +1,225 @@ +package org.apache.cassandra.analytics; + +import java.nio.ByteBuffer; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.cassandra.spark.data.partitioner.MurmurHash; +import org.apache.cassandra.testing.ClusterBuilderConfiguration; +import org.apache.cassandra.testing.utils.ClusterUtils; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack; +import static org.apache.cassandra.testing.TestUtils.DC1_RF3_DC2_RF3; +import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; +import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName; +import static org.assertj.core.api.Assertions.assertThat; + +/** + *
+ * Single-rack replica placement with NetworkTopologyStrategy:
+ *
+ * DC1: {Rack1:[Node1, Node2, Node3, Node4]}, Replication Factor: 3
+ * DC2: {Rack1:[Node5, Node6, Node7, Node8]}, Replication Factor: 3
+ *
+ * Token range ownership: {Node1: T1, Node2: T2, Node3: T3, Node4: T4, Node5: T5, Node6: T6, Node7: T7, Node8: T8}
+ *
+ * T1 will be replicated on the next two nodes in DC1 (Node2, Node3) and on the first three nodes in DC2 (Node5, Node6, Node7).
+ *
+ * For each token range the replicas are:
+ *
+ * T1:[Node1, Node2, Node3, Node5, Node6, Node7]
+ * T2:[Node2, Node3, Node4, Node5, Node6, Node7]
+ * T3:[Node3, Node4, Node1, Node5, Node6, Node7]
+ * T4:[Node4, Node1, Node2, Node5, Node6, Node7]
+ * T5:[Node5, Node6, Node7, Node1, Node2, Node3]
+ * T6:[Node6, Node7, Node8, Node1, Node2, Node3]
+ * T7:[Node7, Node8, Node5, Node1, Node2, Node3]
+ * T8:[Node8, Node5, Node6, Node1, Node2, Node3]
+ *
+ * Multi-rack replica placement with NetworkTopologyStrategy:
+ *
+ * DC1: {Rack1:[Node1, Node2], Rack2:[Node3], Rack3:[Node4]}, Replication Factor: 3
+ * DC2: {Rack1:[Node5, Node6], Rack2:[Node7], Rack3:[Node8]}, Replication Factor: 3
+ *
+ * Cassandra will try to place replicas in different racks.
+ *
+ * T1 will be replicated on the next two nodes of DC1 on different racks (Node3, Node4) and on the first three nodes on different racks in DC2 (Node5, Node7, Node8).
+ *
+ * For each token range the replicas are:
+ *
+ * T1:[Node1, Node3, Node4, Node5, Node7, Node8]
+ * T2:[Node2, Node3, Node4, Node5, Node7, Node8]
+ * T3:[Node3, Node4, Node1, Node5, Node7, Node8]
+ * T4:[Node4, Node1, Node3, Node5, Node7, Node8]
+ * T5:[Node5, Node7, Node8, Node1, Node3, Node4]
+ * T6:[Node6, Node7, Node8, Node1, Node3, Node4]
+ * T7:[Node7, Node8, Node5, Node1, Node3, Node4]
+ * T8:[Node8, Node5, Node7, Node1, Node3, Node4]
+ */
+public class BulkReaderTokenRangeReplicasTest extends SharedClusterSparkIntegrationTestBase
+{
+    QualifiedName table1 = uniqueTestTableFullName(TEST_KEYSPACE);
+    private static final String VALUE1 = "VAL1";
+    private static final String VALUE2 = "VAL2";
+
+    @Override
+    protected ClusterBuilderConfiguration testClusterConfiguration()
+    {
+        return super.testClusterConfiguration()
+               .dcCount(2)
+               .nodesPerDc(4)
+               .dcAndRackSupplier((nodeId) -> {
+                   switch (nodeId)
+                   {
+                       case 1:
+                       case 2:
+                           return dcAndRack("datacenter1", "rack1");
+                       case 3:
+                           return dcAndRack("datacenter1", "rack2");
+                       case 4:
+                           return dcAndRack("datacenter1", "rack3");
+                       case 5:
+                       case 6:
+                           return dcAndRack("datacenter2", "rack1");
+                       case 7:
+                           return dcAndRack("datacenter2", "rack2");
+                       case 8:
+                           return dcAndRack("datacenter2", "rack3");
+                   }
+                   return dcAndRack("", "");
+               });
+    }
+
+    @Test
+    void testMultiDCMultiRack()
+    {
+        // get the token owned by node 1
+        long token = getTokenForNode(1);
+        // reverse-hash the token into a blob key
+        ByteBuffer key = keyForToken(token);
+        // insert a value for the key in node 1's token range
+        insert(key, VALUE1);
+        // Node placement:
+        // DC1: {rack1: [node1, node2], rack2: [node3], rack3: [node4]}
+        // DC2: {rack1: [node5, node6], rack2: [node7], rack3: [node8]}
+        // validate that all nodes except nodes 2 and 6 stored the key/value pair
+        Map<Integer, String> expectedValuesInNodes = new HashMap<>(Map.of(1, VALUE1,
+                                                                          3, VALUE1,
+                                                                          4, VALUE1,
+                                                                          5, VALUE1,
+                                                                          7, VALUE1,
+                                                                          8, VALUE1));
+        validateValuesInNodes(expectedValuesInNodes, key);
+
+        // update the value internally at node 4
+        updateInternal(4, key, VALUE2);
+        // validate the values across the nodes:
+        // node 4 should have VALUE2
+        // node 2 and node 6 shouldn't have the key
+        // all other nodes should have VALUE1
+        expectedValuesInNodes.put(4, VALUE2);
+        validateValuesInNodes(expectedValuesInNodes, key);
+
+        List<Row> rowList = bulkRead(ConsistencyLevel.ALL.name());
+        Object[][] driverVal = readFromCluster(key, ConsistencyLevel.ALL);
+
+        // validate that the bulk-read value matches the data read using the driver
+        assertThat(rowList).isNotNull();
+        assertThat(rowList).isNotEmpty();
+        assertThat(driverVal).isNotNull();
+        assertThat(driverVal).isNotEmpty();
+        assertThat(driverVal[0][0]).isInstanceOf(String.class);
+        assertThat(rowList.get(0).getString(1)).isEqualTo((String) driverVal[0][0]);
+    }
+
+    private void validateValuesInNodes(Map<Integer, String> values, ByteBuffer key)
+    {
+        for (int i = 1; i <= 8; i++)
+        {
+            Object[][] obj = getInternal(i, key);
+            if (!values.containsKey(i))
+            {
+                assertThat(obj).isEmpty();
+            }
+            else
+            {
+                assertThat(obj).isNotEmpty();
+                assertThat(obj[0][0]).isInstanceOf(String.class);
+                assertThat((String) obj[0][0]).isEqualTo(values.get(i));
+            }
+        }
+    }
+
+    @NotNull
+    private List<Row> bulkRead(String consistency)
+    {
+        Dataset<Row> dataForTable1 = bulkReaderDataFrame(table1)
+                                     .option("consistencyLevel", consistency)
+                                     .option("dc", null)
+                                     .load();
+
+        return dataForTable1.collectAsList().stream()
+                            .sorted(Comparator.comparing(row -> row.getInt(0)))
+                            .collect(Collectors.toList());
+    }
+
+    private Object[][] getInternal(int node, ByteBuffer key)
+    {
+        return cluster.get(node).executeInternal(String.format("SELECT value FROM %s WHERE key = ?", table1), key);
+    }
+
+    private Object[][] readFromCluster(ByteBuffer key, ConsistencyLevel consistencyLevel)
+    {
+        return cluster.getFirstRunningInstance().coordinator().execute(String.format("SELECT value FROM %s WHERE key = ?", table1), consistencyLevel, key);
+    }
+
+    private void insert(ByteBuffer key, String value)
+    {
+        String query1 = String.format("INSERT INTO %s (key, value) VALUES (?, ?);", table1);
+        cluster.getFirstRunningInstance().coordinator().execute(query1, ConsistencyLevel.ALL, key, value);
+    }
+
+    private void updateInternal(int node, ByteBuffer key, String value)
+    {
+        cluster.get(node).executeInternal(String.format("UPDATE %s SET value='%s' WHERE key=?", table1, value), key);
+    }
+
+    protected void initializeSchemaForTest()
+    {
+        createTestKeyspace(TEST_KEYSPACE, DC1_RF3_DC2_RF3);
+        createTestTable(table1, "CREATE TABLE IF NOT EXISTS %s (key blob, value text, PRIMARY KEY (key));");
+    }
+
+    private long getTokenForNode(int nodeNumber)
+    {
+        String nodeAddress = cluster.get(nodeNumber).config().broadcastAddress().getAddress().getHostAddress();
+        List<ClusterUtils.RingInstanceDetails> ringDetails = ClusterUtils.ring(cluster.get(nodeNumber));
+
+        return ringDetails.stream()
+                          .filter(details -> details.getAddress().contains(nodeAddress))
+                          .findFirst()
+                          .map(details -> Long.parseLong(details.getToken()))
+                          .orElseThrow(() -> new RuntimeException("Node " + nodeNumber + " token not found"));
+    }
+
+    public static ByteBuffer keyForToken(long token)
+    {
+        ByteBuffer result = ByteBuffer.allocate(16);
+        long[] inv = MurmurHash.inv_hash3_x64_128(new long[]{ token, 0L });
+        result.putLong(inv[0]).putLong(inv[1]).position(0);
+        return result;
+    }
+}

From 682bf79526af8d699101cb66b8c3ba8b3d02b10f Mon Sep 17 00:00:00 2001
From: Sudipta Laha
Date: Wed, 22 Oct 2025 22:34:39 -0700
Subject: [PATCH 2/3] Added support for rack-aware replication in bulk reader

---
 .../data/partitioner/CassandraInstance.java   |  23 +-
 .../spark/data/partitioner/CassandraRing.java |  70 ++-
 .../spark/data/partitioner/MurmurHash.java    |   2 +-
 .../spark/data/CassandraDataLayer.java        |  17 +-
 .../spark/utils/TokenRangeUtils.java          | 137 ++++++
 .../partitioner/JDKSerializationTests.java    |  79 ++++
 .../spark/utils/TokenRangeUtilsTest.java      | 432 ++++++++++++++++++
 .../distributed/impl/CassandraCluster.java    |   3 +-
 .../BulkReaderTokenRangeReplicasTest.java     |  29 +-
 9 files changed, 776 insertions(+), 16 deletions(-)
 create mode 100644 cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/TokenRangeUtils.java
 create mode 100644 cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/TokenRangeUtilsTest.java

diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraInstance.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraInstance.java
index 85b5ebdb6..19fecc643 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraInstance.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraInstance.java
@@ -19,6 +19,9 @@

 package org.apache.cassandra.spark.data.partitioner;

+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.Objects;

@@ -34,9 +37,9 @@ public class CassandraInstance implements TokenOwner, Serializable
     public static final CassandraInstance.Serializer SERIALIZER = new CassandraInstance.Serializer();
     private static final long serialVersionUID = 6767636627576239773L;

-    private final String token;
-    private final String node;
-    private final String dataCenter;
+    private String token;
+    private String node;
+    private String dataCenter;

     public CassandraInstance(String token, String node, String dataCenter)
     {
@@ -100,6 
+103,20 @@ public String toString() return String.format("{\"token\"=\"%s\", \"node\"=\"%s\", \"dc\"=\"%s\"}", token, node, dataCenter); } + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException + { + this.token = in.readUTF(); + this.node = in.readUTF(); + this.dataCenter = in.readUTF(); + } + + private void writeObject(ObjectOutputStream out) throws IOException + { + out.writeUTF(token()); + out.writeUTF(nodeName()); + out.writeUTF(dataCenter()); + } + public static class Serializer extends com.esotericsoftware.kryo.Serializer { @Override diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java index 3d142f8df..c8dd4888f 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java @@ -74,6 +74,7 @@ public class CassandraRing implements Serializable private ReplicationFactor replicationFactor; private List instances; + private transient RangeMap> replicasForRanges; private transient RangeMap> replicas; private transient Multimap> tokenRangeMap; @@ -120,8 +121,33 @@ public CassandraRing(Partitioner partitioner, this.init(); } + public CassandraRing(Partitioner partitioner, + String keyspace, + ReplicationFactor replicationFactor, + Collection instances, + RangeMap> replicasForRanges) + { + this.partitioner = partitioner; + this.keyspace = keyspace; + this.replicationFactor = replicationFactor; + this.instances = instances.stream() + .sorted(Comparator.comparing(instance -> new BigInteger(instance.token()))) + .collect(Collectors.toCollection(ArrayList::new)); + this.replicasForRanges = replicasForRanges; + init(); + } + private void init() { + if (this.replicasForRanges != null) + { + this.replicas = this.replicasForRanges; + this.tokenRangeMap = ArrayListMultimap.create(); + this.replicasForRanges.asMapOfRanges().forEach((range, instanceList) -> + instanceList.forEach(instance -> + this.tokenRangeMap.put(instance, range))); + return; + } // Setup token range map replicas = TreeRangeMap.create(); tokenRangeMap = ArrayListMultimap.create(); @@ -276,8 +302,28 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE this.instances = new ArrayList<>(numInstances); for (int instance = 0; instance < numInstances; instance++) { - this.instances.add(new CassandraInstance(in.readUTF(), in.readUTF(), in.readUTF())); + this.instances.add((CassandraInstance) in.readObject()); + } + int numRanges = in.readShort(); + if (numRanges > 0) + { + this.replicasForRanges = TreeRangeMap.create(); + for (int rangeIndex = 0; rangeIndex < numRanges; rangeIndex++) + { + BigInteger lowerEndpoint = new BigInteger(in.readUTF()); + BigInteger upperEndpoint = new BigInteger(in.readUTF()); + Range range = Range.openClosed(lowerEndpoint, upperEndpoint); + + int numReplicas = in.readShort(); + List replicas = new ArrayList<>(numReplicas); + for (int replicaIndex = 0; replicaIndex < numReplicas; replicaIndex++) + { + replicas.add((CassandraInstance) in.readObject()); + } + this.replicasForRanges.put(range, replicas); + } } + this.init(); } @@ -299,9 +345,21 @@ private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFou out.writeShort(this.instances.size()); for (CassandraInstance instance : this.instances) { - 
out.writeUTF(instance.token()); - out.writeUTF(instance.nodeName()); - out.writeUTF(instance.dataCenter()); + out.writeObject(instance); + } + out.writeShort(this.replicasForRanges == null ? 0 : this.replicasForRanges.asMapOfRanges().size()); + if (this.replicasForRanges != null) + { + for (Map.Entry, List> entry : replicasForRanges.asMapOfRanges().entrySet()) + { + out.writeUTF(entry.getKey().lowerEndpoint().toString()); + out.writeUTF(entry.getKey().upperEndpoint().toString()); + out.writeShort(entry.getValue().size()); + for (CassandraInstance instance : entry.getValue()) + { + out.writeObject(instance); + } + } } } @@ -314,6 +372,7 @@ public void write(Kryo kryo, Output out, CassandraRing ring) out.writeString(ring.keyspace); kryo.writeObject(out, ring.replicationFactor); kryo.writeObject(out, ring.instances); + kryo.writeObject(out, ring.replicasForRanges); } @Override @@ -324,7 +383,8 @@ public CassandraRing read(Kryo kryo, Input in, Class type) : Partitioner.Murmur3Partitioner, in.readString(), kryo.readObject(in, ReplicationFactor.class), - kryo.readObject(in, ArrayList.class)); + kryo.readObject(in, ArrayList.class), + kryo.readObject(in, RangeMap.class)); } } } diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/MurmurHash.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/MurmurHash.java index 89eeb1ae3..afb4ef40a 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/MurmurHash.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/MurmurHash.java @@ -187,7 +187,7 @@ protected static long invFmix(long k) return k; } - public static long[] inv_hash3_x64_128(long[] result) + public static long[] invHash3X64128(long[] result) { long c1 = 0xa98409e882ce4d7dL; long c2 = 0xa81e14edd9de2c7fL; diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java index 557aaf2cb..c929814a0 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java @@ -51,6 +51,7 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.Range; +import com.google.common.collect.RangeMap; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +63,7 @@ import o.a.c.sidecar.client.shaded.common.response.NodeSettings; import o.a.c.sidecar.client.shaded.common.response.RingResponse; import o.a.c.sidecar.client.shaded.common.response.SchemaResponse; +import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse; import org.apache.cassandra.analytics.stats.Stats; import org.apache.cassandra.bridge.BigNumberConfig; import org.apache.cassandra.bridge.BigNumberConfigImpl; @@ -93,6 +95,7 @@ import org.apache.cassandra.spark.utils.ScalaFunctions; import org.apache.cassandra.spark.utils.ThrowableUtils; import org.apache.cassandra.spark.utils.TimeProvider; +import org.apache.cassandra.spark.utils.TokenRangeUtils; import org.apache.cassandra.spark.validation.CassandraValidation; import org.apache.cassandra.spark.validation.SidecarValidation; import org.apache.cassandra.spark.validation.StartupValidatable; @@ -300,7 +303,12 @@ private int 
initBulkReader(@NotNull ClientConfig options) throws ExecutionExcept udts.forEach(udt -> LOGGER.info("Adding schema UDT: '{}'", udt)); cqlTable = bridge().buildSchema(createStmt, keyspace, replicationFactor, partitioner, udts, null, indexCount, false); - CassandraRing ring = createCassandraRingFromRing(partitioner, replicationFactor, ringFuture.get()); + + TokenRangeReplicasResponse tokenRangeReplicas = sidecar.tokenRangeReplicas( + new ArrayList<>(clusterConfig), maybeQuotedKeyspace).get(); + TokenRangeUtils.validateTokenRangeReplicasResponse(tokenRangeReplicas); + + CassandraRing ring = createCassandraRingFromRing(partitioner, replicationFactor, ringFuture.get(), tokenRangeReplicas); int effectiveNumberOfCores = sizingFuture.get(); tokenPartitioner = new TokenPartitioner(ring, options.defaultParallelism(), effectiveNumberOfCores); @@ -657,14 +665,17 @@ public BigNumberConfig bigNumberConfig(CqlField field) @VisibleForTesting public CassandraRing createCassandraRingFromRing(Partitioner partitioner, ReplicationFactor replicationFactor, - RingResponse ring) + RingResponse ring, + TokenRangeReplicasResponse tokenRangeReplicas) { Collection instances = ring .stream() .filter(status -> datacenter == null || datacenter.equalsIgnoreCase(status.datacenter())) .map(status -> new CassandraInstance(status.token(), status.fqdn(), status.datacenter())) .collect(Collectors.toList()); - return new CassandraRing(partitioner, keyspace, replicationFactor, instances); + RangeMap> replicasForRanges = + TokenRangeUtils.convertTokenRangeReplicasToRangeMap(tokenRangeReplicas, instances, datacenter); + return new CassandraRing(partitioner, keyspace, replicationFactor, instances, replicasForRanges); } // Startup Validation diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/TokenRangeUtils.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/TokenRangeUtils.java new file mode 100644 index 000000000..d93f51c8f --- /dev/null +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/TokenRangeUtils.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.cassandra.spark.utils;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+import com.google.common.collect.TreeRangeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse;
+import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
+
+public class TokenRangeUtils
+{
+    public static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeUtils.class);
+
+    private TokenRangeUtils()
+    {
+        throw new IllegalStateException(getClass() + " is a static utility class and shall not be instantiated");
+    }
+
+    /**
+     * Converts a TokenRangeReplicasResponse from Cassandra Sidecar into a RangeMap data structure
+     * that maps token ranges to their corresponding replica instances.
+     *
+     * @param tokenRangeReplicas the response from Cassandra Sidecar containing token range and replica information
+     * @param instances          collection of available Cassandra instances to match against
+     * @param datacenter         if non-null, only replicas in this datacenter are included; null includes all datacenters
+     * @return a RangeMap where keys are token ranges (openClosed BigInteger ranges) and values are lists of
+     *         CassandraInstance objects that serve as replicas for that range
+     */
+    public static RangeMap<BigInteger, List<CassandraInstance>> convertTokenRangeReplicasToRangeMap(
+    TokenRangeReplicasResponse tokenRangeReplicas,
+    Collection<CassandraInstance> instances,
+    String datacenter)
+    {
+        RangeMap<BigInteger, List<CassandraInstance>> replicas = TreeRangeMap.create();
+        Map<String, TokenRangeReplicasResponse.ReplicaMetadata> metadata = tokenRangeReplicas.replicaMetadata();
+
+        for (TokenRangeReplicasResponse.ReplicaInfo replicaInfo : tokenRangeReplicas.readReplicas())
+        {
+            Range<BigInteger> range = Range.openClosed(new BigInteger(replicaInfo.start()), new BigInteger(replicaInfo.end()));
+            List<CassandraInstance> instanceListForRange = new ArrayList<>();
+            for (Map.Entry<String, List<String>> entry : replicaInfo.replicasByDatacenter().entrySet())
+            {
+                if (datacenter != null && !datacenter.equals(entry.getKey()))
+                {
+                    continue;
+                }
+                instanceListForRange.addAll(entry.getValue().stream()
+                                                 .map(ipPort -> metadata.get(ipPort).fqdn())
+                                                 .map(fqdn -> getCassandraInstanceByFqdn(instances, fqdn))
+                                                 .filter(Objects::nonNull)
+                                                 .collect(Collectors.toList()));
+            }
+            replicas.put(range, instanceListForRange);
+        }
+        return replicas;
+    }
+
+    private static CassandraInstance getCassandraInstanceByFqdn(Collection<CassandraInstance> instances, String fqdn)
+    {
+        return instances.stream()
+                        .filter(instance -> instance.nodeName().equals(fqdn))
+                        .findFirst()
+                        .orElse(null);
+    }
+
+    public static void validateTokenRangeReplicasResponse(TokenRangeReplicasResponse tokenRangeReplicas)
+    {
+        String msg = getTokenRangeReplicasValidationMessage(tokenRangeReplicas);
+        if (msg != null)
+        {
+            LOGGER.error(msg);
+            throw new IllegalStateException(msg);
+        }
+    }
+
+    private static String getTokenRangeReplicasValidationMessage(TokenRangeReplicasResponse tokenRangeReplicas)
+    {
+        if (tokenRangeReplicas == null)
+        {
+            return "Null TokenRangeReplicasResponse from sidecar";
+        }
+        if (tokenRangeReplicas.replicaMetadata() == null)
+        {
+            return "Null replicaMetadata in TokenRangeReplicasResponse from sidecar";
+        }
+        if (tokenRangeReplicas.replicaMetadata().isEmpty())
+        {
+            return "Empty replicaMetadata in TokenRangeReplicasResponse from sidecar";
+        }
+        for (Map.Entry<String, TokenRangeReplicasResponse.ReplicaMetadata> entry : 
tokenRangeReplicas.replicaMetadata().entrySet()) + { + if (entry.getValue().fqdn() == null || entry.getValue().fqdn().trim().isEmpty()) + { + return String.format( + "ReplicaMetadata entry '%s' has null or empty fqdn in TokenRangeReplicasResponse from sidecar", entry.getKey()); + } + } + if (tokenRangeReplicas.readReplicas() == null) + { + return "Null readReplicas in TokenRangeReplicasResponse from sidecar"; + } + if (tokenRangeReplicas.readReplicas().isEmpty()) + { + return "Empty readReplicas in TokenRangeReplicasResponse from sidecar"; + } + return null; + } +} diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java index fdcd7b9d4..e6d2be315 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java @@ -23,8 +23,10 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.math.BigInteger; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -32,6 +34,8 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Range; +import com.google.common.collect.RangeMap; +import com.google.common.collect.TreeRangeMap; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -55,6 +59,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.quicktheories.QuickTheory.qt; import static org.quicktheories.generators.SourceDSL.arbitrary; +import static org.quicktheories.generators.SourceDSL.integers; public class JDKSerializationTests extends VersionRunner { @@ -83,6 +88,80 @@ public void testCassandraRing(CassandraBridge bridge) })); } + private void testCassandraRingSerializationWithReplicasForRanges(CassandraBridge bridge, + Partitioner partitioner, + List instances, + RangeMap> replicasForRanges, + ReplicationFactor replicationFactor, + int expectedRangeCount) + { + CassandraRing ring = new CassandraRing(partitioner, "test_keyspace", replicationFactor, instances, replicasForRanges); + + byte[] bytes = bridge.javaSerialize(ring); + CassandraRing deserialized = bridge.javaDeserialize(bytes, CassandraRing.class); + + assertThat(deserialized).isNotNull(); + assertThat(deserialized.rangeMap()).isNotNull(); + assertThat(deserialized.rangeMap().asMapOfRanges()).hasSize(expectedRangeCount); + assertThat(deserialized).isEqualTo(ring); + assertThat(deserialized.rangeMap().asMapOfRanges()).isEqualTo(ring.rangeMap().asMapOfRanges()); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.spark.data.VersionRunner#bridges") + public void testCassandraRingWithReplicasForRanges(CassandraBridge bridge) + { + // Create test instances that will be reused across different configurations + List instances = Arrays.asList( + new CassandraInstance("-1000", "node1", "DC1"), + new CassandraInstance("0", "node2", "DC1"), + new CassandraInstance("1000", "node3", "DC2") + ); + + // Create different replicasForRanges configurations to test + List>> replicasForRangesList = new ArrayList<>(); + + // Configuration 1: Basic two-range setup + RangeMap> config1 = 
TreeRangeMap.create(); + config1.put(Range.openClosed(new BigInteger("-1000"), BigInteger.ZERO), + Arrays.asList(instances.get(0), instances.get(1))); + config1.put(Range.openClosed(BigInteger.ZERO, new BigInteger("1000")), + Arrays.asList(instances.get(1), instances.get(2))); + replicasForRangesList.add(config1); + + // Configuration 2: Single range with all replicas + RangeMap> config2 = TreeRangeMap.create(); + config2.put(Range.openClosed(new BigInteger("-500"), new BigInteger("500")), + Arrays.asList(instances.get(0), instances.get(1), instances.get(2))); + replicasForRangesList.add(config2); + + // Configuration 3: Range with empty replica list + RangeMap> config4 = TreeRangeMap.create(); + config4.put(Range.openClosed(new BigInteger("-100"), BigInteger.ZERO), + Collections.emptyList()); + config4.put(Range.openClosed(BigInteger.ZERO, new BigInteger("100")), + Arrays.asList(instances.get(1))); + replicasForRangesList.add(config4); + + // Configuration 4: Token ranges with boundary values (Long.MAX_VALUE, Long.MIN_VALUE) + RangeMap> config5 = TreeRangeMap.create(); + config5.put(Range.openClosed(BigInteger.valueOf(Long.MIN_VALUE), BigInteger.ZERO), + Arrays.asList(instances.get(0), instances.get(1))); + config5.put(Range.openClosed(BigInteger.ZERO, BigInteger.valueOf(Long.MAX_VALUE)), + Arrays.asList(instances.get(1), instances.get(2))); + replicasForRangesList.add(config5); + + qt().forAll(TestUtils.partitioners(), integers().between(0, replicasForRangesList.size() - 1)) + .checkAssert((partitioner, index) -> { + ReplicationFactor rf = new ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy, + ImmutableMap.of("DC1", 2, "DC2", 1)); + + int expectedRangeCount = replicasForRangesList.get(index).asMapOfRanges().size(); + + testCassandraRingSerializationWithReplicasForRanges(bridge, partitioner, instances, replicasForRangesList.get(index), rf, expectedRangeCount); + }); + } + @ParameterizedTest @MethodSource("org.apache.cassandra.spark.data.VersionRunner#bridges") public void testTokenPartitioner(CassandraBridge bridge) diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/TokenRangeUtilsTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/TokenRangeUtilsTest.java new file mode 100644 index 000000000..6e70a13f6 --- /dev/null +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/TokenRangeUtilsTest.java @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.spark.utils; + +import java.math.BigInteger; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.RangeMap; +import org.junit.jupiter.api.Test; + +import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse; +import org.apache.cassandra.spark.data.partitioner.CassandraInstance; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class TokenRangeUtilsTest +{ + @Test + void testConvertTokenRangeReplicasToRangeMapBasic() + { + // Create test instances + List instances = Arrays.asList( + new CassandraInstance("-1000", "node1.example.com", "DC1"), + new CassandraInstance("0", "node2.example.com", "DC1"), + new CassandraInstance("1000", "node3.example.com", "DC2") + ); + + // Create replica metadata using actual class + Map metadata = new HashMap<>(); + metadata.put("192.168.1.1:9042", new TokenRangeReplicasResponse.ReplicaMetadata( + "NORMAL", "UP", "node1.example.com", "192.168.1.1", 9042, "DC1")); + metadata.put("192.168.1.2:9042", new TokenRangeReplicasResponse.ReplicaMetadata( + "NORMAL", "UP", "node2.example.com", "192.168.1.2", 9042, "DC1")); + metadata.put("192.168.1.3:9042", new TokenRangeReplicasResponse.ReplicaMetadata( + "NORMAL", "UP", "node3.example.com", "192.168.1.3", 9042, "DC2")); + + // Create replica info using actual class + Map> replicasByDatacenter1 = new HashMap<>(); + replicasByDatacenter1.put("DC1", Arrays.asList("192.168.1.1:9042", "192.168.1.2:9042")); + replicasByDatacenter1.put("DC2", Arrays.asList("192.168.1.3:9042")); + + Map> replicasByDatacenter2 = new HashMap<>(); + replicasByDatacenter2.put("DC1", Arrays.asList("192.168.1.2:9042")); + replicasByDatacenter2.put("DC2", Arrays.asList("192.168.1.3:9042")); + + List replicaInfos = Arrays.asList( + new TokenRangeReplicasResponse.ReplicaInfo("-1000", "0", replicasByDatacenter1), + new TokenRangeReplicasResponse.ReplicaInfo("0", "1000", replicasByDatacenter2) + ); + + TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(replicaInfos, replicaInfos, metadata); + + // Test conversion + RangeMap> result = + TokenRangeUtils.convertTokenRangeReplicasToRangeMap(response, instances, null); + + // Validate results + assertThat(result).isNotNull(); + assertThat(result.asMapOfRanges()).hasSize(2); + + // Check first range + List replicas1 = result.get(new BigInteger("-500")); + assertThat(replicas1).hasSize(3); + assertThat(replicas1).extracting(CassandraInstance::nodeName) + .containsExactlyInAnyOrder("node1.example.com", "node2.example.com", "node3.example.com"); + + // Check second range + List replicas2 = result.get(new BigInteger("500")); + assertThat(replicas2).hasSize(2); + assertThat(replicas2).extracting(CassandraInstance::nodeName) + .containsExactlyInAnyOrder("node2.example.com", "node3.example.com"); + } + + @Test + void testConvertTokenRangeReplicasToRangeMapWithDatacenterFilter() + { + // Create test instances + List instances = Arrays.asList( + new CassandraInstance("-1000", "node1.example.com", "DC1"), + new CassandraInstance("0", "node2.example.com", "DC1"), + new CassandraInstance("1000", "node3.example.com", "DC2") + ); + + // Create replica metadata + Map metadata = new HashMap<>(); + metadata.put("192.168.1.1:9042", new TokenRangeReplicasResponse.ReplicaMetadata( + "NORMAL", "UP", "node1.example.com", "192.168.1.1", 9042, "DC1")); + 
metadata.put("192.168.1.2:9042", new TokenRangeReplicasResponse.ReplicaMetadata( + "NORMAL", "UP", "node2.example.com", "192.168.1.2", 9042, "DC1")); + metadata.put("192.168.1.3:9042", new TokenRangeReplicasResponse.ReplicaMetadata( + "NORMAL", "UP", "node3.example.com", "192.168.1.3", 9042, "DC2")); + + // Create replica info + Map> replicasByDatacenter = new HashMap<>(); + replicasByDatacenter.put("DC1", Arrays.asList("192.168.1.1:9042", "192.168.1.2:9042")); + replicasByDatacenter.put("DC2", Arrays.asList("192.168.1.3:9042")); + + List replicaInfos = Arrays.asList( + new TokenRangeReplicasResponse.ReplicaInfo("-1000", "0", replicasByDatacenter) + ); + + TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(replicaInfos, replicaInfos, metadata); + + // Test conversion with datacenter filter (include only DC1) + RangeMap> result = + TokenRangeUtils.convertTokenRangeReplicasToRangeMap(response, instances, "DC1"); + + // Validate results - should only contain DC1 replicas + assertThat(result).isNotNull(); + assertThat(result.asMapOfRanges()).hasSize(1); + + List replicas = result.get(new BigInteger("-500")); + assertThat(replicas).hasSize(2); + assertThat(replicas).extracting(CassandraInstance::nodeName) + .containsExactlyInAnyOrder("node1.example.com", "node2.example.com"); + } + + @Test + void testConvertTokenRangeReplicasToRangeMapEmptyResponse() + { + List instances = Arrays.asList( + new CassandraInstance("0", "node1.example.com", "DC1") + ); + + TokenRangeReplicasResponse response = new TokenRangeReplicasResponse( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyMap() + ); + + RangeMap> result = + TokenRangeUtils.convertTokenRangeReplicasToRangeMap(response, instances, null); + + assertThat(result).isNotNull(); + assertThat(result.asMapOfRanges()).isEmpty(); + } + + @Test + void testConvertTokenRangeReplicasToRangeMapWithUnknownFqdn() + { + // Create test instances + List instances = Arrays.asList( + new CassandraInstance("-1000", "node1.example.com", "DC1") + ); + + // Create replica metadata with unknown FQDN + Map metadata = new HashMap<>(); + metadata.put("192.168.1.1:9042", new TokenRangeReplicasResponse.ReplicaMetadata( + "NORMAL", "UP", "unknown.example.com", "192.168.1.1", 9042, "DC1")); + + // Create replica info + Map> replicasByDatacenter = new HashMap<>(); + replicasByDatacenter.put("DC1", Arrays.asList("192.168.1.1:9042")); + + List replicaInfos = Arrays.asList( + new TokenRangeReplicasResponse.ReplicaInfo("-1000", "0", replicasByDatacenter) + ); + + TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(replicaInfos, replicaInfos, metadata); + + // Test conversion + RangeMap> result = + TokenRangeUtils.convertTokenRangeReplicasToRangeMap(response, instances, null); + + // Validate results - should have empty replica list since FQDN doesn't match + assertThat(result).isNotNull(); + assertThat(result.asMapOfRanges()).hasSize(1); + + List replicas = result.get(new BigInteger("-500")); + assertThat(replicas).isEmpty(); + } + + @Test + void testConvertTokenRangeReplicasToRangeMapWithBoundaryTokenValues() + { + // Create test instances + List instances = Arrays.asList( + new CassandraInstance(String.valueOf(Long.MIN_VALUE), "nodeMin.example.com", "DC1"), + new CassandraInstance("0", "nodeZero.example.com", "DC1"), + new CassandraInstance(String.valueOf(Long.MAX_VALUE), "nodeMax.example.com", "DC2") + ); + + // Create replica metadata + Map metadata = new HashMap<>(); + metadata.put("192.168.1.1:9042", new 
TokenRangeReplicasResponse.ReplicaMetadata( + "NORMAL", "UP", "nodeMin.example.com", "192.168.1.1", 9042, "DC1")); + metadata.put("192.168.1.2:9042", new TokenRangeReplicasResponse.ReplicaMetadata( + "NORMAL", "UP", "nodeZero.example.com", "192.168.1.2", 9042, "DC1")); + metadata.put("192.168.1.3:9042", new TokenRangeReplicasResponse.ReplicaMetadata( + "NORMAL", "UP", "nodeMax.example.com", "192.168.1.3", 9042, "DC2")); + + // Create replica info with boundary values + Map> replicasByDatacenter = new HashMap<>(); + replicasByDatacenter.put("DC1", Arrays.asList("192.168.1.1:9042", "192.168.1.2:9042")); + replicasByDatacenter.put("DC2", Arrays.asList("192.168.1.3:9042")); + + List replicaInfos = Arrays.asList( + new TokenRangeReplicasResponse.ReplicaInfo(String.valueOf(Long.MIN_VALUE), String.valueOf(Long.MAX_VALUE), replicasByDatacenter) + ); + + TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(replicaInfos, replicaInfos, metadata); + + // Test conversion + RangeMap> result = + TokenRangeUtils.convertTokenRangeReplicasToRangeMap(response, instances, null); + + // Validate results + assertThat(result).isNotNull(); + assertThat(result.asMapOfRanges()).hasSize(1); + + List replicas = result.get(BigInteger.ZERO); + assertThat(replicas).hasSize(3); + assertThat(replicas).extracting(CassandraInstance::nodeName) + .containsExactlyInAnyOrder("nodeMin.example.com", "nodeZero.example.com", "nodeMax.example.com"); + } + + @Test + void testConvertTokenRangeReplicasToRangeMapFilterIncludeDatacenter() + { + // Create test instances across multiple datacenters + List instances = Arrays.asList( + new CassandraInstance("-1000", "node1.example.com", "DC1"), + new CassandraInstance("0", "node2.example.com", "DC1"), + new CassandraInstance("1000", "node3.example.com", "DC2"), + new CassandraInstance("2000", "node4.example.com", "DC3") + ); + + // Create replica metadata + Map metadata = new HashMap<>(); + metadata.put("192.168.1.1:9042", new TokenRangeReplicasResponse.ReplicaMetadata( + "NORMAL", "UP", "node1.example.com", "192.168.1.1", 9042, "DC1")); + metadata.put("192.168.1.2:9042", new TokenRangeReplicasResponse.ReplicaMetadata( + "NORMAL", "UP", "node2.example.com", "192.168.1.2", 9042, "DC1")); + metadata.put("192.168.1.3:9042", new TokenRangeReplicasResponse.ReplicaMetadata( + "NORMAL", "UP", "node3.example.com", "192.168.1.3", 9042, "DC2")); + metadata.put("192.168.1.4:9042", new TokenRangeReplicasResponse.ReplicaMetadata( + "NORMAL", "UP", "node4.example.com", "192.168.1.4", 9042, "DC3")); + + // Create replica info with replicas from all datacenters + Map> replicasByDatacenter = new HashMap<>(); + replicasByDatacenter.put("DC1", Arrays.asList("192.168.1.1:9042", "192.168.1.2:9042")); + replicasByDatacenter.put("DC2", Arrays.asList("192.168.1.3:9042")); + replicasByDatacenter.put("DC3", Arrays.asList("192.168.1.4:9042")); + + List replicaInfos = Arrays.asList( + new TokenRangeReplicasResponse.ReplicaInfo("-1000", "0", replicasByDatacenter) + ); + + TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(replicaInfos, replicaInfos, metadata); + + // Test conversion with datacenter filter (include only DC2) + RangeMap> result = + TokenRangeUtils.convertTokenRangeReplicasToRangeMap(response, instances, "DC2"); + + // Validate results - should contain only DC2 replicas + assertThat(result).isNotNull(); + assertThat(result.asMapOfRanges()).hasSize(1); + + List replicas = result.get(new BigInteger("-500")); + assertThat(replicas).hasSize(1); + 
assertThat(replicas).extracting(CassandraInstance::nodeName) + .containsExactlyInAnyOrder("node3.example.com"); + assertThat(replicas).extracting(CassandraInstance::dataCenter) + .containsExactlyInAnyOrder("DC2"); + } + + @Test + void testValidateTokenRangeReplicasResponseHappyPath() + { + // Create valid TokenRangeReplicasResponse + List instances = List.of( + new CassandraInstance("-1000", "node1.example.com", "DC1") + ); + + Map metadata = new HashMap<>(); + metadata.put("192.168.1.1:9042", new TokenRangeReplicasResponse.ReplicaMetadata( + "NORMAL", "UP", "node1.example.com", "192.168.1.1", 9042, "DC1")); + + Map> replicasByDatacenter = new HashMap<>(); + replicasByDatacenter.put("DC1", Arrays.asList("192.168.1.1:9042")); + + List replicaInfos = Arrays.asList( + new TokenRangeReplicasResponse.ReplicaInfo("-1000", "0", replicasByDatacenter) + ); + + TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(replicaInfos, replicaInfos, metadata); + + // Should not throw any exception + assertThat(response).isNotNull(); + TokenRangeUtils.validateTokenRangeReplicasResponse(response); + } + + @Test + void testValidateTokenRangeReplicasResponseNullResponse() + { + assertThatThrownBy(() -> TokenRangeUtils.validateTokenRangeReplicasResponse(null)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Null TokenRangeReplicasResponse from sidecar"); + } + + @Test + void testValidateTokenRangeReplicasResponseNullReplicaMetadata() + { + List replicaInfos = Arrays.asList( + new TokenRangeReplicasResponse.ReplicaInfo("-1000", "0", new HashMap<>()) + ); + + TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(replicaInfos, replicaInfos, null); + + assertThatThrownBy(() -> TokenRangeUtils.validateTokenRangeReplicasResponse(response)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Null replicaMetadata in TokenRangeReplicasResponse from sidecar"); + } + + @Test + void testValidateTokenRangeReplicasResponseEmptyReplicaMetadata() + { + List replicaInfos = Arrays.asList( + new TokenRangeReplicasResponse.ReplicaInfo("-1000", "0", new HashMap<>()) + ); + + TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(replicaInfos, replicaInfos, Collections.emptyMap()); + + assertThatThrownBy(() -> TokenRangeUtils.validateTokenRangeReplicasResponse(response)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Empty replicaMetadata in TokenRangeReplicasResponse from sidecar"); + } + + @Test + void testValidateTokenRangeReplicasResponseNullFqdn() + { + Map metadata = new HashMap<>(); + metadata.put("192.168.1.1:9042", new TokenRangeReplicasResponse.ReplicaMetadata( + "NORMAL", "UP", null, "192.168.1.1", 9042, "DC1")); + + List replicaInfos = Arrays.asList( + new TokenRangeReplicasResponse.ReplicaInfo("-1000", "0", new HashMap<>()) + ); + + TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(replicaInfos, replicaInfos, metadata); + + assertThatThrownBy(() -> TokenRangeUtils.validateTokenRangeReplicasResponse(response)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("ReplicaMetadata entry '192.168.1.1:9042' has null or empty fqdn in TokenRangeReplicasResponse from sidecar"); + } + + @Test + void testValidateTokenRangeReplicasResponseEmptyFqdn() + { + Map metadata = new HashMap<>(); + metadata.put("192.168.1.1:9042", new TokenRangeReplicasResponse.ReplicaMetadata( + "NORMAL", "UP", "", "192.168.1.1", 9042, "DC1")); + + List replicaInfos = Arrays.asList( + new TokenRangeReplicasResponse.ReplicaInfo("-1000", "0", new 
HashMap<>()) + ); + + TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(replicaInfos, replicaInfos, metadata); + + assertThatThrownBy(() -> TokenRangeUtils.validateTokenRangeReplicasResponse(response)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("ReplicaMetadata entry '192.168.1.1:9042' has null or empty fqdn in TokenRangeReplicasResponse from sidecar"); + } + + @Test + void testValidateTokenRangeReplicasResponseWhitespaceFqdn() + { + Map metadata = new HashMap<>(); + metadata.put("192.168.1.1:9042", new TokenRangeReplicasResponse.ReplicaMetadata( + "NORMAL", "UP", " ", "192.168.1.1", 9042, "DC1")); + + List replicaInfos = Arrays.asList( + new TokenRangeReplicasResponse.ReplicaInfo("-1000", "0", new HashMap<>()) + ); + + TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(replicaInfos, replicaInfos, metadata); + + assertThatThrownBy(() -> TokenRangeUtils.validateTokenRangeReplicasResponse(response)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("ReplicaMetadata entry '192.168.1.1:9042' has null or empty fqdn in TokenRangeReplicasResponse from sidecar"); + } + + @Test + void testValidateTokenRangeReplicasResponseNullReadReplicas() + { + Map metadata = new HashMap<>(); + metadata.put("192.168.1.1:9042", new TokenRangeReplicasResponse.ReplicaMetadata( + "NORMAL", "UP", "node1.example.com", "192.168.1.1", 9042, "DC1")); + + TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(Collections.emptyList(), null, metadata); + + assertThatThrownBy(() -> TokenRangeUtils.validateTokenRangeReplicasResponse(response)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Null readReplicas in TokenRangeReplicasResponse from sidecar"); + } + + @Test + void testValidateTokenRangeReplicasResponseEmptyReadReplicas() + { + Map metadata = new HashMap<>(); + metadata.put("192.168.1.1:9042", new TokenRangeReplicasResponse.ReplicaMetadata( + "NORMAL", "UP", "node1.example.com", "192.168.1.1", 9042, "DC1")); + + TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(Collections.emptyList(), Collections.emptyList(), metadata); + + assertThatThrownBy(() -> TokenRangeUtils.validateTokenRangeReplicasResponse(response)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Empty readReplicas in TokenRangeReplicasResponse from sidecar"); + } +} diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java index 713bafdf8..42bac9958 100644 --- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java +++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java @@ -135,7 +135,8 @@ public AbstractCluster initializeCluster(String versionString, if (configuration.dcAndRackSupplier != null) { clusterBuilder.withNodeIdTopology(networkTopology(finalNodeCount, configuration.dcAndRackSupplier)); - } else + } + else { clusterBuilder.withNodeIdTopology(networkTopology(finalNodeCount, (nodeId) -> nodeId % 2 != 0 ? 
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTokenRangeReplicasTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTokenRangeReplicasTest.java index 82ca58888..7158a921e 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTokenRangeReplicasTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTokenRangeReplicasTest.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.cassandra.analytics; import java.nio.ByteBuffer; @@ -52,7 +71,8 @@ * * *
 * Cassandra will try to place replicas in different racks.
 *
- * T1 will be replicated on the next two nodes of DC1 on different racks (Node3, Node4) and on the first three nodes on different racks in DC2 (Node5, Node7, Node8).
+ * T1 will be replicated on the next two nodes of DC1 on different racks (Node3, Node4) and on the first three
+ * nodes on different racks in DC2 (Node5, Node7, Node8).
 *
 * For each token range the replicas are:
 *
 * T1:[Node1, Node3, Node4, Node5, Node7, Node8]
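
To make the rack-aware rule described above concrete, here is a small, self-contained sketch of the selection walk within a single datacenter. It is illustrative only: the Node type and replicasFor method are hypothetical names, not part of this patch, and Cassandra's actual NetworkTopologyStrategy additionally handles vnodes and per-DC token interleaving.

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    final class RackAwarePlacementSketch
    {
        static final class Node
        {
            final String name;
            final String rack;

            Node(String name, String rack)
            {
                this.name = name;
                this.rack = rack;
            }
        }

        // Walk the ring clockwise from the token owner; accept a node immediately
        // if its rack has not supplied a replica yet, otherwise skip it. If the
        // walk ends short of rf replicas, fall back to the skipped nodes in order.
        static List<Node> replicasFor(int ownerIndex, List<Node> dcRing, int rf)
        {
            List<Node> replicas = new ArrayList<>();
            List<Node> skipped = new ArrayList<>();
            Set<String> racksUsed = new HashSet<>();
            for (int i = 0; i < dcRing.size() && replicas.size() < rf; i++)
            {
                Node candidate = dcRing.get((ownerIndex + i) % dcRing.size());
                if (racksUsed.add(candidate.rack))
                {
                    replicas.add(candidate);
                }
                else
                {
                    skipped.add(candidate);
                }
            }
            for (int i = 0; i < skipped.size() && replicas.size() < rf; i++)
            {
                replicas.add(skipped.get(i));
            }
            return replicas;
        }

        public static void main(String[] args)
        {
            List<Node> dc1 = List.of(new Node("Node1", "rack1"), new Node("Node2", "rack1"),
                                     new Node("Node3", "rack2"), new Node("Node4", "rack3"));
            // Owner of T4 is Node4 (index 3); expect Node4, Node1, Node3 per the table above.
            replicasFor(3, dc1, 3).forEach(node -> System.out.println(node.name));
        }
    }

Running it for DC1 of the multi-rack layout with owner Node4 and RF 3 prints Node4, Node1, Node3, matching the T4 row of the table above.
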
@@ -94,8 +114,9 @@ protected ClusterBuilderConfiguration testClusterConfiguration()
                                 return dcAndRack("datacenter2", "rack2");
                             case 8:
                                 return dcAndRack("datacenter2", "rack3");
+                            default:
+                                return dcAndRack("", "");
                         }
-                        return dcAndRack("", "");
                     });
     }
 
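
For reference, the dcAndRackSupplier hook exercised above can express the same eight-node topology without a switch. A hypothetical equivalent, assuming the supplier's type is IntFunction<NetworkTopology.DcAndRack> (which the dcAndRack(...) factory's return type suggests):

    import java.util.function.IntFunction;

    import org.apache.cassandra.distributed.shared.NetworkTopology;

    import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack;

    final class TopologySupplierSketch
    {
        // Nodes 1-4 map to datacenter1 and nodes 5-8 to datacenter2; within each
        // DC the first two nodes share rack1 and the next two get rack2 and rack3,
        // reproducing the switch statement in the test above.
        static final IntFunction<NetworkTopology.DcAndRack> SUPPLIER = nodeId -> {
            String dc = nodeId <= 4 ? "datacenter1" : "datacenter2";
            int positionInDc = (nodeId - 1) % 4 + 1; // 1..4 within the DC
            String rack = positionInDc <= 2 ? "rack1" : "rack" + (positionInDc - 1);
            return dcAndRack(dc, rack);
        };
    }
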
@@ -218,7 +241,7 @@ private long getTokenForNode(int nodeNumber)
     public static ByteBuffer keyForToken(long token)
     {
         ByteBuffer result = ByteBuffer.allocate(16);
-        long[] inv = MurmurHash.inv_hash3_x64_128(new long[]{ token, 0L });
+        long[] inv = MurmurHash.invHash3X64128(new long[]{token, 0L});
         result.putLong(inv[0]).putLong(inv[1]).position(0);
         return result;
     }

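Note on the keyForToken helper above: a 16-byte key is exactly one Murmur3 body block with no tail, which is why invHash3X64128 can fully invert the hash and manufacture a key that lands on a chosen token. The following round-trip check is a sketch, not part of the patch; it assumes MurmurHash.hash(key, 0, 16, 0) mirrors the Murmur3Partitioner token calculation (token = first half of the 128-bit hash, seed 0), and the class name and target token value are illustrative:

    import java.nio.ByteBuffer;

    import org.apache.cassandra.spark.data.partitioner.MurmurHash;

    public class KeyForTokenCheck
    {
        public static void main(String[] args)
        {
            long token = 4611686018427387904L;  // arbitrary target token
            // Invert the hash to recover the 16-byte key that produces this token
            long[] inv = MurmurHash.invHash3X64128(new long[]{token, 0L});
            ByteBuffer key = ByteBuffer.allocate(16);
            key.putLong(inv[0]).putLong(inv[1]).position(0);
            // Forward-hash the reconstructed key with seed 0; it should land exactly on the token
            long[] forward = MurmurHash.hash(key, 0, 16, 0);
            assert forward[0] == token : "reconstructed key must hash back to the target token";
        }
    }
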
From 4996af41143afeff7bbd30025eb18be751f1e75f Mon Sep 17 00:00:00 2001
From: Sudipta Laha 
Date: Fri, 31 Oct 2025 15:51:41 -0700
Subject: [PATCH 3/3] Fixed Kryo serialization for CassandraRing

---
 .../spark/data/partitioner/CassandraRing.java | 53 ++++++++++--
 .../spark/KryoSerializationTests.java         | 83 +++++++++++++++++++
 2 files changed, 129 insertions(+), 7 deletions(-)

diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java
index c8dd4888f..eb125a349 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java
@@ -372,19 +372,58 @@ public void write(Kryo kryo, Output out, CassandraRing ring)
             out.writeString(ring.keyspace);
             kryo.writeObject(out, ring.replicationFactor);
             kryo.writeObject(out, ring.instances);
-            kryo.writeObject(out, ring.replicasForRanges);
+
+            // Manually serialize replicasForRanges: write the range count, then each range's endpoints and replica list
+            out.writeShort(ring.replicasForRanges == null ? 0 : ring.replicasForRanges.asMapOfRanges().size());
+            if (ring.replicasForRanges != null)
+            {
+                for (Map.Entry<Range<BigInteger>, List<CassandraInstance>> entry : ring.replicasForRanges.asMapOfRanges().entrySet())
+                {
+                    out.writeString(entry.getKey().lowerEndpoint().toString());
+                    out.writeString(entry.getKey().upperEndpoint().toString());
+                    out.writeShort(entry.getValue().size());
+                    for (CassandraInstance instance : entry.getValue())
+                    {
+                        kryo.writeObject(out, instance);
+                    }
+                }
+            }
         }
 
         @Override
         @SuppressWarnings("unchecked")
         public CassandraRing read(Kryo kryo, Input in, Class<CassandraRing> type)
         {
-            return new CassandraRing(in.readByte() == 1 ? Partitioner.RandomPartitioner
-                                                        : Partitioner.Murmur3Partitioner,
-                                     in.readString(),
-                                     kryo.readObject(in, ReplicationFactor.class),
-                                     kryo.readObject(in, ArrayList.class),
-                                     kryo.readObject(in, RangeMap.class));
+            // Read all data first
+            Partitioner partitioner = in.readByte() == 1 ? Partitioner.RandomPartitioner : Partitioner.Murmur3Partitioner;
+            String keyspace = in.readString();
+            ReplicationFactor replicationFactor = kryo.readObject(in, ReplicationFactor.class);
+            List<CassandraInstance> instances = kryo.readObject(in, ArrayList.class);
+
+            // Read replicasForRanges data
+            RangeMap<BigInteger, List<CassandraInstance>> replicasForRanges = null;
+            int numRanges = in.readShort();
+            if (numRanges > 0)
+            {
+                replicasForRanges = TreeRangeMap.create();
+                for (int rangeIndex = 0; rangeIndex < numRanges; rangeIndex++)
+                {
+                    BigInteger lowerEndpoint = new BigInteger(in.readString());
+                    BigInteger upperEndpoint = new BigInteger(in.readString());
+                    Range<BigInteger> range = Range.openClosed(lowerEndpoint, upperEndpoint);
+
+                    int numReplicas = in.readShort();
+                    List<CassandraInstance> replicas = new ArrayList<>(numReplicas);
+                    for (int replicaIndex = 0; replicaIndex < numReplicas; replicaIndex++)
+                    {
+                        replicas.add(kryo.readObject(in, CassandraInstance.class));
+                    }
+                    replicasForRanges.put(range, replicas);
+                }
+            }
+
+            // Create the object with all data
+            return new CassandraRing(partitioner, keyspace, replicationFactor, instances, replicasForRanges);
         }
     }
 }
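A note on the manual encoding above: the writer emits the range count, then each range's endpoints as strings followed by a length-prefixed replica list, and the reader rebuilds a TreeRangeMap from the same layout. One caveat worth flagging: a non-null but empty map also encodes as count 0, so it deserializes back as null. Below is a self-contained sketch of that layout, using String replicas instead of CassandraInstance so it runs without the project on the classpath; the class and variable names are illustrative:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.math.BigInteger;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import com.google.common.collect.Range;
    import com.google.common.collect.RangeMap;
    import com.google.common.collect.TreeRangeMap;

    public class RangeMapEncodingSketch
    {
        public static void main(String[] args)
        {
            RangeMap<BigInteger, List<String>> map = TreeRangeMap.create();
            map.put(Range.openClosed(BigInteger.valueOf(-100), BigInteger.ZERO), Arrays.asList("node1", "node2"));
            map.put(Range.openClosed(BigInteger.ZERO, BigInteger.valueOf(100)), Arrays.asList("node3"));

            // Encode: range count, then (lower, upper, replica count, replicas) per range.
            // Endpoints travel as strings, which round-trips any BigInteger token exactly.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (Output out = new Output(bytes))
            {
                out.writeShort(map.asMapOfRanges().size());
                map.asMapOfRanges().forEach((range, replicas) -> {
                    out.writeString(range.lowerEndpoint().toString());
                    out.writeString(range.upperEndpoint().toString());
                    out.writeShort(replicas.size());
                    replicas.forEach(out::writeString);
                });
            }

            // Decode: rebuild a TreeRangeMap from the same layout
            RangeMap<BigInteger, List<String>> decoded = TreeRangeMap.create();
            try (Input in = new Input(new ByteArrayInputStream(bytes.toByteArray())))
            {
                int numRanges = in.readShort();
                for (int i = 0; i < numRanges; i++)
                {
                    BigInteger lower = new BigInteger(in.readString());
                    BigInteger upper = new BigInteger(in.readString());
                    int numReplicas = in.readShort();
                    List<String> replicas = new ArrayList<>(numReplicas);
                    for (int j = 0; j < numReplicas; j++)
                    {
                        replicas.add(in.readString());
                    }
                    decoded.put(Range.openClosed(lower, upper), replicas);
                }
            }
            assert decoded.asMapOfRanges().equals(map.asMapOfRanges());
        }
    }
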
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java
index 62e76774b..8c123fe66 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java
@@ -20,12 +20,18 @@
 package org.apache.cassandra.spark;
 
 import java.math.BigInteger;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+import com.google.common.collect.TreeRangeMap;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.Test;
@@ -46,6 +52,7 @@
 import org.apache.cassandra.spark.data.ReplicationFactor;
 import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
 import org.apache.cassandra.spark.data.partitioner.CassandraRing;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.apache.cassandra.spark.data.partitioner.TokenPartitioner;
 import org.apache.cassandra.spark.transports.storage.StorageAccessConfiguration;
 import org.apache.cassandra.spark.transports.storage.StorageCredentialPair;
@@ -340,6 +347,82 @@ public void testTokenPartitioner(CassandraBridge bridge)
             });
     }
 
+    private void testCassandraRingSerializationWithReplicasForRangesKryo(CassandraBridge bridge,
+                                                                        Partitioner partitioner,
+                                                                        List<CassandraInstance> instances,
+                                                                        RangeMap<BigInteger, List<CassandraInstance>> replicasForRanges,
+                                                                        ReplicationFactor replicationFactor,
+                                                                        int expectedRangeCount)
+    {
+        CassandraRing ring = new CassandraRing(partitioner, "test_keyspace", replicationFactor, instances, replicasForRanges);
+
+        Output out = serialize(bridge.getVersion(), ring);
+        CassandraRing deserialized = deserialize(bridge.getVersion(), out, CassandraRing.class);
+
+        assertThat(deserialized).isNotNull();
+        assertThat(deserialized.rangeMap()).isNotNull();
+        assertThat(deserialized.rangeMap().asMapOfRanges()).hasSize(expectedRangeCount);
+        assertThat(deserialized).isEqualTo(ring);
+        assertThat(deserialized.rangeMap().asMapOfRanges()).isEqualTo(ring.rangeMap().asMapOfRanges());
+    }
+
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
+    public void testCassandraRingWithReplicasForRangesKryo(CassandraBridge bridge)
+    {
+        // Create test instances that will be reused across different configurations
+        List<CassandraInstance> instances = Arrays.asList(
+            new CassandraInstance("-1000", "node1", "DC1"),
+            new CassandraInstance("0", "node2", "DC1"),
+            new CassandraInstance("1000", "node3", "DC2")
+        );
+
+        // Create different replicasForRanges configurations to test
+        List<RangeMap<BigInteger, List<CassandraInstance>>> replicasForRangesList = new ArrayList<>();
+
+        // Configuration 1: Basic two-range setup
+        RangeMap<BigInteger, List<CassandraInstance>> config1 = TreeRangeMap.create();
+        config1.put(Range.openClosed(new BigInteger("-1000"), BigInteger.ZERO),
+                    Arrays.asList(instances.get(0), instances.get(1)));
+        config1.put(Range.openClosed(BigInteger.ZERO, new BigInteger("1000")),
+                    Arrays.asList(instances.get(1), instances.get(2)));
+        replicasForRangesList.add(config1);
+
+        // Configuration 2: Single range with all replicas
+        RangeMap<BigInteger, List<CassandraInstance>> config2 = TreeRangeMap.create();
+        config2.put(Range.openClosed(new BigInteger("-500"), new BigInteger("500")),
+                    Arrays.asList(instances.get(0), instances.get(1), instances.get(2)));
+        replicasForRangesList.add(config2);
+
+        // Configuration 3: Range with empty replica list
+        RangeMap<BigInteger, List<CassandraInstance>> config3 = TreeRangeMap.create();
+        config3.put(Range.openClosed(new BigInteger("-100"), BigInteger.ZERO),
+                    Collections.emptyList());
+        config3.put(Range.openClosed(BigInteger.ZERO, new BigInteger("100")),
+                    Arrays.asList(instances.get(1)));
+        replicasForRangesList.add(config3);
+
+        // Configuration 4: Token ranges with boundary values (Long.MIN_VALUE, Long.MAX_VALUE)
+        RangeMap<BigInteger, List<CassandraInstance>> config4 = TreeRangeMap.create();
+        config4.put(Range.openClosed(BigInteger.valueOf(Long.MIN_VALUE), BigInteger.ZERO),
+                    Arrays.asList(instances.get(0), instances.get(1)));
+        config4.put(Range.openClosed(BigInteger.ZERO, BigInteger.valueOf(Long.MAX_VALUE)),
+                    Arrays.asList(instances.get(1), instances.get(2)));
+        replicasForRangesList.add(config4);
+
+        qt().forAll(TestUtils.partitioners(), integers().between(0, replicasForRangesList.size() - 1))
+            .checkAssert((partitioner, index) -> {
+                ReplicationFactor rf = new ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
+                                                             ImmutableMap.of("DC1", 2, "DC2", 1));
+
+                int expectedRangeCount = replicasForRangesList.get(index).asMapOfRanges().size();
+
+                testCassandraRingSerializationWithReplicasForRangesKryo(bridge, partitioner, instances,
+                                                                        replicasForRangesList.get(index),
+                                                                        rf, expectedRangeCount);
+            });
+    }
+
     @ParameterizedTest
     @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
     public void testCqlUdtField(CassandraBridge bridge)
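The qt().forAll sweep at the end of testCassandraRingWithReplicasForRangesKryo pairs every partitioner with configuration indexes drawn by integers().between. The following is a standalone sketch of that QuickTheories pattern, with a toy assertion standing in for the Kryo round-trip; the class name and list contents are illustrative:

    import java.util.Arrays;
    import java.util.List;

    import static org.quicktheories.QuickTheory.qt;
    import static org.quicktheories.generators.SourceDSL.integers;

    public class ConfigSweepSketch
    {
        public static void main(String[] args)
        {
            List<String> configs = Arrays.asList("two-range", "single-range", "empty-replicas", "boundary-tokens");
            // integers().between(lo, hi) samples indexes across the closed range,
            // so the small configuration list is covered instead of one hand-picked case
            qt().forAll(integers().between(0, configs.size() - 1))
                .checkAssert(index -> {
                    String config = configs.get(index);
                    if (config.isEmpty())
                    {
                        throw new AssertionError("configuration name must not be empty");
                    }
                });
        }
    }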