From 9a971bb8077ae7f16ec02172021cf593bfcb68b8 Mon Sep 17 00:00:00 2001 From: Reema Date: Wed, 27 Sep 2017 16:27:09 +0530 Subject: [PATCH 1/3] Cleaning up task files during abort in DirectParquetOutputCommitter --- .../com/indix/utils/core/StringUtils.scala | 11 +++++++ .../DirectParquetOutputCommitter.scala | 21 +++++++++--- .../com/indix/utils/spark/SparkJobSpec.scala | 16 ++++++++++ .../utils/spark/pail/PailDataSourceSpec.scala | 12 ++----- .../DirectParquetOutputCommitterSpec.scala | 32 +++++++++++++++++++ .../parquet/ParquetAvroDataSourceSpec.scala | 22 +++---------- 6 files changed, 82 insertions(+), 32 deletions(-) create mode 100644 util-core/src/main/scala/com/indix/utils/core/StringUtils.scala create mode 100644 util-spark/src/test/scala/com/indix/utils/spark/SparkJobSpec.scala create mode 100644 util-spark/src/test/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitterSpec.scala diff --git a/util-core/src/main/scala/com/indix/utils/core/StringUtils.scala b/util-core/src/main/scala/com/indix/utils/core/StringUtils.scala new file mode 100644 index 0000000..377e523 --- /dev/null +++ b/util-core/src/main/scala/com/indix/utils/core/StringUtils.scala @@ -0,0 +1,11 @@ +package com.indix.utils.core + +object StringUtils { + + def removeSpaces(str: String) = { + str + .replaceAll("\u00A0", " ") + .replaceAll("\\s+", " ") + } + +} diff --git a/util-spark/src/main/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitter.scala b/util-spark/src/main/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitter.scala index 981ddbb..aab2cf5 100644 --- a/util-spark/src/main/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitter.scala +++ b/util-spark/src/main/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitter.scala @@ -18,12 +18,14 @@ package com.indix.utils.spark.parquet -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} import org.apache.parquet.Log import org.apache.parquet.hadoop.util.ContextUtil import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter, ParquetOutputFormat} +import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat /** * An output committer for writing Parquet files. In stead of writing to the `_temporary` folder @@ -37,18 +39,27 @@ import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetO * * *NOTE* * - * NEVER use DirectParquetOutputCommitter when appending data, because currently there's - * no safe way undo a failed appending job (that's why both `abortTask()` and `abortJob()` are - * left empty). + * NEVER use DirectParquetOutputCommitter when appending data, because currently there's + * no safe way undo a failed appending job (that's why both `abortTask()` and `abortJob()` are + * left empty). 
*/ class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext) extends ParquetOutputCommitter(outputPath, context) { val LOG = Log.getLog(classOf[ParquetOutputCommitter]) + var jobId: Option[String] = None override def getWorkPath: Path = outputPath - override def abortTask(taskContext: TaskAttemptContext): Unit = {} + override def abortTask(taskContext: TaskAttemptContext): Unit = { + val fs = outputPath.getFileSystem(context.getConfiguration) + val split = taskContext.getTaskAttemptID.getTaskID.getId + + val lists = fs.listStatus(outputPath, new PathFilter { + override def accept(path: Path): Boolean = path.getName.contains(f"-$split%05d") + }) + lists.foreach(l => fs.delete(l.getPath, false)) + } override def commitTask(taskContext: TaskAttemptContext): Unit = {} diff --git a/util-spark/src/test/scala/com/indix/utils/spark/SparkJobSpec.scala b/util-spark/src/test/scala/com/indix/utils/spark/SparkJobSpec.scala new file mode 100644 index 0000000..4a588fc --- /dev/null +++ b/util-spark/src/test/scala/com/indix/utils/spark/SparkJobSpec.scala @@ -0,0 +1,16 @@ +package com.indix.utils.spark + +import org.apache.spark.sql.SparkSession +import org.scalatest.{BeforeAndAfterAll, FlatSpec} + +abstract class SparkJobSpec extends FlatSpec with BeforeAndAfterAll { + val appName: String + val taskRetries: Int = 1 + @transient var spark: SparkSession = _ + lazy val sqlContext = spark.sqlContext + + override protected def beforeAll(): Unit = { + super.beforeAll() + spark = SparkSession.builder().master(s"local[2, $taskRetries]").appName(appName).getOrCreate() + } +} diff --git a/util-spark/src/test/scala/com/indix/utils/spark/pail/PailDataSourceSpec.scala b/util-spark/src/test/scala/com/indix/utils/spark/pail/PailDataSourceSpec.scala index 21f7a07..72bee91 100644 --- a/util-spark/src/test/scala/com/indix/utils/spark/pail/PailDataSourceSpec.scala +++ b/util-spark/src/test/scala/com/indix/utils/spark/pail/PailDataSourceSpec.scala @@ -5,9 +5,8 @@ import java.util import com.backtype.hadoop.pail.{PailFormatFactory, PailSpec, PailStructure} import com.backtype.support.{Utils => PailUtils} import com.google.common.io.Files +import com.indix.utils.spark.SparkJobSpec import org.apache.commons.io.FileUtils -import org.apache.spark.sql.SparkSession -import org.scalatest.{BeforeAndAfterAll, FlatSpec} import org.scalatest.Matchers._ import scala.collection.JavaConverters._ @@ -27,13 +26,8 @@ class UserPailStructure extends PailStructure[User] { override def deserialize(serialized: Array[Byte]): User = PailUtils.deserialize(serialized).asInstanceOf[User] } -class PailDataSourceSpec extends FlatSpec with BeforeAndAfterAll with PailDataSource { - private var spark: SparkSession = _ - - override protected def beforeAll(): Unit = { - super.beforeAll() - spark = SparkSession.builder().master("local[2]").appName("PailDataSource").getOrCreate() - } +class PailDataSourceSpec extends SparkJobSpec with PailDataSource { + override val appName: String = "PailDataSource" val userPailSpec = new PailSpec(PailFormatFactory.SEQUENCE_FILE, new UserPailStructure) diff --git a/util-spark/src/test/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitterSpec.scala b/util-spark/src/test/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitterSpec.scala new file mode 100644 index 0000000..1bc5edc --- /dev/null +++ b/util-spark/src/test/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitterSpec.scala @@ -0,0 +1,32 @@ +package com.indix.utils.spark.parquet + +import 
com.indix.utils.spark.SparkJobSpec +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.functions._ + +class DirectParquetOutputCommitterSpec extends SparkJobSpec { + override val appName = "DirectParquetOutputCommitterSpec" + override val taskRetries = 2 + + it should "not fail with file already exists on subsequent retries" in { + spark.conf + .set("spark.sql.parquet.output.committer.class", "com.indix.utils.spark.parquet.DirectParquetOutputCommitter") + + val exception = intercept[org.apache.spark.SparkException] { + sqlContext + .range(10) + .toDF() + .withColumn("dummy", udf((x: Int) => x / (x - 1)).apply(col("id"))) + .write + .parquet("/tmp/parquet/dummy") + } + + val path = new Path("/tmp/parquet/dummy") + val fs = path.getFileSystem(new Configuration()) + fs.delete(path) + + } + + +} diff --git a/util-spark/src/test/scala/com/indix/utils/spark/parquet/ParquetAvroDataSourceSpec.scala b/util-spark/src/test/scala/com/indix/utils/spark/parquet/ParquetAvroDataSourceSpec.scala index e79b2a7..3ce8edc 100644 --- a/util-spark/src/test/scala/com/indix/utils/spark/parquet/ParquetAvroDataSourceSpec.scala +++ b/util-spark/src/test/scala/com/indix/utils/spark/parquet/ParquetAvroDataSourceSpec.scala @@ -3,30 +3,16 @@ package com.indix.utils.spark.parquet import java.io.File import com.google.common.io.Files +import com.indix.utils.spark.SparkJobSpec import com.indix.utils.spark.parquet.avro.ParquetAvroDataSource import org.apache.commons.io.FileUtils -import org.apache.spark.sql.SparkSession -import org.scalatest.{BeforeAndAfterAll, FlatSpec} -import org.scalatest.Matchers.{be, convertToAnyShouldWrapper} import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.scalatest.Matchers.{be, convertToAnyShouldWrapper} case class SampleAvroRecord(a: Int, b: String, c: Seq[String], d: Boolean, e: Double, f: collection.Map[String,String], g: Seq[Byte]) -class ParquetAvroDataSourceSpec extends FlatSpec with BeforeAndAfterAll with ParquetAvroDataSource { - private var spark: SparkSession = _ - - override protected def beforeAll(): Unit = { - super.beforeAll() - spark = SparkSession.builder().master("local[2]").appName("ParquetAvroDataSource").getOrCreate() - } - - override protected def afterAll(): Unit = { - try { - spark.sparkContext.stop() - } finally { - super.afterAll() - } - } +class ParquetAvroDataSourceSpec extends SparkJobSpec with ParquetAvroDataSource { + override val appName = "ParquetAvroDataSource" "AvroBasedParquetDataSource" should "read/write avro records as ParquetData" in { From e713004345a4f74c47c83354146ee3a4bab445e0 Mon Sep 17 00:00:00 2001 From: Reema Date: Thu, 28 Sep 2017 12:08:54 +0530 Subject: [PATCH 2/3] Writing appropriate test with retries for DirectParquetOutputCommitter --- .../com/indix/utils/core/StringUtils.scala | 11 ---- .../DirectParquetOutputCommitter.scala | 14 +++-- .../com/indix/utils/spark/SparkJobSpec.scala | 5 ++ .../DirectParquetOutputCommitterSpec.scala | 59 +++++++++++++------ 4 files changed, 54 insertions(+), 35 deletions(-) delete mode 100644 util-core/src/main/scala/com/indix/utils/core/StringUtils.scala diff --git a/util-core/src/main/scala/com/indix/utils/core/StringUtils.scala b/util-core/src/main/scala/com/indix/utils/core/StringUtils.scala deleted file mode 100644 index 377e523..0000000 --- a/util-core/src/main/scala/com/indix/utils/core/StringUtils.scala +++ /dev/null @@ -1,11 +0,0 @@ -package com.indix.utils.core - -object StringUtils { - - def removeSpaces(str: String) = { - 
str - .replaceAll("\u00A0", " ") - .replaceAll("\\s+", " ") - } - -} diff --git a/util-spark/src/main/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitter.scala b/util-spark/src/main/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitter.scala index aab2cf5..8379464 100644 --- a/util-spark/src/main/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitter.scala +++ b/util-spark/src/main/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitter.scala @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} import org.apache.parquet.Log +import org.apache.parquet.hadoop.codec.CodecConfig import org.apache.parquet.hadoop.util.ContextUtil import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter, ParquetOutputFormat} import org.apache.spark.internal.io.FileCommitProtocol @@ -47,18 +48,21 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext) extends ParquetOutputCommitter(outputPath, context) { val LOG = Log.getLog(classOf[ParquetOutputCommitter]) - var jobId: Option[String] = None override def getWorkPath: Path = outputPath override def abortTask(taskContext: TaskAttemptContext): Unit = { val fs = outputPath.getFileSystem(context.getConfiguration) val split = taskContext.getTaskAttemptID.getTaskID.getId + try { + val lists = fs.listStatus(outputPath, new PathFilter { + override def accept(path: Path): Boolean = path.getName.contains(f"-$split%05d") + }) + lists.foreach(l => fs.delete(l.getPath, false)) + } catch { + case e: Throwable => LOG.warn(s"Cannot clean $outputPath. 
File does not exist") + } - val lists = fs.listStatus(outputPath, new PathFilter { - override def accept(path: Path): Boolean = path.getName.contains(f"-$split%05d") - }) - lists.foreach(l => fs.delete(l.getPath, false)) } override def commitTask(taskContext: TaskAttemptContext): Unit = {} diff --git a/util-spark/src/test/scala/com/indix/utils/spark/SparkJobSpec.scala b/util-spark/src/test/scala/com/indix/utils/spark/SparkJobSpec.scala index 4a588fc..62678cb 100644 --- a/util-spark/src/test/scala/com/indix/utils/spark/SparkJobSpec.scala +++ b/util-spark/src/test/scala/com/indix/utils/spark/SparkJobSpec.scala @@ -6,11 +6,16 @@ import org.scalatest.{BeforeAndAfterAll, FlatSpec} abstract class SparkJobSpec extends FlatSpec with BeforeAndAfterAll { val appName: String val taskRetries: Int = 1 + val sparkConf: Map[String, String] = Map() + @transient var spark: SparkSession = _ lazy val sqlContext = spark.sqlContext override protected def beforeAll(): Unit = { super.beforeAll() spark = SparkSession.builder().master(s"local[2, $taskRetries]").appName(appName).getOrCreate() + sparkConf.foreach { + case (k, v) => spark.conf.set(k, v) + } } } diff --git a/util-spark/src/test/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitterSpec.scala b/util-spark/src/test/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitterSpec.scala index 1bc5edc..294b6d8 100644 --- a/util-spark/src/test/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitterSpec.scala +++ b/util-spark/src/test/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitterSpec.scala @@ -1,32 +1,53 @@ package com.indix.utils.spark.parquet +import java.io.File + import com.indix.utils.spark.SparkJobSpec -import org.apache.hadoop.conf.Configuration +import org.apache.commons.io.FileUtils import org.apache.hadoop.fs.Path -import org.apache.spark.sql.functions._ +import org.apache.hadoop.mapreduce.TaskAttemptContext +import org.apache.spark.SparkException +import org.apache.spark.sql.SaveMode +import org.scalatest.Matchers + +class TestDirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext) + extends DirectParquetOutputCommitter(outputPath, context) { + + override def commitTask(taskContext: TaskAttemptContext): Unit = { + if (taskContext.getTaskAttemptID.getId == 0) + throw new SparkException("Failing first attempt of task") + else + super.commitTask(taskContext) + } + +} -class DirectParquetOutputCommitterSpec extends SparkJobSpec { +class DirectParquetOutputCommitterSpec extends SparkJobSpec with Matchers { override val appName = "DirectParquetOutputCommitterSpec" override val taskRetries = 2 + override val sparkConf = Map(("spark.sql.parquet.output.committer.class", "com.indix.utils.spark.parquet.TestDirectParquetOutputCommitter")) + var file: File = _ - it should "not fail with file already exists on subsequent retries" in { - spark.conf - .set("spark.sql.parquet.output.committer.class", "com.indix.utils.spark.parquet.DirectParquetOutputCommitter") - - val exception = intercept[org.apache.spark.SparkException] { - sqlContext - .range(10) - .toDF() - .withColumn("dummy", udf((x: Int) => x / (x - 1)).apply(col("id"))) - .write - .parquet("/tmp/parquet/dummy") - } - - val path = new Path("/tmp/parquet/dummy") - val fs = path.getFileSystem(new Configuration()) - fs.delete(path) + override def beforeAll() = { + super.beforeAll() + file = File.createTempFile("parquet", "") + } + override def afterAll() = { + super.afterAll() + FileUtils.deleteDirectory(file) } + it should "not fail with file 
already exists on subsequent retries" in { + lazy val result = sqlContext + .range(10) + .toDF() + .write + .mode(SaveMode.Overwrite) + .parquet(file.toString) + + //Subsequent retry of task will not throw an exception + noException should be thrownBy result + } } From f320d733c16b968f2d8d063d011e05ceb54d6a13 Mon Sep 17 00:00:00 2001 From: Reema Date: Tue, 3 Oct 2017 11:39:43 +0530 Subject: [PATCH 3/3] Setting task retries as 2 while instantiating spark session --- .../DirectParquetOutputCommitter.scala | 18 +++++++------ .../com/indix/utils/spark/SparkJobSpec.scala | 18 +++++++++---- .../DirectParquetOutputCommitterSpec.scala | 25 +++++++++++-------- .../parquet/ParquetAvroDataSourceSpec.scala | 4 +-- 4 files changed, 40 insertions(+), 25 deletions(-) diff --git a/util-spark/src/main/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitter.scala b/util-spark/src/main/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitter.scala index 8379464..9634508 100644 --- a/util-spark/src/main/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitter.scala +++ b/util-spark/src/main/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitter.scala @@ -29,7 +29,7 @@ import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat /** - * An output committer for writing Parquet files. In stead of writing to the `_temporary` folder + * An output committer for writing Parquet files. Instead of writing to the `_temporary` folder * like what parquet.hadoop.ParquetOutputCommitter does, this output committer writes data directly to the * destination folder. This can be useful for data stored in S3, where directory operations are * relatively expensive. @@ -41,8 +41,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat * *NOTE* * * NEVER use DirectParquetOutputCommitter when appending data, because currently there's - * no safe way undo a failed appending job (that's why both `abortTask()` and `abortJob()` are - * left empty). + * no safe way undo a failed appending job. */ class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext) @@ -54,11 +53,16 @@ class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext override def abortTask(taskContext: TaskAttemptContext): Unit = { val fs = outputPath.getFileSystem(context.getConfiguration) val split = taskContext.getTaskAttemptID.getTaskID.getId + + val lists = fs.listStatus(outputPath, new PathFilter { + override def accept(path: Path): Boolean = path.getName.contains(f"-$split%05d-") + }) try { - val lists = fs.listStatus(outputPath, new PathFilter { - override def accept(path: Path): Boolean = path.getName.contains(f"-$split%05d") - }) - lists.foreach(l => fs.delete(l.getPath, false)) + lists.foreach { + l => + LOG.error(s"Abort Task - Deleting ${l.getPath.toUri}") + fs.delete(l.getPath, false) + } } catch { case e: Throwable => LOG.warn(s"Cannot clean $outputPath. 
File does not exist") } diff --git a/util-spark/src/test/scala/com/indix/utils/spark/SparkJobSpec.scala b/util-spark/src/test/scala/com/indix/utils/spark/SparkJobSpec.scala index 62678cb..79a9ef3 100644 --- a/util-spark/src/test/scala/com/indix/utils/spark/SparkJobSpec.scala +++ b/util-spark/src/test/scala/com/indix/utils/spark/SparkJobSpec.scala @@ -1,21 +1,29 @@ package com.indix.utils.spark import org.apache.spark.sql.SparkSession -import org.scalatest.{BeforeAndAfterAll, FlatSpec} +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec} -abstract class SparkJobSpec extends FlatSpec with BeforeAndAfterAll { +abstract class SparkJobSpec extends FlatSpec with BeforeAndAfterEach with BeforeAndAfterAll { val appName: String - val taskRetries: Int = 1 + val taskRetries: Int = 2 val sparkConf: Map[String, String] = Map() @transient var spark: SparkSession = _ lazy val sqlContext = spark.sqlContext override protected def beforeAll(): Unit = { - super.beforeAll() - spark = SparkSession.builder().master(s"local[2, $taskRetries]").appName(appName).getOrCreate() + spark = SparkSession.builder() + .master(s"local[2, $taskRetries]").appName(appName) + .getOrCreate() + sparkConf.foreach { case (k, v) => spark.conf.set(k, v) } } + + override protected def afterAll() = { + SparkSession.clearDefaultSession() + SparkSession.clearActiveSession() + } + } diff --git a/util-spark/src/test/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitterSpec.scala b/util-spark/src/test/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitterSpec.scala index 294b6d8..5691731 100644 --- a/util-spark/src/test/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitterSpec.scala +++ b/util-spark/src/test/scala/com/indix/utils/spark/parquet/DirectParquetOutputCommitterSpec.scala @@ -1,6 +1,7 @@ package com.indix.utils.spark.parquet import java.io.File +import java.nio.file.{Files, Paths} import com.indix.utils.spark.SparkJobSpec import org.apache.commons.io.FileUtils @@ -24,7 +25,6 @@ class TestDirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptCon class DirectParquetOutputCommitterSpec extends SparkJobSpec with Matchers { override val appName = "DirectParquetOutputCommitterSpec" - override val taskRetries = 2 override val sparkConf = Map(("spark.sql.parquet.output.committer.class", "com.indix.utils.spark.parquet.TestDirectParquetOutputCommitter")) var file: File = _ @@ -39,15 +39,20 @@ class DirectParquetOutputCommitterSpec extends SparkJobSpec with Matchers { } it should "not fail with file already exists on subsequent retries" in { - lazy val result = sqlContext - .range(10) - .toDF() - .write - .mode(SaveMode.Overwrite) - .parquet(file.toString) - - //Subsequent retry of task will not throw an exception - noException should be thrownBy result + try { + sqlContext + .range(10) + .toDF() + .write + .mode(SaveMode.Overwrite) + .parquet(file.toString) + } catch { + case e: Exception => println(e) + } finally { + val successPath = Paths.get(file.toString + "/_SUCCESS") + Files.exists(successPath) should be(true) + } + } } diff --git a/util-spark/src/test/scala/com/indix/utils/spark/parquet/ParquetAvroDataSourceSpec.scala b/util-spark/src/test/scala/com/indix/utils/spark/parquet/ParquetAvroDataSourceSpec.scala index 3ce8edc..17b694e 100644 --- a/util-spark/src/test/scala/com/indix/utils/spark/parquet/ParquetAvroDataSourceSpec.scala +++ b/util-spark/src/test/scala/com/indix/utils/spark/parquet/ParquetAvroDataSourceSpec.scala @@ -30,9 +30,7 @@ class ParquetAvroDataSourceSpec 
extends SparkJobSpec with ParquetAvroDataSource sampleDf.rdd.saveAvroInParquet(outputLocation, sampleDf.schema, CompressionCodecName.GZIP) - val sparkVal = spark - - import sparkVal.implicits._ + import sqlContext.implicits._ val records: Array[SampleAvroRecord] = spark.read.parquet(outputLocation).as[SampleAvroRecord].collect()
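
---

Usage note (not part of the patches above): a minimal sketch of how an application could opt in to the committer these patches extend, assuming only the spark.sql.parquet.output.committer.class property that DirectParquetOutputCommitterSpec already sets; the object name, master, and output path are illustrative.

    import org.apache.spark.sql.{SaveMode, SparkSession}

    object DirectCommitterExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("direct-committer-example")
          .master("local[2]")
          // Route Parquet writes through the direct committer; only safe when overwriting
          // or writing a fresh path -- never when appending (see the class scaladoc).
          .config("spark.sql.parquet.output.committer.class",
            "com.indix.utils.spark.parquet.DirectParquetOutputCommitter")
          .getOrCreate()

        // Part files are written straight to the target directory, with no _temporary
        // folder -- the behaviour this committer exists for on S3-style filesystems.
        spark.range(10).toDF("id")
          .write
          .mode(SaveMode.Overwrite)
          .parquet("/tmp/direct-committer-example")

        spark.stop()
      }
    }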
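
Test-harness note: the specs above rely on Spark's local[threads, maxFailures] master form. SparkJobSpec builds its session with local[2, $taskRetries], so the task attempt that TestDirectParquetOutputCommitter deliberately fails is retried instead of failing the whole job. A self-contained sketch of that retry behaviour, independent of the committer (names here are illustrative):

    import org.apache.spark.TaskContext
    import org.apache.spark.sql.SparkSession

    object LocalRetrySketch {
      def main(args: Array[String]): Unit = {
        // local[2, 2]: two worker threads, and every task gets up to two attempts.
        val spark = SparkSession.builder()
          .appName("local-retry-sketch")
          .master("local[2, 2]")
          .getOrCreate()

        val sum = spark.sparkContext.parallelize(1 to 10, numSlices = 2).map { i =>
          // Fail every first attempt; the retry (attemptNumber == 1) succeeds.
          if (TaskContext.get().attemptNumber() == 0) sys.error("failing first attempt")
          i
        }.sum()

        println(sum) // 55.0 once the retried attempts complete
        spark.stop()
      }
    }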
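
Cleanup-filter note: abortTask() matches the failed attempt's output by the zero-padded split id embedded in Spark's part-file names, and the trailing "-" added in the final patch (f"-$split%05d-") keeps, say, split 3 from matching a name that merely contains "00003" elsewhere. A sketch of that filter against made-up file names (the names are illustrative, not taken from the patches):

    import org.apache.hadoop.fs.{Path, PathFilter}

    object AbortFilterSketch {
      def main(args: Array[String]): Unit = {
        val split = 3
        // Same predicate abortTask() uses after the final patch in this series.
        val filter = new PathFilter {
          override def accept(path: Path): Boolean = path.getName.contains(f"-$split%05d-")
        }
        val names = Seq(
          "part-00003-9a1b2c3d.snappy.parquet", // written by split 3 -> cleaned up on abort
          "part-00000-9a1b2c3d.snappy.parquet", // other splits are left untouched
          "_SUCCESS"                            // marker files are left untouched
        )
        names.foreach(n => println(s"$n -> ${filter.accept(new Path(n))}"))
      }
    }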