diff --git a/network/src/main/scala/com/linkedin/norbert/network/common/BaseNetworkClient.scala b/network/src/main/scala/com/linkedin/norbert/network/common/BaseNetworkClient.scala index 87c81a3d..0997bb01 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/common/BaseNetworkClient.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/common/BaseNetworkClient.scala @@ -17,24 +17,58 @@ package com.linkedin.norbert package network package common -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.Future import cluster._ import logging.Logging import jmx.JMX +import java.util.{TimerTask, Timer} +import java.util.concurrent.{ConcurrentHashMap, Future} +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean} + trait BaseNetworkClient extends Logging { this: ClusterClientComponent with ClusterIoClientComponent => + class callbackAdapter[RequestMsg, ResponseMsg](node: Node, request: RequestMsg, callback:Either[Throwable, ResponseMsg] => Unit) { + var left = maxRetry + + def process(response: Either[Throwable, ResponseMsg]) + (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]) = { + val r = if(response == null) Left(new NullPointerException("Null response found")) else response + + r.fold (ex => { + if (left > 0) { + left = left - 1 + redoSendRequest(node, request, this) + } else + callback(response) + + }, + msg => callback(response) + ) + + } + } + @volatile protected var currentNodes: Set[Node] = Set() @volatile protected var endpoints: Set[Endpoint] = Set() + @volatile protected var maxRetry = 0 @volatile protected var connected = false + @volatile protected var needConnected = true + @volatile protected var maxDisconnectTime = 0L + + protected val startedSwitch = new AtomicBoolean protected val shutdownSwitch = new AtomicBoolean private var listenerKey: ClusterListenerKey = _ + def tolerantDisConnect (t: Long = Long.MaxValue) = { + maxDisconnectTime = t + } + + def setMaxRetry(r: Int = 1 ) = {maxRetry = r} + def start { if (startedSwitch.compareAndSet(false, true)) { log.debug("Ensuring cluster is started") @@ -53,7 +87,15 @@ trait BaseNetworkClient extends Logging { case ClusterEvents.Disconnected => connected = false - updateCurrentState(Set()) + log.error("DisConnected From Cluster") + if (maxDisconnectTime > 0) { + needConnected = false + val timer = new Timer(); + timer.schedule(new TimerTask{def run = { + needConnected = true + } }, maxDisconnectTime * 1000) + } + case ClusterEvents.Shutdown => doShutdown(true) } @@ -140,15 +182,24 @@ trait BaseNetworkClient extends Logging { protected def updateLoadBalancer(nodes: Set[Endpoint]): Unit + + protected def redoSendRequest[RequestMsg, ResponseMsg](node: Node, request: RequestMsg, adapter: callbackAdapter[RequestMsg, ResponseMsg]) + (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Unit = { + log.debug("resend request") + clusterIoClient.sendMessage(node, Request(request, node, is, os, adapter.process)) + } + protected def doSendRequest[RequestMsg, ResponseMsg](node: Node, request: RequestMsg, callback: Either[Throwable, ResponseMsg] => Unit) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Unit = { - clusterIoClient.sendMessage(node, Request(request, node, is, os, callback)) + val adapter = new callbackAdapter(node, request, callback) + + clusterIoClient.sendMessage(node, Request(request, node, is, os, adapter.process)) } protected def doIfConnected[T](block: => T): T = { if (shutdownSwitch.get) throw new NetworkShutdownException else if (!startedSwitch.get) throw new NetworkNotStartedException - else if (!connected) throw new ClusterDisconnectedException + else if (needConnected && !connected) throw new ClusterDisconnectedException else block } diff --git a/network/src/main/scala/com/linkedin/norbert/network/partitioned/loadbalancer/DefaultLoadBalancerHelper.scala b/network/src/main/scala/com/linkedin/norbert/network/partitioned/loadbalancer/DefaultLoadBalancerHelper.scala index b6a1f949..893337a4 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/partitioned/loadbalancer/DefaultLoadBalancerHelper.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/partitioned/loadbalancer/DefaultLoadBalancerHelper.scala @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger import annotation.tailrec import client.loadbalancer.LoadBalancerHelpers import logging.Logging - /** * A mixin trait that provides functionality to help implement a hash based Router. */