diff --git a/test/logical/build.sbt b/test/logical/build.sbt
index 8e17c34..091686c 100644
--- a/test/logical/build.sbt
+++ b/test/logical/build.sbt
@@ -1,14 +1,15 @@
-import sbt.Keys._
+import sbt.Keys.*
 
-ThisBuild / scalaVersion := "2.12.10"
-ThisBuild / organization := "org.dpla"
+ThisBuild / scalaVersion := "2.12.12"
+ThisBuild / organization := "dp.la"
 
-lazy val hello = (project in file("."))
+val SPARK_VERSION = "3.5.3"
+
+lazy val proj = (project in file("."))
   .settings(
-    name := "Hello",
+    name := "Thumbnail API Tester",
     resolvers += "SparkPackages" at "https://dl.bintray.com/spark-packages/maven/",
-    libraryDependencies += "org.scalaj" %% "scalaj-http" % "2.4.2",
-    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.7",
-    libraryDependencies +="org.apache.spark" %% "spark-sql" % "2.4.7",
-    libraryDependencies +="org.apache.spark" %% "spark-mllib" % "2.4.7"
+    libraryDependencies += "org.apache.spark" %% "spark-core" % SPARK_VERSION % "provided",
+    libraryDependencies +="org.apache.spark" %% "spark-sql" % SPARK_VERSION % "provided",
+    libraryDependencies +="org.apache.spark" %% "spark-mllib" % SPARK_VERSION % "provided"
   )
diff --git a/test/logical/project/build.properties b/test/logical/project/build.properties
index 08e4d79..73df629 100644
--- a/test/logical/project/build.properties
+++ b/test/logical/project/build.properties
@@ -1 +1 @@
-sbt.version=1.4.1
+sbt.version=1.10.7
diff --git a/test/logical/src/main/scala/TestIds.scala b/test/logical/src/main/scala/TestIds.scala
index 139c27e..633de46 100644
--- a/test/logical/src/main/scala/TestIds.scala
+++ b/test/logical/src/main/scala/TestIds.scala
@@ -1,20 +1,18 @@
-import java.io.FileOutputStream
-import java.util.concurrent.TimeUnit
-
-import scalaj.http._
+import java.io.InputStream
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.SparkSession
+
+import java.net.URI
+import java.net.http.{HttpClient, HttpRequest, HttpResponse}
+import scala.util.{Failure, Success, Try, Using}
+import scala.util.control.Breaks._
 
-import scala.io.Source
 
 object TestIds {
 
   def main(args: Array[String]): Unit = {
     val sparkConf = new SparkConf()
       .setAppName("TestIds")
-      .setMaster("local[*]")
-      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .set("spark.kryoserializer.buffer.max", "200")
 
     val spark = SparkSession.builder()
       .config(sparkConf)
@@ -23,26 +21,65 @@ object TestIds {
     import spark.implicits._
     val fraction = args(0).toFloat
     val ids = spark.read.text(args(1))
+    val endpoint = args(2)
+    ids
+      .sample(fraction)
+      .mapPartitions(rows => checkIds(rows.map(row => row.getString(0)), endpoint))
+      .write
+      .csv(args(3))
+    spark.close()
 
-    ids.sample(fraction).map(
-      (row: Row) => {
-        val id = row.getString(0)
-        val url = s"http://thumb.us-east-1.elasticbeanstalk.com/thumb/${id}"
-        try {
-          val response: HttpResponse[String] = Http(url).asString
-          val status = response.code
-          val body = response.body
-          val contentType = response.headers.getOrElse("Content-Type", Seq("")).head
-          val size = body.length
-          f"$id,$status,$contentType,$size"
-        } catch {
-          case e: Exception => f"$id,${e.getMessage}"
-        }
-      }
-    ).write.text(args(2))
+  }
 
-    spark.close()
+  /** Checks every id of one partition, sharing a single HttpClient across them. */
+  def checkIds(ids: Iterator[String], endpoint: String): Iterator[Result] = {
+    val client = HttpClient.newHttpClient
+    ids.map(id => {checkId(id, endpoint, client)})
+  }
+
+  /**
+   * Requests `endpoint` + `id` and records the status code, Content-Type and body size.
+   * A failure while sizing the body yields size -1 plus the exception message;
+   * any earlier failure yields status -1, content type "Unknown" and error "fatal".
+   */
+  def checkId(id: String, endpoint: String, client: HttpClient): Result = Try {
+    val url = s"$endpoint$id"
+    val request = HttpRequest.newBuilder().uri(new URI(url)).build()
+    val response = client.send(request, HttpResponse.BodyHandlers.ofInputStream())
+    val status = response.statusCode()
+    val contentType = response.headers().firstValue("Content-Type").orElse("Unknown")
+    Using(response.body()) { bodyInputStream =>
+      countBytes(bodyInputStream, 1024)
+    } match {
+      case Success(size) =>
+        Result(id, status, contentType, size, "")
+      case Failure(e) =>
+        System.err.println("Failed to size body for " + id + ", message: " + e.getMessage)
+        Result(id, status, contentType, -1, e.getMessage)
+    }
+  }.getOrElse({
+    System.err.println("Failed to connect for " + id)
+    Result(id, -1, "Unknown", -1, "fatal")
+  })
+
+  /**
+   * Drains `inputStream` through a `bufferSize`-byte buffer and returns the number of bytes read.
+   * The -1 end-of-stream marker returned by `read` ends the loop and is never added to the total.
+   */
+  def countBytes(inputStream: InputStream, bufferSize: Int): Long = {
+    val buff = new Array[Byte](bufferSize)
+    var result = 0L
+    var bytesRead = inputStream.read(buff)
+    while (bytesRead != -1) {
+      result += bytesRead
+      bytesRead = inputStream.read(buff)
+    }
+    result
   }
+}
+
+
 
 
-}
\ No newline at end of file
+/** Outcome of probing one id; -1 in `status` or `size` marks a value that could not be obtained. */
+case class Result(id: String, status: Int, contentType: String, size: Long, error: String)
\ No newline at end of file