General Utility Library for C++14  2.12
ThreadPool.h
Go to the documentation of this file.
1 
23 // SPDX-License-Identifier: LGPL-2.1-or-later
24 
25 #ifndef GUL14_THREADPOOL_H_
26 #define GUL14_THREADPOOL_H_
27 
28 #include <chrono>
29 #include <condition_variable>
30 #include <functional>
31 #include <future>
32 #include <memory>
33 #include <mutex>
34 #include <stdexcept>
35 #include <thread>
36 #include <vector>
37 
38 #include <gul14/cat.h>
39 #include <gul14/traits.h>
40 
41 namespace gul14 {
42 
43 class ThreadPool;
44 
45 namespace detail {
46 
// Turn a weak reference to a ThreadPool into an owning std::shared_ptr.
// TaskHandle::cancel() and TaskHandle::get_state() call this before they forward a
// request to the pool and dereference the returned pointer without a null check.
// NOTE(review): judging by the name, this throws if the pool does not exist anymore;
// the definition is not part of this header, so confirm the exception type there.
47 GUL_EXPORT
48 std::shared_ptr<ThreadPool> lock_pool_or_throw(std::weak_ptr<ThreadPool> pool);
49 
50 } // namespace detail
51 
/**
 * An enum describing the state of an individual task.
 *
 * The numeric values of `pending` and `running` are reused by
 * ThreadPool::InternalTaskState, so their order must not be changed.
 */
enum class TaskState
{
    pending = 0,  ///< The task is waiting to be started.
    running = 1,  ///< The task is currently being executed.
    complete = 2, ///< The task has finished (successfully or by throwing an exception).
    canceled = 3  ///< The task was removed from the queue before it was started.
};
71 
109 class ThreadPool : public std::enable_shared_from_this<ThreadPool>
110 {
111 public:
113  using TaskId = std::uint64_t;
114 
131  template <typename T>
133  {
134  public:
142  {}
143 
156  TaskHandle(TaskId id, std::future<T> future, std::shared_ptr<ThreadPool> pool)
157  : future_{ std::move(future) }
158  , id_{ id }
159  , pool_{ std::move(pool) }
160  {}
161 
173  bool cancel()
174  {
175  future_ = {};
176  return detail::lock_pool_or_throw(pool_)->cancel_pending_task(id_);
177  }
178 
186  {
187  if (not future_.valid())
188  throw std::logic_error("Canceled task has no result");
189  return future_.get();
190  }
191 
204  bool is_complete() const
205  {
206  if (not future_.valid())
207  return false;
208 
209  return future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
210  }
211 
225  {
226  const auto state = detail::lock_pool_or_throw(pool_)->get_task_state(id_);
227 
228  if (state == InternalTaskState::unknown)
229  {
230  if (is_complete())
231  return TaskState::complete;
232  else
233  return TaskState::canceled;
234  }
235  return static_cast<TaskState>(state);
236  }
237 
238  private:
239  std::future<T> future_;
240  TaskId id_{ 0 };
241  std::weak_ptr<ThreadPool> pool_;
242  };
243 
244 
245  using TimePoint = std::chrono::time_point<std::chrono::system_clock>;
246  using Duration = TimePoint::duration;
247 
249  constexpr static std::size_t default_capacity{ 200 };
250 
252  constexpr static std::size_t max_capacity{ 10'000'000 };
253 
255  constexpr static std::size_t max_threads{ 10'000 };
256 
264  ~ThreadPool();
265 
303  template <typename Function>
304  TaskHandle<invoke_result_t<Function, ThreadPool&>>
305  add_task(Function fct, TimePoint start_time = {}, std::string name = {})
306  {
307  static_assert(
308  is_invocable<Function, ThreadPool&>::value
309  || is_invocable<Function>::value,
310  "Invalid function signature: Must be T fct() or T fct(ThreadPool&)");
311 
312  using Result = invoke_result_t<Function, ThreadPool&>;
313 
314  TaskHandle<Result> task_handle = [this, &fct, start_time, &name]()
315  {
316  std::lock_guard<std::mutex> lock(mutex_);
317 
318  if (is_full_i())
319  {
320  throw std::runtime_error(cat(
321  "Cannot add task: Pending queue has reached capacity (",
322  pending_tasks_.size(), ')'));
323  }
324 
325  using PackagedTask = std::packaged_task<Result(ThreadPool&)>;
326 
327  auto named_task_ptr = std::make_unique<NamedTaskImpl<PackagedTask>>(
328  PackagedTask{ std::move(fct) }, std::move(name));
329 
330  TaskHandle<Result> handle{
331  next_task_id_, named_task_ptr->fct_.get_future(), shared_from_this() };
332 
333  pending_tasks_.emplace_back(
334  next_task_id_, std::move(named_task_ptr), start_time);
335 
336  ++next_task_id_;
337 
338  return handle;
339  }();
340 
341  cv_.notify_one();
342 
343  return task_handle;
344  }
345 
346  template <typename Function,
347  std::enable_if_t<is_invocable<Function>::value, bool> = true>
348  TaskHandle<invoke_result_t<Function>>
349  add_task(Function fct, TimePoint start_time = {}, std::string name = {})
350  {
351  return add_task(
352  [f = std::move(fct)](ThreadPool&) mutable { return f(); },
353  start_time, std::move(name));
354  }
355 
356  template <typename Function,
357  std::enable_if_t<is_invocable<Function, ThreadPool&>::value, bool> = true>
358  TaskHandle<invoke_result_t<Function, ThreadPool&>>
359  add_task(Function fct, Duration delay_before_start, std::string name = {})
360  {
361  return add_task(std::move(fct),
362  std::chrono::system_clock::now() + delay_before_start, std::move(name));
363  }
364 
365  template <typename Function,
366  std::enable_if_t<is_invocable<Function>::value, bool> = true>
367  TaskHandle<invoke_result_t<Function>>
368  add_task(Function fct, Duration delay_before_start, std::string name = {})
369  {
370  return add_task(std::move(fct),
371  std::chrono::system_clock::now() + delay_before_start, std::move(name));
372  }
373 
374  template <typename Function,
375  std::enable_if_t<is_invocable<Function, ThreadPool&>::value, bool> = true>
376  TaskHandle<invoke_result_t<Function, ThreadPool&>>
377  add_task(Function fct, std::string name)
378  {
379  return add_task(std::move(fct), TimePoint{}, std::move(name));
380  }
381 
382  template <typename Function,
383  std::enable_if_t<is_invocable<Function>::value, bool> = true>
384  TaskHandle<invoke_result_t<Function>>
385  add_task(Function fct, std::string name)
386  {
387  return add_task(std::move(fct), TimePoint{}, std::move(name));
388  }
389 
398  GUL_EXPORT
399  std::size_t cancel_pending_tasks();
400 
402  GUL_EXPORT
403  std::size_t capacity() const noexcept { return capacity_; }
404 
406  GUL_EXPORT
407  std::size_t count_pending() const;
408 
410  GUL_EXPORT
411  std::size_t count_threads() const noexcept;
412 
414  GUL_EXPORT
415  std::vector<std::string> get_pending_task_names() const;
416 
418  GUL_EXPORT
419  std::vector<std::string> get_running_task_names() const;
420 
422  GUL_EXPORT
423  bool is_full() const noexcept;
424 
429  GUL_EXPORT
430  bool is_idle() const;
431 
433  GUL_EXPORT
434  bool is_shutdown_requested() const;
435 
445  GUL_EXPORT
446  static std::shared_ptr<ThreadPool> make_shared(
447  std::size_t num_threads, std::size_t capacity = default_capacity);
448 
449 private:
457  enum class InternalTaskState
458  {
460  pending = static_cast<int>(TaskState::pending),
462  running = static_cast<int>(TaskState::running),
464  unknown
465  };
466 
467  struct NamedTask
468  {
469  NamedTask(std::string name)
470  : name_{ std::move(name) }
471  {}
472 
473  virtual ~NamedTask() = default;
474  virtual void operator()(ThreadPool& pool) = 0;
475 
476  std::string name_;
477  };
478 
479  template <typename FunctionType>
480  struct NamedTaskImpl : public NamedTask
481  {
482  public:
483  NamedTaskImpl(FunctionType fct, std::string name)
484  : NamedTask{ std::move(name) }
485  , fct_{ std::move(fct) }
486  {}
487 
488  void operator()(ThreadPool& pool) override { fct_(pool); }
489 
490  FunctionType fct_;
491  };
492 
493  struct Task
494  {
495  TaskId id_{};
496  std::unique_ptr<NamedTask> named_task_;
497  TimePoint start_time_{}; // When the task is to be started (at least no earlier)
498 
499  Task() = default;
500 
501  Task(TaskId task_id, std::unique_ptr<NamedTask> named_task, TimePoint start_time)
502  : id_{ task_id }
503  , named_task_{ std::move(named_task) }
504  , start_time_{ start_time }
505  {}
506  };
507 
508  std::size_t capacity_{ 0 };
509 
514  std::vector<std::thread> threads_;
515 
520  std::condition_variable cv_;
521 
522  mutable std::mutex mutex_; // Protects the following variables
523  std::vector<Task> pending_tasks_;
524  std::vector<TaskId> running_task_ids_;
525  std::vector<std::string> running_task_names_;
526  TaskId next_task_id_ = 0;
527  bool shutdown_requested_{ false };
528 
529 
547  ThreadPool(std::size_t num_threads, std::size_t capacity);
548 
561  GUL_EXPORT
562  bool cancel_pending_task(TaskId task_id);
563 
573  GUL_EXPORT
574  InternalTaskState get_task_state(TaskId task_id) const;
575 
580  GUL_EXPORT
581  bool is_full_i() const noexcept;
582 
587  void perform_work();
588 };
589 
599 inline std::shared_ptr<ThreadPool> make_thread_pool(
600  std::size_t num_threads, std::size_t capacity = ThreadPool::default_capacity)
601 {
602  return ThreadPool::make_shared(num_threads, capacity);
603 }
604 
606 
607 } // namespace gul14
608 
609 #endif // GUL14_THREADPOOL_H_
Declaration of the overload set for cat() and of the associated class ConvertingStringView.
A handle for a task that has (or had) been enqueued on a ThreadPool.
Definition: ThreadPool.h:133
T get_result()
Block until the task has finished and return its result.
Definition: ThreadPool.h:185
bool cancel()
Remove the task from the queue if it is still pending.
Definition: ThreadPool.h:173
TaskHandle()
Default-construct an invalid TaskHandle.
Definition: ThreadPool.h:141
bool is_complete() const
Determine whether the task has completed.
Definition: ThreadPool.h:204
TaskState get_state() const
Determine if the task is running, waiting to be started, completed, or has been canceled.
Definition: ThreadPool.h:224
TaskHandle(TaskId id, std::future< T > future, std::shared_ptr< ThreadPool > pool)
Construct a TaskHandle.
Definition: ThreadPool.h:156
A pool of worker threads with a task queue.
Definition: ThreadPool.h:110
std::uint64_t TaskId
A unique identifier for a task.
Definition: ThreadPool.h:113
~ThreadPool()
Destruct the ThreadPool and join all threads.
Definition: ThreadPool.cc:68
static GUL_EXPORT std::shared_ptr< ThreadPool > make_shared(std::size_t num_threads, std::size_t capacity=default_capacity)
Create a thread pool with the desired number of threads and the specified capacity for enqueuing tasks.
Definition: ThreadPool.cc:178
GUL_EXPORT std::size_t cancel_pending_tasks()
Remove all pending tasks from the queue.
Definition: ThreadPool.cc:97
GUL_EXPORT bool is_idle() const
Return true if the pool has neither pending tasks nor tasks that are currently being executed.
Definition: ThreadPool.cc:148
GUL_EXPORT std::vector< std::string > get_running_task_names() const
Return a vector with the names of the tasks that are currently running.
Definition: ThreadPool.cc:131
GUL_EXPORT std::vector< std::string > get_pending_task_names() const
Return a vector with the names of the tasks that are waiting to be executed.
Definition: ThreadPool.cc:118
GUL_EXPORT std::size_t capacity() const noexcept
Return the maximum number of pending tasks that can be queued.
Definition: ThreadPool.h:403
constexpr static std::size_t default_capacity
Default capacity for the task queue.
Definition: ThreadPool.h:249
GUL_EXPORT std::size_t count_pending() const
Return the number of pending tasks.
Definition: ThreadPool.cc:107
TaskHandle< invoke_result_t< Function, ThreadPool & > > add_task(Function fct, TimePoint start_time={}, std::string name={})
Enqueue a task.
Definition: ThreadPool.h:305
constexpr static std::size_t max_capacity
Maximum possible capacity for the task queue.
Definition: ThreadPool.h:252
GUL_EXPORT std::size_t count_threads() const noexcept
Return the number of threads in the pool.
Definition: ThreadPool.cc:113
constexpr static std::size_t max_threads
Maximum possible number of threads.
Definition: ThreadPool.h:255
GUL_EXPORT bool is_shutdown_requested() const
Determine whether the thread pool has been requested to shut down.
Definition: ThreadPool.cc:172
GUL_EXPORT bool is_full() const noexcept
Determine whether the queue for pending tasks is full (at capacity).
Definition: ThreadPool.cc:137
std::shared_ptr< ThreadPool > make_thread_pool(std::size_t num_threads, std::size_t capacity=ThreadPool::default_capacity)
Create a thread pool with the desired number of threads and the specified capacity for queuing tasks.
Definition: ThreadPool.h:599
TaskState
An enum describing the state of an individual task.
Definition: ThreadPool.h:65
@ running
The task is currently being executed.
@ pending
The task is waiting to be started.
@ canceled
The task was removed from the queue before it was started.
@ complete
The task has finished (successfully or by throwing an exception).
std::string cat()
Efficiently concatenate an arbitrary number of strings and numbers.
Definition: cat.h:97
Namespace gul14 contains all functions and classes of the General Utility Library.
Definition: doxygen.h:26
Some metaprogramming traits for the General Utility Library.