From 62283e0e431fbc8101354cb6c161207c683a4c4d Mon Sep 17 00:00:00 2001 From: James McClain Date: Tue, 20 Jun 2017 09:33:26 -0400 Subject: [PATCH 01/25] Remove Excess Whitespace --- .../geotrellis/spark/io/index/zcurve/Z2.scala | 32 +++++++------- .../geotrellis/spark/io/index/zcurve/Z3.scala | 44 +++++++++---------- .../spark/rdd/FilteredCartesianRDD.scala | 2 +- .../spark/io/index/zcurve/Z3RangeSpec.scala | 2 +- 4 files changed, 40 insertions(+), 40 deletions(-) diff --git a/spark/src/main/scala/geotrellis/spark/io/index/zcurve/Z2.scala b/spark/src/main/scala/geotrellis/spark/io/index/zcurve/Z2.scala index 7f6b3d1a18..cf2818781f 100644 --- a/spark/src/main/scala/geotrellis/spark/io/index/zcurve/Z2.scala +++ b/spark/src/main/scala/geotrellis/spark/io/index/zcurve/Z2.scala @@ -29,8 +29,8 @@ class Z2(val z: Long) extends AnyVal { def == (other: Z2) = other.z == z - def decode: (Int, Int) = { - ( combine(z), combine(z>>1) ) + def decode: (Int, Int) = { + ( combine(z), combine(z>>1) ) } def dim(i: Int) = Z2.combine(z >> i) @@ -48,14 +48,14 @@ class Z2(val z: Long) extends AnyVal { override def toString = f"$z ${decode}" } -object Z2 { +object Z2 { final val MAX_BITS = 31 final val MAX_MASK = 0x7fffffff // ignore the sign bit, using it breaks < relationship final val MAX_DIM = 2 /** insert 0 between every bit in value. Only first 31 bits can be considred. */ def split(value: Long): Long = { - var x: Long = value & MAX_MASK + var x: Long = value & MAX_MASK x = (x ^ (x << 32)) & 0x00000000ffffffffL x = (x ^ (x << 16)) & 0x0000ffff0000ffffL x = (x ^ (x << 8)) & 0x00ff00ff00ff00ffL // 11111111000000001111111100000000.. @@ -72,30 +72,30 @@ object Z2 { x = (x ^ (x >> 2)) & 0x0f0f0f0f0f0f0f0fL; x = (x ^ (x >> 4)) & 0x00ff00ff00ff00ffL; x = (x ^ (x >> 8)) & 0x0000ffff0000ffffL; - x = (x ^ (x >> 16)) & 0x00000000ffffffffL; + x = (x ^ (x >> 16)) & 0x00000000ffffffffL; x.toInt } /** * Bits of x and y will be encoded as ....y1x1y0x0 */ - def apply(x: Int, y: Int): Z2 = - new Z2(split(x) | split(y) << 1) + def apply(x: Int, y: Int): Z2 = + new Z2(split(x) | split(y) << 1) - def unapply(z: Z2): Option[(Int, Int)] = + def unapply(z: Z2): Option[(Int, Int)] = Some(z.decode) - + def zdivide(p: Z2, rmin: Z2, rmax: Z2): (Z2, Z2) = { val (litmax,bigmin) = zdiv(load, MAX_DIM)(p.z, rmin.z, rmax.z) (new Z2(litmax), new Z2(bigmin)) } - + /** Loads either 1000... or 0111... into starting at given bit index of a given dimention */ - def load(target: Long, p: Long, bits: Int, dim: Int): Long = { + def load(target: Long, p: Long, bits: Int, dim: Int): Long = { val mask = ~(Z2.split(MAX_MASK >> (MAX_BITS-bits)) << dim) val wiped = target & mask wiped | (split(p) << dim) - } + } /** Recurse down the quad-tree and report all z-ranges which are contained in the rectangle defined by the min and max points */ def zranges(min: Z2, max: Z2): Seq[(Long, Long)] = { @@ -105,12 +105,12 @@ object Z2 { var recCounter = 0 var reportCounter = 0 - def _zranges(prefix: Long, offset: Int, quad: Long): Unit = { + def _zranges(prefix: Long, offset: Int, quad: Long): Unit = { recCounter += 1 val min: Long = prefix | (quad << offset) // QR + 000.. val max: Long = min | (1L << offset) - 1 // QR + 111.. 
- + val qr = Z2Range(new Z2(min), new Z2(max)) if (sr contains qr){ // whole range matches, happy day mq += (qr.min.z, qr.max.z) @@ -119,13 +119,13 @@ object Z2 { _zranges(min, offset - MAX_DIM, 0) _zranges(min, offset - MAX_DIM, 1) _zranges(min, offset - MAX_DIM, 2) - _zranges(min, offset - MAX_DIM, 3) + _zranges(min, offset - MAX_DIM, 3) //let our children punt on each subrange } } var prefix: Long = 0 - var offset = MAX_BITS*MAX_DIM + var offset = MAX_BITS*MAX_DIM _zranges(prefix, offset, 0) // the entire space mq.toSeq } diff --git a/spark/src/main/scala/geotrellis/spark/io/index/zcurve/Z3.scala b/spark/src/main/scala/geotrellis/spark/io/index/zcurve/Z3.scala index f3730d5c04..cd48607e4d 100644 --- a/spark/src/main/scala/geotrellis/spark/io/index/zcurve/Z3.scala +++ b/spark/src/main/scala/geotrellis/spark/io/index/zcurve/Z3.scala @@ -30,7 +30,7 @@ class Z3(val z: Long) extends AnyVal { def == (other: Z3) = other.z == z def decode: (Int, Int, Int) = { - ( combine(z), combine(z >> 1), combine(z >> 2) ) + ( combine(z), combine(z >> 1), combine(z >> 2) ) } def dim(i: Int) = Z3.combine(z >> i) @@ -42,7 +42,7 @@ class Z3(val z: Long) extends AnyVal { y >= rmin.dim(1) && y <= rmax.dim(1) && z >= rmin.dim(2) && - z <= rmax.dim(2) + z <= rmax.dim(2) } def mid(p: Z3): Z3 = { @@ -70,16 +70,16 @@ object Z3 { x = (x | x << 16) & 0x1f0000ff0000ffL x = (x | x << 8) & 0x100f00f00f00f00fL x = (x | x << 4) & 0x10c30c30c30c30c3L - (x | x << 2) & 0x1249249249249249L + (x | x << 2) & 0x1249249249249249L } /** combine every third bit to form a value. Maximum value is 21 bits. */ def combine(z: Long): Int = { var x = z & 0x1249249249249249L x = (x ^ (x >> 2)) & 0x10c30c30c30c30c3L - x = (x ^ (x >> 4)) & 0x100f00f00f00f00fL - x = (x ^ (x >> 8)) & 0x1f0000ff0000ffL - x = (x ^ (x >> 16)) & 0x1f00000000ffffL + x = (x ^ (x >> 4)) & 0x100f00f00f00f00fL + x = (x ^ (x >> 8)) & 0x1f0000ff0000ffL + x = (x ^ (x >> 16)) & 0x1f00000000ffffL x = (x ^ (x >> 32)) & MAX_MASK x.toInt } @@ -93,20 +93,20 @@ object Z3 { new Z3(split(x) | split(y) << 1 | split(z) << 2) } - def unapply(z: Z3): Option[(Int, Int, Int)] = + def unapply(z: Z3): Option[(Int, Int, Int)] = Some(z.decode) def zdivide(p: Z3, rmin: Z3, rmax: Z3): (Z3, Z3) = { val (litmax,bigmin) = zdiv(load, MAX_DIM)(p.z, rmin.z, rmax.z) (new Z3(litmax), new Z3(bigmin)) } - + /** Loads either 1000... or 0111... into starting at given bit index of a given dimention */ - def load(target: Long, p: Long, bits: Int, dim: Int): Long = { + def load(target: Long, p: Long, bits: Int, dim: Int): Long = { val mask = ~(Z3.split(MAX_MASK >> (MAX_BITS-bits)) << dim) val wiped = target & mask wiped | (split(p) << dim) - } + } /** Recurse down the oct-tree and report all z-ranges which are contained in the cube defined by the min and max points */ def zranges(min: Z3, max: Z3): Seq[(Long, Long)] = { @@ -116,30 +116,30 @@ object Z3 { var recCounter = 0 var reportCounter = 0 - def _zranges(prefix: Long, offset: Int, quad: Long): Unit = { + def _zranges(prefix: Long, offset: Int, quad: Long): Unit = { recCounter += 1 val min: Long = prefix | (quad << offset) // QR + 000.. val max: Long = min | (1L << offset) - 1 // QR + 111.. 
val qr = Z3Range(new Z3(min), new Z3(max)) - if (sr contains qr){ // whole range matches, happy day - mq += (qr.min.z, qr.max.z) + if (sr contains qr){ // whole range matches, happy day + mq += (qr.min.z, qr.max.z) reportCounter +=1 - } else if (offset > 0 && (sr overlaps qr)) { // some portion of this range are excluded + } else if (offset > 0 && (sr overlaps qr)) { // some portion of this range are excluded _zranges(min, offset - MAX_DIM, 0) _zranges(min, offset - MAX_DIM, 1) _zranges(min, offset - MAX_DIM, 2) - _zranges(min, offset - MAX_DIM, 3) - _zranges(min, offset - MAX_DIM, 4) - _zranges(min, offset - MAX_DIM, 5) - _zranges(min, offset - MAX_DIM, 6) - _zranges(min, offset - MAX_DIM, 7) + _zranges(min, offset - MAX_DIM, 3) + _zranges(min, offset - MAX_DIM, 4) + _zranges(min, offset - MAX_DIM, 5) + _zranges(min, offset - MAX_DIM, 6) + _zranges(min, offset - MAX_DIM, 7) //let our children punt on each subrange - } + } } - + var prefix: Long = 0 - var offset = MAX_BITS*MAX_DIM + var offset = MAX_BITS*MAX_DIM _zranges(prefix, offset, 0) // the entire space mq.toSeq } diff --git a/spark/src/main/scala/org/apache/spark/rdd/FilteredCartesianRDD.scala b/spark/src/main/scala/org/apache/spark/rdd/FilteredCartesianRDD.scala index c0c53616fe..bddbd81bd0 100644 --- a/spark/src/main/scala/org/apache/spark/rdd/FilteredCartesianRDD.scala +++ b/spark/src/main/scala/org/apache/spark/rdd/FilteredCartesianRDD.scala @@ -70,7 +70,7 @@ sealed class FilteredCartesianRDD[T: ClassTag, U: ClassTag, V: ClassTag]( val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length) for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) { val idx = s1.index * numPartitionsInRdd2 + s2.index - array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index) + array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index) } array } diff --git a/spark/src/test/scala/geotrellis/spark/io/index/zcurve/Z3RangeSpec.scala b/spark/src/test/scala/geotrellis/spark/io/index/zcurve/Z3RangeSpec.scala index 0074c063f7..a1f48235e9 100644 --- a/spark/src/test/scala/geotrellis/spark/io/index/zcurve/Z3RangeSpec.scala +++ b/spark/src/test/scala/geotrellis/spark/io/index/zcurve/Z3RangeSpec.scala @@ -44,7 +44,7 @@ class Z3RangeSpec extends FunSpec with Matchers { z <- z1 to z2 y <- y1 to y2 x <- x1 to x2 - } { + } { expectedSet = expectedSet + Tuple3(x,y,z) } From c8db087727629d0b2c224f70bfb4233d3b302b63 Mon Sep 17 00:00:00 2001 From: James McClain Date: Thu, 15 Jun 2017 18:18:52 -0400 Subject: [PATCH 02/25] Extend Key Length in KeyIndex.scala The range length is still expressed as a `Double`, the maximum bin size is still a `Long`. 
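For orientation, a minimal sketch of the trait as it reads after this change; it only restates the signatures from the diff below and introduces no new API:

    import geotrellis.spark.KeyBounds

    trait KeyIndex[K] extends Serializable {
      /** Bounds of the indexed key space */
      def keyBounds: KeyBounds[K]
      /** SFC index of a key; BigInt so curves wider than a signed 64-bit Long can be expressed */
      def toIndex(key: K): BigInt
      /** Inclusive SFC index ranges covering a key range */
      def indexRanges(keyRange: (K, K)): Seq[(BigInt, BigInt)]
    }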
Signed-off-by: James McClain --- .../geotrellis/spark/io/index/KeyIndex.scala | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/spark/src/main/scala/geotrellis/spark/io/index/KeyIndex.scala b/spark/src/main/scala/geotrellis/spark/io/index/KeyIndex.scala index f11827b067..2197339333 100644 --- a/spark/src/main/scala/geotrellis/spark/io/index/KeyIndex.scala +++ b/spark/src/main/scala/geotrellis/spark/io/index/KeyIndex.scala @@ -21,37 +21,42 @@ import geotrellis.spark.KeyBounds trait KeyIndex[K] extends Serializable { /** Some(keybounds) if the indexed space is bounded; None if it is unbounded */ def keyBounds: KeyBounds[K] - def toIndex(key: K): Long - def indexRanges(keyRange: (K, K)): Seq[(Long, Long)] + def toIndex(key: K): BigInt + def indexRanges(keyRange: (K, K)): Seq[(BigInt, BigInt)] } object KeyIndex { /** - * Mapping KeyBounds of Extent to SFC ranges will often result in a set of non-contigrious ranges. - * The indices excluded by these ranges should not be included in breaks calculation as they will never be seen. + * Mapping KeyBounds of Extent to SFC ranges will often result in a + * set of non-contigrious ranges. The indices excluded by these + * ranges should not be included in breaks calculation as they will + * never be seen. */ - def breaks[K](kb: KeyBounds[K], ki: KeyIndex[K], count: Int): Vector[Long] = { + def breaks[K](kb: KeyBounds[K], ki: KeyIndex[K], count: Int): Vector[BigInt] = { breaks(ki.indexRanges(kb), count) } /** - * Divide the space covered by ranges as evenly as possible by providing break points from the ranges. - * All break points will be from the ranges given and never from spaces between the ranges. + * Divide the space covered by ranges as evenly as possible by + * providing break points from the ranges. All break points will + * be from the ranges given and never from spaces between the + * ranges. * * @param ranges sorted list of tuples which represent non-negative, non-intersecting ranges. * @param count desired number of break points */ - def breaks(ranges: Seq[(Long, Long)], count: Int): Vector[Long] = { + def breaks(ranges: Seq[(BigInt, BigInt)], count: Int): Vector[BigInt] = { require(count > 0, "breaks count must be at least one") - def len(r: (Long, Long)) = r._2 - r._1 + 1l - val total: Double = ranges.foldLeft(0l)(_ + len(_)) - val maxBinSize = math.max(math.ceil(total / (count+1)), 1).toLong + def len(r: (BigInt, BigInt)): Double = (r._2 - r._1).toDouble + 1 + val total: Double = ranges.foldLeft(0.0)(_ + len(_)) + val maxBinSize: Long = math.max(math.ceil(total /count+1), 1).toLong - def take(range: (Long, Long), count: Long): Long = { + def take(range: (BigInt, BigInt), count: Long): Long = { if (len(range) >= count) count - else len(range) + else len(range).toLong } - ranges.foldLeft((Vector.empty[Long], maxBinSize)) { case ((_breaks, _roomLeft), range) => + + ranges.foldLeft((Vector.empty[BigInt], maxBinSize)) { case ((_breaks, _roomLeft), range) => var breaks = _breaks var roomLeft = _roomLeft var remainder = range From 7f9fd555ec9344ae1d8eef33e5ca5501cee47ee2 Mon Sep 17 00:00:00 2001 From: James McClain Date: Thu, 15 Jun 2017 18:51:46 -0400 Subject: [PATCH 03/25] Second-Level Fallout From Key Extension The range lengths are still expressed as a `Long` values. 
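A small usage sketch of the widened `Index` helpers changed below; the value is arbitrary and chosen only because it no longer fits in a `Long`:

    import geotrellis.spark.io.index.Index

    val i = BigInt(Long.MaxValue) + 1   // 9223372036854775808, one past the old limit
    Index.digits(i)                     // 19
    Index.encode(i, 25)                 // "0000009223372036854775808", zero-padded to width 25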
--- .../scala/geotrellis/spark/io/LayerReader.scala | 12 ++++++------ .../scala/geotrellis/spark/io/index/Index.scala | 5 ++--- .../geotrellis/spark/io/index/IndexRanges.scala | 10 +++++----- .../geotrellis/spark/io/index/MergeQueue.scala | 16 +++++++--------- 4 files changed, 20 insertions(+), 23 deletions(-) diff --git a/spark/src/main/scala/geotrellis/spark/io/LayerReader.scala b/spark/src/main/scala/geotrellis/spark/io/LayerReader.scala index 49dff1a2a0..37900adbd2 100644 --- a/spark/src/main/scala/geotrellis/spark/io/LayerReader.scala +++ b/spark/src/main/scala/geotrellis/spark/io/LayerReader.scala @@ -99,24 +99,24 @@ object LayerReader { apply(new URI(uri)) def njoin[K, V]( - ranges: Iterator[(Long, Long)], + ranges: Iterator[(BigInt, BigInt)], threads: Int - )(readFunc: Long => Vector[(K, V)]): Vector[(K, V)] = { + )(readFunc: BigInt => Vector[(K, V)]): Vector[(K, V)] = { val pool = Executors.newFixedThreadPool(threads) - val indices: Iterator[Long] = ranges.flatMap { case (start, end) => + val indices: Iterator[BigInt] = ranges.flatMap { case (start, end) => (start to end).toIterator } - val index: Process[Task, Long] = Process.unfold(indices) { iter => + val index: Process[Task, BigInt] = Process.unfold(indices) { iter => if (iter.hasNext) { - val index: Long = iter.next() + val index: BigInt = iter.next() Some(index, iter) } else None } - val readRecord: (Long => Process[Task, Vector[(K, V)]]) = { index => + val readRecord: (BigInt => Process[Task, Vector[(K, V)]]) = { index => Process eval Task { readFunc(index) } (pool) } diff --git a/spark/src/main/scala/geotrellis/spark/io/index/Index.scala b/spark/src/main/scala/geotrellis/spark/io/index/Index.scala index 962b1ebbdd..fc95dbee80 100644 --- a/spark/src/main/scala/geotrellis/spark/io/index/Index.scala +++ b/spark/src/main/scala/geotrellis/spark/io/index/Index.scala @@ -18,10 +18,9 @@ package geotrellis.spark.io.index object Index { /** Encode an index value as a string */ - def encode(index: Long, max: Int): String = + def encode(index: BigInt, max: Int): String = index.toString.reverse.padTo(max, '0').reverse /** The number of digits in this index */ - def digits(x: Long): Int = - if (x < 10) 1 else 1 + digits(x/10) + def digits(x: BigInt): Int = x.toString.length } diff --git a/spark/src/main/scala/geotrellis/spark/io/index/IndexRanges.scala b/spark/src/main/scala/geotrellis/spark/io/index/IndexRanges.scala index bfefb96f66..18629e7bc0 100644 --- a/spark/src/main/scala/geotrellis/spark/io/index/IndexRanges.scala +++ b/spark/src/main/scala/geotrellis/spark/io/index/IndexRanges.scala @@ -21,19 +21,19 @@ object IndexRanges { * Will attempt to bin ranges into buckets, each containing at least the average number of elements. * Trailing bins may be empty if the count is too high for number of ranges. 
*/ - def bin(ranges: Seq[(Long, Long)], count: Int): Seq[Seq[(Long, Long)]] = { + def bin(ranges: Seq[(BigInt, BigInt)], count: Int): Seq[Seq[(BigInt, BigInt)]] = { var stack = ranges.toList - def len(r: (Long, Long)) = r._2 - r._1 + 1l - val total = ranges.foldLeft(0l) { (s, r) => s + len(r) } + def len(r: (BigInt, BigInt)): Long = (r._2 - r._1).toLong + 1l + val total: Long = ranges.foldLeft(0l) { (s, r) => s + len(r) } val binWidth = total / count + 1 - def splitRange(range: (Long, Long), take: Long): ((Long, Long), (Long, Long)) = { + def splitRange(range: (BigInt, BigInt), take: Long): ((BigInt, BigInt), (BigInt, BigInt)) = { assert(len(range) > take) (range._1, range._1 + take - 1) -> (range._1 + take, range._2) } - val arr = Array.fill(count)(Nil: List[(Long, Long)]) + val arr = Array.fill(count)(Nil: List[(BigInt, BigInt)]) var sum = 0l var i = 0 while (stack.nonEmpty) { diff --git a/spark/src/main/scala/geotrellis/spark/io/index/MergeQueue.scala b/spark/src/main/scala/geotrellis/spark/io/index/MergeQueue.scala index 20295d2225..ee24c977d4 100644 --- a/spark/src/main/scala/geotrellis/spark/io/index/MergeQueue.scala +++ b/spark/src/main/scala/geotrellis/spark/io/index/MergeQueue.scala @@ -21,16 +21,15 @@ import scala.collection.mutable object MergeQueue{ - def apply(ranges: TraversableOnce[(Long, Long)]): Seq[(Long, Long)] = { + def apply(ranges: TraversableOnce[(BigInt, BigInt)]): Seq[(BigInt, BigInt)] = { val q = new MergeQueue() ranges.foreach(range => q += range) q.toSeq } } -private class RangeComparator extends java.util.Comparator[(Long, Long)] { - - def compare(r1: (Long, Long), r2: (Long, Long)): Int = { +private class RangeComparator extends java.util.Comparator[(BigInt, BigInt)] { + def compare(r1: (BigInt, BigInt), r2: (BigInt, BigInt)): Int = { val retval = (r1._2 - r2._2) if (retval < 0) +1 else if (retval == 0) 0 @@ -53,16 +52,16 @@ class MergeQueue(initialSize: Int = 1) { /** * Add a range to the merge queue. */ - def +=(range: (Long, Long)): Unit = treeSet.add(range) + def +=(range: (BigInt, BigInt)): Unit = treeSet.add(range) /** * Return a list of merged intervals. */ - def toSeq: Seq[(Long, Long)] = { - var stack = List.empty[(Long, Long)] + def toSeq: Seq[(BigInt, BigInt)] = { + var stack = List.empty[(BigInt, BigInt)] // The TreeSet will be consumed in the process below, so do not // use the original. - val workingTreeSet = treeSet.clone.asInstanceOf[java.util.TreeSet[(Long,Long)]] + val workingTreeSet = treeSet.clone.asInstanceOf[java.util.TreeSet[(BigInt,BigInt)]] if (!workingTreeSet.isEmpty) stack = (workingTreeSet.pollFirst) +: stack while (!workingTreeSet.isEmpty) { @@ -81,5 +80,4 @@ class MergeQueue(initialSize: Int = 1) { stack } - } From bacdaf72211893abe0ba8a24d3db839163a2e209 Mon Sep 17 00:00:00 2001 From: James McClain Date: Thu, 15 Jun 2017 19:24:35 -0400 Subject: [PATCH 04/25] Extend Key Length in Partitioning Machinery Note: `Long.MaxValue` has been changed to `BigInt(-1)`. 
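The partitioner now locates a key's bucket with `scala.collection.Searching` (Scala 2.11 or later, as noted in the diff) over the `BigInt` breaks. A small sketch with made-up break points, mirroring the `getPartition` logic in the diff below:

    import scala.collection.Searching._

    val breaks: Array[BigInt] = Array(BigInt(100), BigInt(200))   // hypothetical sorted breaks
    breaks.search(BigInt(150)).insertionPoint   // 1: between the breaks, second partition
    breaks.search(BigInt(200)).insertionPoint   // 1: exact matches remain inclusive on the right
    breaks.search(BigInt(999)).insertionPoint   // 2: past the last break, final partition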
--- .../spark/partition/IndexPartitioner.scala | 22 ++++++++++--------- .../spark/partition/PartitionerIndex.scala | 12 +++++----- .../spark/partition/SpacePartitioner.scala | 4 ++-- 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/spark/src/main/scala/geotrellis/spark/partition/IndexPartitioner.scala b/spark/src/main/scala/geotrellis/spark/partition/IndexPartitioner.scala index 788187f071..a20f8a616f 100644 --- a/spark/src/main/scala/geotrellis/spark/partition/IndexPartitioner.scala +++ b/spark/src/main/scala/geotrellis/spark/partition/IndexPartitioner.scala @@ -25,31 +25,33 @@ import org.apache.spark._ import org.apache.spark.rdd.{ShuffledRDD, RDD} import scala.collection.mutable.ArrayBuffer +import scala.collection.Searching._ import scala.reflect._ /** - * Uses KeyIndex to partition an RDD in memory, giving its records some spatial locality. - * When persisting an RDD partitioned by this partitioner we can safely assume that all records - * contributing to the same SFC index will reside in one partition. + * Uses KeyIndex to partition an RDD in memory, giving its records + * some spatial locality. When persisting an RDD partitioned by this + * partitioner we can safely assume that all records contributing to + * the same SFC index will reside in one partition. */ class IndexPartitioner[K](index: KeyIndex[K], count: Int) extends Partitioner { - val breaks: Array[Long] = + val breaks: Array[BigInt] = if (count > 1) KeyIndex.breaks(index.keyBounds, index, count - 1).sorted.toArray else - Array(Long.MaxValue) + Array(BigInt(-1)) def numPartitions = breaks.length + 1 /** - * Because breaks define divisions rather than breaks all indexable keys will have a bucket. - * Keys that are far out of bounds will be assigned to either first or last partition. + * Because breaks define divisions rather than breaks, all + * indexable keys will have a bucket. Keys that are far out of + * bounds will be assigned to either first or last partition. 
*/ def getPartition(key: Any): Int = { val i = index.toIndex(key.asInstanceOf[K]) - val res = java.util.Arrays.binarySearch(breaks, i) - if (res >= 0) res // matched break exactly, partitions are inclusive on right - else -(res+1) // got an insertion point, convert it to corresponding partition + + breaks.search(i).insertionPoint // XXX requires Scala 2.11 or later } } diff --git a/spark/src/main/scala/geotrellis/spark/partition/PartitionerIndex.scala b/spark/src/main/scala/geotrellis/spark/partition/PartitionerIndex.scala index af2e2958cd..91cf973f05 100644 --- a/spark/src/main/scala/geotrellis/spark/partition/PartitionerIndex.scala +++ b/spark/src/main/scala/geotrellis/spark/partition/PartitionerIndex.scala @@ -25,8 +25,8 @@ import geotrellis.spark.io.index.zcurve.{Z3, Z2, ZSpatialKeyIndex} * This many to one mapping forms spatially relate key blocks */ trait PartitionerIndex[K] extends Serializable { - def toIndex(key: K): Long - def indexRanges(keyRange: (K, K)): Seq[(Long, Long)] + def toIndex(key: K): BigInt + def indexRanges(keyRange: (K, K)): Seq[(BigInt, BigInt)] } object PartitionerIndex { @@ -38,9 +38,9 @@ object PartitionerIndex { implicit object SpatialPartitioner extends PartitionerIndex[SpatialKey] { private def toZ(key: SpatialKey): Z2 = Z2(key.col >> 4, key.row >> 4) - def toIndex(key: SpatialKey): Long = toZ(key).z + def toIndex(key: SpatialKey): BigInt = toZ(key).z - def indexRanges(keyRange: (SpatialKey, SpatialKey)): Seq[(Long, Long)] = + def indexRanges(keyRange: (SpatialKey, SpatialKey)): Seq[(BigInt, BigInt)] = Z2.zranges(toZ(keyRange._1), toZ(keyRange._2)) } @@ -51,9 +51,9 @@ object PartitionerIndex { implicit object SpaceTimePartitioner extends PartitionerIndex[SpaceTimeKey] { private def toZ(key: SpaceTimeKey): Z3 = Z3(key.col >> 4, key.row >> 4, key.time.getYear) - def toIndex(key: SpaceTimeKey): Long = toZ(key).z + def toIndex(key: SpaceTimeKey): BigInt = toZ(key).z - def indexRanges(keyRange: (SpaceTimeKey, SpaceTimeKey)): Seq[(Long, Long)] = + def indexRanges(keyRange: (SpaceTimeKey, SpaceTimeKey)): Seq[(BigInt, BigInt)] = Z3.zranges(toZ(keyRange._1), toZ(keyRange._2)) } } diff --git a/spark/src/main/scala/geotrellis/spark/partition/SpacePartitioner.scala b/spark/src/main/scala/geotrellis/spark/partition/SpacePartitioner.scala index fd229f7a67..9cf4ce46f1 100644 --- a/spark/src/main/scala/geotrellis/spark/partition/SpacePartitioner.scala +++ b/spark/src/main/scala/geotrellis/spark/partition/SpacePartitioner.scala @@ -30,7 +30,7 @@ import scala.reflect._ case class SpacePartitioner[K: Boundable: ClassTag](bounds: Bounds[K]) (implicit index: PartitionerIndex[K]) extends Partitioner { - val regions: Array[Long] = + val regions: Array[BigInt] = bounds match { case b: KeyBounds[K] => for { @@ -56,7 +56,7 @@ case class SpacePartitioner[K: Boundable: ClassTag](bounds: Bounds[K]) regions.indexOf(i) > -1 } - def regionIndex(region: Long): Option[Int] = { + def regionIndex(region: BigInt): Option[Int] = { // Note: Consider future design where region can overlap several partitions, would change Option -> List val i = regions.indexOf(region) if (i > -1) Some(i) else None From 688f5a79663cf54c0a81ced58d8215e60571a7c9 Mon Sep 17 00:00:00 2001 From: James McClain Date: Tue, 20 Jun 2017 09:35:13 -0400 Subject: [PATCH 05/25] Extend Key Length in ZCurve Machinery --- .../src/main/scala/geotrellis/spark/io/index/zcurve/Z2.scala | 2 +- .../src/main/scala/geotrellis/spark/io/index/zcurve/Z3.scala | 2 +- .../geotrellis/spark/io/index/zcurve/ZSpaceTimeKeyIndex.scala | 4 ++-- 
.../geotrellis/spark/io/index/zcurve/ZSpatialKeyIndex.scala | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/spark/src/main/scala/geotrellis/spark/io/index/zcurve/Z2.scala b/spark/src/main/scala/geotrellis/spark/io/index/zcurve/Z2.scala index cf2818781f..e6975ca1e3 100644 --- a/spark/src/main/scala/geotrellis/spark/io/index/zcurve/Z2.scala +++ b/spark/src/main/scala/geotrellis/spark/io/index/zcurve/Z2.scala @@ -98,7 +98,7 @@ object Z2 { } /** Recurse down the quad-tree and report all z-ranges which are contained in the rectangle defined by the min and max points */ - def zranges(min: Z2, max: Z2): Seq[(Long, Long)] = { + def zranges(min: Z2, max: Z2): Seq[(BigInt, BigInt)] = { val mq = new MergeQueue val sr = Z2Range(min, max) diff --git a/spark/src/main/scala/geotrellis/spark/io/index/zcurve/Z3.scala b/spark/src/main/scala/geotrellis/spark/io/index/zcurve/Z3.scala index cd48607e4d..8d489a3db1 100644 --- a/spark/src/main/scala/geotrellis/spark/io/index/zcurve/Z3.scala +++ b/spark/src/main/scala/geotrellis/spark/io/index/zcurve/Z3.scala @@ -109,7 +109,7 @@ object Z3 { } /** Recurse down the oct-tree and report all z-ranges which are contained in the cube defined by the min and max points */ - def zranges(min: Z3, max: Z3): Seq[(Long, Long)] = { + def zranges(min: Z3, max: Z3): Seq[(BigInt, BigInt)] = { var mq: MergeQueue = new MergeQueue val sr = Z3Range(min, max) diff --git a/spark/src/main/scala/geotrellis/spark/io/index/zcurve/ZSpaceTimeKeyIndex.scala b/spark/src/main/scala/geotrellis/spark/io/index/zcurve/ZSpaceTimeKeyIndex.scala index bca4f20b10..51594e4471 100644 --- a/spark/src/main/scala/geotrellis/spark/io/index/zcurve/ZSpaceTimeKeyIndex.scala +++ b/spark/src/main/scala/geotrellis/spark/io/index/zcurve/ZSpaceTimeKeyIndex.scala @@ -64,8 +64,8 @@ object ZSpaceTimeKeyIndex { class ZSpaceTimeKeyIndex(val keyBounds: KeyBounds[SpaceTimeKey], val temporalResolution: Long) extends KeyIndex[SpaceTimeKey] { private def toZ(key: SpaceTimeKey): Z3 = Z3(key.col, key.row, (key.instant / temporalResolution).toInt) - def toIndex(key: SpaceTimeKey): Long = toZ(key).z + def toIndex(key: SpaceTimeKey): BigInt = toZ(key).z - def indexRanges(keyRange: (SpaceTimeKey, SpaceTimeKey)): Seq[(Long, Long)] = + def indexRanges(keyRange: (SpaceTimeKey, SpaceTimeKey)): Seq[(BigInt, BigInt)] = Z3.zranges(toZ(keyRange._1), toZ(keyRange._2)) } diff --git a/spark/src/main/scala/geotrellis/spark/io/index/zcurve/ZSpatialKeyIndex.scala b/spark/src/main/scala/geotrellis/spark/io/index/zcurve/ZSpatialKeyIndex.scala index 6877957527..42f03c6e31 100644 --- a/spark/src/main/scala/geotrellis/spark/io/index/zcurve/ZSpatialKeyIndex.scala +++ b/spark/src/main/scala/geotrellis/spark/io/index/zcurve/ZSpatialKeyIndex.scala @@ -23,8 +23,8 @@ import geotrellis.spark.io.index.zcurve._ class ZSpatialKeyIndex(val keyBounds: KeyBounds[SpatialKey]) extends KeyIndex[SpatialKey] { private def toZ(key: SpatialKey): Z2 = Z2(key.col, key.row) - def toIndex(key: SpatialKey): Long = toZ(key).z + def toIndex(key: SpatialKey): BigInt = toZ(key).z - def indexRanges(keyRange: (SpatialKey, SpatialKey)): Seq[(Long, Long)] = + def indexRanges(keyRange: (SpatialKey, SpatialKey)): Seq[(BigInt, BigInt)] = Z2.zranges(toZ(keyRange._1), toZ(keyRange._2)) } From 63ea1544a1102cf56cd1e6ee811eb1e5c5607ab9 Mon Sep 17 00:00:00 2001 From: James McClain Date: Fri, 16 Jun 2017 12:37:25 -0400 Subject: [PATCH 06/25] Extend Key Length in Hilbert Machinery --- .../io/index/hilbert/HilbertSpaceTimeKeyIndex.scala | 10 +++++----- 
.../io/index/hilbert/HilbertSpatialKeyIndex.scala | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/spark/src/main/scala/geotrellis/spark/io/index/hilbert/HilbertSpaceTimeKeyIndex.scala b/spark/src/main/scala/geotrellis/spark/io/index/hilbert/HilbertSpaceTimeKeyIndex.scala index 4289fad776..3361fe2919 100644 --- a/spark/src/main/scala/geotrellis/spark/io/index/hilbert/HilbertSpaceTimeKeyIndex.scala +++ b/spark/src/main/scala/geotrellis/spark/io/index/hilbert/HilbertSpaceTimeKeyIndex.scala @@ -77,7 +77,7 @@ class HilbertSpaceTimeKeyIndex( (if (bin == temporalBinCount) bin - 1 else bin).toLong } - def toIndex(key: SpaceTimeKey): Long = { + def toIndex(key: SpaceTimeKey): BigInt = { val bitVectors = Array( BitVectorFactories.OPTIMAL.apply(xResolution), @@ -95,11 +95,11 @@ class HilbertSpaceTimeKeyIndex( chc.index(bitVectors, 0, hilbertBitVector) - hilbertBitVector.toExactLong + BigInt(hilbertBitVector.toExactLong) } // Note: this function will happily index outside of the index keyBounds - def indexRanges(keyRange: (SpaceTimeKey, SpaceTimeKey)): Seq[(Long, Long)] = { + def indexRanges(keyRange: (SpaceTimeKey, SpaceTimeKey)): Seq[(BigInt, BigInt)] = { val ranges: java.util.List[LongRange] = List( //LongRange is exclusive on upper bound, adjusting for it here with + 1 LongRange.of(keyRange._1.spatialKey.col - minKey.col, keyRange._2.spatialKey.col - minKey.col + 1), @@ -132,12 +132,12 @@ class HilbertSpaceTimeKeyIndex( chc.accept(new ZoomingSpaceVisitorAdapter(chc, queryBuilder)) val filteredIndexRanges = queryBuilder.get.getFilteredIndexRanges val size = filteredIndexRanges.size - val result = Array.ofDim[(Long, Long)](size) + val result = Array.ofDim[(BigInt, BigInt)](size) cfor(0)(_ < size, _ + 1) { i => val range = filteredIndexRanges.get(i) // uzaygezen ranges are exclusive on the interval, GeoTrellis index ranges are inclusive, adjusting here. 
- result(i) = (range.getIndexRange.getStart, range.getIndexRange.getEnd - 1) + result(i) = (BigInt(range.getIndexRange.getStart), BigInt(range.getIndexRange.getEnd) - 1) } result diff --git a/spark/src/main/scala/geotrellis/spark/io/index/hilbert/HilbertSpatialKeyIndex.scala b/spark/src/main/scala/geotrellis/spark/io/index/hilbert/HilbertSpatialKeyIndex.scala index 98ce36d6da..411a8133ff 100644 --- a/spark/src/main/scala/geotrellis/spark/io/index/hilbert/HilbertSpatialKeyIndex.scala +++ b/spark/src/main/scala/geotrellis/spark/io/index/hilbert/HilbertSpatialKeyIndex.scala @@ -62,7 +62,7 @@ class HilbertSpatialKeyIndex(val keyBounds: KeyBounds[SpatialKey], val xResoluti new CompactHilbertCurve(dimensionSpec) } - def toIndex(key: SpatialKey): Long = { + def toIndex(key: SpatialKey): BigInt = { val bitVectors = Array( BitVectorFactories.OPTIMAL.apply(xResolution), @@ -79,7 +79,7 @@ class HilbertSpatialKeyIndex(val keyBounds: KeyBounds[SpatialKey], val xResoluti hilbertBitVector.toExactLong } - def indexRanges(keyRange: (SpatialKey, SpatialKey)): Seq[(Long, Long)] = { + def indexRanges(keyRange: (SpatialKey, SpatialKey)): Seq[(BigInt, BigInt)] = { val ranges: java.util.List[LongRange] = List( //LongRange is exclusive on upper bound, adjusting for it here with + 1 @@ -112,12 +112,12 @@ class HilbertSpatialKeyIndex(val keyBounds: KeyBounds[SpatialKey], val xResoluti chc.accept(new ZoomingSpaceVisitorAdapter(chc, queryBuilder)) val filteredIndexRanges = queryBuilder.get.getFilteredIndexRanges val size = filteredIndexRanges.size - val result = Array.ofDim[(Long, Long)](size) + val result = Array.ofDim[(BigInt, BigInt)](size) cfor(0)(_ < size, _ + 1) { i => val range = filteredIndexRanges.get(i) //LongRange is exclusive on upper bound, adjusting for it here with - 1 - result(i) = (range.getIndexRange.getStart, range.getIndexRange.getEnd - 1) + result(i) = (BigInt(range.getIndexRange.getStart), BigInt(range.getIndexRange.getEnd) - 1) } result From 042894649cb8f9d24d2d8e538c488f205a4eaf48 Mon Sep 17 00:00:00 2001 From: James McClain Date: Fri, 16 Jun 2017 12:43:59 -0400 Subject: [PATCH 07/25] Extend Key Length in "Row Major" Machinery --- .../io/index/rowmajor/RowMajorSpatialKeyIndex.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/spark/src/main/scala/geotrellis/spark/io/index/rowmajor/RowMajorSpatialKeyIndex.scala b/spark/src/main/scala/geotrellis/spark/io/index/rowmajor/RowMajorSpatialKeyIndex.scala index cd97b13592..f95c77aa2c 100644 --- a/spark/src/main/scala/geotrellis/spark/io/index/rowmajor/RowMajorSpatialKeyIndex.scala +++ b/spark/src/main/scala/geotrellis/spark/io/index/rowmajor/RowMajorSpatialKeyIndex.scala @@ -27,20 +27,20 @@ class RowMajorSpatialKeyIndex(val keyBounds: KeyBounds[SpatialKey]) extends KeyI val minRow = keyBounds.minKey.row val layoutCols = keyBounds.maxKey.col - keyBounds.minKey.col + 1 - def toIndex(key: SpatialKey): Long = + def toIndex(key: SpatialKey): BigInt = toIndex(key.col, key.row) - def toIndex(col: Int, row: Int): Long = - (layoutCols * (row - minRow) + (col - minCol)).toLong + def toIndex(col: Int, row: Int): BigInt = + BigInt(layoutCols * (row - minRow) + (col - minCol)) - def indexRanges(keyRange: (SpatialKey, SpatialKey)): Seq[(Long, Long)] = { + def indexRanges(keyRange: (SpatialKey, SpatialKey)): Seq[(BigInt, BigInt)] = { val SpatialKey(colMin, rowMin) = keyRange._1 val SpatialKey(colMax, rowMax) = keyRange._2 val cols = colMax - colMin + 1 val rows = rowMax - rowMin - val result = Array.ofDim[(Long, Long)](rowMax - rowMin 
+ 1) + val result = Array.ofDim[(BigInt, BigInt)](rowMax - rowMin + 1) cfor(0)(_ <= rows, _ + 1) { i => val row = rowMin + i From baf88325c8295ea2957c5755b2fc1cb63a3f948f Mon Sep 17 00:00:00 2001 From: James McClain Date: Tue, 20 Jun 2017 09:32:00 -0400 Subject: [PATCH 08/25] Allow Tests To Compile --- .../geotrellis/spark/io/index/zcurve/Z3RangeSpec.scala | 2 +- .../spark/io/index/zcurve/ZSpatialKeyIndexSpec.scala | 2 +- .../geotrellis/spark/io/json/TestKeyIndexRegistrator.scala | 6 +++--- .../scala/geotrellis/spark/partition/TestImplicits.scala | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/spark/src/test/scala/geotrellis/spark/io/index/zcurve/Z3RangeSpec.scala b/spark/src/test/scala/geotrellis/spark/io/index/zcurve/Z3RangeSpec.scala index a1f48235e9..f6e117cc3f 100644 --- a/spark/src/test/scala/geotrellis/spark/io/index/zcurve/Z3RangeSpec.scala +++ b/spark/src/test/scala/geotrellis/spark/io/index/zcurve/Z3RangeSpec.scala @@ -33,7 +33,7 @@ class Z3RangeSpec extends FunSpec with Matchers { ranges foreach { case (min, max) => for (z <- min to max) { - val zobj = new Z3(z) + val zobj = new Z3(z.toLong) actualSet = actualSet + Tuple3(zobj.dim(0),zobj.dim(1),zobj.dim(2)) count += 1 } diff --git a/spark/src/test/scala/geotrellis/spark/io/index/zcurve/ZSpatialKeyIndexSpec.scala b/spark/src/test/scala/geotrellis/spark/io/index/zcurve/ZSpatialKeyIndexSpec.scala index a38669e193..5f76620494 100644 --- a/spark/src/test/scala/geotrellis/spark/io/index/zcurve/ZSpatialKeyIndexSpec.scala +++ b/spark/src/test/scala/geotrellis/spark/io/index/zcurve/ZSpatialKeyIndexSpec.scala @@ -74,7 +74,7 @@ class ZSpatialKeyIndexSpec extends FunSpec with Matchers { val zsk = new ZSpatialKeyIndex(keyBounds) //checked by hand 4x4 - var idx: Seq[(Long,Long)] = zsk.indexRanges((SpatialKey(0,0), SpatialKey(1,1))) + var idx: Seq[(BigInt, BigInt)] = zsk.indexRanges((SpatialKey(0,0), SpatialKey(1,1))) idx.length should be(1) idx(0)._1 should be(0) idx(0)._2 should be(3) diff --git a/spark/src/test/scala/geotrellis/spark/io/json/TestKeyIndexRegistrator.scala b/spark/src/test/scala/geotrellis/spark/io/json/TestKeyIndexRegistrator.scala index dc4cc66c48..7c46c7ae11 100644 --- a/spark/src/test/scala/geotrellis/spark/io/json/TestKeyIndexRegistrator.scala +++ b/spark/src/test/scala/geotrellis/spark/io/json/TestKeyIndexRegistrator.scala @@ -24,10 +24,10 @@ import spray.json._ import spray.json.DefaultJsonProtocol._ class TestKeyIndex(val keyBounds: KeyBounds[SpatialKey]) extends KeyIndex[SpatialKey] { - def toIndex(key: SpatialKey): Long = 1L + def toIndex(key: SpatialKey): BigInt = BigInt(1) - def indexRanges(keyRange: (SpatialKey, SpatialKey)): Seq[(Long, Long)] = - Seq((1L, 2L)) + def indexRanges(keyRange: (SpatialKey, SpatialKey)): Seq[(BigInt, BigInt)] = + Seq((BigInt(1), BigInt(2))) } class TestKeyIndexRegistrator extends KeyIndexRegistrator { diff --git a/spark/src/test/scala/geotrellis/spark/partition/TestImplicits.scala b/spark/src/test/scala/geotrellis/spark/partition/TestImplicits.scala index 5e97c5c710..96a1669aec 100644 --- a/spark/src/test/scala/geotrellis/spark/partition/TestImplicits.scala +++ b/spark/src/test/scala/geotrellis/spark/partition/TestImplicits.scala @@ -26,10 +26,10 @@ object TestImplicits { def rescale(key: SpatialKey): SpatialKey = SpatialKey(key.col/2, key.row/2) - override def toIndex(key: SpatialKey): Long = + override def toIndex(key: SpatialKey): BigInt = zCurveIndex.toIndex(rescale(key)) - override def indexRanges(r: (SpatialKey, SpatialKey)): Seq[(Long, Long)] = + override 
def indexRanges(r: (SpatialKey, SpatialKey)): Seq[(BigInt, BigInt)] = zCurveIndex.indexRanges((rescale(r._1), rescale(r._2))) } } From c896b6453c30f1f35a842190e4a6e0af29271a0d Mon Sep 17 00:00:00 2001 From: James McClain Date: Thu, 15 Jun 2017 18:58:48 -0400 Subject: [PATCH 09/25] Extend Key Length in File Layer Machinery --- .../geotrellis/spark/io/file/FileCollectionReader.scala | 6 +++--- .../scala/geotrellis/spark/io/file/FileRDDReader.scala | 8 ++++---- .../scala/geotrellis/spark/io/file/KeyPathGenerator.scala | 4 ++-- .../io/hadoop/formats/FilterMapFileInputFormat.scala | 8 ++++---- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/spark/src/main/scala/geotrellis/spark/io/file/FileCollectionReader.scala b/spark/src/main/scala/geotrellis/spark/io/file/FileCollectionReader.scala index 9390362ca3..f68272fd9c 100644 --- a/spark/src/main/scala/geotrellis/spark/io/file/FileCollectionReader.scala +++ b/spark/src/main/scala/geotrellis/spark/io/file/FileCollectionReader.scala @@ -30,9 +30,9 @@ import java.io.File object FileCollectionReader { def read[K: AvroRecordCodec : Boundable, V: AvroRecordCodec]( - keyPath: Long => String, + keyPath: BigInt => String, queryKeyBounds: Seq[KeyBounds[K]], - decomposeBounds: KeyBounds[K] => Seq[(Long, Long)], + decomposeBounds: KeyBounds[K] => Seq[(BigInt, BigInt)], filterIndexOnly: Boolean, writerSchema: Option[Schema] = None, threads: Int = ConfigFactory.load().getThreads("geotrellis.file.threads.collection.read")): Seq[(K, V)] = { @@ -47,7 +47,7 @@ object FileCollectionReader { val includeKey = (key: K) => KeyBounds.includeKey(queryKeyBounds, key)(boundable) val _recordCodec = KeyValueRecordCodec[K, V] - LayerReader.njoin[K, V](ranges.toIterator, threads) { index: Long => + LayerReader.njoin[K, V](ranges.toIterator, threads) { index: BigInt => val path = keyPath(index) if (new File(path).exists) { val bytes: Array[Byte] = Filesystem.slurp(path) diff --git a/spark/src/main/scala/geotrellis/spark/io/file/FileRDDReader.scala b/spark/src/main/scala/geotrellis/spark/io/file/FileRDDReader.scala index e9b4b2ec49..4242073c20 100644 --- a/spark/src/main/scala/geotrellis/spark/io/file/FileRDDReader.scala +++ b/spark/src/main/scala/geotrellis/spark/io/file/FileRDDReader.scala @@ -33,9 +33,9 @@ import java.io.File object FileRDDReader { def read[K: AvroRecordCodec: Boundable, V: AvroRecordCodec]( - keyPath: Long => String, + keyPath: BigInt => String, queryKeyBounds: Seq[KeyBounds[K]], - decomposeBounds: KeyBounds[K] => Seq[(Long, Long)], + decomposeBounds: KeyBounds[K] => Seq[(BigInt, BigInt)], filterIndexOnly: Boolean, writerSchema: Option[Schema] = None, numPartitions: Option[Int] = None, @@ -56,9 +56,9 @@ object FileRDDReader { val kwWriterSchema = KryoWrapper(writerSchema) // Avro Schema is not Serializable sc.parallelize(bins, bins.size) - .mapPartitions { partition: Iterator[Seq[(Long, Long)]] => + .mapPartitions { partition: Iterator[Seq[(BigInt, BigInt)]] => partition flatMap { seq => - LayerReader.njoin[K, V](seq.toIterator, threads) { index: Long => + LayerReader.njoin[K, V](seq.toIterator, threads) { index: BigInt => val path = keyPath(index) if (new File(path).exists) { val bytes: Array[Byte] = Filesystem.slurp(path) diff --git a/spark/src/main/scala/geotrellis/spark/io/file/KeyPathGenerator.scala b/spark/src/main/scala/geotrellis/spark/io/file/KeyPathGenerator.scala index 5c90223e12..4a7e883fce 100644 --- a/spark/src/main/scala/geotrellis/spark/io/file/KeyPathGenerator.scala +++ 
b/spark/src/main/scala/geotrellis/spark/io/file/KeyPathGenerator.scala @@ -26,8 +26,8 @@ object KeyPathGenerator { (key: K) => new File(f, Index.encode(keyIndex.toIndex(key), maxWidth)).getAbsolutePath } - def apply(catalogPath: String, layerPath: String, maxWidth: Int): Long => String = { + def apply(catalogPath: String, layerPath: String, maxWidth: Int): BigInt => String = { val f = new File(catalogPath, layerPath) - (index: Long) => new File(f, Index.encode(index, maxWidth)).getAbsolutePath + (index: BigInt) => new File(f, Index.encode(index, maxWidth)).getAbsolutePath } } diff --git a/spark/src/main/scala/geotrellis/spark/io/hadoop/formats/FilterMapFileInputFormat.scala b/spark/src/main/scala/geotrellis/spark/io/hadoop/formats/FilterMapFileInputFormat.scala index f64573b37f..ba0901955c 100644 --- a/spark/src/main/scala/geotrellis/spark/io/hadoop/formats/FilterMapFileInputFormat.scala +++ b/spark/src/main/scala/geotrellis/spark/io/hadoop/formats/FilterMapFileInputFormat.scala @@ -65,7 +65,7 @@ object FilterMapFileInputFormat { try { in.next(minKey) } finally { in.close() } - minKey.get + BigInt(minKey.getBytes) } } @@ -208,7 +208,7 @@ class FilterMapFileInputFormat() extends FileInputFormat[LongWritable, BytesWrit while(!break) { if(seek) { seek = false - if(key == null || key.get < seekKey.get) { + if(key == null || BigInt(key.getBytes) < BigInt(seekKey.getBytes)) { // We are seeking to the beginning of a new range. key = mapFile.getClosest(seekKey, nextValue).asInstanceOf[LongWritable] if(key == null) { @@ -230,10 +230,10 @@ class FilterMapFileInputFormat() extends FileInputFormat[LongWritable, BytesWrit } if(!break) { - if (nextKey.get > currMaxIndex) { + if (BigInt(nextKey.getBytes) > currMaxIndex) { // Must be out of current index range. if(nextRangeIndex < ranges.size) { - if(!setNextIndexRange(nextKey.get)) { + if(!setNextIndexRange(BigInt(nextKey.getBytes))) { break = true more = false key = null From af2d0e95aa1d508fd87748f186857b01bc672ab5 Mon Sep 17 00:00:00 2001 From: James McClain Date: Fri, 16 Jun 2017 11:35:27 -0400 Subject: [PATCH 10/25] Extend Key Length in Hadoop Layer Machinery Note: `Long.MaxValue` has been changed to `BigInt(-1)`. 
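MapFile keys switch from `LongWritable` to `BytesWritable` holding the index's two's-complement bytes. A minimal round-trip sketch (the index value is arbitrary):

    import org.apache.hadoop.io.BytesWritable

    val index = BigInt("123456789012345678901")       // wider than Long.MaxValue
    val key   = new BytesWritable(index.toByteArray)  // what the writer appends to the MapFile
    // on the read side only the first getLength bytes of the backing array are meaningful
    val back  = BigInt(key.getBytes.take(key.getLength))
    assert(back == index)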
--- .../io/hadoop/HadoopCollectionReader.scala | 11 ++-- .../spark/io/hadoop/HadoopRDDReader.scala | 10 ++-- .../spark/io/hadoop/HadoopRDDWriter.scala | 12 ++--- .../spark/io/hadoop/HadoopValueReader.scala | 6 +-- .../formats/FilterMapFileInputFormat.scala | 52 ++++++++++--------- .../spark/io/kryo/KryoRegistrator.scala | 1 + .../spark/partition/IndexPartitioner.scala | 2 +- 7 files changed, 50 insertions(+), 44 deletions(-) diff --git a/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopCollectionReader.scala b/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopCollectionReader.scala index 1c9bb68e1f..c201694b2e 100644 --- a/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopCollectionReader.scala +++ b/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopCollectionReader.scala @@ -49,10 +49,11 @@ class HadoopCollectionReader(maxOpenFiles: Int) { ](path: Path, conf: Configuration, queryKeyBounds: Seq[KeyBounds[K]], - decomposeBounds: KeyBounds[K] => Seq[(Long, Long)], + decomposeBounds: KeyBounds[K] => Seq[(BigInt, BigInt)], indexFilterOnly: Boolean, writerSchema: Option[Schema] = None, - threads: Int = ConfigFactory.load().getThreads("geotrellis.hadoop.threads.collection.read")): Seq[(K, V)] = { + threads: Int = ConfigFactory.load().getThreads("geotrellis.hadoop.threads.collection.read") + ): Seq[(K, V)] = { if (queryKeyBounds.isEmpty) return Seq.empty[(K, V)] val includeKey = (key: K) => KeyBounds.includeKey(queryKeyBounds, key) @@ -60,16 +61,16 @@ class HadoopCollectionReader(maxOpenFiles: Int) { val codec = KeyValueRecordCodec[K, V] - val pathRanges: Vector[(Path, Long, Long)] = + val pathRanges: Vector[(Path, BigInt, BigInt)] = FilterMapFileInputFormat.layerRanges(path, conf) - LayerReader.njoin[K, V](indexRanges, threads){ index: Long => + LayerReader.njoin[K, V](indexRanges, threads){ index: BigInt => val valueWritable = pathRanges .find { row => index >= row._2 && index <= row._3 } .map { case (p, _, _) => readers.get(p, path => new MapFile.Reader(path, conf)) } - .map(_.get(new LongWritable(index), new BytesWritable()).asInstanceOf[BytesWritable]) + .map(_.get(new BytesWritable(index.toByteArray), new BytesWritable()).asInstanceOf[BytesWritable]) .getOrElse { println(s"Index ${index} not found."); null } if (valueWritable == null) Vector.empty diff --git a/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopRDDReader.scala b/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopRDDReader.scala index 319ba85bbd..ee4743da5a 100644 --- a/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopRDDReader.scala +++ b/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopRDDReader.scala @@ -48,9 +48,9 @@ object HadoopRDDReader extends LazyLogging { sc.newAPIHadoopRDD( inputConf, - classOf[SequenceFileInputFormat[LongWritable, BytesWritable]], - classOf[LongWritable], - classOf[BytesWritable] + classOf[SequenceFileInputFormat[BytesWritable, BytesWritable]], + classOf[BytesWritable], // key class + classOf[BytesWritable] // value class ) .flatMap { case (keyWritable, valueWritable) => AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(codec.schema), valueWritable.getBytes)(codec) @@ -63,7 +63,7 @@ object HadoopRDDReader extends LazyLogging { ]( path: Path, queryKeyBounds: Seq[KeyBounds[K]], - decomposeBounds: KeyBounds[K] => Seq[(Long, Long)], + decomposeBounds: KeyBounds[K] => Seq[(BigInt, BigInt)], indexFilterOnly: Boolean, writerSchema: Option[Schema] = None) (implicit sc: SparkContext): RDD[(K, V)] = { @@ -86,7 +86,7 @@ object HadoopRDDReader extends LazyLogging { 
sc.newAPIHadoopRDD( inputConf, classOf[FilterMapFileInputFormat], - classOf[LongWritable], + classOf[BytesWritable], classOf[BytesWritable] ) .flatMap { case (keyWritable, valueWritable) => diff --git a/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopRDDWriter.scala b/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopRDDWriter.scala index 4722cb29d3..abceab85fc 100644 --- a/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopRDDWriter.scala +++ b/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopRDDWriter.scala @@ -54,27 +54,27 @@ object HadoopRDDWriter extends LazyLogging { private var writer: MapFile.Writer = null // avoids creating a MapFile for empty partitions private var bytesRemaining = 0l - private def getWriter(firstIndex: Long) = { + private def getWriter(firstIndex: BigInt) = { val path = new Path(layerPath, f"part-r-${partition}%05d-${firstIndex}") bytesRemaining = blockSize - 16*1024 // buffer by 16BK for SEQ file overhead val writer = new MapFile.Writer( new Configuration, path.toString, - MapFile.Writer.keyClass(classOf[LongWritable]), + MapFile.Writer.keyClass(classOf[BytesWritable]), MapFile.Writer.valueClass(classOf[BytesWritable]), MapFile.Writer.compression(SequenceFile.CompressionType.NONE)) writer.setIndexInterval(indexInterval) writer } - def write(key: LongWritable, value: BytesWritable): Unit = { + def write(key: BytesWritable, value: BytesWritable): Unit = { val recordSize = 8 + value.getLength if (writer == null) { - writer = getWriter(key.get) + writer = getWriter(BigInt(key.getBytes)) } else if (bytesRemaining - recordSize < 0) { writer.close() - writer = getWriter(key.get) + writer = getWriter(BigInt(key.getBytes)) } writer.append(key, value) bytesRemaining -= recordSize @@ -219,7 +219,7 @@ object HadoopRDDWriter extends LazyLogging { val writer = new MultiMapWriter(layerPath, pid, blockSize, indexInterval) for ( (index, pairs) <- GroupConsecutiveIterator(iter)(r => keyIndex.toIndex(r._1))) { writer.write( - new LongWritable(index), + new BytesWritable(index.toByteArray), new BytesWritable(AvroEncoder.toBinary(pairs.toVector)(codec))) } writer.close() diff --git a/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopValueReader.scala b/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopValueReader.scala index 5d328489e6..1b721ca837 100644 --- a/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopValueReader.scala +++ b/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopValueReader.scala @@ -53,11 +53,11 @@ class HadoopValueReader( val writerSchema = attributeStore.readSchema(layerId) val codec = KeyValueRecordCodec[K, V] - val ranges: Vector[(Path, Long, Long)] = + val ranges: Vector[(Path, BigInt, BigInt)] = FilterMapFileInputFormat.layerRanges(header.path, conf) def read(key: K): V = { - val index: Long = keyIndex.toIndex(key) + val index: BigInt = keyIndex.toIndex(key) val valueWritable: BytesWritable = ranges .find{ row => @@ -67,7 +67,7 @@ class HadoopValueReader( readers.get((layerId, path), _ => new MapFile.Reader(path, conf)) } .getOrElse(throw new ValueNotFoundError(key, layerId)) - .get(new LongWritable(index), new BytesWritable()) + .get(new BytesWritable(index.toByteArray), new BytesWritable()) .asInstanceOf[BytesWritable] if (valueWritable == null) throw new ValueNotFoundError(key, layerId) diff --git a/spark/src/main/scala/geotrellis/spark/io/hadoop/formats/FilterMapFileInputFormat.scala b/spark/src/main/scala/geotrellis/spark/io/hadoop/formats/FilterMapFileInputFormat.scala index ba0901955c..d93c2b2bd6 100644 --- 
a/spark/src/main/scala/geotrellis/spark/io/hadoop/formats/FilterMapFileInputFormat.scala +++ b/spark/src/main/scala/geotrellis/spark/io/hadoop/formats/FilterMapFileInputFormat.scala @@ -32,9 +32,9 @@ object FilterMapFileInputFormat { // Define some key names for Hadoop configuration val FILTER_INFO_KEY = "geotrellis.spark.io.hadoop.filterinfo" - type FilterDefinition = Array[(Long, Long)] + type FilterDefinition = Array[(BigInt, BigInt)] - def layerRanges(layerPath: Path, conf: Configuration): Vector[(Path, Long, Long)] = { + def layerRanges(layerPath: Path, conf: Configuration): Vector[(Path, BigInt, BigInt)] = { val file = layerPath .getFileSystem(conf) .globStatus(new Path(layerPath, "*")) @@ -43,7 +43,7 @@ object FilterMapFileInputFormat { mapFileRanges(file, conf) } - def mapFileRanges(mapFiles: Seq[Path], conf: Configuration): Vector[(Path, Long, Long)] = { + def mapFileRanges(mapFiles: Seq[Path], conf: Configuration): Vector[(Path, BigInt, BigInt)] = { // finding the max index for each file would be very expensive. // it may not be present in the index file and will require: // - reading the index file @@ -54,14 +54,14 @@ object FilterMapFileInputFormat { // instead we assume that each map file runs from its min index to min index of next file val fileNameRx = ".*part-r-([0-9]+)-([0-9]+)$".r - def readStartingIndex(path: Path): Long = { + def readStartingIndex(path: Path): BigInt = { path.toString match { case fileNameRx(part, firstIndex) => - firstIndex.toLong + BigInt(firstIndex) case _ => val indexPath = new Path(path, "index") val in = new SequenceFile.Reader(conf, SequenceFile.Reader.file(indexPath)) - val minKey = new LongWritable + val minKey = new BytesWritable try { in.next(minKey) } finally { in.close() } @@ -72,21 +72,21 @@ object FilterMapFileInputFormat { mapFiles .map { file => readStartingIndex(file) -> file } .sortBy(_._1) - .foldRight(Vector.empty[(Path, Long, Long)]) { + .foldRight(Vector.empty[(Path, BigInt, BigInt)]) { case ((minIndex, fs), vec @ Vector()) => - (fs, minIndex, Long.MaxValue) +: vec + (fs, minIndex, BigInt(-1)) +: vec // XXX case ((minIndex, fs), vec) => (fs, minIndex, vec.head._2 - 1) +: vec } } } -class FilterMapFileInputFormat() extends FileInputFormat[LongWritable, BytesWritable] { +class FilterMapFileInputFormat() extends FileInputFormat[BytesWritable, BytesWritable] { var _filterDefinition: Option[FilterMapFileInputFormat.FilterDefinition] = None - def createKey() = new LongWritable() + def createKey() = new BytesWritable() - def createKey(index: Long) = new LongWritable(index) + def createKey(index: BigInt) = new BytesWritable(index.toByteArray) def createValue() = new BytesWritable() @@ -102,8 +102,8 @@ class FilterMapFileInputFormat() extends FileInputFormat[LongWritable, BytesWrit } /** - * Produce list of files that overlap our query region. - * This function will be called on the driver. + * Produce list of files that overlap our query region. This + * function will be called on the driver. 
*/ override def listStatus(context: JobContext): java.util.List[FileStatus] = { @@ -131,35 +131,38 @@ class FilterMapFileInputFormat() extends FileInputFormat[LongWritable, BytesWrit } override - def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[LongWritable, BytesWritable] = + def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[BytesWritable, BytesWritable] = new FilterMapFileRecordReader(getFilterDefinition(context.getConfiguration)) override protected def getFormatMinSplitSize(): Long = return SequenceFile.SYNC_INTERVAL - /** The map files are not meant to be split. Raster sequence files - * are written such that data files should only be one block large */ + /** + * The map files are not meant to be split. Raster sequence files + * are written such that data files should only be one large + * block. + */ override def isSplitable(context: JobContext, filename: Path): Boolean = false - class FilterMapFileRecordReader(filterDefinition: FilterMapFileInputFormat.FilterDefinition) extends RecordReader[LongWritable, BytesWritable] { + class FilterMapFileRecordReader(filterDefinition: FilterMapFileInputFormat.FilterDefinition) extends RecordReader[BytesWritable, BytesWritable] { private var mapFile: MapFile.Reader = null private var start: Long = 0L private var more: Boolean = true - private var key: LongWritable = null + private var key: BytesWritable = null private var value: BytesWritable = null private val ranges = filterDefinition - private var currMinIndex: Long = 0L - private var currMaxIndex: Long = 0L + private var currMinIndex: BigInt = BigInt(0) + private var currMaxIndex: BigInt = BigInt(0) private var nextRangeIndex: Int = 0 private var seek = false - private var seekKey: LongWritable = null + private var seekKey: BytesWritable = null - private def setNextIndexRange(index: Long = 0L): Boolean = { + private def setNextIndexRange(index: BigInt = BigInt(0)): Boolean = { if(nextRangeIndex >= ranges.length) { false } else { @@ -205,12 +208,13 @@ class FilterMapFileInputFormat() extends FileInputFormat[LongWritable, BytesWrit val nextKey = createKey() val nextValue = createValue() var break = false + while(!break) { if(seek) { seek = false if(key == null || BigInt(key.getBytes) < BigInt(seekKey.getBytes)) { // We are seeking to the beginning of a new range. 
- key = mapFile.getClosest(seekKey, nextValue).asInstanceOf[LongWritable] + key = mapFile.getClosest(seekKey, nextValue).asInstanceOf[BytesWritable] if(key == null) { break = true more = false @@ -258,7 +262,7 @@ class FilterMapFileInputFormat() extends FileInputFormat[LongWritable, BytesWrit } override - def getCurrentKey(): LongWritable = key + def getCurrentKey(): BytesWritable = key override def getCurrentValue(): BytesWritable = value diff --git a/spark/src/main/scala/geotrellis/spark/io/kryo/KryoRegistrator.scala b/spark/src/main/scala/geotrellis/spark/io/kryo/KryoRegistrator.scala index 3ba8b57c1b..e6848084b0 100644 --- a/spark/src/main/scala/geotrellis/spark/io/kryo/KryoRegistrator.scala +++ b/spark/src/main/scala/geotrellis/spark/io/kryo/KryoRegistrator.scala @@ -218,6 +218,7 @@ class KryoRegistrator extends SparkKryoRegistrator { kryo.register(classOf[org.apache.hadoop.io.BytesWritable]) kryo.register(classOf[org.apache.hadoop.io.LongWritable]) kryo.register(classOf[Array[org.apache.hadoop.io.LongWritable]]) + kryo.register(classOf[Array[org.apache.hadoop.io.BytesWritable]]) kryo.register(classOf[org.codehaus.jackson.node.BooleanNode]) kryo.register(classOf[org.codehaus.jackson.node.IntNode]) kryo.register(classOf[org.osgeo.proj4j.CoordinateReferenceSystem]) diff --git a/spark/src/main/scala/geotrellis/spark/partition/IndexPartitioner.scala b/spark/src/main/scala/geotrellis/spark/partition/IndexPartitioner.scala index a20f8a616f..55355b5e5f 100644 --- a/spark/src/main/scala/geotrellis/spark/partition/IndexPartitioner.scala +++ b/spark/src/main/scala/geotrellis/spark/partition/IndexPartitioner.scala @@ -39,7 +39,7 @@ class IndexPartitioner[K](index: KeyIndex[K], count: Int) extends Partitioner { if (count > 1) KeyIndex.breaks(index.keyBounds, index, count - 1).sorted.toArray else - Array(BigInt(-1)) + Array(BigInt(-1)) // XXX def numPartitions = breaks.length + 1 From 0e9abd3eb6ad8bc5c6d8967a774bfbd28e8dc830 Mon Sep 17 00:00:00 2001 From: James McClain Date: Mon, 19 Jun 2017 11:17:36 -0400 Subject: [PATCH 11/25] Account For Long.MaxValue -> BigInt(-1) Change --- .../spark/io/hadoop/HadoopCollectionReader.scala | 5 ++++- .../geotrellis/spark/io/hadoop/HadoopValueReader.scala | 7 ++++--- .../spark/io/hadoop/formats/FilterMapFileInputFormat.scala | 2 +- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopCollectionReader.scala b/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopCollectionReader.scala index c201694b2e..1691ff7692 100644 --- a/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopCollectionReader.scala +++ b/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopCollectionReader.scala @@ -43,6 +43,9 @@ class HadoopCollectionReader(maxOpenFiles: Int) { .removalListener[Path, MapFile.Reader] { case (_, v, _) => v.close() } .build[Path, MapFile.Reader] + private def predicate(row: (Path, BigInt, BigInt), index: BigInt): Boolean = + (index >= row._2) && ((index <= row._3) || (row._3 == -1)) + def read[ K: AvroRecordCodec: Boundable, V: AvroRecordCodec @@ -66,7 +69,7 @@ class HadoopCollectionReader(maxOpenFiles: Int) { LayerReader.njoin[K, V](indexRanges, threads){ index: BigInt => val valueWritable = pathRanges - .find { row => index >= row._2 && index <= row._3 } + .find(row => predicate(row, index)) .map { case (p, _, _) => readers.get(p, path => new MapFile.Reader(path, conf)) } diff --git a/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopValueReader.scala 
b/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopValueReader.scala index 1b721ca837..edb841f3e4 100644 --- a/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopValueReader.scala +++ b/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopValueReader.scala @@ -47,6 +47,9 @@ class HadoopValueReader( .removalListener[(LayerId, Path), MapFile.Reader] { case (_, v, _) => v.close() } .build[(LayerId, Path), MapFile.Reader] + private def predicate(row: (Path, BigInt, BigInt), index: BigInt): Boolean = + (index >= row._2) && ((index <= row._3) || (row._3 == -1)) + def reader[K: AvroRecordCodec: JsonFormat: ClassTag, V: AvroRecordCodec](layerId: LayerId): Reader[K, V] = new Reader[K, V] { val header = attributeStore.readHeader[HadoopLayerHeader](layerId) val keyIndex = attributeStore.readKeyIndex[K](layerId) @@ -60,9 +63,7 @@ class HadoopValueReader( val index: BigInt = keyIndex.toIndex(key) val valueWritable: BytesWritable = ranges - .find{ row => - index >= row._2 && index <= row._3 - } + .find(row => predicate(row, index)) .map { case (path, _, _) => readers.get((layerId, path), _ => new MapFile.Reader(path, conf)) } diff --git a/spark/src/main/scala/geotrellis/spark/io/hadoop/formats/FilterMapFileInputFormat.scala b/spark/src/main/scala/geotrellis/spark/io/hadoop/formats/FilterMapFileInputFormat.scala index d93c2b2bd6..65a12c592f 100644 --- a/spark/src/main/scala/geotrellis/spark/io/hadoop/formats/FilterMapFileInputFormat.scala +++ b/spark/src/main/scala/geotrellis/spark/io/hadoop/formats/FilterMapFileInputFormat.scala @@ -121,7 +121,7 @@ class FilterMapFileInputFormat() extends FileInputFormat[BytesWritable, BytesWri .filter { case (file, iMin, iMax) => // both file ranges and query ranges are sorted, use in-sync traversal while (it.hasNext && it.head._2 < iMin) it.next - if (it.hasNext) iMin <= it.head._2 && it.head._1 <= iMax + if (it.hasNext) iMin <= it.head._2 && (iMax == -1 || it.head._1 <= iMax) else false } .map(_._1) From fbc066ef1324f79ca4a281ad167520f7308842e2 Mon Sep 17 00:00:00 2001 From: James McClain Date: Mon, 19 Jun 2017 13:09:17 -0400 Subject: [PATCH 12/25] Account For Behavioral Differences Account for differences between `LongWritable` and `BytesWritable`. 
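The practical difference being handled here: `LongWritable.get` always yields the exact key, but `BytesWritable.getBytes` returns the whole backing buffer, which can be longer than the valid payload and zero-padded. A minimal sketch of that effect, not part of the patch itself (plain Hadoop `BytesWritable`, no GeoTrellis types; the `setCapacity` call merely simulates a reused, oversized buffer):

    import org.apache.hadoop.io.BytesWritable

    object BytesWritablePadding {
      def main(args: Array[String]): Unit = {
        val index    = BigInt(42)
        val writable = new BytesWritable(index.toByteArray)

        // Grow the backing buffer beyond the valid length, as can happen when
        // a writable instance is reused across records of different sizes.
        writable.setCapacity(16)

        // getBytes() now includes trailing zero padding, so decoding the raw
        // array yields a huge, wrong value; trimming to getLength restores 42.
        val raw     = BigInt(writable.getBytes)
        val trimmed = BigInt(writable.getBytes.take(writable.getLength))

        println(s"length=${writable.getLength} raw=$raw trimmed=$trimmed")
      }
    }

Hence the hunk below trims keys with `getBytes.take(getLength)` and checks for a non-empty array before constructing a `BigInt` (a zero-length byte array is not a valid `BigInteger`).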
--- .../io/hadoop/formats/FilterMapFileInputFormat.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/spark/src/main/scala/geotrellis/spark/io/hadoop/formats/FilterMapFileInputFormat.scala b/spark/src/main/scala/geotrellis/spark/io/hadoop/formats/FilterMapFileInputFormat.scala index 65a12c592f..d3e46c5318 100644 --- a/spark/src/main/scala/geotrellis/spark/io/hadoop/formats/FilterMapFileInputFormat.scala +++ b/spark/src/main/scala/geotrellis/spark/io/hadoop/formats/FilterMapFileInputFormat.scala @@ -61,7 +61,7 @@ object FilterMapFileInputFormat { case _ => val indexPath = new Path(path, "index") val in = new SequenceFile.Reader(conf, SequenceFile.Reader.file(indexPath)) - val minKey = new BytesWritable + val minKey = new BytesWritable(Array[Byte](0)) try { in.next(minKey) } finally { in.close() } @@ -84,11 +84,11 @@ object FilterMapFileInputFormat { class FilterMapFileInputFormat() extends FileInputFormat[BytesWritable, BytesWritable] { var _filterDefinition: Option[FilterMapFileInputFormat.FilterDefinition] = None - def createKey() = new BytesWritable() + def createKey() = new BytesWritable(Array[Byte](0)) def createKey(index: BigInt) = new BytesWritable(index.toByteArray) - def createValue() = new BytesWritable() + def createValue() = new BytesWritable def getFilterDefinition(conf: Configuration): FilterMapFileInputFormat.FilterDefinition = _filterDefinition match { @@ -234,10 +234,12 @@ class FilterMapFileInputFormat() extends FileInputFormat[BytesWritable, BytesWri } if(!break) { - if (BigInt(nextKey.getBytes) > currMaxIndex) { + val nextKeyBytes: Array[Byte] = nextKey.getBytes.take(nextKey.getLength) + + if ((nextKeyBytes.length > 0) && (BigInt(nextKeyBytes) > currMaxIndex)) { // Must be out of current index range. 
if(nextRangeIndex < ranges.size) { - if(!setNextIndexRange(BigInt(nextKey.getBytes))) { + if(!setNextIndexRange(BigInt(nextKeyBytes))) { break = true more = false key = null From fc6f1b5c7372f923ea66b7c34454ada5156e9902 Mon Sep 17 00:00:00 2001 From: James McClain Date: Tue, 20 Jun 2017 09:29:23 -0400 Subject: [PATCH 13/25] BigIntWritable Instead of BytesWritable --- .../org/apache/hadoop/io/BigIntWritable.java | 44 +++++++++++++++++++ .../io/hadoop/HadoopCollectionReader.scala | 2 +- .../spark/io/hadoop/HadoopRDDReader.scala | 6 +-- .../spark/io/hadoop/HadoopRDDWriter.scala | 6 +-- .../spark/io/hadoop/HadoopValueReader.scala | 2 +- .../formats/FilterMapFileInputFormat.scala | 20 ++++----- .../spark/io/kryo/KryoRegistrator.scala | 4 +- 7 files changed, 64 insertions(+), 20 deletions(-) create mode 100644 spark/src/main/java/org/apache/hadoop/io/BigIntWritable.java diff --git a/spark/src/main/java/org/apache/hadoop/io/BigIntWritable.java b/spark/src/main/java/org/apache/hadoop/io/BigIntWritable.java new file mode 100644 index 0000000000..3dce56cdba --- /dev/null +++ b/spark/src/main/java/org/apache/hadoop/io/BigIntWritable.java @@ -0,0 +1,44 @@ +package org.apache.hadoop.io; + +import java.math.BigInteger; +import java.util.Arrays; +import org.apache.hadoop.io.*; + +/** + * @author James McClain + */ +public class BigIntWritable + extends BytesWritable + implements WritableComparable { + + public BigIntWritable() { + super(); + } + + public BigIntWritable(byte[] bytes) { + super(bytes); + } + + @Override + public int compareTo(BinaryComparable that) { + BigInteger left = new BigInteger(Arrays.copyOf(this.getBytes(), this.getLength())); + BigInteger right = new BigInteger(Arrays.copyOf(that.getBytes(), that.getLength())); + return left.compareTo(right); + } + + public static class Comparator extends WritableComparator { + public Comparator() { + super(BigIntWritable.class); + } + + @Override + public int compare(WritableComparable a, WritableComparable b) { + return a.compareTo(b); + } + } + + static { + WritableComparator.define(BigIntWritable.class, new Comparator()); + } + +} diff --git a/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopCollectionReader.scala b/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopCollectionReader.scala index 1691ff7692..c909696c07 100644 --- a/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopCollectionReader.scala +++ b/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopCollectionReader.scala @@ -73,7 +73,7 @@ class HadoopCollectionReader(maxOpenFiles: Int) { .map { case (p, _, _) => readers.get(p, path => new MapFile.Reader(path, conf)) } - .map(_.get(new BytesWritable(index.toByteArray), new BytesWritable()).asInstanceOf[BytesWritable]) + .map(_.get(new BigIntWritable(index.toByteArray), new BytesWritable()).asInstanceOf[BytesWritable]) .getOrElse { println(s"Index ${index} not found."); null } if (valueWritable == null) Vector.empty diff --git a/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopRDDReader.scala b/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopRDDReader.scala index ee4743da5a..bfb2d62996 100644 --- a/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopRDDReader.scala +++ b/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopRDDReader.scala @@ -48,8 +48,8 @@ object HadoopRDDReader extends LazyLogging { sc.newAPIHadoopRDD( inputConf, - classOf[SequenceFileInputFormat[BytesWritable, BytesWritable]], - classOf[BytesWritable], // key class + classOf[SequenceFileInputFormat[BigIntWritable, BytesWritable]], + 
classOf[BigIntWritable], // key class classOf[BytesWritable] // value class ) .flatMap { case (keyWritable, valueWritable) => @@ -86,7 +86,7 @@ object HadoopRDDReader extends LazyLogging { sc.newAPIHadoopRDD( inputConf, classOf[FilterMapFileInputFormat], - classOf[BytesWritable], + classOf[BigIntWritable], classOf[BytesWritable] ) .flatMap { case (keyWritable, valueWritable) => diff --git a/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopRDDWriter.scala b/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopRDDWriter.scala index abceab85fc..9c13607968 100644 --- a/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopRDDWriter.scala +++ b/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopRDDWriter.scala @@ -61,14 +61,14 @@ object HadoopRDDWriter extends LazyLogging { new MapFile.Writer( new Configuration, path.toString, - MapFile.Writer.keyClass(classOf[BytesWritable]), + MapFile.Writer.keyClass(classOf[BigIntWritable]), MapFile.Writer.valueClass(classOf[BytesWritable]), MapFile.Writer.compression(SequenceFile.CompressionType.NONE)) writer.setIndexInterval(indexInterval) writer } - def write(key: BytesWritable, value: BytesWritable): Unit = { + def write(key: BigIntWritable, value: BytesWritable): Unit = { val recordSize = 8 + value.getLength if (writer == null) { writer = getWriter(BigInt(key.getBytes)) @@ -219,7 +219,7 @@ object HadoopRDDWriter extends LazyLogging { val writer = new MultiMapWriter(layerPath, pid, blockSize, indexInterval) for ( (index, pairs) <- GroupConsecutiveIterator(iter)(r => keyIndex.toIndex(r._1))) { writer.write( - new BytesWritable(index.toByteArray), + new BigIntWritable(index.toByteArray), new BytesWritable(AvroEncoder.toBinary(pairs.toVector)(codec))) } writer.close() diff --git a/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopValueReader.scala b/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopValueReader.scala index edb841f3e4..e9f9f9c3ee 100644 --- a/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopValueReader.scala +++ b/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopValueReader.scala @@ -68,7 +68,7 @@ class HadoopValueReader( readers.get((layerId, path), _ => new MapFile.Reader(path, conf)) } .getOrElse(throw new ValueNotFoundError(key, layerId)) - .get(new BytesWritable(index.toByteArray), new BytesWritable()) + .get(new BigIntWritable(index.toByteArray), new BytesWritable()) .asInstanceOf[BytesWritable] if (valueWritable == null) throw new ValueNotFoundError(key, layerId) diff --git a/spark/src/main/scala/geotrellis/spark/io/hadoop/formats/FilterMapFileInputFormat.scala b/spark/src/main/scala/geotrellis/spark/io/hadoop/formats/FilterMapFileInputFormat.scala index d3e46c5318..9d78ac7da0 100644 --- a/spark/src/main/scala/geotrellis/spark/io/hadoop/formats/FilterMapFileInputFormat.scala +++ b/spark/src/main/scala/geotrellis/spark/io/hadoop/formats/FilterMapFileInputFormat.scala @@ -61,7 +61,7 @@ object FilterMapFileInputFormat { case _ => val indexPath = new Path(path, "index") val in = new SequenceFile.Reader(conf, SequenceFile.Reader.file(indexPath)) - val minKey = new BytesWritable(Array[Byte](0)) + val minKey = new BigIntWritable(Array[Byte](0)) try { in.next(minKey) } finally { in.close() } @@ -81,12 +81,12 @@ object FilterMapFileInputFormat { } } -class FilterMapFileInputFormat() extends FileInputFormat[BytesWritable, BytesWritable] { +class FilterMapFileInputFormat() extends FileInputFormat[BigIntWritable, BytesWritable] { var _filterDefinition: Option[FilterMapFileInputFormat.FilterDefinition] = None - def 
createKey() = new BytesWritable(Array[Byte](0)) + def createKey() = new BigIntWritable(Array[Byte](0)) - def createKey(index: BigInt) = new BytesWritable(index.toByteArray) + def createKey(index: BigInt) = new BigIntWritable(index.toByteArray) def createValue() = new BytesWritable @@ -131,7 +131,7 @@ class FilterMapFileInputFormat() extends FileInputFormat[BytesWritable, BytesWri } override - def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[BytesWritable, BytesWritable] = + def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[BigIntWritable, BytesWritable] = new FilterMapFileRecordReader(getFilterDefinition(context.getConfiguration)) override @@ -147,11 +147,11 @@ class FilterMapFileInputFormat() extends FileInputFormat[BytesWritable, BytesWri def isSplitable(context: JobContext, filename: Path): Boolean = false - class FilterMapFileRecordReader(filterDefinition: FilterMapFileInputFormat.FilterDefinition) extends RecordReader[BytesWritable, BytesWritable] { + class FilterMapFileRecordReader(filterDefinition: FilterMapFileInputFormat.FilterDefinition) extends RecordReader[BigIntWritable, BytesWritable] { private var mapFile: MapFile.Reader = null private var start: Long = 0L private var more: Boolean = true - private var key: BytesWritable = null + private var key: BigIntWritable = null private var value: BytesWritable = null private val ranges = filterDefinition @@ -160,7 +160,7 @@ class FilterMapFileInputFormat() extends FileInputFormat[BytesWritable, BytesWri private var nextRangeIndex: Int = 0 private var seek = false - private var seekKey: BytesWritable = null + private var seekKey: BigIntWritable = null private def setNextIndexRange(index: BigInt = BigInt(0)): Boolean = { if(nextRangeIndex >= ranges.length) { @@ -214,7 +214,7 @@ class FilterMapFileInputFormat() extends FileInputFormat[BytesWritable, BytesWri seek = false if(key == null || BigInt(key.getBytes) < BigInt(seekKey.getBytes)) { // We are seeking to the beginning of a new range. 
- key = mapFile.getClosest(seekKey, nextValue).asInstanceOf[BytesWritable] + key = mapFile.getClosest(seekKey, nextValue).asInstanceOf[BigIntWritable] if(key == null) { break = true more = false @@ -264,7 +264,7 @@ class FilterMapFileInputFormat() extends FileInputFormat[BytesWritable, BytesWri } override - def getCurrentKey(): BytesWritable = key + def getCurrentKey(): BigIntWritable = key override def getCurrentValue(): BytesWritable = value diff --git a/spark/src/main/scala/geotrellis/spark/io/kryo/KryoRegistrator.scala b/spark/src/main/scala/geotrellis/spark/io/kryo/KryoRegistrator.scala index e6848084b0..ef97eba050 100644 --- a/spark/src/main/scala/geotrellis/spark/io/kryo/KryoRegistrator.scala +++ b/spark/src/main/scala/geotrellis/spark/io/kryo/KryoRegistrator.scala @@ -216,8 +216,8 @@ class KryoRegistrator extends SparkKryoRegistrator { kryo.register(classOf[java.util.LinkedHashMap[Any,Any]]) kryo.register(classOf[java.util.LinkedHashSet[Any]]) kryo.register(classOf[org.apache.hadoop.io.BytesWritable]) - kryo.register(classOf[org.apache.hadoop.io.LongWritable]) - kryo.register(classOf[Array[org.apache.hadoop.io.LongWritable]]) + kryo.register(classOf[org.apache.hadoop.io.BigIntWritable]) + kryo.register(classOf[Array[org.apache.hadoop.io.BigIntWritable]]) kryo.register(classOf[Array[org.apache.hadoop.io.BytesWritable]]) kryo.register(classOf[org.codehaus.jackson.node.BooleanNode]) kryo.register(classOf[org.codehaus.jackson.node.IntNode]) From 565222b6415a19b3720f9bb67c7e53fdd2ad23ca Mon Sep 17 00:00:00 2001 From: James McClain Date: Tue, 20 Jun 2017 11:57:02 -0400 Subject: [PATCH 14/25] Extend Range Lengths and Bin Sizes --- .../geotrellis/spark/io/index/IndexRanges.scala | 10 +++++----- .../scala/geotrellis/spark/io/index/KeyIndex.scala | 12 ++++++------ 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/spark/src/main/scala/geotrellis/spark/io/index/IndexRanges.scala b/spark/src/main/scala/geotrellis/spark/io/index/IndexRanges.scala index 18629e7bc0..d41ab9bcf8 100644 --- a/spark/src/main/scala/geotrellis/spark/io/index/IndexRanges.scala +++ b/spark/src/main/scala/geotrellis/spark/io/index/IndexRanges.scala @@ -24,17 +24,17 @@ object IndexRanges { def bin(ranges: Seq[(BigInt, BigInt)], count: Int): Seq[Seq[(BigInt, BigInt)]] = { var stack = ranges.toList - def len(r: (BigInt, BigInt)): Long = (r._2 - r._1).toLong + 1l - val total: Long = ranges.foldLeft(0l) { (s, r) => s + len(r) } - val binWidth = total / count + 1 + def len(r: (BigInt, BigInt)): BigInt = (r._2 - r._1) + BigInt(1) + val total: BigInt = ranges.foldLeft(BigInt(0)) { (s, r) => s + len(r) } + val binWidth: BigInt = (total / count) + 1 - def splitRange(range: (BigInt, BigInt), take: Long): ((BigInt, BigInt), (BigInt, BigInt)) = { + def splitRange(range: (BigInt, BigInt), take: BigInt): ((BigInt, BigInt), (BigInt, BigInt)) = { assert(len(range) > take) (range._1, range._1 + take - 1) -> (range._1 + take, range._2) } val arr = Array.fill(count)(Nil: List[(BigInt, BigInt)]) - var sum = 0l + var sum = BigInt(0) var i = 0 while (stack.nonEmpty) { val head = stack.head diff --git a/spark/src/main/scala/geotrellis/spark/io/index/KeyIndex.scala b/spark/src/main/scala/geotrellis/spark/io/index/KeyIndex.scala index 2197339333..79c7fc1f74 100644 --- a/spark/src/main/scala/geotrellis/spark/io/index/KeyIndex.scala +++ b/spark/src/main/scala/geotrellis/spark/io/index/KeyIndex.scala @@ -47,20 +47,20 @@ object KeyIndex { */ def breaks(ranges: Seq[(BigInt, BigInt)], count: Int): Vector[BigInt] = { require(count > 0, 
"breaks count must be at least one") - def len(r: (BigInt, BigInt)): Double = (r._2 - r._1).toDouble + 1 - val total: Double = ranges.foldLeft(0.0)(_ + len(_)) - val maxBinSize: Long = math.max(math.ceil(total /count+1), 1).toLong + def len(r: (BigInt, BigInt)): BigInt = (r._2 - r._1) + BigInt(1) + val total: BigInt = ranges.foldLeft(BigInt(0))(_ + len(_)) + val maxBinSize: BigInt = (total / count) + 1 - def take(range: (BigInt, BigInt), count: Long): Long = { + def take(range: (BigInt, BigInt), count: BigInt): BigInt = { if (len(range) >= count) count - else len(range).toLong + else len(range) } ranges.foldLeft((Vector.empty[BigInt], maxBinSize)) { case ((_breaks, _roomLeft), range) => var breaks = _breaks var roomLeft = _roomLeft var remainder = range - var taken: Long = 0l + var taken = BigInt(0) do { taken = take(remainder, roomLeft) if (taken == roomLeft) { From 71b62bc57bf2e7497de8359feac169151e72f1a4 Mon Sep 17 00:00:00 2001 From: James McClain Date: Tue, 20 Jun 2017 12:46:33 -0400 Subject: [PATCH 15/25] Augement Accumulo An key length of at least eight bytes is ensured in `AccumuloKeyEncoder`. That was because keys of one byte (in the unit tests) were not working -- extra zero bytes were being mysteriously added. Behavior with keys of more than 8 bytes is unknown. --- .../io/accumulo/AccumuloCollectionReader.scala | 13 +++++++------ .../spark/io/accumulo/AccumuloKeyEncoder.scala | 13 +++++++++---- .../spark/io/accumulo/AccumuloUtils.scala | 2 +- .../spark/io/accumulo/AccumuloValueReader.scala | 2 +- 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloCollectionReader.scala b/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloCollectionReader.scala index dc1f6320a1..5d8a1ed7cb 100644 --- a/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloCollectionReader.scala +++ b/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloCollectionReader.scala @@ -62,12 +62,13 @@ object AccumuloCollectionReader { val scanner = instance.connector.createScanner(table, new Authorizations()) scanner.setRange(range) scanner.fetchColumnFamily(columnFamily) - val result = scanner.iterator.map { case entry => - AvroEncoder.fromBinary(writerSchema.getOrElse(codec.schema), entry.getValue.get)(codec) - }.flatMap { pairs: Vector[(K, V)] => - if(filterIndexOnly) pairs - else pairs.filter { pair => includeKey(pair._1) } - }.toVector + val result = scanner.iterator + .map({ case entry => + AvroEncoder.fromBinary(writerSchema.getOrElse(codec.schema), entry.getValue.get)(codec) }) + .flatMap({ pairs: Vector[(K, V)] => + if(filterIndexOnly) pairs + else pairs.filter { pair => includeKey(pair._1) } }) + .toVector scanner.close() result }(pool) } diff --git a/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloKeyEncoder.scala b/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloKeyEncoder.scala index f708b3ed8a..db3452d07c 100644 --- a/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloKeyEncoder.scala +++ b/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloKeyEncoder.scala @@ -22,12 +22,17 @@ import org.apache.accumulo.core.data.Key import org.apache.hadoop.io.Text object AccumuloKeyEncoder { - final def long2Bytes(x: Long): Array[Byte] = - Array[Byte](x>>56 toByte, x>>48 toByte, x>>40 toByte, x>>32 toByte, x>>24 toByte, x>>16 toByte, x>>8 toByte, x toByte) + final def long2Bytes(x: BigInt): Array[Byte] = { + val bytes1: Array[Byte] = x.toByteArray + val bytes2: Array[Byte] = 
Stream.continually(0.toByte).take(8 - bytes1.length).toArray + (bytes2 ++ bytes1) // XXX + } - final def index2RowId(index: Long): Text = new Text(long2Bytes(index)) + final def index2RowId(index: BigInt): Text = { + new Text(long2Bytes(index)) + } - def encode[K](id: LayerId, key: K, index: Long): Key = + def encode[K](id: LayerId, key: K, index: BigInt): Key = new Key(index2RowId(index), columnFamily(id)) def getLocalityGroups(id: LayerId): Seq[String] = Seq(columnFamily(id)) diff --git a/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloUtils.scala b/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloUtils.scala index 45463abca3..02c8d10a9c 100644 --- a/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloUtils.scala +++ b/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloUtils.scala @@ -32,7 +32,7 @@ object AccumuloUtils { * Mapping KeyBounds of Extent to SFC ranges will often result in a set of non-contigrious ranges. * The indices exluded by these ranges should not be included in split calculation as they will never be seen. */ - def getSplits[K](kb: KeyBounds[K], ki: KeyIndex[K], count: Int): Seq[Long] = + def getSplits[K](kb: KeyBounds[K], ki: KeyIndex[K], count: Int): Seq[BigInt] = KeyIndex.breaks(kb, ki, count) /** diff --git a/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloValueReader.scala b/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloValueReader.scala index 18833612d7..cd124242ba 100644 --- a/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloValueReader.scala +++ b/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloValueReader.scala @@ -39,7 +39,7 @@ class AccumuloValueReader( val attributeStore: AttributeStore ) extends OverzoomingValueReader { - val rowId = (index: Long) => new Text(AccumuloKeyEncoder.long2Bytes(index)) + val rowId = (index: BigInt) => new Text(AccumuloKeyEncoder.long2Bytes(index)) def reader[K: AvroRecordCodec: JsonFormat: ClassTag, V: AvroRecordCodec](layerId: LayerId): Reader[K, V] = new Reader[K, V] { val header = attributeStore.readHeader[AccumuloLayerHeader](layerId) From 6d3d7b4be98e705863d4bfa3bfcb15fe73f10dc5 Mon Sep 17 00:00:00 2001 From: James McClain Date: Tue, 20 Jun 2017 14:21:28 -0400 Subject: [PATCH 16/25] Augment Cassandra Key length still capped at 64-bits, needs custom coded. 
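Concretely, binding the index to a `bigint` column forces the `.toLong` conversions marked `// XXX` below, and `BigInt.toLong` keeps only the low 64 bits, so any space-filling-curve index wider than a `Long` is silently wrapped. A tiny illustration in plain Scala (no Cassandra required), not part of the patch:

    object LongCapSketch {
      def main(args: Array[String]): Unit = {
        val fits   = BigInt(Long.MaxValue)  // largest index that survives the cast
        val tooBig = fits + 1               // one past the 64-bit range

        println(fits.toLong)    //  9223372036854775807  -- round-trips intact
        println(tooBig.toLong)  // -9223372036854775808  -- wrapped: wrong row key
      }
    }

A later patch in this series lifts the cap by storing the key in a `varint` column and registering a `BigInteger` codec.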
--- .../io/cassandra/CassandraCollectionReader.scala | 6 +++--- .../spark/io/cassandra/CassandraRDDReader.scala | 8 ++++---- .../spark/io/cassandra/CassandraRDDWriter.scala | 12 ++++++------ .../spark/io/cassandra/CassandraValueReader.scala | 2 +- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraCollectionReader.scala b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraCollectionReader.scala index 00ccb4fd35..6bc9f0d802 100644 --- a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraCollectionReader.scala +++ b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraCollectionReader.scala @@ -38,7 +38,7 @@ object CassandraCollectionReader { table: String, layerId: LayerId, queryKeyBounds: Seq[KeyBounds[K]], - decomposeBounds: KeyBounds[K] => Seq[(Long, Long)], + decomposeBounds: KeyBounds[K] => Seq[(BigInt, BigInt)], filterIndexOnly: Boolean, writerSchema: Option[Schema] = None, threads: Int = ConfigFactory.load().getThreads("geotrellis.cassandra.threads.collection.read") @@ -64,8 +64,8 @@ object CassandraCollectionReader { instance.withSessionDo { session => val statement = session.prepare(query) - LayerReader.njoin[K, V](ranges.toIterator, threads){ index: Long => - val row = session.execute(statement.bind(index.asInstanceOf[java.lang.Long])) + LayerReader.njoin[K, V](ranges.toIterator, threads){ index: BigInt => + val row = session.execute(statement.bind(index.toLong.asInstanceOf[java.lang.Long])) // XXX if (row.nonEmpty) { val bytes = row.one().getBytes("value").array() val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec) diff --git a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala index ca7069beea..df3fc4372e 100644 --- a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala +++ b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala @@ -46,7 +46,7 @@ object CassandraRDDReader { table: String, layerId: LayerId, queryKeyBounds: Seq[KeyBounds[K]], - decomposeBounds: KeyBounds[K] => Seq[(Long, Long)], + decomposeBounds: KeyBounds[K] => Seq[(BigInt, BigInt)], filterIndexOnly: Boolean, writerSchema: Option[Schema] = None, numPartitions: Option[Int] = None, @@ -73,13 +73,13 @@ object CassandraRDDReader { .toString sc.parallelize(bins, bins.size) - .mapPartitions { partition: Iterator[Seq[(Long, Long)]] => + .mapPartitions { partition: Iterator[Seq[(BigInt, BigInt)]] => instance.withSession { session => val statement = session.prepare(query) val result = partition map { seq => - LayerReader.njoin[K, V](seq.iterator, threads) { index: Long => - val row = session.execute(statement.bind(index.asInstanceOf[java.lang.Long])) + LayerReader.njoin[K, V](seq.iterator, threads) { index: BigInt => + val row = session.execute(statement.bind(index.toLong.asInstanceOf[java.lang.Long])) // XXX if (row.nonEmpty) { val bytes = row.one().getBytes("value").array() val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec) diff --git a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala index 9918ec3f1e..a1b752dabd 100644 --- a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala +++ 
b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala @@ -50,7 +50,7 @@ object CassandraRDDWriter { rdd: RDD[(K, V)], instance: CassandraInstance, layerId: LayerId, - decomposeKey: K => Long, + decomposeKey: K => BigInt, keyspace: String, table: String, threads: Int = DefaultThreadCount @@ -60,7 +60,7 @@ object CassandraRDDWriter { raster: RDD[(K, V)], instance: CassandraInstance, layerId: LayerId, - decomposeKey: K => Long, + decomposeKey: K => BigInt, keyspace: String, table: String, writerSchema: Option[Schema], @@ -112,7 +112,7 @@ object CassandraRDDWriter { val readStatement = session.prepare(readQuery) val writeStatement = session.prepare(writeQuery) - val rows: Process[Task, (java.lang.Long, Vector[(K,V)])] = + val rows: Process[Task, (BigInt, Vector[(K,V)])] = Process.unfold(partition)({ iter => if (iter.hasNext) { val record = iter.next() @@ -122,7 +122,7 @@ object CassandraRDDWriter { val pool = Executors.newFixedThreadPool(threads) - def elaborateRow(row: (java.lang.Long, Vector[(K,V)])): Process[Task, (java.lang.Long, Vector[(K,V)])] = { + def elaborateRow(row: (BigInt, Vector[(K,V)])): Process[Task, (BigInt, Vector[(K,V)])] = { Process eval Task ({ val (key, kvs1) = row val kvs2 = @@ -148,7 +148,7 @@ object CassandraRDDWriter { })(pool) } - def rowToBytes(row: (java.lang.Long, Vector[(K,V)])): Process[Task, (java.lang.Long, ByteBuffer)] = { + def rowToBytes(row: (BigInt, Vector[(K,V)])): Process[Task, (BigInt, ByteBuffer)] = { Process eval Task({ val (key, kvs) = row val bytes = ByteBuffer.wrap(AvroEncoder.toBinary(kvs)(codec)) @@ -156,7 +156,7 @@ object CassandraRDDWriter { })(pool) } - def retire(row: (java.lang.Long, ByteBuffer)): Process[Task, ResultSet] = { + def retire(row: (BigInt, ByteBuffer)): Process[Task, ResultSet] = { val (id, value) = row Process eval Task({ session.execute(writeStatement.bind(id, value)) diff --git a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraValueReader.scala b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraValueReader.scala index fbd4d816d2..7487780cd5 100644 --- a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraValueReader.scala +++ b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraValueReader.scala @@ -50,7 +50,7 @@ class CassandraValueReader( .and(eqs("zoom", layerId.zoom)) ) - val row = session.execute(statement.bind(keyIndex.toIndex(key).asInstanceOf[java.lang.Long])).all() + val row = session.execute(statement.bind(keyIndex.toIndex(key).toLong.asInstanceOf[java.lang.Long])).all() // XXX val tiles = row.map { entry => AvroEncoder.fromBinary(writerSchema, entry.getBytes("value").array())(codec) } From 1024d029d67fe9ca021aaf3f868f3be91b9c962c Mon Sep 17 00:00:00 2001 From: James McClain Date: Tue, 20 Jun 2017 15:51:32 -0400 Subject: [PATCH 17/25] Augment S3 --- .../geotrellis/spark/io/s3/S3CollectionLayerReader.scala | 2 +- .../scala/geotrellis/spark/io/s3/S3CollectionReader.scala | 6 +++--- .../main/scala/geotrellis/spark/io/s3/S3LayerReader.scala | 2 +- .../main/scala/geotrellis/spark/io/s3/S3RDDReader.scala | 8 ++++---- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/s3/src/main/scala/geotrellis/spark/io/s3/S3CollectionLayerReader.scala b/s3/src/main/scala/geotrellis/spark/io/s3/S3CollectionLayerReader.scala index 8b1617c772..9ba0f8aa21 100644 --- a/s3/src/main/scala/geotrellis/spark/io/s3/S3CollectionLayerReader.scala +++ b/s3/src/main/scala/geotrellis/spark/io/s3/S3CollectionLayerReader.scala @@ -56,7 +56,7 @@ class 
S3CollectionLayerReader(val attributeStore: AttributeStore) extends Collec val queryKeyBounds = rasterQuery(metadata) val maxWidth = Index.digits(keyIndex.toIndex(keyIndex.keyBounds.maxKey)) - val keyPath = (index: Long) => makePath(prefix, Index.encode(index, maxWidth)) + val keyPath = (index: BigInt) => makePath(prefix, Index.encode(index, maxWidth)) val decompose = (bounds: KeyBounds[K]) => keyIndex.indexRanges(bounds) val seq = collectionReader.read[K, V](bucket, keyPath, queryKeyBounds, decompose, filterIndexOnly, Some(writerSchema)) diff --git a/s3/src/main/scala/geotrellis/spark/io/s3/S3CollectionReader.scala b/s3/src/main/scala/geotrellis/spark/io/s3/S3CollectionReader.scala index e0b5cf3440..08cfbb89cf 100644 --- a/s3/src/main/scala/geotrellis/spark/io/s3/S3CollectionReader.scala +++ b/s3/src/main/scala/geotrellis/spark/io/s3/S3CollectionReader.scala @@ -36,9 +36,9 @@ trait S3CollectionReader { V: AvroRecordCodec ]( bucket: String, - keyPath: Long => String, + keyPath: BigInt => String, queryKeyBounds: Seq[KeyBounds[K]], - decomposeBounds: KeyBounds[K] => Seq[(Long, Long)], + decomposeBounds: KeyBounds[K] => Seq[(BigInt, BigInt)], filterIndexOnly: Boolean, writerSchema: Option[Schema] = None, threads: Int = ConfigFactory.load().getThreads("geotrellis.s3.threads.collection.read") @@ -53,7 +53,7 @@ trait S3CollectionReader { val recordCodec = KeyValueRecordCodec[K, V] val s3client = getS3Client() - LayerReader.njoin[K, V](ranges.toIterator, threads){ index: Long => + LayerReader.njoin[K, V](ranges.toIterator, threads){ index: BigInt => try { val bytes = IOUtils.toByteArray(s3client.getObject(bucket, keyPath(index)).getObjectContent) val recs = AvroEncoder.fromBinary(writerSchema.getOrElse(recordCodec.schema), bytes)(recordCodec) diff --git a/s3/src/main/scala/geotrellis/spark/io/s3/S3LayerReader.scala b/s3/src/main/scala/geotrellis/spark/io/s3/S3LayerReader.scala index 56ff262c5b..be659abbe9 100644 --- a/s3/src/main/scala/geotrellis/spark/io/s3/S3LayerReader.scala +++ b/s3/src/main/scala/geotrellis/spark/io/s3/S3LayerReader.scala @@ -60,7 +60,7 @@ class S3LayerReader(val attributeStore: AttributeStore)(implicit sc: SparkContex val queryKeyBounds = tileQuery(metadata) val maxWidth = Index.digits(keyIndex.toIndex(keyIndex.keyBounds.maxKey)) - val keyPath = (index: Long) => makePath(prefix, Index.encode(index, maxWidth)) + val keyPath = (index: BigInt) => makePath(prefix, Index.encode(index, maxWidth)) val decompose = (bounds: KeyBounds[K]) => keyIndex.indexRanges(bounds) val rdd = rddReader.read[K, V](bucket, keyPath, queryKeyBounds, decompose, filterIndexOnly, Some(writerSchema), Some(numPartitions)) diff --git a/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala b/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala index 4f129509b4..60ae4404ba 100644 --- a/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala +++ b/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala @@ -50,9 +50,9 @@ trait S3RDDReader { V: AvroRecordCodec ]( bucket: String, - keyPath: Long => String, + keyPath: BigInt => String, queryKeyBounds: Seq[KeyBounds[K]], - decomposeBounds: KeyBounds[K] => Seq[(Long, Long)], + decomposeBounds: KeyBounds[K] => Seq[(BigInt, BigInt)], filterIndexOnly: Boolean, writerSchema: Option[Schema] = None, numPartitions: Option[Int] = None, @@ -73,11 +73,11 @@ trait S3RDDReader { val kwWriterSchema = KryoWrapper(writerSchema) //Avro Schema is not Serializable sc.parallelize(bins, bins.size) - .mapPartitions { partition: Iterator[Seq[(Long, Long)]] => + 
.mapPartitions { partition: Iterator[Seq[(BigInt, BigInt)]] => val s3client = _getS3Client() val writerSchema = kwWriterSchema.value.getOrElse(_recordCodec.schema) partition flatMap { seq => - LayerReader.njoin[K, V](seq.toIterator, threads){ index: Long => + LayerReader.njoin[K, V](seq.toIterator, threads){ index: BigInt => try { val bytes = IOUtils.toByteArray(s3client.getObject(bucket, keyPath(index)).getObjectContent) val recs = AvroEncoder.fromBinary(writerSchema, bytes)(_recordCodec) From 1ac80f43307cce2bf013beefa2ab3e7eab365a6d Mon Sep 17 00:00:00 2001 From: James McClain Date: Tue, 20 Jun 2017 15:41:36 -0400 Subject: [PATCH 18/25] Augment HBase --- .../geotrellis/spark/io/hbase/HBaseCollectionReader.scala | 4 ++-- .../scala/geotrellis/spark/io/hbase/HBaseKeyEncoder.scala | 7 +++++-- .../scala/geotrellis/spark/io/hbase/HBaseRDDReader.scala | 4 ++-- .../scala/geotrellis/spark/io/hbase/HBaseRDDWriter.scala | 4 ++-- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseCollectionReader.scala b/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseCollectionReader.scala index a7fd64bb60..9b1f80fff8 100644 --- a/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseCollectionReader.scala +++ b/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseCollectionReader.scala @@ -35,7 +35,7 @@ object HBaseCollectionReader { table: String, layerId: LayerId, queryKeyBounds: Seq[KeyBounds[K]], - decomposeBounds: KeyBounds[K] => Seq[(Long, Long)], + decomposeBounds: KeyBounds[K] => Seq[(BigInt, BigInt)], filterIndexOnly: Boolean, writerSchema: Option[Schema] = None, numPartitions: Option[Int] = None @@ -46,7 +46,7 @@ object HBaseCollectionReader { val _recordCodec = KeyValueRecordCodec[K, V] val kwWriterSchema = KryoWrapper(writerSchema) // Avro Schema is not Serializable - val ranges: Seq[(Long, Long)] = if (queryKeyBounds.length > 1) + val ranges: Seq[(BigInt, BigInt)] = if (queryKeyBounds.length > 1) MergeQueue(queryKeyBounds.flatMap(decomposeBounds)) else queryKeyBounds.flatMap(decomposeBounds) diff --git a/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseKeyEncoder.scala b/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseKeyEncoder.scala index 1c5b95e959..1243c43c66 100644 --- a/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseKeyEncoder.scala +++ b/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseKeyEncoder.scala @@ -19,8 +19,11 @@ package geotrellis.spark.io.hbase import geotrellis.spark._ object HBaseKeyEncoder { - def encode(id: LayerId, index: Long, trailingByte: Boolean = false): Array[Byte] = { - val result: Array[Byte] = (s"${HBaseRDDWriter.layerIdString(id)}": Array[Byte]) ++ (index: Array[Byte]) + def encode(id: LayerId, index: BigInt, trailingByte: Boolean = false): Array[Byte] = { + val bytes3 = (index.toByteArray: Array[Byte]) + val bytes2: Array[Byte] = Stream.continually(0.toByte).take(64 - bytes3.length).toArray + val bytes1 = (s"${HBaseRDDWriter.layerIdString(id)}": Array[Byte]) + val result: Array[Byte] = bytes1 ++ bytes2 ++ bytes3 if(trailingByte) result :+ 0.toByte else result } } diff --git a/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseRDDReader.scala b/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseRDDReader.scala index a71ec25134..a0ff6ab951 100644 --- a/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseRDDReader.scala +++ b/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseRDDReader.scala @@ -43,7 +43,7 @@ object HBaseRDDReader { table: String, layerId: LayerId, queryKeyBounds: 
Seq[KeyBounds[K]], - decomposeBounds: KeyBounds[K] => Seq[(Long, Long)], + decomposeBounds: KeyBounds[K] => Seq[(BigInt, BigInt)], filterIndexOnly: Boolean, writerSchema: Option[Schema] = None, numPartitions: Option[Int] = None @@ -54,7 +54,7 @@ object HBaseRDDReader { val _recordCodec = KeyValueRecordCodec[K, V] val kwWriterSchema = KryoWrapper(writerSchema) // Avro Schema is not Serializable - val ranges: Seq[(Long, Long)] = if (queryKeyBounds.length > 1) + val ranges: Seq[(BigInt, BigInt)] = if (queryKeyBounds.length > 1) MergeQueue(queryKeyBounds.flatMap(decomposeBounds)) else queryKeyBounds.flatMap(decomposeBounds) diff --git a/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseRDDWriter.scala b/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseRDDWriter.scala index 96f06f9827..e4138402e2 100644 --- a/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseRDDWriter.scala +++ b/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseRDDWriter.scala @@ -41,7 +41,7 @@ object HBaseRDDWriter { raster: RDD[(K, V)], instance: HBaseInstance, layerId: LayerId, - decomposeKey: K => Long, + decomposeKey: K => BigInt, table: String ): Unit = update(raster, instance, layerId, decomposeKey, table, None, None) @@ -75,7 +75,7 @@ object HBaseRDDWriter { // groupBy will reuse the partitioner on the parent RDD if it is set, which could be typed // on a key type that may no longer by valid for the key type of the resulting RDD. raster.groupBy({ row => decomposeKey(row._1) }, numPartitions = raster.partitions.length) - .foreachPartition { partition: Iterator[(Long, Iterable[(K, V)])] => + .foreachPartition { partition: Iterator[(BigInt, Iterable[(K, V)])] => if(partition.nonEmpty) { instance.withConnectionDo { connection => val mutator = connection.getBufferedMutator(table) From 5e889640385a4591f2e0ebb66bf4e929e4af77c9 Mon Sep 17 00:00:00 2001 From: James McClain Date: Tue, 20 Jun 2017 21:18:32 -0400 Subject: [PATCH 19/25] Update Documentation Examples --- .../doc/examples/spark/ShardingKeyIndex.scala | 10 +++++----- .../scala/geotrellis/doc/examples/spark/VoxelKey.scala | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/doc-examples/src/main/scala/geotrellis/doc/examples/spark/ShardingKeyIndex.scala b/doc-examples/src/main/scala/geotrellis/doc/examples/spark/ShardingKeyIndex.scala index 640f374b5c..c665f4f489 100644 --- a/doc-examples/src/main/scala/geotrellis/doc/examples/spark/ShardingKeyIndex.scala +++ b/doc-examples/src/main/scala/geotrellis/doc/examples/spark/ShardingKeyIndex.scala @@ -57,19 +57,19 @@ class ShardingKeyIndex[K](val inner: KeyIndex[K], val shardCount: Int) extends K * prefixWithShard(i, s) == 8070450532247928869 * }}} */ - private def prefixWithShard(i: Long, shard: Long): Long = + private def prefixWithShard(i: BigInt, shard: Long): BigInt = (shard << 60) | i /* Necessary for extending `KeyIndex` */ - def toIndex(key: K): Long = { - val i: Long = inner.toIndex(key) - val shard: Long = i % shardCount /* Shard prefix between 0 and 7 */ + def toIndex(key: K): BigInt = { + val i: BigInt = inner.toIndex(key) + val shard: Long = (i % shardCount).toLong /* Shard prefix between 0 and 7 */ prefixWithShard(inner.toIndex(key), shard) } /* Necessary for extending `KeyIndex` */ - def indexRanges(keyRange: (K, K)): Seq[(Long, Long)] = { + def indexRanges(keyRange: (K, K)): Seq[(BigInt, BigInt)] = { inner .indexRanges(keyRange) .flatMap({ case (i1, i2) => diff --git a/doc-examples/src/main/scala/geotrellis/doc/examples/spark/VoxelKey.scala 
b/doc-examples/src/main/scala/geotrellis/doc/examples/spark/VoxelKey.scala index 7a8a578603..bdb2554b93 100644 --- a/doc-examples/src/main/scala/geotrellis/doc/examples/spark/VoxelKey.scala +++ b/doc-examples/src/main/scala/geotrellis/doc/examples/spark/VoxelKey.scala @@ -84,9 +84,9 @@ class ZVoxelKeyIndex(val keyBounds: KeyBounds[VoxelKey]) extends KeyIndex[VoxelK /* ''Z3'' here is a convenient shorthand for any 3-dimensional key. */ private def toZ(k: VoxelKey): Z3 = Z3(k.x, k.y, k.z) - def toIndex(k: VoxelKey): Long = toZ(k).z + def toIndex(k: VoxelKey): BigInt = toZ(k).z - def indexRanges(keyRange: (VoxelKey, VoxelKey)): Seq[(Long, Long)] = + def indexRanges(keyRange: (VoxelKey, VoxelKey)): Seq[(BigInt, BigInt)] = Z3.zranges(toZ(keyRange._1), toZ(keyRange._2)) } From 48eb6ad555b8fe7f8d2924fcca6fae411f7ac688 Mon Sep 17 00:00:00 2001 From: James McClain Date: Wed, 21 Jun 2017 14:54:05 -0400 Subject: [PATCH 20/25] HBase: Retain Backward Compatibility --- .../main/scala/geotrellis/spark/io/hbase/HBaseKeyEncoder.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseKeyEncoder.scala b/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseKeyEncoder.scala index 1243c43c66..70660794be 100644 --- a/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseKeyEncoder.scala +++ b/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseKeyEncoder.scala @@ -21,7 +21,7 @@ import geotrellis.spark._ object HBaseKeyEncoder { def encode(id: LayerId, index: BigInt, trailingByte: Boolean = false): Array[Byte] = { val bytes3 = (index.toByteArray: Array[Byte]) - val bytes2: Array[Byte] = Stream.continually(0.toByte).take(64 - bytes3.length).toArray + val bytes2: Array[Byte] = Stream.continually(0.toByte).take(8 - bytes3.length).toArray val bytes1 = (s"${HBaseRDDWriter.layerIdString(id)}": Array[Byte]) val result: Array[Byte] = bytes1 ++ bytes2 ++ bytes3 if(trailingByte) result :+ 0.toByte else result From dff5dbf5c34d7770aeb398067449fdf6b95f8330 Mon Sep 17 00:00:00 2001 From: James McClain Date: Wed, 21 Jun 2017 20:57:23 -0400 Subject: [PATCH 21/25] Cassandra: Extend, Retain Backward Compatibility --- .../io/cassandra/BigIntegerIffBigint.java | 49 +++++++++++++++++++ .../cassandra/CassandraCollectionReader.scala | 5 +- .../io/cassandra/CassandraInstance.scala | 15 +++++- .../io/cassandra/CassandraLayerDeleter.scala | 5 +- .../io/cassandra/CassandraRDDReader.scala | 3 +- .../io/cassandra/CassandraRDDWriter.scala | 6 ++- .../io/cassandra/CassandraValueReader.scala | 5 +- .../spark/io/cassandra/package.scala | 9 ++++ 8 files changed, 90 insertions(+), 7 deletions(-) create mode 100644 cassandra/src/main/java/geotrellis/spark/io/cassandra/BigIntegerIffBigint.java create mode 100644 cassandra/src/main/scala/geotrellis/spark/io/cassandra/package.scala diff --git a/cassandra/src/main/java/geotrellis/spark/io/cassandra/BigIntegerIffBigint.java b/cassandra/src/main/java/geotrellis/spark/io/cassandra/BigIntegerIffBigint.java new file mode 100644 index 0000000000..447f0cccd2 --- /dev/null +++ b/cassandra/src/main/java/geotrellis/spark/io/cassandra/BigIntegerIffBigint.java @@ -0,0 +1,49 @@ +package geotrellis.spark.io.cassandra; + +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.exceptions.InvalidTypeException; +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.TypeCodec; +import com.datastax.driver.core.TypeCodec.PrimitiveLongCodec; + +import java.nio.ByteBuffer; + +import java.math.BigInteger; +import 
java.util.Arrays; + + +/* + * This coded is used for backward compatibilty only. + * + * @author James McClain + */ +public class BigIntegerIffBigint extends TypeCodec { + + public static final BigIntegerIffBigint instance = new BigIntegerIffBigint(); + private static final PrimitiveLongCodec _instance = TypeCodec.bigint(); + + private BigIntegerIffBigint() { + super(DataType.bigint(), BigInteger.class); + } + + @Override + public ByteBuffer serialize(BigInteger value, ProtocolVersion protocolVersion) { + return _instance.serialize(value.longValue(), protocolVersion); + } + + @Override + public BigInteger deserialize(ByteBuffer bytes, ProtocolVersion protocolVersion) { + return BigInteger.valueOf(_instance.deserialize(bytes, protocolVersion)); + } + + @Override + public String format(BigInteger value) { + return _instance.format(value.longValue()); + } + + @Override + public BigInteger parse(String value) { + return BigInteger.valueOf(_instance.parse(value)); + } + +} diff --git a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraCollectionReader.scala b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraCollectionReader.scala index 6bc9f0d802..bb554b81e5 100644 --- a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraCollectionReader.scala +++ b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraCollectionReader.scala @@ -31,6 +31,9 @@ import com.typesafe.config.ConfigFactory import scala.collection.JavaConversions._ import scala.reflect.ClassTag +import java.math.BigInteger + + object CassandraCollectionReader { def read[K: Boundable : AvroRecordCodec : ClassTag, V: AvroRecordCodec : ClassTag]( instance: CassandraInstance, @@ -65,7 +68,7 @@ object CassandraCollectionReader { val statement = session.prepare(query) LayerReader.njoin[K, V](ranges.toIterator, threads){ index: BigInt => - val row = session.execute(statement.bind(index.toLong.asInstanceOf[java.lang.Long])) // XXX + val row = session.execute(statement.bind(index: BigInteger)) if (row.nonEmpty) { val bytes = row.one().getBytes("value").array() val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec) diff --git a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraInstance.scala b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraInstance.scala index de979daf79..f29e80a4fc 100644 --- a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraInstance.scala +++ b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraInstance.scala @@ -34,6 +34,7 @@ object CassandraInstance { .getOrElse(Cassandra.cfg.getString("keyspace")) val attributeTable = Option(uri.getFragment) .getOrElse(Cassandra.cfg.getString("catalog")) + var registered: Boolean = false BaseCassandraInstance( List(zookeeper), @@ -66,6 +67,16 @@ trait CassandraInstance extends Serializable { @transient lazy val cluster = getCluster @transient lazy val session = cluster.connect() + def register(): Unit = { + if (!CassandraInstance.registered) { + cluster + .getConfiguration() + .getCodecRegistry() + .register(BigIntegerIffBigint.instance) + CassandraInstance.registered = true + } + } + def ensureKeyspaceExists(keyspace: String, session: Session): Unit = session.execute(s"create keyspace if not exists ${keyspace} with replication = {'class': '${replicationStrategy}', 'replication_factor': ${replicationFactor} }") @@ -107,7 +118,9 @@ case class BaseCassandraInstance( replicationFactor: Int = 
Cassandra.cfg.getInt("replicationFactor"), localDc: String = Cassandra.cfg.getString("localDc"), usedHostsPerRemoteDc: Int = Cassandra.cfg.getInt("usedHostsPerRemoteDc"), - allowRemoteDCsForLocalConsistencyLevel: Boolean = Cassandra.cfg.getBoolean("allowRemoteDCsForLocalConsistencyLevel")) extends CassandraInstance + allowRemoteDCsForLocalConsistencyLevel: Boolean = Cassandra.cfg.getBoolean("allowRemoteDCsForLocalConsistencyLevel")) extends CassandraInstance { + register() +} object Cassandra { lazy val cfg = ConfigFactory.load().getConfig("geotrellis.cassandra") diff --git a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraLayerDeleter.scala b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraLayerDeleter.scala index d3357197c5..ad27f8b7ef 100644 --- a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraLayerDeleter.scala +++ b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraLayerDeleter.scala @@ -25,6 +25,9 @@ import com.datastax.driver.core.querybuilder.QueryBuilder.{eq => eqs} import scala.collection.JavaConversions._ +import java.math.BigInteger + + class CassandraLayerDeleter(val attributeStore: AttributeStore, instance: CassandraInstance) extends LazyLogging with LayerDeleter[LayerId] { def delete(id: LayerId): Unit = { @@ -45,7 +48,7 @@ class CassandraLayerDeleter(val attributeStore: AttributeStore, instance: Cassan val statement = session.prepare(dquery) session.execute(squery).all().map { entry => - session.execute(statement.bind(entry.getLong("key").asInstanceOf[java.lang.Long])) + session.execute(statement.bind(entry.getVarint("key"))) } } } catch { diff --git a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala index df3fc4372e..1d23e4c9d8 100644 --- a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala +++ b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala @@ -35,6 +35,7 @@ import com.typesafe.config.ConfigFactory import scala.collection.JavaConversions._ import scala.reflect.ClassTag +import java.math.BigInteger object CassandraRDDReader { final val DefaultThreadCount = @@ -79,7 +80,7 @@ object CassandraRDDReader { val result = partition map { seq => LayerReader.njoin[K, V](seq.iterator, threads) { index: BigInt => - val row = session.execute(statement.bind(index.toLong.asInstanceOf[java.lang.Long])) // XXX + val row = session.execute(statement.bind(index: BigInteger)) if (row.nonEmpty) { val bytes = row.one().getBytes("value").array() val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec) diff --git a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala index a1b752dabd..311d4a0bc3 100644 --- a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala +++ b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala @@ -41,6 +41,8 @@ import java.util.concurrent.Executors import scala.collection.JavaConversions._ +import java.math.BigInteger + object CassandraRDDWriter { final val DefaultThreadCount = @@ -75,7 +77,7 @@ object CassandraRDDWriter { instance.ensureKeyspaceExists(keyspace, session) session.execute( SchemaBuilder.createTable(keyspace, table).ifNotExists() - .addPartitionKey("key", bigint) + .addPartitionKey("key", varint) 
.addClusteringColumn("name", text) .addClusteringColumn("zoom", cint) .addColumn("value", blob) @@ -159,7 +161,7 @@ object CassandraRDDWriter { def retire(row: (BigInt, ByteBuffer)): Process[Task, ResultSet] = { val (id, value) = row Process eval Task({ - session.execute(writeStatement.bind(id, value)) + session.execute(writeStatement.bind(id: BigInteger, value)) })(pool) } diff --git a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraValueReader.scala b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraValueReader.scala index 7487780cd5..faba153b06 100644 --- a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraValueReader.scala +++ b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraValueReader.scala @@ -30,6 +30,9 @@ import spray.json._ import scala.collection.JavaConversions._ import scala.reflect.ClassTag +import java.math.BigInteger + + class CassandraValueReader( instance: CassandraInstance, val attributeStore: AttributeStore @@ -50,7 +53,7 @@ class CassandraValueReader( .and(eqs("zoom", layerId.zoom)) ) - val row = session.execute(statement.bind(keyIndex.toIndex(key).toLong.asInstanceOf[java.lang.Long])).all() // XXX + val row = session.execute(statement.bind(keyIndex.toIndex(key): BigInteger)).all() val tiles = row.map { entry => AvroEncoder.fromBinary(writerSchema, entry.getBytes("value").array())(codec) } diff --git a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/package.scala b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/package.scala new file mode 100644 index 0000000000..7e7ec2db0d --- /dev/null +++ b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/package.scala @@ -0,0 +1,9 @@ +package geotrellis.spark.io + +import java.math.BigInteger + +package object cassandra { + implicit def bigToBig(i: BigInt): BigInteger = { + new BigInteger(i.toByteArray) + } +} From e6ecd3685950f1615ae9d1dd43336de08988c3a6 Mon Sep 17 00:00:00 2001 From: James McClain Date: Fri, 17 Nov 2017 10:28:24 -0500 Subject: [PATCH 22/25] Rehabilitate Hadoop After Rebase --- .../spark/io/hadoop/HadoopRDDWriter.scala | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopRDDWriter.scala b/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopRDDWriter.scala index 9c13607968..ec308b311c 100644 --- a/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopRDDWriter.scala +++ b/spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopRDDWriter.scala @@ -102,26 +102,26 @@ object HadoopRDDWriter extends LazyLogging { val conf = rdd.sparkContext.hadoopConfiguration val _conf = new SerializableConfiguration(conf) - val ranges: Vector[(String, Long, Long)] = + val ranges: Vector[(String, BigInt, BigInt)] = FilterMapFileInputFormat.layerRanges(header.path, conf) .map({ case (path, start, end) => (path.toString, start, end) }) val layerPathStr = layerPath.toString - val firstIndex: Long = ranges.head._2 - val lastIndex: Long = { + val firstIndex: BigInt = ranges.head._2 + val lastIndex: BigInt = { val path = ranges.last._1 val reader = new MapFile.Reader(path, conf) - var k = new LongWritable() + var k = new BigIntWritable() var v = new BytesWritable() - var index: Long = -1 - while (reader.next(k, v)) { index = k.get } + var index: BigInt = BigInt(-1) + while (reader.next(k, v)) { index = BigInt(new java.math.BigInteger(k.get)) } reader.close index } - val rdd2: RDD[(Long, K, V)] = rdd.map({ case (k,v) => - val i = keyIndex.toIndex(k) + val rdd2: RDD[(BigInt, 
K, V)] = rdd.map({ case (k,v) => + val i: BigInt = keyIndex.toIndex(k) (i, k, v) }) @@ -135,12 +135,17 @@ object HadoopRDDWriter extends LazyLogging { write(nonOverlappers, layerPath, keyIndex, indexInterval, false) // Write the portion of the update that overlaps the existing layer - val overlappers: RDD[(String, Iterable[(Long,K,V)])] = + val overlappers: RDD[(String, Iterable[(BigInt,K,V)])] = rdd2 .filter({ case (i, k, v) => firstIndex <= i && i <= lastIndex }) - .groupBy({ case (i, k,v) => - ranges.find({ case (_,start,end) => start <= i && i <= end }) }) + .groupBy({ case (i, k, v) => + ranges.find({ case (_,start,end) => + if (end == BigInt(-1)) + start <= i + else + start <= i && i <= end + }) }) .map({ case (range, ikvs) => range match { case Some((path, _, _)) => (path, ikvs) @@ -149,7 +154,7 @@ object HadoopRDDWriter extends LazyLogging { overlappers .foreach({ case (_path, ikvs1) => - val ikvs2 = mutable.ListBuffer.empty[(Long,K,V)] + val ikvs2 = mutable.ListBuffer.empty[(BigInt,K,V)] val path = new Path(_path) val conf = _conf.value @@ -158,7 +163,7 @@ object HadoopRDDWriter extends LazyLogging { val reader = new MapFile.Reader(path, conf) // Read records from map file, delete map file - var k = new LongWritable() + var k = new BigIntWritable() var v = new BytesWritable() while (reader.next(k, v)) { val _kvs2: Vector[(K,V)] = AvroEncoder.fromBinary(kwWriterSchema.value, v.getBytes)(codec) @@ -188,7 +193,7 @@ object HadoopRDDWriter extends LazyLogging { val writer = new MultiMapWriter(layerPathStr, 33, blockSize, indexInterval) for ( (index, pairs) <- GroupConsecutiveIterator(kvs.toIterator)(r => keyIndex.toIndex(r._1))) { writer.write( - new LongWritable(index), + new BigIntWritable(index.toByteArray), new BytesWritable(AvroEncoder.toBinary(pairs.toVector)(codec))) } writer.close() }) From 35ec2a519cfc20a3b0f7d25e55b42adaf6c5b17e Mon Sep 17 00:00:00 2001 From: James McClain Date: Fri, 17 Nov 2017 12:59:45 -0500 Subject: [PATCH 23/25] Rehabilitate HBase After Rebase --- .../main/scala/geotrellis/spark/io/hbase/HBaseRDDWriter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseRDDWriter.scala b/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseRDDWriter.scala index e4138402e2..d1a0ffd41b 100644 --- a/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseRDDWriter.scala +++ b/hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseRDDWriter.scala @@ -49,7 +49,7 @@ object HBaseRDDWriter { raster: RDD[(K, V)], instance: HBaseInstance, layerId: LayerId, - decomposeKey: K => Long, + decomposeKey: K => BigInt, table: String, writerSchema: Option[Schema], mergeFunc: Option[(V,V) => V] From 5a7f1770699c516c538b045fdbf53fe08da0dcd0 Mon Sep 17 00:00:00 2001 From: James McClain Date: Fri, 17 Nov 2017 12:56:06 -0500 Subject: [PATCH 24/25] Rehabilitate Cassandra After Rebase --- .../spark/io/cassandra/CassandraInstance.scala | 11 ++++++----- .../spark/io/cassandra/CassandraRDDWriter.scala | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraInstance.scala b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraInstance.scala index f29e80a4fc..50272ed4d1 100644 --- a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraInstance.scala +++ b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraInstance.scala @@ -24,6 +24,8 @@ import scala.util.Try import java.net.URI object CassandraInstance { + var 
bigIntegerRegistered: Boolean = false + def apply(uri: URI): CassandraInstance = { import geotrellis.util.UriUtils._ @@ -34,7 +36,6 @@ object CassandraInstance { .getOrElse(Cassandra.cfg.getString("keyspace")) val attributeTable = Option(uri.getFragment) .getOrElse(Cassandra.cfg.getString("catalog")) - var registered: Boolean = false BaseCassandraInstance( List(zookeeper), @@ -67,13 +68,13 @@ trait CassandraInstance extends Serializable { @transient lazy val cluster = getCluster @transient lazy val session = cluster.connect() - def register(): Unit = { - if (!CassandraInstance.registered) { + def registerBigInteger(): Unit = { + if (!CassandraInstance.bigIntegerRegistered) { cluster .getConfiguration() .getCodecRegistry() .register(BigIntegerIffBigint.instance) - CassandraInstance.registered = true + CassandraInstance.bigIntegerRegistered = true } } @@ -119,7 +120,7 @@ case class BaseCassandraInstance( localDc: String = Cassandra.cfg.getString("localDc"), usedHostsPerRemoteDc: Int = Cassandra.cfg.getInt("usedHostsPerRemoteDc"), allowRemoteDCsForLocalConsistencyLevel: Boolean = Cassandra.cfg.getBoolean("allowRemoteDCsForLocalConsistencyLevel")) extends CassandraInstance { - register() + registerBigInteger() } object Cassandra { diff --git a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala index 311d4a0bc3..94afe9ace9 100644 --- a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala +++ b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala @@ -129,7 +129,7 @@ object CassandraRDDWriter { val (key, kvs1) = row val kvs2 = if (mergeFunc.nonEmpty) { - val oldRow = session.execute(readStatement.bind(key)) + val oldRow = session.execute(readStatement.bind(key: BigInteger)) if (oldRow.nonEmpty) { val bytes = oldRow.one().getBytes("value").array() AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec) From 5d43f5678c42ccf255e8fd27395700d5cfe8015f Mon Sep 17 00:00:00 2001 From: James McClain Date: Tue, 5 Dec 2017 07:42:04 -0500 Subject: [PATCH 25/25] Update Changelog [skip ci] --- docs/CHANGELOG.rst | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/docs/CHANGELOG.rst b/docs/CHANGELOG.rst index 03b47f69a3..0de815551f 100644 --- a/docs/CHANGELOG.rst +++ b/docs/CHANGELOG.rst @@ -1,6 +1,18 @@ Changelog ========= +2.0.0 +----- + +API Changes +^^^^^^^^^^^ + +- ``geotrellis.spark`` + + - **Change:** The length of the key (the space-filling curve index or address) used for layer reading and writing has + been extended from a fixed length of 8 bytes to an arbitrary length. This change affects not only the + ``geotrellis.spark`` package, but all backends (excluding ``geotrellis.geowave`` and ``geotrellis.geomesa``). + 1.2.0 ----- *2017 Nov 6*
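For code that implements its own index, the visible surface of this change is the widened `KeyIndex` contract: `toIndex` now returns `BigInt` and `indexRanges` returns `Seq[(BigInt, BigInt)]`, as in the documentation examples patched above. A hedged sketch of a custom index against the new signatures (the class name and the toy row-major curve are illustrative only; `KeyIndex`, `KeyBounds` and `SpatialKey` are the existing GeoTrellis types):

    import geotrellis.spark.{KeyBounds, SpatialKey}
    import geotrellis.spark.io.index.KeyIndex

    class RowMajorBigIntIndex(val keyBounds: KeyBounds[SpatialKey], layoutCols: Int)
        extends KeyIndex[SpatialKey] {

      // Was `def toIndex(key: SpatialKey): Long` before this change.
      def toIndex(key: SpatialKey): BigInt =
        BigInt(key.row) * layoutCols + key.col

      // Was `Seq[(Long, Long)]`; one contiguous index range per row of keys.
      def indexRanges(keyRange: (SpatialKey, SpatialKey)): Seq[(BigInt, BigInt)] = {
        val (min, max) = keyRange
        for (row <- min.row to max.row)
          yield (toIndex(SpatialKey(min.col, row)), toIndex(SpatialKey(max.col, row)))
      }
    }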