I have a rather simple threadpool implementation for which I need to write a continuation. When a task is enqueued in the threadpool, it returns a std::future. A caller may then want to enqueue tasks that depend on those futures. So I need a function that takes a self-contained task plus a list of futures and returns a future for that asynchronous operation. Here is a demonstration:
threadpool tp;

// enqueue two tasks in the threadpool and get back the futures
auto f1 = tp([](auto v) { return do_something(v); }, 1);
auto f2 = tp([](auto v) { return do_something(v); }, 2);

// desirable non-starving threadpool::then
auto f3 = tp.then([](auto v1, auto v2)
{
    return do_something(v1 + v2);
}, std::move(f1), std::move(f2));

// naive starving implementation
auto naive = tp([](auto&& f1, auto&& f2)
{
    return do_something(f1.get(), f2.get());
}, std::move(f1), std::move(f2));
The obvious problem with this naive implementation is that the threadpool gets starved if it contains blocked threads waiting on futures to become ready. A simple solution would be to create a thread outside of the threadpool that waits on the futures, but that is also a bad solution: it is impossible to know how many such suspended threads would be created, and it is slow as well.
What would be an optimal way of creating threadpool continuations without creating stray threads or starving the threadpool?
It turns out that combining "create the task" with "queue the task for running" in a single operation causes problems in C++. This is why there is an executors proposal.

So the first thing you should consider is changing how you set up tasks for the thread pool.

A simple replacement for the idea of a std::future-returning task is a pipeline element.
#include <functional>
#include <type_traits>
#include <utility>

template<class In, class Out>
struct PipelineElement;

template<class... In, class... Out>
struct PipelineElement<void(In...), void(Out...)> {
    // an operation consumes its inputs plus a continuation that receives its outputs
    std::function<void(In..., std::function<void(Out...)>)> operation;

    PipelineElement(std::function<void(In..., std::function<void(Out...)>)> f):
        operation(std::move(f))
    {}
    PipelineElement(PipelineElement const&) = default;
    PipelineElement(PipelineElement&&) = default;
    PipelineElement& operator=(PipelineElement const&) = default;
    PipelineElement& operator=(PipelineElement&&) = default;

    // compose: run self, then feed its outputs into next
    template<class... Result>
    friend PipelineElement<void(In...), void(Result...)>
    operator|(
        PipelineElement self,
        PipelineElement<void(Out...), void(Result...)> next
    ) {
        return std::function<void(In..., std::function<void(Result...)>)>(
            [self = std::move(self), next = std::move(next)]
            (In... in, std::function<void(Result...)> continuation) mutable {
                self.operation(std::forward<In>(in)...,
                    [next = std::move(next), continuation = std::move(continuation)]
                    (Out... out) mutable {
                        next.operation(std::forward<Out>(out)..., std::move(continuation));
                    }
                );
            }
        );
    }
};
template<class...In>
using Sink = PipelineElement<void(In...), void()>;
template<class...Out>
using Source = PipelineElement<void(), void(Out...)>;
template<class F, class R = std::invoke_result_t<F>>
Source<R> MakeSource(F&& f) {
    return std::function<void(std::function<void(R)>)>(
        [f = std::forward<F>(f)](std::function<void(R)> sink) mutable {
            sink(f());
        }
    );
}
template<class... In>
Sink<In...> MakeSink(std::function<void(In...)> f) {
    return std::function<void(In..., std::function<void()>)>(
        [f = std::move(f)](In... in, std::function<void()> Done) {
            f(std::forward<In>(in)...);
            Done();
        }
    );
}
template<class... In, class R>
PipelineElement<void(In...), void(R)> MakePipeline(std::function<R(In...)> f) {
    return std::function<void(In..., std::function<void(R)>)>(
        [f = std::move(f)](In... in, std::function<void(R)> Done) {
            Done(f(std::forward<In>(in)...));
        }
    );
}
template<class... In>
PipelineElement<void(In...), void()> MakePipeline(std::function<void(In...)> f) {
    return MakeSink(std::move(f));
}
These things can be composed at will. The callbacks can be scheduled or run in sequence.

So instead of putting a task directly on the thread pool, you build the task, get a PipelineElement out of it, attach the next steps to it, and then give the composed PipelineElement to the thread pool.
Making the last step activate a std::future is easy.
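For example, a minimal composition might look like this; it is only a sketch building on the helpers above, and invoking job.operation inline stands in for whatever the pool would do on a worker thread:

#include <iostream>

int main() {
    // compose: produce an int, transform it, consume it
    auto src  = MakeSource([]{ return 21; });
    auto step = MakePipeline(std::function<int(int)>([](int v){ return v * 2; }));
    auto sink = MakeSink(std::function<void(int)>([](int v){ std::cout << v << "\n"; }));

    auto job = src | step | sink;    // a PipelineElement<void(), void()>
    // run it inline; a thread pool would instead invoke job.operation on a worker
    job.operation([]{});             // the final continuation: nothing left to do
}

Here the whole chain runs synchronously and prints 42; handing job to a pool worker instead is what turns it into a continuation.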
If you are married to the model of "set up the task to run, then plan what happens next", you have to add some (needless) concurrency work, as the task may already be finished before you plan the next step.
This can still use these pipeline elements. A thread pool could return a pipeline element: a Source<int> can replace your std::future<int>.

A Source<int> is merely a callable that accepts a std::function<void(int)> – converting that into a std::future<int> is easy:
#include <future>
#include <memory>

template<class R>
std::future<R> FutureFromSource( Source<R> src ) {
    auto pp = std::make_shared<std::promise<R>>();
    src.operation( [pp](R r){ pp->set_value(std::move(r)); } );
    return pp->get_future();
}
A side effect of this Source/Sink/Pipeline is that you don’t generally write code that waits on stuff. Instead, your components schedule code to run somewhere.
If you want code to run on the main or UI thread, you need a main or UI thread scheduler. If you want it threaded off, you need a thread pool. Etc.
Code stating “I want to stop running until this code is finished” can’t be written, which is good because in async code you shouldn’t write that code.
…
The other alternative is to replace your std::futures with your own toys. These have storage for a T, a std::function<void(T)>, a mutex, and an executor:
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>

template<class T>
struct nexter {
    struct state {
        mutable std::mutex m;
        std::optional<T> value;                               // storage for the result
        std::function<void(T)> sink;                          // continuation(s) to run
        std::function<void(std::function<void()>)> executor;  // where continuations run
        mutable std::condition_variable cv;                   // for blocking .wait
    };
    std::shared_ptr<state> pstate;
};
You can implement .wait and .then on top of this.

.then takes a void(T) and appends it to sink; if the value is already ready, it passes the value and the sink to the executor (which can include "run immediately").

Making the value ready populates the optional. If the sink exists, it is run on the executor. It also signals the condition variable.

It is a bit of a beast, really.

Appending to the sink consists of assigning if the sink is empty; otherwise, you write a function that calls the first sink, then the second.

Decisions about "do you support multiple .then?" and the like have to be made.
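A minimal sketch along those lines, assuming a copyable T and a non-empty executor; then, set_ready, and wait are hypothetical free functions here rather than the members described above:

// uses the nexter defined above
template<class T>
void then(nexter<T>& n, std::function<void(T)> f) {
    std::unique_lock<std::mutex> lock(n.pstate->m);
    if (n.pstate->value) {
        // already ready: hand value + continuation to the executor
        auto run = [f = std::move(f), v = *n.pstate->value]() mutable { f(std::move(v)); };
        auto exec = n.pstate->executor;
        lock.unlock();
        exec(std::move(run));
    } else if (!n.pstate->sink) {
        n.pstate->sink = std::move(f);   // sink empty: just assign
    } else {
        // chain after the existing sink: first sink gets a copy, second the original
        auto prev = std::move(n.pstate->sink);
        n.pstate->sink = [prev = std::move(prev), f = std::move(f)](T v) {
            prev(v);
            f(std::move(v));
        };
    }
}

template<class T>
void set_ready(nexter<T>& n, T v) {
    std::unique_lock<std::mutex> lock(n.pstate->m);
    n.pstate->value = std::move(v);
    auto sink = std::move(n.pstate->sink);
    auto exec = n.pstate->executor;
    n.pstate->cv.notify_all();           // wake any blocked .wait callers
    if (sink) {
        T copy = *n.pstate->value;       // take a copy while still holding the lock
        lock.unlock();
        exec([sink = std::move(sink), copy = std::move(copy)]() mutable {
            sink(std::move(copy));
        });
    }
}

template<class T>
T wait(nexter<T> const& n) {
    std::unique_lock<std::mutex> lock(n.pstate->m);
    n.pstate->cv.wait(lock, [&]{ return n.pstate->value.has_value(); });
    return *n.pstate->value;
}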
Another approach: have your task queue contain only entries that are ready to be executed.

When enqueuing a task that depends on previously scheduled tasks, register the new task as a dependent of each of them.

The threadpool dequeues tasks only from the "ready to go" list, and whenever a task finishes, it decrements a counter on each of its dependents. If a counter reaches 0 (no more unmet dependencies), the child can either be executed directly or pushed onto the ready list.

Such a system will never starve, but it requires that user code never awaits the dependencies directly (through futures, in your example); the dependency is instead expressed through the scheduling API.
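A minimal sketch of that bookkeeping; TaskNode, enqueue_ready, and schedule are hypothetical names, and enqueue_ready here runs tasks inline so the snippet stays self-contained (a real pool would push onto its ready queue instead):

#include <atomic>
#include <functional>
#include <memory>
#include <mutex>
#include <vector>

struct TaskNode {
    std::function<void()> work;
    std::atomic<int> pending{0};                        // unmet dependencies
    std::mutex m;                                       // guards done + dependents
    std::vector<std::shared_ptr<TaskNode>> dependents;  // tasks waiting on this one
    bool done = false;
};

void on_finished(const std::shared_ptr<TaskNode>& n);

void enqueue_ready(const std::shared_ptr<TaskNode>& n) {
    n->work();          // stand-in: a real pool would enqueue n on its ready queue
    on_finished(n);
}

// a worker calls this after running n->work
void on_finished(const std::shared_ptr<TaskNode>& n) {
    std::lock_guard<std::mutex> lock(n->m);
    n->done = true;
    for (auto& child : n->dependents)
        if (child->pending.fetch_sub(1) == 1)           // last dependency satisfied
            enqueue_ready(child);
    n->dependents.clear();
}

// register child as waiting on parent; no-op if parent already finished
void add_dependency(const std::shared_ptr<TaskNode>& parent,
                    const std::shared_ptr<TaskNode>& child) {
    std::lock_guard<std::mutex> lock(parent->m);
    if (parent->done) return;                           // nothing to wait for
    child->pending.fetch_add(1);
    parent->dependents.push_back(child);
}

// enqueue a task that depends on some already-scheduled tasks
void schedule(const std::shared_ptr<TaskNode>& child,
              const std::vector<std::shared_ptr<TaskNode>>& parents) {
    child->pending.store(1);                            // guard against early release
    for (auto& p : parents)
        add_dependency(p, child);
    if (child->pending.fetch_sub(1) == 1)               // drop the guard
        enqueue_ready(child);                           // all dependencies already done
}

The pending counter starts at 1 as a guard so that a parent finishing mid-registration cannot release the child early; dropping the guard at the end of schedule is what finally arms the task.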