
Possible reason why 'Propagate' - 'write data locally' in LocalWriteSuite test may fail #164

@simonzhaoms

Description


private def writePartitionLocal(
    index: Int,
    part: Iterator[(BytesWritable, NullWritable)],
    localPath: String,
    mode: SaveMode): Iterator[Int] = {
  val dir = new File(localPath)
  if (dir.exists()) {
    if (mode == SaveMode.ErrorIfExists) {
      throw new IllegalStateException(
        s"LocalPath $localPath already exists. SaveMode: ErrorIfExists.")
    }
    if (mode == SaveMode.Ignore) {
      return Iterator.empty
    }
  }
  // Make the directory if it does not exist
  dir.mkdirs()
  // The path to the partition file.
  val filePath = localPath + s"/part-" + String.format("%05d", java.lang.Integer.valueOf(index))
  val fos = new DataOutputStream(new FileOutputStream(filePath))
  var count = 0
  try {
    val tfw = new TFRecordWriter(fos)
    for ((bw, _) <- part) {
      tfw.write(bw.getBytes)
      count += 1
    }
  } finally {
    fos.close()
  }
  Iterator(count)
}

// Working around the closure variable captures.
private def writePartitionLocalFun(
    localPath: String,
    mode: SaveMode): (Int, Iterator[(BytesWritable, NullWritable)]) => Iterator[Int] = {
  def mapFun(index: Int, part: Iterator[(BytesWritable, NullWritable)]) = {
    writePartitionLocal(index, part, localPath, mode)
  }
  mapFun
}
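
For context, the writer presumably applies this function through RDD.mapPartitionsWithIndex; that would be the map at line 211 referred to below. A hypothetical reconstruction of the call site (the actual line 211 is not quoted here, and rdd is my placeholder name for the RDD of serialized records):

// Hypothetical call site, not quoted from the source. Each task runs
// writePartitionLocal on whichever worker it is scheduled on; under local
// execution, all tasks share one file system and therefore one directory.
val recordCounts = rdd
  .mapPartitionsWithIndex(writePartitionLocalFun(localPath, mode))
  .collect()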

The check if (dir.exists()) at line 179 above can make the writes for later partitions fail whenever there are two or more partitions, as in the test below. Because the partitions are written through the map at line 211 above, the first task to run creates the directory via dir.mkdirs(); every task that runs after it then finds dir.exists() to be true and, under the default SaveMode.ErrorIfExists, throws.

"Propagate" should {
"write data locally" in {
// Create a dataframe with 2 partitions
val rdd = spark.sparkContext.parallelize(testRows, numSlices = 2)
val df = spark.createDataFrame(rdd, schema)
// Write the partitions onto the local hard drive. Since it is going to be the
// local file system, the partitions will be written in the same directory of the
// same machine.
// In a distributed setting though, two different machines would each hold a single
// partition.
val localPath = Files.createTempDirectory("spark-connector-propagate").toAbsolutePath.toString
val savePath = localPath + "/testResult"
df.write.format("tfrecords")
.option("recordType", "Example")
.option("writeLocality", "local")
.save(savePath)
// Read again this directory, this time using the Hadoop file readers, it should
// return the same data.
// This only works in this test and does not hold in general, because the partitions
// will be written on the workers. Everything runs locally for tests.
val df2 = spark.read.format("tfrecords").option("recordType", "Example")
.load(savePath).sort("id").select("id", "IntegerTypeLabel", "LongTypeLabel",
"FloatTypeLabel", "DoubleTypeLabel", "VectorLabel", "name") // Correct column order.
assert(df2.collect().toSeq === testRows.toSeq)
}
}

The exception thrown should be similar to the one in #141 (comment).
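
One possible fix, sketched below under the assumption that the per-task check must stay on the workers (each machine has its own local file system): test the partition's own output file instead of the shared directory, so sibling tasks that land on the same machine cannot trip ErrorIfExists on a directory a peer just created. This is only a sketch, not the project's actual patch, and it assumes the same imports as the original method:

// Sketch of a possible fix (not the actual patch): check the per-partition
// file instead of the shared directory.
private def writePartitionLocal(
    index: Int,
    part: Iterator[(BytesWritable, NullWritable)],
    localPath: String,
    mode: SaveMode): Iterator[Int] = {
  val dir = new File(localPath)
  // Each task owns exactly one part file, so this existence check cannot race
  // with sibling tasks the way the directory-level check does.
  val partFile = new File(dir, "part-" + String.format("%05d", java.lang.Integer.valueOf(index)))
  if (partFile.exists()) {
    if (mode == SaveMode.ErrorIfExists) {
      throw new IllegalStateException(
        s"Partition file $partFile already exists. SaveMode: ErrorIfExists.")
    }
    if (mode == SaveMode.Ignore) {
      return Iterator.empty
    }
  }
  dir.mkdirs() // a no-op when a sibling task has already created the directory
  val fos = new DataOutputStream(new FileOutputStream(partFile))
  var count = 0
  try {
    val tfw = new TFRecordWriter(fos)
    for ((bw, _) <- part) {
      tfw.write(bw.getBytes)
      count += 1
    }
  } finally {
    fos.close()
  }
  Iterator(count)
}

Note the trade-off: ErrorIfExists would then mean "this partition's file exists" rather than "the output directory exists". An alternative would be to perform the directory-level check once per machine, or once on the driver before the tasks start, rather than in every task.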
