diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraInstance.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraInstance.java
index 85b5ebdb..19fecc64 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraInstance.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraInstance.java
@@ -19,6 +19,9 @@
 
 package org.apache.cassandra.spark.data.partitioner;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.Objects;
 
@@ -34,9 +37,9 @@ public class CassandraInstance implements TokenOwner, Serializable
     public static final CassandraInstance.Serializer SERIALIZER = new CassandraInstance.Serializer();
     private static final long serialVersionUID = 6767636627576239773L;
 
-    private final String token;
-    private final String node;
-    private final String dataCenter;
+    private String token;
+    private String node;
+    private String dataCenter;
 
     public CassandraInstance(String token, String node, String dataCenter)
     {
@@ -100,6 +103,20 @@ public String toString()
         return String.format("{\"token\"=\"%s\", \"node\"=\"%s\", \"dc\"=\"%s\"}", token, node, dataCenter);
     }
 
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException
+    {
+        this.token = in.readUTF();
+        this.node = in.readUTF();
+        this.dataCenter = in.readUTF();
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException
+    {
+        out.writeUTF(token());
+        out.writeUTF(nodeName());
+        out.writeUTF(dataCenter());
+    }
+
     public static class Serializer extends com.esotericsoftware.kryo.Serializer<CassandraInstance>
     {
         @Override
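The fields become non-final so the custom `readObject` can assign them: JDK deserialization bypasses the constructor, so final fields could not be written back without reflection. A minimal round-trip sketch of the new path (class/method names are from the hunk above; the asserts on `nodeName()`/`dataCenter()` use accessors the diff itself references, run with `-ea`):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import org.apache.cassandra.spark.data.partitioner.CassandraInstance;

// Sketch: round-trip a CassandraInstance through the custom
// writeObject/readObject added above (one writeUTF/readUTF per field).
public final class CassandraInstanceRoundTrip
{
    public static void main(String[] args) throws Exception
    {
        CassandraInstance original = new CassandraInstance("-9223372036854775808", "node1", "DC1");

        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer))
        {
            out.writeObject(original);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray())))
        {
            CassandraInstance copy = (CassandraInstance) in.readObject();
            // the three UTF fields survive the round trip
            assert "node1".equals(copy.nodeName()) && "DC1".equals(copy.dataCenter());
        }
    }
}
```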
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java
index 3d142f8d..eb125a34 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java
@@ -74,6 +74,7 @@ public class CassandraRing implements Serializable
     private ReplicationFactor replicationFactor;
     private List<CassandraInstance> instances;
 
+    private transient RangeMap<BigInteger, List<CassandraInstance>> replicasForRanges;
     private transient RangeMap<BigInteger, List<CassandraInstance>> replicas;
     private transient Multimap<CassandraInstance, Range<BigInteger>> tokenRangeMap;
 
@@ -120,8 +121,33 @@ public CassandraRing(Partitioner partitioner,
         this.init();
     }
 
+    public CassandraRing(Partitioner partitioner,
+                         String keyspace,
+                         ReplicationFactor replicationFactor,
+                         Collection<CassandraInstance> instances,
+                         RangeMap<BigInteger, List<CassandraInstance>> replicasForRanges)
+    {
+        this.partitioner = partitioner;
+        this.keyspace = keyspace;
+        this.replicationFactor = replicationFactor;
+        this.instances = instances.stream()
+                                  .sorted(Comparator.comparing(instance -> new BigInteger(instance.token())))
+                                  .collect(Collectors.toCollection(ArrayList::new));
+        this.replicasForRanges = replicasForRanges;
+        init();
+    }
+
     private void init()
     {
+        if (this.replicasForRanges != null)
+        {
+            this.replicas = this.replicasForRanges;
+            this.tokenRangeMap = ArrayListMultimap.create();
+            this.replicasForRanges.asMapOfRanges().forEach((range, instanceList) ->
+                    instanceList.forEach(instance -> this.tokenRangeMap.put(instance, range)));
+            return;
+        }
         // Setup token range map
         replicas = TreeRangeMap.create();
         tokenRangeMap = ArrayListMultimap.create();
@@ -276,8 +302,28 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException
         this.instances = new ArrayList<>(numInstances);
         for (int instance = 0; instance < numInstances; instance++)
         {
-            this.instances.add(new CassandraInstance(in.readUTF(), in.readUTF(), in.readUTF()));
+            this.instances.add((CassandraInstance) in.readObject());
+        }
+        int numRanges = in.readShort();
+        if (numRanges > 0)
+        {
+            this.replicasForRanges = TreeRangeMap.create();
+            for (int rangeIndex = 0; rangeIndex < numRanges; rangeIndex++)
+            {
+                BigInteger lowerEndpoint = new BigInteger(in.readUTF());
+                BigInteger upperEndpoint = new BigInteger(in.readUTF());
+                Range<BigInteger> range = Range.openClosed(lowerEndpoint, upperEndpoint);
+
+                int numReplicas = in.readShort();
+                List<CassandraInstance> replicas = new ArrayList<>(numReplicas);
+                for (int replicaIndex = 0; replicaIndex < numReplicas; replicaIndex++)
+                {
+                    replicas.add((CassandraInstance) in.readObject());
+                }
+                this.replicasForRanges.put(range, replicas);
+            }
         }
+        this.init();
     }
 
@@ -299,9 +345,21 @@ private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFoundException
         out.writeShort(this.instances.size());
         for (CassandraInstance instance : this.instances)
         {
-            out.writeUTF(instance.token());
-            out.writeUTF(instance.nodeName());
-            out.writeUTF(instance.dataCenter());
+            out.writeObject(instance);
+        }
+        out.writeShort(this.replicasForRanges == null ? 0 : this.replicasForRanges.asMapOfRanges().size());
+        if (this.replicasForRanges != null)
+        {
+            for (Map.Entry<Range<BigInteger>, List<CassandraInstance>> entry : replicasForRanges.asMapOfRanges().entrySet())
+            {
+                out.writeUTF(entry.getKey().lowerEndpoint().toString());
+                out.writeUTF(entry.getKey().upperEndpoint().toString());
+                out.writeShort(entry.getValue().size());
+                for (CassandraInstance instance : entry.getValue())
+                {
+                    out.writeObject(instance);
+                }
+            }
         }
     }
 
@@ -314,17 +372,58 @@ public void write(Kryo kryo, Output out, CassandraRing ring)
         out.writeString(ring.keyspace);
         kryo.writeObject(out, ring.replicationFactor);
         kryo.writeObject(out, ring.instances);
+
+        // Manually serialize replicasForRanges
+        out.writeShort(ring.replicasForRanges == null ? 0 : ring.replicasForRanges.asMapOfRanges().size());
+        if (ring.replicasForRanges != null)
+        {
+            for (Map.Entry<Range<BigInteger>, List<CassandraInstance>> entry : ring.replicasForRanges.asMapOfRanges().entrySet())
+            {
+                out.writeString(entry.getKey().lowerEndpoint().toString());
+                out.writeString(entry.getKey().upperEndpoint().toString());
+                out.writeShort(entry.getValue().size());
+                for (CassandraInstance instance : entry.getValue())
+                {
+                    kryo.writeObject(out, instance);
+                }
+            }
+        }
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public CassandraRing read(Kryo kryo, Input in, Class<CassandraRing> type)
     {
-        return new CassandraRing(in.readByte() == 1 ? Partitioner.RandomPartitioner
-                                                    : Partitioner.Murmur3Partitioner,
-                                 in.readString(),
-                                 kryo.readObject(in, ReplicationFactor.class),
-                                 kryo.readObject(in, ArrayList.class));
+        // Read all data first
+        Partitioner partitioner = in.readByte() == 1 ? Partitioner.RandomPartitioner : Partitioner.Murmur3Partitioner;
+        String keyspace = in.readString();
+        ReplicationFactor replicationFactor = kryo.readObject(in, ReplicationFactor.class);
+        List<CassandraInstance> instances = kryo.readObject(in, ArrayList.class);
+
+        // Read replicasForRanges data
+        RangeMap<BigInteger, List<CassandraInstance>> replicasForRanges = null;
+        int numRanges = in.readShort();
+        if (numRanges > 0)
+        {
+            replicasForRanges = TreeRangeMap.create();
+            for (int rangeIndex = 0; rangeIndex < numRanges; rangeIndex++)
+            {
+                BigInteger lowerEndpoint = new BigInteger(in.readString());
+                BigInteger upperEndpoint = new BigInteger(in.readString());
+                Range<BigInteger> range = Range.openClosed(lowerEndpoint, upperEndpoint);
+
+                int numReplicas = in.readShort();
+                List<CassandraInstance> replicas = new ArrayList<>(numReplicas);
+                for (int replicaIndex = 0; replicaIndex < numReplicas; replicaIndex++)
+                {
+                    replicas.add(kryo.readObject(in, CassandraInstance.class));
+                }
+                replicasForRanges.put(range, replicas);
+            }
+        }
+
+        // Create the object with all data
+        return new CassandraRing(partitioner, keyspace, replicationFactor, instances, replicasForRanges);
     }
 }
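With the new five-argument constructor, a ring can be seeded directly with a sidecar-provided token-range-to-replicas map; `init()` then adopts it verbatim instead of recomputing placement from tokens. A hedged construction sketch (keyspace name and RF values are illustrative; `rangeMap()` is the accessor the serialization tests below rely on):

```java
import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import com.google.common.collect.TreeRangeMap;

import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
import org.apache.cassandra.spark.data.partitioner.CassandraRing;
import org.apache.cassandra.spark.data.partitioner.Partitioner;

// Sketch: build a CassandraRing from an explicit token-range -> replicas map,
// mirroring what the serialization tests in this patch do.
public final class CassandraRingFromRangeMap
{
    public static void main(String[] args)
    {
        CassandraInstance node1 = new CassandraInstance("-1000", "node1", "DC1");
        CassandraInstance node2 = new CassandraInstance("0", "node2", "DC1");

        RangeMap<BigInteger, List<CassandraInstance>> replicasForRanges = TreeRangeMap.create();
        replicasForRanges.put(Range.openClosed(new BigInteger("-1000"), BigInteger.ZERO),
                              Arrays.asList(node1, node2));

        ReplicationFactor rf = new ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
                                                     ImmutableMap.of("DC1", 2));
        CassandraRing ring = new CassandraRing(Partitioner.Murmur3Partitioner, "ks", rf,
                                               Arrays.asList(node1, node2), replicasForRanges);

        // init() mirrors replicasForRanges into the replicas map, so lookups hit
        // the sidecar-provided placement; tokens outside every range return null
        List<CassandraInstance> owners = ring.rangeMap().get(new BigInteger("-500"));
        assert owners != null && owners.size() == 2;
    }
}
```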
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/MurmurHash.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/MurmurHash.java
index ae3722eb..afb4ef40 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/MurmurHash.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/MurmurHash.java
@@ -156,4 +156,89 @@ public static long[] hash(ByteBuffer key, int offset, int length, long seed)
 
         return new long[]{h1, h2};
     }
+
+    protected static long invRShiftXor(long value, int shift)
+    {
+        long output = 0;
+        long i = 0;
+        while (i * shift < 64)
+        {
+            long c = (0xffffffffffffffffL << (64 - shift)) >>> (shift * i);
+            long partOutput = value & c;
+            value ^= partOutput >>> shift;
+            output |= partOutput;
+            i += 1;
+        }
+        return output;
+    }
+
+    protected static long invRotl64(long v, int n)
+    {
+        return ((v >>> n) | (v << (64 - n)));
+    }
+
+    protected static long invFmix(long k)
+    {
+        k = invRShiftXor(k, 33);
+        k *= 0x9cb4b2f8129337dbL;
+        k = invRShiftXor(k, 33);
+        k *= 0x4f74430c22a54005L;
+        k = invRShiftXor(k, 33);
+        return k;
+    }
+
+    public static long[] invHash3X64128(long[] result)
+    {
+        long c1 = 0xa98409e882ce4d7dL;
+        long c2 = 0xa81e14edd9de2c7fL;
+
+        long k1 = 0;
+        long k2 = 0;
+        long h1 = result[0];
+        long h2 = result[1];
+
+        //----------
+        // reverse finalization
+        h2 -= h1;
+        h1 -= h2;
+
+        h1 = invFmix(h1);
+        h2 = invFmix(h2);
+
+        h2 -= h1;
+        h1 -= h2;
+
+        h1 ^= 16;
+        h2 ^= 16;
+
+        //----------
+        // reverse body
+        h2 -= 0x38495ab5;
+        h2 *= 0xcccccccccccccccdL;
+        h2 -= h1;
+        h2 = invRotl64(h2, 31);
+        k2 = h2;
+        h2 = 0;
+
+        k2 *= c1;
+        k2 = invRotl64(k2, 33);
+        k2 *= c2;
+
+        h1 -= 0x52dce729;
+        h1 *= 0xcccccccccccccccdL;
+        //h1 -= h2;
+        h1 = invRotl64(h1, 27);
+
+        k1 = h1;
+
+        k1 *= c2;
+        k1 = invRotl64(k1, 31);
+        k1 *= c1;
+
+        // note that while this works for body block reversing, the tail reverse requires `invTailReverse`
+        k1 = Long.reverseBytes(k1);
+        k2 = Long.reverseBytes(k2);
+
+        return new long[] {k1, k2};
+    }
 }
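The two helpers undo the building blocks of Murmur3's finalizer: `invRShiftXor` inverts a single `x ^= x >>> shift` diffusion step, and `invFmix` replays `fmix64` backwards, multiplying by the modular inverses of the two mixing constants (the `0x4f74430c22a54005L` / `0x9cb4b2f8129337dbL` values above). A small verification sketch; the helpers are `protected`, hence the package declaration, and the forward `fmix64` steps are written out inline to match `MurmurHash.hash`:

```java
package org.apache.cassandra.spark.data.partitioner;

// Sketch: verify invRShiftXor and invFmix against the forward operations
// (run with -ea to enable the asserts).
public final class InverseMurmurSketch
{
    public static void main(String[] args)
    {
        long k = 0x0123456789abcdefL;

        // invRShiftXor undoes one "x ^= x >>> shift" step
        long shifted = k ^ (k >>> 33);
        assert MurmurHash.invRShiftXor(shifted, 33) == k;

        // fmix64 as used by Murmur3's 128-bit finalizer
        long mixed = k;
        mixed ^= mixed >>> 33;
        mixed *= 0xff51afd7ed558ccdL;
        mixed ^= mixed >>> 33;
        mixed *= 0xc4ceb9fe1a85ec53L;
        mixed ^= mixed >>> 33;

        // invFmix applies the inverse steps in reverse order
        assert MurmurHash.invFmix(mixed) == k;
    }
}
```

Note that `invHash3X64128` as written only reverses the single-body-block, seed-0 case (a 16-byte key), which is exactly what the integration test at the end of this patch needs.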
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index b6e84c6f..948514c2 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -51,6 +51,7 @@
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,6 +63,7 @@
 import o.a.c.sidecar.client.shaded.common.response.NodeSettings;
 import o.a.c.sidecar.client.shaded.common.response.RingResponse;
 import o.a.c.sidecar.client.shaded.common.response.SchemaResponse;
+import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse;
 import org.apache.cassandra.analytics.stats.Stats;
 import org.apache.cassandra.bridge.BigNumberConfig;
 import org.apache.cassandra.bridge.BigNumberConfigImpl;
@@ -94,6 +96,7 @@
 import org.apache.cassandra.spark.utils.ScalaFunctions;
 import org.apache.cassandra.spark.utils.ThrowableUtils;
 import org.apache.cassandra.spark.utils.TimeProvider;
+import org.apache.cassandra.spark.utils.TokenRangeUtils;
 import org.apache.cassandra.spark.validation.CassandraValidation;
 import org.apache.cassandra.spark.validation.SidecarValidation;
 import org.apache.cassandra.spark.validation.StartupValidatable;
@@ -317,7 +320,12 @@ private int initBulkReader(@NotNull ClientConfig options) throws ExecutionException
         udts.forEach(udt -> LOGGER.info("Adding schema UDT: '{}'", udt));
         cqlTable = bridge().buildSchema(createStmt, keyspace, replicationFactor, partitioner, udts, null, indexCount, false);
-        CassandraRing ring = createCassandraRingFromRing(partitioner, replicationFactor, ringFuture.get());
+
+        TokenRangeReplicasResponse tokenRangeReplicas = sidecar.tokenRangeReplicas(
+                new ArrayList<>(clusterConfig), maybeQuotedKeyspace).get();
+        TokenRangeUtils.validateTokenRangeReplicasResponse(tokenRangeReplicas);
+
+        CassandraRing ring = createCassandraRingFromRing(partitioner, replicationFactor, ringFuture.get(), tokenRangeReplicas);
         int effectiveNumberOfCores = sizingFuture.get();
         tokenPartitioner = new TokenPartitioner(ring, options.defaultParallelism(), effectiveNumberOfCores);
@@ -681,14 +689,17 @@ public BigNumberConfig bigNumberConfig(CqlField field)
     @VisibleForTesting
     public CassandraRing createCassandraRingFromRing(Partitioner partitioner,
                                                      ReplicationFactor replicationFactor,
-                                                     RingResponse ring)
+                                                     RingResponse ring,
+                                                     TokenRangeReplicasResponse tokenRangeReplicas)
     {
         Collection<CassandraInstance> instances = ring
                 .stream()
                 .filter(status -> datacenter == null || datacenter.equalsIgnoreCase(status.datacenter()))
                 .map(status -> new CassandraInstance(status.token(), status.fqdn(), status.datacenter()))
                 .collect(Collectors.toList());
-        return new CassandraRing(partitioner, keyspace, replicationFactor, instances);
+        RangeMap<BigInteger, List<CassandraInstance>> replicasForRanges =
+                TokenRangeUtils.convertTokenRangeReplicasToRangeMap(tokenRangeReplicas, instances, datacenter);
+        return new CassandraRing(partitioner, keyspace, replicationFactor, instances, replicasForRanges);
     }
 
     // Startup Validation
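Condensed, the new read-path wiring looks as follows (a sketch, not the full method; `sidecar`, `clusterConfig`, `maybeQuotedKeyspace` and `ringFuture` are the surrounding fields/locals of `CassandraDataLayer` visible in the hunk above):

```java
// Fetch sidecar-reported ownership and fail fast on a null/empty response,
// rather than building a ring with no placement information
TokenRangeReplicasResponse tokenRangeReplicas =
        sidecar.tokenRangeReplicas(new ArrayList<>(clusterConfig), maybeQuotedKeyspace).get();
TokenRangeUtils.validateTokenRangeReplicasResponse(tokenRangeReplicas);

// The ring is seeded with the reported placement; CassandraRing.init()
// uses it verbatim instead of recomputing replication from tokens
CassandraRing ring = createCassandraRingFromRing(partitioner, replicationFactor,
                                                 ringFuture.get(), tokenRangeReplicas);
```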
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/TokenRangeUtils.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/TokenRangeUtils.java
new file mode 100644
index 00000000..d93f51c8
--- /dev/null
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/utils/TokenRangeUtils.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.utils;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+import com.google.common.collect.TreeRangeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse;
+import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
+
+public class TokenRangeUtils
+{
+    public static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeUtils.class);
+
+    private TokenRangeUtils()
+    {
+        throw new IllegalStateException(getClass() + " is a static utility class and shall not be instantiated");
+    }
+
+    /**
+     * Converts a TokenRangeReplicasResponse from Cassandra Sidecar into a RangeMap data structure
+     * that maps token ranges to their corresponding replica instances.
+     *
+     * @param tokenRangeReplicas the response from Cassandra Sidecar containing token range and replica information
+     * @param instances          collection of available Cassandra instances to match against
+     * @param datacenter         when non-null, only replicas in this datacenter are included; null includes all datacenters
+     * @return a RangeMap where keys are token ranges (openClosed BigInteger ranges) and values are lists of
+     *         CassandraInstance objects that serve as replicas for that range
+     */
+    public static RangeMap<BigInteger, List<CassandraInstance>> convertTokenRangeReplicasToRangeMap(
+            TokenRangeReplicasResponse tokenRangeReplicas,
+            Collection<CassandraInstance> instances,
+            String datacenter)
+    {
+        RangeMap<BigInteger, List<CassandraInstance>> replicas = TreeRangeMap.create();
+        Map<String, TokenRangeReplicasResponse.ReplicaMetadata> metadata = tokenRangeReplicas.replicaMetadata();
+
+        for (TokenRangeReplicasResponse.ReplicaInfo replicaInfo : tokenRangeReplicas.readReplicas())
+        {
+            Range<BigInteger> range = Range.openClosed(new BigInteger(replicaInfo.start()), new BigInteger(replicaInfo.end()));
+            List<CassandraInstance> instanceListForRange = new ArrayList<>();
+            for (Map.Entry<String, List<String>> entry : replicaInfo.replicasByDatacenter().entrySet())
+            {
+                if (datacenter != null && !datacenter.equals(entry.getKey()))
+                {
+                    continue;
+                }
+                instanceListForRange.addAll(entry.getValue().stream()
+                                                 .map(ipPort -> metadata.get(ipPort).fqdn())
+                                                 .map(fqdn -> getCassandraInstanceByFqdn(instances, fqdn))
+                                                 .filter(Objects::nonNull)
+                                                 .collect(Collectors.toList()));
+            }
+            replicas.put(range, instanceListForRange);
+        }
+        return replicas;
+    }
+
+    private static CassandraInstance getCassandraInstanceByFqdn(Collection<CassandraInstance> instances, String fqdn)
+    {
+        return instances.stream()
+                        .filter(instance -> instance.nodeName().equals(fqdn))
+                        .findFirst()
+                        .orElse(null);
+    }
+
+    public static void validateTokenRangeReplicasResponse(TokenRangeReplicasResponse tokenRangeReplicas)
+    {
+        String msg = getTokenRangeReplicasValidationMessage(tokenRangeReplicas);
+        if (msg != null)
+        {
+            LOGGER.error(msg);
+            throw new IllegalStateException(msg);
+        }
+    }
+
+    private static String getTokenRangeReplicasValidationMessage(TokenRangeReplicasResponse tokenRangeReplicas)
+    {
+        if (tokenRangeReplicas == null)
+        {
+            return "Null TokenRangeReplicasResponse from sidecar";
+        }
+        if (tokenRangeReplicas.replicaMetadata() == null)
+        {
+            return "Null replicaMetadata in TokenRangeReplicasResponse from sidecar";
+        }
+        if (tokenRangeReplicas.replicaMetadata().isEmpty())
+        {
+            return "Empty replicaMetadata in TokenRangeReplicasResponse from sidecar";
+        }
+        for (Map.Entry<String, TokenRangeReplicasResponse.ReplicaMetadata> entry : tokenRangeReplicas.replicaMetadata().entrySet())
+        {
+            if (entry.getValue().fqdn() == null || entry.getValue().fqdn().trim().isEmpty())
+            {
+                return String.format(
+                        "ReplicaMetadata entry '%s' has null or empty fqdn in TokenRangeReplicasResponse from sidecar", entry.getKey());
+            }
+        }
+        if (tokenRangeReplicas.readReplicas() == null)
+        {
+            return "Null readReplicas in TokenRangeReplicasResponse from sidecar";
+        }
+        if (tokenRangeReplicas.readReplicas().isEmpty())
+        {
+            return "Empty readReplicas in TokenRangeReplicasResponse from sidecar";
+        }
+        return null;
+    }
+}
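A hedged usage sketch of the converter: assemble a sidecar response, validate it, convert, and resolve the replica set for an arbitrary token via `RangeMap.get`. The `ReplicaInfo`/`ReplicaMetadata`/`TokenRangeReplicasResponse` constructor shapes mirror the unit tests further down in this patch; the addresses and host names are illustrative:

```java
import java.math.BigInteger;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.common.collect.RangeMap;

import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse;
import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
import org.apache.cassandra.spark.utils.TokenRangeUtils;

public final class TokenRangeUtilsExample
{
    public static void main(String[] args)
    {
        List<CassandraInstance> instances = Arrays.asList(
                new CassandraInstance("-1000", "node1.example.com", "DC1"),
                new CassandraInstance("0", "node2.example.com", "DC1"));

        Map<String, TokenRangeReplicasResponse.ReplicaMetadata> metadata = new HashMap<>();
        metadata.put("10.0.0.1:9042", new TokenRangeReplicasResponse.ReplicaMetadata(
                "NORMAL", "UP", "node1.example.com", "10.0.0.1", 9042, "DC1"));
        metadata.put("10.0.0.2:9042", new TokenRangeReplicasResponse.ReplicaMetadata(
                "NORMAL", "UP", "node2.example.com", "10.0.0.2", 9042, "DC1"));

        Map<String, List<String>> byDc = new HashMap<>();
        byDc.put("DC1", Arrays.asList("10.0.0.1:9042", "10.0.0.2:9042"));
        List<TokenRangeReplicasResponse.ReplicaInfo> readReplicas = Arrays.asList(
                new TokenRangeReplicasResponse.ReplicaInfo("-1000", "0", byDc));

        TokenRangeReplicasResponse response =
                new TokenRangeReplicasResponse(readReplicas, readReplicas, metadata);
        TokenRangeUtils.validateTokenRangeReplicasResponse(response);

        RangeMap<BigInteger, List<CassandraInstance>> replicas =
                TokenRangeUtils.convertTokenRangeReplicasToRangeMap(response, instances, null);

        // Ranges are openClosed, so token -500 falls in (-1000, 0] and resolves
        // to both DC1 instances; tokens outside all ranges return null
        List<CassandraInstance> owners = replicas.get(new BigInteger("-500"));
        assert owners != null && owners.size() == 2;
    }
}
```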
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java
index 62e76774..8c123fe6 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/KryoSerializationTests.java
@@ -20,12 +20,18 @@ package org.apache.cassandra.spark;
 
 import java.math.BigInteger;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+import com.google.common.collect.TreeRangeMap;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.Test;
@@ -46,6 +52,7 @@
 import org.apache.cassandra.spark.data.ReplicationFactor;
 import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
 import org.apache.cassandra.spark.data.partitioner.CassandraRing;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.apache.cassandra.spark.data.partitioner.TokenPartitioner;
 import org.apache.cassandra.spark.transports.storage.StorageAccessConfiguration;
 import org.apache.cassandra.spark.transports.storage.StorageCredentialPair;
@@ -340,6 +347,82 @@ public void testTokenPartitioner(CassandraBridge bridge)
         });
     }
 
+    private void testCassandraRingSerializationWithReplicasForRangesKryo(CassandraBridge bridge,
+                                                                         Partitioner partitioner,
+                                                                         List<CassandraInstance> instances,
+                                                                         RangeMap<BigInteger, List<CassandraInstance>> replicasForRanges,
+                                                                         ReplicationFactor replicationFactor,
+                                                                         int expectedRangeCount)
+    {
+        CassandraRing ring = new CassandraRing(partitioner, "test_keyspace", replicationFactor, instances, replicasForRanges);
+
+        Output out = serialize(bridge.getVersion(), ring);
+        CassandraRing deserialized = deserialize(bridge.getVersion(), out, CassandraRing.class);
+
+        assertThat(deserialized).isNotNull();
+        assertThat(deserialized.rangeMap()).isNotNull();
+        assertThat(deserialized.rangeMap().asMapOfRanges()).hasSize(expectedRangeCount);
+        assertThat(deserialized).isEqualTo(ring);
+        assertThat(deserialized.rangeMap().asMapOfRanges()).isEqualTo(ring.rangeMap().asMapOfRanges());
+    }
+
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
+    public void testCassandraRingWithReplicasForRangesKryo(CassandraBridge bridge)
+    {
+        // Create test instances that will be reused across different configurations
+        List<CassandraInstance> instances = Arrays.asList(
+                new CassandraInstance("-1000", "node1", "DC1"),
+                new CassandraInstance("0", "node2", "DC1"),
+                new CassandraInstance("1000", "node3", "DC2")
+        );
+
+        // Create different replicasForRanges configurations to test
+        List<RangeMap<BigInteger, List<CassandraInstance>>> replicasForRangesList = new ArrayList<>();
+
+        // Configuration 1: Basic two-range setup
+        RangeMap<BigInteger, List<CassandraInstance>> config1 = TreeRangeMap.create();
+        config1.put(Range.openClosed(new BigInteger("-1000"), BigInteger.ZERO),
+                    Arrays.asList(instances.get(0), instances.get(1)));
+        config1.put(Range.openClosed(BigInteger.ZERO, new BigInteger("1000")),
+                    Arrays.asList(instances.get(1), instances.get(2)));
+        replicasForRangesList.add(config1);
+
+        // Configuration 2: Single range with all replicas
+        RangeMap<BigInteger, List<CassandraInstance>> config2 = TreeRangeMap.create();
+        config2.put(Range.openClosed(new BigInteger("-500"), new BigInteger("500")),
+                    Arrays.asList(instances.get(0), instances.get(1), instances.get(2)));
+        replicasForRangesList.add(config2);
+
+        // Configuration 3: Range with empty replica list
+        RangeMap<BigInteger, List<CassandraInstance>> config3 = TreeRangeMap.create();
+        config3.put(Range.openClosed(new BigInteger("-100"), BigInteger.ZERO),
+                    Collections.emptyList());
+        config3.put(Range.openClosed(BigInteger.ZERO, new BigInteger("100")),
+                    Arrays.asList(instances.get(1)));
+        replicasForRangesList.add(config3);
+
+        // Configuration 4: Token ranges with boundary values (Long.MIN_VALUE, Long.MAX_VALUE)
+        RangeMap<BigInteger, List<CassandraInstance>> config4 = TreeRangeMap.create();
+        config4.put(Range.openClosed(BigInteger.valueOf(Long.MIN_VALUE), BigInteger.ZERO),
+                    Arrays.asList(instances.get(0), instances.get(1)));
+        config4.put(Range.openClosed(BigInteger.ZERO, BigInteger.valueOf(Long.MAX_VALUE)),
+                    Arrays.asList(instances.get(1), instances.get(2)));
+        replicasForRangesList.add(config4);
+
+        qt().forAll(TestUtils.partitioners(), integers().between(0, replicasForRangesList.size() - 1))
+            .checkAssert((partitioner, index) -> {
+                ReplicationFactor rf = new ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
+                                                             ImmutableMap.of("DC1", 2, "DC2", 1));
+
+                int expectedRangeCount = replicasForRangesList.get(index).asMapOfRanges().size();
+
+                testCassandraRingSerializationWithReplicasForRangesKryo(bridge, partitioner, instances,
+                                                                        replicasForRangesList.get(index),
+                                                                        rf, expectedRangeCount);
+            });
+    }
+
     @ParameterizedTest
     @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
     public void testCqlUdtField(CassandraBridge bridge)
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java
index fdcd7b9d..e6d2be31 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/JDKSerializationTests.java
@@ -23,8 +23,10 @@
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.math.BigInteger;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -32,6 +34,8 @@
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+import com.google.common.collect.TreeRangeMap;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -55,6 +59,7 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.quicktheories.QuickTheory.qt;
 import static org.quicktheories.generators.SourceDSL.arbitrary;
+import static org.quicktheories.generators.SourceDSL.integers;
 
 public class JDKSerializationTests extends VersionRunner
 {
@@ -83,6 +88,80 @@ public void testCassandraRing(CassandraBridge bridge)
         }));
     }
 
+    private void testCassandraRingSerializationWithReplicasForRanges(CassandraBridge bridge,
+                                                                     Partitioner partitioner,
+                                                                     List<CassandraInstance> instances,
+                                                                     RangeMap<BigInteger, List<CassandraInstance>> replicasForRanges,
+                                                                     ReplicationFactor replicationFactor,
+                                                                     int expectedRangeCount)
+    {
+        CassandraRing ring = new CassandraRing(partitioner, "test_keyspace", replicationFactor, instances, replicasForRanges);
+
+        byte[] bytes = bridge.javaSerialize(ring);
+        CassandraRing deserialized = bridge.javaDeserialize(bytes, CassandraRing.class);
+
+        assertThat(deserialized).isNotNull();
+        assertThat(deserialized.rangeMap()).isNotNull();
+        assertThat(deserialized.rangeMap().asMapOfRanges()).hasSize(expectedRangeCount);
+        assertThat(deserialized).isEqualTo(ring);
+        assertThat(deserialized.rangeMap().asMapOfRanges()).isEqualTo(ring.rangeMap().asMapOfRanges());
+    }
+
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.spark.data.VersionRunner#bridges")
+    public void testCassandraRingWithReplicasForRanges(CassandraBridge bridge)
+    {
+        // Create test instances that will be reused across different configurations
+        List<CassandraInstance> instances = Arrays.asList(
+                new CassandraInstance("-1000", "node1", "DC1"),
+                new CassandraInstance("0", "node2", "DC1"),
+                new CassandraInstance("1000", "node3", "DC2")
+        );
+
+        // Create different replicasForRanges configurations to test
+        List<RangeMap<BigInteger, List<CassandraInstance>>> replicasForRangesList = new ArrayList<>();
+
+        // Configuration 1: Basic two-range setup
+        RangeMap<BigInteger, List<CassandraInstance>> config1 = TreeRangeMap.create();
+        config1.put(Range.openClosed(new BigInteger("-1000"), BigInteger.ZERO),
+                    Arrays.asList(instances.get(0), instances.get(1)));
+        config1.put(Range.openClosed(BigInteger.ZERO, new BigInteger("1000")),
+                    Arrays.asList(instances.get(1), instances.get(2)));
+        replicasForRangesList.add(config1);
+
+        // Configuration 2: Single range with all replicas
+        RangeMap<BigInteger, List<CassandraInstance>> config2 = TreeRangeMap.create();
+        config2.put(Range.openClosed(new BigInteger("-500"), new BigInteger("500")),
+                    Arrays.asList(instances.get(0), instances.get(1), instances.get(2)));
+        replicasForRangesList.add(config2);
+
+        // Configuration 3: Range with empty replica list
+        RangeMap<BigInteger, List<CassandraInstance>> config3 = TreeRangeMap.create();
+        config3.put(Range.openClosed(new BigInteger("-100"), BigInteger.ZERO),
+                    Collections.emptyList());
+        config3.put(Range.openClosed(BigInteger.ZERO, new BigInteger("100")),
+                    Arrays.asList(instances.get(1)));
+        replicasForRangesList.add(config3);
+
+        // Configuration 4: Token ranges with boundary values (Long.MIN_VALUE, Long.MAX_VALUE)
+        RangeMap<BigInteger, List<CassandraInstance>> config4 = TreeRangeMap.create();
+        config4.put(Range.openClosed(BigInteger.valueOf(Long.MIN_VALUE), BigInteger.ZERO),
+                    Arrays.asList(instances.get(0), instances.get(1)));
+        config4.put(Range.openClosed(BigInteger.ZERO, BigInteger.valueOf(Long.MAX_VALUE)),
+                    Arrays.asList(instances.get(1), instances.get(2)));
+        replicasForRangesList.add(config4);
+
+        qt().forAll(TestUtils.partitioners(), integers().between(0, replicasForRangesList.size() - 1))
+            .checkAssert((partitioner, index) -> {
+                ReplicationFactor rf = new ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
+                                                             ImmutableMap.of("DC1", 2, "DC2", 1));
+
+                int expectedRangeCount = replicasForRangesList.get(index).asMapOfRanges().size();
+
+                testCassandraRingSerializationWithReplicasForRanges(bridge, partitioner, instances, replicasForRangesList.get(index), rf, expectedRangeCount);
+            });
+    }
+
     @ParameterizedTest
     @MethodSource("org.apache.cassandra.spark.data.VersionRunner#bridges")
     public void testTokenPartitioner(CassandraBridge bridge)
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/TokenRangeUtilsTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/TokenRangeUtilsTest.java
new file mode 100644
index 00000000..6e70a13f
--- /dev/null
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/TokenRangeUtilsTest.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.utils;
+
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.RangeMap;
+import org.junit.jupiter.api.Test;
+
+import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse;
+import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class TokenRangeUtilsTest
+{
+    @Test
+    void testConvertTokenRangeReplicasToRangeMapBasic()
+    {
+        // Create test instances
+        List<CassandraInstance> instances = Arrays.asList(
+                new CassandraInstance("-1000", "node1.example.com", "DC1"),
+                new CassandraInstance("0", "node2.example.com", "DC1"),
+                new CassandraInstance("1000", "node3.example.com", "DC2")
+        );
+
+        // Create replica metadata using actual class
+        Map<String, TokenRangeReplicasResponse.ReplicaMetadata> metadata = new HashMap<>();
+        metadata.put("192.168.1.1:9042", new TokenRangeReplicasResponse.ReplicaMetadata(
+                "NORMAL", "UP", "node1.example.com", "192.168.1.1", 9042, "DC1"));
+        metadata.put("192.168.1.2:9042", new TokenRangeReplicasResponse.ReplicaMetadata(
+                "NORMAL", "UP", "node2.example.com", "192.168.1.2", 9042, "DC1"));
+        metadata.put("192.168.1.3:9042", new TokenRangeReplicasResponse.ReplicaMetadata(
+                "NORMAL", "UP", "node3.example.com", "192.168.1.3", 9042, "DC2"));
+
+        // Create replica info using actual class
+        Map<String, List<String>> replicasByDatacenter1 = new HashMap<>();
+        replicasByDatacenter1.put("DC1", Arrays.asList("192.168.1.1:9042", "192.168.1.2:9042"));
+        replicasByDatacenter1.put("DC2", Arrays.asList("192.168.1.3:9042"));
+
+        Map<String, List<String>> replicasByDatacenter2 = new HashMap<>();
+        replicasByDatacenter2.put("DC1", Arrays.asList("192.168.1.2:9042"));
+        replicasByDatacenter2.put("DC2", Arrays.asList("192.168.1.3:9042"));
+
+        List<TokenRangeReplicasResponse.ReplicaInfo> replicaInfos = Arrays.asList(
+                new TokenRangeReplicasResponse.ReplicaInfo("-1000", "0", replicasByDatacenter1),
+                new TokenRangeReplicasResponse.ReplicaInfo("0", "1000", replicasByDatacenter2)
+        );
+
+        TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(replicaInfos, replicaInfos, metadata);
+
+        // Test conversion
+        RangeMap<BigInteger, List<CassandraInstance>> result =
+                TokenRangeUtils.convertTokenRangeReplicasToRangeMap(response, instances, null);
+
+        // Validate results
+        assertThat(result).isNotNull();
+        assertThat(result.asMapOfRanges()).hasSize(2);
+
+        // Check first range
+        List<CassandraInstance> replicas1 = result.get(new BigInteger("-500"));
+        assertThat(replicas1).hasSize(3);
+        assertThat(replicas1).extracting(CassandraInstance::nodeName)
+                             .containsExactlyInAnyOrder("node1.example.com", "node2.example.com", "node3.example.com");
+
+        // Check second range
+        List<CassandraInstance> replicas2 = result.get(new BigInteger("500"));
+        assertThat(replicas2).hasSize(2);
+        assertThat(replicas2).extracting(CassandraInstance::nodeName)
+                             .containsExactlyInAnyOrder("node2.example.com", "node3.example.com");
+    }
+
+    @Test
+    void testConvertTokenRangeReplicasToRangeMapWithDatacenterFilter()
+    {
+        // Create test instances
+        List<CassandraInstance> instances = Arrays.asList(
+                new CassandraInstance("-1000", "node1.example.com", "DC1"),
+                new CassandraInstance("0", "node2.example.com", "DC1"),
+                new CassandraInstance("1000", "node3.example.com", "DC2")
+        );
+
+        // Create replica metadata
+        Map<String, TokenRangeReplicasResponse.ReplicaMetadata> metadata = new HashMap<>();
+        metadata.put("192.168.1.1:9042", new TokenRangeReplicasResponse.ReplicaMetadata(
+                "NORMAL", "UP", "node1.example.com", "192.168.1.1", 9042, "DC1"));
+        metadata.put("192.168.1.2:9042", new TokenRangeReplicasResponse.ReplicaMetadata(
+                "NORMAL", "UP", "node2.example.com", "192.168.1.2", 9042, "DC1"));
+        metadata.put("192.168.1.3:9042", new TokenRangeReplicasResponse.ReplicaMetadata(
+                "NORMAL", "UP", "node3.example.com", "192.168.1.3", 9042, "DC2"));
+
+        // Create replica info
+        Map<String, List<String>> replicasByDatacenter = new HashMap<>();
+        replicasByDatacenter.put("DC1", Arrays.asList("192.168.1.1:9042", "192.168.1.2:9042"));
+        replicasByDatacenter.put("DC2", Arrays.asList("192.168.1.3:9042"));
+
+        List<TokenRangeReplicasResponse.ReplicaInfo> replicaInfos = Arrays.asList(
+                new TokenRangeReplicasResponse.ReplicaInfo("-1000", "0", replicasByDatacenter)
+        );
+
+        TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(replicaInfos, replicaInfos, metadata);
+
+        // Test conversion with datacenter filter (include only DC1)
+        RangeMap<BigInteger, List<CassandraInstance>> result =
+                TokenRangeUtils.convertTokenRangeReplicasToRangeMap(response, instances, "DC1");
+
+        // Validate results - should only contain DC1 replicas
+        assertThat(result).isNotNull();
+        assertThat(result.asMapOfRanges()).hasSize(1);
+
+        List<CassandraInstance> replicas = result.get(new BigInteger("-500"));
+        assertThat(replicas).hasSize(2);
+        assertThat(replicas).extracting(CassandraInstance::nodeName)
+                            .containsExactlyInAnyOrder("node1.example.com", "node2.example.com");
+    }
+
+    @Test
+    void testConvertTokenRangeReplicasToRangeMapEmptyResponse()
+    {
+        List<CassandraInstance> instances = Arrays.asList(
+                new CassandraInstance("0", "node1.example.com", "DC1")
+        );
+
+        TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(
+                Collections.emptyList(),
+                Collections.emptyList(),
+                Collections.emptyMap()
+        );
+
+        RangeMap<BigInteger, List<CassandraInstance>> result =
+                TokenRangeUtils.convertTokenRangeReplicasToRangeMap(response, instances, null);
+
+        assertThat(result).isNotNull();
+        assertThat(result.asMapOfRanges()).isEmpty();
+    }
+
+    @Test
+    void testConvertTokenRangeReplicasToRangeMapWithUnknownFqdn()
+    {
+        // Create test instances
+        List<CassandraInstance> instances = Arrays.asList(
+                new CassandraInstance("-1000", "node1.example.com", "DC1")
+        );
+
+        // Create replica metadata with unknown FQDN
+        Map<String, TokenRangeReplicasResponse.ReplicaMetadata> metadata = new HashMap<>();
+        metadata.put("192.168.1.1:9042", new TokenRangeReplicasResponse.ReplicaMetadata(
+                "NORMAL", "UP", "unknown.example.com", "192.168.1.1", 9042, "DC1"));
+
+        // Create replica info
+        Map<String, List<String>> replicasByDatacenter = new HashMap<>();
+        replicasByDatacenter.put("DC1", Arrays.asList("192.168.1.1:9042"));
+
+        List<TokenRangeReplicasResponse.ReplicaInfo> replicaInfos = Arrays.asList(
+                new TokenRangeReplicasResponse.ReplicaInfo("-1000", "0", replicasByDatacenter)
+        );
+
+        TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(replicaInfos, replicaInfos, metadata);
+
+        // Test conversion
+        RangeMap<BigInteger, List<CassandraInstance>> result =
+                TokenRangeUtils.convertTokenRangeReplicasToRangeMap(response, instances, null);
+
+        // Validate results - should have empty replica list since FQDN doesn't match
+        assertThat(result).isNotNull();
+        assertThat(result.asMapOfRanges()).hasSize(1);
+
+        List<CassandraInstance> replicas = result.get(new BigInteger("-500"));
+        assertThat(replicas).isEmpty();
+    }
+
+    @Test
+    void testConvertTokenRangeReplicasToRangeMapWithBoundaryTokenValues()
+    {
+        // Create test instances
+        List<CassandraInstance> instances = Arrays.asList(
+                new CassandraInstance(String.valueOf(Long.MIN_VALUE), "nodeMin.example.com", "DC1"),
+                new CassandraInstance("0", "nodeZero.example.com", "DC1"),
+                new CassandraInstance(String.valueOf(Long.MAX_VALUE), "nodeMax.example.com", "DC2")
+        );
+
+        // Create replica metadata
+        Map<String, TokenRangeReplicasResponse.ReplicaMetadata> metadata = new HashMap<>();
+        metadata.put("192.168.1.1:9042", new TokenRangeReplicasResponse.ReplicaMetadata(
+                "NORMAL", "UP", "nodeMin.example.com", "192.168.1.1", 9042, "DC1"));
+        metadata.put("192.168.1.2:9042", new TokenRangeReplicasResponse.ReplicaMetadata(
+                "NORMAL", "UP", "nodeZero.example.com", "192.168.1.2", 9042, "DC1"));
+        metadata.put("192.168.1.3:9042", new TokenRangeReplicasResponse.ReplicaMetadata(
+                "NORMAL", "UP", "nodeMax.example.com", "192.168.1.3", 9042, "DC2"));
+
+        // Create replica info with boundary values
+        Map<String, List<String>> replicasByDatacenter = new HashMap<>();
+        replicasByDatacenter.put("DC1", Arrays.asList("192.168.1.1:9042", "192.168.1.2:9042"));
+        replicasByDatacenter.put("DC2", Arrays.asList("192.168.1.3:9042"));
+
+        List<TokenRangeReplicasResponse.ReplicaInfo> replicaInfos = Arrays.asList(
+                new TokenRangeReplicasResponse.ReplicaInfo(String.valueOf(Long.MIN_VALUE), String.valueOf(Long.MAX_VALUE), replicasByDatacenter)
+        );
+
+        TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(replicaInfos, replicaInfos, metadata);
+
+        // Test conversion
+        RangeMap<BigInteger, List<CassandraInstance>> result =
+                TokenRangeUtils.convertTokenRangeReplicasToRangeMap(response, instances, null);
+
+        // Validate results
+        assertThat(result).isNotNull();
+        assertThat(result.asMapOfRanges()).hasSize(1);
+
+        List<CassandraInstance> replicas = result.get(BigInteger.ZERO);
+        assertThat(replicas).hasSize(3);
+        assertThat(replicas).extracting(CassandraInstance::nodeName)
+                            .containsExactlyInAnyOrder("nodeMin.example.com", "nodeZero.example.com", "nodeMax.example.com");
+    }
+
+    @Test
+    void testConvertTokenRangeReplicasToRangeMapFilterIncludeDatacenter()
+    {
+        // Create test instances across multiple datacenters
+        List<CassandraInstance> instances = Arrays.asList(
+                new CassandraInstance("-1000", "node1.example.com", "DC1"),
+                new CassandraInstance("0", "node2.example.com", "DC1"),
+                new CassandraInstance("1000", "node3.example.com", "DC2"),
+                new CassandraInstance("2000", "node4.example.com", "DC3")
+        );
+
+        // Create replica metadata
+        Map<String, TokenRangeReplicasResponse.ReplicaMetadata> metadata = new HashMap<>();
+        metadata.put("192.168.1.1:9042", new TokenRangeReplicasResponse.ReplicaMetadata(
+                "NORMAL", "UP", "node1.example.com", "192.168.1.1", 9042, "DC1"));
+        metadata.put("192.168.1.2:9042", new TokenRangeReplicasResponse.ReplicaMetadata(
+                "NORMAL", "UP", "node2.example.com", "192.168.1.2", 9042, "DC1"));
+        metadata.put("192.168.1.3:9042", new TokenRangeReplicasResponse.ReplicaMetadata(
+                "NORMAL", "UP", "node3.example.com", "192.168.1.3", 9042, "DC2"));
+        metadata.put("192.168.1.4:9042", new TokenRangeReplicasResponse.ReplicaMetadata(
+                "NORMAL", "UP", "node4.example.com", "192.168.1.4", 9042, "DC3"));
+
+        // Create replica info with replicas from all datacenters
+        Map<String, List<String>> replicasByDatacenter = new HashMap<>();
+        replicasByDatacenter.put("DC1", Arrays.asList("192.168.1.1:9042", "192.168.1.2:9042"));
+        replicasByDatacenter.put("DC2", Arrays.asList("192.168.1.3:9042"));
+        replicasByDatacenter.put("DC3", Arrays.asList("192.168.1.4:9042"));
+
+        List<TokenRangeReplicasResponse.ReplicaInfo> replicaInfos = Arrays.asList(
+                new TokenRangeReplicasResponse.ReplicaInfo("-1000", "0", replicasByDatacenter)
+        );
+
+        TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(replicaInfos, replicaInfos, metadata);
+
+        // Test conversion with datacenter filter (include only DC2)
+        RangeMap<BigInteger, List<CassandraInstance>> result =
+                TokenRangeUtils.convertTokenRangeReplicasToRangeMap(response, instances, "DC2");
+
+        // Validate results - should contain only DC2 replicas
+        assertThat(result).isNotNull();
+        assertThat(result.asMapOfRanges()).hasSize(1);
+
+        List<CassandraInstance> replicas = result.get(new BigInteger("-500"));
+        assertThat(replicas).hasSize(1);
+        assertThat(replicas).extracting(CassandraInstance::nodeName)
+                            .containsExactlyInAnyOrder("node3.example.com");
+        assertThat(replicas).extracting(CassandraInstance::dataCenter)
+                            .containsExactlyInAnyOrder("DC2");
+    }
+
+    @Test
+    void testValidateTokenRangeReplicasResponseHappyPath()
+    {
+        // Create valid TokenRangeReplicasResponse
+        List<CassandraInstance> instances = List.of(
+                new CassandraInstance("-1000", "node1.example.com", "DC1")
+        );
+
+        Map<String, TokenRangeReplicasResponse.ReplicaMetadata> metadata = new HashMap<>();
+        metadata.put("192.168.1.1:9042", new TokenRangeReplicasResponse.ReplicaMetadata(
+                "NORMAL", "UP", "node1.example.com", "192.168.1.1", 9042, "DC1"));
+
+        Map<String, List<String>> replicasByDatacenter = new HashMap<>();
+        replicasByDatacenter.put("DC1", Arrays.asList("192.168.1.1:9042"));
+
+        List<TokenRangeReplicasResponse.ReplicaInfo> replicaInfos = Arrays.asList(
+                new TokenRangeReplicasResponse.ReplicaInfo("-1000", "0", replicasByDatacenter)
+        );
+
+        TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(replicaInfos, replicaInfos, metadata);
+
+        // Should not throw any exception
+        assertThat(response).isNotNull();
+        TokenRangeUtils.validateTokenRangeReplicasResponse(response);
+    }
+
+    @Test
+    void testValidateTokenRangeReplicasResponseNullResponse()
+    {
+        assertThatThrownBy(() -> TokenRangeUtils.validateTokenRangeReplicasResponse(null))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("Null TokenRangeReplicasResponse from sidecar");
+    }
+
+    @Test
+    void testValidateTokenRangeReplicasResponseNullReplicaMetadata()
+    {
+        List<TokenRangeReplicasResponse.ReplicaInfo> replicaInfos = Arrays.asList(
+                new TokenRangeReplicasResponse.ReplicaInfo("-1000", "0", new HashMap<>())
+        );
+
+        TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(replicaInfos, replicaInfos, null);
+
+        assertThatThrownBy(() -> TokenRangeUtils.validateTokenRangeReplicasResponse(response))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("Null replicaMetadata in TokenRangeReplicasResponse from sidecar");
+    }
+
+    @Test
+    void testValidateTokenRangeReplicasResponseEmptyReplicaMetadata()
+    {
+        List<TokenRangeReplicasResponse.ReplicaInfo> replicaInfos = Arrays.asList(
+                new TokenRangeReplicasResponse.ReplicaInfo("-1000", "0", new HashMap<>())
+        );
+
+        TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(replicaInfos, replicaInfos, Collections.emptyMap());
+
+        assertThatThrownBy(() -> TokenRangeUtils.validateTokenRangeReplicasResponse(response))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("Empty replicaMetadata in TokenRangeReplicasResponse from sidecar");
+    }
+
+    @Test
+    void testValidateTokenRangeReplicasResponseNullFqdn()
+    {
+        Map<String, TokenRangeReplicasResponse.ReplicaMetadata> metadata = new HashMap<>();
+        metadata.put("192.168.1.1:9042", new TokenRangeReplicasResponse.ReplicaMetadata(
+                "NORMAL", "UP", null, "192.168.1.1", 9042, "DC1"));
+
+        List<TokenRangeReplicasResponse.ReplicaInfo> replicaInfos = Arrays.asList(
+                new TokenRangeReplicasResponse.ReplicaInfo("-1000", "0", new HashMap<>())
+        );
+
+        TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(replicaInfos, replicaInfos, metadata);
+
+        assertThatThrownBy(() -> TokenRangeUtils.validateTokenRangeReplicasResponse(response))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("ReplicaMetadata entry '192.168.1.1:9042' has null or empty fqdn in TokenRangeReplicasResponse from sidecar");
+    }
+
+    @Test
+    void testValidateTokenRangeReplicasResponseEmptyFqdn()
+    {
+        Map<String, TokenRangeReplicasResponse.ReplicaMetadata> metadata = new HashMap<>();
+        metadata.put("192.168.1.1:9042", new TokenRangeReplicasResponse.ReplicaMetadata(
+                "NORMAL", "UP", "", "192.168.1.1", 9042, "DC1"));
+
+        List<TokenRangeReplicasResponse.ReplicaInfo> replicaInfos = Arrays.asList(
+                new TokenRangeReplicasResponse.ReplicaInfo("-1000", "0", new HashMap<>())
+        );
+
+        TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(replicaInfos, replicaInfos, metadata);
+
+        assertThatThrownBy(() -> TokenRangeUtils.validateTokenRangeReplicasResponse(response))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("ReplicaMetadata entry '192.168.1.1:9042' has null or empty fqdn in TokenRangeReplicasResponse from sidecar");
+    }
+
+    @Test
+    void testValidateTokenRangeReplicasResponseWhitespaceFqdn()
+    {
+        Map<String, TokenRangeReplicasResponse.ReplicaMetadata> metadata = new HashMap<>();
+        metadata.put("192.168.1.1:9042", new TokenRangeReplicasResponse.ReplicaMetadata(
+                "NORMAL", "UP", " ", "192.168.1.1", 9042, "DC1"));
+
+        List<TokenRangeReplicasResponse.ReplicaInfo> replicaInfos = Arrays.asList(
+                new TokenRangeReplicasResponse.ReplicaInfo("-1000", "0", new HashMap<>())
+        );
+
+        TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(replicaInfos, replicaInfos, metadata);
+
+        assertThatThrownBy(() -> TokenRangeUtils.validateTokenRangeReplicasResponse(response))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("ReplicaMetadata entry '192.168.1.1:9042' has null or empty fqdn in TokenRangeReplicasResponse from sidecar");
+    }
+
+    @Test
+    void testValidateTokenRangeReplicasResponseNullReadReplicas()
+    {
+        Map<String, TokenRangeReplicasResponse.ReplicaMetadata> metadata = new HashMap<>();
+        metadata.put("192.168.1.1:9042", new TokenRangeReplicasResponse.ReplicaMetadata(
+                "NORMAL", "UP", "node1.example.com", "192.168.1.1", 9042, "DC1"));
+
+        TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(Collections.emptyList(), null, metadata);
+
+        assertThatThrownBy(() -> TokenRangeUtils.validateTokenRangeReplicasResponse(response))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("Null readReplicas in TokenRangeReplicasResponse from sidecar");
+    }
+
+    @Test
+    void testValidateTokenRangeReplicasResponseEmptyReadReplicas()
+    {
+        Map<String, TokenRangeReplicasResponse.ReplicaMetadata> metadata = new HashMap<>();
+        metadata.put("192.168.1.1:9042", new TokenRangeReplicasResponse.ReplicaMetadata(
+                "NORMAL", "UP", "node1.example.com", "192.168.1.1", 9042, "DC1"));
+
+        TokenRangeReplicasResponse response = new TokenRangeReplicasResponse(Collections.emptyList(), Collections.emptyList(), metadata);
+
+        assertThatThrownBy(() -> TokenRangeUtils.validateTokenRangeReplicasResponse(response))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("Empty readReplicas in TokenRangeReplicasResponse from sidecar");
+    }
+}
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTokenRangeReplicasTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTokenRangeReplicasTest.java
new file mode 100644
index 00000000..7158a921
--- /dev/null
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTokenRangeReplicasTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.spark.data.partitioner.MurmurHash;
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+import org.apache.cassandra.testing.utils.ClusterUtils;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack;
+import static org.apache.cassandra.testing.TestUtils.DC1_RF3_DC2_RF3;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * <p>Single-rack replica placement with NetworkTopologyStrategy</p>
+ *
+ * <ul>
+ *   <li>DC1: {Rack1:[Node1, Node2, Node3, Node4]}, Replication Factor: 3</li>
+ *   <li>DC2: {Rack1:[Node5, Node6, Node7, Node8]}, Replication Factor: 3</li>
+ * </ul>
+ *
+ * <p>Token range ownership: {Node1: T1, Node2: T2, Node3: T3, Node4: T4, Node5: T5, Node6: T6, Node7: T7, Node8: T8}</p>
+ *
+ * <p>T1 will be replicated to the next two nodes in the same DC1 (Node2, Node3) and to the first three nodes in DC2
+ * (Node5, Node6, Node7).</p>
+ *
+ * <p>For each token range the replicas are:</p>
+ *
+ * <pre>
+ * T1:[Node1, Node2, Node3, Node5, Node6, Node7]
+ * T2:[Node2, Node3, Node4, Node5, Node6, Node7]
+ * T3:[Node3, Node4, Node1, Node5, Node6, Node7]
+ * T4:[Node4, Node1, Node2, Node5, Node6, Node7]
+ * T5:[Node5, Node6, Node7, Node1, Node2, Node3]
+ * T6:[Node6, Node7, Node8, Node1, Node2, Node3]
+ * T7:[Node7, Node8, Node5, Node1, Node2, Node3]
+ * T8:[Node8, Node5, Node6, Node1, Node2, Node3]
+ * </pre>
+ *
+ * <p>Multi-rack replica placement with NetworkTopologyStrategy</p>
+ *
+ * <ul>
+ *   <li>DC1: {Rack1:[Node1, Node2], Rack2:[Node3], Rack3:[Node4]}, Replication Factor: 3</li>
+ *   <li>DC2: {Rack1:[Node5, Node6], Rack2:[Node7], Rack3:[Node8]}, Replication Factor: 3</li>
+ * </ul>
+ *
+ * <p>Cassandra will try to place replicas in different racks.</p>
+ *
+ * <p>T1 will be replicated to the next two nodes in DC1 on different racks (Node3, Node4) and to the first three
+ * nodes on different racks in DC2 (Node5, Node7, Node8).</p>
+ *
+ * <p>For each token range the replicas are:</p>
+ *
+ * <pre>
+ * T1:[Node1, Node3, Node4, Node5, Node7, Node8]
+ * T2:[Node2, Node3, Node4, Node5, Node7, Node8]
+ * T3:[Node3, Node4, Node1, Node5, Node7, Node8]
+ * T4:[Node4, Node1, Node3, Node5, Node7, Node8]
+ * T5:[Node5, Node7, Node8, Node1, Node3, Node4]
+ * T6:[Node6, Node7, Node8, Node1, Node3, Node4]
+ * T7:[Node7, Node8, Node5, Node1, Node3, Node4]
+ * T8:[Node8, Node5, Node7, Node1, Node3, Node4]
+ * </pre>
+ */
+public class BulkReaderTokenRangeReplicasTest extends SharedClusterSparkIntegrationTestBase
+{
+    QualifiedName table1 = uniqueTestTableFullName(TEST_KEYSPACE);
+    private static final String VALUE1 = "VAL1";
+    private static final String VALUE2 = "VAL2";
+
+    @Override
+    protected ClusterBuilderConfiguration testClusterConfiguration()
+    {
+        return super.testClusterConfiguration()
+                    .dcCount(2)
+                    .nodesPerDc(4)
+                    .dcAndRackSupplier((nodeId) -> {
+                        switch (nodeId)
+                        {
+                            case 1:
+                            case 2:
+                                return dcAndRack("datacenter1", "rack1");
+                            case 3:
+                                return dcAndRack("datacenter1", "rack2");
+                            case 4:
+                                return dcAndRack("datacenter1", "rack3");
+                            case 5:
+                            case 6:
+                                return dcAndRack("datacenter2", "rack1");
+                            case 7:
+                                return dcAndRack("datacenter2", "rack2");
+                            case 8:
+                                return dcAndRack("datacenter2", "rack3");
+                            default:
+                                return dcAndRack("", "");
+                        }
+                    });
+    }
+
+    @Test
+    void testMultiDCMultiRack()
+    {
+        // get the token owned by node 1
+        long token = getTokenForNode(1);
+        // reverse-hash the token into a blob key that hashes exactly to it
+        ByteBuffer key = keyForToken(token);
+        // insert a value for the key in node 1's token range
+        insert(key, VALUE1);
+        // Node placement:
+        // DC1: {rack1: [node1, node2], rack2: [node3], rack3: [node4]}
+        // DC2: {rack1: [node5, node6], rack2: [node7], rack3: [node8]}
+        // Validate that all nodes except nodes 2 and 6 stored the key/value.
+        Map<Integer, String> expectedValuesInNodes = new HashMap<>(Map.of(1, VALUE1,
+                                                                          3, VALUE1,
+                                                                          4, VALUE1,
+                                                                          5, VALUE1,
+                                                                          7, VALUE1,
+                                                                          8, VALUE1));
+        validateValuesInNodes(expectedValuesInNodes, key);
+
+        // update the value internally at node 4
+        updateInternal(4, key, VALUE2);
+        // validate the values across the nodes:
+        // node 4 should have VALUE2
+        // nodes 2 and 6 shouldn't have the key
+        // all other nodes should have VALUE1
+        expectedValuesInNodes.put(4, VALUE2);
+        //updateInternal(1, key, VALUE2);
+        //expectedValuesInNodes.put(1, VALUE2);
+        validateValuesInNodes(expectedValuesInNodes, key);
+
+        List<Row> rowList = bulkRead(ConsistencyLevel.ALL.name());
+        Object[][] driverVal = readFromCluster(key, ConsistencyLevel.ALL);
+
+        // validate that the bulk-read value matches the data read using the driver
+        assertThat(rowList).isNotNull();
+        assertThat(rowList).isNotEmpty();
+        assertThat(driverVal).isNotNull();
+        assertThat(driverVal).isNotEmpty();
+        assertThat(driverVal[0][0]).isInstanceOf(String.class);
+        assertThat(rowList.get(0).getString(1)).isEqualTo((String) driverVal[0][0]);
+    }
+
+    private void validateValuesInNodes(Map<Integer, String> values, ByteBuffer key)
+    {
+        for (int i = 1; i <= 8; i++)
+        {
+            Object[][] obj = getInternal(i, key);
+            if (!values.containsKey(i))
+            {
+                assertThat(obj).isEmpty();
+            }
+            else
+            {
+                assertThat(obj).isNotEmpty();
+                assertThat(obj[0][0]).isInstanceOf(String.class);
+                assertThat((String) obj[0][0]).isEqualTo(values.get(i));
+            }
+        }
+    }
+
+    @NotNull
+    private List<Row> bulkRead(String consistency)
+    {
+        Dataset<Row> dataForTable1 = bulkReaderDataFrame(table1)
+                                     .option("consistencyLevel", consistency)
+                                     .option("dc", null)
+                                     .load();
+
+        return dataForTable1.collectAsList().stream()
+                            .sorted(Comparator.comparing(row -> row.getInt(0)))
+                            .collect(Collectors.toList());
+    }
+
+    private Object[][] getInternal(int node, ByteBuffer key)
+    {
+        return cluster.get(node).executeInternal(String.format("SELECT value FROM %s WHERE key = ?", table1), key);
+    }
+
+    private Object[][] readFromCluster(ByteBuffer key, ConsistencyLevel consistencyLevel)
+    {
+        return cluster.getFirstRunningInstance().coordinator()
+                      .execute(String.format("SELECT value FROM %s WHERE key = ?", table1), consistencyLevel, key);
+    }
+
+    private void insert(ByteBuffer key, String value)
+    {
+        String query1 = String.format("INSERT INTO %s (key, value) VALUES (?, ?);", table1);
+        cluster.getFirstRunningInstance().coordinator().execute(query1, ConsistencyLevel.ALL, key, value);
+    }
+
+    private void updateInternal(int node, ByteBuffer key, String value)
+    {
+        cluster.get(node).executeInternal(String.format("UPDATE %s SET value='%s' WHERE key=?", table1, value), key);
+    }
+
+    protected void initializeSchemaForTest()
+    {
+        createTestKeyspace(TEST_KEYSPACE, DC1_RF3_DC2_RF3);
+        createTestTable(table1, "CREATE TABLE IF NOT EXISTS %s (key blob, value text, PRIMARY KEY (key));");
+    }
+
+    private long getTokenForNode(int nodeNumber)
+    {
+        String nodeAddress = cluster.get(nodeNumber).config().broadcastAddress().getAddress().getHostAddress();
+        List<ClusterUtils.RingInstanceDetails> ringDetails = ClusterUtils.ring(cluster.get(nodeNumber));
+
+        return ringDetails.stream()
+                          .filter(details -> details.getAddress().contains(nodeAddress))
+                          .findFirst()
+                          .map(details -> Long.parseLong(details.getToken()))
+                          .orElseThrow(() -> new RuntimeException("Node " + nodeNumber + " token not found"));
+    }
+
+    public static ByteBuffer keyForToken(long token)
+    {
+        ByteBuffer result = ByteBuffer.allocate(16);
+        long[] inv = MurmurHash.invHash3X64128(new long[]{token, 0L});
+        result.putLong(inv[0]).putLong(inv[1]).position(0);
+        return result;
+    }
+}
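A note on `keyForToken`: a 16-byte key is exactly one Murmur3 body block with no tail, so `invHash3X64128` is a true inverse there and the generated blob hashes back to the requested token under seed 0 (the seed Cassandra's Murmur3Partitioner uses). A possible sanity check that could be dropped into the test above (an optional sketch, not part of the patch):

```java
// The key produced by keyForToken must hash back to the requested token:
// slot 0 carries the token, slot 1 was pinned to 0 before inversion.
long token = getTokenForNode(1);
ByteBuffer key = keyForToken(token);
long[] hash = MurmurHash.hash(key, 0, 16, 0);
assert hash[0] == token && hash[1] == 0L;
```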