From 3c813699a95b8d2b5cf0dda2a058edfdf717653b Mon Sep 17 00:00:00 2001 From: Praneeth Naramsetti Date: Wed, 10 Aug 2016 11:54:51 -0700 Subject: [PATCH 1/3] spout_aggregation_changes --- .../summingbird/storm/StormPlatform.scala | 38 +++---- .../collector/AggregatorOutputCollector.scala | 103 ++++++++++++++++++ .../TransformingOutputCollector.scala | 22 ++-- .../storm/spout/KeyValueSpout.scala | 49 +++++++-- 4 files changed, 170 insertions(+), 42 deletions(-) create mode 100644 summingbird-storm/src/main/scala/com/twitter/summingbird/storm/collector/AggregatorOutputCollector.scala diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index 124ad47c1..bad59d9ca 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -173,7 +173,8 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird } } - private def scheduleSpout(jobID: JobId, stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder) = { + private def scheduleSpout[K, V](jobID: JobId, stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder) = { + val (spout, parOpt) = node.members.collect { case Source(SpoutSource(s, parOpt)) => (s, parOpt) }.head val nodeName = stormDag.getNodeName(node) @@ -194,7 +195,6 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird } } - val countersForSpout: Seq[(Group, Name)] = JobCounters.getCountersForJob(jobID).getOrElse(Nil) val isMergeableWithSource = getOrElse(stormDag, node, DEFAULT_FM_MERGEABLE_WITH_SOURCE).get val sourceParallelism = getOrElse(stormDag, node, parOpt.getOrElse(DEFAULT_SOURCE_PARALLELISM)).parHint val flatMapParallelism = getOrElse(stormDag, node, DEFAULT_FM_PARALLELISM).parHint @@ -202,6 +202,10 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird if (isMergeableWithSource) { require(flatMapParallelism <= sourceParallelism, s"SourceParallelism ($sourceParallelism) must be at least as high as FlatMapParallelism ($flatMapParallelism) when merging flatMap with Source") } + val builder = BuildSummer(this, stormDag, node, jobID) + val lockedCounters = Externalizer(JobCounters.getCountersForJob(jobID).getOrElse(Nil)) + val countersForSpout: Seq[(Group, Name)] = lockedCounters.get + val metrics = getOrElse(stormDag, node, DEFAULT_SPOUT_STORM_METRICS) val registerAllMetrics = Externalizer({ context: TopologyContext => @@ -214,39 +218,29 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird StormStatProvider.registerMetrics(jobID, context, countersForSpout) SummingbirdRuntimeStats.addPlatformStatProvider(StormStatProvider) }) - val hookedTormentaSpout = tormentaSpout.openHook(registerAllMetrics.get) val summerOpt: Option[SummerNode[Storm]] = stormDag.dependantsOf(node.asInstanceOf[StormNode]).collect { case s: SummerNode[Storm] => s }.headOption - val stormSpout = summerOpt match { - /* - * As the spout is being followed by a summer, we do not have map-side aggregation handled in the spout. - * This means Summer is the place where the aggregation should happen. - */ - case Some(s) => createSpoutToFeedSummer(stormDag, s, hookedTormentaSpout) - case None => hookedTormentaSpout.getSpout + case Some(s) => createSpoutToFeedSummer[K, V](stormDag, s, tormentaSpout, builder, registerAllMetrics) + case None => tormentaSpout.openHook(registerAllMetrics.get).getSpout } topologyBuilder.setSpout(nodeName, stormSpout, sourceParallelism) } - private def createSpoutToFeedSummer(stormDag: Dag[Storm], node: StormNode, spout: Spout[(Timestamp, Any)]) = { + private def createSpoutToFeedSummer[K, V](stormDag: Dag[Storm], node: StormNode, spout: Spout[(Timestamp, Any)], builder: SummerBuilder, fn: Externalizer[(TopologyContext) => Unit]) = { val summerParalellism = getOrElse(stormDag, node, DEFAULT_SUMMER_PARALLELISM) val summerBatchMultiplier = getOrElse(stormDag, node, DEFAULT_SUMMER_BATCH_MULTIPLIER) val keyValueShards = executor.KeyValueShards(summerParalellism.parHint * summerBatchMultiplier.get) - /* - This method is called only when SourceNode is followed by SummerNode. - The Summer exists. There will be always not more than one summer, taken care by OnlinePlan. - */ - val summerProducer = node.members.collect { case s: Summer[_, _, _] => s }.head.asInstanceOf[Summer[Storm, _, _]] + + val summerProducer = node.members.collect { case s: Summer[_, _, _] => s }.head.asInstanceOf[Summer[Storm, K, V]] val batcher = summerProducer.store.mergeableBatcher - val kvinj = new KeyValueInjection[Int, CMap[(_, BatchID), (Timestamp, _)]] + val kvinj = new KeyValueInjection[(K, BatchID), (Timestamp, V)] val formattedSummerSpout = spout.map { - case (time, (k, v)) => - val newK = keyValueShards.summerIdFor(k) - val m = CMap((k, batcher.batchOf(time)) -> (time, v)) - kvinj((newK, m)) + case (time, (k: K, v: V)) => kvinj((k, batcher.batchOf(time)), (time, v)) } - new KeyValueSpout(formattedSummerSpout.getSpout) + implicit val valueMonoid: Semigroup[V] = summerProducer.semigroup + new KeyValueSpout[(K, BatchID), (Timestamp, V)](formattedSummerSpout.getSpout, builder, keyValueShards, fn.get) + } private def scheduleSummerBolt[K, V](jobID: JobId, stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder) = { diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/collector/AggregatorOutputCollector.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/collector/AggregatorOutputCollector.scala new file mode 100644 index 000000000..d2bb4b7d4 --- /dev/null +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/collector/AggregatorOutputCollector.scala @@ -0,0 +1,103 @@ +package com.twitter.summingbird.storm.collector + +import com.twitter.algebird.Semigroup +import com.twitter.summingbird.online.executor.KeyValueShards +import com.twitter.summingbird.online.option.SummerBuilder +import backtype.storm.spout.SpoutOutputCollector +import com.twitter.algebird.util.summer.AsyncSummer +import com.twitter.util.{ Await, Future } +import java.util.{ List => JList } +import scala.collection.mutable.{ Map => MMap } +import scala.collection.mutable.{ MutableList => MList } +import scala.collection.{ Map => CMap } +import scala.collection.JavaConverters._ +import java.util.{ List => JList } + +class AggregatorOutputCollector[K, V: Semigroup](in: SpoutOutputCollector, func: JList[AnyRef] => JList[AnyRef], summerBuilder: SummerBuilder, summerShards: KeyValueShards) extends TransformingOutputCollector(in, func) { + + var spoutCaches = MMap[String, AsyncSummer[(K, V), Map[K, V]]]() + var lastDump = System.currentTimeMillis() + var streamMessageIdTracker = MMap[String, MMap[Int, MList[Object]]]() + + def timerFlush() = { + spoutCaches.keys.foreach { stream => + val tupsOut = spoutCaches(stream).tick.map { convertToSummerInputFormat(_) } + val tups = Await.result(tupsOut) + + tups.foreach { + case (k, v) => { + val messageIdsTracker = streamMessageIdTracker(stream) + val messageIds = messageIdsTracker.remove(k) + (messageIds.isEmpty, stream.isEmpty) match { + case (true, true) => in.emit(List(k, v).asJava.asInstanceOf[JList[AnyRef]]) + case (true, false) => in.emit(stream, List(k, v).asJava.asInstanceOf[JList[AnyRef]]) + case (false, true) => in.emit(List(k, v).asJava.asInstanceOf[JList[AnyRef]], messageIds) + case (false, false) => in.emit(stream, List(k, v).asJava.asInstanceOf[JList[AnyRef]], messageIds) + } + } + } + } + } + + private def convertToSummerInputFormat(data: CMap[K, V]): CMap[Int, CMap[K, V]] = { + data.groupBy { case (k, _) => summerShards.summerIdFor(k) } + } + + private def emitData[K, V](data: Future[Traversable[(Int, CMap[K, V])]], callerfunc: (String, JList[AnyRef], scala.Any) => JList[Integer], s: String, o: scala.Any): List[Int] = { + var returns = MList[Int]() + val data_trav = Await.result(data) + data_trav + .foreach { + case (k, v) => { + val messageIdsTracker = streamMessageIdTracker(s) + val messageIds = messageIdsTracker.remove(k) + val taskIds = callerfunc(s, List(k, v).asJava.asInstanceOf[JList[AnyRef]], messageIds) + if (taskIds != null) returns ++= taskIds.asInstanceOf[JList[Int]].asScala.toList + } + } + returns.toList + } + + private def add(tuple: (K, V), o: scala.Any, s: String) = { + if (o != Nil) trackMessageId(tuple, o, s) + addToCache(tuple, s) + } + + private def addToCache(tuple: (K, V), s: String) = { + val cache = spoutCaches.get(s) + cache match { + case Some(cac) => cac.add(tuple) + case None => { + spoutCaches(s) = summerBuilder.getSummer[K, V](implicitly[Semigroup[V]]) + spoutCaches(s).add(tuple) + } + } + } + + private def trackMessageId(tuple: (K, V), o: scala.Any, s: String): Unit = { + val messageIdTracker = streamMessageIdTracker.getOrElse(s, MMap[Int, MList[Object]]()) + var messageIds = messageIdTracker.getOrElse(summerShards.summerIdFor(tuple._1), MList()) + messageIdTracker(summerShards.summerIdFor(tuple._1)) = (messageIds.+=(o.asInstanceOf[Object])) + streamMessageIdTracker(s) = messageIdTracker + } + + def emitHelper(s: String, list: JList[AnyRef], o: scala.Any)(callerFunc: (String, JList[AnyRef], scala.Any) => JList[Integer]): JList[Integer] = { + + val first: K = func(list).get(0).asInstanceOf[K] + val second: V = func(list).get(1).asInstanceOf[V] + + val emitReturn = emitData(add((first, second), o, s).map(convertToSummerInputFormat(_)), callerFunc, s, o) + emitReturn.asJava.asInstanceOf[JList[Integer]] + } + + override def emit(s: String, list: JList[AnyRef], o: scala.AnyRef): JList[Integer] = emitHelper(s, list, o)((s, l, o) => in.emit(s, l, o)) + + override def emit(list: JList[AnyRef], o: scala.AnyRef): JList[Integer] = emitHelper("", list, o)((s, l, o) => in.emit(l, o)) + + override def emit(list: JList[AnyRef]): JList[Integer] = emitHelper("", list, Nil)((s, l, o) => in.emit(l)) + + override def emit(s: String, list: JList[AnyRef]): JList[Integer] = emitHelper(s, list, Nil)((s, l, o) => in.emit(s, l)) + + override def reportError(throwable: Throwable): Unit = in.reportError(throwable) + +} diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/collector/TransformingOutputCollector.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/collector/TransformingOutputCollector.scala index edb4fcffc..58d7b688e 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/collector/TransformingOutputCollector.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/collector/TransformingOutputCollector.scala @@ -7,23 +7,23 @@ import java.util.{ List => JList } * The TransformingOutputCollector is used to transform the Value object when passed on by the Spout. */ -class TransformingOutputCollector(self: SpoutOutputCollector, func: JList[AnyRef] => JList[AnyRef]) extends SpoutOutputCollector(self) { +class TransformingOutputCollector(in: SpoutOutputCollector, func: JList[AnyRef] => JList[AnyRef]) extends SpoutOutputCollector(in) { - override def emitDirect(i: Int, s: String, list: JList[AnyRef], o: scala.AnyRef): Unit = self.emitDirect(i, s, func(list), o) + override def emitDirect(i: Int, s: String, list: JList[AnyRef], o: scala.AnyRef): Unit = in.emitDirect(i, s, func(list), o) - override def emitDirect(i: Int, list: JList[AnyRef], o: scala.AnyRef): Unit = self.emitDirect(i, func(list), o) + override def emitDirect(i: Int, list: JList[AnyRef], o: scala.AnyRef): Unit = in.emitDirect(i, func(list), o) - override def emitDirect(i: Int, s: String, list: JList[AnyRef]): Unit = self.emitDirect(i, s, func(list)) + override def emitDirect(i: Int, s: String, list: JList[AnyRef]): Unit = in.emitDirect(i, s, func(list)) - override def emitDirect(i: Int, list: JList[AnyRef]): Unit = self.emitDirect(i, func(list)) + override def emitDirect(i: Int, list: JList[AnyRef]): Unit = in.emitDirect(i, func(list)) - override def emit(s: String, list: JList[AnyRef], o: scala.AnyRef): JList[Integer] = self.emit(s, func(list), o) + override def emit(s: String, list: JList[AnyRef], o: scala.AnyRef): JList[Integer] = in.emit(s, func(list), o) - override def emit(list: JList[AnyRef], o: scala.AnyRef): JList[Integer] = self.emit(func(list), o) + override def emit(list: JList[AnyRef], o: scala.AnyRef): JList[Integer] = in.emit(func(list), o) - override def emit(list: JList[AnyRef]): JList[Integer] = self.emit(func(list)) + override def emit(list: JList[AnyRef]): JList[Integer] = in.emit(func(list)) - override def emit(s: String, list: JList[AnyRef]): JList[Integer] = self.emit(s, func(list)) + override def emit(s: String, list: JList[AnyRef]): JList[Integer] = in.emit(s, func(list)) - override def reportError(throwable: Throwable): Unit = self.reportError(throwable) -} \ No newline at end of file + override def reportError(throwable: Throwable): Unit = in.reportError(throwable) +} diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/spout/KeyValueSpout.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/spout/KeyValueSpout.scala index a917d1e62..8fa64c806 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/spout/KeyValueSpout.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/spout/KeyValueSpout.scala @@ -4,32 +4,63 @@ import backtype.storm.spout.SpoutOutputCollector import backtype.storm.task.TopologyContext import backtype.storm.topology.{ IRichSpout, OutputFieldsDeclarer } import backtype.storm.tuple.Fields +import com.twitter.algebird.Semigroup +import com.twitter.summingbird.online.Externalizer +import com.twitter.summingbird.online.executor.KeyValueShards +import com.twitter.summingbird.online.option.SummerBuilder import com.twitter.summingbird.storm.Constants._ import com.twitter.tormenta.spout.SpoutProxy import java.util import java.util.{ List => JList } -import com.twitter.summingbird.storm.collector.TransformingOutputCollector +import scala.collection.mutable.{ MutableList => MList } +import com.twitter.summingbird.storm.collector.{ AggregatorOutputCollector, TransformingOutputCollector } /** * This is a spout used when the spout is being followed by summer. - * It uses a TransformingOutputCollector on open. + * It uses a AggregatorOutputCollector on open. */ -class KeyValueSpout(in: IRichSpout) extends SpoutProxy { +class KeyValueSpout[K, V: Semigroup](val in: IRichSpout, summerBuilder: SummerBuilder, summerShards: KeyValueShards, @transient callOnOpen: (TopologyContext) => Unit) extends SpoutProxy { + + private var adapterCollector: AggregatorOutputCollector[K, V] = _ + val lockedFn = Externalizer(callOnOpen) + var lastDump = System.currentTimeMillis() override def declareOutputFields(declarer: OutputFieldsDeclarer) = { declarer.declare(new Fields(AGG_KEY, AGG_VALUE)) } - /* - * The transform is the function which unwraps the Value object to get the actual fields present in it. - */ - override def open(conf: util.Map[_, _], topologyContext: TopologyContext, outputCollector: SpoutOutputCollector): Unit = { - val adapterCollector = new TransformingOutputCollector(outputCollector, _.get(0).asInstanceOf[JList[AnyRef]]) - self.open(conf, topologyContext, adapterCollector) + adapterCollector = new AggregatorOutputCollector(outputCollector, _.get(0).asInstanceOf[JList[AnyRef]], summerBuilder, summerShards) + lockedFn.get(topologyContext) + in.open(conf, topologyContext, adapterCollector) + } + + override def nextTuple(): Unit = { + if (System.currentTimeMillis() - lastDump > 1000) { + adapterCollector.timerFlush() + lastDump = System.currentTimeMillis() + } + in.nextTuple() + } + + override def ack(msgId: Object): Unit = { + val msgIds = convertToList(msgId) + msgIds.foreach { super.ack(_) } + } + + override def fail(msgId: Object): Unit = { + val msgIds = convertToList(msgId) + msgIds.foreach { super.fail(_) } + } + + def convertToList(msgId: Object): MList[Object] = { + msgId match { + case Some(s) => s.asInstanceOf[MList[Object]] + case None => MList[Object]() + } } override protected def self: IRichSpout = in From 64e18450e4773d39722692405485348cdefbbb8d Mon Sep 17 00:00:00 2001 From: Praneeth Naramsetti Date: Wed, 10 Aug 2016 23:16:55 -0700 Subject: [PATCH 2/3] code refactoring and clean up considering comments --- .../summingbird/storm/StormPlatform.scala | 5 +- .../collector/AggregatorOutputCollector.scala | 98 +++++++++++-------- .../storm/spout/KeyValueSpout.scala | 19 +++- 3 files changed, 76 insertions(+), 46 deletions(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index bad59d9ca..ac6e1f666 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -203,8 +203,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird require(flatMapParallelism <= sourceParallelism, s"SourceParallelism ($sourceParallelism) must be at least as high as FlatMapParallelism ($flatMapParallelism) when merging flatMap with Source") } val builder = BuildSummer(this, stormDag, node, jobID) - val lockedCounters = Externalizer(JobCounters.getCountersForJob(jobID).getOrElse(Nil)) - val countersForSpout: Seq[(Group, Name)] = lockedCounters.get + val countersForSpout: Seq[(Group, Name)] = JobCounters.getCountersForJob(jobID).getOrElse(Nil) val metrics = getOrElse(stormDag, node, DEFAULT_SPOUT_STORM_METRICS) @@ -393,7 +392,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird node match { case _: SummerNode[_] => scheduleSummerBolt(jobID, stormDag, node) case _: FlatMapNode[_] => scheduleFlatMapper(jobID, stormDag, node) - case _: SourceNode[_] => scheduleSpout(jobID, stormDag, node) + case _: SourceNode[_] => scheduleSpout[Any, Any](jobID, stormDag, node) } } PlannedTopology(config, topologyBuilder.createTopology) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/collector/AggregatorOutputCollector.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/collector/AggregatorOutputCollector.scala index d2bb4b7d4..6aeef4216 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/collector/AggregatorOutputCollector.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/collector/AggregatorOutputCollector.scala @@ -5,61 +5,81 @@ import com.twitter.summingbird.online.executor.KeyValueShards import com.twitter.summingbird.online.option.SummerBuilder import backtype.storm.spout.SpoutOutputCollector import com.twitter.algebird.util.summer.AsyncSummer -import com.twitter.util.{ Await, Future } -import java.util.{ List => JList } +import com.twitter.util.{ Await, Future, Time } import scala.collection.mutable.{ Map => MMap } import scala.collection.mutable.{ MutableList => MList } import scala.collection.{ Map => CMap } import scala.collection.JavaConverters._ import java.util.{ List => JList } -class AggregatorOutputCollector[K, V: Semigroup](in: SpoutOutputCollector, func: JList[AnyRef] => JList[AnyRef], summerBuilder: SummerBuilder, summerShards: KeyValueShards) extends TransformingOutputCollector(in, func) { +class AggregatorOutputCollector[K, V: Semigroup]( + in: SpoutOutputCollector, + transform: JList[AnyRef] => JList[AnyRef], + summerBuilder: SummerBuilder, + summerShards: KeyValueShards) extends SpoutOutputCollector(in) { - var spoutCaches = MMap[String, AsyncSummer[(K, V), Map[K, V]]]() - var lastDump = System.currentTimeMillis() - var streamMessageIdTracker = MMap[String, MMap[Int, MList[Object]]]() + val spoutCaches = MMap[String, AsyncSummer[(K, V), Map[K, V]]]() + var lastDump = Time.now.inMillis + val streamMessageIdTracker = MMap[String, MMap[Int, MList[Object]]]() def timerFlush() = { - spoutCaches.keys.foreach { stream => - val tupsOut = spoutCaches(stream).tick.map { convertToSummerInputFormat(_) } - val tups = Await.result(tupsOut) - - tups.foreach { - case (k, v) => { - val messageIdsTracker = streamMessageIdTracker(stream) - val messageIds = messageIdsTracker.remove(k) - (messageIds.isEmpty, stream.isEmpty) match { - case (true, true) => in.emit(List(k, v).asJava.asInstanceOf[JList[AnyRef]]) - case (true, false) => in.emit(stream, List(k, v).asJava.asInstanceOf[JList[AnyRef]]) - case (false, true) => in.emit(List(k, v).asJava.asInstanceOf[JList[AnyRef]], messageIds) - case (false, false) => in.emit(stream, List(k, v).asJava.asInstanceOf[JList[AnyRef]], messageIds) + /* + This is a flush called from the nextTuple() of the spout. + The timerFlush is triggered with tick frequency from the spout. + */ + spoutCaches.foreach { + case (stream, _) => + val tupsOut = spoutCaches(stream).tick.map { convertToSummerInputFormat(_) } + val tups = Await.result(tupsOut) + tups.foreach { + case (k, v) => { + val messageIdsTracker = streamMessageIdTracker(stream) + val messageIds = messageIdsTracker.remove(k) + val list = List(k, v).asJava.asInstanceOf[JList[AnyRef]] + callEmit(messageIds, list, stream) } } - } } } - private def convertToSummerInputFormat(data: CMap[K, V]): CMap[Int, CMap[K, V]] = { - data.groupBy { case (k, _) => summerShards.summerIdFor(k) } + private def convertToSummerInputFormat(flushedCache: CMap[K, V]): CMap[Int, CMap[K, V]] = { + flushedCache.groupBy { case (k, _) => summerShards.summerIdFor(k) } } - private def emitData[K, V](data: Future[Traversable[(Int, CMap[K, V])]], callerfunc: (String, JList[AnyRef], scala.Any) => JList[Integer], s: String, o: scala.Any): List[Int] = { + private def emitData[K, V](cache: Future[Traversable[(Int, CMap[K, V])]], s: String): List[Int] = { + /* + The method is invoked to handle the flushed cache caused by exceeding the memoryLimit, which is called within add method. + */ var returns = MList[Int]() - val data_trav = Await.result(data) - data_trav + val flushedTups = Await.result(cache) + flushedTups .foreach { - case (k, v) => { + case (k, v) => val messageIdsTracker = streamMessageIdTracker(s) val messageIds = messageIdsTracker.remove(k) - val taskIds = callerfunc(s, List(k, v).asJava.asInstanceOf[JList[AnyRef]], messageIds) + val list = List(k, v).asJava.asInstanceOf[JList[AnyRef]] + val taskIds = callEmit(messageIds, list, s) if (taskIds != null) returns ++= taskIds.asInstanceOf[JList[Int]].asScala.toList - } } returns.toList } - private def add(tuple: (K, V), o: scala.Any, s: String) = { - if (o != Nil) trackMessageId(tuple, o, s) + private def callEmit(messageIds: Option[Any], list: JList[AnyRef], stream: String): JList[Integer] = { + /* + This is a wrapper method to call the emit with appropriate signature + based on the arguments. + */ + (messageIds.isEmpty, stream.isEmpty) match { + case (true, true) => in.emit(list) + case (true, false) => in.emit(stream, list) + case (false, true) => in.emit(list, messageIds) + case (false, false) => in.emit(stream, list, messageIds) + } + } + + private def add(tuple: (K, V), s: String, o: Option[Any] = None) = { + if (o.isDefined) + trackMessageId(tuple, o.get, s) addToCache(tuple, s) } @@ -81,23 +101,21 @@ class AggregatorOutputCollector[K, V: Semigroup](in: SpoutOutputCollector, func: streamMessageIdTracker(s) = messageIdTracker } - def emitHelper(s: String, list: JList[AnyRef], o: scala.Any)(callerFunc: (String, JList[AnyRef], scala.Any) => JList[Integer]): JList[Integer] = { + def extractAndProcessElements(s: String, list: JList[AnyRef], o: Option[Any] = None): JList[Integer] = { - val first: K = func(list).get(0).asInstanceOf[K] - val second: V = func(list).get(1).asInstanceOf[V] - - val emitReturn = emitData(add((first, second), o, s).map(convertToSummerInputFormat(_)), callerFunc, s, o) + val first: K = transform(list).get(0).asInstanceOf[K] + val second: V = transform(list).get(1).asInstanceOf[V] + val emitReturn = emitData(add((first, second), s, o).map(convertToSummerInputFormat(_)), s) emitReturn.asJava.asInstanceOf[JList[Integer]] } - override def emit(s: String, list: JList[AnyRef], o: scala.AnyRef): JList[Integer] = emitHelper(s, list, o)((s, l, o) => in.emit(s, l, o)) + override def emit(s: String, list: JList[AnyRef], o: scala.AnyRef): JList[Integer] = extractAndProcessElements(s, list, Some(o)) - override def emit(list: JList[AnyRef], o: scala.AnyRef): JList[Integer] = emitHelper("", list, o)((s, l, o) => in.emit(l, o)) + override def emit(list: JList[AnyRef], o: scala.AnyRef): JList[Integer] = extractAndProcessElements("", list, Some(o)) - override def emit(list: JList[AnyRef]): JList[Integer] = emitHelper("", list, Nil)((s, l, o) => in.emit(l)) + override def emit(list: JList[AnyRef]): JList[Integer] = extractAndProcessElements("", list) - override def emit(s: String, list: JList[AnyRef]): JList[Integer] = emitHelper(s, list, Nil)((s, l, o) => in.emit(s, l)) + override def emit(s: String, list: JList[AnyRef]): JList[Integer] = extractAndProcessElements(s, list) override def reportError(throwable: Throwable): Unit = in.reportError(throwable) - } diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/spout/KeyValueSpout.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/spout/KeyValueSpout.scala index 8fa64c806..fe890afdd 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/spout/KeyValueSpout.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/spout/KeyValueSpout.scala @@ -14,6 +14,7 @@ import java.util import java.util.{ List => JList } import scala.collection.mutable.{ MutableList => MList } import com.twitter.summingbird.storm.collector.{ AggregatorOutputCollector, TransformingOutputCollector } +import com.twitter.util.{ Duration, Time } /** * This is a spout used when the spout is being followed by summer. @@ -22,9 +23,10 @@ import com.twitter.summingbird.storm.collector.{ AggregatorOutputCollector, Tran class KeyValueSpout[K, V: Semigroup](val in: IRichSpout, summerBuilder: SummerBuilder, summerShards: KeyValueShards, @transient callOnOpen: (TopologyContext) => Unit) extends SpoutProxy { + private final val tickFrequency = 1000 private var adapterCollector: AggregatorOutputCollector[K, V] = _ val lockedFn = Externalizer(callOnOpen) - var lastDump = System.currentTimeMillis() + var lastDump = Time.now.inMillis override def declareOutputFields(declarer: OutputFieldsDeclarer) = { declarer.declare(new Fields(AGG_KEY, AGG_VALUE)) @@ -39,19 +41,30 @@ class KeyValueSpout[K, V: Semigroup](val in: IRichSpout, summerBuilder: SummerBu } override def nextTuple(): Unit = { - if (System.currentTimeMillis() - lastDump > 1000) { + /* + This method is used to call the tick on the cache. + */ + if (Time.now.inMillis - lastDump > tickFrequency) { adapterCollector.timerFlush() - lastDump = System.currentTimeMillis() + lastDump = Time.now.inMillis } in.nextTuple() } override def ack(msgId: Object): Unit = { + /* + The msgId is a list of individual messageIds of emitted tuples + which are aggregated and emitted out as a single tuple. + */ val msgIds = convertToList(msgId) msgIds.foreach { super.ack(_) } } override def fail(msgId: Object): Unit = { + /* + The msgId is a list of individual messageIds of emitted tuples + which are aggregated and emitted out as a single tuple. + */ val msgIds = convertToList(msgId) msgIds.foreach { super.fail(_) } } From 49557fc7d82ec9691fa9c8424c27b9033724c129 Mon Sep 17 00:00:00 2001 From: Praneeth Naramsetti Date: Thu, 11 Aug 2016 17:55:29 -0700 Subject: [PATCH 3/3] comment changes - refactored --- .../summingbird/storm/StormPlatform.scala | 18 ++-- .../collector/AggregatorOutputCollector.scala | 87 +++++++++---------- .../storm/spout/KeyValueSpout.scala | 14 ++- 3 files changed, 58 insertions(+), 61 deletions(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index ac6e1f666..601c16bf9 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -173,7 +173,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird } } - private def scheduleSpout[K, V](jobID: JobId, stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder) = { + private def scheduleSpout(jobID: JobId, stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder) = { val (spout, parOpt) = node.members.collect { case Source(SpoutSource(s, parOpt)) => (s, parOpt) }.head val nodeName = stormDag.getNodeName(node) @@ -202,12 +202,11 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird if (isMergeableWithSource) { require(flatMapParallelism <= sourceParallelism, s"SourceParallelism ($sourceParallelism) must be at least as high as FlatMapParallelism ($flatMapParallelism) when merging flatMap with Source") } - val builder = BuildSummer(this, stormDag, node, jobID) - val countersForSpout: Seq[(Group, Name)] = JobCounters.getCountersForJob(jobID).getOrElse(Nil) val metrics = getOrElse(stormDag, node, DEFAULT_SPOUT_STORM_METRICS) val registerAllMetrics = Externalizer({ context: TopologyContext => + val countersForSpout: Seq[(Group, Name)] = JobCounters.getCountersForJob(jobID).getOrElse(Nil) // Register metrics passed in SpoutStormMetrics option. metrics.metrics().foreach { x: StormMetric[IMetric] => @@ -217,16 +216,17 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird StormStatProvider.registerMetrics(jobID, context, countersForSpout) SummingbirdRuntimeStats.addPlatformStatProvider(StormStatProvider) }) - + val hookedTormentaSpout = tormentaSpout.openHook(registerAllMetrics.get) val summerOpt: Option[SummerNode[Storm]] = stormDag.dependantsOf(node.asInstanceOf[StormNode]).collect { case s: SummerNode[Storm] => s }.headOption val stormSpout = summerOpt match { - case Some(s) => createSpoutToFeedSummer[K, V](stormDag, s, tormentaSpout, builder, registerAllMetrics) - case None => tormentaSpout.openHook(registerAllMetrics.get).getSpout + case Some(s) => createSpoutToFeedSummer[Any, Any](stormDag, s, jobID, hookedTormentaSpout) + case None => hookedTormentaSpout.getSpout } topologyBuilder.setSpout(nodeName, stormSpout, sourceParallelism) } - private def createSpoutToFeedSummer[K, V](stormDag: Dag[Storm], node: StormNode, spout: Spout[(Timestamp, Any)], builder: SummerBuilder, fn: Externalizer[(TopologyContext) => Unit]) = { + private def createSpoutToFeedSummer[K, V](stormDag: Dag[Storm], node: StormNode, jobID: JobId, spout: Spout[(Timestamp, Any)]) = { + val builder = BuildSummer(this, stormDag, node, jobID) val summerParalellism = getOrElse(stormDag, node, DEFAULT_SUMMER_PARALLELISM) val summerBatchMultiplier = getOrElse(stormDag, node, DEFAULT_SUMMER_BATCH_MULTIPLIER) val keyValueShards = executor.KeyValueShards(summerParalellism.parHint * summerBatchMultiplier.get) @@ -238,7 +238,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird case (time, (k: K, v: V)) => kvinj((k, batcher.batchOf(time)), (time, v)) } implicit val valueMonoid: Semigroup[V] = summerProducer.semigroup - new KeyValueSpout[(K, BatchID), (Timestamp, V)](formattedSummerSpout.getSpout, builder, keyValueShards, fn.get) + new KeyValueSpout[(K, BatchID), (Timestamp, V)](formattedSummerSpout.getSpout, builder, keyValueShards) } @@ -392,7 +392,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird node match { case _: SummerNode[_] => scheduleSummerBolt(jobID, stormDag, node) case _: FlatMapNode[_] => scheduleFlatMapper(jobID, stormDag, node) - case _: SourceNode[_] => scheduleSpout[Any, Any](jobID, stormDag, node) + case _: SourceNode[_] => scheduleSpout(jobID, stormDag, node) } } PlannedTopology(config, topologyBuilder.createTopology) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/collector/AggregatorOutputCollector.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/collector/AggregatorOutputCollector.scala index 6aeef4216..10c5e1cb7 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/collector/AggregatorOutputCollector.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/collector/AggregatorOutputCollector.scala @@ -12,14 +12,24 @@ import scala.collection.{ Map => CMap } import scala.collection.JavaConverters._ import java.util.{ List => JList } +/** + * + * AggregatorOutputCollector is a wrapper around the SpoutOutputCollector. + * AsyncSummer is used to aggregate the tuples. + * Different streams have seperated aggregators and caches. + * + */ class AggregatorOutputCollector[K, V: Semigroup]( in: SpoutOutputCollector, - transform: JList[AnyRef] => JList[AnyRef], summerBuilder: SummerBuilder, summerShards: KeyValueShards) extends SpoutOutputCollector(in) { + // Map keeps track of summers corresponding to streams. val spoutCaches = MMap[String, AsyncSummer[(K, V), Map[K, V]]]() + var lastDump = Time.now.inMillis + + // The Map keeps track of batch of aggregated tuples' messageIds. It also has a stream level tracking. val streamMessageIdTracker = MMap[String, MMap[Int, MList[Object]]]() def timerFlush() = { @@ -28,47 +38,37 @@ class AggregatorOutputCollector[K, V: Semigroup]( The timerFlush is triggered with tick frequency from the spout. */ spoutCaches.foreach { - case (stream, _) => - val tupsOut = spoutCaches(stream).tick.map { convertToSummerInputFormat(_) } - val tups = Await.result(tupsOut) - tups.foreach { - case (k, v) => { - val messageIdsTracker = streamMessageIdTracker(stream) - val messageIds = messageIdsTracker.remove(k) - val list = List(k, v).asJava.asInstanceOf[JList[AnyRef]] - callEmit(messageIds, list, stream) - } - } + case (stream, cache) => + val tupsOut = cache.tick.map { convertToSummerInputFormat(_) } + emitData(tupsOut, stream) } } - private def convertToSummerInputFormat(flushedCache: CMap[K, V]): CMap[Int, CMap[K, V]] = { + private def convertToSummerInputFormat(flushedCache: CMap[K, V]): CMap[Int, CMap[K, V]] = flushedCache.groupBy { case (k, _) => summerShards.summerIdFor(k) } - } - private def emitData[K, V](cache: Future[Traversable[(Int, CMap[K, V])]], s: String): List[Int] = { - /* - The method is invoked to handle the flushed cache caused by exceeding the memoryLimit, which is called within add method. - */ - var returns = MList[Int]() + /* + The method is invoked to handle the flushed cache caused by + exceeding the memoryLimit, which is called within add method. + */ + private def emitData[K, V](cache: Future[Traversable[(Int, CMap[K, V])]], streamId: String): List[Int] = { val flushedTups = Await.result(cache) - flushedTups - .foreach { + val messageIdsTracker = streamMessageIdTracker(streamId) + val returns = flushedTups.toList + .map { case (k, v) => - val messageIdsTracker = streamMessageIdTracker(s) val messageIds = messageIdsTracker.remove(k) val list = List(k, v).asJava.asInstanceOf[JList[AnyRef]] - val taskIds = callEmit(messageIds, list, s) - if (taskIds != null) returns ++= taskIds.asInstanceOf[JList[Int]].asScala.toList + callEmit(messageIds, list, streamId) } - returns.toList + returns.flatten } + /* + This is a wrapper method to call the emit with appropriate signature + based on the arguments. + */ private def callEmit(messageIds: Option[Any], list: JList[AnyRef], stream: String): JList[Integer] = { - /* - This is a wrapper method to call the emit with appropriate signature - based on the arguments. - */ (messageIds.isEmpty, stream.isEmpty) match { case (true, true) => in.emit(list) case (true, false) => in.emit(stream, list) @@ -77,19 +77,18 @@ class AggregatorOutputCollector[K, V: Semigroup]( } } - private def add(tuple: (K, V), s: String, o: Option[Any] = None) = { - if (o.isDefined) - trackMessageId(tuple, o.get, s) - addToCache(tuple, s) + private def add(tuple: (K, V), streamid: String, messageId: Option[Any] = None) = { + if (messageId.isDefined) + trackMessageId(tuple, messageId.get, streamid) + addToCache(tuple, streamid) } - private def addToCache(tuple: (K, V), s: String) = { - val cache = spoutCaches.get(s) - cache match { + private def addToCache(tuple: (K, V), streamid: String) = { + spoutCaches.get(streamid) match { case Some(cac) => cac.add(tuple) case None => { - spoutCaches(s) = summerBuilder.getSummer[K, V](implicitly[Semigroup[V]]) - spoutCaches(s).add(tuple) + spoutCaches(streamid) = summerBuilder.getSummer[K, V](implicitly[Semigroup[V]]) + spoutCaches(streamid).add(tuple) } } } @@ -97,15 +96,15 @@ class AggregatorOutputCollector[K, V: Semigroup]( private def trackMessageId(tuple: (K, V), o: scala.Any, s: String): Unit = { val messageIdTracker = streamMessageIdTracker.getOrElse(s, MMap[Int, MList[Object]]()) var messageIds = messageIdTracker.getOrElse(summerShards.summerIdFor(tuple._1), MList()) - messageIdTracker(summerShards.summerIdFor(tuple._1)) = (messageIds.+=(o.asInstanceOf[Object])) + messageIdTracker(summerShards.summerIdFor(tuple._1)) = ( messageIds += o.asInstanceOf[Object] ) streamMessageIdTracker(s) = messageIdTracker } - def extractAndProcessElements(s: String, list: JList[AnyRef], o: Option[Any] = None): JList[Integer] = { - - val first: K = transform(list).get(0).asInstanceOf[K] - val second: V = transform(list).get(1).asInstanceOf[V] - val emitReturn = emitData(add((first, second), s, o).map(convertToSummerInputFormat(_)), s) + def extractAndProcessElements(streamId: String, list: JList[AnyRef], messageId: Option[Any] = None): JList[Integer] = { + val listKV = list.get(0).asInstanceOf[JList[AnyRef]] + val first: K = listKV.get(0).asInstanceOf[K] + val second: V = listKV.get(1).asInstanceOf[V] + val emitReturn = emitData(add((first, second), streamId, messageId).map(convertToSummerInputFormat(_)), streamId) emitReturn.asJava.asInstanceOf[JList[Integer]] } diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/spout/KeyValueSpout.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/spout/KeyValueSpout.scala index fe890afdd..f1f2b6012 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/spout/KeyValueSpout.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/spout/KeyValueSpout.scala @@ -21,12 +21,11 @@ import com.twitter.util.{ Duration, Time } * It uses a AggregatorOutputCollector on open. */ -class KeyValueSpout[K, V: Semigroup](val in: IRichSpout, summerBuilder: SummerBuilder, summerShards: KeyValueShards, @transient callOnOpen: (TopologyContext) => Unit) extends SpoutProxy { +class KeyValueSpout[K, V: Semigroup](val in: IRichSpout, summerBuilder: SummerBuilder, summerShards: KeyValueShards) extends SpoutProxy { - private final val tickFrequency = 1000 + private final val tickFrequency = Duration.fromMilliseconds(1000) private var adapterCollector: AggregatorOutputCollector[K, V] = _ - val lockedFn = Externalizer(callOnOpen) - var lastDump = Time.now.inMillis + var lastDump = Time.now override def declareOutputFields(declarer: OutputFieldsDeclarer) = { declarer.declare(new Fields(AGG_KEY, AGG_VALUE)) @@ -35,8 +34,7 @@ class KeyValueSpout[K, V: Semigroup](val in: IRichSpout, summerBuilder: SummerBu override def open(conf: util.Map[_, _], topologyContext: TopologyContext, outputCollector: SpoutOutputCollector): Unit = { - adapterCollector = new AggregatorOutputCollector(outputCollector, _.get(0).asInstanceOf[JList[AnyRef]], summerBuilder, summerShards) - lockedFn.get(topologyContext) + adapterCollector = new AggregatorOutputCollector(outputCollector, summerBuilder, summerShards) in.open(conf, topologyContext, adapterCollector) } @@ -44,9 +42,9 @@ class KeyValueSpout[K, V: Semigroup](val in: IRichSpout, summerBuilder: SummerBu /* This method is used to call the tick on the cache. */ - if (Time.now.inMillis - lastDump > tickFrequency) { + if (Time.now - lastDump > tickFrequency) { adapterCollector.timerFlush() - lastDump = Time.now.inMillis + lastDump = Time.now } in.nextTuple() }