Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.6.50
version=0.6.73
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ class SelectiveRetryIterator[PartitionedId, RequestMsg, ResponseMsg](
is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg],
var retryStrategy: Option[RetryStrategy], var duplicatesOk: Boolean = false)
extends ResponseIterator[ResponseMsg] with ResponseHelper{
/**
* The most recent exception received as a `Left` on the response queue, if any.
* It is recorded instead of being rethrown immediately so that selective retry
* can kick in; when the iterator finally gives up with a TimeoutException, this
* value is attached to that exception as its cause. `None` means no exception
* has been seen so far.
*/
var lastExceptionSeen : Option[Throwable] = None
/**
* Set of nodes which have failed in sending a response back in time for this larger request
*/
Expand Down Expand Up @@ -438,42 +442,58 @@ class SelectiveRetryIterator[PartitionedId, RequestMsg, ResponseMsg](
timeoutCutoff = timeStartedPass + timeoutForRetry
}
case None => {
if (!duplicatesOk)
throw new TimeoutException("Timedout waiting for final %d nodes, retryInfo:%s ".format(distinctResponsesLeft, retryMessage))
else
throw new TimeoutException("Timedout waiting for final %d partitions to return, retryInfo:%s ".format(setRequests.size, retryMessage))
val exception = {
if (!duplicatesOk)
new TimeoutException("Timedout waiting for final %d nodes, retryInfo:%s ".format(distinctResponsesLeft, retryMessage))
else
new TimeoutException("Timedout waiting for final %d partitions to return, retryInfo:%s ".format(setRequests.size, retryMessage))
}
lastExceptionSeen match {
case Some(s) => exception.initCause(s)
case None => //do nothing
}
throw exception
}
}

try {
if(conditionsRetryMet) {
retryMessage = "Retry initiated at %d".format(System.currentTimeMillis)
//If for a particular partition id only if 10/10 of the replicas are in trouble then quit
val nodes = calculateNodesFromIds(ids, failedNodes, 10)

if(duplicatesOk != true) {
//only the responses from these new requests count
log.debug("Adjust responseIterator to: %d".format(nodes.keySet.size))
distinctResponsesLeft=nodes.keySet.size
//reset the outstanding requests map
setRequests = Map.empty[PartitionedId, Node]
}

if(conditionsRetryMet) {
retryMessage = "Retry initiated at %d".format(System.currentTimeMillis)
//If for a particular partition id only if 10/10 of the replicas are in trouble then quit
val nodes = calculateNodesFromIds(ids, failedNodes, 10)

if(duplicatesOk != true) {
//only the responses from these new requests count
log.debug("Adjust responseIterator to: %d".format(nodes.keySet.size))
distinctResponsesLeft=nodes.keySet.size
//reset the outstanding requests map
setRequests = Map.empty[PartitionedId, Node]
}
nodes.foreach {
case (node, idsForNode) => {
def callback(a:Either[Throwable, ResponseMsg]):Unit = {
a match {
case Left(t) => queue += Left(t)
case Right(r) => queue += Right(Tuple3(node, idsForNode, r))
}
}

nodes.foreach {
case (node, idsForNode) => {
def callback(a:Either[Throwable, ResponseMsg]):Unit = {
a match {
case Left(t) => queue += Left(t)
case Right(r) => queue += Right(Tuple3(node, idsForNode, r))
val request1 = PartitionedRequest(requestBuilder(node, idsForNode), node, idsForNode, requestBuilder, is, os, Some((a: Either[Throwable, ResponseMsg]) => {callback(a)}), 0, Some(this))
idsForNode.foreach {
case id => setRequests = setRequests + (id -> node)
}
sendRequestFunctor(request1)
}

val request1 = PartitionedRequest(requestBuilder(node, idsForNode), node, idsForNode, requestBuilder, is, os, Some((a: Either[Throwable, ResponseMsg]) => {callback(a)}), 0, Some(this))
idsForNode.foreach {
case id => setRequests = setRequests + (id -> node)
}
sendRequestFunctor(request1)
}
}
} catch {
case e:NoNodesAvailableException => {
//reset state built about the failure of queries
conditionsRetryMet = false
retryMessage = ""
failedNodes = Set.empty[Node]
}
}
}

Expand All @@ -490,7 +510,9 @@ class SelectiveRetryIterator[PartitionedId, RequestMsg, ResponseMsg](
}
}
case Left(exception) => {
throw exception
//log this let selective retry kick in and retry this request
log.warn(exception.getStackTrace().mkString("\n"))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should be able to pass an exception into log.warn

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log.warn only takes a string

lastExceptionSeen = Some(exception)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class NorbertResponseIteratorSpec extends Specification with Mockito with Sample
}
timeoutExceptionFlag mustBe true
}

"if timeout occurs we try to find the next node multiple times" in {
var invocationCount = 1
def calculateNodeFunctor(setPIds: Set[Int],setNodes: Set[Node],requestMsg: Int):Map[Node, Set[Int]] = {
Expand Down Expand Up @@ -272,6 +272,60 @@ class NorbertResponseIteratorSpec extends Specification with Mockito with Sample
iterator.next mustEqual 5000
}

"in case of exception we will try to retry now" in {
  // Scenario 1: an exception arrives on the queue and a retry strategy is configured;
  //             the retry is expected to deliver the late response (3000).
  // Scenario 2: an exception arrives and NO retry strategy is configured; the iterator
  //             is expected to fail with an exception whose cause is the one it saw.
  val node1 = Node(1, "node1", true)

  // Always routes partition 1 to node3, regardless of which nodes have failed.
  def calculateNodeFunctor(setPIds: Set[Int], setNodes: Set[Node], requestMsg: Int): Map[Node, Set[Int]] =
    Map.empty[Node, Set[Int]] + (Node(3, "node3", true) -> (Set.empty[Int] + 1))

  def requestBuilderFunctor(node: Node, setPIds: Set[Int]): Int = 1

  val queue = new ResponseQueue[Tuple3[Node, Set[Int], Int]]

  // Stands in for the network send: every request that is sent is answered
  // successfully, but only after a short delay.
  def callback(pRequest: PartitionedRequest[Int, Int, Int]) = {
    val insertQueueLate = new Thread(new Runnable {
      def run(): Unit = {
        Thread.sleep(10L)
        queue += Right(Tuple3(Node(3, "endpoint", true), Set.empty[Int] + 1, 3000))
      }
    })
    insertQueueLate.start()
  }

  val iterator = new SelectiveRetryIterator[Int, Int, Int](2, 20L,
    callback, Map.empty[Int, Node] + (1 -> node1),
    queue,
    calculateNodeFunctor,
    requestBuilderFunctor,
    new DummyInputSerializer[Int, Int],
    new DummyOutputSerializer[Int, Int],
    Some(new RetryStrategy(20L, 1, None)), true)

  // Start from a known queue state: the only pending item is the failure.
  queue.clear
  queue += Left(new HeavyLoadException())

  iterator.hasNext mustBe true
  iterator.next mustEqual 3000

  //test what happens if the retry is not successful
  val iterator1 = new SelectiveRetryIterator[Int, Int, Int](2, 20L,
    callback, Map.empty[Int, Node] + (1 -> node1),
    queue,
    calculateNodeFunctor,
    requestBuilderFunctor,
    new DummyInputSerializer[Int, Int],
    new DummyOutputSerializer[Int, Int],
    None)

  val cause = new HeavyLoadException()
  val insertQueueLate2 = new Thread(new Runnable {
    def run(): Unit = {
      Thread.sleep(10L)
      queue += Left(cause)
    }
  })
  // The failure must actually be delivered; previously this thread was created
  // but never started, so `cause` never reached the queue.
  insertQueueLate2.start()

  iterator1.hasNext mustBe true

  // Exercise iterator1 (the one without a retry strategy). Previously this called
  // `iterator.next`, which only re-tested the first iterator's leftover state.
  // NOTE(review): relies on the 10ms insert landing inside iterator1's 20ms
  // timeout window -- confirm this is not flaky on slow build machines.
  val exceptionSeen: Option[Throwable] =
    try {
      iterator1.next
      None
    } catch {
      case e: Exception => Some(e)
    }

  // Going through Option turns "nothing was thrown" or "no cause attached" into a
  // clear assertion failure instead of a NullPointerException.
  val observedCause: Option[Throwable] = exceptionSeen.flatMap(e => Option(e.getCause))
  observedCause.map(_.getClass.getName) mustEqual Some("com.linkedin.norbert.network.HeavyLoadException")
  // The cause must be the very exception queued for iterator1, not a stale one.
  observedCause.exists(_ eq cause) mustBe true
}

"in case of duplicate outstanding requests the first one to complete wins" in {
def calculateNodeFunctor(setPIds: Set[Int],setNodes: Set[Node],requestMsg: Int):Map[Node, Set[Int]] = {
val failedNodes = Set.empty[Node] + Node(1, "node1", true) + Node(2, "node2", true)
Expand Down