Skip to content

Commit d06b700

Browse files
committed
Add wantSection to readChunksConcurrent.
Fix bugs in HDF5 chunked reading.
1 parent f52ca19 commit d06b700

File tree

10 files changed

+192
-285
lines changed

10 files changed

+192
-285
lines changed

core/src/commonMain/kotlin/com/sunya/cdm/api/Netchdf.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ interface Netchdf : AutoCloseable {
2323
fun <T> readChunksConcurrent(v2: Variable<T>,
2424
lamda : (ArraySection<*>) -> Unit,
2525
done : () -> Unit,
26+
wantSection: SectionPartial? = null,
2627
nthreads: Int? = null) {
2728
TODO()
2829
}

core/src/commonMain/kotlin/com/sunya/cdm/api/Section.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.sunya.cdm.api
22

33
/** A filled section of multidimensional array indices, plus the variable shape. */
4+
// TODO would IndexSpace be a better abstraction?
45
data class Section(val ranges : List<LongProgression>, val varShape : LongArray) {
56
val rank = ranges.size
67
val shape : LongArray // or IntArray ??

core/src/commonMain/kotlin/com/sunya/cdm/layout/Tiling.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.sunya.cdm.layout
22

3+
import com.sunya.cdm.api.computeSize
34
import kotlin.math.max
45
import kotlin.math.min
56

@@ -20,6 +21,7 @@ class Tiling(varShape: LongArray, chunkShape: LongArray) {
2021
val tileShape : LongArray // overall shape of the dataset's tile space
2122
val indexShape : LongArray // overall shape of the dataset's index space - may be larger than actual variable shape
2223
val tileStrider : LongArray // for computing tile index
24+
val nelems: Int
2325

2426
init {
2527
// convenient to allow tileSize to have (an) extra dimension at the end
@@ -30,10 +32,13 @@ class Tiling(varShape: LongArray, chunkShape: LongArray) {
3032
for (i in 0 until rank) {
3133
this.indexShape[i] = max(varShape[i], chunk[i])
3234
}
35+
3336
this.tileShape = LongArray(rank)
3437
for (i in 0 until rank) {
3538
tileShape[i] = (this.indexShape[i] + chunk[i] - 1) / chunk[i]
3639
}
40+
nelems = tileShape.computeSize().toInt()
41+
3742
tileStrider = LongArray(rank)
3843
var accumStride = 1L
3944
for (k in rank - 1 downTo 0) {
@@ -88,7 +93,7 @@ class Tiling(varShape: LongArray, chunkShape: LongArray) {
8893
tile[k] = rem / tileStrider[k]
8994
rem = rem - (tile[k] * tileStrider[k])
9095
}
91-
print("tile $order = ${tile.contentToString()}")
96+
// print("tile $order = ${tile.contentToString()}")
9297

9398
// convert to index
9499
return index(tile)

core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree1.kt

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ internal class BTree1(
4949

5050
// here both internal and leaf are the same structure
5151
// Btree nodes Level 1A1 - Version 1 B-trees
52-
inner class Node(val address: Long, val parent: Node?) : BTreeNodeIF {
52+
inner class Node(val address: Long, val parent: Node?) {
5353
val level: Int
5454
val nentries: Int
5555
private val leftAddress: Long
@@ -93,11 +93,11 @@ internal class BTree1(
9393
// but most nodes will point to less than that number of children""
9494
}
9595

96-
override fun isLeaf() = (level == 0)
96+
fun isLeaf() = (level == 0)
9797

98-
override fun nentries() = nentries
98+
fun nentries() = nentries
9999

100-
override fun dataChunkEntryAt(idx: Int) = dataChunkEntries[idx]
100+
fun dataChunkEntryAt(idx: Int) = dataChunkEntries[idx]
101101
}
102102

103103
/** @param key the byte offset into the local heap for the first object name in the subtree which that key describes. */
@@ -129,14 +129,10 @@ internal class BTree1(
129129
override fun show(tiling : Tiling) : String = "chunkSize=${key.chunkSize}, chunkStart=${key.offsets.contentToString()}" +
130130
", tile= ${tiling.tile(key.offsets).contentToString()} idx=$idx"
131131
}
132-
}
133132

134-
interface BTreeNodeIF {
135-
fun isLeaf(): Boolean
136-
fun nentries(): Int
137-
fun dataChunkEntryAt(idx: Int) : DataChunkIF // only if isLeaf
138133
}
139134

135+
140136
interface DataChunkIF {
141137
fun childAddress(): Long
142138
fun offsets(): LongArray

core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree1data.kt

Lines changed: 66 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -10,58 +10,65 @@ import kotlin.collections.mutableListOf
1010
/** a BTree1 that uses OpenFileExtended and tracks its own tiling. */
1111
internal class BTree1data(
1212
val raf: OpenFileExtended,
13-
val rootNodeAddress: Long,
13+
rootNodeAddress: Long,
1414
varShape: LongArray,
1515
chunkShape: LongArray,
1616
) {
1717
val tiling = Tiling(varShape, chunkShape)
1818
val ndimStorage = chunkShape.size
19+
val rootNode: BTreeNode
1920

20-
fun rootNode(): BTreeNode = BTreeNode(rootNodeAddress, null)
21+
init {
22+
rootNode = BTreeNode(rootNodeAddress, null)
23+
}
24+
25+
fun asSequence(): Sequence<Pair<Long, DataChunk>> = sequence {
26+
repeat( tiling.nelems) {
27+
//val startingIndex = tiling.orderToIndex(it.toLong())
28+
//val indexSpace = IndexSpace(startingIndex, tiling.chunk)
29+
yield(Pair(it.toLong(), findDataChunk(it) ?: missingDataChunk(it)))
30+
}
31+
}
32+
33+
internal fun findDataChunk(order: Int): DataChunk? {
34+
return rootNode.findDataChunk(order)
35+
}
2136

2237
// here both internal and leaf are the same structure
2338
// Btree nodes Level 1A1 - Version 1 B-trees
2439
inner class BTreeNode(val address: Long, val parent: BTreeNode?) {
25-
val level: Int
26-
val nentries: Int
27-
private val leftAddress: Long
28-
private val rightAddress: Long
40+
var level: Int = 0
41+
var nentries: Int = 0
2942

30-
val keys = mutableListOf<LongArray>()
31-
val values = mutableListOf<DataChunkIF>()
43+
val keyValues = mutableListOf<Pair<Int, DataChunk>>() // tile order to DataChunk
3244
val children = mutableListOf<BTreeNode>()
3345

46+
var lastOrder : Int = 0
47+
3448
init {
35-
val state = OpenFileState(raf.getFileOffset(address), false)
36-
val magic: String = raf.readString(state, 4)
37-
check(magic == "TREE") { "DataBTree doesnt start with TREE" }
38-
39-
val type: Int = raf.readByte(state).toInt()
40-
check(type == 1) { "DataBTree must be type 1" }
41-
42-
level = raf.readByte(state).toInt() // leaf nodes are level 0
43-
nentries = raf.readShort(state).toInt() // number of children to which this node points
44-
leftAddress = raf.readOffset(state)
45-
rightAddress = raf.readOffset(state)
46-
47-
if (nentries == 0) {
48-
val chunkSize = raf.readInt(state)
49-
val filterMask = raf.readInt(state)
50-
val inner = LongArray(ndimStorage) { j -> raf.readLong(state) }
51-
val key = DataChunkKey(chunkSize, filterMask, inner)
52-
val childPointer = raf.readAddress(state)
53-
keys.add(inner)
54-
values.add(DataChunkEntry1(this, key, childPointer))
55-
} else {
49+
if (address > 0) {
50+
val state = OpenFileState(raf.getFileOffset(address), false)
51+
val magic: String = raf.readString(state, 4)
52+
check(magic == "TREE") { "DataBTree doesnt start with TREE" }
53+
54+
val type: Int = raf.readByte(state).toInt()
55+
check(type == 1) { "DataBTree must be type 1" }
56+
57+
level = raf.readByte(state).toInt() // leaf nodes are level 0
58+
nentries = raf.readShort(state).toInt() // number of children to which this node points
59+
val leftAddress = raf.readOffset(state)
60+
val rightAddress = raf.readOffset(state)
61+
5662
repeat(nentries) {
5763
val chunkSize = raf.readInt(state)
5864
val filterMask = raf.readInt(state)
5965
val inner = LongArray(ndimStorage) { j -> raf.readLong(state) }
60-
val key = DataChunkKey(chunkSize, filterMask, inner)
66+
val order = tiling.order(inner).toInt()
67+
val key = DataChunkKey(order, chunkSize, filterMask)
6168
val childPointer = raf.readAddress(state) // 4 or 8 bytes, then add fileOffset
6269
if (level == 0) {
63-
keys.add(inner)
64-
values.add(DataChunkEntry1( this, key, childPointer))
70+
keyValues.add(Pair(order, DataChunk(key, childPointer)))
71+
lastOrder = order
6572
} else {
6673
children.add(BTreeNode(childPointer, this))
6774
}
@@ -72,44 +79,52 @@ internal class BTree1data(
7279
// but most nodes will point to less than that number of children""
7380
}
7481

75-
// return only the leaf nodes, in any order
76-
fun asSequence(): Sequence<Pair<Long, DataChunkIF>> = sequence {
82+
// This sequence does not include missing chunks; use asSequence() on the BTree1data class to get those too.
83+
// return only the leaf nodes, in depth-first order
84+
fun asSequence(): Sequence<Pair<Int, DataChunkIF>> = sequence {
7785
// Handle child nodes recursively (in-order traversal)
7886
if (children.isNotEmpty()) {
7987
children.forEachIndexed { index, childNode ->
8088
yieldAll(childNode.asSequence()) // Yield all elements from the child
8189
}
8290
} else { // If it's a leaf node (no children)
83-
keys.forEachIndexed { index, key ->
84-
yield(tiling.order(key) to values[index]) // Yield all key-value pairs
85-
}
91+
keyValues.forEach { yield(it) }
8692
}
8793
}
88-
}
8994

90-
data class DataChunkKey(val chunkSize: Int, val filterMask : Int, val offsets: LongArray) {
91-
override fun equals(other: Any?): Boolean {
92-
if (this === other) return true
93-
if (other !is DataChunkKey) return false
94-
if (!offsets.contentEquals(other.offsets)) return false
95-
return true
95+
fun findDataChunk(wantOrder: Int): DataChunk? {
96+
if (children.isNotEmpty()) { // search tree; assumes that chunks are ordered
97+
children.forEach { childNode ->
98+
if (wantOrder <= childNode.lastOrder)
99+
return childNode.findDataChunk(wantOrder)
100+
}
101+
} else { // If it's a leaf node (no children)
102+
val kv = keyValues.find { it.first == wantOrder }
103+
return kv?.second
104+
}
105+
return null
96106
}
97107

98-
override fun hashCode(): Int {
99-
return offsets.contentHashCode()
100-
}
101108
}
102109

110+
data class DataChunkKey(val order: Int, val chunkSize: Int, val filterMask : Int)
111+
103112
// childAddress = data chunk (level 1) else a child node
104-
data class DataChunkEntry1(val parent : BTreeNode, val key : DataChunkKey, val childAddress : Long) : DataChunkIF {
113+
inner class DataChunk(val key : DataChunkKey, val childAddress : Long) : DataChunkIF {
105114
override fun childAddress() = childAddress
106-
override fun offsets() = key.offsets
115+
override fun offsets() = tiling.orderToIndex(key.order.toLong())
107116
override fun isMissing() = (childAddress <= 0L) // may be 0 or -1
108117
override fun chunkSize() = key.chunkSize
109118
override fun filterMask() = key.filterMask
110119

111-
override fun show(tiling : Tiling) : String = "chunkSize=${key.chunkSize}, chunkStart=${key.offsets.contentToString()}" +
112-
", tile= ${tiling.tile(key.offsets).contentToString()}"
120+
override fun show(tiling : Tiling) : String = "chunkSize=${key.chunkSize}, chunkStart=${offsets().contentToString()}" +
121+
", tile= ${tiling.tile(offsets() ).contentToString()}"
122+
123+
fun show() = show(tiling)
124+
}
125+
126+
fun missingDataChunk(order: Int) : DataChunk {
127+
return DataChunk(DataChunkKey(order, 0, 0), -1L)
113128
}
114129
}
115130

core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkConcurrent.kt

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@ package com.sunya.netchdf.hdf5
44

55
import com.sunya.cdm.api.ArraySection
66
import com.sunya.cdm.api.Datatype
7+
import com.sunya.cdm.api.Section
8+
import com.sunya.cdm.api.SectionPartial
79
import com.sunya.cdm.api.Variable
810
import com.sunya.cdm.api.computeSize
911
import com.sunya.cdm.api.toIntArray
1012
import com.sunya.cdm.api.toLongArray
1113
import com.sunya.cdm.iosp.OpenFileState
14+
import com.sunya.cdm.layout.Chunker
1215
import com.sunya.cdm.layout.IndexSpace
1316
import com.sunya.cdm.layout.Tiling
1417
import com.sunya.cdm.layout.transferMissingNelems
@@ -23,18 +26,25 @@ import kotlinx.coroutines.launch
2326
import kotlinx.coroutines.runBlocking
2427
import kotlinx.coroutines.yield
2528

26-
class H5chunkConcurrent(h5file: Hdf5File, val v2: Variable<*>) {
29+
class H5chunkConcurrent(h5file: Hdf5File, val v2: Variable<*>, wantSection: SectionPartial?) {
2730
val h5 = h5file.header
2831
val rafext: OpenFileExtended = h5.openFileExtended()
32+
internal val bTree: BTree1data
2933

3034
val varShape = v2.shape
3135
val chunkShape: IntArray
3236
val tiling: Tiling
3337
val nchunks: Long
34-
internal val rootNode: BTree1data.BTreeNode
35-
val rootAddress: Long
38+
// internal val rootNode: BTree1data.BTreeNode
39+
// val rootAddress: Long
40+
val wantSpace: IndexSpace
41+
val allData : Boolean
3642

3743
init {
44+
val useSection = SectionPartial.fill(wantSection, v2.shape)
45+
wantSpace = IndexSpace(useSection)
46+
allData = (wantSection == null) || (useSection == Section(varShape))
47+
3848
require(v2.spObject is DataContainerVariable)
3949
val vinfo = v2.spObject
4050
require(vinfo.mdl is DataLayoutBTreeVer1)
@@ -44,17 +54,16 @@ class H5chunkConcurrent(h5file: Hdf5File, val v2: Variable<*>) {
4454
nchunks = tiling.tileShape.computeSize()
4555

4656
// it's not obvious you actually need a separate raf
47-
val bTreeExt = BTree1data(rafext, mdl.btreeAddress, varShape, chunkShape.toLongArray())
48-
rootNode = bTreeExt.rootNode()
49-
rootAddress = mdl.btreeAddress
57+
bTree = BTree1data(rafext, mdl.btreeAddress, varShape, chunkShape.toLongArray())
58+
// rootAddress = mdl.btreeAddress
5059
}
5160

52-
// TODO section: SectionPartial
5361
fun readChunks(nthreads: Int, lamda: (ArraySection<*>) -> Unit, done: () -> Unit) {
62+
5463
runBlocking {
5564
val jobs = mutableListOf<Job>()
5665
val workers = mutableListOf<Worker>()
57-
val chunkProducer = produceChunks(rootNode.asSequence())
66+
val chunkProducer = produceChunks(bTree.asSequence())
5867
repeat(nthreads) {
5968
val worker = Worker()
6069
jobs.add( launchJob(worker, chunkProducer, lamda))
@@ -95,9 +104,6 @@ class H5chunkConcurrent(h5file: Hdf5File, val v2: Variable<*>) {
95104
private inner class Worker() {
96105
val rafext: OpenFileExtended = h5.openFileExtended() // here we need a separate raf
97106

98-
// a thread-safe accessor of the btree
99-
// private val btree1 = BTree1data(rafext, rootAddress, varShape, chunkShape.toLongArray())
100-
101107
val vinfo: DataContainerVariable = v2.spObject as DataContainerVariable
102108
val h5type: H5TypeInfo
103109
val elemSize: Int
@@ -116,31 +122,41 @@ class H5chunkConcurrent(h5file: Hdf5File, val v2: Variable<*>) {
116122
}
117123

118124
fun work(dataChunk : DataChunkIF) : ArraySection<*>? {
119-
// TODO check if intersect with wantSection before reading in data
120-
121125
val dataSpace = IndexSpace(v2.rank, dataChunk.offsets(), vinfo.storageDims)
126+
if (!allData && !wantSpace.intersects(dataSpace)) {
127+
return null
128+
}
129+
val useEntireChunk = wantSpace.contains(dataSpace)
130+
val intersectSpace = if (useEntireChunk) dataSpace else wantSpace.intersect(dataSpace)
131+
122132
val ba = if (dataChunk.isMissing()) {
123133
if (debugChunking) println(" missing ${dataChunk.show(tiling)}")
124-
val sizeBytes = dataSpace.totalElements * elemSize
134+
val sizeBytes = intersectSpace.totalElements * elemSize
125135
val bbmissing = ByteArray(sizeBytes.toInt())
126-
transferMissingNelems(vinfo.fillValue, dataSpace.totalElements.toInt(), bbmissing, 0)
127-
if (debugChunking) println(" missing transfer ${dataSpace.totalElements} fillValue=${vinfo.fillValue}")
136+
transferMissingNelems(vinfo.fillValue, intersectSpace.totalElements.toInt(), bbmissing, 0)
137+
if (debugChunking) println(" missing transfer ${intersectSpace.totalElements} fillValue=${vinfo.fillValue}")
128138
bbmissing
129139
} else {
130140
if (debugChunking) println(" chunkIterator=${dataChunk.show(tiling)}")
131141
state.pos = dataChunk.childAddress()
132142
val rawdata = rafext.readByteArray(state, dataChunk.chunkSize())
133-
if (dataChunk.filterMask() == null) rawdata else filters.apply(rawdata, dataChunk.filterMask()!!)
143+
val filteredData = if (dataChunk.filterMask() == null) rawdata else filters.apply(rawdata, dataChunk.filterMask()!!)
144+
if (useEntireChunk) {
145+
filteredData
146+
} else {
147+
val chunker = Chunker(dataSpace, wantSpace) // each DataChunkEntry has its own Chunker iteration
148+
chunker.copyOut(filteredData, 0, elemSize, intersectSpace.totalElements.toInt())
149+
}
134150
}
135151

136152
val array = if (h5type.datatype5 == Datatype5.Vlen) {
137153
// internal fun <T> H5builder.processVlenIntoArray(h5type: H5TypeInfo, shape: IntArray, ba: ByteArray, nelems: Int, elemSize : Int): ArrayTyped<T> {
138-
h5.processVlenIntoArray(h5type, dataSpace.shape.toIntArray(), ba, dataSpace.totalElements.toInt(), elemSize)
154+
h5.processVlenIntoArray(h5type, intersectSpace.shape.toIntArray(), ba, intersectSpace.totalElements.toInt(), elemSize)
139155
} else {
140-
h5.processDataIntoArray(ba, h5type.isBE, datatype, dataSpace.shape.toIntArray(), h5type, elemSize)
156+
h5.processDataIntoArray(ba, h5type.isBE, datatype, intersectSpace.shape.toIntArray(), h5type, elemSize)
141157
}
142158

143-
return ArraySection(array, dataSpace.section(v2.shape))
159+
return ArraySection(array, intersectSpace.section(v2.shape))
144160
}
145161
}
146162
val debugChunking = false

0 commit comments

Comments
 (0)