diff --git a/include/Client.h b/include/Client.h index b181063..bec8c43 100644 --- a/include/Client.h +++ b/include/Client.h @@ -48,14 +48,16 @@ class ProcedureCallback; class Client { friend class MockVoltDB; public: + /* - * Create a connection to the VoltDB process running at the specified host authenticating - * using the username and password provided when this client was constructed + * Creates a pending connection that is handled in the reconnect callback * @param hostname Hostname or IP address to connect to + * @param port Port to connect to + * @param defer if true defer connection establishment * @throws voltdb::ConnectException An error occurs connecting or authenticating * @throws voltdb::LibEventException libevent returns an error code */ - void createConnection(const std::string &hostname, const unsigned short port = 21212, const bool keepConnecting = false) throw (voltdb::ConnectException, voltdb::LibEventException, voltdb::Exception); + void createConnection(const std::string &hostname, const unsigned short port = 21212, const bool autoReconnect = false, const bool defer = false) throw (voltdb::Exception, voltdb::ConnectException, voltdb::LibEventException); /* * Close client connection. diff --git a/include/ClientImpl.h b/include/ClientImpl.h index b9fd888..63fb532 100644 --- a/include/ClientImpl.h +++ b/include/ClientImpl.h @@ -57,8 +57,9 @@ class ClientImpl { /* * Create a connection to the VoltDB process running at the specified host authenticating * using the username and password provided when this client was constructed - * @param hostname Hostname or IP address to connect to - * @param port Port to connect to + * @param hostname Hostname or IP address to connect to + * @param port Port to connect to + * @param defer if true defer connection establishment * @throws voltdb::ConnectException An error occurs connecting or authenticating * @throws voltdb::LibEventException libevent returns an error code * @throws voltdb::PipeCreationException Fails to create pipe for communication @@ -66,7 +67,8 @@ class ClientImpl { * @throws voltdb::TimerThreadException error happens when creating query timer monitor thread * @throws voltdb::SSLException ssl operations returns an error */ - void createConnection(const std::string &hostname, const unsigned short port, const bool keepConnecting) throw (Exception, ConnectException, LibEventException, PipeCreationException, TimerThreadException, SSLException); + void createConnection(const std::string &hostname, const unsigned short port=21212, const bool autoReconnect=false, const bool defer=false) + throw (voltdb::Exception, voltdb::ConnectException, voltdb::LibEventException, voltdb::PipeCreationException, voltdb::TimerThreadException, voltdb::SSLException); /* * Synchronously invoke a stored procedure and return a the response. @@ -93,7 +95,7 @@ class ClientImpl { void regularEventCallback(struct bufferevent *bev, short events); void regularWriteCallback(struct bufferevent *bev); void eventBaseLoopBreak(); - void reconnectEventCallback(); + void reconnectEventCallback(const boost::shared_ptr& pc); void runTimeoutMonitor() throw (LibEventException); void purgeExpiredRequests(); @@ -135,8 +137,10 @@ class ClientImpl { private: ClientImpl(ClientConfig config) throw (Exception, LibEventException, MDHashException, SSLException); - void initiateAuthentication(struct bufferevent *bev, const std::string& hostname, unsigned short port) throw (LibEventException); - void finalizeAuthentication(PendingConnection* pc) throw (Exception, ConnectException); + void createConnectionSync(const std::string &hostname, const unsigned short port, const bool keepConnecting) + throw (voltdb::Exception, voltdb::ConnectException, voltdb::LibEventException, voltdb::PipeCreationException, voltdb::TimerThreadException, voltdb::SSLException); + void initiateAuthentication(struct bufferevent *bev, const std::string& hostname, unsigned short port) throw (voltdb::LibEventException); + void finalizeAuthentication(PendingConnection* pc) throw (voltdb::Exception, voltdb::ConnectException); /* * Updates procedures and topology information for transaction routing algorithm @@ -156,7 +160,7 @@ class ClientImpl { /* * Initiate connection based on pending connection instance */ - void initiateConnection(boost::shared_ptr &pc) throw (ConnectException, LibEventException, SSLException); + void initiateConnection(const boost::shared_ptr &pc) throw (voltdb::ConnectException, voltdb::LibEventException, voltdb::SSLException); /* * Creates a pending connection that is handled in the reconnect callback @@ -192,7 +196,6 @@ class ClientImpl { } } } - /* * Method for sinking messages. * If a logger callback is not set then skip all messages @@ -235,7 +238,7 @@ class ClientImpl { // data member variables Distributer m_distributer; struct event_base *m_base; - struct event * m_ev; + struct event* m_pipeEvent; struct event_config * m_cfg; int64_t m_nextRequestId; size_t m_nextConnectionIndex; @@ -266,9 +269,7 @@ class ClientImpl { //If to use abandon in case of backpressure. bool m_enableAbandon; - std::list > m_pendingConnectionList; - boost::atomic m_pendingConnectionSize; - boost::mutex m_pendingConnectionLock; + std::vector > m_pendingConnectionList; int m_wakeupPipe[2]; boost::mutex m_wakeupPipeLock; @@ -306,7 +307,7 @@ class ClientImpl { SSL_CTX *m_clientSslCtx; // Reference count number of clients running to help in release of the global resource like // ssl ciphers, error strings and digests can be unloaded that are shared between clients - static boost::atomic m_numberOfClients; + static uint32_t m_numberOfClients; static boost::mutex m_globalResourceLock; static const int64_t VOLT_NOTIFICATION_MAGIC_NUMBER; diff --git a/src/Client.cpp b/src/Client.cpp index e4b926f..35022fa 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -36,12 +36,13 @@ Client::Client(ClientImpl *impl) : m_impl(impl) {} Client::~Client() { } -void Client::createConnection(const std::string &hostname, - const unsigned short port, - const bool keepConnecting) throw (voltdb::Exception, - voltdb::ConnectException, - voltdb::LibEventException) { - m_impl->createConnection(hostname, port, keepConnecting); +void +Client::createConnection( + const std::string &hostname, + const unsigned short port, + const bool autoReconnect, + const bool defer) throw (voltdb::Exception, voltdb::ConnectException, voltdb::LibEventException) { + m_impl->createConnection(hostname, port, autoReconnect, defer); } void Client::close() { diff --git a/src/ClientImpl.cpp b/src/ClientImpl.cpp index 18e2f02..e2e3486 100644 --- a/src/ClientImpl.cpp +++ b/src/ClientImpl.cpp @@ -52,12 +52,12 @@ int64_t get_sec_time() { return tp.tv_sec; } -class PendingConnection { +class PendingConnection: public boost::enable_shared_from_this { public: - PendingConnection(const std::string& hostname, const unsigned short port, const bool keepConnecting, + PendingConnection(const std::string& hostname, const unsigned short port, const bool autoReconnect, struct event_base *base, ClientImpl* ci) : m_hostname(hostname), m_port(port), - m_keepConnecting(keepConnecting), + m_autoReconnect(autoReconnect), m_base(base), m_bufferEvent(NULL), m_authenticationResponseLength(-1), @@ -84,6 +84,11 @@ class PendingConnection { } } + void reconnectEventCallback() { + m_clientImpl->reconnectEventCallback(shared_from_this()); + } + + ~PendingConnection() {} /* @@ -91,7 +96,7 @@ class PendingConnection { * */ const std::string m_hostname; const unsigned short m_port; - const bool m_keepConnecting; + const bool m_autoReconnect; /* *Event and event base associated with connection @@ -113,12 +118,13 @@ class CxnContext { * Data associated with a specific connection */ public: - CxnContext(const std::string& name, unsigned short port, int hostId) : m_name(name), - m_port(port), m_nextLength(4), m_lengthOrMessage(true), m_hostId(hostId) { } + CxnContext(const std::string& name, unsigned short port, const int hostId, const bool autoReconnect) : m_name(name), + m_port(port), m_nextLength(4), m_lengthOrMessage(true), m_hostId(hostId), m_autoReconnect(autoReconnect) { } const std::string m_name; const unsigned short m_port; int32_t m_nextLength; bool m_lengthOrMessage; + bool m_autoReconnect; int m_hostId; }; @@ -256,7 +262,7 @@ static void regularEventCallback(struct bufferevent *bev, short events, void *ct impl->regularEventCallback(bev, events); } -boost::atomic ClientImpl::m_numberOfClients(0); +uint32_t ClientImpl::m_numberOfClients(0); boost::mutex ClientImpl::m_globalResourceLock; void initOpenSSLLib() { @@ -306,16 +312,9 @@ static void regularWriteCallback(struct bufferevent *bev, void *ctx) { ClientImpl::~ClientImpl() { bool cleanupEvp = false; - bool cleanupErrorStrings = false; - for (std::vector::iterator bevItr = m_bevs.begin(); bevItr != m_bevs.end(); ++bevItr) { - if (m_enableSSL) { - notifySslClose(*bevItr); - } - // if in SSL mode, the allocated SSL context for bev will - // get released by bufferevent_free - bufferevent_free(*bevItr); - } - m_bevs.clear(); + + this->close(); + m_contexts.clear(); m_callbacks.clear(); if (m_passwordHash != NULL) { @@ -325,9 +324,6 @@ ClientImpl::~ClientImpl() { if (m_cfg != NULL) { event_config_free(m_cfg); } - if (m_ev != NULL) { - event_free(m_ev); - } if (m_timerMonitorEventInitialized) { pthread_cancel(m_queryTimeoutMonitorThread); @@ -351,19 +347,11 @@ ClientImpl::~ClientImpl() { } event_base_free(m_base); - - if (m_wakeupPipe[1] != -1) { - ::close(m_wakeupPipe[0]); - ::close(m_wakeupPipe[1]); - } - - if (m_enableSSL) { - if (m_clientSslCtx != NULL) { - // free clients ssl context - SSL_CTX_free(m_clientSslCtx); - m_clientSslCtx = NULL; - } - cleanupErrorStrings = true; + + if (m_enableSSL && m_clientSslCtx != NULL) { + // free clients ssl context + SSL_CTX_free(m_clientSslCtx); + m_clientSslCtx = NULL; } { @@ -444,12 +432,12 @@ void ClientImpl::hashPassword(const std::string& password) throw (MDHashExceptio } ClientImpl::ClientImpl(ClientConfig config) throw (Exception, LibEventException, MDHashException, SSLException) : - m_base(NULL), m_ev(NULL), m_cfg(NULL), m_nextRequestId(INT64_MIN), m_nextConnectionIndex(0), + m_base(NULL), m_pipeEvent(NULL), m_cfg(NULL), m_nextRequestId(INT64_MIN), m_nextConnectionIndex(0), m_listener(config.m_listener), m_invocationBlockedOnBackpressure(false), m_backPressuredForOutstandingRequests(false), m_loopBreakRequested(false), m_isDraining(false), m_instanceIdIsSet(false), m_outstandingRequests(0), m_leaderAddress(-1), m_clusterStartTime(-1), m_username(config.m_username), m_passwordHash(NULL), m_maxOutstandingRequests(config.m_maxOutstandingRequests), - m_ignoreBackpressure(false), m_useClientAffinity(true),m_updateHashinator(false), m_enableAbandon(config.m_enableAbandon), m_pendingConnectionSize(0), + m_ignoreBackpressure(false), m_useClientAffinity(true),m_updateHashinator(false), m_enableAbandon(config.m_enableAbandon), m_enableQueryTimeout(config.m_enableQueryTimeout), m_queryTimeoutMonitorThread(0), m_timerMonitorBase(NULL), m_timerMonitorEventPtr(NULL), m_timeoutServiceEventPtr(NULL), m_timerMonitorEventInitialized(false), m_timedoutRequests(0), m_responseHandleNotFound(0), m_queryExpirationTime(config.m_queryTimeout), m_scanIntervalForTimedoutQuery(config.m_scanIntervalForTimedoutQuery), @@ -472,8 +460,14 @@ ClientImpl::ClientImpl(ClientConfig config) throw (Exception, LibEventException, throw LibEventException("Failed to create and initialize main event base"); } hashPassword(config.m_password); - m_wakeupPipe[0] = -1; - m_wakeupPipe[1] = -1; + + if (0 == pipe(m_wakeupPipe)) { + m_pipeEvent = event_new(m_base, m_wakeupPipe[0], EV_READ|EV_PERSIST, wakeupPipeCallback, this); + event_add(m_pipeEvent, NULL); + } else { + m_wakeupPipe[0] = -1; + m_wakeupPipe[1] = -1; + } if (m_enableQueryTimeout) { m_timerMonitorBase = event_base_new(); @@ -523,7 +517,7 @@ class FreeBEVOnFailure { struct bufferevent *m_bev; }; -void ClientImpl::initiateConnection(boost::shared_ptr &pc) throw (ConnectException, +void ClientImpl::initiateConnection(const boost::shared_ptr &pc) throw (ConnectException, LibEventException, SSLException) { std::ostringstream ss; @@ -549,9 +543,7 @@ void ClientImpl::initiateConnection(boost::shared_ptr &pc) th pc->m_bufferEvent = bufferevent_socket_new(m_base, -1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE); } if (pc->m_bufferEvent == NULL) { - if (pc->m_keepConnecting) { - createPendingConnection(pc->m_hostname, pc->m_port); - } else { + if (!pc->m_autoReconnect) { ss.str(""); ss << "!!!! ClientImpl::initiateConnection to " << pc->m_hostname << ":" << pc->m_port << " failed getting socket"; logMessage(ClientLogger::ERROR, ss.str()); @@ -566,14 +558,7 @@ void ClientImpl::initiateConnection(boost::shared_ptr &pc) th //std::cout << ss.str() << " thread-id: " << (long) pthread_self() << std::endl; if (bufferevent_socket_connect_hostname(pc->m_bufferEvent, NULL, AF_INET, pc->m_hostname.c_str(), pc->m_port) != 0) { - if (pc->m_keepConnecting) { - //std::cout << "CI::free bev: " << pc->m_bufevent <m_bufferEvent != NULL) { - pc->cleanupBev(); - } - protector.success(); - createPendingConnection(pc->m_hostname, pc->m_port); - } else { + if (!pc->m_autoReconnect) { ss.str(""); ss << "!!!! ClientImpl::initiateConnection to " << pc->m_hostname << ":" << pc->m_port << " failed"; logMessage(ClientLogger::ERROR, ss.str()); @@ -588,11 +573,14 @@ void ClientImpl::close() { //drain before we close; drain(); if (m_wakeupPipe[1] != -1) { + event_del(m_pipeEvent); + event_free(m_pipeEvent); ::close(m_wakeupPipe[0]); ::close(m_wakeupPipe[1]); + m_wakeupPipe[1] = -1; } - if (m_bevs.empty()) return; - for (std::vector::iterator bevEntryItr = m_bevs.begin(); bevEntryItr != m_bevs.end(); ++bevEntryItr) { + + for (std::vector::const_iterator bevEntryItr = m_bevs.begin(); bevEntryItr != m_bevs.end(); ++bevEntryItr) { if (m_enableSSL) { notifySslClose(*bevEntryItr); } @@ -603,7 +591,7 @@ void ClientImpl::close() { m_bevs.clear(); } -void ClientImpl::initiateAuthentication(struct bufferevent *bev, const std::string& hostname, unsigned short port) throw (LibEventException) { +void ClientImpl::initiateAuthentication(struct bufferevent *bev, const std::string& hostname, const unsigned short port) throw (LibEventException) { logMessage(ClientLogger::DEBUG, "ClientImpl::initiateAuthentication"); @@ -673,7 +661,7 @@ void ClientImpl::finalizeAuthentication(PendingConnection* pc) throw (Exception, // save connection information for the event m_contexts[bev] = - boost::shared_ptr(new CxnContext(pc->m_hostname, pc->m_port, hostId)); + boost::shared_ptr(new CxnContext(pc->m_hostname, pc->m_port, hostId, pc->m_autoReconnect)); boost::shared_ptr callbackMap(new CallbackMap()); m_callbacks[bev] = callbackMap; @@ -684,15 +672,10 @@ void ClientImpl::finalizeAuthentication(PendingConnection* pc) throw (Exception, voltdb::regularEventCallback, this); - { - boost::mutex::scoped_lock lock(m_pendingConnectionLock); - for (std::list::iterator i = m_pendingConnectionList.begin(); - i != m_pendingConnectionList.end(); - ++i) { + if (pc->m_autoReconnect) { + for (std::vector::iterator i = m_pendingConnectionList.begin(); i != m_pendingConnectionList.end(); ++i) { if (i->get() == pc) { m_pendingConnectionList.erase(i); - m_pendingConnectionSize.store(m_pendingConnectionList.size(), boost::memory_order_release); - pcRemoved = true; break; } } @@ -752,6 +735,36 @@ void ClientImpl::finalizeAuthentication(PendingConnection* pc) throw (Exception, protector.success(); } +static void reconnectCallback(evutil_socket_t fd, short events, void *clientData) { + ClientImpl* c = reinterpret_cast(clientData); + c->reconnectEventCallback(boost::shared_ptr()); +} + +static void pendingConnectionCallback(evutil_socket_t fd, short events, void *clientData) { + PendingConnectionSPtr pc = *reinterpret_cast(clientData); + pc->reconnectEventCallback(); +} + +void ClientImpl::reconnectEventCallback(const PendingConnectionSPtr& pc) { + if (pc) { + if (pc->m_autoReconnect) m_pendingConnectionList.push_back(pc); + else initiateConnection(pc); + } + + if (m_pendingConnectionList.empty()) return; + + const int64_t reconnect_deadline = get_sec_time() - RECONNECT_INTERVAL; + BOOST_FOREACH( PendingConnectionSPtr& pc, m_pendingConnectionList ) { + if (reconnect_deadline > pc->m_startPending){ + //pc->m_startPending = now; + initiateConnection(pc); + } + } + + struct timeval tv = {RECONNECT_INTERVAL,0}; + event_base_once(m_base, -1, EV_TIMEOUT, reconnectCallback, this, &tv); +} + void *timerThreadRun(void *ctx) { ClientImpl *client = reinterpret_cast(ctx); client->runTimeoutMonitor(); @@ -797,33 +810,17 @@ void ClientImpl::setUpTimeoutCheckerMonitor() throw (LibEventException){ startMonitorThread(); } -void ClientImpl::createConnection(const std::string& hostname, - const unsigned short port, - const bool keepConnecting) throw (Exception, - ConnectException, - LibEventException, - PipeCreationException, - TimerThreadException, - SSLException) { - - +void ClientImpl::createConnectionSync(const std::string& hostname, const unsigned short port, const bool autoReconnect) + throw (Exception, ConnectException, LibEventException, PipeCreationException, TimerThreadException, SSLException) { if (m_pLogger) { std::ostringstream os; os << "ClientImpl::createConnection" << " hostname:" << hostname << " port:" << port; m_pLogger->log(ClientLogger::INFO, os.str()); } - if (0 == pipe(m_wakeupPipe)) { - if (m_ev != NULL) { - event_free(m_ev); - } - m_ev = event_new(m_base, m_wakeupPipe[0], EV_READ|EV_PERSIST, wakeupPipeCallback, this); - event_add(m_ev, NULL); - } else { - m_wakeupPipe[1] = -1; - } + // we'll create a new instance later if autoReconnect == true + PendingConnectionSPtr pc(new PendingConnection(hostname, port, false, m_base, this)); - PendingConnectionSPtr pc(new PendingConnection(hostname, port, keepConnecting, m_base, this)); initiateConnection(pc); int dispatchStatus = event_base_dispatch(m_base); @@ -836,22 +833,21 @@ void ClientImpl::createConnection(const std::string& hostname, if (dispatchStatus == -1) { throw LibEventException("CreateConnection: Failed to run base loop"); } - if (pc->m_loginExchangeCompleted) { return; } } - if (keepConnecting) { + + if (autoReconnect) { if (pc->m_bufferEvent != NULL) { pc->cleanupBev(); } - createPendingConnection(hostname, port); + createConnection(hostname, port, autoReconnect, true); } else { // if no error has been reported for the connection, back off and listen if // any events were there to process before calling it no-connection - int retry = 0; - while (pc->m_status && retry < 5) { - ++retry; + int retry = 5; + while (pc->m_status && retry--) { dispatchStatus = event_base_dispatch(m_base); if (dispatchStatus == -1) { throw LibEventException("CreateConnection: Failed to run base loop"); @@ -868,46 +864,22 @@ void ClientImpl::createConnection(const std::string& hostname, } } -static void reconnectCallback(evutil_socket_t fd, short events, void *clientData) { - ClientImpl *self = reinterpret_cast(clientData); - self->reconnectEventCallback(); -} - -void ClientImpl::reconnectEventCallback() { - if (m_pendingConnectionSize.load(boost::memory_order_consume) <= 0) return; - - boost::mutex::scoped_lock lock(m_pendingConnectionLock); - const int64_t now = get_sec_time(); - BOOST_FOREACH( PendingConnectionSPtr& pc, m_pendingConnectionList ) { - if ((now - pc->m_startPending) > RECONNECT_INTERVAL) { - pc->m_startPending = now; - initiateConnection(pc); - } - } - - struct timeval tv; - tv.tv_sec = RECONNECT_INTERVAL; - tv.tv_usec = 0; - - event_base_once(m_base, -1, EV_TIMEOUT, reconnectCallback, this, &tv); -} - -void ClientImpl::createPendingConnection(const std::string &hostname, const unsigned short port, int64_t time) { - logMessage(ClientLogger::DEBUG, "ClientImpl::createPendingConnection"); +void ClientImpl::createConnection(const std::string &hostname, const unsigned short port, const bool autoReconnect, const bool defer) + throw (Exception, ConnectException, LibEventException, PipeCreationException, TimerThreadException, SSLException) { + std::stringstream ss; + ss << "ClientImpl::createConnection" << " hostname:" << hostname << " port:" << port; + logMessage(ClientLogger::INFO, ss.str()); - PendingConnectionSPtr pc(new PendingConnection(hostname, port, false, m_base, this)); - pc->m_startPending = time; - { - boost::mutex::scoped_lock lock(m_pendingConnectionLock); - m_pendingConnectionList.push_back(pc); - m_pendingConnectionSize.store(m_pendingConnectionList.size(), boost::memory_order_release); + if (!defer) { + createConnectionSync(hostname, port, autoReconnect); + return; } - struct timeval tv; - tv.tv_sec = (time > 0)? RECONNECT_INTERVAL : 0; - tv.tv_usec = 0; + PendingConnectionSPtr pc(new PendingConnection(hostname, port, autoReconnect, m_base, this)); + struct timeval tv = {RECONNECT_INTERVAL,0}; + pc->m_startPending = get_sec_time(); - event_base_once(m_base, -1, EV_TIMEOUT, reconnectCallback, this, &tv); + event_base_once(m_base, -1, EV_TIMEOUT, pendingConnectionCallback, &pc, &tv); } @@ -1232,8 +1204,8 @@ void ClientImpl::runOnce() throw (Exception, NoConnectionsException, LibEventExc logMessage(ClientLogger::DEBUG, "ClientImpl::runOnce"); - if (m_bevs.empty() && m_pendingConnectionSize.load(boost::memory_order_consume) <= 0) { - throw NoConnectionsException(); + if (m_bevs.empty() && m_pendingConnectionList.empty()) { + throw voltdb::NoConnectionsException(); } if (event_base_loop(m_base, EVLOOP_NONBLOCK) == -1) { @@ -1246,8 +1218,8 @@ void ClientImpl::run() throw (Exception, NoConnectionsException, LibEventExcepti logMessage(ClientLogger::DEBUG, "ClientImpl::run"); - if (m_bevs.empty() && m_pendingConnectionSize.load(boost::memory_order_consume) <= 0) { - throw NoConnectionsException(); + if (m_bevs.empty() && m_pendingConnectionList.empty()) { + throw voltdb::NoConnectionsException(); } if (event_base_dispatch(m_base) == -1) { throw LibEventException("run: Failed running event base loop"); @@ -1368,6 +1340,7 @@ void ClientImpl::regularEventCallback(struct bufferevent *bev, short events) { std::map >::iterator connectionCtxIter = m_contexts.find(bev); assert(connectionCtxIter != m_contexts.end()); + const boost::shared_ptr& ctx = connectionCtxIter->second; // First drain anything in the read buffer regularReadCallback(bev); @@ -1381,10 +1354,10 @@ void ClientImpl::regularEventCallback(struct bufferevent *bev, short events) { } //Notify client that a connection was lost - if (m_listener.get() != NULL) { + if (m_listener) { try { m_ignoreBackpressure = true; - breakEventLoop |= m_listener->connectionLost( connectionCtxIter->second->m_name, m_bevs.size() - 1); + breakEventLoop |= m_listener->connectionLost(ctx->m_name, m_bevs.size() - 1); m_ignoreBackpressure = false; } catch (const std::exception& e) { std::string msg(e.what()); @@ -1418,18 +1391,20 @@ void ClientImpl::regularEventCallback(struct bufferevent *bev, short events) { breakEventLoop = true; } - m_hostIdToEvent.erase(connectionCtxIter->second->m_hostId); + m_hostIdToEvent.erase(ctx->m_hostId); //remove the entry for the backpressured connection set m_backpressuredBevs.erase(bev); + if (ctx->m_autoReconnect) { + createConnection(ctx->m_name, ctx->m_port, true, true); + } + if (m_outstandingRequests < m_maxOutstandingRequests) { m_backPressuredForOutstandingRequests = false; } - createPendingConnection(connectionCtxIter->second->m_name, connectionCtxIter->second->m_port, get_sec_time()); - //Remove the connection context - m_contexts.erase(bev); + m_contexts.erase(connectionCtxIter); std::vector::iterator entry = std::find(m_bevs.begin(), m_bevs.end(), bev); if (entry != m_bevs.end()) { @@ -1442,12 +1417,12 @@ void ClientImpl::regularEventCallback(struct bufferevent *bev, short events) { m_instanceIdIsSet = false; } - if (breakEventLoop || (m_bevs.size() == 0)) { + if (breakEventLoop || m_bevs.empty()) { event_base_loopbreak( m_base ); } //update topology info and procedures info - if (m_useClientAffinity && (m_bevs.size() > 0)) { + if (m_useClientAffinity && !m_bevs.empty()) { updateHashinator(); } } diff --git a/test_src/ClientTest.cpp b/test_src/ClientTest.cpp index 5c91e09..3588a67 100644 --- a/test_src/ClientTest.cpp +++ b/test_src/ClientTest.cpp @@ -46,25 +46,25 @@ class DelegatingListener : public StatusListener { std::exception exception, boost::shared_ptr callback, InvocationResponse response) { - if (m_listener != NULL) { + if (m_listener) { return m_listener->uncaughtException(exception, callback, response); } return false; } virtual bool connectionLost(std::string hostname, int32_t connectionsLeft) { - if (m_listener != NULL) { + if (m_listener) { return m_listener->connectionLost(hostname, connectionsLeft); } return false; } virtual bool connectionActive(std::string hostname, int32_t connectionsActive) { - if (m_listener != NULL) { + if (m_listener) { return m_listener->connectionActive(hostname, connectionsActive); } return false; } virtual bool backpressure(bool hasBackpressure) { - if (m_listener != NULL) { + if (m_listener) { return m_listener->backpressure(hasBackpressure); } return false; @@ -94,15 +94,14 @@ CPPUNIT_TEST_SUITE_END(); public: void setUp() { - m_dlistener = new boost::shared_ptr(new DelegatingListener()); - ClientConfig config = ClientConfig("hello", "world", *m_dlistener); + m_dlistener = boost::shared_ptr(new DelegatingListener()); + ClientConfig config = ClientConfig("hello", "world", m_dlistener); m_voltdb.reset(new MockVoltDB(Client::create(config))); m_client = m_voltdb->client(); m_client->setClientAffinity(false); } void tearDown() { - delete m_dlistener; m_voltdb.reset(NULL); } @@ -274,7 +273,7 @@ CPPUNIT_TEST_SUITE_END(); return false; } } listener; - (*m_dlistener)->m_listener = &listener; + m_dlistener->m_listener = &listener; (m_client)->createConnection("localhost"); std::vector signature; @@ -303,7 +302,7 @@ CPPUNIT_TEST_SUITE_END(); void testPendingConnection() { m_voltdb.reset(0); - Client client = Client::create(ClientConfig("hello", "world", *m_dlistener)); + Client client = Client::create(ClientConfig("hello", "world", m_dlistener)); m_client = &client; m_client->createConnection("localhost", 21212, true); std::vector signature; @@ -347,7 +346,7 @@ CPPUNIT_TEST_SUITE_END(); return false; } } listener; - (*m_dlistener)->m_listener = &listener; + m_dlistener->m_listener = &listener; (m_client)->createConnection("localhost"); (m_client)->createConnection("localhost"); @@ -362,6 +361,8 @@ CPPUNIT_TEST_SUITE_END(); (m_client)->run(); (m_client)->runOnce(); + + m_dlistener->m_listener = NULL; } class BreakingSyncCallback : public ProcedureCallback { @@ -426,7 +427,7 @@ CPPUNIT_TEST_SUITE_END(); } } listener; - (*m_dlistener)->m_listener = &listener; + m_dlistener->m_listener = &listener; (m_client)->createConnection("localhost"); std::vector signature; @@ -443,6 +444,7 @@ CPPUNIT_TEST_SUITE_END(); (m_client)->invoke(proc, callback); (m_client)->run(); + m_dlistener->m_listener = NULL; } void testBackpressure() { @@ -470,7 +472,7 @@ CPPUNIT_TEST_SUITE_END(); return true; } } listener; - (*m_dlistener)->m_listener = &listener; + m_dlistener->m_listener = &listener; (m_client)->createConnection("localhost"); std::vector signature; @@ -482,6 +484,8 @@ CPPUNIT_TEST_SUITE_END(); (m_client)->invoke(proc, callback); (m_client)->runOnce(); } + + m_dlistener->m_listener = NULL; } class CountingCallback : public voltdb::ProcedureCallback { @@ -577,7 +581,7 @@ CPPUNIT_TEST_SUITE_END(); private: Client *m_client; boost::scoped_ptr m_voltdb; - boost::shared_ptr *m_dlistener; + boost::shared_ptr m_dlistener; }; CPPUNIT_TEST_SUITE_REGISTRATION( ClientTest ); }