Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird
}

private def scheduleSpout(jobID: JobId, stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder) = {

val (spout, parOpt) = node.members.collect { case Source(SpoutSource(s, parOpt)) => (s, parOpt) }.head
val nodeName = stormDag.getNodeName(node)

Expand All @@ -194,17 +195,18 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird
}
}

val countersForSpout: Seq[(Group, Name)] = JobCounters.getCountersForJob(jobID).getOrElse(Nil)
val isMergeableWithSource = getOrElse(stormDag, node, DEFAULT_FM_MERGEABLE_WITH_SOURCE).get
val sourceParallelism = getOrElse(stormDag, node, parOpt.getOrElse(DEFAULT_SOURCE_PARALLELISM)).parHint
val flatMapParallelism = getOrElse(stormDag, node, DEFAULT_FM_PARALLELISM).parHint

if (isMergeableWithSource) {
require(flatMapParallelism <= sourceParallelism, s"SourceParallelism ($sourceParallelism) must be at least as high as FlatMapParallelism ($flatMapParallelism) when merging flatMap with Source")
}

val metrics = getOrElse(stormDag, node, DEFAULT_SPOUT_STORM_METRICS)

val registerAllMetrics = Externalizer({ context: TopologyContext =>
val countersForSpout: Seq[(Group, Name)] = JobCounters.getCountersForJob(jobID).getOrElse(Nil)
// Register metrics passed in SpoutStormMetrics option.
metrics.metrics().foreach {
x: StormMetric[IMetric] =>
Expand All @@ -215,38 +217,29 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird
SummingbirdRuntimeStats.addPlatformStatProvider(StormStatProvider)
})
val hookedTormentaSpout = tormentaSpout.openHook(registerAllMetrics.get)

val summerOpt: Option[SummerNode[Storm]] = stormDag.dependantsOf(node.asInstanceOf[StormNode]).collect { case s: SummerNode[Storm] => s }.headOption

val stormSpout = summerOpt match {
/*
* As the spout is being followed by a summer, we do not have map-side aggregation handled in the spout.
* This means Summer is the place where the aggregation should happen.
*/
case Some(s) => createSpoutToFeedSummer(stormDag, s, hookedTormentaSpout)
case Some(s) => createSpoutToFeedSummer[Any, Any](stormDag, s, jobID, hookedTormentaSpout)
case None => hookedTormentaSpout.getSpout
}
topologyBuilder.setSpout(nodeName, stormSpout, sourceParallelism)
}

private def createSpoutToFeedSummer(stormDag: Dag[Storm], node: StormNode, spout: Spout[(Timestamp, Any)]) = {
private def createSpoutToFeedSummer[K, V](stormDag: Dag[Storm], node: StormNode, jobID: JobId, spout: Spout[(Timestamp, Any)]) = {
val builder = BuildSummer(this, stormDag, node, jobID)
val summerParalellism = getOrElse(stormDag, node, DEFAULT_SUMMER_PARALLELISM)
val summerBatchMultiplier = getOrElse(stormDag, node, DEFAULT_SUMMER_BATCH_MULTIPLIER)
val keyValueShards = executor.KeyValueShards(summerParalellism.parHint * summerBatchMultiplier.get)
/*
This method is called only when SourceNode is followed by SummerNode.
The Summer exists. There will always be at most one summer; this is ensured by OnlinePlan.
*/
val summerProducer = node.members.collect { case s: Summer[_, _, _] => s }.head.asInstanceOf[Summer[Storm, _, _]]

val summerProducer = node.members.collect { case s: Summer[_, _, _] => s }.head.asInstanceOf[Summer[Storm, K, V]]
val batcher = summerProducer.store.mergeableBatcher
val kvinj = new KeyValueInjection[Int, CMap[(_, BatchID), (Timestamp, _)]]
val kvinj = new KeyValueInjection[(K, BatchID), (Timestamp, V)]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this to TransformingOutputCollector, etc.

val formattedSummerSpout = spout.map {
case (time, (k, v)) =>
val newK = keyValueShards.summerIdFor(k)
val m = CMap((k, batcher.batchOf(time)) -> (time, v))
kvinj((newK, m))
case (time, (k: K, v: V)) => kvinj((k, batcher.batchOf(time)), (time, v))
}
new KeyValueSpout(formattedSummerSpout.getSpout)
implicit val valueMonoid: Semigroup[V] = summerProducer.semigroup
new KeyValueSpout[(K, BatchID), (Timestamp, V)](formattedSummerSpout.getSpout, builder, keyValueShards)

}

private def scheduleSummerBolt[K, V](jobID: JobId, stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder) = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package com.twitter.summingbird.storm.collector

import com.twitter.algebird.Semigroup
import com.twitter.summingbird.online.executor.KeyValueShards
import com.twitter.summingbird.online.option.SummerBuilder
import backtype.storm.spout.SpoutOutputCollector
import com.twitter.algebird.util.summer.AsyncSummer
import com.twitter.util.{ Await, Future, Time }
import scala.collection.mutable.{ Map => MMap }
import scala.collection.mutable.{ MutableList => MList }
import scala.collection.{ Map => CMap }
import scala.collection.JavaConverters._
import java.util.{ List => JList }

/**
*
* AggregatorOutputCollector is a wrapper around the SpoutOutputCollector.
* AsyncSummer is used to aggregate the tuples.
* Different streams have separate aggregators and caches.
*
*/
class AggregatorOutputCollector[K, V: Semigroup](

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc comment here.

in: SpoutOutputCollector,
summerBuilder: SummerBuilder,
summerShards: KeyValueShards) extends SpoutOutputCollector(in) {

// Per-stream aggregation caches: stream name -> AsyncSummer folding that
// stream's (K, V) pairs with the Semigroup[V].
val spoutCaches = MMap[String, AsyncSummer[(K, V), Map[K, V]]]()

// Millis timestamp of the last flush. NOTE(review): nothing in this class
// reads it — the flush timer appears to live in KeyValueSpout; confirm and remove.
var lastDump = Time.now.inMillis

// stream name -> (summer shard id -> message ids of the upstream tuples
// folded into that shard's pending aggregate). Used to ack/fail the
// originals when the aggregated tuple is acked/failed.
val streamMessageIdTracker = MMap[String, MMap[Int, MList[Object]]]()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment here specifying the Map structure.


/**
 * Periodic flush, invoked from the spout's nextTuple() at the configured
 * tick frequency: ticks every per-stream cache and emits whatever
 * aggregates each cache releases.
 */
def timerFlush() = {
  for ((streamId, cache) <- spoutCaches) {
    val released = cache.tick.map(convertToSummerInputFormat)
    emitData(released, streamId)
  }
}

// Shard a flushed cache for the downstream summer: bucket each (key, value)
// pair under the summer shard id its key hashes to.
private def convertToSummerInputFormat(flushedCache: CMap[K, V]): CMap[Int, CMap[K, V]] =
  flushedCache.groupBy(kv => summerShards.summerIdFor(kv._1))

/*
The method is invoked to handle the flushed cache caused by
exceeding the memoryLimit, which is called within add method.
*/
/*
 Emits a batch of shard-grouped aggregates on the given stream and returns
 the task ids the tuples were sent to. Invoked from timerFlush() and when
 a cache spills after exceeding its memory limit inside add().
 NOTE(review): Await.result blocks the spout thread until the cache future
 completes — presumably the summer futures are already satisfied here;
 verify against the AsyncSummer implementation.
 */
private def emitData(cache: Future[Traversable[(Int, CMap[K, V])]], streamId: String): List[Int] = {
  val flushedTups = Await.result(cache)
  // A stream that never had message ids tracked (e.g. unanchored tuples
  // flushed by the timer) has no tracker entry — fall back to an empty map
  // instead of throwing NoSuchElementException.
  val messageIdsTracker = streamMessageIdTracker.getOrElse(streamId, MMap[Int, MList[Object]]())
  val returns = flushedTups.toList
    .map {
      case (shardId, shardMap) =>
        // Pull out (and clear) the message ids folded into this shard.
        val messageIds = messageIdsTracker.remove(shardId)
        val list = List(shardId, shardMap).asJava.asInstanceOf[JList[AnyRef]]
        callEmit(messageIds, list, streamId)
    }
  // Flatten the per-emit JList[Integer] results into plain Ints explicitly
  // rather than relying on implicit Java-collection conversions.
  returns.flatMap(_.asScala.map(_.intValue))
}

/*
This is a wrapper method to call the emit with appropriate signature
based on the arguments.
*/
// Dispatches to the SpoutOutputCollector.emit overload matching the
// presence of a message id and of an explicit stream name; an empty
// stream name stands for the default stream.
// NOTE(review): the empty-string sentinel is fragile — an Option[String]
// would make the default-stream contract explicit.
private def callEmit(messageIds: Option[Any], list: JList[AnyRef], stream: String): JList[Integer] = {
(messageIds.isEmpty, stream.isEmpty) match {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This special handling of empty stream name seems odd to me. What does empty stream name signify? Can we use Option[String] instead of relying on special value of stream name?

case (true, true) => in.emit(list)
case (true, false) => in.emit(stream, list)
// The Option itself (not its contents) is emitted as the message id;
// KeyValueSpout.convertToList unwraps it on ack/fail.
case (false, true) => in.emit(list, messageIds)
case (false, false) => in.emit(stream, list, messageIds)
}
}

// Queues one key/value pair for aggregation on the given stream, first
// recording its upstream message id (when anchored) so the original tuple
// can be acked once its aggregate is acked.
private def add(tuple: (K, V), streamid: String, messageId: Option[Any] = None) = {
  messageId.foreach { id => trackMessageId(tuple, id, streamid) }
  addToCache(tuple, streamid)
}

// Adds the pair to the stream's AsyncSummer, building the summer on first
// use of the stream; returns the summer's Future of any spilled aggregates.
private def addToCache(tuple: (K, V), streamid: String) = {
  val cache = spoutCaches.getOrElseUpdate(
    streamid,
    summerBuilder.getSummer[K, V](implicitly[Semigroup[V]]))
  cache.add(tuple)
}

// Records the upstream message id of a tuple under the summer shard its key
// maps to, creating the per-stream and per-shard entries on demand.
private def trackMessageId(tuple: (K, V), o: scala.Any, s: String): Unit = {
  val shardId = summerShards.summerIdFor(tuple._1)
  val tracker = streamMessageIdTracker.getOrElseUpdate(s, MMap[Int, MList[Object]]())
  val ids = tracker.getOrElseUpdate(shardId, MList())
  ids += o.asInstanceOf[Object]
}

/**
 * Common path for all emit() overloads: unwraps the (key, value) pair from
 * the single-element tuple list, feeds it into the aggregation cache, and
 * emits whatever the cache spills, returning the receiving task ids.
 */
def extractAndProcessElements(streamId: String, list: JList[AnyRef], messageId: Option[Any] = None): JList[Integer] = {
  val kvPair = list.get(0).asInstanceOf[JList[AnyRef]]
  val key: K = kvPair.get(0).asInstanceOf[K]
  val value: V = kvPair.get(1).asInstanceOf[V]
  val spilled = add((key, value), streamId, messageId)
  val taskIds = emitData(spilled.map(convertToSummerInputFormat(_)), streamId)
  taskIds.asJava.asInstanceOf[JList[Integer]]
}

// All SpoutOutputCollector.emit overloads funnel into
// extractAndProcessElements; an empty stream name denotes the default
// stream (see callEmit) and a present third argument is the message id.
override def emit(s: String, list: JList[AnyRef], o: scala.AnyRef): JList[Integer] = extractAndProcessElements(s, list, Some(o))

override def emit(list: JList[AnyRef], o: scala.AnyRef): JList[Integer] = extractAndProcessElements("", list, Some(o))

override def emit(list: JList[AnyRef]): JList[Integer] = extractAndProcessElements("", list)

override def emit(s: String, list: JList[AnyRef]): JList[Integer] = extractAndProcessElements(s, list)

// Errors are passed straight through to the wrapped collector.
override def reportError(throwable: Throwable): Unit = in.reportError(throwable)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,23 @@ import java.util.{ List => JList }
* The TransformingOutputCollector is used to transform the Value object when passed on by the Spout.
*/

class TransformingOutputCollector(in: SpoutOutputCollector, func: JList[AnyRef] => JList[AnyRef]) extends SpoutOutputCollector(in) {

  // Every emit/emitDirect overload applies func to the tuple list, then
  // delegates to the wrapped collector with otherwise unchanged arguments.
  override def emitDirect(i: Int, s: String, list: JList[AnyRef], o: scala.AnyRef): Unit = in.emitDirect(i, s, func(list), o)

  override def emitDirect(i: Int, list: JList[AnyRef], o: scala.AnyRef): Unit = in.emitDirect(i, func(list), o)

  override def emitDirect(i: Int, s: String, list: JList[AnyRef]): Unit = in.emitDirect(i, s, func(list))

  override def emitDirect(i: Int, list: JList[AnyRef]): Unit = in.emitDirect(i, func(list))

  override def emit(s: String, list: JList[AnyRef], o: scala.AnyRef): JList[Integer] = in.emit(s, func(list), o)

  override def emit(list: JList[AnyRef], o: scala.AnyRef): JList[Integer] = in.emit(func(list), o)

  override def emit(list: JList[AnyRef]): JList[Integer] = in.emit(func(list))

  override def emit(s: String, list: JList[AnyRef]): JList[Integer] = in.emit(s, func(list))

  // Errors pass through untransformed.
  override def reportError(throwable: Throwable): Unit = in.reportError(throwable)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,74 @@ import backtype.storm.spout.SpoutOutputCollector
import backtype.storm.task.TopologyContext
import backtype.storm.topology.{ IRichSpout, OutputFieldsDeclarer }
import backtype.storm.tuple.Fields
import com.twitter.algebird.Semigroup
import com.twitter.summingbird.online.Externalizer
import com.twitter.summingbird.online.executor.KeyValueShards
import com.twitter.summingbird.online.option.SummerBuilder
import com.twitter.summingbird.storm.Constants._
import com.twitter.tormenta.spout.SpoutProxy
import java.util
import java.util.{ List => JList }
import com.twitter.summingbird.storm.collector.TransformingOutputCollector
import scala.collection.mutable.{ MutableList => MList }
import com.twitter.summingbird.storm.collector.{ AggregatorOutputCollector, TransformingOutputCollector }
import com.twitter.util.{ Duration, Time }

/**
* This is a spout used when the spout is being followed by summer.
* It uses a TransformingOutputCollector on open.
It uses an AggregatorOutputCollector on open.
*/

class KeyValueSpout(in: IRichSpout) extends SpoutProxy {
class KeyValueSpout[K, V: Semigroup](val in: IRichSpout, summerBuilder: SummerBuilder, summerShards: KeyValueShards) extends SpoutProxy {

// Minimum interval between timer-driven flushes of the aggregation caches.
private final val tickFrequency = Duration.fromMilliseconds(1000)
// Assigned in open(); wraps the real collector so emitted tuples are aggregated.
private var adapterCollector: AggregatorOutputCollector[K, V] = _
// Time of the last timer flush, advanced in nextTuple().
var lastDump = Time.now

// Aggregated tuples are emitted as (aggregate key, aggregate value) fields.
override def declareOutputFields(declarer: OutputFieldsDeclarer) = {
declarer.declare(new Fields(AGG_KEY, AGG_VALUE))
}

/*
 * Wraps the downstream collector in an AggregatorOutputCollector so tuples
 * emitted by the underlying spout are aggregated before reaching the
 * topology, then opens the wrapped spout against that collector.
 */
override def open(conf: util.Map[_, _],
  topologyContext: TopologyContext,
  outputCollector: SpoutOutputCollector): Unit = {
  adapterCollector = new AggregatorOutputCollector(outputCollector, summerBuilder, summerShards)
  in.open(conf, topologyContext, adapterCollector)
}

/**
 * Delegates to the wrapped spout, additionally ticking the aggregation
 * caches whenever at least tickFrequency has elapsed since the last flush.
 */
override def nextTuple(): Unit = {
  val elapsed = Time.now - lastDump
  if (elapsed > tickFrequency) {
    adapterCollector.timerFlush()
    lastDump = Time.now
  }
  in.nextTuple()
}

/*
 * Storm hands back the message id that AggregatorOutputCollector.callEmit
 * attached to the aggregated tuple — the Option[MList] of the original
 * upstream message ids — so each constituent id is unwrapped and acked
 * individually.
 */
override def ack(msgId: Object): Unit = {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you comment who calls ack with a msgId which is actually a list of message ids. Seems pretty unintuitive so I want to understand for my own knowledge.

/*
The msgId is a list of individual messageIds of emitted tuples
which are aggregated and emitted out as a single tuple.
*/
val msgIds = convertToList(msgId)
msgIds.foreach { super.ack(_) }
}

/*
 * Mirror of ack(): the aggregated tuple's message id wraps the ids of all
 * constituent upstream tuples, each of which must be failed individually.
 */
override def fail(msgId: Object): Unit = {
  for (id <- convertToList(msgId)) super.fail(id)
}

/**
 * Unwraps an aggregated tuple's message id back into the list of original
 * upstream ids. The id is the Option that AggregatorOutputCollector passed
 * to emit(): Some(list of ids) when the tuples were anchored.
 */
def convertToList(msgId: Object): MList[Object] = {
  msgId match {
    case Some(ids) => ids.asInstanceOf[MList[Object]]
    case None => MList[Object]()
  }
}

override protected def self: IRichSpout = in
Expand Down