I have a single-consumer, multiple-producer lock-free queue (MPSCQueue) combined with a std::counting_semaphore that notifies the consumer when new items are enqueued. The consumer calls dequeue() to try to fetch an item. If that succeeds, it calls sema.try_acquire(), which I expect to succeed whenever there is actually an item in the queue. However, in very rare cases I get an assertion failure showing that try_acquire() returned false even though the queue returned a valid item.
I can confirm there is truly only one consumer thread. There are no other places calling dequeue(). The code uses std::memory_order_seq_cst for atomic operations in the queue, and sema.release()/sema.acquire() for the semaphore.
Why might this happen?
It seems like a memory ordering or visibility issue where the consumer sees the newly linked node in the queue before it sees the corresponding release() operation on the semaphore. But I thought seq_cst plus semaphore release/acquire would be enough to guarantee proper ordering. Any ideas or clarifications on how to ensure we never see a valid item in the queue but still fail try_acquire()?
```cpp
#include <atomic>
#include <cassert>
#include <iostream>
#include <optional>
#include <semaphore>
#include <utility>

template <typename T>
class MPSCQueue {
    struct Node {
        T data;
        std::atomic<Node*> next;
        // Default constructor for the dummy node
        Node() : next(nullptr) {}
        // Constructor that moves the data in
        Node(T data_) : data(std::move(data_)), next(nullptr) {}
    };

    // Atomic head pointer for multiple producers
    std::atomic<Node*> head;
    // Tail pointer for the single consumer
    Node* tail;

public:
    std::atomic_size_t enqueue_count = 0;
    size_t dequeue_count = 0;

    MPSCQueue() {
        Node* dummy = new Node();
        head.store(dummy, std::memory_order_seq_cst);
        tail = dummy;
    }

    ~MPSCQueue() {
        Node* node = tail;
        while (node) {
            Node* next = node->next.load(std::memory_order_seq_cst);
            delete node;
            node = next;
        }
    }

    // Called by producers
    void enqueue(T data) {
        enqueue_count.fetch_add(1);
        Node* node = new Node(std::move(data));
        // Swap in the new node as the head
        Node* prev_head = head.exchange(node, std::memory_order_seq_cst);
        // Link the old head to the new node
        prev_head->next.store(node, std::memory_order_seq_cst);
    }

    // Called by the single consumer
    std::optional<T> dequeue() {
        // Check the next pointer of the tail
        Node* next = tail->next.load(std::memory_order_seq_cst);
        if (next) {
            // Move the data out
            T res = std::move(next->data);
            delete tail;
            tail = next;
            dequeue_count += 1;
            return res;
        }
        return std::nullopt;
    }

    size_t size() { return enqueue_count.load() - dequeue_count; }
};

template <typename T>
class MPSCQueueConsumerLock {
    MPSCQueue<T> queue;
    std::counting_semaphore<> sema{0};

public:
    void enqueue(T data) {
        queue.enqueue(std::move(data));
        // Release the semaphore to notify the consumer
        sema.release();
    }

    // Single consumer calls this
    T dequeue() {
        auto re = queue.dequeue();
        if (re.has_value()) {
            // We have an item, so we expect the semaphore count to be > 0
            if (!sema.try_acquire()) {
                // Unexpectedly fails in rare cases
                std::cerr << __FILE__ << ":" << __FUNCTION__
                          << " sema.try_acquire() should succeed, please check\n";
                assert(false);
            }
            return re.value();
        }
        // Otherwise, block until something is available
        sema.acquire();
        return queue.dequeue().value();
    }

    size_t size() { return queue.size(); }
};
```
Symptom
Occasionally (with very low probability) the code hits:
```
/path/to/mpsc.hpp:dequeue sema try_acquire should be success, please check
python: /path/to/mpsc.hpp:79: T MPSCQueueConsumerLock<T>::dequeue() [...]
Assertion `false' failed.
Aborted (core dumped)
```
I have verified there is truly only one consumer thread and this is the only place where dequeue() is called.
Question:
How can the consumer see the new node in MPSCQueue but not observe the corresponding sema.release() call (so that try_acquire() fails)?
Is there a subtlety in std::counting_semaphore or the memory ordering that I’m missing?
Any suggestions to ensure the consumer’s view of the queue and the semaphore’s count stay consistent?
Additional
- I used std::memory_order_seq_cst for all the atomic operations in the queue.
- The consumer is strictly single-threaded; no other threads call dequeue().
- Could there be a hidden memory fence issue between exchange(…, seq_cst) and sema.release()?
Any pointers or advice would be greatly appreciated. Thank you!