What could be the problem with the following threadpool implementation?
The main thread blocks when the thread pool's destructor is called, and I don't understand why.
The definition of the threadpool is:
#include <atomic>
#include <barrier>
#include <functional>
#include <stop_token>
#include <thread>
#include <vector>

class ThreadPool {
    struct ThreadStorage {
        ThreadPool* pool{nullptr};
        void main(std::stop_token) const;
    };

public:
    friend void Notify(ThreadPool* pool) noexcept;
    using function = void(void* context, size_t idx);
    explicit ThreadPool(size_t num_threads);
    ~ThreadPool();
    void QueueTask(function*, void* context, int range);

private:
    std::barrier<std::function<void()>> sync_point;
    std::atomic_flag start{false};
    //std::atomic_flag finish_flag{false};
    std::atomic<int> idx{0};
    function* task{nullptr};
    void* ctx{nullptr};
    std::vector<std::jthread> threads;
    std::vector<ThreadStorage> storage;
};
and the implementation is:
#include <cstdint>   // uint32_t
#include <iostream>  // std::cout in the barrier's completion function

constexpr auto order = std::memory_order::seq_cst;

ThreadPool::ThreadPool(size_t num_threads)
    : sync_point(
          num_threads,
          []() noexcept { std::cout << "All threads arrived" << std::endl; }),
      // Initializers listed in declaration order: threads, then storage.
      threads(num_threads - 1),
      storage(num_threads - 1) {
    // The calling thread counts as one participant, so only
    // num_threads - 1 workers are spawned.
    for (uint32_t ii = 1; ii < num_threads; ++ii) {
        storage[ii - 1].pool = this;
        threads[ii - 1] = std::jthread(std::bind_front(
            &ThreadPool::ThreadStorage::main, &storage[ii - 1]));
    }
}

void ThreadPool::ThreadStorage::main(std::stop_token stoken) const {
    while (!stoken.stop_requested()) {
        // Block until QueueTask or the destructor sets the flag.
        pool->start.wait(false);
        if (stoken.stop_requested()) {
            return;
        }
        // Claim indices until none remain.
        int i{0};
        while ((i = pool->idx.fetch_sub(1) - 1) > -1) {
            (*(pool->task))(pool->ctx, i);
        }
        // Wait for every participant to finish this round.
        pool->sync_point.arrive_and_wait();
    }
}

void ThreadPool::QueueTask(ThreadPool::function* function, void* context,
                           int r) {
    task = function;
    ctx = context;
    idx.store(r);
    start.test_and_set();
    start.notify_all();
    // The calling thread shares the work with the workers.
    int i{0};
    while ((i = idx.fetch_sub(1) - 1) > -1) {
        (*(function))(context, i);
    }
    sync_point.arrive_and_wait();
    start.clear();
}

ThreadPool::~ThreadPool() {
    for (auto& t : threads) {
        t.request_stop();
    }
    // Wake any worker blocked on the flag so it can see the stop request.
    start.test_and_set();
    start.notify_all();
    for (auto& t : threads) {
        t.join();
    }
}
Tasks are added to the thread pool with the QueueTask function. The main thread sets the task and context for all threads and notifies them that new work is available. All threads, including the main thread, share the work by claiming indices from the task's range. Once every index has been claimed, the threads synchronize on a std::barrier.
When I debug the thread pool with gdb, I see that the program hangs in the thread pool's destructor. With two threads, the main thread is blocked joining the other thread, while the other thread waits at the barrier:
pool->sync_point.arrive_and_wait();
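The index-claiming scheme described above can be sketched in isolation, which may help separate it from the synchronization around it. This is a minimal sketch, not the pool itself: claim_indices is a hypothetical helper, plain std::thread with join() stands in for the jthreads and the barrier, and the per-index counters exist only to check that each index is claimed exactly once.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical helper: runs `range` work items on `num_threads` threads
// (the caller counts as one) and returns how often each index was claimed.
std::vector<int> claim_indices(int range, int num_threads) {
    std::vector<std::atomic<int>> hits(range);  // one counter per index, all zero
    std::atomic<int> idx{range};                // remaining items, like ThreadPool::idx

    // Same claiming loop as in the question: fetch_sub returns the old value,
    // so old - 1 is the index this thread now owns; a negative result means
    // nothing is left.
    auto drain = [&] {
        int i{0};
        while ((i = idx.fetch_sub(1) - 1) > -1) {
            hits[i].fetch_add(1);
        }
    };

    std::vector<std::thread> workers;
    for (int t = 1; t < num_threads; ++t) {
        workers.emplace_back(drain);
    }
    drain();  // the calling thread shares the work
    for (auto& w : workers) {
        w.join();  // join replaces the barrier in this sketch
    }

    std::vector<int> result;
    for (auto& h : hits) {
        result.push_back(h.load());
    }
    return result;
}
```

Calling claim_indices(100, 4) should return 100 entries that are all 1, which suggests the claiming loop on its own hands out each index exactly once. Note that idx keeps decreasing below zero after the range is exhausted, since every thread performs one final failing fetch_sub.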
What could be the source of the problem?
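One interleaving that would be consistent with the gdb state above is sketched here as a single-threaded model, not as a real reproduction. It assumes a pool with one worker; the WorkerState enum and the replay function are hypothetical names introduced only for this illustration. The model relies on two properties of the code: start.wait(false) returns immediately while the flag is still set, and QueueTask clears the flag only after the barrier has released both threads.

```cpp
#include <cassert>

enum class WorkerState { WaitingOnFlag, WaitingAtBarrier, Exited };

// Hypothetical replay of one QueueTask call followed by the destructor.
// `clear_before_worker_rechecks` selects which thread wins the race that
// starts when the barrier releases both of them.
WorkerState replay(bool clear_before_worker_rechecks) {
    bool start = false;           // models ThreadPool::start
    bool stop_requested = false;  // models the jthread stop token

    // QueueTask: main sets the flag, both threads drain idx and meet at the
    // barrier. The first barrier phase completes and both are released.
    start = true;

    WorkerState worker;
    if (clear_before_worker_rechecks) {
        start = false;                        // main: start.clear()
        worker = WorkerState::WaitingOnFlag;  // worker: start.wait(false) blocks
    } else {
        // worker: start.wait(false) returns at once because the flag is still
        // set; idx is already negative, so it goes straight to
        // arrive_and_wait() and enters a second barrier phase.
        worker = start ? WorkerState::WaitingAtBarrier
                       : WorkerState::WaitingOnFlag;
        start = false;                        // main: start.clear(), too late
    }

    // Destructor: request_stop(), then set and notify the flag.
    stop_requested = true;
    start = true;
    if (worker == WorkerState::WaitingOnFlag && stop_requested) {
        worker = WorkerState::Exited;  // wakes up, sees the stop request, returns
    }
    // A worker parked at the barrier is not woken by the flag, so join()
    // on it would block.
    return worker;
}
```

In this model replay(true) ends with the worker exited, while replay(false) leaves it parked at the barrier. If the second ordering is what happens in the real pool, the worker has entered a barrier phase that the main thread never joins, which would match the hang in the destructor.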