From 95e39e9da10b692cbb8387e49cd36841a6906a94 Mon Sep 17 00:00:00 2001 From: abhinigam Date: Wed, 11 Dec 2013 18:01:40 -0800 Subject: [PATCH 1/3] update norbert future to fix NoNodeAvailableException Adding handling of NoNodesAvailableException --- .../network/common/NorbertFuture.scala | 57 +++++++++++-------- 1 file changed, 33 insertions(+), 24 deletions(-) diff --git a/network/src/main/scala/com/linkedin/norbert/network/common/NorbertFuture.scala b/network/src/main/scala/com/linkedin/norbert/network/common/NorbertFuture.scala index 6ade63dc..f02d81c1 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/common/NorbertFuture.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/common/NorbertFuture.scala @@ -444,36 +444,45 @@ class SelectiveRetryIterator[PartitionedId, RequestMsg, ResponseMsg]( throw new TimeoutException("Timedout waiting for final %d partitions to return, retryInfo:%s ".format(setRequests.size, retryMessage)) } } + + try { + if(conditionsRetryMet) { + retryMessage = "Retry initiated at %d".format(System.currentTimeMillis) + //If for a particular partition id only if 10/10 of the replicas are in trouble then quit + val nodes = calculateNodesFromIds(ids, failedNodes, 10) + + if(duplicatesOk != true) { + //only the responses from these new requests count + log.debug("Adjust responseIterator to: %d".format(nodes.keySet.size)) + distinctResponsesLeft=nodes.keySet.size + //reset the outstanding requests map + setRequests = Map.empty[PartitionedId, Node] + } - if(conditionsRetryMet) { - retryMessage = "Retry initiated at %d".format(System.currentTimeMillis) - //If for a particular partition id only if 10/10 of the replicas are in trouble then quit - val nodes = calculateNodesFromIds(ids, failedNodes, 10) - - if(duplicatesOk != true) { - //only the responses from these new requests count - log.debug("Adjust responseIterator to: %d".format(nodes.keySet.size)) - distinctResponsesLeft=nodes.keySet.size - //reset the outstanding requests map - setRequests = Map.empty[PartitionedId, Node] - } + nodes.foreach { + case (node, idsForNode) => { + def callback(a:Either[Throwable, ResponseMsg]):Unit = { + a match { + case Left(t) => queue += Left(t) + case Right(r) => queue += Right(Tuple3(node, idsForNode, r)) + } + } - nodes.foreach { - case (node, idsForNode) => { - def callback(a:Either[Throwable, ResponseMsg]):Unit = { - a match { - case Left(t) => queue += Left(t) - case Right(r) => queue += Right(Tuple3(node, idsForNode, r)) + val request1 = PartitionedRequest(requestBuilder(node, idsForNode), node, idsForNode, requestBuilder, is, os, Some((a: Either[Throwable, ResponseMsg]) => {callback(a)}), 0, Some(this)) + idsForNode.foreach { + case id => setRequests = setRequests + (id -> node) } + sendRequestFunctor(request1) } - - val request1 = PartitionedRequest(requestBuilder(node, idsForNode), node, idsForNode, requestBuilder, is, os, Some((a: Either[Throwable, ResponseMsg]) => {callback(a)}), 0, Some(this)) - idsForNode.foreach { - case id => setRequests = setRequests + (id -> node) - } - sendRequestFunctor(request1) } } + } catch { + case e:NoNodesAvailableException => { + //reset state built about the failure of queries + conditionsRetryMet = false + retryMessage = "" + failedNodes = Set.empty[Node] + } } } From 2e776649d82255f57fc87e73d2a3f17b2c48e011 Mon Sep 17 00:00:00 2001 From: abhinigam Date: Wed, 11 Jun 2014 14:01:57 -0700 Subject: [PATCH 2/3] Enhance selective retry to retry failed requests due to exceptions should they occur before the initial timeout --- gradle.properties | 2 +- .../network/common/NorbertFuture.scala | 21 +++++-- .../common/NorbertResponseIteratorSpec.scala | 56 ++++++++++++++++++- 3 files changed, 72 insertions(+), 7 deletions(-) diff --git a/gradle.properties b/gradle.properties index 9044e24d..73276a3d 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.6.50 +version=0.6.73 diff --git a/network/src/main/scala/com/linkedin/norbert/network/common/NorbertFuture.scala b/network/src/main/scala/com/linkedin/norbert/network/common/NorbertFuture.scala index f02d81c1..3c0dcecc 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/common/NorbertFuture.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/common/NorbertFuture.scala @@ -315,6 +315,10 @@ class SelectiveRetryIterator[PartitionedId, RequestMsg, ResponseMsg]( is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg], var retryStrategy: Option[RetryStrategy], var duplicatesOk: Boolean = false) extends ResponseIterator[ResponseMsg] with ResponseHelper{ + /** + * The last exception which we encountered before trying selective retry + */ + var lastExceptionSeen : Throwable = null /** * Set of nodes which have failed in sending a response back in time for this larger request */ @@ -438,10 +442,15 @@ class SelectiveRetryIterator[PartitionedId, RequestMsg, ResponseMsg]( timeoutCutoff = timeStartedPass + timeoutForRetry } case None => { - if (!duplicatesOk) - throw new TimeoutException("Timedout waiting for final %d nodes, retryInfo:%s ".format(distinctResponsesLeft, retryMessage)) - else - throw new TimeoutException("Timedout waiting for final %d partitions to return, retryInfo:%s ".format(setRequests.size, retryMessage)) + val exception = { + if (!duplicatesOk) + new TimeoutException("Timedout waiting for final %d nodes, retryInfo:%s ".format(distinctResponsesLeft, retryMessage)) + else + new TimeoutException("Timedout waiting for final %d partitions to return, retryInfo:%s ".format(setRequests.size, retryMessage)) + } + if(lastExceptionSeen != null) + exception.initCause(lastExceptionSeen) + throw exception } } @@ -499,7 +508,9 @@ class SelectiveRetryIterator[PartitionedId, RequestMsg, ResponseMsg]( } } case Left(exception) => { - throw exception + //log this let selective retry kick in and retry this request + log.warn(exception.getStackTrace().mkString("\n")) + lastExceptionSeen = exception } } } diff --git a/network/src/test/scala/com/linkedin/norbert/network/common/NorbertResponseIteratorSpec.scala b/network/src/test/scala/com/linkedin/norbert/network/common/NorbertResponseIteratorSpec.scala index 4db14564..ccd56a5f 100644 --- a/network/src/test/scala/com/linkedin/norbert/network/common/NorbertResponseIteratorSpec.scala +++ b/network/src/test/scala/com/linkedin/norbert/network/common/NorbertResponseIteratorSpec.scala @@ -163,7 +163,7 @@ class NorbertResponseIteratorSpec extends Specification with Mockito with Sample } timeoutExceptionFlag mustBe true } - + "if timeout occurs we try to find the next node multiple times" in { var invocationCount = 1 def calculateNodeFunctor(setPIds: Set[Int],setNodes: Set[Node],requestMsg: Int):Map[Node, Set[Int]] = { @@ -272,6 +272,60 @@ class NorbertResponseIteratorSpec extends Specification with Mockito with Sample iterator.next mustEqual 5000 } + "in case of exception we will try to retry now" in { + val node1 = Node(1, "node1", true) + def calculateNodeFunctor(setPIds: Set[Int],setNodes: Set[Node],requestMsg: Int):Map[Node, Set[Int]] = { + return Map.empty[Node, Set[Int]] + (Node(3,"node3",true) -> (Set.empty[Int] + 1)) + } + def requestBuilderFunctor(node: Node, setPIds: Set[Int]):Int = 1 + val queue = new ResponseQueue[Tuple3[Node, Set[Int], Int]] + def callback(pRequest: PartitionedRequest[Int,Int,Int]) = { + val insertQueueLate = new Thread(new Runnable { + def run() {Thread.sleep(10L); queue += Right(Tuple3(Node(3, "endpoint", true), Set.empty[Int] + 1, 3000)) } + }) + insertQueueLate.start() + } + val iterator = new SelectiveRetryIterator[Int, Int, Int](2, 20L, + callback, Map.empty[Int, Node] + (1->node1), + queue, + calculateNodeFunctor, + requestBuilderFunctor, + new DummyInputSerializer[Int, Int], + new DummyOutputSerializer[Int, Int], + Some(new RetryStrategy(20L, 1, None)), true) + + queue.clear + queue += Left(new HeavyLoadException()) + + iterator.hasNext mustBe true + iterator.next mustEqual 3000 + + //test what happens if the retry is not successful + val iterator1 = new SelectiveRetryIterator[Int, Int, Int](2, 20L, + callback, Map.empty[Int, Node] + (1->node1), + queue, + calculateNodeFunctor, + requestBuilderFunctor, + new DummyInputSerializer[Int, Int], + new DummyOutputSerializer[Int, Int], + None) + + val cause = new HeavyLoadException(); + val insertQueueLate2 = new Thread(new Runnable { + def run() {Thread.sleep(10L); queue += Left(cause) } + }) + iterator1.hasNext mustBe true + val exceptionSeen : Throwable = { + try { + iterator.next + null + } catch { + case e => e + } + } + exceptionSeen.getCause.getClass.getName mustEqual "com.linkedin.norbert.network.HeavyLoadException" + } + "in case of duplicate outstanding requests the first one to complete wins" in { def calculateNodeFunctor(setPIds: Set[Int],setNodes: Set[Node],requestMsg: Int):Map[Node, Set[Int]] = { val failedNodes = Set.empty[Node] + Node(1, "node1", true) + Node(2, "node2", true) From d0cb96027996912db004a6768da3ce87b7ea5d4a Mon Sep 17 00:00:00 2001 From: abhinigam Date: Thu, 12 Jun 2014 10:29:18 -0700 Subject: [PATCH 3/3] Using option instead of null --- .../norbert/network/common/NorbertFuture.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/network/src/main/scala/com/linkedin/norbert/network/common/NorbertFuture.scala b/network/src/main/scala/com/linkedin/norbert/network/common/NorbertFuture.scala index 3c0dcecc..19f2ff07 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/common/NorbertFuture.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/common/NorbertFuture.scala @@ -316,9 +316,9 @@ class SelectiveRetryIterator[PartitionedId, RequestMsg, ResponseMsg]( var retryStrategy: Option[RetryStrategy], var duplicatesOk: Boolean = false) extends ResponseIterator[ResponseMsg] with ResponseHelper{ /** - * The last exception which we encountered before trying selective retry + * The last exception which we encountered before failing */ - var lastExceptionSeen : Throwable = null + var lastExceptionSeen : Option[Throwable] = None /** * Set of nodes which have failed in sending a response back in time for this larger request */ @@ -448,8 +448,10 @@ class SelectiveRetryIterator[PartitionedId, RequestMsg, ResponseMsg]( else new TimeoutException("Timedout waiting for final %d partitions to return, retryInfo:%s ".format(setRequests.size, retryMessage)) } - if(lastExceptionSeen != null) - exception.initCause(lastExceptionSeen) + lastExceptionSeen match { + case Some(s) => exception.initCause(s) + case None => //do nothing + } throw exception } } @@ -510,7 +512,7 @@ class SelectiveRetryIterator[PartitionedId, RequestMsg, ResponseMsg]( case Left(exception) => { //log this let selective retry kick in and retry this request log.warn(exception.getStackTrace().mkString("\n")) - lastExceptionSeen = exception + lastExceptionSeen = Some(exception) } } }