From 4a57a0be928bab987fc5c41d1fe97fdf2c1456c1 Mon Sep 17 00:00:00 2001 From: Arnaud Dufranne Date: Mon, 24 Aug 2020 11:01:10 +0200 Subject: [PATCH 1/7] compiling core --- build.sbt | 13 ++++-- .../scala/com/criteo/cuttle/Executor.scala | 3 +- .../cuttle/platforms/ExecutionPool.scala | 14 ++++-- .../criteo/cuttle/platforms/RateLimiter.scala | 15 ++++-- .../platforms/WaitingExecutionQueue.scala | 14 +++--- .../cuttle/platforms/http/HttpPlatform.scala | 46 ++++++++++++------- .../platforms/local/LocalPlatform.scala | 9 +++- 7 files changed, 76 insertions(+), 38 deletions(-) diff --git a/build.sbt b/build.sbt index 11af46d6f..a5d6afea3 100644 --- a/build.sbt +++ b/build.sbt @@ -8,12 +8,13 @@ lazy val catsCore = "1.6.1" lazy val circe = "0.11.1" lazy val doobie = "0.7.0" lazy val lolhttp = "0.13.0" +lazy val http4sVersion = "0.20.23" lazy val commonSettings = Seq( organization := "com.criteo.cuttle", version := VERSION, - scalaVersion := "2.11.12", - crossScalaVersions := Seq("2.11.12", "2.12.8"), + scalaVersion := "2.12.10", + crossScalaVersions := Seq("2.11.12", "2.12.10"), scalacOptions ++= Seq( "-deprecation", "-encoding", @@ -214,7 +215,7 @@ lazy val cuttle = libraryDependencies ++= Seq( "com.criteo.lolhttp" %% "lolhttp", "com.criteo.lolhttp" %% "loljson", - "com.criteo.lolhttp" %% "lolhtml" + "com.criteo.lolhttp" %% "lolhtml", ).map(_ % lolhttp), libraryDependencies ++= Seq("core", "generic", "parser", "java8") .map(module => "io.circe" %% s"circe-$module" % circe), @@ -227,6 +228,12 @@ lazy val cuttle = "com.zaxxer" % "nuprocess" % "1.1.3", "org.mariadb.jdbc" % "mariadb-java-client" % "2.7.0" ), + libraryDependencies ++= Seq( + "org.http4s" %% "http4s-dsl" % http4sVersion, + "org.http4s" %% "http4s-blaze-server" % http4sVersion, + "org.http4s" %% "http4s-blaze-client" % http4sVersion, + "org.http4s" %% "http4s-circe" % http4sVersion + ), libraryDependencies ++= Seq( "org.tpolecat" %% "doobie-core", "org.tpolecat" %% "doobie-hikari" diff --git a/core/src/main/scala/com/criteo/cuttle/Executor.scala b/core/src/main/scala/com/criteo/cuttle/Executor.scala index 9ffc0019d..38fdd29ab 100755 --- a/core/src/main/scala/com/criteo/cuttle/Executor.scala +++ b/core/src/main/scala/com/criteo/cuttle/Executor.scala @@ -28,6 +28,7 @@ import com.criteo.cuttle.ThreadPools.{SideEffectThreadPool, _} import com.criteo.cuttle.Metrics._ import com.criteo.cuttle.platforms.ExecutionPool import doobie.util.Meta +import org.http4s.HttpRoutes /** The strategy to use to retry stuck executions. * @@ -387,7 +388,7 @@ private[cuttle] object Execution { trait ExecutionPlatform { /** Expose a public `lolhttp` service for the platform internal statistics (for the UI and API). */ - def publicRoutes: PartialService = PartialFunction.empty + def publicRoutes: HttpRoutes[IO] = HttpRoutes.empty[IO] /** Expose a private `lolhttp` service for the platform operations (for the UI and API). */ def privateRoutes: AuthenticatedService = PartialFunction.empty diff --git a/core/src/main/scala/com/criteo/cuttle/platforms/ExecutionPool.scala b/core/src/main/scala/com/criteo/cuttle/platforms/ExecutionPool.scala index cc7b57b09..3c3252c46 100644 --- a/core/src/main/scala/com/criteo/cuttle/platforms/ExecutionPool.scala +++ b/core/src/main/scala/com/criteo/cuttle/platforms/ExecutionPool.scala @@ -2,12 +2,16 @@ package com.criteo.cuttle.platforms import scala.concurrent.stm._ -import lol.http._ -import lol.json._ +import cats.implicits._ +import cats.effect._ import io.circe._ import io.circe.syntax._ +import org.http4s._ +import org.http4s._, org.http4s.dsl.io._, org.http4s.implicits._ +import org.http4s.circe._ + /** * An execution pool backed by a priority queue. It limits the concurrent executions * and the priority queue is ordered by the [[scala.math.Ordering Ordering]] defined @@ -20,8 +24,8 @@ case class ExecutionPool(concurrencyLimit: Int) extends WaitingExecutionQueue { def doRunNext()(implicit txn: InTxn): Unit = () override def routes(urlPrefix: String) = - ({ - case req if req.url == urlPrefix => + HttpRoutes.of[IO]({ + case req if req.uri.toString == urlPrefix => Ok( Json.obj( "concurrencyLimit" -> concurrencyLimit.asJson, @@ -29,5 +33,5 @@ case class ExecutionPool(concurrencyLimit: Int) extends WaitingExecutionQueue { "waiting" -> waiting.size.asJson ) ) - }: PartialService).orElse(super.routes(urlPrefix)) + }) <+> super.routes(urlPrefix) } diff --git a/core/src/main/scala/com/criteo/cuttle/platforms/RateLimiter.scala b/core/src/main/scala/com/criteo/cuttle/platforms/RateLimiter.scala index 42b3c3acb..dbb377aae 100644 --- a/core/src/main/scala/com/criteo/cuttle/platforms/RateLimiter.scala +++ b/core/src/main/scala/com/criteo/cuttle/platforms/RateLimiter.scala @@ -4,15 +4,20 @@ import scala.concurrent.stm._ import scala.concurrent.duration._ import java.time._ -import lol.http._ -import lol.json._ import io.circe._ import io.circe.syntax._ import io.circe.java8.time._ + +import cats.implicits._ + import cats.effect.IO import com.criteo.cuttle.utils.timer +import org.http4s._ +import org.http4s._, org.http4s.dsl.io._, org.http4s.implicits._ +import org.http4s.circe._ + /** * An rate limiter pool backed by a priority queue. It rate limits the executions * and the priority queue is ordered by the [[scala.math.Ordering Ordering]] defined @@ -46,8 +51,8 @@ class RateLimiter(tokens: Int, refillRateInMs: Int) extends WaitingExecutionQueu def doRunNext()(implicit txn: InTxn) = _tokens() = _tokens() - 1 override def routes(urlPrefix: String) = - ({ - case req if req.url == urlPrefix => + HttpRoutes.of[IO]({ + case req if req.uri.toString == urlPrefix => Ok( Json.obj( "max_tokens" -> tokens.asJson, @@ -56,6 +61,6 @@ class RateLimiter(tokens: Int, refillRateInMs: Int) extends WaitingExecutionQueu "last_refill" -> _lastRefill.single.get.asJson ) ) - }: PartialService).orElse(super.routes(urlPrefix)) + }) <+> super.routes(urlPrefix) } diff --git a/core/src/main/scala/com/criteo/cuttle/platforms/WaitingExecutionQueue.scala b/core/src/main/scala/com/criteo/cuttle/platforms/WaitingExecutionQueue.scala index 212821ec0..8f13dcea8 100644 --- a/core/src/main/scala/com/criteo/cuttle/platforms/WaitingExecutionQueue.scala +++ b/core/src/main/scala/com/criteo/cuttle/platforms/WaitingExecutionQueue.scala @@ -3,9 +3,7 @@ package com.criteo.cuttle.platforms import com.criteo.cuttle._ import java.time._ - -import lol.http._ -import lol.json._ +import cats.effect._ import scala.concurrent._ import scala.concurrent.stm._ @@ -18,6 +16,10 @@ import io.circe._ import io.circe.syntax._ import io.circe.java8.time._ +import org.http4s._ +import org.http4s._, org.http4s.dsl.io._, org.http4s.implicits._ +import org.http4s.circe._ + /** A priority queue ordered by [[com.criteo.cuttle.SchedulingContext SchedulingContext]] priority. */ trait WaitingExecutionQueue { case class DelayedResult[A](effect: () => Future[A], @@ -107,8 +109,8 @@ trait WaitingExecutionQueue { } } - def routes(urlPrefix: String): PartialService = { - case req if req.url == s"$urlPrefix/running" => + def routes(urlPrefix: String): HttpRoutes[IO] = HttpRoutes.of[IO] { + case req if req.uri.toString == s"$urlPrefix/running" => Ok(this._running.single.get.toSeq.map { case (execution, task) => Json.obj( @@ -116,7 +118,7 @@ trait WaitingExecutionQueue { "task" -> task.asJson ) }.asJson) - case req if req.url == s"$urlPrefix/waiting" => + case req if req.uri.toString == s"$urlPrefix/waiting" => Ok(this._waiting.single.get.toSeq.map { case (execution, task) => Json.obj( diff --git a/core/src/main/scala/com/criteo/cuttle/platforms/http/HttpPlatform.scala b/core/src/main/scala/com/criteo/cuttle/platforms/http/HttpPlatform.scala index 4cdd4b573..d6f0d1c8e 100644 --- a/core/src/main/scala/com/criteo/cuttle/platforms/http/HttpPlatform.scala +++ b/core/src/main/scala/com/criteo/cuttle/platforms/http/HttpPlatform.scala @@ -6,14 +6,23 @@ import scala.concurrent.duration.FiniteDuration import scala.concurrent._ import cats.effect.IO +import cats.implicits._ + import io.circe._ import io.circe.syntax._ -import lol.http._ -import lol.json._ import com.criteo.cuttle._ import com.criteo.cuttle.platforms.{ExecutionPool, RateLimiter} +import org.http4s._ +import org.http4s.dsl.io._ +// import org.http4s.implicits._ +import org.http4s.circe._ +import org.http4s.headers.Host + +// import org.http4s.client.blaze._ +// import org.http4s.client._ + /** Allow to make HTTP calls in a managed way with rate limiting. Globally the platform limits the number * of concurrent requests on the platform. Additionnaly a rate limiter must be defined for each host allowed * to be called by this platform. @@ -53,10 +62,10 @@ case class HttpPlatform(maxConcurrentRequests: Int, rateLimits: Seq[(String, Htt override def waiting: Set[Execution[_]] = rateLimiters.map(_._2).foldLeft(pool.waiting)(_ ++ _.waiting) - override lazy val publicRoutes: PartialService = - pool.routes("/api/platforms/http/pool").orElse { - val index: PartialService = { - case GET at url"/api/platforms/http/rate-limiters" => + override lazy val publicRoutes: HttpRoutes[IO] = { + pool.routes("/api/platforms/http/pool") <+> { + val index: HttpRoutes[IO] = HttpRoutes.of[IO] { + case GET -> Root / "api" / "platforms" / "http" / "rate-limiters" => Ok( Json.obj( rateLimiters.zipWithIndex.map { @@ -70,11 +79,13 @@ case class HttpPlatform(maxConcurrentRequests: Int, rateLimits: Seq[(String, Htt ) ) } + rateLimiters.zipWithIndex.foldLeft(index) { case (routes, ((_, rateLimiter), i)) => - routes.orElse(rateLimiter.routes(s"/api/platforms/http/rate-limiters/$i")) + routes <+> rateLimiter.routes(s"/api/platforms/http/rate-limiters/$i") } } + } } /** Access to the [[HttpPlatform]]. */ @@ -92,8 +103,8 @@ object HttpPlatform { * @param request The [[lol.http.Request Request]] to run. * @param thunk The function handling the HTTP resposne once received. */ - def request[A, S <: Scheduling](request: Request, timeout: FiniteDuration = FiniteDuration(30, "seconds"))( - thunk: Response => Future[A] + def request[A, S <: Scheduling](request: Request[IO], timeout: FiniteDuration = FiniteDuration(30, "seconds"))( + thunk: Response[IO] => Future[A] )(implicit execution: Execution[S]): Future[A] = { val streams = execution.streams streams.debug(s"HTTP request: ${request}") @@ -103,7 +114,7 @@ object HttpPlatform { httpPlatform.pool.run(execution, debug = request.toString) { () => try { val host = - request.headers.getOrElse(h"Host", sys.error("`Host' header must be present in the request")).toString + request.headers.get(Host).getOrElse(sys.error("`Host' header must be present in the request")).toString val rateLimiter = httpPlatform.rateLimiters .collectFirst { case (pattern, rateLimiter) if host.matches(pattern) => @@ -112,12 +123,15 @@ object HttpPlatform { .getOrElse(sys.error(s"A rate limiter should be defined for `${host}'")) rateLimiter.run(execution, debug = request.toString) { () => - Client - .run(request, timeout = timeout) { response => - streams.debug(s"Got response: $response") - IO.fromFuture(IO.pure(thunk(response))) - } - .unsafeToFuture() + // BlazeClientBuilder[IO](execution.executionContext).resource.use { client => + // client + // .run(request, timeout = timeout) { response => + // streams.debug(s"Got response: $response") + // IO.fromFuture(IO.pure(thunk(response))) + // } + // .unsafeToFuture() + // } + ??? } } catch { case e: Throwable => diff --git a/core/src/main/scala/com/criteo/cuttle/platforms/local/LocalPlatform.scala b/core/src/main/scala/com/criteo/cuttle/platforms/local/LocalPlatform.scala index abdebab98..c442c1a8e 100644 --- a/core/src/main/scala/com/criteo/cuttle/platforms/local/LocalPlatform.scala +++ b/core/src/main/scala/com/criteo/cuttle/platforms/local/LocalPlatform.scala @@ -5,7 +5,12 @@ import java.nio.ByteBuffer import com.criteo.cuttle._ import com.criteo.cuttle.platforms.ExecutionPool import com.zaxxer.nuprocess._ -import lol.http.PartialService + +import cats.effect._ + +import org.http4s._ +import org.http4s._, org.http4s.dsl.io._, org.http4s.implicits._ +import org.http4s.circe._ import scala.collection.JavaConverters._ import scala.concurrent.{Future, Promise} @@ -25,7 +30,7 @@ case class LocalPlatform(maxForkedProcesses: Int) extends ExecutionPlatform { override def waiting: Set[Execution[_]] = pool.waiting - override lazy val publicRoutes: PartialService = + override lazy val publicRoutes: HttpRoutes[IO] = pool.routes("/api/platforms/local/pool") } From fd97a9f8ac5d2c401379ab4cb5983720a349a171 Mon Sep 17 00:00:00 2001 From: Arnaud Dufranne Date: Mon, 24 Aug 2020 21:58:18 +0200 Subject: [PATCH 2/7] all but private apis --- .../com/criteo/cuttle/Authentication.scala | 106 +------ .../scala/com/criteo/cuttle/Executor.scala | 4 +- .../scala/com/criteo/cuttle/ThreadPools.scala | 3 + .../main/scala/com/criteo/cuttle/Utils.scala | 23 +- .../cuttle/platforms/ExecutionPool.scala | 2 +- .../criteo/cuttle/platforms/RateLimiter.scala | 2 +- .../platforms/WaitingExecutionQueue.scala | 2 +- .../cuttle/platforms/http/HttpPlatform.scala | 4 - .../platforms/local/LocalPlatform.scala | 2 - .../criteo/cuttle/AuthenticationSpec.scala | 160 ---------- .../com/criteo/cuttle/cron/CronApp.scala | 187 ++++++------ .../com/criteo/cuttle/cron/CronProject.scala | 37 ++- .../cuttle/timeseries/CuttleProject.scala | 29 +- .../cuttle/timeseries/TimeSeriesApp.scala | 281 +++++++++--------- 14 files changed, 308 insertions(+), 534 deletions(-) delete mode 100644 core/src/test/scala/com/criteo/cuttle/AuthenticationSpec.scala diff --git a/core/src/main/scala/com/criteo/cuttle/Authentication.scala b/core/src/main/scala/com/criteo/cuttle/Authentication.scala index ce377a855..b7261e607 100644 --- a/core/src/main/scala/com/criteo/cuttle/Authentication.scala +++ b/core/src/main/scala/com/criteo/cuttle/Authentication.scala @@ -1,13 +1,12 @@ package com.criteo.cuttle -import java.util.Base64 - import cats.effect.IO -import lol.http._ + import io.circe.{Decoder, Encoder} import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} -import scala.util.Try +import org.http4s.server.AuthMiddleware +import cats.data.{Kleisli, OptionT} /** * The cuttle API is private for any write operation while it is publicly @@ -19,36 +18,6 @@ import scala.util.Try */ object Auth { - /** - * An [[Authenticator]] takes care of extracting the User from an HTTP request. - */ - trait Authenticator { - - private[cuttle] def apply(s: AuthenticatedService): PartialService = - new PartialFunction[Request, IO[Response]] { - override def isDefinedAt(request: Request): Boolean = - s.isDefinedAt(request) - - override def apply(request: Request): IO[Response] = - authenticate(request) - .fold(IO.pure, user => { - // pass through when authenticated - s(request)(user) - }) - } - - /** - * Authenticate an HTTP request. - * - * @param request the HTTP request to be authenticated. - * @return either an authenticated user or an error response. - */ - def authenticate(request: Request): Either[Response, User] - } - - /** - * A connected [[User]]. - */ case class User(userId: String) object User { @@ -56,69 +25,8 @@ object Auth { implicit val decoder: Decoder[User] = deriveDecoder } - /** - * Default implementation of Authenticator that authenticate any request - * as Guest. It basically disables the authentication. - */ - case object GuestAuth extends Authenticator { - override def authenticate(r: Request): Either[Response, User] = Right(User("Guest")) - } - - /** - * Implementation of [[Authenticator]] that rely on HTTP Basic auth. - * - * @param credentialsValidator validate the (user,password) credentials. - * @param userVisibleRealm The user visible realm. - */ - case class BasicAuth( - credentialsValidator: ((String, String)) => Boolean, - userVisibleRealm: String = "cuttle_users" - ) extends Authenticator { - - val scheme = "Basic" - val unauthorizedResponse = - Response(401).addHeaders(h"WWW-Authenticate" -> HttpString(s"""Basic realm="${userVisibleRealm}"""")) - - /** - * HTTP Basic auth implementation. - * @param r request to be authenticated - * @return either an authenticated user or an unauthorized response - */ - override def authenticate(r: Request): Either[Response, User] = - r.headers - .get(h"Authorization") - .flatMap({ - case s if s.toString().startsWith(scheme) => { - val base64credentials = s.toString().drop(scheme.size).trim() - BasicAuth.decodeBase64Credentials(base64credentials) - } - case _ => None - }) - .collect({ - case (l, p) if credentialsValidator((l, p)) => User(l) - }) - .toRight(unauthorizedResponse) - } - - private[cuttle] object BasicAuth { - def decodeBase64Credentials(credentials: String): Option[(String, String)] = - Try(Base64.getDecoder.decode(credentials)).toOption - .flatMap((decoded: Array[Byte]) => { - val splitted = new String(decoded, "utf-8").trim().split(":", 2) - if (splitted.size == 1) { - None - } else { - Some((splitted(0) -> splitted(1))) - } - }) - } - - /** A [[lol.http.PartialService PartialService]] that requires an authenticated - * user in request handler's scope. */ - type AuthenticatedService = PartialFunction[Request, (User => IO[Response])] - - private[cuttle] def defaultWith(response: IO[Response]): PartialService = { - case _ => response - } - + val GuestAuth: AuthMiddleware[IO, User] = AuthMiddleware(Kleisli { + _ => OptionT.some(User("guest")) + }) + } diff --git a/core/src/main/scala/com/criteo/cuttle/Executor.scala b/core/src/main/scala/com/criteo/cuttle/Executor.scala index 38fdd29ab..2c2a05666 100755 --- a/core/src/main/scala/com/criteo/cuttle/Executor.scala +++ b/core/src/main/scala/com/criteo/cuttle/Executor.scala @@ -21,7 +21,6 @@ import doobie.util.fragment.Fragment import io.circe._ import io.circe.java8.time._ import io.circe.syntax._ -import lol.http.PartialService import com.criteo.cuttle.Auth._ import com.criteo.cuttle.ExecutionStatus._ import com.criteo.cuttle.ThreadPools.{SideEffectThreadPool, _} @@ -29,6 +28,7 @@ import com.criteo.cuttle.Metrics._ import com.criteo.cuttle.platforms.ExecutionPool import doobie.util.Meta import org.http4s.HttpRoutes +import org.http4s.AuthedRoutes /** The strategy to use to retry stuck executions. * @@ -391,7 +391,7 @@ trait ExecutionPlatform { def publicRoutes: HttpRoutes[IO] = HttpRoutes.empty[IO] /** Expose a private `lolhttp` service for the platform operations (for the UI and API). */ - def privateRoutes: AuthenticatedService = PartialFunction.empty + def privateRoutes: AuthedRoutes[User, IO] = AuthedRoutes.empty /** @return the list of [[Execution]] waiting for resources on this platform. * These executions will be seen as __WAITING__ in the UI and the API. */ diff --git a/core/src/main/scala/com/criteo/cuttle/ThreadPools.scala b/core/src/main/scala/com/criteo/cuttle/ThreadPools.scala index 4eaa63524..d49227d30 100644 --- a/core/src/main/scala/com/criteo/cuttle/ThreadPools.scala +++ b/core/src/main/scala/com/criteo/cuttle/ThreadPools.scala @@ -134,6 +134,9 @@ object ThreadPools { ThreadPools.newCachedThreadPool(poolName = Some("Doobie-Transact")) ) + val blockingExecutionContext: ExecutionContext = scala.concurrent.ExecutionContext + .fromExecutorService(ThreadPools.newCachedThreadPool(poolName = Some("com.criteo.cuttle.ThreadPools.blockingExecutionContext"))) + object Implicits { import ThreadPoolSystemProperties._ implicit val serverThreadPool = new ServerThreadPool { diff --git a/core/src/main/scala/com/criteo/cuttle/Utils.scala b/core/src/main/scala/com/criteo/cuttle/Utils.scala index d717ada4d..e08b71315 100644 --- a/core/src/main/scala/com/criteo/cuttle/Utils.scala +++ b/core/src/main/scala/com/criteo/cuttle/Utils.scala @@ -14,8 +14,8 @@ import doobie._ import doobie.hikari.HikariTransactor import doobie.implicits._ import io.circe.Json -import lol.http.{PartialService, ServerSentEvents, Service} -import lol.http._ +import org.http4s._ +import org.http4s.dsl.io._ /** A set of basic utilities useful to write workflows. */ package object utils { @@ -119,27 +119,12 @@ package object utils { private[cuttle] def randomUUID(): String = UUID.randomUUID().toString - /** - * Allows chaining of method orFinally - * from a PartialService that returns a - * non-further-chainable Service. - */ - implicit private[cuttle] class PartialServiceConverter(val service: PartialService) extends AnyVal { - def orFinally(finalService: Service): Service = - service.orElse(toPartial(finalService)) - - private def toPartial(service: Service): PartialService = { - case e => service(e) - } - } - private[cuttle] def getJVMUptime = ManagementFactory.getRuntimeMXBean.getUptime / 1000 private[cuttle] def sse[A](thunk: IO[Option[A]], - encode: A => IO[Json])(implicit eqInstance: Eq[A]): lol.http.Response = { + encode: A => IO[Json])(implicit eqInstance: Eq[A]): IO[Response[IO]] = { import scala.concurrent.duration._ import io.circe._ - import lol.json._ import fs2.Stream import com.criteo.cuttle.ThreadPools.Implicits.serverContextShift @@ -151,7 +136,7 @@ package object utils { }) .changes .evalMap[IO, Json](r => encode(r)) - .map(ServerSentEvents.Event(_)) + .map(k => ServerSentEvent(k.noSpaces)) Ok(stream) } diff --git a/core/src/main/scala/com/criteo/cuttle/platforms/ExecutionPool.scala b/core/src/main/scala/com/criteo/cuttle/platforms/ExecutionPool.scala index 3c3252c46..bcc6c7320 100644 --- a/core/src/main/scala/com/criteo/cuttle/platforms/ExecutionPool.scala +++ b/core/src/main/scala/com/criteo/cuttle/platforms/ExecutionPool.scala @@ -9,7 +9,7 @@ import io.circe._ import io.circe.syntax._ import org.http4s._ -import org.http4s._, org.http4s.dsl.io._, org.http4s.implicits._ +import org.http4s._, org.http4s.dsl.io._ import org.http4s.circe._ /** diff --git a/core/src/main/scala/com/criteo/cuttle/platforms/RateLimiter.scala b/core/src/main/scala/com/criteo/cuttle/platforms/RateLimiter.scala index dbb377aae..16eda18ce 100644 --- a/core/src/main/scala/com/criteo/cuttle/platforms/RateLimiter.scala +++ b/core/src/main/scala/com/criteo/cuttle/platforms/RateLimiter.scala @@ -15,7 +15,7 @@ import cats.effect.IO import com.criteo.cuttle.utils.timer import org.http4s._ -import org.http4s._, org.http4s.dsl.io._, org.http4s.implicits._ +import org.http4s._, org.http4s.dsl.io._ import org.http4s.circe._ /** diff --git a/core/src/main/scala/com/criteo/cuttle/platforms/WaitingExecutionQueue.scala b/core/src/main/scala/com/criteo/cuttle/platforms/WaitingExecutionQueue.scala index 8f13dcea8..c7b6290b0 100644 --- a/core/src/main/scala/com/criteo/cuttle/platforms/WaitingExecutionQueue.scala +++ b/core/src/main/scala/com/criteo/cuttle/platforms/WaitingExecutionQueue.scala @@ -17,7 +17,7 @@ import io.circe.syntax._ import io.circe.java8.time._ import org.http4s._ -import org.http4s._, org.http4s.dsl.io._, org.http4s.implicits._ +import org.http4s._, org.http4s.dsl.io._ import org.http4s.circe._ /** A priority queue ordered by [[com.criteo.cuttle.SchedulingContext SchedulingContext]] priority. */ diff --git a/core/src/main/scala/com/criteo/cuttle/platforms/http/HttpPlatform.scala b/core/src/main/scala/com/criteo/cuttle/platforms/http/HttpPlatform.scala index d6f0d1c8e..5263a722e 100644 --- a/core/src/main/scala/com/criteo/cuttle/platforms/http/HttpPlatform.scala +++ b/core/src/main/scala/com/criteo/cuttle/platforms/http/HttpPlatform.scala @@ -16,13 +16,9 @@ import com.criteo.cuttle.platforms.{ExecutionPool, RateLimiter} import org.http4s._ import org.http4s.dsl.io._ -// import org.http4s.implicits._ import org.http4s.circe._ import org.http4s.headers.Host -// import org.http4s.client.blaze._ -// import org.http4s.client._ - /** Allow to make HTTP calls in a managed way with rate limiting. Globally the platform limits the number * of concurrent requests on the platform. Additionnaly a rate limiter must be defined for each host allowed * to be called by this platform. diff --git a/core/src/main/scala/com/criteo/cuttle/platforms/local/LocalPlatform.scala b/core/src/main/scala/com/criteo/cuttle/platforms/local/LocalPlatform.scala index c442c1a8e..2ccfc8ab2 100644 --- a/core/src/main/scala/com/criteo/cuttle/platforms/local/LocalPlatform.scala +++ b/core/src/main/scala/com/criteo/cuttle/platforms/local/LocalPlatform.scala @@ -9,8 +9,6 @@ import com.zaxxer.nuprocess._ import cats.effect._ import org.http4s._ -import org.http4s._, org.http4s.dsl.io._, org.http4s.implicits._ -import org.http4s.circe._ import scala.collection.JavaConverters._ import scala.concurrent.{Future, Promise} diff --git a/core/src/test/scala/com/criteo/cuttle/AuthenticationSpec.scala b/core/src/test/scala/com/criteo/cuttle/AuthenticationSpec.scala deleted file mode 100644 index 388931dc2..000000000 --- a/core/src/test/scala/com/criteo/cuttle/AuthenticationSpec.scala +++ /dev/null @@ -1,160 +0,0 @@ -package com.criteo.cuttle - -import org.scalatest.FunSuite -import lol.http._ -import Auth._ - -class AuthenticationSpec extends FunSuite { - - import SuiteUtils._ - - test("GuestAuth should return Guest user") { - val actual = GuestAuth.authenticate(getFakeRequest()) - assert(actual == Right(User("Guest"))) - } - - test("HttpAuth.decodeBase64Credentials should match valid credentials") { - val inputCreds = "bG9naW46cGFzc3dvcmQK" - val expected = ("login", "password") - - val actual = BasicAuth.decodeBase64Credentials(inputCreds) - assert(actual.isDefined && actual.get == expected) - } - - test("HttpAuth should answer 401 with wrong Authorization header") { - val request = getFakeRequest() - .addHeaders(HttpString("Authorization") -> HttpString("NotBasic")) - - val actual = getBasicAuth().authenticate(request) - assertHttpBasicUnAuthorized(actual) - } - - test("HttpAuth should answer 401 without Authorization header") { - val request = getFakeRequest() - - val actual = getBasicAuth().authenticate(request) - assertHttpBasicUnAuthorized(actual) - } - - test("HttpAuth should answer 401 when invalid base64 string") { - val request = getFakeRequest() - .addHeaders(h"Authorization" -> h"Basic àààààààààà") - - val actual = getBasicAuth().authenticate(request) - assertHttpBasicUnAuthorized(actual) - } - - test("HttpAuth should answer user with valid basic http header") { - val request = getFakeRequest() - .addHeaders(h"Authorization" -> h"Basic bG9naW46cGFzc3dvcmQK") - - val actual = getBasicAuth().authenticate(request) - assert(actual.isRight) - } - - test("HttpAuth should answer user with valid basic http header when many spaces") { - val request = getFakeRequest() - .addHeaders(h"Authorization" -> h"Basic bG9naW46cGFzc3dvcmQK") - - val actual = getBasicAuth().authenticate(request) - assert(actual.isRight) - } - - test("HttpAuth should deny access to unauthorized user") { - // "login:wrongpassword" - val request = getFakeRequest() - .addHeaders(h"Authorization" -> h"Basic bG9naW46d3JvbmdwYXNzd29yZAo") - - val actual = getBasicAuth().authenticate(request) - assert(actual.isLeft) - } - - test("Authenticator should allow only part of the api to be authenticated") { - val publicApi: PartialService = { - case GET at "/public" => Ok("public") - } - val privateAuthenticatedApi: AuthenticatedService = { - case GET at "/private/authenticated" => - _ => Ok("privateauthenticated") - } - - val privateNonAuthenticatedApi: AuthenticatedService = { - case GET at "/private/nonauthenticated" => - _ => Ok("privatenonauthenticated") - } - - val completeApi = publicApi - .orElse(NoAuth(privateNonAuthenticatedApi)) - .orElse(YesAuth(privateAuthenticatedApi)) - .orElse(defaultWith404) - - assertOk(completeApi, "/public") - assertUnAuthorized(completeApi, "/private/nonauthenticated") - assertOk(completeApi, "/private/authenticated") - assertNotFound(completeApi, "/nonexistingurl") - } - - val defaultWith404: PartialService = { - case _ => Response(404) - } - -} - -object SuiteUtils { - - val testRealm = "myfakerealm" - val testAppSecret = "myappsecret" - - /** - * GET request with no header. - */ - def getFakeRequest(url: String = "") = - Request( - GET, - url, - "http", - Content.empty, - Map.empty[HttpString, HttpString] - ) - - /** * - * Gets a BasicAuth allowing - * only login:password - */ - def getBasicAuth() = basicAuth - - def assertCodeAtUrl(code: Int)(api: Service)(url: String): Unit = { - val runTest = for { - answer <- api(getFakeRequest(url)) - } yield answer.status == code - assert(runTest.unsafeRunSync, url) - } - - def assertOk: (Service, String) => Unit = assertCodeAtUrl(200)(_)(_) - - def assertUnAuthorized: (Service, String) => Unit = assertCodeAtUrl(401)(_)(_) - - def assertNotFound: (Service, String) => Unit = assertCodeAtUrl(404)(_)(_) - - def assertHttpBasicUnAuthorized(authResponse: Either[Response, User]): Unit = - assert(authResponse match { - case Left(r) => { - val maybeRealm = r.headers.get(h"WWW-Authenticate") - r.status == 401 && maybeRealm == Some(s"""Basic Realm="${testRealm}"""") - } - case _ => false - }) - - private def isAuthorized(t: (String, String)): Boolean = - t._1 == "login" && t._2 == "password" - - private val basicAuth = BasicAuth(isAuthorized _, testRealm) -} - -object YesAuth extends Authenticator { - override def authenticate(r: Request): Either[Response, User] = Right(User("yesuser")) -} - -object NoAuth extends Authenticator { - override def authenticate(r: Request): Either[Response, User] = Left(Response(401)) -} diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala b/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala index 79a3df882..b05196d40 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala @@ -1,18 +1,28 @@ package com.criteo.cuttle.cron +import java.time.Instant import java.util.concurrent.TimeUnit -import cats.effect.IO -import com.criteo.cuttle.Auth._ +import scala.util.{Success, Try} + +import cats._ +import cats.implicits._ +import cats.data.EitherT +import cats.effect._ + import com.criteo.cuttle.Metrics.{Gauge, Prometheus} import com.criteo.cuttle._ import com.criteo.cuttle.utils.{getJVMUptime, sse} import io.circe._ import io.circe.syntax._ -import lol.http._ -import lol.json._ - -import scala.util.{Success, Try} +import org.http4s._ +import org.http4s.dsl.io._ +import org.http4s.circe._ +import org.http4s.headers.`Content-Type` +import com.criteo.cuttle.Auth._ +import com.criteo.cuttle.Metrics.{Gauge, Prometheus} +import com.criteo.cuttle._ +import com.criteo.cuttle.utils.getJVMUptime private[cron] case class CronApp(project: CronProject, executor: Executor[CronScheduling])( implicit val transactor: XA @@ -23,14 +33,14 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc private val allJobIds = workload.all.map(_.id) private val allDagIds = workload.dags.map(_.id) - private def getDagsOrNotFound(json: Json): Either[Response, Set[CronDag]] = + private def getDagsOrNotFound(json: Json): Either[IO[Response[IO]], Set[CronDag]] = json.hcursor.downField("dags").as[Set[String]] match { case Left(_) => Left(BadRequest(s"Error: Cannot parse request body: $json")) case Right(dagIds) => if (dagIds.isEmpty) Right(workload.dags) else { val filterDags = workload.dags.filter(v => dagIds.contains(v.id)) - if (filterDags.isEmpty) Left(NotFound) + if (filterDags.isEmpty) Left(NotFound()) else Right(filterDags) } } @@ -41,10 +51,10 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc if dag.cronPipeline.vertices.exists(job => jobIds.contains(job.id)) } yield dag.id - val publicApi: PartialService = { - case GET at "/version" => Ok(project.version) + val publicApi: HttpRoutes[IO] = HttpRoutes.of[IO] { + case GET -> Root / "version" => Ok(project.version) - case GET at "/metrics" => + case GET -> Root / "metrics" => val metrics = executor.getMetrics(allJobIds, workload) ++ scheduler.getMetrics(allJobIds, workload) :+ @@ -52,7 +62,7 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc Ok(Prometheus.serialize(metrics)) - case GET at "/api/status" => + case GET -> Root / "api" / "status" => val projectJson = (status: String) => Json.obj( "project" -> project.name.asJson, @@ -64,41 +74,37 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc case _ => InternalServerError(projectJson("ko")) } - case GET at "/api/project_definition" => Ok(project.asJson) + case GET -> Root / "api" / "project_definition" => Ok(project.asJson) - case GET at "/api/jobs_definition" => Ok(workload.asJson) + case GET -> Root / "api" / "jobs_definition" => Ok(workload.asJson) - case req @ GET at url"/api/executions/$id/streams" => + case req @ GET -> Root / "api" / "executions" / id / "streams" => lazy val streams = executor.openStreams(id) - req.headers.get(h"Accept").contains(h"text/event-stream") match { + // TODO fix + req.headers.get(org.http4s.headers.Accept).contains(MediaType.`text/event-stream`) match { case true => - val stream = fs2.Stream(ServerSentEvents.Event("BOS".asJson)) ++ streams + val stream = fs2.Stream(ServerSentEvent("BOS")) ++ streams .through(fs2.text.utf8Decode) .through(fs2.text.lines) .chunks - .map(chunk => ServerSentEvents.Event(Json.fromValues(chunk.toArray.toIterable.map(_.asJson)))) ++ - fs2.Stream(ServerSentEvents.Event("EOS".asJson)) + .map(chunk => ServerSentEvent(Json.fromValues(chunk.toArray.toIterable.map(_.asJson)).noSpaces)) ++ + fs2.Stream(ServerSentEvent("EOS")) Ok(stream) case false => - val bodyFromStream = Content( - stream = streams, - headers = Map(h"Content-Type" -> h"text/plain") - ) - Ok(bodyFromStream) + Ok(streams, `Content-Type`(MediaType.text.plain)) } - - case GET at "/api/dags/paused" => + case GET -> Root / "api" / "jobs" / "paused" => Ok(scheduler.getPausedDags.asJson) - case request @ POST at url"/api/dags/states" => + case request @ POST -> Root / "api" / "dags" / "states" => request - .readAs[Json] + .as[Json] .flatMap { json => json.hcursor .downField("dags") .as[Set[String]] .fold( - df => IO.pure(BadRequest(s"Error: Cannot parse request body: $df")), + df => BadRequest(s"Error: Cannot parse request body: $df"), dagIds => { val ids = if (dagIds.isEmpty) allDagIds else dagIds Ok(scheduler.getStats(ids)) @@ -106,7 +112,7 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc ) } - case request @ POST at url"/api/statistics" => { + case request @ POST -> Root / "api" / "statistics" => { def getStats(jobIds: Set[String]): IO[Option[(Json, Json)]] = executor .getStats(jobIds) @@ -118,24 +124,24 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc } request - .readAs[Json] + .as[Json] .flatMap { json => json.hcursor .downField("jobs") .as[Set[String]] .fold( - df => IO.pure(BadRequest(s"Error: Cannot parse request body: $df")), + df => BadRequest(s"Error: Cannot parse request body: $df"), jobIds => { val ids = if (jobIds.isEmpty) allJobIds else jobIds - getStats(ids).map( - _.map(stat => Ok(asJson(stat))).getOrElse(InternalServerError) + getStats(ids).flatMap( + _.map(stat => Ok(asJson(stat))).getOrElse(InternalServerError()) ) } ) } } - case request @ POST at url"/api/executions/status/$kind" => { + case request @ POST -> Root / "api" / "executions" / "status" / kind => { def getExecutions( q: ExecutionsQuery ): IO[Option[(Int, List[ExecutionLog])]] = kind match { @@ -205,24 +211,25 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc } request - .readAs[Json] + .as[Json] .flatMap { json => json .as[ExecutionsQuery] .fold( - df => IO.pure(BadRequest(s"Error: Cannot parse request body: $df")), + df => BadRequest(s"Error: Cannot parse request body: $df"), query => { getExecutions(query) .flatMap( - _.map(e => asJson(query, e).map(json => Ok(json))) - .getOrElse(NotFound) + _.map(e => asJson(query, e).flatMap(json => Ok(json))) + .getOrElse(NotFound()) ) } ) } } - case GET at url"/api/executions/$id?events=$events" => + case request @ GET -> Root / "api" / "executions" / id => + val events = request.multiParams.getOrElse("events", "") def getExecution = IO.suspend(executor.getExecution(scheduler.allContexts, id)) @@ -230,37 +237,40 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc case "true" | "yes" => sse(getExecution, (e: ExecutionLog) => IO(e.asJson)) case _ => - getExecution.map(_.map(e => Ok(e.asJson)).getOrElse(NotFound)) + getExecution.flatMap(_.map(e => Ok(e.asJson)).getOrElse(NotFound())) } } - val privateApi: AuthenticatedService = { - case req @ GET at url"/api/shutdown" => { implicit user => + def Redirect(path: String): IO[Response[IO]] = + Found().map(_.withHeaders(org.http4s.headers.Location(Uri.fromString(path).toOption.get))) + + val privateApi: AuthedRoutes[User, IO] = AuthedRoutes.of { + case req @ GET -> Root / "api" / "shutdown" as user => import scala.concurrent.duration._ - req.queryStringParameters.get("gracePeriodSeconds") match { + req.req.params.get("gracePeriodSeconds") match { case Some(s) => Try(s.toLong) match { case Success(s) if s > 0 => - executor.gracefulShutdown(Duration(s, TimeUnit.SECONDS)) - Ok + executor.gracefulShutdown(Duration(s, TimeUnit.SECONDS))(user) + Ok() case _ => BadRequest("gracePeriodSeconds should be a positive integer") } case None => - req.queryStringParameters.get("hard") match { + req.req.params.get("hard") match { case Some(_) => executor.hardShutdown() - Ok + Ok() case None => BadRequest("Either gracePeriodSeconds or hard should be specified as query parameter") } } - } - case request @ POST at url"/api/executions/relaunch" => { implicit user => + case request @ POST -> Root / "api" / "executions" / "relaunch" as user => { request - .readAs[Json] - .map { json => + .req + .as[Json] + .flatMap { json => json.hcursor .downField("jobs") .as[Set[String]] @@ -268,72 +278,77 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc df => BadRequest(s"Error: Cannot parse request body: $df"), jobIds => { val ids = if (jobIds.isEmpty) allJobIds else jobIds - executor.relaunch(ids) - Ok + executor.relaunch(ids)(user) + Ok() } ) } - } + } - case request @ POST at url"/api/dags/pause" => { implicit user => + case request @ POST -> Root / "api" / "dags" / "pause" as user => { request - .readAs[Json] - .map { json => + .req + .as[Json] + .flatMap { json => getDagsOrNotFound(json) match { case Left(a) => a case Right(dagIds) => - scheduler.pauseDags(dagIds, executor) - Ok + scheduler.pauseDags(dagIds, executor)(implicitly, user) + Ok() } } } - case request @ POST at url"/api/dags/resume" => { implicit user => + case request @ POST -> Root / "api" / "dags" / "resume" as user => { request - .readAs[Json] - .map { json => + .req + .as[Json] + .flatMap { json => getDagsOrNotFound(json) match { case Left(a) => a case Right(dagIds) => - scheduler.resumeDags(dagIds, executor) - Ok + scheduler.resumeDags(dagIds, executor)(implicitly, user) + Ok() } } } - case request @ POST at url"/api/dags/runnow" => { implicit user => + case request @ POST -> Root / "api" / "dags" / "runnow" as user => { request - .readAs[Json] - .map { json => + .req + .as[Json] + .flatMap { json => getDagsOrNotFound(json) match { case Left(a) => a case Right(dagIds) => - scheduler.runJobsNow(dagIds, executor) - Ok + scheduler.runJobsNow(dagIds, executor)(implicitly, user) + Ok() } } - } + } } - private val api = publicApi orElse project.authenticator(privateApi) + private val publicAssets = HttpRoutes.of[IO] { + case GET -> Root / "public" / file => + import ThreadPools.Implicits.serverContextShift - private val publicAssets: PartialService = { - case GET at url"/public/cron/$file" => - ClasspathResource(s"/public/cron/$file").fold(NotFound)(r => Ok(r)) + StaticFile + .fromResource[IO](s"/public/cron/$file", ThreadPools.blockingExecutionContext) + .getOrElseF(NotFound()) } - private val index: AuthenticatedService = { - case req if req.url.startsWith("/api/") => - _ => NotFound + private val index: AuthedRoutes[User, IO] = AuthedRoutes.of { + case req if req.req.uri.toString.startsWith("/api/") => + NotFound() case _ => - _ => Ok(ClasspathResource(s"/public/cron/index.html")) + import ThreadPools.Implicits.serverContextShift + + StaticFile.fromResource[IO](s"/public/cron/index.html", ThreadPools.blockingExecutionContext).getOrElseF(NotFound()) } - val routes: PartialService = api - .orElse { - executor.platforms.foldLeft(PartialFunction.empty: PartialService) { - case (s, p) => s.orElse(p.publicRoutes).orElse(project.authenticator(p.privateRoutes)) - } - } - .orElse(publicAssets orElse project.authenticator(index)) + val routes: HttpRoutes[IO] = publicApi <+> + executor.platforms.toList.foldMapK(_.publicRoutes) <+> + publicAssets <+> + project.authenticator(privateApi <+> executor.platforms.toList.foldMapK(_.privateRoutes) <+> index) + } diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala b/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala index 3307fa8ae..4de2e5616 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala @@ -1,15 +1,28 @@ package com.criteo.cuttle.cron -import com.criteo.cuttle.Auth.Authenticator -import com.criteo.cuttle.ThreadPools.Implicits.serverThreadPool -import com.criteo.cuttle.ThreadPools._ +import cats.effect._ + +import org.http4s._ +import org.http4s.implicits._ +import org.http4s.server.{AuthMiddleware, Router} +import org.http4s.server.blaze._ + +import com.criteo.cuttle.Auth.User +import scala.concurrent.duration._ + import com.criteo.cuttle._ -import doobie.implicits._ + +import com.criteo.cuttle.ThreadPools.Implicits.serverContextShift +import com.criteo.cuttle.utils.Timeout.timer +import io.circe.{Encoder, Json} import io.circe.syntax._ import io.circe.{Encoder, Json} -import lol.http._ + +import doobie.implicits._ import scala.concurrent.duration._ +import org.http4s.server.blaze.BlazeServerBuilder +import org.http4s.server.{AuthMiddleware, Router} /** * A cuttle project is a workflow to execute with the appropriate scheduler. @@ -21,7 +34,7 @@ class CronProject private[cuttle] (val name: String, val env: (String, Boolean), val workload: CronWorkload, val scheduler: CronScheduler, - val authenticator: Authenticator, + val authenticator: AuthMiddleware[IO, User], val logger: Logger) { /** @@ -59,11 +72,11 @@ class CronProject private[cuttle] (val name: String, val cronApp = CronApp(this, executor) logger.info("Starting server") - Server.listen(port, onError = { e => - logger.error(e.getMessage) - e.printStackTrace() - InternalServerError(e.getMessage) - })(cronApp.routes) + + BlazeServerBuilder[IO](ThreadPools.Implicits.serverThreadPool) + .bindHttp(port, "localhost") + .withHttpApp(Router("/" -> cronApp.routes).orNotFound) + .serve.compile.drain logger.info(s"Listening on http://localhost:$port") } @@ -93,7 +106,7 @@ object CronProject { version: String = "", description: String = "", env: (String, Boolean) = ("", false), - authenticator: Auth.Authenticator = Auth.GuestAuth + authenticator: AuthMiddleware[IO, User] = Auth.GuestAuth )(jobs: CronWorkload)(implicit scheduler: CronScheduler, logger: Logger): CronProject = new CronProject(name, version, description, env, jobs, scheduler, authenticator, logger) diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala index d49a67b0c..3fe006ebe 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala @@ -1,10 +1,18 @@ package com.criteo.cuttle.timeseries -import lol.http._ +import cats.effect._ +import org.http4s._ +import org.http4s.implicits._ +import org.http4s.server.{AuthMiddleware, Router} +import org.http4s.server.blaze._ import com.criteo.cuttle._ import com.criteo.cuttle.{Database => CuttleDatabase} -import com.criteo.cuttle.ThreadPools._, Implicits.serverThreadPool +import com.criteo.cuttle.ThreadPools._ +import com.criteo.cuttle.ThreadPools.Implicits.serverContextShift +import com.criteo.cuttle.utils.Timeout.timer +import com.criteo.cuttle.Auth.User + import scala.concurrent.duration.Duration /** @@ -16,7 +24,7 @@ class CuttleProject private[cuttle] (val name: String, val description: String, val env: (String, Boolean), val jobs: Workflow, - val authenticator: Auth.Authenticator, + val authenticator: AuthMiddleware[IO, User], val logger: Logger) { /** @@ -56,11 +64,12 @@ class CuttleProject private[cuttle] (val name: String, startScheduler() - logger.info("Start server") - Server.listen(port = httpPort, onError = { e => - e.printStackTrace() - InternalServerError(e.getMessage) - })(routes) + logger.info(s"Start server on port $httpPort") + + BlazeServerBuilder[IO](ThreadPools.Implicits.serverThreadPool) + .bindHttp(httpPort, "0.0.0.0") + .withHttpApp(Router("/" -> routes).orNotFound) + .serve.compile.drain.unsafeRunSync() logger.info(s"Listening on http://localhost:$httpPort") } @@ -88,7 +97,7 @@ class CuttleProject private[cuttle] (val name: String, logsRetention: Option[Duration] = None, maxVersionsHistory: Option[Int] = None, jobsToBePausedOnStartup: Set[Job[TimeSeries]] = Set.empty - ): (Service, () => Unit) = { + ): (HttpRoutes[IO], () => Unit) = { val xa = CuttleDatabase.connect(databaseConfig)(logger) val executor = new Executor[TimeSeries](platforms, xa, logger, name, version, logsRetention)(retryStrategy) val scheduler = new TimeSeriesScheduler(logger, stateRetention, maxVersionsHistory) @@ -133,7 +142,7 @@ object CuttleProject { version: String = "", description: String = "", env: (String, Boolean) = ("", false), - authenticator: Auth.Authenticator = Auth.GuestAuth + authenticator: AuthMiddleware[IO, User] = Auth.GuestAuth )(jobs: Workflow)(implicit logger: Logger): CuttleProject = new CuttleProject(name, version, description, env, jobs, authenticator, logger) diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala index b157a0195..595f1215a 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala @@ -5,7 +5,6 @@ import java.time.temporal.ChronoUnit import java.util.concurrent.TimeUnit import scala.util._ - import cats.effect.IO import cats.implicits._ import cats.data.EitherT @@ -17,9 +16,10 @@ import io.circe._ import io.circe.generic.auto._ import io.circe.java8.time._ import io.circe.syntax._ -import lol.http._ -import lol.json._ - +import org.http4s._ +import org.http4s.dsl.io._ +import org.http4s.circe._ +import org.http4s.headers.{Accept, `Content-Type`} import com.criteo.cuttle.Auth._ import com.criteo.cuttle.ExecutionStatus._ import com.criteo.cuttle.Metrics.{Gauge, Prometheus} @@ -108,9 +108,9 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, .traverse(jobId => jobLookup.get(jobId).toValid(Set(jobId))) .map(_.toSet) - private def getJobsOrNotFound(request: Request): EitherT[IO, Response, Set[Job[TimeSeries]]] = { + private def getJobsOrNotFound(request: Request[IO]): EitherT[IO, Response[IO], Set[Job[TimeSeries]]] = { val result = for { - jsonPayload <- request.readAs[Json] + jsonPayload <- request.as[Json] jobListPayload <- jsonPayload.as[JobListPayLoad].liftTo[IO] jobs <- jobListPayload.jobs.toNel .map(jobIds => { @@ -123,12 +123,12 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, EitherT .right[Throwable](result.map(_.toSet)) - .leftMap(error => BadRequest(error.getMessage)) + .leftSemiflatMap(error => BadRequest(error.getMessage)) } - val publicApi: PartialService = { + val publicApi: HttpRoutes[IO] = HttpRoutes.of { - case GET at url"/api/status" => { + case GET -> Root / "api" / "status" => { val projectJson = (status: String) => Json.obj( "project" -> project.name.asJson, @@ -141,7 +141,7 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, } } - case request @ POST at url"/api/statistics" => { + case request @ POST -> Root / "api" / "statistics" => { def getStats(ids: Set[String]): IO[Option[(Json, Json)]] = executor .getStats(ids) @@ -152,31 +152,31 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, executorStats.deepMerge(Json.obj("scheduler" -> schedulerStats)) } request - .readAs[Json] + .as[Json] .flatMap { json => json.hcursor .downField("jobs") .as[Set[String]] .fold( - df => IO.pure(BadRequest(s"Error: Cannot parse request body: $df")), + df => BadRequest(s"Error: Cannot parse request body: $df"), jobIds => { val ids = if (jobIds.isEmpty) allIds else jobIds - getStats(ids).map( - _.map(stat => Ok(asJson(stat))).getOrElse(InternalServerError) + getStats(ids).flatMap( + _.map(stat => Ok(asJson(stat))).getOrElse(InternalServerError()) ) } ) } } - case GET at url"/api/statistics/$jobName" => + case GET -> Root / "api" / "statistics" / jobName => executor .jobStatsForLastThirtyDays(jobName) - .map(stats => Ok(stats.asJson)) + .flatMap(stats => Ok(stats.asJson)) - case GET at url"/version" => Ok(project.version) + case GET -> Root / "version" => Ok(project.version) - case GET at "/metrics" => + case GET -> Root / "metrics" => val metrics = executor.getMetrics(allIds, jobs) ++ scheduler.getMetrics(allIds, jobs) :+ @@ -184,7 +184,7 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, .labeled(("version", project.version), getJVMUptime) Ok(Prometheus.serialize(metrics)) - case request @ POST at url"/api/executions/status/$kind" => { + case request @ POST -> Root / "api" / "executions" / "status" / kind => { def getExecutions( q: ExecutionsQuery ): IO[Option[(Int, List[ExecutionLog])]] = kind match { @@ -254,24 +254,24 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, } request - .readAs[Json] + .as[Json] .flatMap { json => json .as[ExecutionsQuery] .fold( - df => IO.pure(BadRequest(s"Error: Cannot parse request body: $df")), + df => BadRequest(s"Error: Cannot parse request body: $df"), query => { getExecutions(query) .flatMap( - _.map(e => asJson(query, e).map(json => Ok(json))) - .getOrElse(NotFound) + _.map(e => asJson(query, e).flatMap(json => Ok(json))).getOrElse(NotFound()) ) } ) } } - case GET at url"/api/executions/$id?events=$events" => + case request @ GET -> Root / "api" / "executions"/ id => + val events = request.multiParams.getOrElse("events", "") def getExecution = IO.suspend(executor.getExecution(scheduler.allContexts, id)) @@ -279,115 +279,104 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, case "true" | "yes" => sse(getExecution, (e: ExecutionLog) => IO(e.asJson)) case _ => - getExecution.map(_.map(e => Ok(e.asJson)).getOrElse(NotFound)) + getExecution.flatMap(_.map(e => Ok(e.asJson)).getOrElse(NotFound())) } - case req @ GET at url"/api/executions/$id/streams" => + case req @ GET -> Root / "api" / "executions" / id / "streams" => lazy val streams = executor.openStreams(id) - req.headers.get(h"Accept").contains(h"text/event-stream") match { + // TODO: check this + req.headers.get(Accept).contains(Accept(MediaType.`text/event-stream`)) match { case true => Ok( - fs2.Stream(ServerSentEvents.Event("BOS".asJson)) ++ + fs2.Stream(ServerSentEvent("BOS")) ++ streams .through(fs2.text.utf8Decode) .through(fs2.text.lines) .chunks .map( chunk => - ServerSentEvents.Event( - Json.fromValues(chunk.toArray.toIterable.map(_.asJson)) + ServerSentEvent( + Json.fromValues(chunk.toArray.toIterable.map(_.asJson)).noSpaces ) ) ++ - fs2.Stream(ServerSentEvents.Event("EOS".asJson)) + fs2.Stream(ServerSentEvent("EOS")) ) case false => - Ok( - Content( - stream = streams, - headers = Map(h"Content-Type" -> h"text/plain") - ) - ) + Ok(streams, `Content-Type`(MediaType.text.plain)) } - case GET at "/api/jobs/paused" => + case GET -> Root / "api" / "jobs" / "paused" => Ok(scheduler.pausedJobs().asJson) - case GET at "/api/project_definition" => + case GET -> Root / "api" / "project_definition" => Ok(project.asJson) - case GET at "/api/jobs_definition" => + case GET -> Root / "api" / "jobs_definition" => Ok(jobs.asJson) } - val privateApi: AuthenticatedService = { - case POST at url"/api/executions/$id/cancel" => { implicit user => - executor.cancelExecution(id) - Ok - } + val privateApi: AuthedRoutes[User, IO] = AuthedRoutes.of { + case POST -> Root / "api" / "executions" / id / "cancel" as user => + executor.cancelExecution(id)(user) + Ok() - case request @ POST at url"/api/jobs/pause" => { implicit user => - getJobsOrNotFound(request) - .map(jobs => { - scheduler.pauseJobs(jobs, executor, xa) - Ok + case request @ POST -> Root / "api" / "jobs" / "pause" as user => + getJobsOrNotFound(request.req) + .semiflatMap(jobs => { + scheduler.pauseJobs(jobs, executor, xa)(user) + Ok() }) .merge - } - - case request @ POST at url"/api/jobs/resume" => { implicit user => - getJobsOrNotFound(request) - .map(jobs => { - scheduler.resumeJobs(jobs, xa) - Ok + case request @ POST -> Root / "api" / "jobs" / "resume" as user => + getJobsOrNotFound(request.req) + .semiflatMap(jobs => { + scheduler.resumeJobs(jobs, xa)(user) + Ok() }) .merge - } - - case POST at url"/api/jobs/all/unpause" => { implicit user => - scheduler.resumeJobs(jobs.all, xa) - Ok - } - - case POST at url"/api/jobs/$id/unpause" => { implicit user => - jobs.all.find(_.id == id).fold(NotFound) { job => - scheduler.resumeJobs(Set(job), xa) - Ok + case POST -> Root / "api" / "jobs" / "all" / "unpause" as user => + scheduler.resumeJobs(jobs.all, xa)(user) + Ok() + case POST -> Root / "api" / "jobs" / id / "unpause" as user => + jobs.all.find(_.id == id).fold(NotFound()) { job => + scheduler.resumeJobs(Set(job), xa)(user) + Ok() } - } - case POST at url"/api/executions/relaunch?jobs=$jobs" => { implicit user => + case request @ POST -> Root / "api" / "executions" / "relaunch" as user => + val jobs: String = request.req.params.getOrElse("jobs", "") + + val filteredJobs = Try(jobs.split(",").toSeq.filter(_.nonEmpty)).toOption .filter(_.nonEmpty) .getOrElse(allIds) .toSet - executor.relaunch(filteredJobs) - IO.pure(Ok) - } + executor.relaunch(filteredJobs)(user) + Ok() - case req @ GET at url"/api/shutdown" => { implicit user => + case request @ GET -> Root / "api" / "shutdown" as user => import scala.concurrent.duration._ - req.queryStringParameters.get("gracePeriodSeconds") match { + request.req.params.get("gracePeriodSeconds") match { case Some(s) => Try(s.toLong) match { case Success(s) if s > 0 => - executor.gracefulShutdown(Duration(s, TimeUnit.SECONDS)) - Ok + executor.gracefulShutdown(Duration(s, TimeUnit.SECONDS))(user) + Ok() case _ => BadRequest("gracePeriodSeconds should be a positive integer") } case None => - req.queryStringParameters.get("hard") match { + request.req.params.get("hard") match { case Some(_) => executor.hardShutdown() - Ok + Ok() case None => BadRequest( "Either gracePeriodSeconds or hard should be specified as query parameter" ) } } - } } private val queries = Queries(project.logger) @@ -613,8 +602,12 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, private[timeseries] def getFocusView(q: CalendarFocusQuery, jobs: Set[String]): Json = getFocusView(snapshotWatchedState(), q, jobs) - private[timeseries] def publicRoutes(): PartialService = { - case request @ GET at url"/api/timeseries/executions?job=$jobId&start=$start&end=$end" => + private[timeseries] def publicRoutes(): HttpRoutes[IO] = HttpRoutes.of[IO] { + case request @ GET -> Root / "api" / "timeseries" / "executions" => + val jobId = request.params.getOrElse("job", "") + val start = request.params.getOrElse("start", "") + val end = request.params.getOrElse("end", "") + def getExecutions(watchedState: WatchedState): IO[Json] = { val job = jobs.vertices.find(_.id == jobId).get val calendar = job.scheduling.calendar @@ -744,16 +737,20 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, } val watchedState = IO(snapshotWatchedState()) - if (request.headers.get(h"Accept").contains(h"text/event-stream")) { + if (request.headers.get(org.http4s.headers.Accept).contains(MediaType.`text/event-stream`)) { sse( watchedState.map(Some(_)), (s: WatchedState) => getExecutions(s) ) } else { - watchedState.flatMap(getExecutions).map(Ok(_)) + watchedState.flatMap(getExecutions).flatMap(Ok(_)) } - case GET at url"/api/timeseries/calendar/focus?start=$start&end=$end&jobs=$jobs" => + case request @ GET -> Root / "api" / "timeseries" / "calendar" / "focus" => + val start = request.params.getOrElse("start", "") + val end = request.params.getOrElse("end", "") + val jobs = request.params.getOrElse("jobs", "") + val filteredJobs = Option(jobs.split(",").toSet.filterNot(_.isEmpty)) .filterNot(_.isEmpty) .getOrElse(allIds) @@ -761,14 +758,14 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, Ok(getFocusView(q, filteredJobs)) - case request @ POST at url"/api/timeseries/calendar/focus" => + case request @ POST -> Root / "api" / "timeseries" / "calendar" / "focus" => request - .readAs[Json] + .as[Json] .flatMap { json => json .as[CalendarFocusQuery] .fold( - df => IO.pure(BadRequest(s"Error: Cannot parse request body: $df")), + df => BadRequest(s"Error: Cannot parse request body: $df"), query => { val jobs = Option(query.jobs.filterNot(_.isEmpty)) .filterNot(_.isEmpty) @@ -778,7 +775,7 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, ) } - case request @ POST at url"/api/timeseries/calendar" => { + case request @ POST -> Root / "api" / "timeseries" / "calendar" => { case class JobStateOnPeriod(start: Instant, duration: Long, isDone: Boolean, isStuck: Boolean) @@ -852,13 +849,13 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, } request - .readAs[Json] + .as[Json] .flatMap { json => json.hcursor .downField("jobs") .as[Set[String]] .fold( - df => IO.pure(BadRequest(s"Error: Cannot parse request body: $df")), + df => BadRequest(s"Error: Cannot parse request body: $df"), jobIds => { val ids = if (jobIds.isEmpty) project.jobs.all.map(_.id) else jobIds @@ -868,7 +865,9 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, } } - case GET at url"/api/timeseries/lastruns?job=$jobId" => + case request @ GET -> Root / "api" / "timeseries" / "lastruns" => + val jobId = request.params.getOrElse("job", "") + val (jobStates, _) = scheduler.state val successfulIntervalMaps = jobStates .filter(s => s._1.id == jobId) @@ -886,7 +885,7 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, ) .toList - if (successfulIntervalMaps.isEmpty) NotFound + if (successfulIntervalMaps.isEmpty) NotFound() else { (successfulIntervalMaps.head._1.hi, successfulIntervalMaps.last._1.hi) match { case (Finite(lastCompleteTime), Finite(lastTime)) => @@ -896,11 +895,11 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, "lastTime" -> lastTime.asJson ) ) - case _ => BadRequest + case _ => BadRequest() } } - case GET at url"/api/timeseries/backfills" => + case GET -> Root / "api" / "timeseries" / "backfills" => Database .queryBackfills() .to[List] @@ -931,14 +930,16 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, ) }) .transact(xa) - .map(backfills => Ok(backfills.asJson)) - case GET at url"/api/timeseries/backfills/$id?events=$events" => + .flatMap(backfills => Ok(backfills.asJson)) + case request @ GET -> Root / "api" / "timeseries" / "backfills" / id => + val events = request.params.getOrElse("events", "") + val backfills = Database.getBackfillById(id).transact(xa) events match { case "true" | "yes" => sse(backfills, (b: Json) => IO.pure(b)) - case _ => backfills.map(bf => Ok(bf.asJson)) + case _ => backfills.flatMap(bf => Ok(bf.asJson)) } - case request @ POST at url"/api/timeseries/backfills/$backfillId/executions" => { + case request @ POST -> Root / "api" / "timeseries" / "backfills" / backfillId / "executions" => { def allExecutions( q: ExecutionsQuery ): IO[Option[(Int, Double, List[ExecutionLog])]] = { @@ -989,15 +990,15 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, } request - .readAs[Json] + .as[Json] .flatMap { json => json .as[ExecutionsQuery] .fold( - df => IO.pure(BadRequest(s"Error: Cannot parse request body: $df")), + df => BadRequest(s"Error: Cannot parse request body: $df"), q => { allExecutions(q) - .map(_.map { + .flatMap(_.map { case (total, completion, executions) => Ok( Json.obj( @@ -1010,25 +1011,25 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, "completion" -> completion.asJson ) ) - }.getOrElse(NotFound)) + }.getOrElse(NotFound())) } ) } } - }: PartialService + } - private[timeseries] def privateRoutes(): AuthenticatedService = { - case req @ POST at url"/api/timeseries/backfill" => { implicit user => - req - .readAs[Json] + private[timeseries] def privateRoutes(): AuthedRoutes[User, IO] = AuthedRoutes.of { + case request @ POST -> Root / "api" / "timeseries" / "backfill" as user => { + request.req + .as[Json] .flatMap( _.as[BackfillCreate] .fold( - df => IO.pure(BadRequest(s""" + df => BadRequest(s""" |Error during backfill creation. |Error: Cannot parse request body. |$df - |""".stripMargin)), + |""".stripMargin), backfill => { val jobIdsToBackfill = backfill.jobs.toSet jobIdsToBackfill @@ -1048,8 +1049,8 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, backfill.priority, executor.runningState, xa - ) - .map { + )(user) + .flatMap { case Right(_) => Ok("ok".asJson) case Left(errors) => BadRequest(errors) } @@ -1060,7 +1061,11 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, } // consider the given period of the job as successful, regardless of it's actual status - case req @ GET at url"/api/timeseries/force-success?job=$jobId&start=$start&end=$end" => { implicit user => + case request @ GET -> Root / "api" / "timeseries" / "force-success" as user => { + val jobId = request.req.params.getOrElse("job", "") + val start = request.req.params.getOrElse("start", "") + val end = request.req.params.getOrElse("end", "") + (for { startDate <- Try(Instant.parse(start)) endDate <- Try(Instant.parse(end)) @@ -1085,7 +1090,7 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, } val failingExecutions = executor.allFailingExecutions.filter(filterOp) val executions = runningExecutions ++ failingExecutions - executions.foreach(_.cancel()) + executions.foreach(_.cancel()(user)) ( executions.length, JobSuccessForced(Instant.now(), user, jobId, startDate, endDate) @@ -1095,7 +1100,7 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, queries .logEvent(event) .transact(xa) - .map( + .flatMap( _ => Ok( Json.obj( @@ -1104,37 +1109,39 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, ) ) case Failure(e) => - IO.pure( - BadRequest(Json.obj("error" -> Json.fromString(e.getMessage))) - ) + BadRequest(Json.obj("error" -> Json.fromString(e.getMessage))) } } } - val api = publicApi orElse project.authenticator(privateApi) + val publicAssets = HttpRoutes.of[IO] { + case GET -> Root / "public" / file => + import ThreadPools.Implicits.serverContextShift - val publicAssets: PartialService = { - case GET at url"/public/timeseries/$file" => - ClasspathResource(s"/public/timeseries/$file").fold(NotFound)(r => Ok(r)) + StaticFile + .fromResource[IO](s"/public/timeseries/$file", ThreadPools.blockingExecutionContext) + .getOrElseF(NotFound()) } - val index: AuthenticatedService = { - case req if req.url.startsWith("/api/") => - _ => NotFound + val index: AuthedRoutes[User, IO] = AuthedRoutes.of { + case req if req.req.uri.toString.startsWith("/api/") => + NotFound() case _ => - _ => Ok(ClasspathResource(s"/public/timeseries/index.html")) + import ThreadPools.Implicits.serverContextShift + + StaticFile.fromResource[IO](s"/public/timeseries/index.html", ThreadPools.blockingExecutionContext).getOrElseF(NotFound()) } + val privatePlatformApis: AuthedRoutes[User, IO] = executor + .platforms + .toList + .foldMapK(_.privateRoutes) + /** List of */ - val routes: PartialService = api - .orElse(publicRoutes()) - .orElse(project.authenticator(privateRoutes())) - .orElse { - executor.platforms.foldLeft(PartialFunction.empty: PartialService) { - case (s, p) => - s.orElse(p.publicRoutes) - .orElse(project.authenticator(p.privateRoutes)) - } - } - .orElse(publicAssets orElse project.authenticator(index)) + val routes: HttpRoutes[IO] = + publicApi <+> + publicRoutes() <+> + executor.platforms.toList.foldMapK(_.publicRoutes) <+> + publicAssets <+> + project.authenticator(privateApi <+> privateRoutes() <+> privatePlatformApis <+> index) } From c05b70e26a89d1170c2002f4afb006d14a313085 Mon Sep 17 00:00:00 2001 From: Arnaud Dufranne Date: Mon, 24 Aug 2020 22:34:37 +0200 Subject: [PATCH 3/7] format --- .../com/criteo/cuttle/Authentication.scala | 6 +++--- .../scala/com/criteo/cuttle/ThreadPools.scala | 4 +++- .../scala/com/criteo/cuttle/cron/CronApp.scala | 12 ++++++------ .../com/criteo/cuttle/cron/CronProject.scala | 4 +++- .../cuttle/timeseries/CuttleProject.scala | 5 ++++- .../cuttle/timeseries/TimeSeriesApp.scala | 18 ++++++++---------- 6 files changed, 27 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/com/criteo/cuttle/Authentication.scala b/core/src/main/scala/com/criteo/cuttle/Authentication.scala index b7261e607..2ed3c0ace 100644 --- a/core/src/main/scala/com/criteo/cuttle/Authentication.scala +++ b/core/src/main/scala/com/criteo/cuttle/Authentication.scala @@ -25,8 +25,8 @@ object Auth { implicit val decoder: Decoder[User] = deriveDecoder } - val GuestAuth: AuthMiddleware[IO, User] = AuthMiddleware(Kleisli { - _ => OptionT.some(User("guest")) + val GuestAuth: AuthMiddleware[IO, User] = AuthMiddleware(Kleisli { _ => + OptionT.some(User("guest")) }) - + } diff --git a/core/src/main/scala/com/criteo/cuttle/ThreadPools.scala b/core/src/main/scala/com/criteo/cuttle/ThreadPools.scala index d49227d30..846e25ec3 100644 --- a/core/src/main/scala/com/criteo/cuttle/ThreadPools.scala +++ b/core/src/main/scala/com/criteo/cuttle/ThreadPools.scala @@ -135,7 +135,9 @@ object ThreadPools { ) val blockingExecutionContext: ExecutionContext = scala.concurrent.ExecutionContext - .fromExecutorService(ThreadPools.newCachedThreadPool(poolName = Some("com.criteo.cuttle.ThreadPools.blockingExecutionContext"))) + .fromExecutorService( + ThreadPools.newCachedThreadPool(poolName = Some("com.criteo.cuttle.ThreadPools.blockingExecutionContext")) + ) object Implicits { import ThreadPoolSystemProperties._ diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala b/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala index b05196d40..0c3d62238 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala @@ -62,7 +62,7 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc Ok(Prometheus.serialize(metrics)) - case GET -> Root / "api" / "status" => + case GET -> Root / "api" / "status" => val projectJson = (status: String) => Json.obj( "project" -> project.name.asJson, @@ -74,9 +74,9 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc case _ => InternalServerError(projectJson("ko")) } - case GET -> Root / "api" / "project_definition" => Ok(project.asJson) + case GET -> Root / "api" / "project_definition" => Ok(project.asJson) - case GET -> Root / "api" / "jobs_definition" => Ok(workload.asJson) + case GET -> Root / "api" / "jobs_definition" => Ok(workload.asJson) case req @ GET -> Root / "api" / "executions" / id / "streams" => lazy val streams = executor.openStreams(id) @@ -347,8 +347,8 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc } val routes: HttpRoutes[IO] = publicApi <+> - executor.platforms.toList.foldMapK(_.publicRoutes) <+> - publicAssets <+> - project.authenticator(privateApi <+> executor.platforms.toList.foldMapK(_.privateRoutes) <+> index) + executor.platforms.toList.foldMapK(_.publicRoutes) <+> + publicAssets <+> + project.authenticator(privateApi <+> executor.platforms.toList.foldMapK(_.privateRoutes) <+> index) } diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala b/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala index 4de2e5616..b072d28c6 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala @@ -76,7 +76,9 @@ class CronProject private[cuttle] (val name: String, BlazeServerBuilder[IO](ThreadPools.Implicits.serverThreadPool) .bindHttp(port, "localhost") .withHttpApp(Router("/" -> cronApp.routes).orNotFound) - .serve.compile.drain + .serve + .compile + .drain logger.info(s"Listening on http://localhost:$port") } diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala index 3fe006ebe..de33d05d4 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala @@ -69,7 +69,10 @@ class CuttleProject private[cuttle] (val name: String, BlazeServerBuilder[IO](ThreadPools.Implicits.serverThreadPool) .bindHttp(httpPort, "0.0.0.0") .withHttpApp(Router("/" -> routes).orNotFound) - .serve.compile.drain.unsafeRunSync() + .serve + .compile + .drain + .unsafeRunSync() logger.info(s"Listening on http://localhost:$httpPort") } diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala index 595f1215a..03581ed34 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala @@ -19,7 +19,7 @@ import io.circe.syntax._ import org.http4s._ import org.http4s.dsl.io._ import org.http4s.circe._ -import org.http4s.headers.{Accept, `Content-Type`} +import org.http4s.headers.{`Content-Type`, Accept} import com.criteo.cuttle.Auth._ import com.criteo.cuttle.ExecutionStatus._ import com.criteo.cuttle.Metrics.{Gauge, Prometheus} @@ -270,7 +270,7 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, } } - case request @ GET -> Root / "api" / "executions"/ id => + case request @ GET -> Root / "api" / "executions" / id => val events = request.multiParams.getOrElse("events", "") def getExecution = IO.suspend(executor.getExecution(scheduler.allContexts, id)) @@ -344,7 +344,7 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, } case request @ POST -> Root / "api" / "executions" / "relaunch" as user => val jobs: String = request.req.params.getOrElse("jobs", "") - + val filteredJobs = Try(jobs.split(",").toSeq.filter(_.nonEmpty)).toOption .filter(_.nonEmpty) @@ -1132,16 +1132,14 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, StaticFile.fromResource[IO](s"/public/timeseries/index.html", ThreadPools.blockingExecutionContext).getOrElseF(NotFound()) } - val privatePlatformApis: AuthedRoutes[User, IO] = executor - .platforms - .toList + val privatePlatformApis: AuthedRoutes[User, IO] = executor.platforms.toList .foldMapK(_.privateRoutes) /** List of */ val routes: HttpRoutes[IO] = publicApi <+> - publicRoutes() <+> - executor.platforms.toList.foldMapK(_.publicRoutes) <+> - publicAssets <+> - project.authenticator(privateApi <+> privateRoutes() <+> privatePlatformApis <+> index) + publicRoutes() <+> + executor.platforms.toList.foldMapK(_.publicRoutes) <+> + publicAssets <+> + project.authenticator(privateApi <+> privateRoutes() <+> privatePlatformApis <+> index) } From 5e6ee77db4bf26ca264d630b2a31ec62190261d6 Mon Sep 17 00:00:00 2001 From: Arnaud Dufranne Date: Tue, 25 Aug 2020 07:19:23 +0200 Subject: [PATCH 4/7] Remove lolhttp dependency Let lolhtml, as it is used for html templating. Will be replaced by twirl in a subsequent commit --- build.sbt | 2 -- 1 file changed, 2 deletions(-) diff --git a/build.sbt b/build.sbt index a5d6afea3..62d4863a9 100644 --- a/build.sbt +++ b/build.sbt @@ -213,8 +213,6 @@ lazy val cuttle = .settings(Defaults.itSettings: _*) .settings( libraryDependencies ++= Seq( - "com.criteo.lolhttp" %% "lolhttp", - "com.criteo.lolhttp" %% "loljson", "com.criteo.lolhttp" %% "lolhtml", ).map(_ % lolhttp), libraryDependencies ++= Seq("core", "generic", "parser", "java8") From b19855a0585b9cfec8ea53f117d40f480c7b4093 Mon Sep 17 00:00:00 2001 From: Arnaud Dufranne Date: Tue, 25 Aug 2020 10:01:41 +0200 Subject: [PATCH 5/7] Remove lolhtml from timeseries --- build.sbt | 8 +++---- .../com/criteo/cuttle/cron/CronApp.scala | 22 +++++++++---------- .../cuttle/timeseries/TimeSeriesApp.scala | 5 +++-- 3 files changed, 17 insertions(+), 18 deletions(-) diff --git a/build.sbt b/build.sbt index 62d4863a9..6608934e6 100644 --- a/build.sbt +++ b/build.sbt @@ -212,9 +212,6 @@ lazy val cuttle = .settings(commonSettings: _*) .settings(Defaults.itSettings: _*) .settings( - libraryDependencies ++= Seq( - "com.criteo.lolhttp" %% "lolhtml", - ).map(_ % lolhttp), libraryDependencies ++= Seq("core", "generic", "parser", "java8") .map(module => "io.circe" %% s"circe-$module" % circe), libraryDependencies ++= Seq( @@ -284,7 +281,10 @@ lazy val cron = .settings(commonSettings: _*) .settings(Defaults.itSettings: _*) .settings( - libraryDependencies += "com.github.alonsodomin.cron4s" %% "cron4s-core" % "0.4.5" + libraryDependencies += "com.github.alonsodomin.cron4s" %% "cron4s-core" % "0.4.5", + libraryDependencies ++= Seq( + "com.criteo.lolhttp" %% "lolhtml", + ).map(_ % lolhttp) ) .settings( fork in Test := true diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala b/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala index 0c3d62238..cf015654f 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala @@ -267,8 +267,7 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc } } case request @ POST -> Root / "api" / "executions" / "relaunch" as user => { - request - .req + request.req .as[Json] .flatMap { json => json.hcursor @@ -283,11 +282,10 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc } ) } - } + } case request @ POST -> Root / "api" / "dags" / "pause" as user => { - request - .req + request.req .as[Json] .flatMap { json => getDagsOrNotFound(json) match { @@ -300,8 +298,7 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc } case request @ POST -> Root / "api" / "dags" / "resume" as user => { - request - .req + request.req .as[Json] .flatMap { json => getDagsOrNotFound(json) match { @@ -313,9 +310,8 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc } } - case request @ POST -> Root / "api" / "dags" / "runnow" as user => { - request - .req + case request @ POST -> Root / "api" / "dags" / "runnow" as user => { + request.req .as[Json] .flatMap { json => getDagsOrNotFound(json) match { @@ -325,7 +321,7 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc Ok() } } - } + } } private val publicAssets = HttpRoutes.of[IO] { @@ -343,7 +339,9 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc case _ => import ThreadPools.Implicits.serverContextShift - StaticFile.fromResource[IO](s"/public/cron/index.html", ThreadPools.blockingExecutionContext).getOrElseF(NotFound()) + StaticFile + .fromResource[IO](s"/public/cron/index.html", ThreadPools.blockingExecutionContext) + .getOrElseF(NotFound()) } val routes: HttpRoutes[IO] = publicApi <+> diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala index 03581ed34..4ca01c6c8 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala @@ -345,7 +345,6 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, case request @ POST -> Root / "api" / "executions" / "relaunch" as user => val jobs: String = request.req.params.getOrElse("jobs", "") - val filteredJobs = Try(jobs.split(",").toSeq.filter(_.nonEmpty)).toOption .filter(_.nonEmpty) .getOrElse(allIds) @@ -1129,7 +1128,9 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, case _ => import ThreadPools.Implicits.serverContextShift - StaticFile.fromResource[IO](s"/public/timeseries/index.html", ThreadPools.blockingExecutionContext).getOrElseF(NotFound()) + StaticFile + .fromResource[IO](s"/public/timeseries/index.html", ThreadPools.blockingExecutionContext) + .getOrElseF(NotFound()) } val privatePlatformApis: AuthedRoutes[User, IO] = executor.platforms.toList From 4e2fdc5be68b06f2b1fc820346670e050dc18b33 Mon Sep 17 00:00:00 2001 From: Arnaud Dufranne Date: Wed, 16 Dec 2020 00:20:35 +0100 Subject: [PATCH 6/7] Remove unused imports --- build.sbt | 5 +---- core/src/main/scala/com/criteo/cuttle/ThreadPools.scala | 1 - cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala | 3 --- cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala | 1 - 4 files changed, 1 insertion(+), 9 deletions(-) diff --git a/build.sbt b/build.sbt index 6608934e6..3573e142e 100644 --- a/build.sbt +++ b/build.sbt @@ -281,10 +281,7 @@ lazy val cron = .settings(commonSettings: _*) .settings(Defaults.itSettings: _*) .settings( - libraryDependencies += "com.github.alonsodomin.cron4s" %% "cron4s-core" % "0.4.5", - libraryDependencies ++= Seq( - "com.criteo.lolhttp" %% "lolhtml", - ).map(_ % lolhttp) + libraryDependencies += "com.github.alonsodomin.cron4s" %% "cron4s-core" % "0.4.5" ) .settings( fork in Test := true diff --git a/core/src/main/scala/com/criteo/cuttle/ThreadPools.scala b/core/src/main/scala/com/criteo/cuttle/ThreadPools.scala index 846e25ec3..b227fd6fd 100644 --- a/core/src/main/scala/com/criteo/cuttle/ThreadPools.scala +++ b/core/src/main/scala/com/criteo/cuttle/ThreadPools.scala @@ -117,7 +117,6 @@ object ThreadPools { Executors.newCachedThreadPool(newThreadFactory(daemonThreads, poolName, threadCounter)) private def makeResourceFromES(tp: => ExecutorService): Resource[IO, ExecutionContext] = { - import cats.implicits._ val alloc = IO.delay(tp) val free = (es: ExecutorService) => IO.delay(es.shutdown()) Resource.make(alloc)(free).map(ExecutionContext.fromExecutor) diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala b/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala index cf015654f..ada29e02d 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala @@ -1,13 +1,10 @@ package com.criteo.cuttle.cron -import java.time.Instant import java.util.concurrent.TimeUnit import scala.util.{Success, Try} -import cats._ import cats.implicits._ -import cats.data.EitherT import cats.effect._ import com.criteo.cuttle.Metrics.{Gauge, Prometheus} diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala b/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala index b072d28c6..1b4e6217f 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala @@ -2,7 +2,6 @@ package com.criteo.cuttle.cron import cats.effect._ -import org.http4s._ import org.http4s.implicits._ import org.http4s.server.{AuthMiddleware, Router} import org.http4s.server.blaze._ From 43781c054475bd623c8e33a174a4c795adb92304 Mon Sep 17 00:00:00 2001 From: Arnaud Dufranne Date: Mon, 11 Jan 2021 10:27:43 +0100 Subject: [PATCH 7/7] Fix broken SSE --- .../main/scala/com/criteo/cuttle/Utils.scala | 9 +++- .../com/criteo/cuttle/cron/CronApp.scala | 26 +++++----- .../com/criteo/cuttle/cron/CronProject.scala | 3 +- devloop.js | 10 ++-- run.sh | 13 +++++ .../criteo/cuttle/timeseries/Internal.scala | 8 ++- .../cuttle/timeseries/TimeSeriesApp.scala | 51 ++++++++++--------- webpack.config.js | 2 +- 8 files changed, 74 insertions(+), 48 deletions(-) create mode 100755 run.sh diff --git a/core/src/main/scala/com/criteo/cuttle/Utils.scala b/core/src/main/scala/com/criteo/cuttle/Utils.scala index e08b71315..42b0ef4af 100644 --- a/core/src/main/scala/com/criteo/cuttle/Utils.scala +++ b/core/src/main/scala/com/criteo/cuttle/Utils.scala @@ -138,6 +138,13 @@ package object utils { .evalMap[IO, Json](r => encode(r)) .map(k => ServerSentEvent(k.noSpaces)) - Ok(stream) + import org.http4s.headers._ + + Ok(stream, `Content-Type`(MediaType.`text/event-stream`)) + } + + private[cuttle] def acceptsEventStream(r: Request[IO]): Boolean = { + val acceptMediaTypes = r.headers.get(org.http4s.headers.Accept).toList.flatMap(_.values.toList.map(_.mediaRange)) + acceptMediaTypes.contains(MediaType.`text/event-stream`) } } diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala b/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala index ada29e02d..2769f5593 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala @@ -77,18 +77,16 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc case req @ GET -> Root / "api" / "executions" / id / "streams" => lazy val streams = executor.openStreams(id) - // TODO fix - req.headers.get(org.http4s.headers.Accept).contains(MediaType.`text/event-stream`) match { - case true => - val stream = fs2.Stream(ServerSentEvent("BOS")) ++ streams - .through(fs2.text.utf8Decode) - .through(fs2.text.lines) - .chunks - .map(chunk => ServerSentEvent(Json.fromValues(chunk.toArray.toIterable.map(_.asJson)).noSpaces)) ++ - fs2.Stream(ServerSentEvent("EOS")) - Ok(stream) - case false => - Ok(streams, `Content-Type`(MediaType.text.plain)) + if (utils.acceptsEventStream(req)) { + val stream = fs2.Stream(ServerSentEvent("\"BOS\"")) ++ streams + .through(fs2.text.utf8Decode) + .through(fs2.text.lines) + .chunks + .map(chunk => ServerSentEvent(Json.fromValues(chunk.toArray.toIterable.map(_.asJson)).noSpaces)) ++ + fs2.Stream(ServerSentEvent("\"EOS\"")) + Ok(stream) + } else { + Ok(streams, `Content-Type`(MediaType.text.plain)) } case GET -> Root / "api" / "jobs" / "paused" => Ok(scheduler.getPausedDags.asJson) @@ -226,7 +224,7 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc } case request @ GET -> Root / "api" / "executions" / id => - val events = request.multiParams.getOrElse("events", "") + val events = request.params.getOrElse("events", "") def getExecution = IO.suspend(executor.getExecution(scheduler.allContexts, id)) @@ -322,7 +320,7 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc } private val publicAssets = HttpRoutes.of[IO] { - case GET -> Root / "public" / file => + case GET -> Root / "public" / "cron" / file => import ThreadPools.Implicits.serverContextShift StaticFile diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala b/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala index 1b4e6217f..e30f11885 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala @@ -73,11 +73,12 @@ class CronProject private[cuttle] (val name: String, logger.info("Starting server") BlazeServerBuilder[IO](ThreadPools.Implicits.serverThreadPool) - .bindHttp(port, "localhost") + .bindHttp(port, "0.0.0.0") .withHttpApp(Router("/" -> cronApp.routes).orNotFound) .serve .compile .drain + .unsafeRunSync() logger.info(s"Listening on http://localhost:$port") } diff --git a/devloop.js b/devloop.js index ff6494cba..dc013af1c 100644 --- a/devloop.js +++ b/devloop.js @@ -37,11 +37,11 @@ let server = runServer({ httpPort: 8888, sh: "java -cp `cat /tmp/classpath_com.criteo.cuttle.examples` com.criteo.cuttle.examples.HelloTimeSeries", env: { - MYSQL_LOCATIONS: "localhost:3388", - MYSQL_DATABASE: "cuttle_dev", - MYSQL_USERNAME: "root", - MYSQL_PASSWORD: "" + MYSQL_LOCATIONS: "localhost:3306", + MYSQL_DATABASE: "cuttle", + MYSQL_USERNAME: "cuttle", + MYSQL_PASSWORD: "cuttle" } -}).dependsOn(compile, generateClasspath, database); +}).dependsOn(compile, generateClasspath); proxy(server, 8080).dependsOn(front); diff --git a/run.sh b/run.sh new file mode 100755 index 000000000..c481c278b --- /dev/null +++ b/run.sh @@ -0,0 +1,13 @@ +#!/bin/bash +export MYSQL_DATABASE="cuttle" +export MYSQL_USERNAME="cuttle" +export MYSQL_PASSWORD="cuttle" +# -DdevMode=True +#sbt "examples / runMain com.criteo.cuttle.examples.HelloCronScheduling" + +# docker run --name cuttle_db -e MYSQL_ALLOW_EMPTY_PASSWORD=true -e MYSQL_DATABASE=cuttle -e MYSQL_USER=cuttle -e MYSQL_PASSWORD=cuttle -d -p 3306:3306 mariadb:10.2 +if [ "$1" == "cron" ]; then + sbt "examples / runMain com.criteo.cuttle.examples.HelloCronScheduling" +else + sbt "examples / runMain com.criteo.cuttle.examples.HelloTimeSeries" +fi; \ No newline at end of file diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/Internal.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/Internal.scala index 14ccf819c..fe0b3434d 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/Internal.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/Internal.scala @@ -4,7 +4,7 @@ import java.time.Instant import cats.syntax.either._ import io.circe._ -import io.circe.generic.semiauto.deriveDecoder +import io.circe.generic.semiauto._ import io.circe.java8.time._ import com.criteo.cuttle._ @@ -50,6 +50,7 @@ private[timeseries] object JobListPayLoad { private[timeseries] object SortQuery { implicit val decodeExecutionsParams: Decoder[SortQuery] = deriveDecoder[SortQuery] + implicit val encoder: Encoder[SortQuery] = deriveEncoder[SortQuery] } private[timeseries] case class SortQuery( @@ -58,7 +59,6 @@ private[timeseries] case class SortQuery( ) { val asc = order.toLowerCase == "asc" } - private[timeseries] object ExecutionsQuery { implicit val decodeExecutionsParams: Decoder[ExecutionsQuery] = deriveDecoder[ExecutionsQuery] } @@ -114,3 +114,7 @@ private[timeseries] case class AggregatedJobExecution(period: Interval[Instant], extends ExecutionPeriod { override val aggregated: Boolean = true } + +private[timeseries] object AggregatedJobExecution { + implicit val encoder: Encoder[AggregatedJobExecution] = deriveEncoder +} diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala index 4ca01c6c8..0d85cfc27 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala @@ -12,14 +12,16 @@ import cats.data.NonEmptyList import cats.data.Validated import doobie.implicits._ + import io.circe._ -import io.circe.generic.auto._ import io.circe.java8.time._ import io.circe.syntax._ + import org.http4s._ import org.http4s.dsl.io._ import org.http4s.circe._ -import org.http4s.headers.{`Content-Type`, Accept} +import org.http4s.headers.`Content-Type` + import com.criteo.cuttle.Auth._ import com.criteo.cuttle.ExecutionStatus._ import com.criteo.cuttle.Metrics.{Gauge, Prometheus} @@ -271,7 +273,7 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, } case request @ GET -> Root / "api" / "executions" / id => - val events = request.multiParams.getOrElse("events", "") + val events = request.params.getOrElse("events", "") def getExecution = IO.suspend(executor.getExecution(scheduler.allContexts, id)) @@ -284,25 +286,23 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, case req @ GET -> Root / "api" / "executions" / id / "streams" => lazy val streams = executor.openStreams(id) - // TODO: check this - req.headers.get(Accept).contains(Accept(MediaType.`text/event-stream`)) match { - case true => - Ok( - fs2.Stream(ServerSentEvent("BOS")) ++ - streams - .through(fs2.text.utf8Decode) - .through(fs2.text.lines) - .chunks - .map( - chunk => - ServerSentEvent( - Json.fromValues(chunk.toArray.toIterable.map(_.asJson)).noSpaces - ) - ) ++ - fs2.Stream(ServerSentEvent("EOS")) - ) - case false => - Ok(streams, `Content-Type`(MediaType.text.plain)) + if (utils.acceptsEventStream(req)) { + Ok( + fs2.Stream(ServerSentEvent("\"BOS\"")) ++ + streams + .through(fs2.text.utf8Decode) + .through(fs2.text.lines) + .chunks + .map( + chunk => + ServerSentEvent( + Json.fromValues(chunk.toArray.toIterable.map(_.asJson)).noSpaces + ) + ) ++ + fs2.Stream(ServerSentEvent("\"EOS\"")) + ) + } else { + Ok(streams, `Content-Type`(MediaType.text.plain)) } case GET -> Root / "api" / "jobs" / "paused" => @@ -736,7 +736,8 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, } val watchedState = IO(snapshotWatchedState()) - if (request.headers.get(org.http4s.headers.Accept).contains(MediaType.`text/event-stream`)) { + + if (utils.acceptsEventStream(request)) { sse( watchedState.map(Some(_)), (s: WatchedState) => getExecutions(s) @@ -1114,9 +1115,10 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, } val publicAssets = HttpRoutes.of[IO] { - case GET -> Root / "public" / file => + case GET -> Root / "public" / "timeseries" / file => import ThreadPools.Implicits.serverContextShift + println(s"Requesting $file") StaticFile .fromResource[IO](s"/public/timeseries/$file", ThreadPools.blockingExecutionContext) .getOrElseF(NotFound()) @@ -1127,6 +1129,7 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, NotFound() case _ => import ThreadPools.Implicits.serverContextShift + println(s"Requesting index") StaticFile .fromResource[IO](s"/public/timeseries/index.html", ThreadPools.blockingExecutionContext) diff --git a/webpack.config.js b/webpack.config.js index 700cef44b..7cc3e990c 100644 --- a/webpack.config.js +++ b/webpack.config.js @@ -4,7 +4,7 @@ const HtmlWebpackPlugin = require("html-webpack-plugin"); const FlowStatusWebpackPlugin = require("flow-status-webpack-plugin"); const merge = require("webpack-merge"); const args = require("yargs").argv; -const project = args.project; +const project = args.project || "timeseries"; const env = process.env.NODE_ENV || "production";