From d97c3514211af01f4bc0261b78363346373bf649 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Micha=C3=ABl=20Celerier?= Date: Fri, 19 Aug 2022 11:31:06 +0200 Subject: [PATCH 1/3] Replace BLOCK_SIZE with CQ_BLOCK_SIZE to avoid conflicts with --- blockingconcurrentqueue.h | 4 +- concurrentqueue.h | 158 +++++++++++++++++----------------- tests/unittests/unittests.cpp | 10 +-- 3 files changed, 86 insertions(+), 86 deletions(-) diff --git a/blockingconcurrentqueue.h b/blockingconcurrentqueue.h index 205a4db7..f58511a4 100644 --- a/blockingconcurrentqueue.h +++ b/blockingconcurrentqueue.h @@ -36,7 +36,7 @@ class BlockingConcurrentQueue typedef typename ConcurrentQueue::size_t size_t; typedef typename std::make_signed::type ssize_t; - static const size_t BLOCK_SIZE = ConcurrentQueue::BLOCK_SIZE; + static const size_t CQ_BLOCK_SIZE = ConcurrentQueue::CQ_BLOCK_SIZE; static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = ConcurrentQueue::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD; static const size_t EXPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::EXPLICIT_INITIAL_INDEX_SIZE; static const size_t IMPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::IMPLICIT_INITIAL_INDEX_SIZE; @@ -55,7 +55,7 @@ class BlockingConcurrentQueue // queue is fully constructed before it starts being used by other threads (this // includes making the memory effects of construction visible, possibly with a // memory barrier). - explicit BlockingConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE) + explicit BlockingConcurrentQueue(size_t capacity = 6 * CQ_BLOCK_SIZE) : inner(capacity), sema(create(0, (int)Traits::MAX_SEMA_SPINS), &BlockingConcurrentQueue::template destroy) { assert(reinterpret_cast((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member"); diff --git a/concurrentqueue.h b/concurrentqueue.h index 99caefc0..9f866bf9 100644 --- a/concurrentqueue.h +++ b/concurrentqueue.h @@ -340,7 +340,7 @@ struct ConcurrentQueueDefaultTraits // but many producers, a smaller block size should be favoured. For few producers // and/or many elements, a larger block size is preferred. A sane default // is provided. Must be a power of 2. - static const size_t BLOCK_SIZE = 32; + static const size_t CQ_BLOCK_SIZE = 32; // For explicit producers (i.e. when using a producer token), the block is // checked for being empty by iterating through a list of flags, one per element. @@ -773,7 +773,7 @@ class ConcurrentQueue typedef typename Traits::index_t index_t; typedef typename Traits::size_t size_t; - static const size_t BLOCK_SIZE = static_cast(Traits::BLOCK_SIZE); + static const size_t CQ_BLOCK_SIZE = static_cast(Traits::CQ_BLOCK_SIZE); static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = static_cast(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD); static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast(Traits::EXPLICIT_INITIAL_INDEX_SIZE); static const size_t IMPLICIT_INITIAL_INDEX_SIZE = static_cast(Traits::IMPLICIT_INITIAL_INDEX_SIZE); @@ -784,7 +784,7 @@ class ConcurrentQueue #pragma warning(disable: 4307) // + integral constant overflow (that's what the ternary expression is for!) #pragma warning(disable: 4309) // static_cast: Truncation of constant value #endif - static const size_t MAX_SUBQUEUE_SIZE = (details::const_numeric_max::value - static_cast(Traits::MAX_SUBQUEUE_SIZE) < BLOCK_SIZE) ? 
details::const_numeric_max::value : ((static_cast(Traits::MAX_SUBQUEUE_SIZE) + (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE); + static const size_t MAX_SUBQUEUE_SIZE = (details::const_numeric_max::value - static_cast(Traits::MAX_SUBQUEUE_SIZE) < CQ_BLOCK_SIZE) ? details::const_numeric_max::value : ((static_cast(Traits::MAX_SUBQUEUE_SIZE) + (CQ_BLOCK_SIZE - 1)) / CQ_BLOCK_SIZE * CQ_BLOCK_SIZE); #ifdef _MSC_VER #pragma warning(pop) #endif @@ -792,7 +792,7 @@ class ConcurrentQueue static_assert(!std::numeric_limits::is_signed && std::is_integral::value, "Traits::size_t must be an unsigned integral type"); static_assert(!std::numeric_limits::is_signed && std::is_integral::value, "Traits::index_t must be an unsigned integral type"); static_assert(sizeof(index_t) >= sizeof(size_t), "Traits::index_t must be at least as wide as Traits::size_t"); - static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)), "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)"); + static_assert((CQ_BLOCK_SIZE > 1) && !(CQ_BLOCK_SIZE & (CQ_BLOCK_SIZE - 1)), "Traits::CQ_BLOCK_SIZE must be a power of 2 (and at least 2)"); static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1) && !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD & (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)), "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a power of 2 (and greater than 1)"); static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) && !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)"); static_assert((IMPLICIT_INITIAL_INDEX_SIZE > 1) && !(IMPLICIT_INITIAL_INDEX_SIZE & (IMPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::IMPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)"); @@ -810,7 +810,7 @@ class ConcurrentQueue // queue is fully constructed before it starts being used by other threads (this // includes making the memory effects of construction visible, possibly with a // memory barrier). - explicit ConcurrentQueue(size_t capacity = 32 * BLOCK_SIZE) + explicit ConcurrentQueue(size_t capacity = 32 * CQ_BLOCK_SIZE) : producerListTail(nullptr), producerCount(0), initialBlockPoolIndex(0), @@ -819,7 +819,7 @@ class ConcurrentQueue { implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed); populate_initial_implicit_producer_hash(); - populate_initial_block_list(capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1)); + populate_initial_block_list(capacity / CQ_BLOCK_SIZE + ((capacity & (CQ_BLOCK_SIZE - 1)) == 0 ? 
0 : 1)); #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG // Track all the producers using a fully-resolved typed list for @@ -843,7 +843,7 @@ class ConcurrentQueue { implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed); populate_initial_implicit_producer_hash(); - size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) + 2 * (maxExplicitProducers + maxImplicitProducers); + size_t blocks = (((minCapacity + CQ_BLOCK_SIZE - 1) / CQ_BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) + 2 * (maxExplicitProducers + maxImplicitProducers); populate_initial_block_list(blocks); #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG @@ -1569,9 +1569,9 @@ class ConcurrentQueue template inline bool is_empty() const { - MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) { + MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && CQ_BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) { // Check flags - for (size_t i = 0; i < BLOCK_SIZE; ++i) { + for (size_t i = 0; i < CQ_BLOCK_SIZE; ++i) { if (!emptyFlags[i].load(std::memory_order_relaxed)) { return false; } @@ -1583,11 +1583,11 @@ class ConcurrentQueue } else { // Check counter - if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) { + if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == CQ_BLOCK_SIZE) { std::atomic_thread_fence(std::memory_order_acquire); return true; } - assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE); + assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= CQ_BLOCK_SIZE); return false; } } @@ -1596,17 +1596,17 @@ class ConcurrentQueue template inline bool set_empty(MOODYCAMEL_MAYBE_UNUSED index_t i) { - MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) { + MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && CQ_BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) { // Set flag - assert(!emptyFlags[BLOCK_SIZE - 1 - static_cast(i & static_cast(BLOCK_SIZE - 1))].load(std::memory_order_relaxed)); - emptyFlags[BLOCK_SIZE - 1 - static_cast(i & static_cast(BLOCK_SIZE - 1))].store(true, std::memory_order_release); + assert(!emptyFlags[CQ_BLOCK_SIZE - 1 - static_cast(i & static_cast(CQ_BLOCK_SIZE - 1))].load(std::memory_order_relaxed)); + emptyFlags[CQ_BLOCK_SIZE - 1 - static_cast(i & static_cast(CQ_BLOCK_SIZE - 1))].store(true, std::memory_order_release); return false; } else { // Increment counter auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release); - assert(prevVal < BLOCK_SIZE); - return prevVal == BLOCK_SIZE - 1; + assert(prevVal < CQ_BLOCK_SIZE); + return prevVal == CQ_BLOCK_SIZE - 1; } } @@ -1615,10 +1615,10 @@ class ConcurrentQueue template inline bool set_many_empty(MOODYCAMEL_MAYBE_UNUSED index_t i, size_t count) { - MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) { + MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && CQ_BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) { // Set flags std::atomic_thread_fence(std::memory_order_release); - i = BLOCK_SIZE - 1 - static_cast(i & static_cast(BLOCK_SIZE - 1)) - count + 1; + i = CQ_BLOCK_SIZE - 1 - static_cast(i & static_cast(CQ_BLOCK_SIZE - 1)) - count + 1; for (size_t j = 0; j != count; ++j) { assert(!emptyFlags[i + j].load(std::memory_order_relaxed)); emptyFlags[i + j].store(true, std::memory_order_relaxed); @@ -1628,32 +1628,32 @@ class ConcurrentQueue 
else { // Increment counter auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release); - assert(prevVal + count <= BLOCK_SIZE); - return prevVal + count == BLOCK_SIZE; + assert(prevVal + count <= CQ_BLOCK_SIZE); + return prevVal + count == CQ_BLOCK_SIZE; } } template inline void set_all_empty() { - MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) { + MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && CQ_BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) { // Set all flags - for (size_t i = 0; i != BLOCK_SIZE; ++i) { + for (size_t i = 0; i != CQ_BLOCK_SIZE; ++i) { emptyFlags[i].store(true, std::memory_order_relaxed); } } else { // Reset counter - elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed); + elementsCompletelyDequeued.store(CQ_BLOCK_SIZE, std::memory_order_relaxed); } } template inline void reset_empty() { - MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) { + MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && CQ_BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) { // Reset flags - for (size_t i = 0; i != BLOCK_SIZE; ++i) { + for (size_t i = 0; i != CQ_BLOCK_SIZE; ++i) { emptyFlags[i].store(false, std::memory_order_relaxed); } } @@ -1663,16 +1663,16 @@ class ConcurrentQueue } } - inline T* operator[](index_t idx) MOODYCAMEL_NOEXCEPT { return static_cast(static_cast(elements)) + static_cast(idx & static_cast(BLOCK_SIZE - 1)); } - inline T const* operator[](index_t idx) const MOODYCAMEL_NOEXCEPT { return static_cast(static_cast(elements)) + static_cast(idx & static_cast(BLOCK_SIZE - 1)); } + inline T* operator[](index_t idx) MOODYCAMEL_NOEXCEPT { return static_cast(static_cast(elements)) + static_cast(idx & static_cast(CQ_BLOCK_SIZE - 1)); } + inline T const* operator[](index_t idx) const MOODYCAMEL_NOEXCEPT { return static_cast(static_cast(elements)) + static_cast(idx & static_cast(CQ_BLOCK_SIZE - 1)); } private: static_assert(std::alignment_of::value <= sizeof(T), "The queue does not support types with an alignment greater than their size at this time"); - MOODYCAMEL_ALIGNED_TYPE_LIKE(char[sizeof(T) * BLOCK_SIZE], T) elements; + MOODYCAMEL_ALIGNED_TYPE_LIKE(char[sizeof(T) * CQ_BLOCK_SIZE], T) elements; public: Block* next; std::atomic elementsCompletelyDequeued; - std::atomic emptyFlags[BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE : 1]; + std::atomic emptyFlags[CQ_BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? 
CQ_BLOCK_SIZE : 1]; public: std::atomic freeListRefs; std::atomic freeListNext; @@ -1793,11 +1793,11 @@ class ConcurrentQueue if (this->tailBlock != nullptr) { // Note this means there must be a block index too // First find the block that's partially dequeued, if any Block* halfDequeuedBlock = nullptr; - if ((this->headIndex.load(std::memory_order_relaxed) & static_cast(BLOCK_SIZE - 1)) != 0) { + if ((this->headIndex.load(std::memory_order_relaxed) & static_cast(CQ_BLOCK_SIZE - 1)) != 0) { // The head's not on a block boundary, meaning a block somewhere is partially dequeued // (or the head block is the tail block and was fully dequeued, but the head/tail are still not on a boundary) size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & (pr_blockIndexSize - 1); - while (details::circular_less_than(pr_blockIndexEntries[i].base + BLOCK_SIZE, this->headIndex.load(std::memory_order_relaxed))) { + while (details::circular_less_than(pr_blockIndexEntries[i].base + CQ_BLOCK_SIZE, this->headIndex.load(std::memory_order_relaxed))) { i = (i + 1) & (pr_blockIndexSize - 1); } assert(details::circular_less_than(pr_blockIndexEntries[i].base, this->headIndex.load(std::memory_order_relaxed))); @@ -1814,12 +1814,12 @@ class ConcurrentQueue size_t i = 0; // Offset into block if (block == halfDequeuedBlock) { - i = static_cast(this->headIndex.load(std::memory_order_relaxed) & static_cast(BLOCK_SIZE - 1)); + i = static_cast(this->headIndex.load(std::memory_order_relaxed) & static_cast(CQ_BLOCK_SIZE - 1)); } // Walk through all the items in the block; if this is the tail block, we need to stop when we reach the tail index - auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) & static_cast(BLOCK_SIZE - 1)) == 0 ? BLOCK_SIZE : static_cast(this->tailIndex.load(std::memory_order_relaxed) & static_cast(BLOCK_SIZE - 1)); - while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) { + auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) & static_cast(CQ_BLOCK_SIZE - 1)) == 0 ? CQ_BLOCK_SIZE : static_cast(this->tailIndex.load(std::memory_order_relaxed) & static_cast(CQ_BLOCK_SIZE - 1)); + while (i != CQ_BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) { (*block)[i++]->~T(); } } while (block != this->tailBlock); @@ -1850,7 +1850,7 @@ class ConcurrentQueue { index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed); index_t newTailIndex = 1 + currentTailIndex; - if ((currentTailIndex & static_cast(BLOCK_SIZE - 1)) == 0) { + if ((currentTailIndex & static_cast(CQ_BLOCK_SIZE - 1)) == 0) { // We reached the end of a block, start a new one auto startBlock = this->tailBlock; auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed; @@ -1871,8 +1871,8 @@ class ConcurrentQueue // <= to it. auto head = this->headIndex.load(std::memory_order_relaxed); assert(!details::circular_less_than(currentTailIndex, head)); - if (!details::circular_less_than(head, currentTailIndex + BLOCK_SIZE) - || (MAX_SUBQUEUE_SIZE != details::const_numeric_max::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) { + if (!details::circular_less_than(head, currentTailIndex + CQ_BLOCK_SIZE) + || (MAX_SUBQUEUE_SIZE != details::const_numeric_max::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - CQ_BLOCK_SIZE < currentTailIndex - head))) { // We can't enqueue in another block because there's not enough leeway -- the // tail could surpass the head by the time the block fills up! 
(Or we'll exceed // the size limit, if the second part of the condition was true.) @@ -2012,8 +2012,8 @@ class ConcurrentQueue // When an index wraps, we need to preserve the sign of the offset when dividing it by the // block size (in order to get a correct signed block count offset in all cases): auto headBase = localBlockIndex->entries[localBlockIndexHead].base; - auto blockBaseIndex = index & ~static_cast(BLOCK_SIZE - 1); - auto offset = static_cast(static_cast::type>(blockBaseIndex - headBase) / static_cast::type>(BLOCK_SIZE)); + auto blockBaseIndex = index & ~static_cast(CQ_BLOCK_SIZE - 1); + auto offset = static_cast(static_cast::type>(blockBaseIndex - headBase) / static_cast::type>(CQ_BLOCK_SIZE)); auto block = localBlockIndex->entries[(localBlockIndexHead + offset) & (localBlockIndex->size - 1)].block; // Dequeue @@ -2065,13 +2065,13 @@ class ConcurrentQueue Block* firstAllocatedBlock = nullptr; // Figure out how many blocks we'll need to allocate, and do so - size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast(BLOCK_SIZE - 1)); - index_t currentTailIndex = (startTailIndex - 1) & ~static_cast(BLOCK_SIZE - 1); + size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast(CQ_BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast(CQ_BLOCK_SIZE - 1)); + index_t currentTailIndex = (startTailIndex - 1) & ~static_cast(CQ_BLOCK_SIZE - 1); if (blockBaseDiff > 0) { // Allocate as many blocks as possible from ahead while (blockBaseDiff > 0 && this->tailBlock != nullptr && this->tailBlock->next != firstAllocatedBlock && this->tailBlock->next->ConcurrentQueue::Block::template is_empty()) { - blockBaseDiff -= static_cast(BLOCK_SIZE); - currentTailIndex += static_cast(BLOCK_SIZE); + blockBaseDiff -= static_cast(CQ_BLOCK_SIZE); + currentTailIndex += static_cast(CQ_BLOCK_SIZE); this->tailBlock = this->tailBlock->next; firstAllocatedBlock = firstAllocatedBlock == nullptr ? 
this->tailBlock : firstAllocatedBlock; @@ -2084,12 +2084,12 @@ class ConcurrentQueue // Now allocate as many blocks as necessary from the block pool while (blockBaseDiff > 0) { - blockBaseDiff -= static_cast(BLOCK_SIZE); - currentTailIndex += static_cast(BLOCK_SIZE); + blockBaseDiff -= static_cast(CQ_BLOCK_SIZE); + currentTailIndex += static_cast(CQ_BLOCK_SIZE); auto head = this->headIndex.load(std::memory_order_relaxed); assert(!details::circular_less_than(currentTailIndex, head)); - bool full = !details::circular_less_than(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head)); + bool full = !details::circular_less_than(head, currentTailIndex + CQ_BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - CQ_BLOCK_SIZE < currentTailIndex - head)); if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize || full) { MOODYCAMEL_CONSTEXPR_IF (allocMode == CannotAlloc) { // Failed to allocate, undo changes (but keep injected blocks) @@ -2164,12 +2164,12 @@ class ConcurrentQueue currentTailIndex = startTailIndex; auto endBlock = this->tailBlock; this->tailBlock = startBlock; - assert((startTailIndex & static_cast(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0); - if ((startTailIndex & static_cast(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) { + assert((startTailIndex & static_cast(CQ_BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0); + if ((startTailIndex & static_cast(CQ_BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) { this->tailBlock = firstAllocatedBlock; } while (true) { - index_t stopIndex = (currentTailIndex & ~static_cast(BLOCK_SIZE - 1)) + static_cast(BLOCK_SIZE); + index_t stopIndex = (currentTailIndex & ~static_cast(CQ_BLOCK_SIZE - 1)) + static_cast(CQ_BLOCK_SIZE); if (details::circular_less_than(newTailIndex, stopIndex)) { stopIndex = newTailIndex; } @@ -2206,12 +2206,12 @@ class ConcurrentQueue if (!details::is_trivially_destructible::value) { auto block = startBlock; - if ((startTailIndex & static_cast(BLOCK_SIZE - 1)) == 0) { + if ((startTailIndex & static_cast(CQ_BLOCK_SIZE - 1)) == 0) { block = firstAllocatedBlock; } currentTailIndex = startTailIndex; while (true) { - stopIndex = (currentTailIndex & ~static_cast(BLOCK_SIZE - 1)) + static_cast(BLOCK_SIZE); + stopIndex = (currentTailIndex & ~static_cast(CQ_BLOCK_SIZE - 1)) + static_cast(CQ_BLOCK_SIZE); if (details::circular_less_than(constructedStopIndex, stopIndex)) { stopIndex = constructedStopIndex; } @@ -2273,15 +2273,15 @@ class ConcurrentQueue auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire); auto headBase = localBlockIndex->entries[localBlockIndexHead].base; - auto firstBlockBaseIndex = firstIndex & ~static_cast(BLOCK_SIZE - 1); - auto offset = static_cast(static_cast::type>(firstBlockBaseIndex - headBase) / static_cast::type>(BLOCK_SIZE)); + auto firstBlockBaseIndex = firstIndex & ~static_cast(CQ_BLOCK_SIZE - 1); + auto offset = static_cast(static_cast::type>(firstBlockBaseIndex - headBase) / static_cast::type>(CQ_BLOCK_SIZE)); auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1); // Iterate the blocks and dequeue auto index = firstIndex; do { auto firstIndexInBlock = index; - index_t endIndex = (index & ~static_cast(BLOCK_SIZE - 1)) + static_cast(BLOCK_SIZE); + index_t endIndex = (index & 
~static_cast(CQ_BLOCK_SIZE - 1)) + static_cast(CQ_BLOCK_SIZE); endIndex = details::circular_less_than(firstIndex + static_cast(actualCount), endIndex) ? firstIndex + static_cast(actualCount) : endIndex; auto block = localBlockIndex->entries[indexIndex].block; if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) { @@ -2315,7 +2315,7 @@ class ConcurrentQueue indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1); firstIndexInBlock = index; - endIndex = (index & ~static_cast(BLOCK_SIZE - 1)) + static_cast(BLOCK_SIZE); + endIndex = (index & ~static_cast(CQ_BLOCK_SIZE - 1)) + static_cast(CQ_BLOCK_SIZE); endIndex = details::circular_less_than(firstIndex + static_cast(actualCount), endIndex) ? firstIndex + static_cast(actualCount) : endIndex; } while (index != firstIndex + actualCount); @@ -2449,7 +2449,7 @@ class ConcurrentQueue assert(index == tail || details::circular_less_than(index, tail)); bool forceFreeLastBlock = index != tail; // If we enter the loop, then the last (tail) block will not be freed while (index != tail) { - if ((index & static_cast(BLOCK_SIZE - 1)) == 0 || block == nullptr) { + if ((index & static_cast(CQ_BLOCK_SIZE - 1)) == 0 || block == nullptr) { if (block != nullptr) { // Free the old block this->parent->add_block_to_free_list(block); @@ -2464,7 +2464,7 @@ class ConcurrentQueue // Even if the queue is empty, there's still one block that's not on the free list // (unless the head index reached the end of it, in which case the tail will be poised // to create a new block). - if (this->tailBlock != nullptr && (forceFreeLastBlock || (tail & static_cast(BLOCK_SIZE - 1)) != 0)) { + if (this->tailBlock != nullptr && (forceFreeLastBlock || (tail & static_cast(CQ_BLOCK_SIZE - 1)) != 0)) { this->parent->add_block_to_free_list(this->tailBlock); } @@ -2488,11 +2488,11 @@ class ConcurrentQueue { index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed); index_t newTailIndex = 1 + currentTailIndex; - if ((currentTailIndex & static_cast(BLOCK_SIZE - 1)) == 0) { + if ((currentTailIndex & static_cast(CQ_BLOCK_SIZE - 1)) == 0) { // We reached the end of a block, start a new one auto head = this->headIndex.load(std::memory_order_relaxed); assert(!details::circular_less_than(currentTailIndex, head)); - if (!details::circular_less_than(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) { + if (!details::circular_less_than(head, currentTailIndex + CQ_BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - CQ_BLOCK_SIZE < currentTailIndex - head))) { return false; } #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX @@ -2640,15 +2640,15 @@ class ConcurrentQueue auto endBlock = this->tailBlock; // Figure out how many blocks we'll need to allocate, and do so - size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast(BLOCK_SIZE - 1)); - index_t currentTailIndex = (startTailIndex - 1) & ~static_cast(BLOCK_SIZE - 1); + size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast(CQ_BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast(CQ_BLOCK_SIZE - 1)); + index_t currentTailIndex = (startTailIndex - 1) & ~static_cast(CQ_BLOCK_SIZE - 1); if (blockBaseDiff > 0) { #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX debug::DebugLock lock(mutex); #endif do { - 
blockBaseDiff -= static_cast(BLOCK_SIZE); - currentTailIndex += static_cast(BLOCK_SIZE); + blockBaseDiff -= static_cast(CQ_BLOCK_SIZE); + currentTailIndex += static_cast(CQ_BLOCK_SIZE); // Find out where we'll be inserting this block in the block index BlockIndexEntry* idxEntry = nullptr; // initialization here unnecessary but compiler can't always tell @@ -2656,7 +2656,7 @@ class ConcurrentQueue bool indexInserted = false; auto head = this->headIndex.load(std::memory_order_relaxed); assert(!details::circular_less_than(currentTailIndex, head)); - bool full = !details::circular_less_than(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head)); + bool full = !details::circular_less_than(head, currentTailIndex + CQ_BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - CQ_BLOCK_SIZE < currentTailIndex - head)); if (full || !(indexInserted = insert_block_index_entry(idxEntry, currentTailIndex)) || (newBlock = this->parent->ConcurrentQueue::template requisition_block()) == nullptr) { // Index allocation or block allocation failed; revert any other allocations @@ -2665,9 +2665,9 @@ class ConcurrentQueue rewind_block_index_tail(); idxEntry->value.store(nullptr, std::memory_order_relaxed); } - currentTailIndex = (startTailIndex - 1) & ~static_cast(BLOCK_SIZE - 1); + currentTailIndex = (startTailIndex - 1) & ~static_cast(CQ_BLOCK_SIZE - 1); for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) { - currentTailIndex += static_cast(BLOCK_SIZE); + currentTailIndex += static_cast(CQ_BLOCK_SIZE); idxEntry = get_block_index_entry_for_index(currentTailIndex); idxEntry->value.store(nullptr, std::memory_order_relaxed); rewind_block_index_tail(); @@ -2689,7 +2689,7 @@ class ConcurrentQueue // Store the chain of blocks so that we can undo if later allocations fail, // and so that we can find the blocks when we do the actual enqueueing - if ((startTailIndex & static_cast(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr) { + if ((startTailIndex & static_cast(CQ_BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr) { assert(this->tailBlock != nullptr); this->tailBlock->next = newBlock; } @@ -2703,12 +2703,12 @@ class ConcurrentQueue index_t newTailIndex = startTailIndex + static_cast(count); currentTailIndex = startTailIndex; this->tailBlock = startBlock; - assert((startTailIndex & static_cast(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0); - if ((startTailIndex & static_cast(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) { + assert((startTailIndex & static_cast(CQ_BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0); + if ((startTailIndex & static_cast(CQ_BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) { this->tailBlock = firstAllocatedBlock; } while (true) { - index_t stopIndex = (currentTailIndex & ~static_cast(BLOCK_SIZE - 1)) + static_cast(BLOCK_SIZE); + index_t stopIndex = (currentTailIndex & ~static_cast(CQ_BLOCK_SIZE - 1)) + static_cast(CQ_BLOCK_SIZE); if (details::circular_less_than(newTailIndex, stopIndex)) { stopIndex = newTailIndex; } @@ -2731,12 +2731,12 @@ class ConcurrentQueue if (!details::is_trivially_destructible::value) { auto block = startBlock; - if ((startTailIndex & static_cast(BLOCK_SIZE - 1)) == 0) { + if ((startTailIndex & static_cast(CQ_BLOCK_SIZE - 1)) == 0) { block = firstAllocatedBlock; } 
currentTailIndex = startTailIndex; while (true) { - stopIndex = (currentTailIndex & ~static_cast(BLOCK_SIZE - 1)) + static_cast(BLOCK_SIZE); + stopIndex = (currentTailIndex & ~static_cast(CQ_BLOCK_SIZE - 1)) + static_cast(CQ_BLOCK_SIZE); if (details::circular_less_than(constructedStopIndex, stopIndex)) { stopIndex = constructedStopIndex; } @@ -2750,9 +2750,9 @@ class ConcurrentQueue } } - currentTailIndex = (startTailIndex - 1) & ~static_cast(BLOCK_SIZE - 1); + currentTailIndex = (startTailIndex - 1) & ~static_cast(CQ_BLOCK_SIZE - 1); for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) { - currentTailIndex += static_cast(BLOCK_SIZE); + currentTailIndex += static_cast(CQ_BLOCK_SIZE); auto idxEntry = get_block_index_entry_for_index(currentTailIndex); idxEntry->value.store(nullptr, std::memory_order_relaxed); rewind_block_index_tail(); @@ -2806,7 +2806,7 @@ class ConcurrentQueue auto indexIndex = get_block_index_index_for_index(index, localBlockIndex); do { auto blockStartIndex = index; - index_t endIndex = (index & ~static_cast(BLOCK_SIZE - 1)) + static_cast(BLOCK_SIZE); + index_t endIndex = (index & ~static_cast(CQ_BLOCK_SIZE - 1)) + static_cast(CQ_BLOCK_SIZE); endIndex = details::circular_less_than(firstIndex + static_cast(actualCount), endIndex) ? firstIndex + static_cast(actualCount) : endIndex; auto entry = localBlockIndex->index[indexIndex]; @@ -2847,7 +2847,7 @@ class ConcurrentQueue indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1); blockStartIndex = index; - endIndex = (index & ~static_cast(BLOCK_SIZE - 1)) + static_cast(BLOCK_SIZE); + endIndex = (index & ~static_cast(CQ_BLOCK_SIZE - 1)) + static_cast(CQ_BLOCK_SIZE); endIndex = details::circular_less_than(firstIndex + static_cast(actualCount), endIndex) ? 
firstIndex + static_cast(actualCount) : endIndex; } while (index != firstIndex + actualCount); @@ -2950,14 +2950,14 @@ class ConcurrentQueue #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX debug::DebugLock lock(mutex); #endif - index &= ~static_cast(BLOCK_SIZE - 1); + index &= ~static_cast(CQ_BLOCK_SIZE - 1); localBlockIndex = blockIndex.load(std::memory_order_acquire); auto tail = localBlockIndex->tail.load(std::memory_order_acquire); auto tailBase = localBlockIndex->index[tail]->key.load(std::memory_order_relaxed); assert(tailBase != INVALID_BLOCK_BASE); // Note: Must use division instead of shift because the index may wrap around, causing a negative // offset, whose negativity we want to preserve - auto offset = static_cast(static_cast::type>(index - tailBase) / static_cast::type>(BLOCK_SIZE)); + auto offset = static_cast(static_cast::type>(index - tailBase) / static_cast::type>(CQ_BLOCK_SIZE)); size_t idx = (tail + offset) & (localBlockIndex->capacity - 1); assert(localBlockIndex->index[idx]->key.load(std::memory_order_relaxed) == index && localBlockIndex->index[idx]->value.load(std::memory_order_relaxed) != nullptr); return idx; @@ -3169,7 +3169,7 @@ class ConcurrentQueue stats.implicitBlockIndexBytes += sizeof(typename ImplicitProducer::BlockIndexHeader) + hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry*); } } - for (; details::circular_less_than(head, tail); head += BLOCK_SIZE) { + for (; details::circular_less_than(head, tail); head += CQ_BLOCK_SIZE) { //auto block = prod->get_block_index_entry_for_index(head); ++stats.usedBlocks; } diff --git a/tests/unittests/unittests.cpp b/tests/unittests/unittests.cpp index b4c4d9cc..a3fe5e4d 100644 --- a/tests/unittests/unittests.cpp +++ b/tests/unittests/unittests.cpp @@ -96,13 +96,13 @@ struct MallocTrackingTraits : public ConcurrentQueueDefaultTraits static inline void free(void* ptr) { tracking_allocator::free(ptr); } }; -template +template struct TestTraits : public MallocTrackingTraits { typedef std::size_t size_t; typedef uint64_t index_t; - static const size_t BLOCK_SIZE = BlockSize; + static const size_t CQ_BLOCK_SIZE = BlockSize; static const size_t EXPLICIT_INITIAL_INDEX_SIZE = InitialIndexSize; static const size_t IMPLICIT_INITIAL_INDEX_SIZE = InitialIndexSize * 2; static const bool RECYCLE_ALLOCATED_BLOCKS = RecycleBlocks; @@ -131,7 +131,7 @@ struct ExtraSmallIndexTraits : public MallocTrackingTraits struct LargeTraits : public MallocTrackingTraits { - static const size_t BLOCK_SIZE = 128; + static const size_t CQ_BLOCK_SIZE = 128; static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = 128; static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 128; }; @@ -1354,7 +1354,7 @@ class ConcurrentQueueTests : public TestClass { // Implicit const int MAX_THREADS = 48; - ConcurrentQueue q(Traits::BLOCK_SIZE * (MAX_THREADS + 1)); + ConcurrentQueue q(Traits::CQ_BLOCK_SIZE * (MAX_THREADS + 1)); ASSERT_OR_FAIL(Traits::malloc_count() == 1); // Initial block pool SimpleThread t0([&]() { q.enqueue(0); }); @@ -2392,7 +2392,7 @@ class ConcurrentQueueTests : public TestClass struct SizeLimitTraits : public MallocTrackingTraits { - static const size_t BLOCK_SIZE = 2; + static const size_t CQ_BLOCK_SIZE = 2; static const size_t MAX_SUBQUEUE_SIZE = 5; // Will round up to 6 because of block size }; From 324c8187e294a78618c99d19dd7cd43809784333 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Micha=C3=ABl=20Celerier?= Date: Sat, 2 Nov 2024 19:31:43 -0400 Subject: [PATCH 2/3] Improve compatibility with c++ modules --- concurrentqueue.h | 14 
+++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/concurrentqueue.h b/concurrentqueue.h index 9f866bf9..828aa134 100644 --- a/concurrentqueue.h +++ b/concurrentqueue.h @@ -466,7 +466,7 @@ namespace details }; template struct hash_32_or_64 : public _hash_32_or_64<(size > 4)> { }; - static inline size_t hash_thread_id(thread_id_t id) + inline size_t hash_thread_id(thread_id_t id) { static_assert(sizeof(thread_id_t) <= 8, "Expected a platform where thread IDs are at most 64-bit values"); return static_cast(hash_32_or_64::thread_id_hash_t)>::hash( @@ -474,7 +474,7 @@ namespace details } template - static inline bool circular_less_than(T a, T b) + inline bool circular_less_than(T a, T b) { static_assert(std::is_integral::value && !std::numeric_limits::is_signed, "circular_less_than is intended to be used only with unsigned integer types"); return static_cast(a - b) > static_cast(static_cast(1) << (static_cast(sizeof(T) * CHAR_BIT - 1))); @@ -483,14 +483,14 @@ namespace details } template - static inline char* align_for(char* ptr) + inline char* align_for(char* ptr) { const std::size_t alignment = std::alignment_of::value; return ptr + (alignment - (reinterpret_cast(ptr) % alignment)) % alignment; } template - static inline T ceil_to_pow_2(T x) + inline T ceil_to_pow_2(T x) { static_assert(std::is_integral::value && !std::numeric_limits::is_signed, "ceil_to_pow_2 is intended to be used only with unsigned integer types"); @@ -507,7 +507,7 @@ namespace details } template - static inline void swap_relaxed(std::atomic& left, std::atomic& right) + inline void swap_relaxed(std::atomic& left, std::atomic& right) { T temp = std::move(left.load(std::memory_order_relaxed)); left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed); @@ -515,7 +515,7 @@ namespace details } template - static inline T const& nomove(T const& x) + inline T const& nomove(T const& x) { return x; } @@ -542,7 +542,7 @@ namespace details }; template - static inline auto deref_noexcept(It& it) MOODYCAMEL_NOEXCEPT -> decltype(*it) + inline auto deref_noexcept(It& it) MOODYCAMEL_NOEXCEPT -> decltype(*it) { return *it; } From da14b8bf89beb4af434ec94faa2fa5d84f1d2be1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Micha=C3=ABl=20Celerier?= Date: Wed, 2 Jul 2025 23:20:13 -0400 Subject: [PATCH 3/3] Disable ubsan's integer sanitizer warnings They crop up on valid per the standard (but sometimes bogus) uses of modular arithmetic on unsigned integers, such as in hash functions. 
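For context, the pattern applied below boils down to a small attribute macro. A minimal standalone sketch of it follows (the macro and function names here are illustrative, not the library's): when Clang's sanitizer attribute is available the macro expands to it, otherwise to nothing, so deliberate modular arithmetic such as a MurmurHash3-style finalizer stops tripping -fsanitize=integer while the rest of the code keeps full sanitizer coverage.

    #include <cstdint>

    // Expand to Clang's no_sanitize attribute when available, else to nothing.
    // (Same detection scheme as the patch; the macro name is illustrative.)
    #if defined(__clang__) && defined(__has_attribute)
    # if __has_attribute(__no_sanitize__)
    #  define DISABLE_UBSAN_UNSIGNED_CHECK \
         __attribute__((__no_sanitize__("unsigned-integer-overflow")))
    # endif
    #endif
    #ifndef DISABLE_UBSAN_UNSIGNED_CHECK
    # define DISABLE_UBSAN_UNSIGNED_CHECK
    #endif

    // Deliberate unsigned wrap-around (MurmurHash3-style finalizer): well-defined
    // per the standard, but reported by the unsigned-integer-overflow sanitizer
    // unless the attribute is applied to the function.
    DISABLE_UBSAN_UNSIGNED_CHECK static inline std::uint64_t mix64(std::uint64_t h)
    {
        h ^= h >> 33;
        h *= 0xff51afd7ed558ccdULL; // multiplication may wrap; that is intended
        h ^= h >> 33;
        h *= 0xc4ceb9fe1a85ec53ULL;
        h ^= h >> 33;
        return h;
    }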
--- concurrentqueue.h | 44 +++++++++++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/concurrentqueue.h b/concurrentqueue.h index 828aa134..7d5afa9b 100644 --- a/concurrentqueue.h +++ b/concurrentqueue.h @@ -234,6 +234,17 @@ namespace moodycamel { namespace details { #endif #endif +#if defined(__clang__) && defined(__has_attribute) +#if __has_attribute(__no_sanitize__) +#define MOODYCAMEL_DISABLE_UBSAN_UNSIGNED_INTEGER_CHECK \ + __attribute__((__no_sanitize__("unsigned-integer-overflow"))) +#endif +#endif + +#if !defined(MOODYCAMEL_DISABLE_UBSAN_UNSIGNED_INTEGER_CHECK) +#define MOODYCAMEL_DISABLE_UBSAN_UNSIGNED_INTEGER_CHECK +#endif + namespace moodycamel { namespace details { #ifndef MOODYCAMEL_ALIGNAS // VS2013 doesn't support alignas or alignof, and align() requires a constant literal @@ -441,8 +452,9 @@ namespace details }; template struct _hash_32_or_64 { - static inline std::uint32_t hash(std::uint32_t h) - { + MOODYCAMEL_DISABLE_UBSAN_UNSIGNED_INTEGER_CHECK static inline std::uint32_t + hash(std::uint32_t h) + { // MurmurHash3 finalizer -- see https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp // Since the thread ID is already unique, all we really want to do is propagate that // uniqueness evenly across all the bits, so that we can use a subset of the bits while @@ -455,8 +467,9 @@ namespace details } }; template<> struct _hash_32_or_64<1> { - static inline std::uint64_t hash(std::uint64_t h) - { + MOODYCAMEL_DISABLE_UBSAN_UNSIGNED_INTEGER_CHECK static inline std::uint64_t + hash(std::uint64_t h) + { h ^= h >> 33; h *= 0xff51afd7ed558ccd; h ^= h >> 33; @@ -472,10 +485,11 @@ namespace details return static_cast(hash_32_or_64::thread_id_hash_t)>::hash( thread_id_converter::prehash(id))); } - - template - inline bool circular_less_than(T a, T b) - { + + template + MOODYCAMEL_DISABLE_UBSAN_UNSIGNED_INTEGER_CHECK inline bool + circular_less_than(T a, T b) + { static_assert(std::is_integral::value && !std::numeric_limits::is_signed, "circular_less_than is intended to be used only with unsigned integer types"); return static_cast(a - b) > static_cast(static_cast(1) << (static_cast(sizeof(T) * CHAR_BIT - 1))); // Note: extra parens around rhs of operator<< is MSVC bug: https://developercommunity2.visualstudio.com/t/C4554-triggers-when-both-lhs-and-rhs-is/10034931 @@ -2962,10 +2976,10 @@ class ConcurrentQueue assert(localBlockIndex->index[idx]->key.load(std::memory_order_relaxed) == index && localBlockIndex->index[idx]->value.load(std::memory_order_relaxed) != nullptr); return idx; } - - bool new_block_index() - { - auto prev = blockIndex.load(std::memory_order_relaxed); + + MOODYCAMEL_DISABLE_UBSAN_UNSIGNED_INTEGER_CHECK bool new_block_index() + { + auto prev = blockIndex.load(std::memory_order_relaxed); size_t prevCapacity = prev == nullptr ? 0 : prev->capacity; auto entryCount = prev == nullptr ? nextBlockIndexCapacity : prevCapacity; auto raw = static_cast((Traits::malloc)( @@ -3005,9 +3019,9 @@ class ConcurrentQueue nextBlockIndexCapacity <<= 1; return true; - } - - private: + } + + private: size_t nextBlockIndexCapacity; std::atomic blockIndex;
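Taken together, the practical effect of the first patch on downstream users is that traits which used to override BLOCK_SIZE now override CQ_BLOCK_SIZE, mirroring the TestTraits/LargeTraits changes in the unit tests earlier in this series. A minimal usage sketch, assuming the patched concurrentqueue.h is on the include path (the traits name and the value 64 are only examples; the value must still be a power of 2):

    #include "concurrentqueue.h"

    // Custom traits: override the renamed block-size constant.
    struct MyTraits : public moodycamel::ConcurrentQueueDefaultTraits
    {
        static const size_t CQ_BLOCK_SIZE = 64; // power of 2, enforced by static_assert
    };

    int main()
    {
        // Capacity is rounded up to a whole number of blocks internally.
        moodycamel::ConcurrentQueue<int, MyTraits> q(6 * MyTraits::CQ_BLOCK_SIZE);
        q.enqueue(42);
        int item;
        return q.try_dequeue(item) ? 0 : 1;
    }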