From 253cb635f08675f23a748a43a16ff571177146ea Mon Sep 17 00:00:00 2001 From: Ling Liu Date: Mon, 11 Apr 2011 18:16:10 -0700 Subject: [PATCH 1/2] add zookeeper failure tolerance --- .../network/common/BaseNetworkClient.scala | 19 +++++++++++++++++-- .../ConsistentHashLoadBalancerHelper.scala | 8 +++++++- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/network/src/main/scala/com/linkedin/norbert/network/common/BaseNetworkClient.scala b/network/src/main/scala/com/linkedin/norbert/network/common/BaseNetworkClient.scala index 87c81a3d..5ec5e1f8 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/common/BaseNetworkClient.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/common/BaseNetworkClient.scala @@ -22,6 +22,8 @@ import java.util.concurrent.Future import cluster._ import logging.Logging import jmx.JMX +import java.util.{TimerTask, Timer} + trait BaseNetworkClient extends Logging { this: ClusterClientComponent with ClusterIoClientComponent => @@ -30,11 +32,18 @@ trait BaseNetworkClient extends Logging { @volatile protected var endpoints: Set[Endpoint] = Set() @volatile protected var connected = false + @volatile protected var needConnected = true + @volatile protected var maxDisconnectTime = 0L + protected val startedSwitch = new AtomicBoolean protected val shutdownSwitch = new AtomicBoolean private var listenerKey: ClusterListenerKey = _ + def tolerantDisConnect (t: Long = Long.MaxValue) = { + maxDisconnectTime = t + } + def start { if (startedSwitch.compareAndSet(false, true)) { log.debug("Ensuring cluster is started") @@ -53,7 +62,13 @@ trait BaseNetworkClient extends Logging { case ClusterEvents.Disconnected => connected = false - updateCurrentState(Set()) + log.error("DisConnected From Cluster") + needConnected = false + val timer = new Timer(); + timer.schedule(new TimerTask{def run = { + needConnected = true + } }, maxDisconnectTime * 1000) + case ClusterEvents.Shutdown => doShutdown(true) } @@ -148,7 +163,7 @@ trait 
BaseNetworkClient extends Logging { protected def doIfConnected[T](block: => T): T = { if (shutdownSwitch.get) throw new NetworkShutdownException else if (!startedSwitch.get) throw new NetworkNotStartedException - else if (!connected) throw new ClusterDisconnectedException + else if (needConnected && !connected) throw new ClusterDisconnectedException else block } diff --git a/network/src/main/scala/com/linkedin/norbert/network/partitioned/loadbalancer/ConsistentHashLoadBalancerHelper.scala b/network/src/main/scala/com/linkedin/norbert/network/partitioned/loadbalancer/ConsistentHashLoadBalancerHelper.scala index c38b07f3..1ca314fd 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/partitioned/loadbalancer/ConsistentHashLoadBalancerHelper.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/partitioned/loadbalancer/ConsistentHashLoadBalancerHelper.scala @@ -23,11 +23,13 @@ import common.Endpoint import java.util.concurrent.atomic.AtomicInteger import annotation.tailrec import client.loadbalancer.LoadBalancerHelpers +import logging.Logging + /** * A mixin trait that provides functionality to help implement a consistent hash based Router. */ -trait ConsistentHashLoadBalancerHelper extends LoadBalancerHelpers { +trait ConsistentHashLoadBalancerHelper extends LoadBalancerHelpers with Logging { /** * A mapping from partition id to the Nodes which can service that partition. 
@@ -50,6 +52,10 @@ trait ConsistentHashLoadBalancerHelper extends LoadBalancerHelpers { case (map, (partitionId, node)) => map + (partitionId -> (node +: map.get(partitionId).getOrElse(Vector.empty[Endpoint]))) } + for (i <- 0 until numPartitions) { + if (!partitionToNodeMap.contains(i)) log.error("Partition %d is not assigned a node".format(i)) + } + partitionToNodeMap.map { case (pId, endPoints) => pId -> (endPoints, new AtomicInteger(0)) } } From 617c10fc3f06258c2a7e28312387e4758fa2d037 Mon Sep 17 00:00:00 2001 From: Ling Liu Date: Tue, 12 Apr 2011 18:01:50 -0700 Subject: [PATCH 2/2] add retry in dosendmsg --- .../network/common/BaseNetworkClient.scala | 42 +++++++++++++++++-- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/network/src/main/scala/com/linkedin/norbert/network/common/BaseNetworkClient.scala b/network/src/main/scala/com/linkedin/norbert/network/common/BaseNetworkClient.scala index 5ec5e1f8..0997bb01 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/common/BaseNetworkClient.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/common/BaseNetworkClient.scala @@ -17,24 +17,47 @@ package com.linkedin.norbert package network package common -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.Future import cluster._ import logging.Logging import jmx.JMX import java.util.{TimerTask, Timer} +import java.util.concurrent.{ConcurrentHashMap, Future} +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean} trait BaseNetworkClient extends Logging { this: ClusterClientComponent with ClusterIoClientComponent => + class callbackAdapter[RequestMsg, ResponseMsg](node: Node, request: RequestMsg, callback:Either[Throwable, ResponseMsg] => Unit) { + var left = maxRetry + + def process(response: Either[Throwable, ResponseMsg]) + (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]) = { + val r = if(response == null) Left(new NullPointerException("Null 
response found")) else response + + r.fold (ex => { + if (left > 0) { + left = left - 1 + redoSendRequest(node, request, this) + } else + callback(response) + + }, + msg => callback(response) + ) + + } + } + @volatile protected var currentNodes: Set[Node] = Set() @volatile protected var endpoints: Set[Endpoint] = Set() + @volatile protected var maxRetry = 0 @volatile protected var connected = false @volatile protected var needConnected = true @volatile protected var maxDisconnectTime = 0L + protected val startedSwitch = new AtomicBoolean protected val shutdownSwitch = new AtomicBoolean @@ -44,6 +67,8 @@ trait BaseNetworkClient extends Logging { maxDisconnectTime = t } + def setMaxRetry(r: Int = 1 ) = {maxRetry = r} + def start { if (startedSwitch.compareAndSet(false, true)) { log.debug("Ensuring cluster is started") @@ -63,11 +88,13 @@ trait BaseNetworkClient extends Logging { case ClusterEvents.Disconnected => connected = false log.error("DisConnected From Cluster") + if (maxDisconnectTime > 0) { needConnected = false val timer = new Timer(); timer.schedule(new TimerTask{def run = { needConnected = true } }, maxDisconnectTime * 1000) + } case ClusterEvents.Shutdown => doShutdown(true) @@ -155,9 +182,18 @@ trait BaseNetworkClient extends Logging { protected def updateLoadBalancer(nodes: Set[Endpoint]): Unit + + protected def redoSendRequest[RequestMsg, ResponseMsg](node: Node, request: RequestMsg, adapter: callbackAdapter[RequestMsg, ResponseMsg]) + (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Unit = { + log.debug("resend request") + clusterIoClient.sendMessage(node, Request(request, node, is, os, adapter.process)) + } + protected def doSendRequest[RequestMsg, ResponseMsg](node: Node, request: RequestMsg, callback: Either[Throwable, ResponseMsg] => Unit) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Unit = { - clusterIoClient.sendMessage(node, 
Request(request, node, is, os, callback)) + val adapter = new callbackAdapter(node, request, callback) + + clusterIoClient.sendMessage(node, Request(request, node, is, os, adapter.process)) } protected def doIfConnected[T](block: => T): T = {