Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package org.apache.cassandra.spark.data.partitioner;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

Expand All @@ -34,9 +37,9 @@ public class CassandraInstance implements TokenOwner, Serializable
public static final CassandraInstance.Serializer SERIALIZER = new CassandraInstance.Serializer();

private static final long serialVersionUID = 6767636627576239773L;
private final String token;
private final String node;
private final String dataCenter;
private String token;
private String node;
private String dataCenter;

public CassandraInstance(String token, String node, String dataCenter)
{
Expand Down Expand Up @@ -100,6 +103,20 @@ public String toString()
return String.format("{\"token\"=\"%s\", \"node\"=\"%s\", \"dc\"=\"%s\"}", token, node, dataCenter);
}

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException
{
this.token = in.readUTF();
this.node = in.readUTF();
this.dataCenter = in.readUTF();
}

private void writeObject(ObjectOutputStream out) throws IOException
{
out.writeUTF(token());
out.writeUTF(nodeName());
out.writeUTF(dataCenter());
}

public static class Serializer extends com.esotericsoftware.kryo.Serializer<CassandraInstance>
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class CassandraRing implements Serializable
private ReplicationFactor replicationFactor;
private List<CassandraInstance> instances;

private transient RangeMap<BigInteger, List<CassandraInstance>> replicasForRanges;
private transient RangeMap<BigInteger, List<CassandraInstance>> replicas;
private transient Multimap<CassandraInstance, Range<BigInteger>> tokenRangeMap;

Expand Down Expand Up @@ -120,8 +121,33 @@ public CassandraRing(Partitioner partitioner,
this.init();
}

public CassandraRing(Partitioner partitioner,
String keyspace,
ReplicationFactor replicationFactor,
Collection<CassandraInstance> instances,
RangeMap<BigInteger, List<CassandraInstance>> replicasForRanges)
{
this.partitioner = partitioner;
this.keyspace = keyspace;
this.replicationFactor = replicationFactor;
this.instances = instances.stream()
.sorted(Comparator.comparing(instance -> new BigInteger(instance.token())))
.collect(Collectors.toCollection(ArrayList::new));
this.replicasForRanges = replicasForRanges;
init();
}

private void init()
{
if (this.replicasForRanges != null)
{
this.replicas = this.replicasForRanges;
this.tokenRangeMap = ArrayListMultimap.create();
this.replicasForRanges.asMapOfRanges().forEach((range, instanceList) ->
instanceList.forEach(instance ->
this.tokenRangeMap.put(instance, range)));
return;
}
// Setup token range map
replicas = TreeRangeMap.create();
tokenRangeMap = ArrayListMultimap.create();
Expand Down Expand Up @@ -276,8 +302,28 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
this.instances = new ArrayList<>(numInstances);
for (int instance = 0; instance < numInstances; instance++)
{
this.instances.add(new CassandraInstance(in.readUTF(), in.readUTF(), in.readUTF()));
this.instances.add((CassandraInstance) in.readObject());
}
int numRanges = in.readShort();
if (numRanges > 0)
{
this.replicasForRanges = TreeRangeMap.create();
for (int rangeIndex = 0; rangeIndex < numRanges; rangeIndex++)
{
BigInteger lowerEndpoint = new BigInteger(in.readUTF());
BigInteger upperEndpoint = new BigInteger(in.readUTF());
Range<BigInteger> range = Range.openClosed(lowerEndpoint, upperEndpoint);

int numReplicas = in.readShort();
List<CassandraInstance> replicas = new ArrayList<>(numReplicas);
for (int replicaIndex = 0; replicaIndex < numReplicas; replicaIndex++)
{
replicas.add((CassandraInstance) in.readObject());
}
this.replicasForRanges.put(range, replicas);
}
}

this.init();
}

Expand All @@ -299,9 +345,21 @@ private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFou
out.writeShort(this.instances.size());
for (CassandraInstance instance : this.instances)
{
out.writeUTF(instance.token());
out.writeUTF(instance.nodeName());
out.writeUTF(instance.dataCenter());
out.writeObject(instance);
}
out.writeShort(this.replicasForRanges == null ? 0 : this.replicasForRanges.asMapOfRanges().size());
if (this.replicasForRanges != null)
{
for (Map.Entry<Range<BigInteger>, List<CassandraInstance>> entry : replicasForRanges.asMapOfRanges().entrySet())
{
out.writeUTF(entry.getKey().lowerEndpoint().toString());
out.writeUTF(entry.getKey().upperEndpoint().toString());
out.writeShort(entry.getValue().size());
for (CassandraInstance instance : entry.getValue())
{
out.writeObject(instance);
}
}
}
}

Expand All @@ -314,17 +372,58 @@ public void write(Kryo kryo, Output out, CassandraRing ring)
out.writeString(ring.keyspace);
kryo.writeObject(out, ring.replicationFactor);
kryo.writeObject(out, ring.instances);

// Manually serialize replicasForRanges
out.writeShort(ring.replicasForRanges == null ? 0 : ring.replicasForRanges.asMapOfRanges().size());
if (ring.replicasForRanges != null)
{
for (Map.Entry<Range<BigInteger>, List<CassandraInstance>> entry : ring.replicasForRanges.asMapOfRanges().entrySet())
{
out.writeString(entry.getKey().lowerEndpoint().toString());
out.writeString(entry.getKey().upperEndpoint().toString());
out.writeShort(entry.getValue().size());
for (CassandraInstance instance : entry.getValue())
{
kryo.writeObject(out, instance);
}
}
}
}

@Override
@SuppressWarnings("unchecked")
public CassandraRing read(Kryo kryo, Input in, Class<CassandraRing> type)
{
return new CassandraRing(in.readByte() == 1 ? Partitioner.RandomPartitioner
: Partitioner.Murmur3Partitioner,
in.readString(),
kryo.readObject(in, ReplicationFactor.class),
kryo.readObject(in, ArrayList.class));
// Read all data first
Partitioner partitioner = in.readByte() == 1 ? Partitioner.RandomPartitioner : Partitioner.Murmur3Partitioner;
String keyspace = in.readString();
ReplicationFactor replicationFactor = kryo.readObject(in, ReplicationFactor.class);
List<CassandraInstance> instances = kryo.readObject(in, ArrayList.class);

// Read replicasForRanges data
RangeMap<BigInteger, List<CassandraInstance>> replicasForRanges = null;
int numRanges = in.readShort();
if (numRanges > 0)
{
replicasForRanges = TreeRangeMap.create();
for (int rangeIndex = 0; rangeIndex < numRanges; rangeIndex++)
{
BigInteger lowerEndpoint = new BigInteger(in.readString());
BigInteger upperEndpoint = new BigInteger(in.readString());
Range<BigInteger> range = Range.openClosed(lowerEndpoint, upperEndpoint);

int numReplicas = in.readShort();
List<CassandraInstance> replicas = new ArrayList<>(numReplicas);
for (int replicaIndex = 0; replicaIndex < numReplicas; replicaIndex++)
{
replicas.add(kryo.readObject(in, CassandraInstance.class));
}
replicasForRanges.put(range, replicas);
}
}

// Create the object with all data
return new CassandraRing(partitioner, keyspace, replicationFactor, instances, replicasForRanges);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,89 @@ public static long[] hash(ByteBuffer key, int offset, int length, long seed)

return new long[]{h1, h2};
}

protected static long invRShiftXor(long value, int shift)
{
long output = 0;
long i = 0;
while (i * shift < 64)
{
long c = (0xffffffffffffffffL << (64 - shift)) >>> (shift * i);
long partOutput = value & c;
value ^= partOutput >>> shift;
output |= partOutput;
i += 1;
}
return output;
}

protected static long invRotl64(long v, int n)
{
return ((v >>> n) | (v << (64 - n)));
}

protected static long invFmix(long k)
{
k = invRShiftXor(k, 33);
k *= 0x9cb4b2f8129337dbL;
k = invRShiftXor(k, 33);
k *= 0x4f74430c22a54005L;
k = invRShiftXor(k, 33);
return k;
}

public static long[] invHash3X64128(long[] result)
{
long c1 = 0xa98409e882ce4d7dL;
long c2 = 0xa81e14edd9de2c7fL;

long k1 = 0;
long k2 = 0;
long h1 = result[0];
long h2 = result[1];

//----------
// reverse finalization
h2 -= h1;
h1 -= h2;

h1 = invFmix(h1);
h2 = invFmix(h2);

h2 -= h1;
h1 -= h2;

h1 ^= 16;
h2 ^= 16;

//----------
// reverse body
h2 -= 0x38495ab5;
h2 *= 0xcccccccccccccccdL;
h2 -= h1;
h2 = invRotl64(h2, 31);
k2 = h2;
h2 = 0;

k2 *= c1;
k2 = invRotl64(k2, 33);
k2 *= c2;

h1 -= 0x52dce729;
h1 *= 0xcccccccccccccccdL;
//h1 -= h2;
h1 = invRotl64(h1, 27);

k1 = h1;

k1 *= c2;
k1 = invRotl64(k1, 31);
k1 *= c1;

// note that while this works for body block reversing the tail reverse requires `invTailReverse`
k1 = Long.reverseBytes(k1);
k2 = Long.reverseBytes(k2);

return new long[] {k1, k2};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -62,6 +63,7 @@
import o.a.c.sidecar.client.shaded.common.response.NodeSettings;
import o.a.c.sidecar.client.shaded.common.response.RingResponse;
import o.a.c.sidecar.client.shaded.common.response.SchemaResponse;
import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse;
import org.apache.cassandra.analytics.stats.Stats;
import org.apache.cassandra.bridge.BigNumberConfig;
import org.apache.cassandra.bridge.BigNumberConfigImpl;
Expand Down Expand Up @@ -94,6 +96,7 @@
import org.apache.cassandra.spark.utils.ScalaFunctions;
import org.apache.cassandra.spark.utils.ThrowableUtils;
import org.apache.cassandra.spark.utils.TimeProvider;
import org.apache.cassandra.spark.utils.TokenRangeUtils;
import org.apache.cassandra.spark.validation.CassandraValidation;
import org.apache.cassandra.spark.validation.SidecarValidation;
import org.apache.cassandra.spark.validation.StartupValidatable;
Expand Down Expand Up @@ -317,7 +320,12 @@ private int initBulkReader(@NotNull ClientConfig options) throws ExecutionExcept
udts.forEach(udt -> LOGGER.info("Adding schema UDT: '{}'", udt));

cqlTable = bridge().buildSchema(createStmt, keyspace, replicationFactor, partitioner, udts, null, indexCount, false);
CassandraRing ring = createCassandraRingFromRing(partitioner, replicationFactor, ringFuture.get());

TokenRangeReplicasResponse tokenRangeReplicas = sidecar.tokenRangeReplicas(
new ArrayList<>(clusterConfig), maybeQuotedKeyspace).get();
TokenRangeUtils.validateTokenRangeReplicasResponse(tokenRangeReplicas);

CassandraRing ring = createCassandraRingFromRing(partitioner, replicationFactor, ringFuture.get(), tokenRangeReplicas);

int effectiveNumberOfCores = sizingFuture.get();
tokenPartitioner = new TokenPartitioner(ring, options.defaultParallelism(), effectiveNumberOfCores);
Expand Down Expand Up @@ -681,14 +689,17 @@ public BigNumberConfig bigNumberConfig(CqlField field)
@VisibleForTesting
public CassandraRing createCassandraRingFromRing(Partitioner partitioner,
ReplicationFactor replicationFactor,
RingResponse ring)
RingResponse ring,
TokenRangeReplicasResponse tokenRangeReplicas)
{
Collection<CassandraInstance> instances = ring
.stream()
.filter(status -> datacenter == null || datacenter.equalsIgnoreCase(status.datacenter()))
.map(status -> new CassandraInstance(status.token(), status.fqdn(), status.datacenter()))
.collect(Collectors.toList());
return new CassandraRing(partitioner, keyspace, replicationFactor, instances);
RangeMap<BigInteger, List<CassandraInstance>> replicasForRanges =
TokenRangeUtils.convertTokenRangeReplicasToRangeMap(tokenRangeReplicas, instances, datacenter);
return new CassandraRing(partitioner, keyspace, replicationFactor, instances, replicasForRanges);
}

// Startup Validation
Expand Down
Loading