General Utility Library for C++14  2.13
ThreadPool.h
Go to the documentation of this file.
1 
23 // SPDX-License-Identifier: LGPL-2.1-or-later
24 
25 #ifndef GUL14_THREADPOOL_H_
26 #define GUL14_THREADPOOL_H_
27 
28 #include <chrono>
29 #include <condition_variable>
30 #include <functional>
31 #include <future>
32 #include <limits>
33 #include <memory>
34 #include <mutex>
35 #include <stdexcept>
36 #include <thread>
37 #include <vector>
38 
39 #include <gul14/cat.h>
40 #include <gul14/traits.h>
41 
42 namespace gul14 {
43 
44 class ThreadPool;
45 
46 namespace detail {
47 
/// Convert a weak reference to a ThreadPool into an owning shared_ptr.
///
/// Used by ThreadPool::TaskHandle to reach the pool that owns its task.
/// NOTE(review): the definition is not part of this header; judging by the name
/// it presumably throws when the pool no longer exists - confirm the exception
/// type against the implementation file.
///
/// \param pool  Weak reference to the pool.
/// \returns a shared pointer to the pool.
48 GUL_EXPORT
49 std::shared_ptr<ThreadPool> lock_pool_or_throw(std::weak_ptr<ThreadPool> pool);
50 
51 } // namespace detail
52 
/// An enum describing the state of an individual task.
65 enum class TaskState
66 {
    /// The task is waiting to be started.
67  pending,
    /// The task is currently being executed.
68  running,
    /// The task has finished (successfully or by throwing an exception).
69  complete,
    /// The task was removed from the queue before it was started.
70  canceled
71 };
72 
110 class ThreadPool : public std::enable_shared_from_this<ThreadPool>
111 {
112 public:
114  using TaskId = std::uint64_t;
115 
117  using ThreadId = std::vector<std::thread>::size_type;
118 
135  template <typename T>
137  {
138  public:
146  {}
147 
160  TaskHandle(TaskId id, std::future<T> future, std::shared_ptr<ThreadPool> pool)
161  : future_{ std::move(future) }
162  , id_{ id }
163  , pool_{ std::move(pool) }
164  {}
165 
177  bool cancel()
178  {
179  future_ = {};
180  return detail::lock_pool_or_throw(pool_)->cancel_pending_task(id_);
181  }
182 
190  {
191  if (not future_.valid())
192  throw std::logic_error("Canceled task has no result");
193  return future_.get();
194  }
195 
208  bool is_complete() const
209  {
210  if (not future_.valid())
211  return false;
212 
213  return future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
214  }
215 
229  {
230  const auto state = detail::lock_pool_or_throw(pool_)->get_task_state(id_);
231 
232  if (state == InternalTaskState::unknown)
233  {
234  if (is_complete())
235  return TaskState::complete;
236  else
237  return TaskState::canceled;
238  }
239  return static_cast<TaskState>(state);
240  }
241 
242  private:
243  std::future<T> future_;
244  TaskId id_{ 0 };
245  std::weak_ptr<ThreadPool> pool_;
246  };
247 
248 
249  using TimePoint = std::chrono::time_point<std::chrono::system_clock>;
250  using Duration = TimePoint::duration;
251 
253  constexpr static std::size_t default_capacity{ 200 };
254 
256  constexpr static std::size_t max_capacity{ 10'000'000 };
257 
259  constexpr static std::size_t max_threads{ 10'000 };
260 
268  ~ThreadPool();
269 
307  template <typename Function>
308  TaskHandle<invoke_result_t<Function, ThreadPool&>>
309  add_task(Function fct, TimePoint start_time = {}, std::string name = {})
310  {
311  static_assert(
312  is_invocable<Function, ThreadPool&>::value
313  || is_invocable<Function>::value,
314  "Invalid function signature: Must be T fct() or T fct(ThreadPool&)");
315 
316  using Result = invoke_result_t<Function, ThreadPool&>;
317 
318  TaskHandle<Result> task_handle = [this, &fct, start_time, &name]()
319  {
320  std::lock_guard<std::mutex> lock(mutex_);
321 
322  if (is_full_i())
323  {
324  throw std::runtime_error(cat(
325  "Cannot add task: Pending queue has reached capacity (",
326  pending_tasks_.size(), ')'));
327  }
328 
329  using PackagedTask = std::packaged_task<Result(ThreadPool&)>;
330 
331  auto named_task_ptr = std::make_unique<NamedTaskImpl<PackagedTask>>(
332  PackagedTask{ std::move(fct) }, std::move(name));
333 
334  TaskHandle<Result> handle{
335  next_task_id_, named_task_ptr->fct_.get_future(), shared_from_this() };
336 
337  pending_tasks_.emplace_back(
338  next_task_id_, std::move(named_task_ptr), start_time);
339 
340  ++next_task_id_;
341 
342  return handle;
343  }();
344 
345  cv_.notify_one();
346 
347  return task_handle;
348  }
349 
350  template <typename Function,
351  std::enable_if_t<is_invocable<Function>::value, bool> = true>
352  TaskHandle<invoke_result_t<Function>>
353  add_task(Function fct, TimePoint start_time = {}, std::string name = {})
354  {
355  return add_task(
356  [f = std::move(fct)](ThreadPool&) mutable { return f(); },
357  start_time, std::move(name));
358  }
359 
360  template <typename Function,
361  std::enable_if_t<is_invocable<Function, ThreadPool&>::value, bool> = true>
362  TaskHandle<invoke_result_t<Function, ThreadPool&>>
363  add_task(Function fct, Duration delay_before_start, std::string name = {})
364  {
365  return add_task(std::move(fct),
366  std::chrono::system_clock::now() + delay_before_start, std::move(name));
367  }
368 
369  template <typename Function,
370  std::enable_if_t<is_invocable<Function>::value, bool> = true>
371  TaskHandle<invoke_result_t<Function>>
372  add_task(Function fct, Duration delay_before_start, std::string name = {})
373  {
374  return add_task(std::move(fct),
375  std::chrono::system_clock::now() + delay_before_start, std::move(name));
376  }
377 
378  template <typename Function,
379  std::enable_if_t<is_invocable<Function, ThreadPool&>::value, bool> = true>
380  TaskHandle<invoke_result_t<Function, ThreadPool&>>
381  add_task(Function fct, std::string name)
382  {
383  return add_task(std::move(fct), TimePoint{}, std::move(name));
384  }
385 
386  template <typename Function,
387  std::enable_if_t<is_invocable<Function>::value, bool> = true>
388  TaskHandle<invoke_result_t<Function>>
389  add_task(Function fct, std::string name)
390  {
391  return add_task(std::move(fct), TimePoint{}, std::move(name));
392  }
393 
402  GUL_EXPORT
403  std::size_t cancel_pending_tasks();
404 
406  GUL_EXPORT
407  std::size_t capacity() const noexcept { return capacity_; }
408 
410  GUL_EXPORT
411  std::size_t count_pending() const;
412 
414  GUL_EXPORT
415  std::size_t count_threads() const noexcept;
416 
418  GUL_EXPORT
419  std::vector<std::string> get_pending_task_names() const;
420 
422  GUL_EXPORT
423  std::vector<std::string> get_running_task_names() const;
424 
435  GUL_EXPORT
436  ThreadId get_thread_id() const;
437 
439  GUL_EXPORT
440  bool is_full() const noexcept;
441 
446  GUL_EXPORT
447  bool is_idle() const;
448 
450  GUL_EXPORT
451  bool is_shutdown_requested() const;
452 
462  GUL_EXPORT
463  static std::shared_ptr<ThreadPool> make_shared(
464  std::size_t num_threads, std::size_t capacity = default_capacity);
465 
466 private:
474  enum class InternalTaskState
475  {
477  pending = static_cast<int>(TaskState::pending),
479  running = static_cast<int>(TaskState::running),
481  unknown
482  };
483 
484  struct NamedTask
485  {
486  NamedTask(std::string name)
487  : name_{ std::move(name) }
488  {}
489 
490  virtual ~NamedTask() = default;
491  virtual void operator()(ThreadPool& pool) = 0;
492 
493  std::string name_;
494  };
495 
496  template <typename FunctionType>
497  struct NamedTaskImpl : public NamedTask
498  {
499  public:
500  NamedTaskImpl(FunctionType fct, std::string name)
501  : NamedTask{ std::move(name) }
502  , fct_{ std::move(fct) }
503  {}
504 
505  void operator()(ThreadPool& pool) override { fct_(pool); }
506 
507  FunctionType fct_;
508  };
509 
510  struct Task
511  {
512  TaskId id_{};
513  std::unique_ptr<NamedTask> named_task_;
514  TimePoint start_time_{}; // When the task is to be started (at least no earlier)
515 
516  Task() = default;
517 
518  Task(TaskId task_id, std::unique_ptr<NamedTask> named_task, TimePoint start_time)
519  : id_{ task_id }
520  , named_task_{ std::move(named_task) }
521  , start_time_{ start_time }
522  {}
523  };
524 
525  std::size_t capacity_{ 0 };
526 
531  std::vector<std::thread> threads_;
532 
538  thread_local static ThreadId thread_id_;
539 
544  std::condition_variable cv_;
545 
546  mutable std::mutex mutex_; // Protects the following variables
547  std::vector<Task> pending_tasks_;
548  std::vector<TaskId> running_task_ids_;
549  std::vector<std::string> running_task_names_;
550  TaskId next_task_id_ = 0;
551  bool shutdown_requested_{ false };
552 
553 
571  ThreadPool(std::size_t num_threads, std::size_t capacity);
572 
585  GUL_EXPORT
586  bool cancel_pending_task(TaskId task_id);
587 
597  GUL_EXPORT
598  InternalTaskState get_task_state(TaskId task_id) const;
599 
604  GUL_EXPORT
605  bool is_full_i() const noexcept;
606 
613  void perform_work(std::size_t thread_index);
614 };
615 
/// Create a thread pool with the desired number of threads and the specified
/// capacity for queuing tasks.
///
/// Free-function convenience wrapper: both arguments are forwarded unchanged to
/// ThreadPool::make_shared().
///
/// \param num_threads  Desired number of threads.
/// \param capacity     Capacity of the queue for pending tasks; defaults to
///                     ThreadPool::default_capacity.
/// \returns the shared pointer returned by ThreadPool::make_shared().
625 inline std::shared_ptr<ThreadPool> make_thread_pool(
626  std::size_t num_threads, std::size_t capacity = ThreadPool::default_capacity)
627 {
628  return ThreadPool::make_shared(num_threads, capacity);
629 }
630 
632 
633 } // namespace gul14
634 
635 #endif // GUL14_THREADPOOL_H_
Declaration of the overload set for cat() and of the associated class ConvertingStringView.
A handle for a task that has (or had) been enqueued on a ThreadPool.
Definition: ThreadPool.h:137
T get_result()
Block until the task has finished and return its result.
Definition: ThreadPool.h:189
bool cancel()
Remove the task from the queue if it is still pending.
Definition: ThreadPool.h:177
TaskHandle()
Default-construct an invalid TaskHandle.
Definition: ThreadPool.h:145
bool is_complete() const
Determine whether the task has completed.
Definition: ThreadPool.h:208
TaskState get_state() const
Determine if the task is running, waiting to be started, completed, or has been canceled.
Definition: ThreadPool.h:228
TaskHandle(TaskId id, std::future< T > future, std::shared_ptr< ThreadPool > pool)
Construct a TaskHandle.
Definition: ThreadPool.h:160
A pool of worker threads with a task queue.
Definition: ThreadPool.h:111
std::uint64_t TaskId
A unique identifier for a task.
Definition: ThreadPool.h:114
~ThreadPool()
Destruct the ThreadPool and join all threads.
Definition: ThreadPool.cc:69
static GUL_EXPORT std::shared_ptr< ThreadPool > make_shared(std::size_t num_threads, std::size_t capacity=default_capacity)
Create a thread pool with the desired number of threads and the specified capacity for enqueuing tasks.
Definition: ThreadPool.cc:187
GUL_EXPORT std::size_t cancel_pending_tasks()
Remove all pending tasks from the queue.
Definition: ThreadPool.cc:98
GUL_EXPORT bool is_idle() const
Return true if the pool has neither pending tasks nor tasks that are currently being executed.
Definition: ThreadPool.cc:175
GUL_EXPORT std::vector< std::string > get_running_task_names() const
Return a vector with the names of the tasks that are currently running.
Definition: ThreadPool.cc:132
GUL_EXPORT std::vector< std::string > get_pending_task_names() const
Return a vector with the names of the tasks that are waiting to be executed.
Definition: ThreadPool.cc:119
GUL_EXPORT ThreadId get_thread_id() const
Return the thread pool ID of the current thread.
Definition: ThreadPool.cc:156
GUL_EXPORT std::size_t capacity() const noexcept
Return the maximum number of pending tasks that can be queued.
Definition: ThreadPool.h:407
constexpr static std::size_t default_capacity
Default capacity for the task queue.
Definition: ThreadPool.h:253
GUL_EXPORT std::size_t count_pending() const
Return the number of pending tasks.
Definition: ThreadPool.cc:108
std::vector< std::thread >::size_type ThreadId
A unique identifier for a thread in the pool in the range of [0, count_threads()).
Definition: ThreadPool.h:117
TaskHandle< invoke_result_t< Function, ThreadPool & > > add_task(Function fct, TimePoint start_time={}, std::string name={})
Enqueue a task.
Definition: ThreadPool.h:309
constexpr static std::size_t max_capacity
Maximum possible capacity for the task queue.
Definition: ThreadPool.h:256
GUL_EXPORT std::size_t count_threads() const noexcept
Return the number of threads in the pool.
Definition: ThreadPool.cc:114
constexpr static std::size_t max_threads
Maximum possible number of threads.
Definition: ThreadPool.h:259
GUL_EXPORT bool is_shutdown_requested() const
Determine whether the thread pool has been requested to shut down.
Definition: ThreadPool.cc:181
GUL_EXPORT bool is_full() const noexcept
Determine whether the queue for pending tasks is full (at capacity).
Definition: ThreadPool.cc:164
std::shared_ptr< ThreadPool > make_thread_pool(std::size_t num_threads, std::size_t capacity=ThreadPool::default_capacity)
Create a thread pool with the desired number of threads and the specified capacity for queuing tasks.
Definition: ThreadPool.h:625
TaskState
An enum describing the state of an individual task.
Definition: ThreadPool.h:66
@ running
The task is currently being executed.
@ pending
The task is waiting to be started.
@ canceled
The task was removed from the queue before it was started.
@ complete
The task has finished (successfully or by throwing an exception).
std::string cat()
Efficiently concatenate an arbitrary number of strings and numbers.
Definition: cat.h:97
Namespace gul14 contains all functions and classes of the General Utility Library.
Definition: doxygen.h:26
Some metaprogramming traits for the General Utility Library.