diff --git a/cicada/Makefile b/cicada/Makefile index 09bd7849..345410fc 100644 --- a/cicada/Makefile +++ b/cicada/Makefile @@ -7,8 +7,9 @@ CICADA_ALLSRC = $(CICADA_SRCS1) $(wildcard include/*.hh) ADD_ANALYSIS=1 BACK_OFF=1 -INLINE_VERSION_OPT=1 -INLINE_VERSION_PROMOTION=1 +INLINE_VERSION_OPT=0 +INLINE_VERSION_PROMOTION=0 +NO_SPINWAIT=1 MASSTREE_USE=1 PARTITION_TABLE=0 REUSE_VERSION=1 @@ -29,6 +30,7 @@ CFLAGS = -c -pipe -g -O3 -std=c++17 -march=native \ -DBACK_OFF=$(BACK_OFF) \ -DINLINE_VERSION_PROMOTION=$(INLINE_VERSION_PROMOTION) \ -DINLINE_VERSION_OPT=$(INLINE_VERSION_OPT) \ + -DNO_SPINWAIT=$(NO_SPINWAIT) \ -DMASSTREE_USE=$(MASSTREE_USE) \ -DPARTITION_TABLE=$(PARTITION_TABLE) \ -DREUSE_VERSION=$(REUSE_VERSION) \ diff --git a/cicada/include/cicada_op_element.hh b/cicada/include/cicada_op_element.hh index f0d5d209..c64e17e1 100644 --- a/cicada/include/cicada_op_element.hh +++ b/cicada/include/cicada_op_element.hh @@ -8,11 +8,13 @@ class ReadElement : public OpElement { using OpElement::OpElement; Version *later_ver_, *ver_; + bool rmw_; - ReadElement(uint64_t key, T *rcdptr, Version *later_ver, Version *ver) + ReadElement(uint64_t key, T *rcdptr, Version *later_ver, Version *ver, bool rmw) : OpElement::OpElement(key, rcdptr) { later_ver_ = later_ver; ver_ = ver; + rmw_ = rmw; } bool operator<(const ReadElement &right) const { diff --git a/cicada/include/time_stamp.hh b/cicada/include/time_stamp.hh index 3ac41ed2..89fcacab 100644 --- a/cicada/include/time_stamp.hh +++ b/cicada/include/time_stamp.hh @@ -21,7 +21,11 @@ class TimeStamp { inline void generateTimeStampFirst(uint8_t tid) { localClock_ = rdtscp(); +#if NO_SPINWAIT + ts_ = ((localClock_ << (sizeof(tid) * 8)) & ~(1ULL<<63))| tid; +#else ts_ = (localClock_ << (sizeof(tid) * 8)) | tid; +#endif thid_ = tid; } @@ -34,6 +38,10 @@ class TimeStamp { localClock_ += elapsedTime; localClock_ += clockBoost_; +#if NO_SPINWAIT + ts_ = ((localClock_ << (sizeof(tid) * 8)) & ~(1ULL<<63))| tid; +#else ts_ = (localClock_ << (sizeof(tid) * 8)) | tid; +#endif } }; diff --git a/cicada/include/transaction.hh b/cicada/include/transaction.hh index 13fd0f9d..cfde6683 100644 --- a/cicada/include/transaction.hh +++ b/cicada/include/transaction.hh @@ -22,6 +22,8 @@ #define CONTINUING_COMMIT_THRESHOLD 5 +#define RTS_NOT_EXTENDABLE (1ULL<<63) + enum class TransactionStatus : uint8_t { invalid, inflight, @@ -159,7 +161,7 @@ class TxExecutor { twrite(key); if ((*pro_set_.begin()).ronly_) { (*pro_set_.begin()).ronly_ = false; - read_set_.emplace_back(key, tuple, later_ver, ver); + read_set_.emplace_back(key, tuple, later_ver, ver, false); } } } @@ -255,6 +257,34 @@ class TxExecutor { } } + bool readTimestampUpdateWithValidation() { + uint64_t expected, new_rts; + for (auto itr = read_set_.begin(); itr != read_set_.end(); ++itr) { + expected = (*itr).ver_->ldAcqRts(); + if ((*itr).rmw_) { + for (;;) { + if (expected & RTS_NOT_EXTENDABLE || + (expected & ~RTS_NOT_EXTENDABLE) > this->wts_.ts_) return false; + new_rts = this->wts_.ts_ | RTS_NOT_EXTENDABLE; + if ((*itr).ver_->rts_.compare_exchange_strong(expected, new_rts, + memory_order_acq_rel, + memory_order_acquire)) + break; + } + } else { + for (;;) { + if ((expected & ~RTS_NOT_EXTENDABLE) > this->wts_.ts_) break; + if (expected & RTS_NOT_EXTENDABLE) return false; + if ((*itr).ver_->rts_.compare_exchange_strong(expected, this->wts_.ts_, + memory_order_acq_rel, + memory_order_acquire)) + break; + } + } + } + return true; + } + /** * @brief Search xxx set * @detail Search element of local set corresponding to given key. diff --git a/cicada/transaction.cc b/cicada/transaction.cc index 01343e19..0cc44770 100644 --- a/cicada/transaction.cc +++ b/cicada/transaction.cc @@ -134,6 +134,25 @@ void TxExecutor::tread(const uint64_t key) { later_ver = ver; ver = ver->ldAcqNext(); } +#if NO_SPINWAIT + Version *next_ver; + while (ver->status_.load(memory_order_acquire) != VersionStatus::committed) { + next_ver = ver->ldAcqNext(); + /* + * TODO: Is this kind of effort effective? + if (next_ver->status_.load(memory_order_acquire) == VersionStatus::committed && + next_ver->ldAcqRts() & RTS_NOT_EXTENDABLE) { + while (ver->status_.load(memory_order_acquire) == VersionStatus::pending) { + } + if (ver->status_.load(memory_order_acquire) == VersionStatus::aborted) { + ver = next_ver; + break; + } + } + */ + ver = next_ver; + } +#else while (ver->status_.load(memory_order_acquire) != VersionStatus::committed) { /** * Wait for the result of the pending version in the view. @@ -144,6 +163,7 @@ void TxExecutor::tread(const uint64_t key) { ver = ver->ldAcqNext(); } } +#endif #endif /** @@ -155,7 +175,7 @@ void TxExecutor::tread(const uint64_t key) { * If read-only tx, not track or validate read_set_ */ if ((*this->pro_set_.begin()).ronly_ == false) { - read_set_.emplace_back(key, tuple, later_ver, ver); + read_set_.emplace_back(key, tuple, later_ver, ver, false); } #if INLINE_VERSION_OPT @@ -174,7 +194,9 @@ void TxExecutor::tread(const uint64_t key) { cres_->local_read_latency_ += rdtscp() - start; #endif +#if INLINE_VERSION_PROMOTION END_TREAD: +#endif return; } @@ -267,6 +289,7 @@ void TxExecutor::twrite(const uint64_t key) { /** * Constraint from new to old. + * TODO: Is this only necessary for RMW case, isn't it? */ if ((ver->ldAcqRts() > this->wts_.ts_) && (ver->ldAcqStatus() == VersionStatus::committed)) { @@ -277,6 +300,16 @@ void TxExecutor::twrite(const uint64_t key) { goto FINISH_TWRITE; } +#if NO_SPINWAIT + if (re) { + re->rmw_ = true; + } else { + /* + * TODO: Psedo RMW can be relaxed more. + */ + read_set_.emplace_back(key, tuple, later_ver, ver, true); + } +#endif Version *new_ver; new_ver = newVersionGeneration(tuple); write_set_.emplace_back(key, tuple, later_ver, new_ver, rmw); @@ -353,6 +386,14 @@ bool TxExecutor::validation() { (*itr).finish_version_install_ = true; } +#if NO_SPINWAIT + /** + * Read timestamp update with validation + */ + result = readTimestampUpdateWithValidation(); + if (!result) + goto FINISH_VALIDATION; +#else /** * Read timestamp update */ @@ -410,6 +451,7 @@ bool TxExecutor::validation() { goto FINISH_VALIDATION; } } +#endif FINISH_VALIDATION: #if ADD_ANALYSIS @@ -566,6 +608,22 @@ void TxExecutor::earlyAbort() { * @return void */ void TxExecutor::abort() { +#if NO_SPINWAIT + // Roll back RTS_NOT_EXTENDABLE flags + for (auto itr = read_set_.begin(); itr != read_set_.end(); ++itr) { + if ((*itr).rmw_) { + uint64_t expected, new_rts; + expected = (*itr).ver_->ldAcqRts(); + for (;;) { + new_rts = expected & ~RTS_NOT_EXTENDABLE; + if ((*itr).ver_->rts_.compare_exchange_strong(expected, new_rts, + memory_order_acq_rel, + memory_order_acquire)) + break; + } + } + } +#endif writeSetClean(); read_set_.clear();