diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 000000000..2007a548c
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM java:7
+
+ENV app /kafka-manager
+WORKDIR ${app}
+
+COPY sbt $app/sbt
+COPY build.sbt $app/build.sbt
+COPY app $app/app
+COPY conf $app/conf
+COPY img $app/img
+COPY project/build.properties $app/project/build.properties
+COPY project/plugins.sbt $app/project/plugins.sbt
+COPY public $app/public
+COPY src $app/src
+COPY test $app/test
+
+RUN ./sbt assembly
+EXPOSE 9000
+CMD [ "./sbt", "run" ]
diff --git a/app/controllers/Application.scala b/app/controllers/Application.scala
index bbf1c1a74..8a975db86 100644
--- a/app/controllers/Application.scala
+++ b/app/controllers/Application.scala
@@ -21,8 +21,8 @@ object Application extends Controller {
private[this] implicit val af: ApplicationFeatures = ApplicationFeatures.features
def index = Action.async {
- kafkaManager.getClusterList.map { errorOrClusterList =>
- Ok(views.html.index(errorOrClusterList))
- }
+ for {errorOrSchedulerList <- kafkaManager.getSchedulerList
+ errorOrClusterList <- kafkaManager.getClusterList
+ } yield Ok(views.html.index(errorOrClusterList, errorOrSchedulerList))
}
}
diff --git a/app/controllers/Cluster.scala b/app/controllers/Cluster.scala
index 0b704c40d..96c691fae 100644
--- a/app/controllers/Cluster.scala
+++ b/app/controllers/Cluster.scala
@@ -6,7 +6,7 @@
package controllers
import features.{KMClusterManagerFeature, ApplicationFeatures}
-import kafka.manager.{KafkaVersion, ApiError, ClusterConfig}
+import kafka.manager.{SchedulerConfig, KafkaVersion, ApiError, ClusterConfig}
import models.FollowLink
import models.form._
import play.api.data.Form
@@ -76,6 +76,17 @@ object Cluster extends Controller {
)(ClusterConfig.apply)(ClusterConfig.customUnapply)
)
+ val schedulerConfigForm = Form(
+ mapping(
+ "name" -> nonEmptyText.verifying(maxLength(250), validateName),
+ "kafkaVersion" -> nonEmptyText.verifying(validateKafkaVersion),
+ "apiUrl" -> nonEmptyText.verifying(validateZkHosts),
+ "zkHosts" -> nonEmptyText.verifying(validateZkHosts),
+ "zkMaxRetry" -> ignored(100 : Int),
+ "jmxEnabled" -> boolean
+ )(SchedulerConfig.apply)(SchedulerConfig.customUnapply)
+ )
+
val updateForm = Form(
mapping(
"operation" -> nonEmptyText.verifying(validateOperation),
@@ -114,6 +125,28 @@ object Cluster extends Controller {
}
}
+ def addScheduler = Action.async { implicit request =>
+ Future.successful(Ok(scheduler.views.html.scheduler.addScheduler(schedulerConfigForm)))
+ }
+
+ def handleAddScheduler = Action.async { implicit request =>
+ schedulerConfigForm.bindFromRequest.fold(
+ formWithErrors => Future.successful(BadRequest(scheduler.views.html.scheduler.addScheduler(formWithErrors))),
+ schedulerConfig => {
+ kafkaManager.addScheduler(schedulerConfig.name, schedulerConfig.version.toString, schedulerConfig.apiUrl, schedulerConfig.curatorConfig.zkConnect, jmxEnabled = true).map { errorOrSuccess =>
+ Ok(views.html.common.resultOfCommand(
+ views.html.navigation.defaultMenu(),
+ models.navigation.BreadCrumbs.withView("Add Scheduler"),
+ errorOrSuccess,
+ "Add Scheduler",
+ FollowLink("Go to scheduler view.",scheduler.controllers.routes.SchedulerApplication.getScheduler(schedulerConfig.name).toString()),
+ FollowLink("Try again.",routes.Cluster.addScheduler().toString())
+ ))
+ }
+ }
+ )
+ }
+
def updateCluster(c: String) = Action.async { implicit request =>
featureGate(KMClusterManagerFeature) {
kafkaManager.getClusterConfig(c).map { errorOrClusterConfig =>
diff --git a/app/kafka/manager/ActorModel.scala b/app/kafka/manager/ActorModel.scala
index bf155674f..12ae1009e 100644
--- a/app/kafka/manager/ActorModel.scala
+++ b/app/kafka/manager/ActorModel.scala
@@ -5,11 +5,12 @@
package kafka.manager
-import java.util.Properties
+import java.util.{Date, Properties}
import org.joda.time.DateTime
import kafka.common.TopicAndPartition
import org.slf4j.LoggerFactory
+import scheduler.models.form.Failover
import scala.collection.immutable.Queue
import scala.concurrent.{Await, Future}
@@ -639,4 +640,100 @@ object ActorModel {
LogkafkaIdentity(hostname, lct.isDefined, identitySet.toMap)
}
}
+
+ case class SMGetBrokerIdentity(id: Int) extends BVRequest
+
+ case object SMGetView extends QueryRequest
+ case class SMView(topicsCount: Int, brokersCount: Int, schedulerConfig: SchedulerConfig) extends QueryResponse
+
+ case class SMAddBroker(id: Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String],
+ bindAddress: Option[String], constraints: Option[String], options: Option[String],
+ log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String],
+ failover: Failover) extends CommandRequest
+
+ case class SMUpdateBroker(id: Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String],
+ bindAddress: Option[String], constraints: Option[String], options: Option[String],
+ log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String],
+ failover: Failover) extends CommandRequest
+
+ case class SMCommandResult(result: Try[Unit]) extends CommandResponse
+
+ case class KSCAddBroker(id: Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String],
+ bindAddress: Option[String], constraints: Option[String], options: Option[String],
+ log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String],
+ failover: Failover) extends CommandRequest
+
+ case class KSCUpdateBroker(id: Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String],
+ bindAddress: Option[String], constraints: Option[String], options: Option[String],
+ log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String],
+ failover: Failover) extends CommandRequest
+
+ case class SMStartBroker(brokerId: Int) extends CommandRequest
+ case class SMStopBroker(brokerId: Int) extends CommandRequest
+ case class SMRemoveBroker(brokerId: Int) extends CommandRequest
+ case class SMRebalanceTopics(ids: String, topics:Option[String]) extends CommandRequest
+
+ case class KSCStartBroker(id: Int) extends CommandRequest
+ case class KSCStopBroker(id: Int) extends CommandRequest
+ case class KSCRemoveBroker(id: Int) extends CommandRequest
+ case class KSCRebalanceTopics(ids: String, topics:Option[String]) extends CommandRequest
+
+ case class KMSchedulerCommandRequest(scheduler: String, request: CommandRequest) extends CommandRequest
+ case class KMSchedulerList(active: IndexedSeq[SchedulerConfig], pending : IndexedSeq[SchedulerConfig]) extends QueryResponse
+ case object KMGetAllSchedulers extends QueryRequest
+ case class KMAddScheduler(config: SchedulerConfig) extends CommandRequest
+ case class KMSchedulerQueryRequest(schedulerName: String, request: QueryRequest) extends QueryRequest
+ case class KMGetSchedulerConfig(schedulerName: String) extends QueryRequest
+ case class KMSchedulerConfigResult(result: Try[SchedulerConfig]) extends QueryResponse
+
+ case object SchedulerKSGetBrokers extends KSRequest
+ case class SchedulerBrokerList(list: Seq[SchedulerBrokerIdentity], schedulerConfig: SchedulerConfig) extends QueryResponse
+
+ case class SchedulerBrokerTaskIdentity(id: String,
+ slaveId: String,
+ executorId: String,
+ hostname: String,
+ endpoint: Option[String],
+ state: String)
+
+ case class SchedulerBrokerStickinessIdentity(period: String,
+ stopTime: Option[Date],
+ hostname: Option[String])
+
+ case class SchedulerBrokerFailoverIdentity(delay: String,
+ maxDelay: String,
+ maxTries: Option[Int],
+ failures: Option[Int],
+ failureTime: Option[Date])
+
+ case class SchedulerBrokerIdentity(id: Int, active: Boolean, cpus: Double, mem: Long, heap: Long, port: Option[String],
+ bindAddress: Option[String], constraints: Seq[(String, String)], options: Seq[(String, String)],
+ log4jOptions: Seq[(String, String)], jvmOptions: Option[String],
+ stickiness: SchedulerBrokerStickinessIdentity,
+ failover: SchedulerBrokerFailoverIdentity,
+ task: Option[SchedulerBrokerTaskIdentity],
+ schedulerConfig: SchedulerConfig = null,
+ metrics: Option[BrokerMetrics] = None,
+ stats: Option[BrokerClusterStats] = None) {
+
+ def actualHost(): Option[String] = task.flatMap(t => t.endpoint.map(_.split(":")(0)))
+
+ def actualPort(): Option[String] = task.flatMap(t => t.endpoint.map(_.split(":")(1)))
+
+ def numTopics() = 0
+ def numPartitions() = 0
+
+ def topicPartitions() = Seq.empty[(TopicIdentity, IndexedSeq[Int])]
+
+ def constraintsDesc = constraints.map { case (k, v) => s"$k=$v" }.mkString
+ def optionsDesc = options.map { case (k, v) => s"$k=$v" }.mkString
+ def log4jOptionsDesc = log4jOptions.map { case (k, v) => s"$k=$v" }.mkString
+
+ def state(): String = {
+ if (active)
+ if (actualHost().isEmpty) "starting" else "running"
+ else
+ "stopped|failed"
+ }
+ }
}
diff --git a/app/kafka/manager/KafkaManager.scala b/app/kafka/manager/KafkaManager.scala
index 33712d69d..571a242d6 100644
--- a/app/kafka/manager/KafkaManager.scala
+++ b/app/kafka/manager/KafkaManager.scala
@@ -13,6 +13,7 @@ import akka.util.Timeout
import com.typesafe.config.{ConfigFactory, Config}
import kafka.manager.ActorModel._
import org.slf4j.{LoggerFactory, Logger}
+import scheduler.models.form.Failover
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
@@ -33,6 +34,7 @@ case class BrokerListExtended(list: IndexedSeq[BrokerIdentity],
clusterContext: ClusterContext)
case class ConsumerListExtended(list: IndexedSeq[(String, Option[ConsumerIdentity])], clusterContext: ClusterContext)
case class LogkafkaListExtended(list: IndexedSeq[(String, Option[LogkafkaIdentity])], deleteSet: Set[String])
+case class SchedulerBrokerListExtended(list: Seq[SchedulerBrokerIdentity], metrics: Map[Int,BrokerMetrics], combinedMetric: Option[BrokerMetrics], schedulerConfig: SchedulerConfig)
case class ApiError(msg: String)
object ApiError {
@@ -778,4 +780,129 @@ class KafkaManager(akkaConfig: Config)
)
}
}
+
+ def addScheduler(schedulerName: String, version: String, apiUrl: String, zkHosts: String, jmxEnabled: Boolean): Future[ApiError \/
+ Unit] =
+ {
+ val sc = SchedulerConfig(schedulerName, apiUrl, CuratorConfig(zkHosts), enabled = true, KafkaVersion(version), jmxEnabled = jmxEnabled)
+
+ tryWithKafkaManagerActor(KMAddScheduler(sc)) { result: KMCommandResult =>
+ result.result.get
+ }
+ }
+
+ def getSchedulerView(schedulerName: String): Future[ApiError \/ SMView] = {
+ tryWithKafkaManagerActor(KMSchedulerQueryRequest(schedulerName, SMGetView))(identity[SMView])
+ }
+
+ def getSchedulerBrokerList(schedulerName: String): Future[ApiError \/ SchedulerBrokerListExtended] = {
+ implicit val ec = apiExecutionContext
+
+ val futureBrokerList = tryWithKafkaManagerActor(KMSchedulerQueryRequest(schedulerName, SchedulerKSGetBrokers))(identity[SchedulerBrokerList])
+ futureBrokerList.map {
+ case \/-(SchedulerBrokerList(identities, config)) =>
+ \/-(SchedulerBrokerListExtended(identities, Map.empty, None, config))
+ case a : -\/[ApiError] =>
+ a
+ }
+ }
+
+ def getSchedulerConfig(schedulerName: String): Future[ApiError \/ SchedulerConfig] = {
+ tryWithKafkaManagerActor(KMGetSchedulerConfig(schedulerName)) { result: KMSchedulerConfigResult =>
+ result.result.get
+ }
+ }
+
+ def getSchedulerList: Future[ApiError \/ KMSchedulerList] = {
+ tryWithKafkaManagerActor(KMGetAllSchedulers)(identity[KMSchedulerList])
+ }
+
+
+ def addBroker(schedulerName:String, id:Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String],
+ bindAddress: Option[String], constraints: Option[String], options: Option[String],
+ log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String],
+ failover: Failover): Future[ApiError \/ Unit] =
+ {
+ implicit val ec = apiExecutionContext
+ withKafkaManagerActor(KMSchedulerCommandRequest(schedulerName, SMAddBroker(id, cpus, mem, heap, port, bindAddress,
+ constraints, options, log4jOptions, jvmOptions, stickinessPeriod, failover))) {
+ result: Future[SMCommandResult] =>
+ result.map(cmr => toDisjunction(cmr.result))
+ }
+
+ }
+
+ def updateBroker(schedulerName:String, id:Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String],
+ bindAddress: Option[String], constraints: Option[String], options: Option[String],
+ log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String],
+ failover: Failover): Future[ApiError \/ Unit] =
+ {
+ implicit val ec = apiExecutionContext
+ withKafkaManagerActor(KMSchedulerCommandRequest(schedulerName, SMUpdateBroker(id, cpus, mem, heap, port, bindAddress,
+ constraints, options, log4jOptions, jvmOptions, stickinessPeriod, failover))) {
+ result: Future[SMCommandResult] =>
+ result.map(cmr => toDisjunction(cmr.result))
+ }
+
+ }
+
+ def getBrokerIdentity(schedulerName: String, brokerId: Int): Future[ApiError \/ SchedulerBrokerIdentity] = {
+ val futureView = tryWithKafkaManagerActor(
+ KMSchedulerQueryRequest(
+ schedulerName,
+ SMGetBrokerIdentity(brokerId)
+ )
+ )(identity[Option[SchedulerBrokerIdentity]])
+
+ implicit val ec = apiExecutionContext
+ futureView.flatMap[ApiError \/ SchedulerBrokerIdentity] { errOrView =>
+ errOrView.fold(
+ { err: ApiError =>
+ Future.successful(-\/[ApiError](err))
+ }, { viewOption: Option[SchedulerBrokerIdentity] =>
+ viewOption.fold {
+ Future.successful[ApiError \/ SchedulerBrokerIdentity](-\/(ApiError(s"Broker not found $brokerId for scheduler $schedulerName")))
+ } { view =>
+ Future.successful(\/-(view))
+ }
+ }
+ )
+ }
+ }
+
+ def startBroker(schedulerName: String, brokerId: Int): Future[ApiError \/ Unit] =
+ {
+ implicit val ec = apiExecutionContext
+ withKafkaManagerActor(KMSchedulerCommandRequest(schedulerName, SMStartBroker(brokerId))) {
+ result: Future[SMCommandResult] =>
+ result.map(cmr => toDisjunction(cmr.result))
+ }
+ }
+
+ def stopBroker(schedulerName: String, brokerId: Int): Future[ApiError \/ Unit] =
+ {
+ implicit val ec = apiExecutionContext
+ withKafkaManagerActor(KMSchedulerCommandRequest(schedulerName, SMStopBroker(brokerId))) {
+ result: Future[SMCommandResult] =>
+ result.map(cmr => toDisjunction(cmr.result))
+ }
+ }
+
+ def removeBroker(schedulerName: String, brokerId: Int): Future[ApiError \/ Unit] =
+ {
+ implicit val ec = apiExecutionContext
+ withKafkaManagerActor(KMSchedulerCommandRequest(schedulerName, SMRemoveBroker(brokerId))) {
+ result: Future[SMCommandResult] =>
+ result.map(cmr => toDisjunction(cmr.result))
+ }
+ }
+
+ def rebalanceTopics(schedulerName: String, ids: String, topics: Option[String]): Future[ApiError \/ Unit] =
+ {
+ implicit val ec = apiExecutionContext
+ withKafkaManagerActor(KMSchedulerCommandRequest(schedulerName, SMRebalanceTopics(ids, topics))) {
+ result: Future[SMCommandResult] =>
+ result.map(cmr => toDisjunction(cmr.result))
+ }
+ }
}
diff --git a/app/kafka/manager/KafkaManagerActor.scala b/app/kafka/manager/KafkaManagerActor.scala
index ad006c964..d8f3a5730 100644
--- a/app/kafka/manager/KafkaManagerActor.scala
+++ b/app/kafka/manager/KafkaManagerActor.scala
@@ -17,6 +17,7 @@ import org.apache.curator.framework.recipes.cache.{PathChildrenCacheEvent, PathC
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex
import org.apache.zookeeper.CreateMode
+import scheduler.kafka.manager.{SchedulerManagerActorConfig, SchedulerManagerActor}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Future, ExecutionContext}
@@ -191,6 +192,81 @@ case class ClusterConfig (name: String,
logkafkaEnabled: Boolean,
activeOffsetCacheEnabled: Boolean)
+object SchedulerConfig {
+
+ def apply(name: String, version : String, apiUrl: String, zkHosts: String, zkMaxRetry: Int = 100, jmxEnabled: Boolean) : SchedulerConfig = {
+ val kafkaVersion = KafkaVersion(version)
+ //validate scheduler name
+ ClusterConfig.validateName(name)
+ //validate zk hosts
+ ClusterConfig.validateZkHosts(zkHosts)
+ val cleanZkHosts = zkHosts.replaceAll(" ","")
+ new SchedulerConfig(name, apiUrl, CuratorConfig(cleanZkHosts, zkMaxRetry), true, kafkaVersion, jmxEnabled)
+ }
+
+ def customUnapply(sc: SchedulerConfig) : Option[(String, String, String, String, Int, Boolean)] = {
+ Some((sc.name, sc.version.toString, sc.apiUrl, sc.curatorConfig.zkConnect, sc.curatorConfig.zkMaxRetry, sc.jmxEnabled))
+ }
+
+ import scalaz.{Failure, Success}
+ import scalaz.syntax.applicative._
+ import org.json4s._
+ import org.json4s.jackson.JsonMethods._
+ import org.json4s.jackson.Serialization
+ import org.json4s.scalaz.JsonScalaz._
+ import scala.language.reflectiveCalls
+
+ implicit val formats = Serialization.formats(FullTypeHints(List(classOf[SchedulerConfig])))
+
+ implicit def curatorConfigJSONW: JSONW[CuratorConfig] = new JSONW[CuratorConfig] {
+ def write(a: CuratorConfig) =
+ makeObj(("zkConnect" -> toJSON(a.zkConnect))
+ :: ("zkMaxRetry" -> toJSON(a.zkMaxRetry))
+ :: ("baseSleepTimeMs" -> toJSON(a.baseSleepTimeMs))
+ :: ("maxSleepTimeMs" -> toJSON(a.maxSleepTimeMs))
+ :: Nil)
+ }
+
+ implicit def curatorConfigJSONR: JSONR[CuratorConfig] = CuratorConfig.applyJSON(
+ field[String]("zkConnect"), field[Int]("zkMaxRetry"), field[Int]("baseSleepTimeMs"), field[Int]("maxSleepTimeMs"))
+
+ def serialize(config: SchedulerConfig): Array[Byte] = {
+ val json = makeObj(
+ ("name" -> toJSON(config.name))
+ :: ("apiUrl" -> toJSON(config.apiUrl))
+ :: ("curatorConfig" -> toJSON(config.curatorConfig))
+ :: ("enabled" -> toJSON(config.enabled))
+ :: ("kafkaVersion" -> toJSON(config.version.toString))
+ :: ("jmxEnabled" -> toJSON(config.jmxEnabled))
+ :: Nil)
+ compact(render(json)).getBytes(StandardCharsets.UTF_8)
+ }
+
+ def deserialize(ba: Array[Byte]): Try[SchedulerConfig] = {
+ Try {
+ val json = parse(kafka.manager.utils.deserializeString(ba))
+
+ val result = (field[String]("name")(json) |@| field[String]("apiUrl")(json) |@| field[CuratorConfig]("curatorConfig")(json) |@| field[Boolean]("enabled")(json)) {
+ (name: String, apiUrl: String, curatorConfig: CuratorConfig, enabled: Boolean) =>
+ val versionString = field[String]("kafkaVersion")(json)
+ val version = versionString.map(KafkaVersion.apply).getOrElse(Kafka_0_8_1_1)
+ val jmxEnabled = field[Boolean]("jmxEnabled")(json)
+ SchedulerConfig(name, apiUrl, curatorConfig, enabled, version, jmxEnabled.getOrElse(false))
+ }
+
+ result match {
+ case Failure(nel) =>
+ throw new IllegalArgumentException(nel.toString())
+ case Success(schedulerConfig) =>
+ schedulerConfig
+ }
+
+ }
+ }
+}
+
+case class SchedulerConfig (name: String, apiUrl: String, curatorConfig : CuratorConfig, enabled: Boolean, version: KafkaVersion, jmxEnabled: Boolean)
+
object KafkaManagerActor {
val ZkRoot : String = "/kafka-manager"
@@ -228,6 +304,7 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
private[this] val baseClusterZkPath = zkPath("clusters")
private[this] val configsZkPath = zkPath("configs")
+ private[this] val schedulersZkPath = zkPath("schedulers")
private[this] val deleteClustersZkPath = zkPath("deleteClusters")
log.info(s"zk=${kafkaManagerConfig.curatorConfig.zkConnect}")
@@ -260,6 +337,8 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
private[this] val kafkaManagerPathCache = new PathChildrenCache(curator,configsZkPath,true)
+ private[this] val schedulersPathCache = new PathChildrenCache(curator,schedulersZkPath,true)
+
private[this] val mutex = new InterProcessSemaphoreMutex(curator, zkPath("mutex"))
private[this] val dcProps = {
@@ -300,6 +379,10 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
private[this] var clusterConfigMap : Map[String,ClusterConfig] = Map.empty
private[this] var pendingClusterConfigMap : Map[String,ClusterConfig] = Map.empty
+ private[this] var schedulerManagerMap : Map[String,ActorPath] = Map.empty
+ private[this] var schedulerConfigMap : Map[String,SchedulerConfig] = Map.empty
+ private[this] var pendingSchedulerConfigMap : Map[String,SchedulerConfig] = Map.empty
+
private[this] def modify(fn: => Any) : Unit = {
if(longRunningExecutor.getQueue.remainingCapacity() == 0) {
Future.successful(KMCommandResult(Try(throw new UnsupportedOperationException("Long running executor blocking queue is full!"))))
@@ -334,9 +417,11 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
log.info("Starting kafka manager path cache...")
kafkaManagerPathCache.start(StartMode.BUILD_INITIAL_CACHE)
+ schedulersPathCache.start(StartMode.BUILD_INITIAL_CACHE)
log.info("Adding kafka manager path cache listener...")
kafkaManagerPathCache.getListenable.addListener(pathCacheListener)
+ schedulersPathCache.getListenable.addListener(pathCacheListener)
implicit val ec = longRunningExecutionContext
//schedule periodic forced update
@@ -360,12 +445,14 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
log.info("Removing kafka manager path cache listener...")
Try(kafkaManagerPathCache.getListenable.removeListener(pathCacheListener))
+ Try(schedulersPathCache.getListenable.removeListener(pathCacheListener))
log.info("Shutting down long running executor...")
Try(longRunningExecutor.shutdown())
log.info("Shutting down kafka manager path cache...")
Try(kafkaManagerPathCache.close())
+ Try(schedulersPathCache.close())
log.info("Shutting down delete clusters path cache...")
Try(deleteClustersPathCache.close())
@@ -402,7 +489,26 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
clusterManagerPath:ActorPath =>
context.actorSelection(clusterManagerPath).forward(request)
}
-
+
+ case KMGetAllSchedulers =>
+ sender ! KMSchedulerList(schedulerConfigMap.values.toIndexedSeq, pendingSchedulerConfigMap.values.toIndexedSeq)
+
+ case KMGetSchedulerConfig(name) =>
+ sender ! KMSchedulerConfigResult(Try {
+ val sc = schedulerConfigMap.get(name)
+ require(sc.isDefined, s"Unknown scheduler : $name")
+ sc.get
+ })
+
+
+ case KMSchedulerQueryRequest(schedulerName, request) =>
+ schedulerManagerMap.get(schedulerName).fold[Unit] {
+ sender ! ActorErrorResponse(s"Unknown scheduler : $schedulerName")
+ } {
+ schedulerManagerPath: ActorPath =>
+ context.actorSelection(schedulerManagerPath).forward(request)
+ }
+
case any: Any => log.warning("kma : processQueryRequest : Received unknown message: {}", any)
}
@@ -422,6 +528,15 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkpath, data)
}
+ case KMAddScheduler(schedulerConfig) =>
+ modify {
+ val data: Array[Byte] = SchedulerConfig.serialize(schedulerConfig)
+ val zkpath: String = getSchedulersZkPath(schedulerConfig)
+ require(schedulersPathCache.getCurrentData(zkpath) == null,
+ s"Scheduler already exists : ${schedulerConfig.name}")
+ curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkpath, data)
+ }
+
case KMUpdateCluster(clusterConfig) =>
modify {
val data: Array[Byte] = ClusterConfig.serialize(clusterConfig)
@@ -508,6 +623,14 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
context.actorSelection(clusterManagerPath).forward(request)
}
+ case KMSchedulerCommandRequest(schedulerName, request) =>
+ schedulerManagerMap.get(schedulerName).fold[Unit] {
+ sender ! ActorErrorResponse(s"Unknown scheduler : $schedulerName")
+ } {
+ schedulerManagerPath:ActorPath =>
+ context.actorSelection(schedulerManagerPath).forward(request)
+ }
+
case KMUpdateState =>
updateState()
@@ -531,6 +654,10 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
zkPathFrom(configsZkPath,clusterConfig.name)
}
+ private[this] def getSchedulersZkPath(schedulerConfig: SchedulerConfig) : String = {
+ zkPathFrom(schedulersZkPath,schedulerConfig.name)
+ }
+
private[this] def getClusterZkPath(clusterConfig: ClusterConfig) : String = {
zkPathFrom(baseClusterZkPath,clusterConfig.name)
}
@@ -586,6 +713,31 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
}
}
+ private[this] def addScheduler(config: SchedulerConfig): Try[Boolean] = {
+ Try {
+ if(!config.enabled) {
+ log.info("Not adding scheduler manager for disabled scheduler : {}", config.name)
+ schedulerConfigMap += (config.name -> config)
+ pendingSchedulerConfigMap -= config.name
+ false
+ } else {
+ log.info("Adding new scheduler manager for scheduler : {}", config.name)
+ val schedulerManagerConfig = SchedulerManagerActorConfig(
+ kafkaManagerConfig.pinnedDispatcherName,
+ getSchedulersZkPath(config),
+ kafkaManagerConfig.curatorConfig,
+ config,
+ kafkaManagerConfig.brokerViewUpdatePeriod)
+ val props = Props(classOf[SchedulerManagerActor], schedulerManagerConfig)
+ val newSchedulerManager = context.actorOf(props, config.name).path
+ schedulerConfigMap += (config.name -> config)
+ schedulerManagerMap += (config.name -> newSchedulerManager)
+ pendingSchedulerConfigMap -= config.name
+ true
+ }
+ }
+ }
+
private[this] def updateCluster(currentConfig: ClusterConfig, newConfig: ClusterConfig): Try[Boolean] = {
Try {
if(newConfig.curatorConfig.zkConnect == currentConfig.curatorConfig.zkConnect
@@ -608,6 +760,10 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
}
}
+ private[this] def updateScheduler(currentConfig: SchedulerConfig, newConfig: SchedulerConfig): Try[Boolean] = {
+ Try(true)
+ }
+
private[this] def updateState(): Unit = {
log.info("Updating internal state...")
val result = Try {
@@ -619,6 +775,15 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
clusterConfigMap.get(newConfig.name).fold(addCluster(newConfig))(updateCluster(_,newConfig))
}
}
+
+ schedulersPathCache.getCurrentData.asScala.foreach { data =>
+ SchedulerConfig.deserialize(data.getData) match {
+ case Failure(t) =>
+ log.error("Failed to deserialize scheduler config",t)
+ case Success(newScheduler) =>
+ schedulerConfigMap.get(newScheduler.name).fold(addScheduler(newScheduler))(updateScheduler(_,newScheduler))
+ }
+ }
}
result match {
case Failure(t) =>
diff --git a/app/models/navigation/BreadCrumbs.scala b/app/models/navigation/BreadCrumbs.scala
index ecf3dcd6d..f62d4a7eb 100644
--- a/app/models/navigation/BreadCrumbs.scala
+++ b/app/models/navigation/BreadCrumbs.scala
@@ -31,6 +31,11 @@ object BreadCrumbs {
"Add Cluster" -> IndexedSeq("Clusters".baseRouteBreadCrumb)
)
+ val baseSchedulerBreadCrumbs: Map[String, IndexedSeq[BreadCrumb]] = Map(
+ "Schedulers" -> IndexedSeq.empty[BreadCrumb],
+ "Add Scheduler" -> IndexedSeq("Schedulers".baseRouteBreadCrumb)
+ )
+
val clusterBreadCrumbs: Map[String, IndexedSeq[BreadCrumb]] = Map(
"Unknown Cluster Operation" -> IndexedSeq("Clusters".baseRouteBreadCrumb),
"Delete Cluster" -> IndexedSeq("Clusters".baseRouteBreadCrumb, BCDynamicText(identity)),
@@ -83,6 +88,23 @@ object BreadCrumbs {
)
)
+ val schedulerBreadCrumbs: Map[String, IndexedSeq[BreadCrumb]] = Map(
+ "Update Scheduler" -> IndexedSeq("Schedulers".baseRouteBreadCrumb, BCDynamicText(identity)),
+ "Summary" -> IndexedSeq("Schedulers".baseRouteBreadCrumb, BCDynamicText(identity)),
+ "Brokers" -> IndexedSeq("Schedulers".baseRouteBreadCrumb, BCDynamicNamedLink(identity, "Summary".schedulerRoute)),
+ "Broker View" -> IndexedSeq(
+ "Schedulers".baseRouteBreadCrumb,
+ BCDynamicNamedLink(identity, "Summary".schedulerRoute),
+ "Brokers".schedulerRouteBreadCrumb),
+ "Add Broker" -> IndexedSeq(
+ "Schedulers".baseRouteBreadCrumb,
+ BCDynamicNamedLink(identity,"Summary".schedulerRoute),
+ "Brokers".schedulerRouteBreadCrumb),
+ "Rebalance Topics" -> IndexedSeq(
+ "Schedulers".baseRouteBreadCrumb,
+ BCDynamicNamedLink(identity, "Summary".schedulerRoute))
+ )
+
val topicBreadCrumbs: Map[String, IndexedSeq[BreadCrumb]] = Map(
"Topic View" -> IndexedSeq(
"Clusters".baseRouteBreadCrumb,
@@ -117,6 +139,14 @@ object BreadCrumbs {
rendered :+ BCActive(s)
}
+ def withSView(s: String) : IndexedSeq[BreadCrumbRendered] = {
+ val rendered : IndexedSeq[BreadCrumbRendered] = baseSchedulerBreadCrumbs.getOrElse(s,IndexedSeq.empty[BreadCrumb]) map {
+ case BCStaticLink(n,c) => BCLink(n,c.toString())
+ case a: Any => throw new IllegalArgumentException(s"Only static link supported : $a")
+ }
+ rendered :+ BCActive(s)
+ }
+
private[this] def renderWithCluster(s: String, clusterName: String) : IndexedSeq[BreadCrumbRendered] = {
clusterBreadCrumbs.getOrElse(s,IndexedSeq.empty[BreadCrumb]) map {
case BCStaticLink(n,c) => BCLink(n,c.toString())
@@ -127,14 +157,33 @@ object BreadCrumbs {
}
}
+ private[this] def renderWithScheduler(s: String, schedulerName: String) : IndexedSeq[BreadCrumbRendered] = {
+ schedulerBreadCrumbs.getOrElse(s,IndexedSeq.empty[BreadCrumb]) map {
+ case BCStaticLink(n,c) => BCLink(n,c.toString())
+ case BCDynamicNamedLink(cn, cl) => BCLink(cn(schedulerName),cl(schedulerName).toString())
+ case BCDynamicLink(cn, cl) => BCLink(cn,cl(schedulerName).toString())
+ case BCDynamicText(cn) => BCText(cn(schedulerName))
+ case _ => BCText("ERROR")
+ }
+ }
+
+
def withNamedViewAndCluster(s: String, clusterName: String, name: String) : IndexedSeq[BreadCrumbRendered] = {
renderWithCluster(s, clusterName) :+ BCActive(name)
}
+ def withNamedViewAndScheduler(s: String, schedulerName: String, name: String) : IndexedSeq[BreadCrumbRendered] = {
+ renderWithScheduler(s, schedulerName) :+ BCActive(name)
+ }
+
def withViewAndCluster(s: String, clusterName: String) : IndexedSeq[BreadCrumbRendered] = {
withNamedViewAndCluster(s, clusterName, s)
}
+ def withViewAndScheduler(s: String, schedulerName: String) : IndexedSeq[BreadCrumbRendered] = {
+ withNamedViewAndScheduler(s, schedulerName, s)
+ }
+
private[this] def renderWithClusterAndTopic(s: String, clusterName: String, topic: String) : IndexedSeq[BreadCrumbRendered] = {
topicBreadCrumbs.getOrElse(s,IndexedSeq.empty[BreadCrumb]) map {
case BCStaticLink(n,c) => BCLink(n,c.toString())
diff --git a/app/models/navigation/Menus.scala b/app/models/navigation/Menus.scala
index b2cb34880..69ceee85b 100644
--- a/app/models/navigation/Menus.scala
+++ b/app/models/navigation/Menus.scala
@@ -89,6 +89,24 @@ object Menus {
else
defaultItems
}
- IndexedSeq(Menu("Cluster", items, None))
+ IndexedSeq(
+ Menu("Cluster", items, None),
+ Menu("Scheduler",IndexedSeq(
+ "List".baseRouteMenuItem,
+ "Add Scheduler".baseRouteMenuItem),
+ None)
+ )
}
+
+ def schedulerMenus(scheduler: String) : IndexedSeq[Menu] = IndexedSeq(
+ Menu("Scheduler",IndexedSeq(
+ "Summary".schedulerRouteMenuItem(scheduler),
+ "List".baseRouteMenuItem,
+ "Add Scheduler".baseRouteMenuItem),
+ None),
+ Menu("Brokers",IndexedSeq(
+ "Add Broker".schedulerRouteMenuItem(scheduler)),
+ None),
+ "Rebalance Topics".schedulerMenu(scheduler)
+ )
}
diff --git a/app/models/navigation/QuickRoutes.scala b/app/models/navigation/QuickRoutes.scala
index 209b47c67..2358184ea 100644
--- a/app/models/navigation/QuickRoutes.scala
+++ b/app/models/navigation/QuickRoutes.scala
@@ -15,8 +15,10 @@ object QuickRoutes {
val baseRoutes : Map[String, Call] = Map(
"Clusters" -> controllers.routes.Application.index(),
+ "Schedulers" -> controllers.routes.Application.index(),
"List" -> controllers.routes.Application.index(),
- "Add Cluster" -> controllers.routes.Cluster.addCluster()
+ "Add Cluster" -> controllers.routes.Cluster.addCluster(),
+ "Add Scheduler" -> controllers.routes.Cluster.addScheduler()
)
val clusterRoutes : Map[String, String => Call] = Map(
"Update Cluster" -> controllers.routes.Cluster.updateCluster,
@@ -32,6 +34,15 @@ object QuickRoutes {
"List Logkafka" -> controllers.routes.Logkafka.logkafkas,
"Create Logkafka" -> controllers.routes.Logkafka.createLogkafka
)
+
+ val schedulerRoutes : Map[String, String => Call] = Map(
+ "Update Scheduler" -> controllers.routes.Cluster.updateCluster,
+ "Summary" -> scheduler.controllers.routes.SchedulerApplication.getScheduler,
+ "Brokers" -> scheduler.controllers.routes.SchedulerApplication.brokers,
+ "Add Broker" -> scheduler.controllers.routes.Broker.addBroker,
+ "Rebalance Topics" -> scheduler.controllers.routes.RebalanceTopics.rebalanceTopics
+ )
+
val topicRoutes : Map[String, (String, String) => Call] = Map(
"Topic View" -> controllers.routes.Topic.topic,
"Add Partitions" -> controllers.routes.Topic.addPartitions,
@@ -75,6 +86,21 @@ object QuickRoutes {
}
}
+ implicit class SchedulerRoute(s: String) {
+ def schedulerRouteMenuItem(c: String): (String, Call) = {
+ s -> schedulerRoutes(s)(c)
+ }
+ def schedulerRoute(c: String): Call = {
+ schedulerRoutes(s)(c)
+ }
+ def schedulerMenu(c: String): Menu = {
+ Menu(s,IndexedSeq.empty,Some(schedulerRoute(c)))
+ }
+ def schedulerRouteBreadCrumb : BCDynamicLink = {
+ BCDynamicLink( s, schedulerRoutes(s))
+ }
+ }
+
implicit class TopicRoute(s: String) {
def topicRouteMenuItem(c: String, t: String): (String, Call) = {
s -> topicRoutes(s)(c,t)
diff --git a/app/scheduler/controllers/Broker.scala b/app/scheduler/controllers/Broker.scala
new file mode 100644
index 000000000..ebad792b9
--- /dev/null
+++ b/app/scheduler/controllers/Broker.scala
@@ -0,0 +1,177 @@
+package scheduler.controllers
+
+import controllers.KafkaManagerContext
+import kafka.manager.ActorModel.SchedulerBrokerIdentity
+import models.FollowLink
+import scheduler.models.form._
+import models.navigation.Menus
+import play.api.data.Form
+import play.api.data.Forms._
+import play.api.data.validation.{Constraint, Constraints}
+import play.api.mvc.{Action, Controller}
+import play.api.data.format.Formats._
+
+import scala.concurrent.Future
+import scalaz.{-\/, \/-}
+
+object Broker extends Controller{
+ import play.api.libs.concurrent.Execution.Implicits.defaultContext
+
+ private[this] val kafkaManager = KafkaManagerContext.getKafkaManager
+
+ val validateCpus: Constraint[Double] = Constraints.min(minValue = 0.0)
+
+ val defaultAddForm = Form(
+ mapping(
+ "id" -> number(min = 0),
+ "cpus" -> optional(of[Double].verifying(validateCpus)),
+ "mem" -> optional(longNumber),
+ "heap" -> optional(longNumber),
+ "port" -> optional(text),
+ "bindAddress" -> optional(text),
+ "constraints" -> optional(text),
+ "options" -> optional(text),
+ "log4jOptions" -> optional(text),
+ "jvmOptions" -> optional(text),
+ "stickinessPeriod" -> optional(text),
+ "failover" -> mapping(
+ "failoverDelay" -> optional(text),
+ "failoverMaxDelay" -> optional(text),
+ "failoverMaxTries" -> optional(number(0))
+ )(Failover.apply)(Failover.unapply)
+ )(AddBroker.apply)(AddBroker.unapply)
+ )
+
+ private val defaultF = AddBroker(1, None, None, None, None, None, None, None, None, None, None, Failover(None, None, None))
+
+ private val addBrokerForm = defaultAddForm.fill(defaultF)
+
+
+ val defaultUpdateForm = Form(
+ mapping(
+ "id" -> number(min = 0),
+ "cpus" -> optional(of[Double].verifying(validateCpus)),
+ "mem" -> optional(longNumber),
+ "heap" -> optional(longNumber),
+ "port" -> optional(text),
+ "bindAddress" -> optional(text),
+ "constraints" -> optional(text),
+ "options" -> optional(text),
+ "log4jOptions" -> optional(text),
+ "jvmOptions" -> optional(text),
+ "stickinessPeriod" -> optional(text),
+ "failover" -> mapping(
+ "failoverDelay" -> optional(text),
+ "failoverMaxDelay" -> optional(text),
+ "failoverMaxTries" -> optional(number(0))
+ )(Failover.apply)(Failover.unapply)
+ )(UpdateBroker.apply)(UpdateBroker.unapply)
+ )
+
+ private def defaultUpdateF(brokerId: Int) = UpdateBroker(brokerId, None, None, None, None, None, None, None, None, None, None, Failover(None, None, None))
+
+ private def stringPairs(pairs: Seq[(String, String)]) =
+ if (pairs.isEmpty) None
+ else
+ Some(pairs.map {
+ case (key, v) => s"$key=$v"
+ }.mkString(","))
+
+ private def updateBrokerForm(schedulerName: String, bi: SchedulerBrokerIdentity) = {
+ defaultUpdateForm.fill(UpdateBroker(bi.id, Option(bi.cpus), Option(bi.mem), Option(bi.heap), bi.port, bi.bindAddress, stringPairs(bi.constraints),
+ stringPairs(bi.options), stringPairs(bi.log4jOptions), bi.jvmOptions, Option(bi.stickiness.period),
+ Failover(Option(bi.failover.delay), Option(bi.failover.maxDelay), bi.failover.maxTries)))
+ }
+
+ def addBroker(schedulerName: String) = Action.async { implicit request =>
+ Future.successful(Ok(scheduler.views.html.broker.addBroker(schedulerName, \/-(addBrokerForm))))
+ }
+
+ def handleAddBroker(schedulerName: String) = Action.async { implicit request =>
+ defaultAddForm.bindFromRequest.fold(
+ formWithErrors => Future.successful(BadRequest(scheduler.views.html.broker.addBroker(schedulerName,\/-(formWithErrors)))),
+ ab => {
+ kafkaManager.addBroker(schedulerName,ab.id, ab.cpus, ab.mem, ab.heap, ab.port, ab.bindAddress, ab.constraints, ab.options,
+ ab.log4jOptions, ab.jvmOptions, ab.stickinessPeriod, ab.failover).map { errorOrSuccess =>
+ Ok(views.html.common.resultOfCommand(
+ views.html.navigation.schedulerMenu(schedulerName,"Broker","Add Broker",Menus.schedulerMenus(schedulerName)),
+ models.navigation.BreadCrumbs.withNamedViewAndScheduler("Brokers",schedulerName,"Add Broker"),
+ errorOrSuccess,
+ "Add Broker",
+ FollowLink("Go to broker view.",scheduler.controllers.routes.SchedulerApplication.broker(schedulerName, ab.id).toString()),
+ FollowLink("Try again.",scheduler.controllers.routes.Broker.addBroker(schedulerName).toString())
+ ))
+ }
+ }
+ )
+ }
+
+ def handleStartBroker(schedulerName: String, id: Int) = Action.async { implicit request =>
+ kafkaManager.startBroker(schedulerName, id).map { errorOrSuccess =>
+ Ok(views.html.common.resultOfCommand(
+ views.html.navigation.schedulerMenu(schedulerName, "Broker", "Broker View", Menus.schedulerMenus(schedulerName)),
+ models.navigation.BreadCrumbs.withNamedViewAndScheduler("Brokers",schedulerName,"Start Broker"),
+ errorOrSuccess,
+ "Start Broker",
+ FollowLink("Go to broker view.", scheduler.controllers.routes.SchedulerApplication.broker(schedulerName, id).toString()),
+ FollowLink("Try again.", scheduler.controllers.routes.Broker.handleStartBroker(schedulerName, id).toString())
+ ))
+ }
+ }
+
+ def handleStopBroker(schedulerName: String, id: Int) = Action.async { implicit request =>
+ kafkaManager.stopBroker(schedulerName, id).map { errorOrSuccess =>
+ Ok(views.html.common.resultOfCommand(
+ views.html.navigation.schedulerMenu(schedulerName, "Broker", "Broker View", Menus.schedulerMenus(schedulerName)),
+ models.navigation.BreadCrumbs.withNamedViewAndScheduler("Brokers",schedulerName,"Stop Broker"),
+ errorOrSuccess,
+ "Stop Broker",
+ FollowLink("Go to broker view.", scheduler.controllers.routes.SchedulerApplication.broker(schedulerName, id).toString()),
+ FollowLink("Try again.", scheduler.controllers.routes.Broker.handleStopBroker(schedulerName, id).toString())
+ ))
+ }
+ }
+
+ def handleRemoveBroker(schedulerName: String, id: Int) = Action.async { implicit request =>
+ kafkaManager.removeBroker(schedulerName, id).map { errorOrSuccess =>
+ Ok(views.html.common.resultOfCommand(
+ views.html.navigation.schedulerMenu(schedulerName, "Broker", "Broker View", Menus.schedulerMenus(schedulerName)),
+ models.navigation.BreadCrumbs.withNamedViewAndScheduler("Brokers",schedulerName,"Remove Broker"),
+ errorOrSuccess,
+ "Remove Broker",
+ FollowLink("Go to brokers list.", scheduler.controllers.routes.SchedulerApplication.brokers(schedulerName).toString()),
+ FollowLink("Try again.", scheduler.controllers.routes.Broker.handleRemoveBroker(schedulerName, id).toString())
+ ))
+ }
+ }
+
+ def updateBroker(schedulerName: String, id: Int) = Action.async { implicit request =>
+ val errorOrFormFuture = kafkaManager.getBrokerIdentity(schedulerName, id).map { errorOrBrokerIdentity =>
+ errorOrBrokerIdentity.fold(e => -\/(e), { brokerIdentity: SchedulerBrokerIdentity =>
+ \/-(updateBrokerForm(schedulerName, brokerIdentity))
+ })
+ }
+ errorOrFormFuture.map { errorOrForm =>
+ Ok(scheduler.views.html.broker.updateBroker(schedulerName, id, errorOrForm))
+ }
+ }
+
+ def handleUpdateBroker(schedulerName: String, brokerId: Int) = Action.async { implicit request =>
+ defaultUpdateForm.bindFromRequest.fold(
+ formWithErrors => Future.successful(BadRequest(scheduler.views.html.broker.updateBroker(schedulerName, brokerId, \/-(formWithErrors)))),
+ ub => {
+ kafkaManager.updateBroker(schedulerName, ub.id, ub.cpus, ub.mem, ub.heap, ub.port, ub.bindAddress, ub.constraints, ub.options,
+ ub.log4jOptions, ub.jvmOptions, ub.stickinessPeriod, ub.failover).map { errorOrSuccess =>
+ Ok(views.html.common.resultOfCommand(
+ views.html.navigation.schedulerMenu(schedulerName, "Broker", "Broker View", Menus.schedulerMenus(schedulerName)),
+ models.navigation.BreadCrumbs.withNamedViewAndScheduler("Brokers",schedulerName,"Update Broker"),
+ errorOrSuccess,
+ "Update Broker",
+ FollowLink("Go to broker view.",scheduler.controllers.routes.SchedulerApplication.broker(schedulerName, ub.id).toString()),
+ FollowLink("Try again.",scheduler.controllers.routes.Broker.updateBroker(schedulerName, ub.id).toString())
+ ))
+ }
+ }
+ )
+ }
+}
diff --git a/app/scheduler/controllers/RebalanceTopics.scala b/app/scheduler/controllers/RebalanceTopics.scala
new file mode 100644
index 000000000..46a0c7697
--- /dev/null
+++ b/app/scheduler/controllers/RebalanceTopics.scala
@@ -0,0 +1,51 @@
+package scheduler.controllers
+
+import controllers.KafkaManagerContext
+import models.FollowLink
+import scheduler.models.form.{RebalanceTopics => RebalanceTopicsOp}
+import models.navigation.Menus
+import play.api.data.Form
+import play.api.data.Forms._
+import play.api.mvc.{Action, Controller}
+
+import scala.concurrent.Future
+import scalaz.\/-
+
+object RebalanceTopics extends Controller{
+ import play.api.libs.concurrent.Execution.Implicits.defaultContext
+
+ private[this] val kafkaManager = KafkaManagerContext.getKafkaManager
+
+ val defaultRebalanceForm = Form(
+ mapping(
+ "ids" -> text,
+ "topics" -> optional(text)
+ )(RebalanceTopicsOp.apply)(RebalanceTopicsOp.unapply)
+ )
+
+ private val defaultF = RebalanceTopicsOp("*", Some(""))
+
+ private val rebalanceForm = defaultRebalanceForm.fill(defaultF)
+
+ def rebalanceTopics(schedulerName: String) = Action.async { implicit request =>
+ Future.successful(Ok(scheduler.views.html.broker.rebalanceTopics(schedulerName, \/-(rebalanceForm))))
+ }
+
+ def handleRebalance(schedulerName: String) = Action.async { implicit request =>
+ defaultRebalanceForm.bindFromRequest.fold(
+ formWithErrors => Future.successful(BadRequest(scheduler.views.html.broker.rebalanceTopics(schedulerName,\/-(formWithErrors)))),
+ rt => {
+ kafkaManager.rebalanceTopics(schedulerName, rt.ids, rt.topics).map { errorOrSuccess =>
+ Ok(views.html.common.resultOfCommand(
+ views.html.navigation.schedulerMenu(schedulerName,"Rebalance Topics","",Menus.schedulerMenus(schedulerName)),
+ models.navigation.BreadCrumbs.withNamedViewAndScheduler("Rebalance Topics",schedulerName,"Rebalance Topics"),
+ errorOrSuccess,
+ "Rebalance Topics",
+ FollowLink("Go to the brokers list.", scheduler.controllers.routes.SchedulerApplication.brokers(schedulerName).toString()),
+ FollowLink("Try again.",scheduler.controllers.routes.RebalanceTopics.rebalanceTopics(schedulerName).toString())
+ ))
+ }
+ }
+ )
+ }
+}
diff --git a/app/scheduler/controllers/SchedulerApplication.scala b/app/scheduler/controllers/SchedulerApplication.scala
new file mode 100644
index 000000000..b724941c5
--- /dev/null
+++ b/app/scheduler/controllers/SchedulerApplication.scala
@@ -0,0 +1,30 @@
+package scheduler.controllers
+
+import controllers.KafkaManagerContext
+import play.api.mvc.{Action, Controller}
+
+object SchedulerApplication extends Controller {
+
+ private[this] val kafkaManager = KafkaManagerContext.getKafkaManager
+
+ import play.api.libs.concurrent.Execution.Implicits.defaultContext
+
+ def getScheduler(s: String) = Action.async {
+ kafkaManager.getSchedulerView(s).map { errorOrSchedulerView =>
+ Ok(scheduler.views.html.scheduler.schedulerView(s,errorOrSchedulerView))
+ }
+ }
+
+ def brokers(s: String) = Action.async {
+ kafkaManager.getSchedulerBrokerList(s).map { errorOrBrokerList =>
+ Ok(scheduler.views.html.broker.brokerList(s,errorOrBrokerList))
+ }
+ }
+
+ def broker(s: String, b: Int) = Action.async {
+ kafkaManager.getBrokerIdentity(s,b).map { errorOrBrokerView =>
+ Ok(scheduler.views.html.broker.brokerView(s,b,errorOrBrokerView))
+ }
+ }
+}
+
diff --git a/app/scheduler/kafka/manager/KafkaSchedulerCommandActor.scala b/app/scheduler/kafka/manager/KafkaSchedulerCommandActor.scala
new file mode 100644
index 000000000..13a62348d
--- /dev/null
+++ b/app/scheduler/kafka/manager/KafkaSchedulerCommandActor.scala
@@ -0,0 +1,96 @@
+package scheduler.kafka.manager
+
+import kafka.manager.ActorModel._
+import kafka.manager._
+import org.apache.curator.framework.CuratorFramework
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success, Try}
+
+case class KafkaSchedulerCommandActorConfig(schedulerConfig: SchedulerConfig,
+ curator: CuratorFramework,
+ longRunningPoolConfig: LongRunningPoolConfig,
+ askTimeoutMillis: Long = 400,
+ version: KafkaVersion)
+
+class KafkaSchedulerCommandActor(kafkaCommandActorConfig: KafkaSchedulerCommandActorConfig) extends BaseCommandActor with LongRunningPoolActor {
+
+ val schedulerRestClient = new SchedulerRestClient(kafkaCommandActorConfig.schedulerConfig.apiUrl)(play.api.libs.concurrent.Execution.Implicits.defaultContext)
+
+ @scala.throws[Exception](classOf[Exception])
+ override def preStart() = {
+ log.info("Started actor %s".format(self.path))
+ }
+
+ @scala.throws[Exception](classOf[Exception])
+ override def preRestart(reason: Throwable, message: Option[Any]) {
+ log.error(reason, "Restarting due to [{}] when processing [{}]",
+ reason.getMessage, message.getOrElse(""))
+ super.preRestart(reason, message)
+ }
+
+ @scala.throws[Exception](classOf[Exception])
+ override def postStop(): Unit = {
+ super.postStop()
+ }
+
+ override protected def longRunningPoolConfig: LongRunningPoolConfig = kafkaCommandActorConfig.longRunningPoolConfig
+
+ override protected def longRunningQueueFull(): Unit = {
+ sender ! KCCommandResult(Try(throw new UnsupportedOperationException("Long running executor blocking queue is full!")))
+ }
+
+ override def processActorResponse(response: ActorResponse): Unit = {
+ response match {
+ case any: Any => log.warning("ksca : processActorResponse : Received unknown message: {}", any)
+ }
+ }
+
+ def futureToKCCommandResult[T](future: Future[T])(implicit ec: ExecutionContext): Future[KCCommandResult] = {
+ future.map {
+ _ => KCCommandResult(Success(()))
+ }.recover {
+ case e: Throwable => KCCommandResult(Failure(e))
+ }
+ }
+
+ override def processCommandRequest(request: CommandRequest): Unit = {
+ implicit val ec = longRunningExecutionContext
+ request match {
+
+ case KSCAddBroker(id, cpus, mem, heap, port, bindAddress, constraints, options, log4jOptions, jvmOptions, stickinessPeriod, failover) =>
+ longRunning {
+ futureToKCCommandResult(schedulerRestClient.addBroker(id, cpus, mem, heap, port, bindAddress, constraints, options, log4jOptions, jvmOptions, stickinessPeriod, failover))
+ }
+
+ case KSCUpdateBroker(id, cpus, mem, heap, port, bindAddress, constraints, options, log4jOptions, jvmOptions, stickinessPeriod, failover) =>
+ longRunning {
+ futureToKCCommandResult(schedulerRestClient.updateBroker(id, cpus, mem, heap, port, bindAddress, constraints, options, log4jOptions, jvmOptions, stickinessPeriod, failover))
+ }
+
+ case KSCStartBroker(id) =>
+ longRunning {
+ futureToKCCommandResult(schedulerRestClient.startBroker(id))
+ }
+
+ case KSCStopBroker(id) =>
+ longRunning {
+ futureToKCCommandResult(schedulerRestClient.stopBroker(id))
+ }
+
+ case KSCRemoveBroker(id) =>
+ longRunning {
+ futureToKCCommandResult(schedulerRestClient.removeBroker(id))
+ }
+
+ case KSCRebalanceTopics(ids, topics) =>
+ longRunning {
+ futureToKCCommandResult(schedulerRestClient.rebalanceTopics(ids, topics))
+ }
+
+ case any: Any => log.warning("ksca : processCommandRequest : Received unknown message: {}", any)
+ }
+ }
+}
+
+
diff --git a/app/scheduler/kafka/manager/KafkaSchedulerStateActor.scala b/app/scheduler/kafka/manager/KafkaSchedulerStateActor.scala
new file mode 100644
index 000000000..3d6a4b0df
--- /dev/null
+++ b/app/scheduler/kafka/manager/KafkaSchedulerStateActor.scala
@@ -0,0 +1,86 @@
+/**
+ * Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+ * See accompanying LICENSE file.
+ */
+
+package scheduler.kafka.manager
+
+import akka.pattern.pipe
+import kafka.manager.ActorModel._
+import kafka.manager.{BaseQueryCommandActor, SchedulerConfig}
+import org.apache.curator.framework.CuratorFramework
+
+class KafkaSchedulerStateActor(curator: CuratorFramework,
+ schedulerConfig: SchedulerConfig) extends BaseQueryCommandActor {
+
+ val schedulerRestClient = new SchedulerRestClient(schedulerConfig.apiUrl)(play.api.libs.concurrent.Execution.Implicits.defaultContext)
+
+ @scala.throws[Exception](classOf[Exception])
+ override def preStart() = {
+ log.info("Started actor %s".format(self.path))
+ }
+
+ @scala.throws[Exception](classOf[Exception])
+ override def preRestart(reason: Throwable, message: Option[Any]) {
+ log.error(reason, "Restarting due to [{}] when processing [{}]",
+ reason.getMessage, message.getOrElse(""))
+ super.preRestart(reason, message)
+ }
+
+
+ @scala.throws[Exception](classOf[Exception])
+ override def postStop(): Unit = {
+ log.info("Stopped actor %s".format(self.path))
+
+ super.postStop()
+ }
+
+ override def processActorResponse(response: ActorResponse): Unit = {
+ response match {
+ case any: Any => log.warning("kssa : processActorResponse : Received unknown message: {}", any.toString)
+ }
+ }
+
+ override def processQueryRequest(request: QueryRequest): Unit = {
+ request match {
+ case KSGetTopics =>
+ sender ! TopicList(IndexedSeq(), Set(), null)
+
+ case KSGetAllTopicDescriptions(lastUpdateMillisOption) =>
+ sender ! TopicDescriptions(IndexedSeq.empty, 0L)
+
+ case SchedulerKSGetBrokers =>
+ implicit val ec = context.dispatcher
+
+ def deserMap(map: String): Seq[(String, String)] =
+ map.split(",").map {
+ kv =>
+ val pair = kv.split("=")
+ (pair(0), pair(1))
+ }
+
+ schedulerRestClient.getStatus map {
+ status =>
+ val brokerIdentities =
+ status.brokers.getOrElse(Seq.empty).map{
+ b =>
+ SchedulerBrokerIdentity(b.id.toInt, b.active, b.cpus, b.mem, b.heap, b.port,
+ b.bindAddress, b.constraints.map(deserMap).getOrElse(Seq.empty),
+ b.options.map(deserMap).getOrElse(Seq.empty), b.log4jOptions.map(deserMap).getOrElse(Seq.empty), b.jvmOptions,
+ SchedulerBrokerStickinessIdentity(b.stickiness.period, b.stickiness.stopTime, b.stickiness.hostname),
+ SchedulerBrokerFailoverIdentity(b.failover.delay, b.failover.maxDelay, b.failover.maxTries, b.failover.failures, b.failover.failureTime),
+ b.task.map(task => SchedulerBrokerTaskIdentity(task.id, task.slaveId, task.executorId, task.hostname, task.endpoint, task.state)), schedulerConfig)
+ }
+ SchedulerBrokerList(brokerIdentities, schedulerConfig)
+ } pipeTo sender()
+
+ case any: Any => log.warning("kssa : processQueryRequest : Received unknown message: {}", any.toString)
+ }
+ }
+
+ override def processCommandRequest(request: CommandRequest): Unit = {
+ request match {
+ case any: Any => log.warning("kssa : processCommandRequest : Received unknown message: {}", any.toString)
+ }
+ }
+}
\ No newline at end of file
diff --git a/app/scheduler/kafka/manager/SchedulerBrokerViewCacheActor.scala b/app/scheduler/kafka/manager/SchedulerBrokerViewCacheActor.scala
new file mode 100644
index 000000000..3b972667c
--- /dev/null
+++ b/app/scheduler/kafka/manager/SchedulerBrokerViewCacheActor.scala
@@ -0,0 +1,225 @@
+/**
+ * Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+ * See accompanying LICENSE file.
+ */
+
+package scheduler.kafka.manager
+
+import akka.actor.{ActorRef, Cancellable, ActorPath}
+import kafka.manager.ActorModel._
+import kafka.manager._
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.util.Try
+
+case class SchedulerBrokerViewCacheActorConfig(schedulerKafkaStateActorPath: ActorPath,
+ schedulerConfig: SchedulerConfig,
+ longRunningPoolConfig: LongRunningPoolConfig,
+ updatePeriod: FiniteDuration = 10 seconds)
+class SchedulerBrokerViewCacheActor(config: SchedulerBrokerViewCacheActorConfig) extends LongRunningPoolActor {
+
+ private[this] val ZERO = BigDecimal(0)
+
+ private[this] var cancellable : Option[Cancellable] = None
+
+ private[this] var topicIdentities : Map[String, TopicIdentity] = Map.empty
+
+ private[this] var topicDescriptionsOption : Option[TopicDescriptions] = None
+
+ private[this] var brokerListOption : Option[SchedulerBrokerList] = None
+
+ private[this] var brokerMetrics : Map[Int, BrokerMetrics] = Map.empty
+
+ private[this] val brokerTopicPartitions : mutable.Map[Int, SchedulerBrokerIdentity] = new mutable.HashMap[Int, SchedulerBrokerIdentity]
+
+ private[this] val topicMetrics: mutable.Map[String, mutable.Map[Int, BrokerMetrics]] =
+ new mutable.HashMap[String, mutable.Map[Int, BrokerMetrics]]()
+
+ private[this] var combinedBrokerMetric : Option[BrokerMetrics] = None
+
+ override def preStart() = {
+ log.info("Started actor %s".format(self.path))
+ log.info("Scheduling updater for %s".format(config.updatePeriod))
+ cancellable = Some(
+ context.system.scheduler.schedule(0 seconds,
+ config.updatePeriod,
+ self,
+ BVForceUpdate)(context.system.dispatcher,self)
+ )
+ }
+
+ @scala.throws[Exception](classOf[Exception])
+ override def postStop(): Unit = {
+ log.info("Stopped actor %s".format(self.path))
+ log.info("Cancelling updater...")
+ Try(cancellable.map(_.cancel()))
+ super.postStop()
+ }
+
+ override protected def longRunningPoolConfig: LongRunningPoolConfig = config.longRunningPoolConfig
+
+ override protected def longRunningQueueFull(): Unit = {
+ log.error("Long running pool queue full, skipping!")
+ }
+
+ override def processActorRequest(request: ActorRequest): Unit = {
+ request match {
+ case BVForceUpdate =>
+ log.info("Updating scheduler broker view...")
+ //ask for topic descriptions
+ val lastUpdateMillisOption: Option[Long] = topicDescriptionsOption.map(_.lastUpdateMillis)
+ context.actorSelection(config.schedulerKafkaStateActorPath).tell(KSGetAllTopicDescriptions(lastUpdateMillisOption), self)
+ context.actorSelection(config.schedulerKafkaStateActorPath).tell(SchedulerKSGetBrokers, self)
+
+ case SMGetBrokerIdentity(id) =>
+ sender ! brokerTopicPartitions.get(id).map { bv =>
+ val bcs = for {
+ metrics <- bv.metrics
+ cbm <- combinedBrokerMetric
+ } yield {
+ val perMessages = if(cbm.messagesInPerSec.oneMinuteRate > 0) {
+ BigDecimal(metrics.messagesInPerSec.oneMinuteRate / cbm.messagesInPerSec.oneMinuteRate * 100D).setScale(3, BigDecimal.RoundingMode.HALF_UP)
+ } else ZERO
+ val perIncoming = if(cbm.bytesInPerSec.oneMinuteRate > 0) {
+ BigDecimal(metrics.bytesInPerSec.oneMinuteRate / cbm.bytesInPerSec.oneMinuteRate * 100D).setScale(3, BigDecimal.RoundingMode.HALF_UP)
+ } else ZERO
+ val perOutgoing = if(cbm.bytesOutPerSec.oneMinuteRate > 0) {
+ BigDecimal(metrics.bytesOutPerSec.oneMinuteRate / cbm.bytesOutPerSec.oneMinuteRate * 100D).setScale(3, BigDecimal.RoundingMode.HALF_UP)
+ } else ZERO
+ BrokerClusterStats(perMessages, perIncoming, perOutgoing)
+ }
+ if(bcs.isDefined) {
+ bv.copy(stats=bcs)
+ } else {
+ bv
+ }
+ }
+
+ case BVGetBrokerMetrics =>
+ sender ! brokerMetrics
+
+ case BVGetTopicMetrics(topic) =>
+ sender ! topicMetrics.get(topic).map(m => m.values.foldLeft(BrokerMetrics.DEFAULT)((acc,bm) => acc + bm))
+
+ case BVGetTopicIdentities =>
+ sender ! topicIdentities
+
+ case BVUpdateTopicMetricsForBroker(id, metrics) =>
+ metrics.foreach {
+ case (topic, bm) =>
+ val tm = topicMetrics.getOrElse(topic, new mutable.HashMap[Int, BrokerMetrics])
+ tm.put(id, bm)
+ topicMetrics.put(topic, tm)
+ }
+
+ case BVUpdateBrokerMetrics(id, metrics) =>
+ brokerMetrics += (id -> metrics)
+ combinedBrokerMetric = Option(brokerMetrics.values.foldLeft(BrokerMetrics.DEFAULT)((acc, m) => acc + m))
+ for {
+ bv <- brokerTopicPartitions.get(id)
+ } {
+ brokerTopicPartitions.put(id, bv.copy(metrics = Option(metrics)))
+ }
+
+ case any: Any => log.warning("sbvca : processActorRequest : Received unknown message: {}", any)
+ }
+ }
+
+ override def processActorResponse(response: ActorResponse): Unit = {
+ response match {
+ case td: TopicDescriptions =>
+ topicDescriptionsOption = Some(td)
+ updateView()
+
+ case bl: SchedulerBrokerList =>
+ brokerListOption = Some(bl)
+ updateView()
+
+ case any: Any => log.warning("sbvca : processActorResponse : Received unknown message: {}", any)
+ }
+ }
+
+ private[this] def updateView(): Unit = {
+ for {
+ brokerList <- brokerListOption
+ topicDescriptions <- topicDescriptionsOption
+ } {
+ val topicIdentity : IndexedSeq[TopicIdentity] = IndexedSeq.empty
+
+ topicIdentities = topicIdentity.map(ti => (ti.topic, ti)).toMap
+ val topicPartitionByBroker = topicIdentity.flatMap(
+ ti => ti.partitionsByBroker.map(btp => (ti,btp.id,btp.partitions))).groupBy(_._2)
+
+ //check for 2*broker list size since we schedule 2 jmx calls for each broker
+ if (config.schedulerConfig.jmxEnabled && hasCapacityFor(2 * brokerListOption.size)) {
+ implicit val ec = longRunningExecutionContext
+ val brokerLookup = brokerList.list.map(bi => bi.id -> bi).toMap
+ topicPartitionByBroker.foreach {
+ case (brokerId, topicPartitions) =>
+ val brokerInfoOpt = brokerLookup.get(brokerId)
+ for (broker <- brokerInfoOpt;
+ host <- broker.actualHost();
+ port <- broker.actualPort()) {
+ longRunning {
+ Future {
+ // TODO JMX Port is temporary hardcoded to 9999
+ val tryResult = KafkaJMX.doWithConnection(host, 9999) {
+ mbsc =>
+ topicPartitions.map {
+ case (topic, id, partitions) =>
+ (topic.topic,
+ KafkaMetrics.getBrokerMetrics(config.schedulerConfig.version, mbsc, Option(topic.topic)))
+ }
+ }
+ val result = tryResult match {
+ case scala.util.Failure(t) =>
+ log.error(t, s"Failed to get topic metrics for broker $broker")
+ topicPartitions.map {
+ case (topic, id, partitions) =>
+ (topic.topic, BrokerMetrics.DEFAULT)
+ }
+ case scala.util.Success(bm) => bm
+ }
+ self.tell(BVUpdateTopicMetricsForBroker(broker.id, result), ActorRef.noSender)
+ }
+ }
+ }
+ }
+
+ brokerList.list.foreach {
+ broker =>
+ for (host <- broker.actualHost();
+ port <- broker.actualPort()) {
+ longRunning {
+ Future {
+ // TODO JMX Port is temporary hardcoded to 9999
+ val tryResult = KafkaJMX.doWithConnection(host, 9999) {
+ mbsc =>
+ KafkaMetrics.getBrokerMetrics(config.schedulerConfig.version, mbsc)
+ }
+
+ val result = tryResult match {
+ case scala.util.Failure(t) =>
+ log.error(t, s"Failed to get broker metrics for $broker")
+ BrokerMetrics.DEFAULT
+ case scala.util.Success(bm) => bm
+ }
+ self.tell(BVUpdateBrokerMetrics(broker.id, result), ActorRef.noSender)
+ }
+ }
+ }
+ }
+ } else if (config.schedulerConfig.jmxEnabled) {
+ log.warning("Not scheduling update of JMX for all brokers, not enough capacity!")
+ }
+
+ brokerTopicPartitions.clear()
+ brokerList.list.foreach {
+ bi =>
+ brokerTopicPartitions.put(bi.id, bi.copy(metrics = brokerMetrics.get(bi.id)))
+ }
+ }
+ }
+
+}
diff --git a/app/scheduler/kafka/manager/SchedulerManagerActor.scala b/app/scheduler/kafka/manager/SchedulerManagerActor.scala
new file mode 100644
index 000000000..9fd939dab
--- /dev/null
+++ b/app/scheduler/kafka/manager/SchedulerManagerActor.scala
@@ -0,0 +1,234 @@
+/**
+ * Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+ * See accompanying LICENSE file.
+ */
+
+package scheduler.kafka.manager
+
+import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
+
+import akka.actor.{ActorPath, Props}
+import akka.pattern._
+import akka.util.Timeout
+import kafka.manager._
+import kafka.manager.utils.AdminUtils
+import org.apache.curator.framework.CuratorFramework
+import org.apache.curator.framework.recipes.cache.PathChildrenCache
+import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex
+import org.apache.zookeeper.CreateMode
+
+import scala.concurrent.duration.{FiniteDuration, _}
+import scala.concurrent.{ExecutionContext, Future}
+import scala.reflect.ClassTag
+import scala.util.Try
+
+import kafka.manager.ActorModel._
+
+case class SchedulerManagerActorConfig(pinnedDispatcherName: String,
+ baseZkPath : String,
+ curatorConfig: CuratorConfig,
+ schedulerConfig: SchedulerConfig,
+ updatePeriod: FiniteDuration,
+ threadPoolSize: Int = 2,
+ maxQueueSize: Int = 100,
+ // enlarging from 2s to 10s because 2s for Scheduler REST API is not enough
+ askTimeoutMillis: Long = 10000,
+ mutexTimeoutMillis: Int = 4000)
+
+class SchedulerManagerActor(smConfig: SchedulerManagerActorConfig)
+ extends BaseQueryCommandActor with CuratorAwareActor with BaseZkPath {
+
+ //this is from base zk path trait
+ override def baseZkPath : String = smConfig.baseZkPath
+
+ //this is for curator aware actor
+ override def curatorConfig: CuratorConfig = smConfig.curatorConfig
+
+ val longRunningExecutor = new ThreadPoolExecutor(
+ smConfig.threadPoolSize, smConfig.threadPoolSize,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue[Runnable](smConfig.maxQueueSize))
+ val longRunningExecutionContext = ExecutionContext.fromExecutor(longRunningExecutor)
+
+ protected[this] val sharedClusterCurator : CuratorFramework = getCurator(smConfig.schedulerConfig.curatorConfig)
+ log.info("Starting shared curator...")
+ sharedClusterCurator.start()
+
+ //create cluster path
+ Try(curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(baseZkPath))
+ require(curator.checkExists().forPath(baseZkPath) != null,s"Cluster path not found : $baseZkPath")
+
+ private[this] val baseTopicsZkPath = zkPath("topics")
+
+ //create cluster path for storing topics state
+ Try(curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(baseTopicsZkPath))
+ require(curator.checkExists().forPath(baseTopicsZkPath) != null,s"Cluster path for topics not found : $baseTopicsZkPath")
+
+ private[this] val mutex = new InterProcessSemaphoreMutex(curator, zkPath("mutex"))
+
+ private[this] val adminUtils = new AdminUtils(smConfig.schedulerConfig.version)
+
+ private[this] val kssProps = Props(classOf[KafkaSchedulerStateActor],sharedClusterCurator, smConfig.schedulerConfig)
+ private[this] val kafkaSchedulerStateActor : ActorPath = context.actorOf(kssProps.withDispatcher(smConfig.pinnedDispatcherName),"kafka-scheduler-state").path
+
+ private[this] val bvConfig = SchedulerBrokerViewCacheActorConfig(
+ kafkaSchedulerStateActor,
+ smConfig.schedulerConfig,
+ LongRunningPoolConfig(Runtime.getRuntime.availableProcessors(), 1000),
+ smConfig.updatePeriod)
+ private[this] val bvcProps = Props(classOf[SchedulerBrokerViewCacheActor],bvConfig)
+ private[this] val brokerViewCacheActor : ActorPath = context.actorOf(bvcProps,"scheduler-broker-view").path
+
+ private[this] val kscProps = {
+ val kscaConfig = KafkaSchedulerCommandActorConfig(
+ smConfig.schedulerConfig,
+ sharedClusterCurator,
+ LongRunningPoolConfig(smConfig.threadPoolSize, smConfig.maxQueueSize),
+ smConfig.askTimeoutMillis,
+ smConfig.schedulerConfig.version)
+ Props(classOf[KafkaSchedulerCommandActor],kscaConfig)
+ }
+ private[this] val kafkaSchedulerCommandActor : ActorPath = context.actorOf(kscProps,"kafka-scheduler-command").path
+
+ private[this] implicit val timeout: Timeout = FiniteDuration(smConfig.askTimeoutMillis,MILLISECONDS)
+
+ private[this] val clusterManagerTopicsPathCache = new PathChildrenCache(curator,baseTopicsZkPath,true)
+
+ @scala.throws[Exception](classOf[Exception])
+ override def preStart() = {
+ super.preStart()
+ log.info("Started actor %s".format(self.path))
+ log.info("Starting cluster manager topics path cache...")
+ clusterManagerTopicsPathCache.start(StartMode.BUILD_INITIAL_CACHE)
+ }
+
+ @scala.throws[Exception](classOf[Exception])
+ override def preRestart(reason: Throwable, message: Option[Any]) {
+ log.error(reason, "Restarting due to [{}] when processing [{}]",
+ reason.getMessage, message.getOrElse(""))
+ super.preRestart(reason, message)
+ }
+
+ @scala.throws[Exception](classOf[Exception])
+ override def postStop(): Unit = {
+ log.info("Stopped actor %s".format(self.path))
+ log.info("Shutting down shared curator...")
+ Try(sharedClusterCurator.close())
+
+ log.info("Shutting down scheduler manager topics path cache...")
+ Try(clusterManagerTopicsPathCache.close())
+ super.postStop()
+ }
+
+ override def processActorResponse(response: ActorResponse): Unit = {
+ response match {
+ case any: Any => log.warning("cma : processActorResponse : Received unknown message: {}", any)
+ }
+ }
+
+ override def processQueryRequest(request: QueryRequest): Unit = {
+ request match {
+ case ksRequest: KSRequest =>
+ context.actorSelection(kafkaSchedulerStateActor).forward(ksRequest)
+
+ case bvRequest: BVRequest =>
+ context.actorSelection(brokerViewCacheActor).forward(bvRequest)
+
+ case SMGetView =>
+ implicit val ec = context.dispatcher
+ val eventualBrokerList = withKafkaSchedulerStateActor(SchedulerKSGetBrokers)(identity[SchedulerBrokerList])
+ val eventualTopicList = withKafkaSchedulerStateActor(KSGetTopics)(identity[TopicList])
+ val result = for {
+ bl <- eventualBrokerList
+ tl <- eventualTopicList
+ } yield SMView(tl.list.size, bl.list.size, smConfig.schedulerConfig)
+ result pipeTo sender
+
+ case any: Any => log.warning("sma : processQueryResponse : Received unknown message: {}", any)
+ }
+ }
+
+ override def processCommandRequest(request: CommandRequest): Unit = {
+ request match {
+ case SMAddBroker(id, cpus, mem, heap, port, bindAddress, constraints, options, log4jOptions, jvmOptions, stickinessPeriod, failover) =>
+ implicit val ec = longRunningExecutionContext
+
+ withKafkaCommandActor(KSCAddBroker(id, cpus, mem, heap, port, bindAddress, constraints, options, log4jOptions, jvmOptions, stickinessPeriod, failover)){
+ kcResponse: KCCommandResult =>
+ Future.successful(SMCommandResult(kcResponse.result))
+ } pipeTo sender()
+
+ case SMUpdateBroker(id, cpus, mem, heap, port, bindAddress, constraints, options, log4jOptions, jvmOptions, stickinessPeriod, failover) =>
+ implicit val ec = longRunningExecutionContext
+
+ withKafkaCommandActor(KSCUpdateBroker(id, cpus, mem, heap, port, bindAddress, constraints, options, log4jOptions, jvmOptions, stickinessPeriod, failover)){
+ kcResponse: KCCommandResult =>
+ Future.successful(SMCommandResult(kcResponse.result))
+ } pipeTo sender()
+
+ case SMStartBroker(id) =>
+ implicit val ec = longRunningExecutionContext
+
+ withKafkaCommandActor(KSCStartBroker(id)){
+ kcResponse: KCCommandResult =>
+ Future.successful(SMCommandResult(kcResponse.result))
+ } pipeTo sender()
+
+ case SMStopBroker(id) =>
+ implicit val ec = longRunningExecutionContext
+
+ withKafkaCommandActor(KSCStopBroker(id)){
+ kcResponse: KCCommandResult =>
+ Future.successful(SMCommandResult(kcResponse.result))
+ } pipeTo sender()
+
+ case SMRemoveBroker(id) =>
+ implicit val ec = longRunningExecutionContext
+
+ withKafkaCommandActor(KSCRemoveBroker(id)){
+ kcResponse: KCCommandResult =>
+ Future.successful(SMCommandResult(kcResponse.result))
+ } pipeTo sender()
+
+ case SMRebalanceTopics(ids, topics) =>
+ implicit val ec = longRunningExecutionContext
+
+ withKafkaCommandActor(KSCRebalanceTopics(ids, topics)){
+ kcResponse: KCCommandResult =>
+ Future.successful(SMCommandResult(kcResponse.result))
+ } pipeTo sender()
+
+ case any: Any => log.warning("cma : processCommandRequest : Received unknown message: {}", any)
+ }
+ }
+
+ private[this] def withKafkaSchedulerStateActor[Input,Output,FOutput]
+ (msg: Input)(fn: Output => FOutput)(implicit tag: ClassTag[Output], ec: ExecutionContext) : Future[FOutput] = {
+ context.actorSelection(kafkaSchedulerStateActor).ask(msg).mapTo[Output].map(fn)
+ }
+
+ private[this] def withKafkaCommandActor[Input,Output,FOutput]
+ (msg: Input)(fn: Output => FOutput)(implicit tag: ClassTag[Output], ec: ExecutionContext) : Future[FOutput] = {
+ context.actorSelection(kafkaSchedulerCommandActor).ask(msg).mapTo[Output].map(fn)
+ }
+
+ private[this] def modify[T](fn: => T): T = {
+ try {
+ mutex.acquire(smConfig.mutexTimeoutMillis,TimeUnit.MILLISECONDS)
+ fn
+ } finally {
+ if(mutex.isAcquiredInThisProcess) {
+ mutex.release()
+ }
+ }
+ }
+
+ def getNonExistentBrokers(availableBrokers: BrokerList, selectedBrokers: Seq[Int]): Seq[Int] = {
+ val availableBrokerIds: Set[Int] = availableBrokers.list.map(_.id.toInt).toSet
+ selectedBrokers filter { b: Int => !availableBrokerIds.contains(b) }
+ }
+
+ def getNonExistentBrokers(availableBrokers: BrokerList, assignments: Map[Int, Seq[Int]]): Seq[Int] = {
+ val brokersAssigned = assignments.flatMap({ case (pt, bl) => bl }).toSet.toSeq
+ getNonExistentBrokers(availableBrokers, brokersAssigned)
+ }
+}
diff --git a/app/scheduler/kafka/manager/SchedulerRestClient.scala b/app/scheduler/kafka/manager/SchedulerRestClient.scala
new file mode 100644
index 000000000..51a1b5dbd
--- /dev/null
+++ b/app/scheduler/kafka/manager/SchedulerRestClient.scala
@@ -0,0 +1,235 @@
+package scheduler.kafka.manager
+
+import java.util.Date
+
+import org.slf4j.LoggerFactory
+import play.api.Play.current
+import play.api.libs.json.{JsError, JsSuccess}
+import play.api.libs.ws._
+import scheduler.kafka.manager.SchedulerRestClient.{AddBrokerResponse, Broker, StatusResponse}
+import scheduler.models.form.Failover
+
+import scala.concurrent.{ExecutionContext, Future}
+
+object SchedulerRestClient {
+
+ import play.api.libs.functional.syntax._
+ import play.api.libs.json._
+
+
+ case class Task(id: String,
+ slaveId: String,
+ executorId: String,
+ hostname: String,
+ endpoint: Option[String],
+ state: String)
+
+ implicit val taskReads: Reads[Task] = (
+ (__ \ 'id).read[String] and
+ (__ \ 'slaveId).read[String] and
+ (__ \ 'executorId).read[String] and
+ (__ \ 'hostname).read[String] and
+ (__ \ 'endpoint).readNullable[String] and
+ (__ \ 'state).read[String]
+ )(Task)
+
+ case class Failover(delay: String,
+ maxDelay: String,
+ maxTries: Option[Int],
+ failures: Option[Int],
+ failureTime: Option[Date])
+
+ implicit val failoverReads: Reads[Failover] = (
+ (__ \ 'delay).read[String] and
+ (__ \ 'maxDelay).read[String] and
+ (__ \ 'maxTries).readNullable[Int] and
+ (__ \ 'failures).readNullable[Int] and
+ (__ \ 'failureTime).readNullable[Date]
+ )(Failover)
+
+ case class Stickiness(period: String, stopTime: Option[Date], hostname: Option[String])
+
+ implicit val stickinessReads: Reads[Stickiness] = (
+ (__ \ 'period).read[String] and
+ (__ \ 'stopTime).readNullable[Date] and
+ (__ \ 'hostname).readNullable[String]
+ )(Stickiness)
+
+
+ case class Broker(id: String,
+ active: Boolean,
+ cpus: Double,
+ mem: Long,
+ heap: Long,
+ port: Option[String],
+ bindAddress: Option[String],
+ constraints: Option[String],
+ options: Option[String],
+ log4jOptions: Option[String],
+ jvmOptions: Option[String],
+ stickiness: Stickiness,
+ failover: Failover,
+ task: Option[Task])
+
+ implicit val brokerReads: Reads[Broker] = (
+ (__ \ 'id).read[String] and
+ (__ \ 'active).read[Boolean] and
+ (__ \ 'cpus).read[Double] and
+ (__ \ 'mem).read[Long] and
+ (__ \ 'heap).read[Long] and
+ (__ \ 'port).readNullable[String] and
+ (__ \ 'bindAddress).readNullable[String] and
+ (__ \ 'constraints).readNullable[String] and
+ (__ \ 'options).readNullable[String] and
+ (__ \ 'log4jOptions).readNullable[String] and
+ (__ \ 'jvmOptions).readNullable[String] and
+ (__ \ 'stickiness).read[Stickiness] and
+ (__ \ 'failover).read[Failover] and
+ (__ \ 'task).readNullable[Task]
+ )(Broker)
+
+ case class StatusResponse(brokers: Option[Seq[Broker]], frameworkId: Option[String])
+
+ implicit val statusResponseReads: Reads[StatusResponse] = (
+ (__ \ 'brokers).readNullable[Seq[Broker]] and
+ (__ \ 'frameworkId).readNullable[String]
+ )(StatusResponse)
+
+
+ case class AddBrokerResponse(brokers: Seq[Broker])
+
+ implicit val addBrokerResponseReads: Reads[AddBrokerResponse] =
+ (__ \ 'brokers).read[Seq[Broker]].map(AddBrokerResponse)
+
+}
+
+class SchedulerRestClient(val apiUrl: String)(implicit val executionContext: ExecutionContext) {
+ private[this] lazy val logger = LoggerFactory.getLogger(this.getClass)
+
+ private val BrokerApiPrefix = s"$apiUrl/api/broker"
+ private val StatusUrl = s"$BrokerApiPrefix/list"
+ private val AddBrokerUrl = s"$BrokerApiPrefix/add"
+ private val UpdateBrokerUrl = s"$BrokerApiPrefix/update"
+ private val StartBrokerUrl = s"$BrokerApiPrefix/start"
+ private val StopBrokerUrl = s"$BrokerApiPrefix/stop"
+ private val RemoveBrokerUrl = s"$BrokerApiPrefix/remove"
+
+ private val TopicApiPrefix = s"$apiUrl/api/topic"
+ private val RebalanceTopicsUrl = s"$TopicApiPrefix/rebalance"
+
+ private val Timeout = 10000
+
+ def getStatus: Future[StatusResponse] = {
+ val holder: Future[WSResponse] = WS
+ .url(StatusUrl)
+ .withRequestTimeout(Timeout)
+ .get()
+
+ holder.map {
+ response => response.json.validate[StatusResponse]
+ }.flatMap {
+ case JsError(e) =>
+ logger.error(s"Failed to parse status response $e")
+ Future.failed(new Exception("Failed to parse status response json"))
+ case JsSuccess(status, _) =>
+ Future.successful(status)
+ }
+ }
+
+ def addBroker(id: Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String],
+ bindAddress: Option[String], constraints: Option[String], options: Option[String],
+ log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String],
+ failover: Failover): Future[Seq[Broker]] = {
+
+ val queryParamsSeq = Seq(
+ "broker" -> Some(id.toString), "cpus" -> cpus.map(_.toString), "mem" -> mem.map(_.toString), "heap" -> heap.map(_.toString),
+ "port" -> port, "bindAddress" -> bindAddress, "constraints" -> constraints,
+ "options" -> options, "log4jOptions" -> log4jOptions, "jvmOptions" -> jvmOptions,
+ "stickinessPeriod" -> stickinessPeriod.map(_.toString), "failoverDelay" -> failover.failoverDelay.map(_.toString),
+ "failoverMaxDelay" -> failover.failoverMaxDelay.map(_.toString), "failoverMaxTries" -> failover.failoverMaxTries.map(_.toString)).collect {
+ case (key, Some(v)) => (key, v)
+ }
+
+ val holder: Future[WSResponse] = WS
+ .url(AddBrokerUrl)
+ .withQueryString(queryParamsSeq: _*)
+ .withRequestTimeout(Timeout)
+ .get()
+
+ holder.map {
+ response =>
+ response.json.validate[AddBrokerResponse]
+ }.flatMap {
+ case JsError(e) =>
+ logger.error(s"Failed to parse add broker response $e")
+ Future.failed(new Exception("Failed to parse add broker response json"))
+ case JsSuccess(brokers, _) =>
+ Future.successful(brokers.brokers)
+ }
+ }
+
+ def updateBroker(id: Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String],
+ bindAddress: Option[String], constraints: Option[String], options: Option[String],
+ log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String],
+ failover: Failover): Future[Unit] = {
+
+ val queryParamsSeq = Seq(
+ "broker" -> Some(id.toString), "cpus" -> cpus.map(_.toString), "mem" -> mem.map(_.toString), "heap" -> heap.map(_.toString),
+ "port" -> port, "bindAddress" -> bindAddress, "constraints" -> constraints,
+ "options" -> options, "log4jOptions" -> log4jOptions, "jvmOptions" -> jvmOptions,
+ "stickinessPeriod" -> stickinessPeriod.map(_.toString), "failoverDelay" -> failover.failoverDelay.map(_.toString),
+ "failoverMaxDelay" -> failover.failoverMaxDelay.map(_.toString), "failoverMaxTries" -> failover.failoverMaxTries.map(_.toString)).collect {
+ case (key, Some(v)) => (key, v)
+ }
+
+ val holder: Future[WSResponse] = WS
+ .url(UpdateBrokerUrl)
+ .withQueryString(queryParamsSeq: _*)
+ .withRequestTimeout(Timeout)
+ .get()
+
+ holder.map { _ => () }
+ }
+
+ def startBroker(id: Int): Future[Unit] = {
+ val holder: Future[WSResponse] = WS
+ .url(StartBrokerUrl)
+ .withQueryString(Seq("broker" -> id.toString, "timeout" -> "0"): _*)
+ .withRequestTimeout(Timeout)
+ .get()
+
+ holder.map { _ => () }
+ }
+
+ def stopBroker(id: Int): Future[Unit] = {
+ val holder: Future[WSResponse] = WS
+ .url(StopBrokerUrl)
+ .withQueryString(Seq("broker" -> id.toString, "timeout" -> "0"): _*)
+ .withRequestTimeout(Timeout)
+ .get()
+
+ holder.map { _ => () }
+ }
+
+ def removeBroker(id: Int): Future[Unit] = {
+ val holder: Future[WSResponse] = WS
+ .url(RemoveBrokerUrl)
+ .withQueryString(Seq("broker" -> id.toString): _*)
+ .withRequestTimeout(Timeout)
+ .get()
+
+ holder.map { _ => () }
+ }
+
+ def rebalanceTopics(ids: String, topics: Option[String]): Future[Unit] = {
+ val holder: Future[WSResponse] = WS
+ .url(RebalanceTopicsUrl)
+ .withQueryString(Seq("broker" -> Some(ids), "topic" -> topics).collect {
+ case (key, Some(v)) => (key, v)
+ }: _*)
+ .withRequestTimeout(Timeout)
+ .get()
+
+ holder.map { _ => () }
+ }
+}
diff --git a/app/scheduler/models/form/BrokerOperations.scala b/app/scheduler/models/form/BrokerOperations.scala
new file mode 100644
index 000000000..975fe4785
--- /dev/null
+++ b/app/scheduler/models/form/BrokerOperations.scala
@@ -0,0 +1,15 @@
+package scheduler.models.form
+
+sealed trait BrokerOperations
+
+case class Failover(failoverDelay: Option[String], failoverMaxDelay: Option[String], failoverMaxTries:Option[Int])
+
+case class AddBroker(id: Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String],
+ bindAddress: Option[String], constraints: Option[String], options: Option[String],
+ log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String],
+ failover: Failover)
+
+case class UpdateBroker(id: Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String],
+ bindAddress: Option[String], constraints: Option[String], options: Option[String],
+ log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String],
+ failover: Failover)
diff --git a/app/scheduler/models/form/RebalanceTopicsOperations.scala b/app/scheduler/models/form/RebalanceTopicsOperations.scala
new file mode 100644
index 000000000..72bb6c14a
--- /dev/null
+++ b/app/scheduler/models/form/RebalanceTopicsOperations.scala
@@ -0,0 +1,7 @@
+package scheduler.models.form
+
+sealed trait RebalanceTopicsOperations
+
+case class RebalanceTopics(ids: String, topics: Option[String]) extends RebalanceTopicsOperations
+
+
diff --git a/app/scheduler/views/broker/addBroker.scala.html b/app/scheduler/views/broker/addBroker.scala.html
new file mode 100644
index 000000000..6b413ce3f
--- /dev/null
+++ b/app/scheduler/views/broker/addBroker.scala.html
@@ -0,0 +1,57 @@
+@*
+* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+* See accompanying LICENSE file.
+*@
+@import helper._
+@import b3.vertical.fieldConstructor
+@import scheduler.controllers.routes
+@import scalaz.{\/}
+@(schedulerName: String, errorOrForm: kafka.manager.ApiError \/ Form[scheduler.models.form.AddBroker])(implicit request: RequestHeader)
+
+@theMenu = {
+@views.html.navigation.schedulerMenu(schedulerName,"Broker","Add Broker",models.navigation.Menus.schedulerMenus(schedulerName))
+}
+
+@renderForm(addBrokerForm: Form[scheduler.models.form.AddBroker]) = {
+
+ @b3.form(routes.Broker.handleAddBroker(schedulerName)) {
+
+
+
+ |
+ @b3.text(addBrokerForm("id"), '_label -> "ID", 'placeholder -> "", 'autofocus -> true )
+ @b3.text(addBrokerForm("cpus"), '_label -> "CPU", 'placeholder -> "")
+ @b3.text(addBrokerForm("mem"), '_label -> "Memory", 'placeholder -> "")
+ @b3.text(addBrokerForm("heap"), '_label -> "Heap", 'placeholder -> "")
+ @b3.text(addBrokerForm("port"), '_label -> "Port", 'placeholder -> "")
+ @b3.text(addBrokerForm("bindAddress"), '_label -> "Bind Address", 'placeholder -> "")
+ @b3.text(addBrokerForm("stickinessPeriod"), '_label -> "Stickiness Period", 'placeholder -> "")
+ @b3.text(addBrokerForm("constraints"), '_label -> "Constraints", 'placeholder -> "")
+ @b3.text(addBrokerForm("jvmOptions"), '_label -> "Jvm Options", 'placeholder -> "")
+ @b3.text(addBrokerForm("options"), '_label -> "Options", 'placeholder -> "")
+ @b3.text(addBrokerForm("log4jOptions"), '_label -> "Log4j Options", 'placeholder -> "")
+ @b3.text(addBrokerForm("failoverDelay"), '_label -> "Failover Delay", 'placeholder -> "")
+ @b3.text(addBrokerForm("failoverMaxDelay"), '_label -> "Failover Max Delay", 'placeholder -> "")
+ @b3.text(addBrokerForm("failoverMaxTries"), '_label -> "Failover Max Tries", 'placeholder -> "")
+ @b3.submit('class -> "submit-button btn btn-primary"){ Add }
+ Cancel
+ |
+
+
+
+ }
+
+}
+
+@main(
+"Add Broker",
+menu = theMenu,
+breadcrumbs=views.html.navigation.breadCrumbs(models.navigation.BreadCrumbs.withViewAndScheduler("Add Broker",schedulerName))) {
+
+
+
Add Broker
+ @errorOrForm.fold( views.html.errors.onApiError(_), renderForm(_))
+
+
+}
+
diff --git a/app/scheduler/views/broker/brokerList.scala.html b/app/scheduler/views/broker/brokerList.scala.html
new file mode 100644
index 000000000..45012d03b
--- /dev/null
+++ b/app/scheduler/views/broker/brokerList.scala.html
@@ -0,0 +1,46 @@
+@*
+* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+* See accompanying LICENSE file.
+*@
+@import kafka.manager.ActorModel.BrokerIdentity
+@import scalaz.{\/}
+@(schedulerName:String, errorOrBrokers: kafka.manager.ApiError \/ kafka.manager.SchedulerBrokerListExtended)
+
+@theMenu = {
+@views.html.navigation.schedulerMenu(schedulerName,"Brokers","",models.navigation.Menus.schedulerMenus(schedulerName))
+}
+
+@renderBrokerMetrics(bl: kafka.manager.SchedulerBrokerListExtended) = {
+@if(bl.schedulerConfig.jmxEnabled) {
+@views.html.common.brokerMetrics(bl.combinedMetric)
+} else {
+
+ Please enable JMX polling
here.
+
+}
+}
+
+@main(
+"Broker List",
+menu = theMenu,
+breadcrumbs=views.html.navigation.breadCrumbs(models.navigation.BreadCrumbs.withViewAndScheduler("Brokers",schedulerName))) {
+
+
+
+
+ Brokers
+
+
+ @errorOrBrokers.fold( views.html.errors.onApiError(_), scheduler.views.html.broker.brokerListContent(schedulerName,_) )
+
+
+
+
+
Combined Metrics
+ @errorOrBrokers.fold( views.html.errors.onApiError(_), bl => renderBrokerMetrics(bl))
+
+
+
+}
diff --git a/app/scheduler/views/broker/brokerListContent.scala.html b/app/scheduler/views/broker/brokerListContent.scala.html
new file mode 100644
index 000000000..a69ba0012
--- /dev/null
+++ b/app/scheduler/views/broker/brokerListContent.scala.html
@@ -0,0 +1,33 @@
+@*
+* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+* See accompanying LICENSE file.
+*@
+@import kafka.manager.ActorModel.SchedulerBrokerIdentity
+@(schedulerName:String, brokerListExtended: kafka.manager.SchedulerBrokerListExtended)
+
+
+
+ | Id | Host | Port | State | Bytes In | Bytes Out |
+
+
+ @for(broker <- brokerListExtended.list.sortBy(_.id)) {
+
+ | @broker.id |
+ @broker.actualHost().getOrElse("-") |
+ @broker.actualPort().getOrElse("-") |
+ @broker.state() |
+
+
+ @brokerListExtended.metrics.get(broker.id).map(_.bytesInPerSec.formatOneMinuteRate)
+
+ |
+
+
+ @brokerListExtended.metrics.get(broker.id).map(_.bytesOutPerSec.formatOneMinuteRate)
+
+ |
+
+ }
+
+
+
diff --git a/app/scheduler/views/broker/brokerView.scala.html b/app/scheduler/views/broker/brokerView.scala.html
new file mode 100644
index 000000000..843e6573c
--- /dev/null
+++ b/app/scheduler/views/broker/brokerView.scala.html
@@ -0,0 +1,24 @@
+@*
+* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+* See accompanying LICENSE file.
+*@
+@import scalaz.{\/}
+@(schedulerName:String, brokerId: Int, errorOrBrokerView: kafka.manager.ApiError \/ kafka.manager.ActorModel.SchedulerBrokerIdentity)
+
+@theMenu = {
+ @views.html.navigation.schedulerMenu(schedulerName,"Brokers","",models.navigation.Menus.schedulerMenus(schedulerName))
+}
+
+@main(
+ "Broker View",
+ menu = theMenu,
+ breadcrumbs=views.html.navigation.breadCrumbs(models.navigation.BreadCrumbs.withNamedViewAndScheduler("Broker View",schedulerName,brokerId.toString))) {
+
+
+
+
Broker Id @brokerId
+
+ @errorOrBrokerView.fold[Html](views.html.errors.onApiError(_), scheduler.views.html.broker.brokerViewContent(schedulerName, brokerId, _))
+
+
+}
diff --git a/app/scheduler/views/broker/brokerViewContent.scala.html b/app/scheduler/views/broker/brokerViewContent.scala.html
new file mode 100644
index 000000000..8766a375d
--- /dev/null
+++ b/app/scheduler/views/broker/brokerViewContent.scala.html
@@ -0,0 +1,101 @@
+@*
+* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+* See accompanying LICENSE file.
+*@
+@import b3.vertical.fieldConstructor
+@(schedulerName: String, brokerId: Int, brokerView :kafka.manager.ActorModel.SchedulerBrokerIdentity)
+
+@renderBrokerMetrics = {
+ @if(brokerView.schedulerConfig.jmxEnabled) {
+ @views.html.common.brokerMetrics(brokerView.metrics)
+ } else {
+
+ Please enable JMX polling
here.
+
+ }
+}
+
+
+
+
+
Summary
+
+
+ | Status | @brokerView.state |
+ | CPUs | @brokerView.cpus |
+ | Memory | @brokerView.mem |
+ | Heap | @brokerView.heap |
+ | Constraints | @brokerView.constraintsDesc |
+ | Options | @brokerView.optionsDesc |
+ | Log4j Options | @brokerView.log4jOptionsDesc |
+ | Jvm Options | @brokerView.jvmOptions |
+ | # of Topics | @brokerView.numTopics |
+ | # of Partitions | @brokerView.numPartitions |
+ @if(brokerView.schedulerConfig.jmxEnabled) {
+ | % of Messages | @brokerView.stats.map(_.perMessages) |
+ | % of Incoming | @brokerView.stats.map(_.perIncoming) |
+ | % of Outgoing | @brokerView.stats.map(_.perOutgoing) |
+ }
+
+
+
+
+
Metrics
+ @renderBrokerMetrics
+
+
+
+
+
Operations
+
+
+
+ @if(brokerView.active) {
+ @b3.form(scheduler.controllers.routes.Broker.handleStopBroker(schedulerName, brokerId)) {
+
+ }
+ } else {
+ @b3.form(scheduler.controllers.routes.Broker.handleStartBroker(schedulerName, brokerId)) {
+
+ }
+ @b3.form(scheduler.controllers.routes.Broker.handleRemoveBroker(schedulerName, brokerId)) {
+
+ }
+ Update Broker
+ }
+
+
+
+
+
+
+
+
+
Per Topic Detail
+
+
+ | Topic | Replication | Total Partitions | Partitions on Broker | Partitions | Skewed? |
+
+
+ @for((ti,bp) <- brokerView.topicPartitions) {
+
+ | @ti.topic |
+ @ti.replicationFactor |
+ @ti.partitions |
+ @bp.size |
+ @bp.mkString("(",",",")") |
+ @ti.partitionsByBroker.find(_.id == brokerId).map(_.isSkewed).getOrElse("Unknown") |
+
+ }
+
+
+
+
+
+
diff --git a/app/scheduler/views/broker/rebalanceTopics.scala.html b/app/scheduler/views/broker/rebalanceTopics.scala.html
new file mode 100644
index 000000000..9cbb542c0
--- /dev/null
+++ b/app/scheduler/views/broker/rebalanceTopics.scala.html
@@ -0,0 +1,46 @@
+@*
+* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+* See accompanying LICENSE file.
+*@
+
+@import helper._
+@import b3.vertical.fieldConstructor
+@import scheduler.controllers.routes
+@import scalaz.{\/}
+@(schedulerName: String, errorOrForm: kafka.manager.ApiError \/ Form[scheduler.models.form.RebalanceTopics])(implicit request: RequestHeader)
+
+@theMenu = {
+@views.html.navigation.schedulerMenu(schedulerName,"Rebalance Topics","",models.navigation.Menus.schedulerMenus(schedulerName))
+}
+
+@renderForm(rebalanceForm: Form[scheduler.models.form.RebalanceTopics]) = {
+
+ @b3.form(routes.RebalanceTopics.handleRebalance(schedulerName)) {
+
+
+
+ |
+ @b3.text(rebalanceForm("ids"), '_label -> "Broker Ids Expression", 'placeholder -> "", 'autofocus -> true )
+ @b3.text(rebalanceForm("topics"), '_label -> "Topic Expression", 'placeholder -> "")
+ @b3.submit('class -> "submit-button btn btn-primary"){ Submit }
+ Cancel
+ |
+
+
+
+ }
+
+}
+
+@main(
+"Rebalance Topics",
+menu = theMenu,
+breadcrumbs=views.html.navigation.breadCrumbs(models.navigation.BreadCrumbs.withViewAndCluster("Rebalance Topics",schedulerName))) {
+
+
+
Rebalance
+ @errorOrForm.fold( views.html.errors.onApiError(_), renderForm(_))
+
+
+}
+
diff --git a/app/scheduler/views/broker/updateBroker.scala.html b/app/scheduler/views/broker/updateBroker.scala.html
new file mode 100644
index 000000000..33874e1b7
--- /dev/null
+++ b/app/scheduler/views/broker/updateBroker.scala.html
@@ -0,0 +1,56 @@
+@*
+* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+* See accompanying LICENSE file.
+*@
+@import helper._
+@import b3.vertical.fieldConstructor
+@import scheduler.controllers.routes
+@import scalaz.{\/}
+@(schedulerName: String, brokerId:Int, errorOrForm: kafka.manager.ApiError \/ Form[scheduler.models.form.UpdateBroker])(implicit request: RequestHeader)
+
+@theMenu = {
+@views.html.navigation.schedulerMenu(schedulerName,"Broker","Update Broker",models.navigation.Menus.schedulerMenus(schedulerName))
+}
+
+@renderForm(updateBrokerForm: Form[scheduler.models.form.UpdateBroker]) = {
+
+ @b3.form(routes.Broker.handleUpdateBroker(schedulerName, brokerId)) {
+
+
+
+ |
+ @b3.text(updateBrokerForm("id"), '_label -> "ID", 'placeholder -> "", 'readonly -> true)
+ @b3.text(updateBrokerForm("cpus"), '_label -> "CPU", 'placeholder -> "")
+ @b3.text(updateBrokerForm("mem"), '_label -> "Memory", 'placeholder -> "")
+ @b3.text(updateBrokerForm("heap"), '_label -> "Heap", 'placeholder -> "")
+ @b3.text(updateBrokerForm("port"), '_label -> "Port", 'placeholder -> "")
+ @b3.text(updateBrokerForm("bindAddress"), '_label -> "Bind Address", 'placeholder -> "")
+ @b3.text(updateBrokerForm("jvmOptions"), '_label -> "Jvm Options", 'placeholder -> "")
+ @b3.text(updateBrokerForm("stickinessPeriod"), '_label -> "Stickiness Period", 'placeholder -> "")
+ @b3.text(updateBrokerForm("failoverDelay"), '_label -> "Failover Delay", 'placeholder -> "")
+ @b3.text(updateBrokerForm("failoverMaxDelay"), '_label -> "Failover Max Delay", 'placeholder -> "")
+ @b3.text(updateBrokerForm("failoverMaxTries"), '_label -> "Failover Max Tries", 'placeholder -> "")
+ @b3.submit('class -> "submit-button btn btn-primary"){ Update }
+ Cancel
+ |
+
+
+
+ }
+
+}
+
+@main(
+ "Update Broker",
+ menu = theMenu,
+ breadcrumbs=views.html.navigation.breadCrumbs(models.navigation.BreadCrumbs.withNamedViewAndScheduler("Broker View",schedulerName,brokerId.toString))) {
+
+
+
Update Broker
+
+ @errorOrForm.fold( views.html.errors.onApiError(_), renderForm(_))
+
+
+
+}
+
diff --git a/app/scheduler/views/scheduler/addScheduler.scala.html b/app/scheduler/views/scheduler/addScheduler.scala.html
new file mode 100644
index 000000000..64fbfe309
--- /dev/null
+++ b/app/scheduler/views/scheduler/addScheduler.scala.html
@@ -0,0 +1,38 @@
+@*
+* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+* See accompanying LICENSE file.
+*@
+@(addSchedulerForm: Form[kafka.manager.SchedulerConfig])(implicit request: RequestHeader)
+
+@import b3.vertical.fieldConstructor
+@import controllers.routes
+
+@theMenu = {
+@views.html.navigation.defaultMenu(views.html.navigation.menuNav("Scheduler","Add Scheduler",models.navigation.Menus.indexMenu))
+}
+
+@drawForm(form : Form[kafka.manager.SchedulerConfig]) = {
+@b3.form(routes.Cluster.handleAddScheduler) {
+
+}
+}
+
+@main("Add Scheduler", menu = theMenu, breadcrumbs = views.html.navigation.breadCrumbs(models.navigation.BreadCrumbs.withSView("Add Scheduler"))) {
+
+
+
Add Scheduler
+
+ @drawForm(addSchedulerForm)
+
+
+
+}
+
diff --git a/app/scheduler/views/scheduler/schedulerList.scala.html b/app/scheduler/views/scheduler/schedulerList.scala.html
new file mode 100644
index 000000000..8449ebb80
--- /dev/null
+++ b/app/scheduler/views/scheduler/schedulerList.scala.html
@@ -0,0 +1,58 @@
+@(schedulers: IndexedSeq[kafka.manager.SchedulerConfig])
+
+@import b3.vertical.fieldConstructor
+@import scheduler.controllers.routes
+
+
+
+ | Active | Operations | Version |
+
+
+ @for(scheduler <- schedulers) {
+
+ |
+ @if(scheduler.enabled) {
+ @scheduler.name
+ } else {
+ @scheduler.name
+ }
+ |
+
+ @*
+ *
+ * @if(scheduler.enabled) {
+ * Modify
+ * @b3.form(controllers.routes.Cluster.handleUpdateCluster(scheduler.name)) {
+ *
+ *
+ *
+ *
+ * @b3.submit('class -> "btn btn-warning ops-button"){ Disable }
+ * }
+ * } else {
+ * @b3.form(controllers.routes.Cluster.handleUpdateCluster(scheduler.name)) {
+ *
+ *
+ *
+ *
+ * @b3.submit('class -> "btn btn-success ops-button"){ Enable }
+ * }
+ * @b3.form(controllers.routes.Cluster.handleUpdateCluster(scheduler.name)) {
+ *
+ *
+ *
+ *
+ * @b3.submit('class -> "btn btn-danger ops-button"){ Delete }
+ * }
+ * }
+ *
+ *@
+ |
+
+ @scheduler.version.toString
+ |
+
+ }
+
+
+
diff --git a/app/scheduler/views/scheduler/schedulerView.scala.html b/app/scheduler/views/scheduler/schedulerView.scala.html
new file mode 100644
index 000000000..7410e0617
--- /dev/null
+++ b/app/scheduler/views/scheduler/schedulerView.scala.html
@@ -0,0 +1,19 @@
+@*
+* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+* See accompanying LICENSE file.
+*@
+@import scalaz.{\/}
+@(schedulerName: String, errorOrSchedulerView: kafka.manager.ApiError \/ kafka.manager.ActorModel.SMView)
+
+@theMenu = {
+ @views.html.navigation.schedulerMenu(schedulerName,"Scheduler","Summary",models.navigation.Menus.schedulerMenus(schedulerName))
+}
+
+@main(
+ "Kafka Manager",
+ menu = theMenu,
+ breadcrumbs=views.html.navigation.breadCrumbs(models.navigation.BreadCrumbs.withViewAndScheduler("Summary",schedulerName))) {
+
+ @errorOrSchedulerView.fold(views.html.errors.onApiError(_),scheduler.views.html.scheduler.schedulerViewContent(schedulerName,_))
+
+}
diff --git a/app/scheduler/views/scheduler/schedulerViewContent.scala.html b/app/scheduler/views/scheduler/schedulerViewContent.scala.html
new file mode 100644
index 000000000..13199e2bb
--- /dev/null
+++ b/app/scheduler/views/scheduler/schedulerViewContent.scala.html
@@ -0,0 +1,33 @@
+@*
+* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+* See accompanying LICENSE file.
+*@
+@(schedulerName: String, schedulerView: kafka.manager.ActorModel.SMView)
+
+
+
Scheduler Information
+
+
+
+ | Zookeepers | @schedulerView.schedulerConfig.curatorConfig.zkConnect.replace(","," ") |
+
+
+ | Api url | @schedulerView.schedulerConfig.apiUrl.toString |
+
+
+ | Version | @schedulerView.schedulerConfig.version.toString |
+
+
+
+
+
\ No newline at end of file
diff --git a/app/views/index.scala.html b/app/views/index.scala.html
index 9d037ed32..89d3aeb48 100644
--- a/app/views/index.scala.html
+++ b/app/views/index.scala.html
@@ -3,7 +3,8 @@
* See accompanying LICENSE file.
*@
@import scalaz.{\/}
-@(errorOrClusters: kafka.manager.ApiError \/ kafka.manager.ActorModel.KMClusterList
+@(errorOrClusters: kafka.manager.ApiError \/ kafka.manager.ActorModel.KMClusterList,
+errorOrSchedulers: kafka.manager.ApiError \/ kafka.manager.ActorModel.KMSchedulerList
)(implicit af: features.ApplicationFeatures)
@main(
@@ -19,6 +20,11 @@
@errorOrClusters.fold( _ => Html(""), cl => {
views.html.cluster.pendingClusterList(cl.pending)
})
+
+ Schedulers
+ @errorOrSchedulers.fold(views.html.errors.onApiError(_), sc => {
+ scheduler.views.html.scheduler.schedulerList(sc.active)
+ })
}
diff --git a/app/views/navigation/schedulerMenu.scala.html b/app/views/navigation/schedulerMenu.scala.html
new file mode 100644
index 000000000..6b732462b
--- /dev/null
+++ b/app/views/navigation/schedulerMenu.scala.html
@@ -0,0 +1,20 @@
+@*
+* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+* See accompanying LICENSE file.
+*@
+@(scheduler: String, menuTitle: String, menuItem: String, menuList: IndexedSeq[models.navigation.Menu])
+
+
+
+
+ @menuNav(menuTitle,menuItem,menuList)
+
+
diff --git a/build.sbt b/build.sbt
index b1e5cb94f..c899915d1 100644
--- a/build.sbt
+++ b/build.sbt
@@ -18,11 +18,16 @@ assemblyMergeStrategy in assembly := {
case other => (assemblyMergeStrategy in assembly).value(other)
}
+// Disable tests during assembly
+test in assembly := {}
+
+
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.3.14",
"com.typesafe.akka" %% "akka-slf4j" % "2.3.14",
"com.google.code.findbugs" % "jsr305" % "2.0.1",
"org.webjars" %% "webjars-play" % "2.3.0-2",
+ ws exclude("oauth.signpost","signpost-commonshttp4") force(),
"org.webjars" % "bootstrap" % "3.3.4",
"org.webjars" % "jquery" % "2.1.4",
"org.webjars" % "backbonejs" % "1.1.2-4",
diff --git a/conf/routes b/conf/routes
index dd0e2fb89..e56be94a6 100644
--- a/conf/routes
+++ b/conf/routes
@@ -55,6 +55,21 @@ GET /api/status/:c/availableBrokers controllers.api.KafkaH
GET /api/status/:c/:t/underReplicatedPartitions controllers.api.KafkaHealthCheck.underReplicatedPartitions(c:String,t:String)
GET /api/status/:c/:t/unavailablePartitions controllers.api.KafkaHealthCheck.unavailablePartitions(c:String,t:String)
+GET /addScheduler controllers.Cluster.addScheduler
+GET /schedulers/:s scheduler.controllers.SchedulerApplication.getScheduler(s:String)
+POST /schedulers controllers.Cluster.handleAddScheduler
+GET /schedulers/:s/addBroker scheduler.controllers.Broker.addBroker(s:String)
+POST /schedulers/:s/brokers/add scheduler.controllers.Broker.handleAddBroker(s:String)
+GET /schedulers/:s/brokers scheduler.controllers.SchedulerApplication.brokers(s:String)
+GET /schedulers/:s/brokers/:b scheduler.controllers.SchedulerApplication.broker(s: String, b:Int)
+POST /schedulers/:s/broker/:b/start scheduler.controllers.Broker.handleStartBroker(s:String, b: Int)
+POST /schedulers/:s/broker/:b/stop scheduler.controllers.Broker.handleStopBroker(s:String, b: Int)
+POST /schedulers/:s/broker/:b/remove scheduler.controllers.Broker.handleRemoveBroker(s:String, b: Int)
+GET /schedulers/:s/brokers/:b/updateBroker scheduler.controllers.Broker.updateBroker(s:String, b: Int)
+POST /schedulers/:s/brokers/:b/update scheduler.controllers.Broker.handleUpdateBroker(s:String, b: Int)
+GET /schedulers/:s/rebalanceTopics scheduler.controllers.RebalanceTopics.rebalanceTopics(s:String)
+POST /schedulers/:s/rebalanceTopics/run scheduler.controllers.RebalanceTopics.handleRebalance(s:String)
+
# Versioned Assets
GET /vassets/*file controllers.Assets.versioned(path="/public", file: Asset)
diff --git a/kafka-manager b/kafka-manager
new file mode 100755
index 000000000..2dfa3ab1e
--- /dev/null
+++ b/kafka-manager
@@ -0,0 +1,32 @@
+#!/bin/bash -Eu
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# TODO: detect our OS
+if [ $(boot2docker status) != 'running' ]; then
+ echo 'Starting boot2docker VM...'
+ boot2docker up
+fi
+
+echo 'Building docker container'
+docker build -t stealthly/kafkamanager .
+
+echo 'Starting docker runtime in background'
+#docker run --rm -it stealthly/kafkamanager
+
+id=$(docker run -p 9000:9000 -t -d stealthly/kafkamanager)
+
+echo 'Kafka manager started on port 9000'
\ No newline at end of file
diff --git a/marathon.json b/marathon.json
new file mode 100644
index 000000000..2eb659e66
--- /dev/null
+++ b/marathon.json
@@ -0,0 +1,39 @@
+{
+ "id": "/kafka-manager",
+ "cpus": 0.5,
+ "mem": 256.0,
+ "instances": 1,
+ "container": {
+ "type": "DOCKER",
+ "docker": {
+ "image": "stealthly/kafkamanager",
+ "forcePullImage": true,
+ "network": "BRIDGE",
+ "portMappings": [
+ {
+ "containerPort": 9000,
+ "hostPort": 0,
+ "protocol": "tcp"
+ }
+ ],
+ "privileged": false,
+ "parameters": [
+ { "key": "tty", "value": "true" }
+ ]
+ },
+ "volumes": []
+ },
+ "env": {
+ "APPLICATION_SECRET": "letmein"
+ },
+ "healthChecks": [
+ {
+ "protocol": "HTTP",
+ "portIndex": 0,
+ "path": "/",
+ "gracePeriodSeconds": 10,
+ "intervalSeconds": 30,
+ "maxConsecutiveFailures": 3
+ }
+ ]
+}
\ No newline at end of file