Doxygen
Loading...
Searching...
No Matches
threadpool.h
Go to the documentation of this file.
1/******************************************************************************
2 *
3 * Copyright (C) 1997-2020 by Dimitri van Heesch.
4 *
5 * Permission to use, copy, modify, and distribute this software and its
6 * documentation under the terms of the GNU General Public License is hereby
7 * granted. No representations are made about the suitability of this software
8 * for any purpose. It is provided "as is" without express or implied warranty.
9 * See the GNU General Public License for more details.
10 *
11 * Documents produced by Doxygen are derivative works derived from the
12 * input used in their production; they are not affected by this license.
13 *
14 */
15
16#ifndef THREADPOOL_H
17#define THREADPOOL_H
18
19#include <condition_variable>
20#include <deque>
21#include <functional>
22#include <future>
23#include <mutex>
24#include <thread>
25#include <type_traits>
26#include <utility>
27#include <vector>
28
29/// Class managing a pool of worker threads.
30/// Work can be queued by passing a function to queue(). A future will be
31/// returned that can be used to obtain the result of the function after execution.
32///
33/// Usage example:
34/// @code
35/// ThreadPool pool(10);
36/// std::vector< std::future< int > > results;
37/// for (int i=0;i<10;i++)
38/// {
39/// auto run = [](int i) { return i*i; };
40/// results.emplace_back(pool.queue(std::bind(run,i)));
41/// }
42/// for (auto &f : results)
43/// {
44/// printf("Result %d:\n", f.get());
45/// }
46/// @endcode
48{
49 public:
50 /// start N threads in the thread pool.
51 ThreadPool(std::size_t N=1)
52 {
53 for (std::size_t i = 0; i < N; ++i)
54 {
55 // each thread is a std::async running thread_task():
56 m_finished.push_back(
57 std::async(
58 std::launch::async,
59 [this]{ threadTask(); }
60 )
61 );
62 }
63 }
64 /// deletes the thread pool by finishing all threads
66 {
67 finish();
68 }
69 ThreadPool(const ThreadPool &) = delete;
70 ThreadPool &operator=(const ThreadPool &) = delete;
71 ThreadPool(ThreadPool &&) = delete;
73
74 /// Queue the callable function \a f for the threads to execute.
75 /// A future of the return type of the function is returned to capture the result.
76 template<class F, typename ...Args>
77 auto queue(F&& f, Args&&... args) -> std::future<decltype(f(args...))>
78 {
79 // We wrap the function object into a packaged task, splitting
80 // execution from the return value.
81 // Since the packaged_task object is not copyable, we create it on the heap
82 // and capture it via a shared pointer in a lambda and then assign that lambda
83 // to a std::function.
84 using RetType = decltype(f(args...));
85 auto ptr = std::make_shared< std::packaged_task<RetType()> >(std::forward<F>(f), std::forward<Args>(args)...);
86 auto taskFunc = [ptr]() { if (ptr->valid()) (*ptr)(); };
87
88 auto r=ptr->get_future(); // get the return value before we hand off the task
89 {
90 std::unique_lock<std::mutex> l(m_mutex);
91 m_work.emplace_back(taskFunc);
92 m_cond.notify_one(); // wake a thread to work on the task
93 }
94
95 return r; // return the future result of the task
96 }
97
98 /// finish enques a "stop the thread" message for every thread,
99 /// then waits for them to finish
100 void finish()
101 {
102 {
103 std::unique_lock<std::mutex> l(m_mutex);
104 for(auto&& u : m_finished)
105 {
106 (void)u; //unused_variable, to silence the compiler warning about unused variables
107 m_work.emplace_back(); // insert empty function object to signal abort
108 }
109 }
110 m_cond.notify_all();
111 m_finished.clear();
112 }
113 private:
114
115 // the work that a worker thread does:
117 {
118 while(true)
119 {
120 // pop a task off the queue:
121 std::function<void()> f;
122 {
123 // usual thread-safe queue code:
124 std::unique_lock<std::mutex> l(m_mutex);
125 if (m_work.empty())
126 {
127 m_cond.wait(l,[&]{return !m_work.empty();});
128 }
129 f = std::move(m_work.front());
130 m_work.pop_front();
131 }
132 // if the function is empty, it means we are asked to abort
133 if (!f) return;
134 // run the task
135 f();
136 }
137 }
138
139 // the mutex, condition variable and deque form a single
140 // thread-safe triggered queue of tasks:
141 std::mutex m_mutex;
142 std::condition_variable m_cond;
143
144 // hold the queue of work
145 std::deque< std::function<void()> > m_work;
146
147 // this holds futures representing the worker threads being done:
148 std::vector< std::future<void> > m_finished;
149};
150
151#endif
152
void finish()
finish enques a "stop the thread" message for every thread, then waits for them to finish
Definition threadpool.h:100
~ThreadPool()
deletes the thread pool by finishing all threads
Definition threadpool.h:65
void threadTask()
Definition threadpool.h:116
std::deque< std::function< void()> > m_work
Definition threadpool.h:145
ThreadPool(std::size_t N=1)
start N threads in the thread pool.
Definition threadpool.h:51
std::mutex m_mutex
Definition threadpool.h:141
ThreadPool(const ThreadPool &)=delete
auto queue(F &&f, Args &&... args) -> std::future< decltype(f(args...))>
Queue the callable function f for the threads to execute.
Definition threadpool.h:77
std::vector< std::future< void > > m_finished
Definition threadpool.h:148
ThreadPool(ThreadPool &&)=delete
ThreadPool & operator=(ThreadPool &&)=delete
std::condition_variable m_cond
Definition threadpool.h:142
ThreadPool & operator=(const ThreadPool &)=delete