From 0425c248f5f649510cee97facb09cee5f545dd34 Mon Sep 17 00:00:00 2001 From: Alex Revetchi Date: Wed, 25 Oct 2017 14:51:27 +0100 Subject: [PATCH] - change callback to store extra invokation info - store invoke routing and stored proc info in the callback, so it can be used later by the caller, for logging and other household --- include/ClientImpl.h | 2 +- include/ProcedureCallback.hpp | 62 ++++++++++++++++++++++++++++++--- makefile | 2 +- src/ClientImpl.cpp | 64 +++++++++++++++++++---------------- 4 files changed, 94 insertions(+), 36 deletions(-) diff --git a/include/ClientImpl.h b/include/ClientImpl.h index b9fd888..59d47b3 100644 --- a/include/ClientImpl.h +++ b/include/ClientImpl.h @@ -151,7 +151,7 @@ class ClientImpl { /* * Get the buffered event based on transaction routing algorithm */ - struct bufferevent *routeProcedure(Procedure &proc, ScopedByteBuffer &sbb); + struct bufferevent *routeProcedure(Procedure &proc, ScopedByteBuffer &sbb, boost::shared_ptr callback); /* * Initiate connection based on pending connection instance diff --git a/include/ProcedureCallback.hpp b/include/ProcedureCallback.hpp index 097b686..9115f1b 100644 --- a/include/ProcedureCallback.hpp +++ b/include/ProcedureCallback.hpp @@ -36,6 +36,58 @@ class ProcedureCallback { enum AbandonReason { NOT_ABANDONED, TOO_BUSY }; + typedef struct { + std::string procName; + std::string hostName; + int hostId; + int partition; + bool readonly; + bool multipart; + } InvokeInfo; + + ProcedureCallback(): + m_reason(NOT_ABANDONED), m_allowAbandon(true), m_info({"", "", ~0, ~0, false, false}){ + } + + virtual void abandon(AbandonReason reason) { + m_reason = reason; + } + + // Mechanism for procedure to over-ride abandon property set in client in event of backpressure. + // @return true: allow abandoning of requests in case of back pressure + // false: don't abandon the requests in back pressure scenario. + bool allowAbandon() const { + return m_allowAbandon; + } + + + void invokeProcName(const std::string& n) { + m_info.procName = n; + } + void invokeHostName(const std::string& h) { + m_info.hostName = h; + } + + void invokeHostId(const int id) { + m_info.hostId = id; + } + + void invokePartition(const int p) { + m_info.partition = p; + } + + void invokeReadonly(const bool r) { + m_info.readonly = r; + } + + void invokeMultipart(const bool m) { + m_info.multipart = m; + } + + const InvokeInfo& invokeInfo() const { + return m_info; + } + /* * Invoked when a response to an invocation is available or * the connection to the node the invocation was sent to was lost. @@ -46,12 +98,12 @@ class ProcedureCallback { * @return true if the event loop should break after invoking this callback, false otherwise */ virtual bool callback(InvocationResponse response) throw (voltdb::Exception) = 0; - virtual void abandon(AbandonReason reason) {} - // Mechanism for procedure to over-ride abandon property set in client in event of backpressure. - // @return true: allow abandoning of requests in case of back pressure - // false: don't abandon the requests in back pressure scenario. - virtual bool allowAbandon() const {return true;} virtual ~ProcedureCallback() {} + +protected: + AbandonReason m_reason; + bool m_allowAbandon; + InvokeInfo m_info; }; } diff --git a/makefile b/makefile index 9e4a694..17ad535 100644 --- a/makefile +++ b/makefile @@ -14,7 +14,7 @@ BOOST_LIBS=/usr/local/lib LIB_NAME=libvoltdbcpp KIT_NAME=voltdb-client-cpp-x86_64-7.1 -CFLAGS=-I$(BOOST_INCLUDES) -Iinclude -D__STDC_CONSTANT_MACROS -D__STDC_LIMIT_MACROS -g3 ${OPTIMIZATION} -fPIC +CFLAGS=-I$(BOOST_INCLUDES) -Iinclude -D__STDC_CONSTANT_MACROS -D__STDC_LIMIT_MACROS -g3 ${OPTIMIZATION} -fPIC -fpermissive PLATFORM = $(shell uname) ifeq ($(PLATFORM),Darwin) diff --git a/src/ClientImpl.cpp b/src/ClientImpl.cpp index 18e2f02..5db4546 100644 --- a/src/ClientImpl.cpp +++ b/src/ClientImpl.cpp @@ -925,8 +925,6 @@ class SyncCallback : public ProcedureCallback { return true; } - void abandon(AbandonReason reason) {} - private: InvocationResponse *m_responseOut; }; @@ -1009,17 +1007,16 @@ InvocationResponse ClientImpl::invoke(Procedure &proc) throw (Exception, NoConne class DummyCallback : public ProcedureCallback { public: ProcedureCallback *m_callback; - DummyCallback(ProcedureCallback *callback) : m_callback(callback) {} + DummyCallback(ProcedureCallback *callback) : m_callback(callback) { + m_allowAbandon = m_callback->allowAbandon(); + } bool callback(InvocationResponse response) throw (Exception) { return m_callback->callback(response); } - void abandon(AbandonReason reason) {} - - bool allowAbandon() const { - return m_callback->allowAbandon(); + void abandon(AbandonReason reason) { + m_callback->abandon(reason); } - }; void ClientImpl::invoke(Procedure &proc, ProcedureCallback *callback) throw (Exception, NoConnectionsException, UninitializedParamsException, LibEventException, ElasticModeMismatchException) { @@ -1032,7 +1029,7 @@ bool ClientImpl::isReadOnly(const Procedure &proc) { return (procInfo != NULL && procInfo->m_readOnly); } -struct bufferevent *ClientImpl::routeProcedure(Procedure &proc, ScopedByteBuffer &sbb){ +struct bufferevent *ClientImpl::routeProcedure(Procedure &proc, ScopedByteBuffer &sbb, boost::shared_ptr callback){ ProcedureInfo *procInfo = m_distributer.getProcedure(proc.getName()); //route transaction to correct event if procedure is found, transaction is single partitioned @@ -1040,6 +1037,7 @@ struct bufferevent *ClientImpl::routeProcedure(Procedure &proc, ScopedByteBuffer if (procInfo && !procInfo->m_multiPart){ const int hashedPartition = m_distributer.getHashedPartitionForParameter(sbb, procInfo->m_partitionParameter); if (hashedPartition >= 0) { + callback->invokePartition(hashedPartition); hostId = m_distributer.getHostIdByPartitionId(hashedPartition); } } @@ -1047,11 +1045,17 @@ struct bufferevent *ClientImpl::routeProcedure(Procedure &proc, ScopedByteBuffer { //use MIP partition instead hostId = m_distributer.getHostIdByPartitionId(Distributer::MP_INIT_PID); + callback->invokeMultipart(true); } if (hostId >= 0) { std::map::iterator bevEntry = m_hostIdToEvent.find(hostId); if (bevEntry != m_hostIdToEvent.end()) { - return bevEntry->second; + //aici + bufferevent* bev = bevEntry->second; + const boost::shared_ptr ctx = m_contexts[bev]; + callback->invokeHostName(ctx->m_name); + callback->invokeHostId(ctx->m_hostId); + return bev; } } return NULL; @@ -1187,24 +1191,27 @@ void ClientImpl::invoke(Procedure &proc, boost::shared_ptr ca } bool procReadOnly = false; - struct bufferevent *routed_bev = NULL; //route transaction to correct event if client affinity is enabled and hashinator updating is not in progress //elastic scalability is disabled if (m_useClientAffinity && !m_distributer.isUpdating()) { - routed_bev = routeProcedure(proc, sbb); + struct bufferevent* routed_bev = routeProcedure(proc, sbb, callback); // Check if the routed_bev is valid and has not been removed due to lost connection if ((routed_bev != NULL) && (m_callbacks.find(routed_bev) != m_callbacks.end())) { bev = routed_bev; - } + } else { + // routing has failed, set invokation info from the roudrobin bev + const boost::shared_ptr ctx = m_contexts[bev]; + callback->invokeHostName(ctx->m_name); + callback->invokeHostId(ctx->m_hostId); + } - if (isReadOnly(proc)) { - procReadOnly = true; - } + procReadOnly = isReadOnly(proc); + callback->invokeReadonly(procReadOnly); } - CallBackBookeeping *cbPtr = new CallBackBookeeping(callback, expirationTime, procReadOnly); - assert (cbPtr != NULL); - boost::shared_ptr cb (cbPtr); + callback->invokeProcName(proc.getName()); + boost::shared_ptr cb(new CallBackBookeeping(callback, expirationTime, procReadOnly)); + assert (cb); BEVToCallbackMap::iterator bevFromCBMap = m_callbacks.find(bev); if ( bevFromCBMap == m_callbacks.end()) { @@ -1224,8 +1231,6 @@ void ClientImpl::invoke(Procedure &proc, boost::shared_ptr ca if (evbuffer_get_length(evbuf) > 262144) { m_backpressuredBevs.insert(bev); } - - return; } void ClientImpl::runOnce() throw (Exception, NoConnectionsException, LibEventException) { @@ -1507,7 +1512,9 @@ bool ClientImpl::drain() throw (Exception, NoConnectionsException, LibEventExcep class TopoUpdateCallback : public ProcedureCallback { public: - TopoUpdateCallback(Distributer *dist, ClientLogger *logger) : m_dist(dist), m_logger(logger) {} + TopoUpdateCallback(Distributer *dist, ClientLogger *logger) : m_dist(dist), m_logger(logger) { + m_allowAbandon = false; + } bool callback(InvocationResponse response) throw (Exception) { @@ -1526,8 +1533,6 @@ class TopoUpdateCallback : public ProcedureCallback return true; } - bool allowAbandon() const {return false;} - private: Distributer *m_dist; ClientLogger *m_logger; @@ -1536,7 +1541,9 @@ class TopoUpdateCallback : public ProcedureCallback class SubscribeCallback : public ProcedureCallback { public: - SubscribeCallback(ClientLogger *logger) : m_logger(logger) {} + SubscribeCallback(ClientLogger *logger) : m_logger(logger) { + m_allowAbandon = false; + } bool callback(InvocationResponse response) throw (Exception) { @@ -1553,7 +1560,6 @@ class SubscribeCallback : public ProcedureCallback return true; } - bool allowAbandon() const {return false;} private: ClientLogger *m_logger; }; @@ -1564,7 +1570,9 @@ class SubscribeCallback : public ProcedureCallback class ProcUpdateCallback : public ProcedureCallback { public: - ProcUpdateCallback(Distributer *dist, ClientLogger *logger) : m_dist(dist), m_logger(logger) {} + ProcUpdateCallback(Distributer *dist, ClientLogger *logger) : m_dist(dist), m_logger(logger) { + m_allowAbandon = false; + } bool callback(InvocationResponse response) throw (Exception) { @@ -1584,8 +1592,6 @@ class ProcUpdateCallback : public ProcedureCallback return true; } - bool allowAbandon() const {return false;} - private: Distributer *m_dist; ClientLogger *m_logger;