From 25a4d393a47d53557dc0c755acb6d9c3a46a690f Mon Sep 17 00:00:00 2001 From: Michael Della Bitta Date: Wed, 1 Jan 2025 19:49:44 -0500 Subject: [PATCH 1/2] Updated logical test --- test/logical/build.sbt | 19 +++--- test/logical/project/build.properties | 2 +- test/logical/src/main/scala/TestIds.scala | 83 +++++++++++++++-------- 3 files changed, 67 insertions(+), 37 deletions(-) diff --git a/test/logical/build.sbt b/test/logical/build.sbt index 8e17c34..091686c 100644 --- a/test/logical/build.sbt +++ b/test/logical/build.sbt @@ -1,14 +1,15 @@ -import sbt.Keys._ +import sbt.Keys.* -ThisBuild / scalaVersion := "2.12.10" -ThisBuild / organization := "org.dpla" +ThisBuild / scalaVersion := "2.12.12" +ThisBuild / organization := "dp.la" -lazy val hello = (project in file(".")) +val SPARK_VERSION = "3.5.3" + +lazy val proj = (project in file(".")) .settings( - name := "Hello", + name := "Thumbnail API Tester", resolvers += "SparkPackages" at "https://dl.bintray.com/spark-packages/maven/", - libraryDependencies += "org.scalaj" %% "scalaj-http" % "2.4.2", - libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.7", - libraryDependencies +="org.apache.spark" %% "spark-sql" % "2.4.7", - libraryDependencies +="org.apache.spark" %% "spark-mllib" % "2.4.7" + libraryDependencies += "org.apache.spark" %% "spark-core" % SPARK_VERSION % "provided", + libraryDependencies +="org.apache.spark" %% "spark-sql" % SPARK_VERSION % "provided", + libraryDependencies +="org.apache.spark" %% "spark-mllib" % SPARK_VERSION % "provided" ) diff --git a/test/logical/project/build.properties b/test/logical/project/build.properties index 08e4d79..73df629 100644 --- a/test/logical/project/build.properties +++ b/test/logical/project/build.properties @@ -1 +1 @@ -sbt.version=1.4.1 +sbt.version=1.10.7 diff --git a/test/logical/src/main/scala/TestIds.scala b/test/logical/src/main/scala/TestIds.scala index 139c27e..8ed9c27 100644 --- a/test/logical/src/main/scala/TestIds.scala +++ 
b/test/logical/src/main/scala/TestIds.scala @@ -1,20 +1,18 @@ -import java.io.FileOutputStream -import java.util.concurrent.TimeUnit - -import scalaj.http._ +import java.io.InputStream import org.apache.spark.SparkConf -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.SparkSession + +import java.net.URI +import java.net.http.{HttpClient, HttpRequest, HttpResponse} +import scala.util.{Failure, Success, Try, Using} +import scala.util.control.Breaks._ -import scala.io.Source object TestIds { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf() .setAppName("TestIds") - .setMaster("local[*]") - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .set("spark.kryoserializer.buffer.max", "200") val spark = SparkSession.builder() .config(sparkConf) @@ -23,26 +21,57 @@ object TestIds { import spark.implicits._ val fraction = args(0).toFloat val ids = spark.read.text(args(1)) + val endpoint = args(2) + ids + .sample(fraction) + .mapPartitions(rows => checkIds(rows.map(row => row.getString(0)), endpoint)) + .write + .csv(args(3)) + spark.close() - ids.sample(fraction).map( - (row: Row) => { - val id = row.getString(0) - val url = s"http://thumb.us-east-1.elasticbeanstalk.com/thumb/${id}" - try { - val response: HttpResponse[String] = Http(url).asString - val status = response.code - val body = response.body - val contentType = response.headers.getOrElse("Content-Type", Seq("")).head - val size = body.length - f"$id,$status,$contentType,$size" - } catch { - case e: Exception => f"$id,${e.getMessage}" - } - } - ).write.text(args(2)) + } - spark.close() + def checkIds(ids: Iterator[String], endpoint: String): Iterator[Result] = { + val client = HttpClient.newHttpClient + ids.map(id => {checkId(id, endpoint, client)}) + } + + def checkId(id: String, endpoint: String, client: HttpClient): Result = Try { + val url = s"$endpoint$id" + val request = HttpRequest.newBuilder().uri(new URI(url)).build() + val response = 
client.send(request, HttpResponse.BodyHandlers.ofInputStream()) + val status = response.statusCode() + val contentType = response.headers().firstValue("Content-Type").orElse("Unknown") + + Using(response.body()) { bodyInputStream => + countBytes(bodyInputStream, 1024) + } match { + case Success(size) => + Result(id, status, contentType, size, "") + case Failure(e) => + System.err.println("Failed to size body for {}", id, e) + Result(id, status, contentType, -1, e.getMessage) + } + }.getOrElse({ + System.err.println("Failed to connect for {}", id) + Result(id, -1, "Unknown", -1, "fatal") + }) + def countBytes(inputStream: InputStream, bufferSize: Int): Long = { + var result = 0L + val buff = new Array[Byte](bufferSize) + breakable { + while (true) { + val bytesRead = inputStream.read(buff) + if (bytesRead == -1) { + break() + } + result += bytesRead + } + } + result } +} + -} \ No newline at end of file +case class Result(id: String, status: Int, contentType: String, size: Long, error: String) \ No newline at end of file From baa9466df15de40a1d2727f5d395fa7f2ebc7d17 Mon Sep 17 00:00:00 2001 From: Michael Della Bitta Date: Wed, 1 Jan 2025 20:02:26 -0500 Subject: [PATCH 2/2] implemented ellipsis suggestions --- test/logical/src/main/scala/TestIds.scala | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/test/logical/src/main/scala/TestIds.scala b/test/logical/src/main/scala/TestIds.scala index 8ed9c27..633de46 100644 --- a/test/logical/src/main/scala/TestIds.scala +++ b/test/logical/src/main/scala/TestIds.scala @@ -49,25 +49,21 @@ object TestIds { case Success(size) => Result(id, status, contentType, size, "") case Failure(e) => - System.err.println("Failed to size body for {}", id, e) + System.err.println("Failed to size body for " + id + ", message: " + e.getMessage) Result(id, status, contentType, -1, e.getMessage) } }.getOrElse({ - System.err.println("Failed to connect for {}", id) + System.err.println("Failed to connect for " + id) 
Result(id, -1, "Unknown", -1, "fatal") }) def countBytes(inputStream: InputStream, bufferSize: Int): Long = { var result = 0L val buff = new Array[Byte](bufferSize) - breakable { - while (true) { - val bytesRead = inputStream.read(buff) - if (bytesRead == -1) { - break() - } - result += bytesRead - } + var bytesRead = inputStream.read(buff) + while (bytesRead != -1) { + result += bytesRead + bytesRead = inputStream.read(buff) } result }