diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index 124ad47c1..601c16bf9 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -174,6 +174,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird } private def scheduleSpout(jobID: JobId, stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder) = { + val (spout, parOpt) = node.members.collect { case Source(SpoutSource(s, parOpt)) => (s, parOpt) }.head val nodeName = stormDag.getNodeName(node) @@ -194,7 +195,6 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird } } - val countersForSpout: Seq[(Group, Name)] = JobCounters.getCountersForJob(jobID).getOrElse(Nil) val isMergeableWithSource = getOrElse(stormDag, node, DEFAULT_FM_MERGEABLE_WITH_SOURCE).get val sourceParallelism = getOrElse(stormDag, node, parOpt.getOrElse(DEFAULT_SOURCE_PARALLELISM)).parHint val flatMapParallelism = getOrElse(stormDag, node, DEFAULT_FM_PARALLELISM).parHint @@ -202,9 +202,11 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird if (isMergeableWithSource) { require(flatMapParallelism <= sourceParallelism, s"SourceParallelism ($sourceParallelism) must be at least as high as FlatMapParallelism ($flatMapParallelism) when merging flatMap with Source") } + val metrics = getOrElse(stormDag, node, DEFAULT_SPOUT_STORM_METRICS) val registerAllMetrics = Externalizer({ context: TopologyContext => + val countersForSpout: Seq[(Group, Name)] = JobCounters.getCountersForJob(jobID).getOrElse(Nil) // Register metrics passed in SpoutStormMetrics option. 
metrics.metrics().foreach { x: StormMetric[IMetric] => @@ -215,38 +217,29 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird SummingbirdRuntimeStats.addPlatformStatProvider(StormStatProvider) }) val hookedTormentaSpout = tormentaSpout.openHook(registerAllMetrics.get) - val summerOpt: Option[SummerNode[Storm]] = stormDag.dependantsOf(node.asInstanceOf[StormNode]).collect { case s: SummerNode[Storm] => s }.headOption - val stormSpout = summerOpt match { - /* - * As the spout is being followed by a summer, we do not have map-side aggregation handled in the spout. - * This means Summer is the place where the aggregation should happen. - */ - case Some(s) => createSpoutToFeedSummer(stormDag, s, hookedTormentaSpout) + case Some(s) => createSpoutToFeedSummer[Any, Any](stormDag, s, jobID, hookedTormentaSpout) case None => hookedTormentaSpout.getSpout } topologyBuilder.setSpout(nodeName, stormSpout, sourceParallelism) } - private def createSpoutToFeedSummer(stormDag: Dag[Storm], node: StormNode, spout: Spout[(Timestamp, Any)]) = { + private def createSpoutToFeedSummer[K, V](stormDag: Dag[Storm], node: StormNode, jobID: JobId, spout: Spout[(Timestamp, Any)]) = { + val builder = BuildSummer(this, stormDag, node, jobID) val summerParalellism = getOrElse(stormDag, node, DEFAULT_SUMMER_PARALLELISM) val summerBatchMultiplier = getOrElse(stormDag, node, DEFAULT_SUMMER_BATCH_MULTIPLIER) val keyValueShards = executor.KeyValueShards(summerParalellism.parHint * summerBatchMultiplier.get) - /* - This method is called only when SourceNode is followed by SummerNode. - The Summer exists. There will be always not more than one summer, taken care by OnlinePlan. 
- */ - val summerProducer = node.members.collect { case s: Summer[_, _, _] => s }.head.asInstanceOf[Summer[Storm, _, _]] + + val summerProducer = node.members.collect { case s: Summer[_, _, _] => s }.head.asInstanceOf[Summer[Storm, K, V]] val batcher = summerProducer.store.mergeableBatcher - val kvinj = new KeyValueInjection[Int, CMap[(_, BatchID), (Timestamp, _)]] + val kvinj = new KeyValueInjection[(K, BatchID), (Timestamp, V)] val formattedSummerSpout = spout.map { - case (time, (k, v)) => - val newK = keyValueShards.summerIdFor(k) - val m = CMap((k, batcher.batchOf(time)) -> (time, v)) - kvinj((newK, m)) + case (time, (k: K, v: V)) => kvinj((k, batcher.batchOf(time)), (time, v)) } - new KeyValueSpout(formattedSummerSpout.getSpout) + implicit val valueMonoid: Semigroup[V] = summerProducer.semigroup + new KeyValueSpout[(K, BatchID), (Timestamp, V)](formattedSummerSpout.getSpout, builder, keyValueShards) + } private def scheduleSummerBolt[K, V](jobID: JobId, stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder) = { diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/collector/AggregatorOutputCollector.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/collector/AggregatorOutputCollector.scala new file mode 100644 index 000000000..10c5e1cb7 --- /dev/null +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/collector/AggregatorOutputCollector.scala @@ -0,0 +1,120 @@ +package com.twitter.summingbird.storm.collector + +import com.twitter.algebird.Semigroup +import com.twitter.summingbird.online.executor.KeyValueShards +import com.twitter.summingbird.online.option.SummerBuilder +import backtype.storm.spout.SpoutOutputCollector +import com.twitter.algebird.util.summer.AsyncSummer +import com.twitter.util.{ Await, Future, Time } +import scala.collection.mutable.{ Map => MMap } +import scala.collection.mutable.{ MutableList => MList } +import scala.collection.{ Map => CMap } +import 
scala.collection.JavaConverters._ +import java.util.{ List => JList } + +/** + * + * AggregatorOutputCollector is a wrapper around the SpoutOutputCollector. + * AsyncSummer is used to aggregate the tuples. + * Different streams have separate aggregators and caches. + * + */ +class AggregatorOutputCollector[K, V: Semigroup]( + in: SpoutOutputCollector, + summerBuilder: SummerBuilder, + summerShards: KeyValueShards) extends SpoutOutputCollector(in) { + + // Map keeps track of summers corresponding to streams. + val spoutCaches = MMap[String, AsyncSummer[(K, V), Map[K, V]]]() + + var lastDump = Time.now.inMillis + + // The Map keeps track of batch of aggregated tuples' messageIds. It also has a stream level tracking. + val streamMessageIdTracker = MMap[String, MMap[Int, MList[Object]]]() + + def timerFlush() = { + /* + This is a flush called from the nextTuple() of the spout. + The timerFlush is triggered with tick frequency from the spout. + */ + spoutCaches.foreach { + case (stream, cache) => + val tupsOut = cache.tick.map { convertToSummerInputFormat(_) } + emitData(tupsOut, stream) + } + } + + private def convertToSummerInputFormat(flushedCache: CMap[K, V]): CMap[Int, CMap[K, V]] = + flushedCache.groupBy { case (k, _) => summerShards.summerIdFor(k) } + + /* + The method is invoked to handle the flushed cache caused by + exceeding the memoryLimit, which is called within add method. + */ + private def emitData[K, V](cache: Future[Traversable[(Int, CMap[K, V])]], streamId: String): List[Int] = { + val flushedTups = Await.result(cache) + val messageIdsTracker = streamMessageIdTracker(streamId) + val returns = flushedTups.toList + .map { + case (k, v) => + val messageIds = messageIdsTracker.remove(k) + val list = List(k, v).asJava.asInstanceOf[JList[AnyRef]] + callEmit(messageIds, list, streamId) + } + returns.flatten + } + + /* + This is a wrapper method to call the emit with appropriate signature + based on the arguments. 
+ */ + private def callEmit(messageIds: Option[Any], list: JList[AnyRef], stream: String): JList[Integer] = { + (messageIds.isEmpty, stream.isEmpty) match { + case (true, true) => in.emit(list) + case (true, false) => in.emit(stream, list) + case (false, true) => in.emit(list, messageIds) + case (false, false) => in.emit(stream, list, messageIds) + } + } + + private def add(tuple: (K, V), streamid: String, messageId: Option[Any] = None) = { + if (messageId.isDefined) + trackMessageId(tuple, messageId.get, streamid) + addToCache(tuple, streamid) + } + + private def addToCache(tuple: (K, V), streamid: String) = { + spoutCaches.get(streamid) match { + case Some(cac) => cac.add(tuple) + case None => { + spoutCaches(streamid) = summerBuilder.getSummer[K, V](implicitly[Semigroup[V]]) + spoutCaches(streamid).add(tuple) + } + } + } + + private def trackMessageId(tuple: (K, V), o: scala.Any, s: String): Unit = { + val messageIdTracker = streamMessageIdTracker.getOrElse(s, MMap[Int, MList[Object]]()) + var messageIds = messageIdTracker.getOrElse(summerShards.summerIdFor(tuple._1), MList()) + messageIdTracker(summerShards.summerIdFor(tuple._1)) = ( messageIds += o.asInstanceOf[Object] ) + streamMessageIdTracker(s) = messageIdTracker + } + + def extractAndProcessElements(streamId: String, list: JList[AnyRef], messageId: Option[Any] = None): JList[Integer] = { + val listKV = list.get(0).asInstanceOf[JList[AnyRef]] + val first: K = listKV.get(0).asInstanceOf[K] + val second: V = listKV.get(1).asInstanceOf[V] + val emitReturn = emitData(add((first, second), streamId, messageId).map(convertToSummerInputFormat(_)), streamId) + emitReturn.asJava.asInstanceOf[JList[Integer]] + } + + override def emit(s: String, list: JList[AnyRef], o: scala.AnyRef): JList[Integer] = extractAndProcessElements(s, list, Some(o)) + + override def emit(list: JList[AnyRef], o: scala.AnyRef): JList[Integer] = extractAndProcessElements("", list, Some(o)) + + override def emit(list: JList[AnyRef]): 
JList[Integer] = extractAndProcessElements("", list) + + override def emit(s: String, list: JList[AnyRef]): JList[Integer] = extractAndProcessElements(s, list) + + override def reportError(throwable: Throwable): Unit = in.reportError(throwable) +} diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/collector/TransformingOutputCollector.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/collector/TransformingOutputCollector.scala index edb4fcffc..58d7b688e 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/collector/TransformingOutputCollector.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/collector/TransformingOutputCollector.scala @@ -7,23 +7,23 @@ import java.util.{ List => JList } * The TransformingOutputCollector is used to transform the Value object when passed on by the Spout. */ -class TransformingOutputCollector(self: SpoutOutputCollector, func: JList[AnyRef] => JList[AnyRef]) extends SpoutOutputCollector(self) { +class TransformingOutputCollector(in: SpoutOutputCollector, func: JList[AnyRef] => JList[AnyRef]) extends SpoutOutputCollector(in) { - override def emitDirect(i: Int, s: String, list: JList[AnyRef], o: scala.AnyRef): Unit = self.emitDirect(i, s, func(list), o) + override def emitDirect(i: Int, s: String, list: JList[AnyRef], o: scala.AnyRef): Unit = in.emitDirect(i, s, func(list), o) - override def emitDirect(i: Int, list: JList[AnyRef], o: scala.AnyRef): Unit = self.emitDirect(i, func(list), o) + override def emitDirect(i: Int, list: JList[AnyRef], o: scala.AnyRef): Unit = in.emitDirect(i, func(list), o) - override def emitDirect(i: Int, s: String, list: JList[AnyRef]): Unit = self.emitDirect(i, s, func(list)) + override def emitDirect(i: Int, s: String, list: JList[AnyRef]): Unit = in.emitDirect(i, s, func(list)) - override def emitDirect(i: Int, list: JList[AnyRef]): Unit = self.emitDirect(i, func(list)) + override def emitDirect(i: Int, list: 
JList[AnyRef]): Unit = in.emitDirect(i, func(list)) - override def emit(s: String, list: JList[AnyRef], o: scala.AnyRef): JList[Integer] = self.emit(s, func(list), o) + override def emit(s: String, list: JList[AnyRef], o: scala.AnyRef): JList[Integer] = in.emit(s, func(list), o) - override def emit(list: JList[AnyRef], o: scala.AnyRef): JList[Integer] = self.emit(func(list), o) + override def emit(list: JList[AnyRef], o: scala.AnyRef): JList[Integer] = in.emit(func(list), o) - override def emit(list: JList[AnyRef]): JList[Integer] = self.emit(func(list)) + override def emit(list: JList[AnyRef]): JList[Integer] = in.emit(func(list)) - override def emit(s: String, list: JList[AnyRef]): JList[Integer] = self.emit(s, func(list)) + override def emit(s: String, list: JList[AnyRef]): JList[Integer] = in.emit(s, func(list)) - override def reportError(throwable: Throwable): Unit = self.reportError(throwable) -} \ No newline at end of file + override def reportError(throwable: Throwable): Unit = in.reportError(throwable) +} diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/spout/KeyValueSpout.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/spout/KeyValueSpout.scala index a917d1e62..f1f2b6012 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/spout/KeyValueSpout.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/spout/KeyValueSpout.scala @@ -4,32 +4,74 @@ import backtype.storm.spout.SpoutOutputCollector import backtype.storm.task.TopologyContext import backtype.storm.topology.{ IRichSpout, OutputFieldsDeclarer } import backtype.storm.tuple.Fields +import com.twitter.algebird.Semigroup +import com.twitter.summingbird.online.Externalizer +import com.twitter.summingbird.online.executor.KeyValueShards +import com.twitter.summingbird.online.option.SummerBuilder import com.twitter.summingbird.storm.Constants._ import com.twitter.tormenta.spout.SpoutProxy import java.util import 
java.util.{ List => JList } -import com.twitter.summingbird.storm.collector.TransformingOutputCollector +import scala.collection.mutable.{ MutableList => MList } +import com.twitter.summingbird.storm.collector.{ AggregatorOutputCollector, TransformingOutputCollector } +import com.twitter.util.{ Duration, Time } /** * This is a spout used when the spout is being followed by summer. - * It uses a TransformingOutputCollector on open. + * It uses an AggregatorOutputCollector on open. */ -class KeyValueSpout(in: IRichSpout) extends SpoutProxy { +class KeyValueSpout[K, V: Semigroup](val in: IRichSpout, summerBuilder: SummerBuilder, summerShards: KeyValueShards) extends SpoutProxy { + + private final val tickFrequency = Duration.fromMilliseconds(1000) + private var adapterCollector: AggregatorOutputCollector[K, V] = _ + var lastDump = Time.now override def declareOutputFields(declarer: OutputFieldsDeclarer) = { declarer.declare(new Fields(AGG_KEY, AGG_VALUE)) } - /* - * The transform is the function which unwraps the Value object to get the actual fields present in it. - */ - override def open(conf: util.Map[_, _], topologyContext: TopologyContext, outputCollector: SpoutOutputCollector): Unit = { - val adapterCollector = new TransformingOutputCollector(outputCollector, _.get(0).asInstanceOf[JList[AnyRef]]) - self.open(conf, topologyContext, adapterCollector) + adapterCollector = new AggregatorOutputCollector(outputCollector, summerBuilder, summerShards) + in.open(conf, topologyContext, adapterCollector) + } + + override def nextTuple(): Unit = { + /* + This method is used to call the tick on the cache. + */ + if (Time.now - lastDump > tickFrequency) { + adapterCollector.timerFlush() + lastDump = Time.now + } + in.nextTuple() + } + + override def ack(msgId: Object): Unit = { + /* + The msgId is a list of individual messageIds of emitted tuples + which are aggregated and emitted out as a single tuple. 
+ */ + val msgIds = convertToList(msgId) + msgIds.foreach { super.ack(_) } + } + + override def fail(msgId: Object): Unit = { + /* + The msgId is a list of individual messageIds of emitted tuples + which are aggregated and emitted out as a single tuple. + */ + val msgIds = convertToList(msgId) + msgIds.foreach { super.fail(_) } + } + + def convertToList(msgId: Object): MList[Object] = { + msgId match { + case Some(s) => s.asInstanceOf[MList[Object]] + case None => MList[Object]() + } } override protected def self: IRichSpout = in