Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ lazy val catsCore = "1.6.1"
lazy val circe = "0.11.1"
lazy val doobie = "0.7.0"
lazy val lolhttp = "0.13.0"
lazy val http4sVersion = "0.20.23"

lazy val commonSettings = Seq(
organization := "com.criteo.cuttle",
version := VERSION,
scalaVersion := "2.11.12",
crossScalaVersions := Seq("2.11.12", "2.12.8"),
scalaVersion := "2.12.10",
crossScalaVersions := Seq("2.11.12", "2.12.10"),
scalacOptions ++= Seq(
"-deprecation",
"-encoding",
Expand Down Expand Up @@ -211,11 +212,6 @@ lazy val cuttle =
.settings(commonSettings: _*)
.settings(Defaults.itSettings: _*)
.settings(
libraryDependencies ++= Seq(
"com.criteo.lolhttp" %% "lolhttp",
"com.criteo.lolhttp" %% "loljson",
"com.criteo.lolhttp" %% "lolhtml"
).map(_ % lolhttp),
libraryDependencies ++= Seq("core", "generic", "parser", "java8")
.map(module => "io.circe" %% s"circe-$module" % circe),
libraryDependencies ++= Seq(
Expand All @@ -227,6 +223,12 @@ lazy val cuttle =
"com.zaxxer" % "nuprocess" % "1.1.3",
"org.mariadb.jdbc" % "mariadb-java-client" % "2.7.0"
),
libraryDependencies ++= Seq(
"org.http4s" %% "http4s-dsl" % http4sVersion,
"org.http4s" %% "http4s-blaze-server" % http4sVersion,
"org.http4s" %% "http4s-blaze-client" % http4sVersion,
"org.http4s" %% "http4s-circe" % http4sVersion
),
libraryDependencies ++= Seq(
"org.tpolecat" %% "doobie-core",
"org.tpolecat" %% "doobie-hikari"
Expand Down
104 changes: 6 additions & 98 deletions core/src/main/scala/com/criteo/cuttle/Authentication.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package com.criteo.cuttle

import java.util.Base64

import cats.effect.IO
import lol.http._

import io.circe.{Decoder, Encoder}
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}

import scala.util.Try
import org.http4s.server.AuthMiddleware
import cats.data.{Kleisli, OptionT}

/**
* The cuttle API is private for any write operation while it is publicly
Expand All @@ -19,106 +18,15 @@ import scala.util.Try
*/
object Auth {

/**
* An [[Authenticator]] takes care of extracting the User from an HTTP request.
*/
trait Authenticator {

private[cuttle] def apply(s: AuthenticatedService): PartialService =
new PartialFunction[Request, IO[Response]] {
override def isDefinedAt(request: Request): Boolean =
s.isDefinedAt(request)

override def apply(request: Request): IO[Response] =
authenticate(request)
.fold(IO.pure, user => {
// pass through when authenticated
s(request)(user)
})
}

/**
* Authenticate an HTTP request.
*
* @param request the HTTP request to be authenticated.
* @return either an authenticated user or an error response.
*/
def authenticate(request: Request): Either[Response, User]
}

/**
* A connected [[User]].
*/
case class User(userId: String)

object User {
implicit val encoder: Encoder[User] = deriveEncoder
implicit val decoder: Decoder[User] = deriveDecoder
}

/**
* Default implementation of Authenticator that authenticate any request
* as Guest. It basically disables the authentication.
*/
case object GuestAuth extends Authenticator {
override def authenticate(r: Request): Either[Response, User] = Right(User("Guest"))
}

/**
* Implementation of [[Authenticator]] that rely on HTTP Basic auth.
*
* @param credentialsValidator validate the (user,password) credentials.
* @param userVisibleRealm The user visible realm.
*/
case class BasicAuth(
credentialsValidator: ((String, String)) => Boolean,
userVisibleRealm: String = "cuttle_users"
) extends Authenticator {

val scheme = "Basic"
val unauthorizedResponse =
Response(401).addHeaders(h"WWW-Authenticate" -> HttpString(s"""Basic realm="${userVisibleRealm}""""))

/**
* HTTP Basic auth implementation.
* @param r request to be authenticated
* @return either an authenticated user or an unauthorized response
*/
override def authenticate(r: Request): Either[Response, User] =
r.headers
.get(h"Authorization")
.flatMap({
case s if s.toString().startsWith(scheme) => {
val base64credentials = s.toString().drop(scheme.size).trim()
BasicAuth.decodeBase64Credentials(base64credentials)
}
case _ => None
})
.collect({
case (l, p) if credentialsValidator((l, p)) => User(l)
})
.toRight(unauthorizedResponse)
}

private[cuttle] object BasicAuth {
def decodeBase64Credentials(credentials: String): Option[(String, String)] =
Try(Base64.getDecoder.decode(credentials)).toOption
.flatMap((decoded: Array[Byte]) => {
val splitted = new String(decoded, "utf-8").trim().split(":", 2)
if (splitted.size == 1) {
None
} else {
Some((splitted(0) -> splitted(1)))
}
})
}

/** A [[lol.http.PartialService PartialService]] that requires an authenticated
* user in request handler's scope. */
type AuthenticatedService = PartialFunction[Request, (User => IO[Response])]

private[cuttle] def defaultWith(response: IO[Response]): PartialService = {
case _ => response
}
val GuestAuth: AuthMiddleware[IO, User] = AuthMiddleware(Kleisli { _ =>
OptionT.some(User("guest"))
})

}
7 changes: 4 additions & 3 deletions core/src/main/scala/com/criteo/cuttle/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import doobie.util.fragment.Fragment
import io.circe._
import io.circe.java8.time._
import io.circe.syntax._
import lol.http.PartialService
import com.criteo.cuttle.Auth._
import com.criteo.cuttle.ExecutionStatus._
import com.criteo.cuttle.ThreadPools.{SideEffectThreadPool, _}
import com.criteo.cuttle.Metrics._
import com.criteo.cuttle.platforms.ExecutionPool
import doobie.util.Meta
import org.http4s.HttpRoutes
import org.http4s.AuthedRoutes

/** The strategy to use to retry stuck executions.
*
Expand Down Expand Up @@ -387,10 +388,10 @@ private[cuttle] object Execution {
trait ExecutionPlatform {

/** Expose a public `lolhttp` service for the platform internal statistics (for the UI and API). */
def publicRoutes: PartialService = PartialFunction.empty
def publicRoutes: HttpRoutes[IO] = HttpRoutes.empty[IO]

/** Expose a private `lolhttp` service for the platform operations (for the UI and API). */
def privateRoutes: AuthenticatedService = PartialFunction.empty
def privateRoutes: AuthedRoutes[User, IO] = AuthedRoutes.empty

/** @return the list of [[Execution]] waiting for resources on this platform.
* These executions will be seen as __WAITING__ in the UI and the API. */
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/scala/com/criteo/cuttle/ThreadPools.scala
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ object ThreadPools {
Executors.newCachedThreadPool(newThreadFactory(daemonThreads, poolName, threadCounter))

private def makeResourceFromES(tp: => ExecutorService): Resource[IO, ExecutionContext] = {
import cats.implicits._
val alloc = IO.delay(tp)
val free = (es: ExecutorService) => IO.delay(es.shutdown())
Resource.make(alloc)(free).map(ExecutionContext.fromExecutor)
Expand All @@ -134,6 +133,11 @@ object ThreadPools {
ThreadPools.newCachedThreadPool(poolName = Some("Doobie-Transact"))
)

val blockingExecutionContext: ExecutionContext = scala.concurrent.ExecutionContext
.fromExecutorService(
ThreadPools.newCachedThreadPool(poolName = Some("com.criteo.cuttle.ThreadPools.blockingExecutionContext"))
)

object Implicits {
import ThreadPoolSystemProperties._
implicit val serverThreadPool = new ServerThreadPool {
Expand Down
32 changes: 12 additions & 20 deletions core/src/main/scala/com/criteo/cuttle/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import doobie._
import doobie.hikari.HikariTransactor
import doobie.implicits._
import io.circe.Json
import lol.http.{PartialService, ServerSentEvents, Service}
import lol.http._
import org.http4s._
import org.http4s.dsl.io._

/** A set of basic utilities useful to write workflows. */
package object utils {
Expand Down Expand Up @@ -119,27 +119,12 @@ package object utils {

private[cuttle] def randomUUID(): String = UUID.randomUUID().toString

/**
* Allows chaining of method orFinally
* from a PartialService that returns a
* non-further-chainable Service.
*/
implicit private[cuttle] class PartialServiceConverter(val service: PartialService) extends AnyVal {
def orFinally(finalService: Service): Service =
service.orElse(toPartial(finalService))

private def toPartial(service: Service): PartialService = {
case e => service(e)
}
}

private[cuttle] def getJVMUptime = ManagementFactory.getRuntimeMXBean.getUptime / 1000

private[cuttle] def sse[A](thunk: IO[Option[A]],
encode: A => IO[Json])(implicit eqInstance: Eq[A]): lol.http.Response = {
encode: A => IO[Json])(implicit eqInstance: Eq[A]): IO[Response[IO]] = {
import scala.concurrent.duration._
import io.circe._
import lol.json._
import fs2.Stream
import com.criteo.cuttle.ThreadPools.Implicits.serverContextShift

Expand All @@ -151,8 +136,15 @@ package object utils {
})
.changes
.evalMap[IO, Json](r => encode(r))
.map(ServerSentEvents.Event(_))
.map(k => ServerSentEvent(k.noSpaces))

import org.http4s.headers._

Ok(stream, `Content-Type`(MediaType.`text/event-stream`))
}

Ok(stream)
private[cuttle] def acceptsEventStream(r: Request[IO]): Boolean = {
val acceptMediaTypes = r.headers.get(org.http4s.headers.Accept).toList.flatMap(_.values.toList.map(_.mediaRange))
acceptMediaTypes.contains(MediaType.`text/event-stream`)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package com.criteo.cuttle.platforms

import scala.concurrent.stm._

import lol.http._
import lol.json._
import cats.implicits._
import cats.effect._

import io.circe._
import io.circe.syntax._

import org.http4s._
import org.http4s._, org.http4s.dsl.io._
import org.http4s.circe._

/**
* An execution pool backed by a priority queue. It limits the concurrent executions
* and the priority queue is ordered by the [[scala.math.Ordering Ordering]] defined
Expand All @@ -20,14 +24,14 @@ case class ExecutionPool(concurrencyLimit: Int) extends WaitingExecutionQueue {
def doRunNext()(implicit txn: InTxn): Unit = ()

override def routes(urlPrefix: String) =
({
case req if req.url == urlPrefix =>
HttpRoutes.of[IO]({
case req if req.uri.toString == urlPrefix =>
Ok(
Json.obj(
"concurrencyLimit" -> concurrencyLimit.asJson,
"running" -> running.size.asJson,
"waiting" -> waiting.size.asJson
)
)
}: PartialService).orElse(super.routes(urlPrefix))
}) <+> super.routes(urlPrefix)
}
15 changes: 10 additions & 5 deletions core/src/main/scala/com/criteo/cuttle/platforms/RateLimiter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,20 @@ import scala.concurrent.stm._
import scala.concurrent.duration._
import java.time._

import lol.http._
import lol.json._
import io.circe._
import io.circe.syntax._
import io.circe.java8.time._

import cats.implicits._

import cats.effect.IO

import com.criteo.cuttle.utils.timer

import org.http4s._
import org.http4s._, org.http4s.dsl.io._
import org.http4s.circe._

/**
* An rate limiter pool backed by a priority queue. It rate limits the executions
* and the priority queue is ordered by the [[scala.math.Ordering Ordering]] defined
Expand Down Expand Up @@ -46,8 +51,8 @@ class RateLimiter(tokens: Int, refillRateInMs: Int) extends WaitingExecutionQueu
def doRunNext()(implicit txn: InTxn) = _tokens() = _tokens() - 1

override def routes(urlPrefix: String) =
({
case req if req.url == urlPrefix =>
HttpRoutes.of[IO]({
case req if req.uri.toString == urlPrefix =>
Ok(
Json.obj(
"max_tokens" -> tokens.asJson,
Expand All @@ -56,6 +61,6 @@ class RateLimiter(tokens: Int, refillRateInMs: Int) extends WaitingExecutionQueu
"last_refill" -> _lastRefill.single.get.asJson
)
)
}: PartialService).orElse(super.routes(urlPrefix))
}) <+> super.routes(urlPrefix)

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package com.criteo.cuttle.platforms
import com.criteo.cuttle._

import java.time._

import lol.http._
import lol.json._
import cats.effect._

import scala.concurrent._
import scala.concurrent.stm._
Expand All @@ -18,6 +16,10 @@ import io.circe._
import io.circe.syntax._
import io.circe.java8.time._

import org.http4s._
import org.http4s._, org.http4s.dsl.io._
import org.http4s.circe._

/** A priority queue ordered by [[com.criteo.cuttle.SchedulingContext SchedulingContext]] priority. */
trait WaitingExecutionQueue {
case class DelayedResult[A](effect: () => Future[A],
Expand Down Expand Up @@ -107,16 +109,16 @@ trait WaitingExecutionQueue {
}
}

def routes(urlPrefix: String): PartialService = {
case req if req.url == s"$urlPrefix/running" =>
def routes(urlPrefix: String): HttpRoutes[IO] = HttpRoutes.of[IO] {
case req if req.uri.toString == s"$urlPrefix/running" =>
Ok(this._running.single.get.toSeq.map {
case (execution, task) =>
Json.obj(
"execution" -> execution.toExecutionLog(ExecutionStatus.ExecutionRunning).asJson,
"task" -> task.asJson
)
}.asJson)
case req if req.url == s"$urlPrefix/waiting" =>
case req if req.uri.toString == s"$urlPrefix/waiting" =>
Ok(this._waiting.single.get.toSeq.map {
case (execution, task) =>
Json.obj(
Expand Down
Loading