25 #ifndef GUL14_THREADPOOL_H_
26 #define GUL14_THREADPOOL_H_
29 #include <condition_variable>
// Convert a weak reference to a ThreadPool into a shared_ptr that can be used
// to call into the pool (TaskHandle uses it before cancel/get_state calls).
// NOTE(review): judging by the name, this throws if the pool no longer exists;
// the definition is not visible here, so the exception type is unconfirmed.
48 std::shared_ptr<ThreadPool> lock_pool_or_throw(std::weak_ptr<ThreadPool> pool);
109 class ThreadPool :
public std::enable_shared_from_this<ThreadPool>
131 template <
typename T>
157 : future_{ std::move(future) }
159 , pool_{ std::move(pool) }
176 return detail::lock_pool_or_throw(pool_)->cancel_pending_task(id_);
187 if (not future_.valid())
188 throw std::logic_error(
"Canceled task has no result");
189 return future_.get();
206 if (not future_.valid())
209 return future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
226 const auto state = detail::lock_pool_or_throw(pool_)->get_task_state(id_);
228 if (state == InternalTaskState::unknown)
239 std::future<T> future_;
241 std::weak_ptr<ThreadPool> pool_;
// Point in time on std::chrono::system_clock; used for the start_time
// parameter of add_task().
245 using TimePoint = std::chrono::time_point<std::chrono::system_clock>;
// Duration type that belongs to TimePoint; used for the delay_before_start
// parameter of add_task().
246 using Duration = TimePoint::duration;
303 template <
typename Function>
304 TaskHandle<invoke_result_t<Function, ThreadPool&>>
305 add_task(Function fct, TimePoint start_time = {}, std::string name = {})
308 is_invocable<Function, ThreadPool&>::value
309 || is_invocable<Function>::value,
310 "Invalid function signature: Must be T fct() or T fct(ThreadPool&)");
312 using Result = invoke_result_t<Function, ThreadPool&>;
314 TaskHandle<Result> task_handle = [
this, &fct, start_time, &name]()
316 std::lock_guard<std::mutex> lock(mutex_);
320 throw std::runtime_error(
cat(
321 "Cannot add task: Pending queue has reached capacity (",
322 pending_tasks_.size(),
')'));
325 using PackagedTask = std::packaged_task<Result(ThreadPool&)>;
327 auto named_task_ptr = std::make_unique<NamedTaskImpl<PackagedTask>>(
328 PackagedTask{ std::move(fct) }, std::move(name));
330 TaskHandle<Result> handle{
331 next_task_id_, named_task_ptr->fct_.get_future(), shared_from_this() };
333 pending_tasks_.emplace_back(
334 next_task_id_, std::move(named_task_ptr), start_time);
346 template <
typename Function,
347 std::enable_if_t<is_invocable<Function>::value,
bool> =
true>
348 TaskHandle<invoke_result_t<Function>>
349 add_task(Function fct, TimePoint start_time = {}, std::string name = {})
352 [f = std::move(fct)](ThreadPool&)
mutable {
return f(); },
353 start_time, std::move(name));
356 template <
typename Function,
357 std::enable_if_t<is_invocable<Function, ThreadPool&>::value,
bool> =
true>
358 TaskHandle<invoke_result_t<Function, ThreadPool&>>
359 add_task(Function fct, Duration delay_before_start, std::string name = {})
362 std::chrono::system_clock::now() + delay_before_start, std::move(name));
365 template <
typename Function,
366 std::enable_if_t<is_invocable<Function>::value,
bool> =
true>
367 TaskHandle<invoke_result_t<Function>>
368 add_task(Function fct, Duration delay_before_start, std::string name = {})
371 std::chrono::system_clock::now() + delay_before_start, std::move(name));
374 template <
typename Function,
375 std::enable_if_t<is_invocable<Function, ThreadPool&>::value,
bool> =
true>
376 TaskHandle<invoke_result_t<Function, ThreadPool&>>
377 add_task(Function fct, std::string name)
379 return add_task(std::move(fct), TimePoint{}, std::move(name));
382 template <
typename Function,
383 std::enable_if_t<is_invocable<Function>::value,
bool> =
true>
384 TaskHandle<invoke_result_t<Function>>
385 add_task(Function fct, std::string name)
387 return add_task(std::move(fct), TimePoint{}, std::move(name));
// Return the maximum number of pending tasks that can be queued.
// Simply reports the stored capacity_ member; never throws (noexcept).
403 std::size_t
capacity() const noexcept {
return capacity_; }
457 enum class InternalTaskState
469 NamedTask(std::string name)
470 : name_{ std::move(name) }
// Virtual destructor: tasks are stored as std::unique_ptr<NamedTask>, so
// derived objects are destroyed through a pointer to this base class.
473 virtual ~NamedTask() =
default;
// Execute the task. The pool is passed in by reference so that the task
// function can receive it. Pure virtual: implemented by NamedTaskImpl.
474 virtual void operator()(ThreadPool& pool) = 0;
479 template <
typename FunctionType>
480 struct NamedTaskImpl :
public NamedTask
483 NamedTaskImpl(FunctionType fct, std::string name)
484 : NamedTask{ std::move(name) }
485 , fct_{ std::move(fct) }
// Run the wrapped callable fct_ with the given pool as its argument.
// Whatever the call returns is discarded here; for a std::packaged_task the
// result is delivered through its associated future instead.
488 void operator()(ThreadPool& pool)
override { fct_(pool); }
496 std::unique_ptr<NamedTask> named_task_;
497 TimePoint start_time_{};
501 Task(
TaskId task_id, std::unique_ptr<NamedTask> named_task, TimePoint start_time)
503 , named_task_{ std::move(named_task) }
504 , start_time_{ start_time }
// Maximum number of pending tasks that can be queued; returned by capacity().
508 std::size_t capacity_{ 0 };
// Thread objects owned by the pool.
514 std::vector<std::thread> threads_;
// NOTE(review): presumably used to wake worker threads when work arrives or
// shutdown is requested; its use is not visible in this excerpt.
520 std::condition_variable cv_;
// Locked by add_task() while the pending queue is inspected and modified.
// NOTE(review): presumably mutable so that const member functions can lock it.
522 mutable std::mutex mutex_;
// Tasks that have been enqueued via add_task().
// NOTE(review): presumably entries are removed when a task is started or canceled.
523 std::vector<Task> pending_tasks_;
// NOTE(review): presumably the IDs of the tasks that are currently executing.
524 std::vector<TaskId> running_task_ids_;
// NOTE(review): presumably the names of the currently executing tasks, as
// reported by get_running_task_names().
525 std::vector<std::string> running_task_names_;
// Starts out false.
// NOTE(review): presumably set once shutdown begins and reported by
// is_shutdown_requested().
527 bool shutdown_requested_{
false };
// Construct a pool from the desired number of threads and the capacity of the
// queue for pending tasks.
// NOTE(review): add_task() calls shared_from_this(), so the pool must be owned
// by a std::shared_ptr; make_shared()/make_thread_pool() appear to be the
// intended way to create one. Confirm the access level of this constructor.
547 ThreadPool(std::size_t num_threads, std::size_t
capacity);
// Cancel a single task identified by its ID. Called by TaskHandle::cancel(),
// which removes the task from the queue if it is still pending.
// NOTE(review): the bool result presumably reports whether the task was
// actually removed; confirm against the definition.
562 bool cancel_pending_task(
TaskId task_id);
// Look up the pool-internal state of the task with the given ID.
// Called by TaskHandle::get_state(), which handles the result
// InternalTaskState::unknown separately.
574 InternalTaskState get_task_state(
TaskId task_id)
const;
// NOTE(review): presumably the internal counterpart of is_full() that expects
// the caller to hold mutex_ already; the definition is not visible here.
581 bool is_full_i() const noexcept;
Declaration of the overload set for cat() and of the associated class ConvertingStringView.
A handle for a task that has (or had) been enqueued on a ThreadPool.
Definition: ThreadPool.h:133
T get_result()
Block until the task has finished and return its result.
Definition: ThreadPool.h:185
bool cancel()
Remove the task from the queue if it is still pending.
Definition: ThreadPool.h:173
TaskHandle()
Default-construct an invalid TaskHandle.
Definition: ThreadPool.h:141
bool is_complete() const
Determine whether the task has completed.
Definition: ThreadPool.h:204
TaskState get_state() const
Determine if the task is running, waiting to be started, completed, or has been canceled.
Definition: ThreadPool.h:224
TaskHandle(TaskId id, std::future< T > future, std::shared_ptr< ThreadPool > pool)
Construct a TaskHandle.
Definition: ThreadPool.h:156
A pool of worker threads with a task queue.
Definition: ThreadPool.h:110
std::uint64_t TaskId
A unique identifier for a task.
Definition: ThreadPool.h:113
~ThreadPool()
Destruct the ThreadPool and join all threads.
Definition: ThreadPool.cc:68
static GUL_EXPORT std::shared_ptr< ThreadPool > make_shared(std::size_t num_threads, std::size_t capacity=default_capacity)
Create a thread pool with the desired number of threads and the specified capacity for enqueuing tasks.
Definition: ThreadPool.cc:178
GUL_EXPORT std::size_t cancel_pending_tasks()
Remove all pending tasks from the queue.
Definition: ThreadPool.cc:97
GUL_EXPORT bool is_idle() const
Return true if the pool has neither pending tasks nor tasks that are currently being executed.
Definition: ThreadPool.cc:148
GUL_EXPORT std::vector< std::string > get_running_task_names() const
Return a vector with the names of the tasks that are currently running.
Definition: ThreadPool.cc:131
GUL_EXPORT std::vector< std::string > get_pending_task_names() const
Return a vector with the names of the tasks that are waiting to be executed.
Definition: ThreadPool.cc:118
GUL_EXPORT std::size_t capacity() const noexcept
Return the maximum number of pending tasks that can be queued.
Definition: ThreadPool.h:403
constexpr static std::size_t default_capacity
Default capacity for the task queue.
Definition: ThreadPool.h:249
GUL_EXPORT std::size_t count_pending() const
Return the number of pending tasks.
Definition: ThreadPool.cc:107
TaskHandle< invoke_result_t< Function, ThreadPool & > > add_task(Function fct, TimePoint start_time={}, std::string name={})
Enqueue a task.
Definition: ThreadPool.h:305
constexpr static std::size_t max_capacity
Maximum possible capacity for the task queue.
Definition: ThreadPool.h:252
GUL_EXPORT std::size_t count_threads() const noexcept
Return the number of threads in the pool.
Definition: ThreadPool.cc:113
constexpr static std::size_t max_threads
Maximum possible number of threads.
Definition: ThreadPool.h:255
GUL_EXPORT bool is_shutdown_requested() const
Determine whether the thread pool has been requested to shut down.
Definition: ThreadPool.cc:172
GUL_EXPORT bool is_full() const noexcept
Determine whether the queue for pending tasks is full (at capacity).
Definition: ThreadPool.cc:137
std::shared_ptr< ThreadPool > make_thread_pool(std::size_t num_threads, std::size_t capacity=ThreadPool::default_capacity)
Create a thread pool with the desired number of threads and the specified capacity for queuing tasks.
Definition: ThreadPool.h:599
TaskState
An enum describing the state of an individual task.
Definition: ThreadPool.h:65
@ running
The task is currently being executed.
@ pending
The task is waiting to be started.
@ canceled
The task was removed from the queue before it was started.
@ complete
The task has finished (successfully or by throwing an exception).
std::string cat()
Efficiently concatenate an arbitrary number of strings and numbers.
Definition: cat.h:97
Namespace gul14 contains all functions and classes of the General Utility Library.
Definition: doxygen.h:26
Some metaprogramming traits for the General Utility Library.