Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions test/logical/build.sbt
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import sbt.Keys._
import sbt.Keys.*

ThisBuild / scalaVersion := "2.12.10"
ThisBuild / organization := "org.dpla"
ThisBuild / scalaVersion := "2.12.12"
ThisBuild / organization := "dp.la"

lazy val hello = (project in file("."))
val SPARK_VERSION = "3.5.3"

lazy val proj = (project in file("."))
.settings(
name := "Hello",
name := "Thumbnail API Tester",
resolvers += "SparkPackages" at "https://dl.bintray.com/spark-packages/maven/",
libraryDependencies += "org.scalaj" %% "scalaj-http" % "2.4.2",
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.7",
libraryDependencies +="org.apache.spark" %% "spark-sql" % "2.4.7",
libraryDependencies +="org.apache.spark" %% "spark-mllib" % "2.4.7"
libraryDependencies += "org.apache.spark" %% "spark-core" % SPARK_VERSION % "provided",
libraryDependencies +="org.apache.spark" %% "spark-sql" % SPARK_VERSION % "provided",
libraryDependencies +="org.apache.spark" %% "spark-mllib" % SPARK_VERSION % "provided"
)
2 changes: 1 addition & 1 deletion test/logical/project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.4.1
sbt.version=1.10.7
79 changes: 52 additions & 27 deletions test/logical/src/main/scala/TestIds.scala
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
import java.io.FileOutputStream
import java.util.concurrent.TimeUnit

import scalaj.http._
import java.io.InputStream
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.SparkSession

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import scala.util.{Failure, Success, Try, Using}
import scala.util.control.Breaks._

import scala.io.Source

object TestIds {

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
.setAppName("TestIds")
.setMaster("local[*]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryoserializer.buffer.max", "200")

val spark = SparkSession.builder()
.config(sparkConf)
Expand All @@ -23,26 +21,53 @@ object TestIds {
import spark.implicits._
val fraction = args(0).toFloat
val ids = spark.read.text(args(1))
val endpoint = args(2)
ids
.sample(fraction)
.mapPartitions(rows => checkIds(rows.map(row => row.getString(0)), endpoint))
.write
.csv(args(3))
spark.close()

ids.sample(fraction).map(
(row: Row) => {
val id = row.getString(0)
val url = s"http://thumb.us-east-1.elasticbeanstalk.com/thumb/${id}"
try {
val response: HttpResponse[String] = Http(url).asString
val status = response.code
val body = response.body
val contentType = response.headers.getOrElse("Content-Type", Seq("")).head
val size = body.length
f"$id,$status,$contentType,$size"
} catch {
case e: Exception => f"$id,${e.getMessage}"
}
}
).write.text(args(2))
}

spark.close()
/**
 * Checks every id in `ids` against `endpoint`, sharing one [[HttpClient]]
 * across the whole iterator.
 *
 * The returned iterator is lazy: each id is only checked when the
 * corresponding element is pulled by the consumer.
 *
 * @param ids      ids to check
 * @param endpoint URL prefix handed to `checkId` for each id
 * @return one [[Result]] per input id, in input order
 */
def checkIds(ids: Iterator[String], endpoint: String): Iterator[Result] = {
  val httpClient = HttpClient.newHttpClient()
  for (currentId <- ids) yield checkId(currentId, endpoint, httpClient)
}

/**
 * Requests `endpoint + id` and summarises the response as a [[Result]].
 *
 * Outcomes:
 *  - request succeeded and the body was fully read: status, content type
 *    (or "Unknown" when the header is absent), body size and an empty error;
 *  - request succeeded but reading the body failed: status and content type
 *    are kept, size is -1 and error carries the exception message;
 *  - anything captured by the outer `Try` (building the URI, sending the
 *    request, ...): status -1, content type "Unknown", size -1, error "fatal".
 *
 * @param id       identifier appended verbatim to `endpoint`
 * @param endpoint URL prefix; no separator is inserted between it and `id`
 * @param client   HTTP client used to send the request
 * @return a [[Result]] describing the outcome; failures captured by `Try`
 *         are reported in the result instead of being thrown
 */
def checkId(id: String, endpoint: String, client: HttpClient): Result = Try {
  val url = s"$endpoint$id"
  val request = HttpRequest.newBuilder().uri(new URI(url)).build()
  val response = client.send(request, HttpResponse.BodyHandlers.ofInputStream())
  val status = response.statusCode()
  val contentType = response.headers().firstValue("Content-Type").orElse("Unknown")

  // Using closes the body stream whether or not counting succeeds.
  Using(response.body()) { bodyInputStream =>
    countBytes(bodyInputStream, 1024)
  } match {
    case Success(size) =>
      Result(id, status, contentType, size, "")
    case Failure(e) =>
      System.err.println("Failed to size body for " + id + ", message: " + e.getMessage)
      Result(id, status, contentType, -1, e.getMessage)
  }
} match {
  case Success(result) =>
    result
  case Failure(e) =>
    // Log the underlying exception: without it a malformed URI, a timeout and
    // a refused connection are indistinguishable in the logs.
    System.err.println(s"Failed to connect for $id, message: $e")
    Result(id, -1, "Unknown", -1, "fatal")
}

/**
 * Reads `inputStream` to its end and returns the number of bytes read.
 *
 * The stream is not closed here; that is left to the caller.
 *
 * @param inputStream stream to drain
 * @param bufferSize  size of the scratch buffer used per read; must be positive
 * @return total number of bytes read (0 for an empty stream)
 * @throws IllegalArgumentException if `bufferSize` is not positive
 */
def countBytes(inputStream: InputStream, bufferSize: Int): Long = {
  // A zero-length buffer makes read() return 0 forever, so reject it up front.
  require(bufferSize > 0, s"bufferSize must be positive, got $bufferSize")
  val buffer = new Array[Byte](bufferSize)
  var total = 0L
  var bytesRead = inputStream.read(buffer)
  // read() returns -1 at end of stream; that sentinel must not be added to the total.
  while (bytesRead != -1) {
    total += bytesRead
    bytesRead = inputStream.read(buffer)
  }
  total
}
}


}
/**
 * Outcome of checking a single id.
 *
 * @param id          the id that was checked
 * @param status      HTTP status code of the response, or -1 when `checkId`
 *                    could not obtain a response
 * @param contentType value of the response's Content-Type header, or "Unknown"
 * @param size        byte count reported for the response body, or -1 when it
 *                    could not be determined
 * @param error       empty on success; otherwise the failure message
 *                    ("fatal" when no response was obtained)
 */
case class Result(
  id: String,
  status: Int,
  contentType: String,
  size: Long,
  error: String
)
Loading