@@ -62,12 +62,13 @@ object AccumuloCollectionReader {
val scanner = instance.connector.createScanner(table, new Authorizations())
scanner.setRange(range)
scanner.fetchColumnFamily(columnFamily)
val result = scanner.iterator.map { case entry =>
AvroEncoder.fromBinary(writerSchema.getOrElse(codec.schema), entry.getValue.get)(codec)
}.flatMap { pairs: Vector[(K, V)] =>
if(filterIndexOnly) pairs
else pairs.filter { pair => includeKey(pair._1) }
}.toVector
val result = scanner.iterator
.map({ case entry =>
AvroEncoder.fromBinary(writerSchema.getOrElse(codec.schema), entry.getValue.get)(codec) })
.flatMap({ pairs: Vector[(K, V)] =>
if(filterIndexOnly) pairs
else pairs.filter { pair => includeKey(pair._1) } })
.toVector
scanner.close()
result
}(pool) }
@@ -22,12 +22,17 @@ import org.apache.accumulo.core.data.Key
import org.apache.hadoop.io.Text

object AccumuloKeyEncoder {
final def long2Bytes(x: Long): Array[Byte] =
Array[Byte](x>>56 toByte, x>>48 toByte, x>>40 toByte, x>>32 toByte, x>>24 toByte, x>>16 toByte, x>>8 toByte, x toByte)
final def long2Bytes(x: BigInt): Array[Byte] = {
val bytes1: Array[Byte] = x.toByteArray
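// left-pad with zero bytes so indices that fit in a Long keep the old fixed 8-byte, big-endian row-id encoding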
val bytes2: Array[Byte] = Stream.continually(0.toByte).take(8 - bytes1.length).toArray
(bytes2 ++ bytes1) // XXX
}

final def index2RowId(index: Long): Text = new Text(long2Bytes(index))
final def index2RowId(index: BigInt): Text = {
new Text(long2Bytes(index))
}

def encode[K](id: LayerId, key: K, index: Long): Key =
def encode[K](id: LayerId, key: K, index: BigInt): Key =
new Key(index2RowId(index), columnFamily(id))

def getLocalityGroups(id: LayerId): Seq[String] = Seq(columnFamily(id))
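An editorial sketch (not part of the diff) of how the widened encoder behaves; the example values are illustrative only:

// An index that fits in a Long is left-padded to the same 8-byte row id the old Long-based encoder produced:
AccumuloKeyEncoder.long2Bytes(BigInt(1))
// => Array(0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01)
// An index wider than 8 bytes passes through BigInt.toByteArray unpadded:
AccumuloKeyEncoder.long2Bytes(BigInt(2).pow(64))
// => 9 bytes: 0x01 followed by eight 0x00 bytes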
@@ -32,7 +32,7 @@ object AccumuloUtils {
* Mapping the KeyBounds of an Extent to SFC ranges will often result in a set of non-contiguous ranges.
* The indices excluded by these ranges should not be included in the split calculation, as they will never be seen.
*/
def getSplits[K](kb: KeyBounds[K], ki: KeyIndex[K], count: Int): Seq[Long] =
def getSplits[K](kb: KeyBounds[K], ki: KeyIndex[K], count: Int): Seq[BigInt] =
KeyIndex.breaks(kb, ki, count)
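For illustration only (not from this change set) — a hedged sketch of turning these breaks into Accumulo split points; keyBounds and keyIndex are assumed to be in scope:

// Derive up to 16 SFC breaks for the layer; each break is now a BigInt rather than a Long.
val breaks: Seq[BigInt] = AccumuloUtils.getSplits(keyBounds, keyIndex, 16)
// Encode each break as a row id (Text) that could be handed to TableOperations.addSplits.
val splitPoints = breaks.map(AccumuloKeyEncoder.index2RowId)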

/**
@@ -39,7 +39,7 @@ class AccumuloValueReader(
val attributeStore: AttributeStore
) extends OverzoomingValueReader {

val rowId = (index: Long) => new Text(AccumuloKeyEncoder.long2Bytes(index))
val rowId = (index: BigInt) => new Text(AccumuloKeyEncoder.long2Bytes(index))

def reader[K: AvroRecordCodec: JsonFormat: ClassTag, V: AvroRecordCodec](layerId: LayerId): Reader[K, V] = new Reader[K, V] {
val header = attributeStore.readHeader[AccumuloLayerHeader](layerId)
@@ -0,0 +1,49 @@
package geotrellis.spark.io.cassandra;

import com.datastax.driver.core.DataType;
import com.datastax.driver.core.exceptions.InvalidTypeException;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.TypeCodec;
import com.datastax.driver.core.TypeCodec.PrimitiveLongCodec;

import java.nio.ByteBuffer;

import java.math.BigInteger;
import java.util.Arrays;


/*
* This codec is used for backward compatibility only.
*
* @author James McClain
*/
public class BigIntegerIffBigint extends TypeCodec<BigInteger> {

public static final BigIntegerIffBigint instance = new BigIntegerIffBigint();
private static final PrimitiveLongCodec _instance = TypeCodec.bigint();

private BigIntegerIffBigint() {
super(DataType.bigint(), BigInteger.class);
}

@Override
public ByteBuffer serialize(BigInteger value, ProtocolVersion protocolVersion) {
return _instance.serialize(value.longValue(), protocolVersion);
}

@Override
public BigInteger deserialize(ByteBuffer bytes, ProtocolVersion protocolVersion) {
return BigInteger.valueOf(_instance.deserialize(bytes, protocolVersion));
}

@Override
public String format(BigInteger value) {
return _instance.format(value.longValue());
}

@Override
public BigInteger parse(String value) {
return BigInteger.valueOf(_instance.parse(value));
}

}
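Editorial aside, not part of the diff: the codec only takes effect once it is registered with the driver's CodecRegistry, which is exactly what the CassandraInstance.registerBigInteger change below does. A minimal Scala sketch, assuming a cluster value is already in scope:

// Register the legacy codec so BigInteger keys can still be bound against
// pre-existing tables whose "key" column is a CQL bigint.
cluster
  .getConfiguration()
  .getCodecRegistry()
  .register(BigIntegerIffBigint.instance)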
@@ -31,14 +31,17 @@ import com.typesafe.config.ConfigFactory
import scala.collection.JavaConversions._
import scala.reflect.ClassTag

import java.math.BigInteger


object CassandraCollectionReader {
def read[K: Boundable : AvroRecordCodec : ClassTag, V: AvroRecordCodec : ClassTag](
instance: CassandraInstance,
keyspace: String,
table: String,
layerId: LayerId,
queryKeyBounds: Seq[KeyBounds[K]],
decomposeBounds: KeyBounds[K] => Seq[(Long, Long)],
decomposeBounds: KeyBounds[K] => Seq[(BigInt, BigInt)],
filterIndexOnly: Boolean,
writerSchema: Option[Schema] = None,
threads: Int = ConfigFactory.load().getThreads("geotrellis.cassandra.threads.collection.read")
@@ -64,8 +67,8 @@ object CassandraCollectionReader
instance.withSessionDo { session =>
val statement = session.prepare(query)

LayerReader.njoin[K, V](ranges.toIterator, threads){ index: Long =>
val row = session.execute(statement.bind(index.asInstanceOf[java.lang.Long]))
LayerReader.njoin[K, V](ranges.toIterator, threads){ index: BigInt =>
val row = session.execute(statement.bind(index: BigInteger))
if (row.nonEmpty) {
val bytes = row.one().getBytes("value").array()
val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec)
@@ -24,6 +24,8 @@ import scala.util.Try
import java.net.URI

object CassandraInstance {
var bigIntegerRegistered: Boolean = false

def apply(uri: URI): CassandraInstance = {
import geotrellis.util.UriUtils._

@@ -66,6 +68,16 @@ trait CassandraInstance extends Serializable {
@transient lazy val cluster = getCluster
@transient lazy val session = cluster.connect()

def registerBigInteger(): Unit = {
if (!CassandraInstance.bigIntegerRegistered) {
cluster
.getConfiguration()
.getCodecRegistry()
.register(BigIntegerIffBigint.instance)
CassandraInstance.bigIntegerRegistered = true
}
}

def ensureKeyspaceExists(keyspace: String, session: Session): Unit =
session.execute(s"create keyspace if not exists ${keyspace} with replication = {'class': '${replicationStrategy}', 'replication_factor': ${replicationFactor} }")

@@ -107,7 +119,9 @@ case class BaseCassandraInstance(
replicationFactor: Int = Cassandra.cfg.getInt("replicationFactor"),
localDc: String = Cassandra.cfg.getString("localDc"),
usedHostsPerRemoteDc: Int = Cassandra.cfg.getInt("usedHostsPerRemoteDc"),
allowRemoteDCsForLocalConsistencyLevel: Boolean = Cassandra.cfg.getBoolean("allowRemoteDCsForLocalConsistencyLevel")) extends CassandraInstance
allowRemoteDCsForLocalConsistencyLevel: Boolean = Cassandra.cfg.getBoolean("allowRemoteDCsForLocalConsistencyLevel")) extends CassandraInstance {
registerBigInteger()
}

object Cassandra {
lazy val cfg = ConfigFactory.load().getConfig("geotrellis.cassandra")
@@ -25,6 +25,9 @@ import com.datastax.driver.core.querybuilder.QueryBuilder.{eq => eqs}

import scala.collection.JavaConversions._

import java.math.BigInteger


class CassandraLayerDeleter(val attributeStore: AttributeStore, instance: CassandraInstance) extends LazyLogging with LayerDeleter[LayerId] {

def delete(id: LayerId): Unit = {
@@ -45,7 +48,7 @@ class CassandraLayerDeleter(val attributeStore: AttributeStore, instance: Cassan
val statement = session.prepare(dquery)

session.execute(squery).all().map { entry =>
session.execute(statement.bind(entry.getLong("key").asInstanceOf[java.lang.Long]))
session.execute(statement.bind(entry.getVarint("key")))
}
}
} catch {
@@ -35,6 +35,7 @@ import com.typesafe.config.ConfigFactory
import scala.collection.JavaConversions._
import scala.reflect.ClassTag

import java.math.BigInteger

object CassandraRDDReader {
final val DefaultThreadCount =
@@ -46,7 +47,7 @@ object CassandraRDDReader {
table: String,
layerId: LayerId,
queryKeyBounds: Seq[KeyBounds[K]],
decomposeBounds: KeyBounds[K] => Seq[(Long, Long)],
decomposeBounds: KeyBounds[K] => Seq[(BigInt, BigInt)],
filterIndexOnly: Boolean,
writerSchema: Option[Schema] = None,
numPartitions: Option[Int] = None,
@@ -73,13 +74,13 @@
.toString

sc.parallelize(bins, bins.size)
.mapPartitions { partition: Iterator[Seq[(Long, Long)]] =>
.mapPartitions { partition: Iterator[Seq[(BigInt, BigInt)]] =>
instance.withSession { session =>
val statement = session.prepare(query)

val result = partition map { seq =>
LayerReader.njoin[K, V](seq.iterator, threads) { index: Long =>
val row = session.execute(statement.bind(index.asInstanceOf[java.lang.Long]))
LayerReader.njoin[K, V](seq.iterator, threads) { index: BigInt =>
val row = session.execute(statement.bind(index: BigInteger))
if (row.nonEmpty) {
val bytes = row.one().getBytes("value").array()
val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec)
@@ -41,6 +41,8 @@ import java.util.concurrent.Executors

import scala.collection.JavaConversions._

import java.math.BigInteger


object CassandraRDDWriter {
final val DefaultThreadCount =
@@ -50,7 +52,7 @@ object CassandraRDDWriter {
rdd: RDD[(K, V)],
instance: CassandraInstance,
layerId: LayerId,
decomposeKey: K => Long,
decomposeKey: K => BigInt,
keyspace: String,
table: String,
threads: Int = DefaultThreadCount
@@ -60,7 +62,7 @@
raster: RDD[(K, V)],
instance: CassandraInstance,
layerId: LayerId,
decomposeKey: K => Long,
decomposeKey: K => BigInt,
keyspace: String,
table: String,
writerSchema: Option[Schema],
@@ -75,7 +77,7 @@
instance.ensureKeyspaceExists(keyspace, session)
session.execute(
SchemaBuilder.createTable(keyspace, table).ifNotExists()
.addPartitionKey("key", bigint)
.addPartitionKey("key", varint)
.addClusteringColumn("name", text)
.addClusteringColumn("zoom", cint)
.addColumn("value", blob)
@@ -112,7 +114,7 @@ object CassandraRDDWriter {
val readStatement = session.prepare(readQuery)
val writeStatement = session.prepare(writeQuery)

val rows: Process[Task, (java.lang.Long, Vector[(K,V)])] =
val rows: Process[Task, (BigInt, Vector[(K,V)])] =
Process.unfold(partition)({ iter =>
if (iter.hasNext) {
val record = iter.next()
@@ -122,12 +124,12 @@

val pool = Executors.newFixedThreadPool(threads)

def elaborateRow(row: (java.lang.Long, Vector[(K,V)])): Process[Task, (java.lang.Long, Vector[(K,V)])] = {
def elaborateRow(row: (BigInt, Vector[(K,V)])): Process[Task, (BigInt, Vector[(K,V)])] = {
Process eval Task ({
val (key, kvs1) = row
val kvs2 =
if (mergeFunc.nonEmpty) {
val oldRow = session.execute(readStatement.bind(key))
val oldRow = session.execute(readStatement.bind(key: BigInteger))
if (oldRow.nonEmpty) {
val bytes = oldRow.one().getBytes("value").array()
AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec)
@@ -148,18 +150,18 @@
})(pool)
}

def rowToBytes(row: (java.lang.Long, Vector[(K,V)])): Process[Task, (java.lang.Long, ByteBuffer)] = {
def rowToBytes(row: (BigInt, Vector[(K,V)])): Process[Task, (BigInt, ByteBuffer)] = {
Process eval Task({
val (key, kvs) = row
val bytes = ByteBuffer.wrap(AvroEncoder.toBinary(kvs)(codec))
(key, bytes)
})(pool)
}

def retire(row: (java.lang.Long, ByteBuffer)): Process[Task, ResultSet] = {
def retire(row: (BigInt, ByteBuffer)): Process[Task, ResultSet] = {
val (id, value) = row
Process eval Task({
session.execute(writeStatement.bind(id, value))
session.execute(writeStatement.bind(id: BigInteger, value))
})(pool)
}
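For reference (an editorial sketch, not in the diff): with the partition key switched from bigint to varint above, the CQL produced by the SchemaBuilder call is roughly the following; the keyspace and table names here are hypothetical and the generated text may differ in detail:

val keyspace = "geotrellis"   // hypothetical values, for illustration only
val table = "tiles"
val approxCql: String =
  s"""CREATE TABLE IF NOT EXISTS $keyspace.$table (
     |  key varint,
     |  name text,
     |  zoom int,
     |  value blob,
     |  PRIMARY KEY ((key), name, zoom))""".stripMargin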

@@ -30,6 +30,9 @@ import spray.json._
import scala.collection.JavaConversions._
import scala.reflect.ClassTag

import java.math.BigInteger


class CassandraValueReader(
instance: CassandraInstance,
val attributeStore: AttributeStore
@@ -50,7 +53,7 @@ class CassandraValueReader(
.and(eqs("zoom", layerId.zoom))
)

val row = session.execute(statement.bind(keyIndex.toIndex(key).asInstanceOf[java.lang.Long])).all()
val row = session.execute(statement.bind(keyIndex.toIndex(key): BigInteger)).all()
val tiles = row.map { entry =>
AvroEncoder.fromBinary(writerSchema, entry.getBytes("value").array())(codec)
}
@@ -0,0 +1,9 @@
package geotrellis.spark.io

import java.math.BigInteger

package object cassandra {
implicit def bigToBig(i: BigInt): BigInteger = {
new BigInteger(i.toByteArray)
}
}
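A hedged illustration (not part of the diff) of what this implicit buys: the BigInt indices produced by the key indexes can be ascribed directly to java.math.BigInteger when binding statements, which is how the Cassandra readers and writers above use it:

import java.math.BigInteger
import geotrellis.spark.io.cassandra._   // brings bigToBig into implicit scope

val index: BigInt = BigInt(42)
// The type ascription triggers bigToBig, mirroring statement.bind(index: BigInteger) elsewhere in this PR.
val javaIndex: BigInteger = (index: BigInteger)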
@@ -57,19 +57,19 @@ class ShardingKeyIndex[K](val inner: KeyIndex[K], val shardCount: Int) extends K
* prefixWithShard(i, s) == 8070450532247928869
* }}}
*/
private def prefixWithShard(i: Long, shard: Long): Long =
private def prefixWithShard(i: BigInt, shard: Long): BigInt =
(shard << 60) | i

/* Necessary for extending `KeyIndex` */
def toIndex(key: K): Long = {
val i: Long = inner.toIndex(key)
val shard: Long = i % shardCount /* Shard prefix between 0 and 7 */
def toIndex(key: K): BigInt = {
val i: BigInt = inner.toIndex(key)
val shard: Long = (i % shardCount).toLong /* Shard prefix between 0 and 7 */

prefixWithShard(inner.toIndex(key), shard)
}

/* Necessary for extending `KeyIndex` */
def indexRanges(keyRange: (K, K)): Seq[(Long, Long)] = {
def indexRanges(keyRange: (K, K)): Seq[(BigInt, BigInt)] = {
inner
.indexRanges(keyRange)
.flatMap({ case (i1, i2) =>
@@ -84,9 +84,9 @@ class ZVoxelKeyIndex(val keyBounds: KeyBounds[VoxelKey]) extends KeyIndex[VoxelK
/* ''Z3'' here is a convenient shorthand for any 3-dimensional key. */
private def toZ(k: VoxelKey): Z3 = Z3(k.x, k.y, k.z)

def toIndex(k: VoxelKey): Long = toZ(k).z
def toIndex(k: VoxelKey): BigInt = toZ(k).z

def indexRanges(keyRange: (VoxelKey, VoxelKey)): Seq[(Long, Long)] =
def indexRanges(keyRange: (VoxelKey, VoxelKey)): Seq[(BigInt, BigInt)] =
Z3.zranges(toZ(keyRange._1), toZ(keyRange._2))
}
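A small editorial note with an assumed sketch (not from the diff): the Z-order value itself is still a Long, and it satisfies the widened return type through the standard implicit Long-to-BigInt widening in scala.math.BigInt:

// A Long z-value widens to BigInt with no loss of precision (scala.math.BigInt.long2bigInt).
val z: Long = 123456789L
val index: BigInt = z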

docs/CHANGELOG.rst: 12 additions, 0 deletions
@@ -1,6 +1,18 @@
Changelog
=========

2.0.0
-----

API Changes
^^^^^^^^^^^

- ``geotrellis.spark``

- **Change:** The key (the space-filling curve index, or address) used for layer reading and writing is no longer a
fixed-width 8-byte ``Long``; it is now an arbitrary-length ``BigInt``. This change affects not only the
``geotrellis.spark`` package, but all backends (excluding ``geotrellis.geowave`` and ``geotrellis.geomesa``), as
sketched below.
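A minimal before/after sketch of the corresponding ``KeyIndex`` signatures, taken from the diffs in this PR::

    // before
    def toIndex(key: K): Long
    def indexRanges(keyRange: (K, K)): Seq[(Long, Long)]

    // after
    def toIndex(key: K): BigInt
    def indexRanges(keyRange: (K, K)): Seq[(BigInt, BigInt)]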

1.2.0
-----
*2017 Nov 6*