Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,58 @@ package com.linkedin.norbert
package network
package common

import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.Future
import cluster._
import logging.Logging
import jmx.JMX
import java.util.{TimerTask, Timer}
import java.util.concurrent.{ConcurrentHashMap, Future}
import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean}


trait BaseNetworkClient extends Logging {
this: ClusterClientComponent with ClusterIoClientComponent =>

class callbackAdapter[RequestMsg, ResponseMsg](node: Node, request: RequestMsg, callback:Either[Throwable, ResponseMsg] => Unit) {
var left = maxRetry

def process(response: Either[Throwable, ResponseMsg])
(implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]) = {
val r = if(response == null) Left(new NullPointerException("Null response found")) else response

r.fold (ex => {
if (left > 0) {
left = left - 1
redoSendRequest(node, request, this)
} else
callback(response)

},
msg => callback(response)
)

}
}

@volatile protected var currentNodes: Set[Node] = Set()
@volatile protected var endpoints: Set[Endpoint] = Set()

@volatile protected var maxRetry = 0
@volatile protected var connected = false
@volatile protected var needConnected = true
@volatile protected var maxDisconnectTime = 0L


protected val startedSwitch = new AtomicBoolean
protected val shutdownSwitch = new AtomicBoolean

private var listenerKey: ClusterListenerKey = _

def tolerantDisConnect (t: Long = Long.MaxValue) = {
maxDisconnectTime = t
}

def setMaxRetry(r: Int = 1 ) = {maxRetry = r}

def start {
if (startedSwitch.compareAndSet(false, true)) {
log.debug("Ensuring cluster is started")
Expand All @@ -53,7 +87,15 @@ trait BaseNetworkClient extends Logging {

case ClusterEvents.Disconnected =>
connected = false
updateCurrentState(Set())
log.error("DisConnected From Cluster")
if (maxDisconnectTime > 0) {
needConnected = false
val timer = new Timer();
timer.schedule(new TimerTask{def run = {
needConnected = true
} }, maxDisconnectTime * 1000)
}


case ClusterEvents.Shutdown => doShutdown(true)
}
Expand Down Expand Up @@ -140,15 +182,24 @@ trait BaseNetworkClient extends Logging {

protected def updateLoadBalancer(nodes: Set[Endpoint]): Unit


protected def redoSendRequest[RequestMsg, ResponseMsg](node: Node, request: RequestMsg, adapter: callbackAdapter[RequestMsg, ResponseMsg])
(implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Unit = {
log.debug("resend request")
clusterIoClient.sendMessage(node, Request(request, node, is, os, adapter.process))
}

protected def doSendRequest[RequestMsg, ResponseMsg](node: Node, request: RequestMsg, callback: Either[Throwable, ResponseMsg] => Unit)
(implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Unit = {
clusterIoClient.sendMessage(node, Request(request, node, is, os, callback))
val adapter = new callbackAdapter(node, request, callback)

clusterIoClient.sendMessage(node, Request(request, node, is, os, adapter.process))
}

protected def doIfConnected[T](block: => T): T = {
if (shutdownSwitch.get) throw new NetworkShutdownException
else if (!startedSwitch.get) throw new NetworkNotStartedException
else if (!connected) throw new ClusterDisconnectedException
else if (needConnected && !connected) throw new ClusterDisconnectedException
else block
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger
import annotation.tailrec
import client.loadbalancer.LoadBalancerHelpers
import logging.Logging

/**
* A mixin trait that provides functionality to help implement a hash based <code>Router</code>.
*/
Expand Down