/Users/brunogarcia/projects/bitcoin-core-dev/src/util/threadpool.h
Line | Count | Source |
1 | | // Copyright (c) 2024-present The Bitcoin Core developers |
2 | | // Distributed under the MIT software license, see the accompanying |
3 | | // file COPYING or https://www.opensource.org/licenses/mit-license.php. |
4 | | |
5 | | #ifndef BITCOIN_UTIL_THREADPOOL_H |
6 | | #define BITCOIN_UTIL_THREADPOOL_H |
7 | | |
8 | | #include <sync.h> |
9 | | #include <util/string.h> |
10 | | #include <util/thread.h> |
11 | | #include <util/threadinterrupt.h> |
12 | | |
13 | | #include <algorithm> |
14 | | #include <atomic> |
15 | | #include <condition_variable> |
16 | | #include <cstddef> |
17 | | #include <functional> |
18 | | #include <future> |
19 | | #include <memory> |
20 | | #include <stdexcept> |
21 | | #include <utility> |
22 | | #include <queue> |
23 | | #include <thread> |
24 | | #include <vector> |
25 | | |
26 | | /** |
27 | | * @brief Fixed-size thread pool for running arbitrary tasks concurrently. |
28 | | * |
29 | | * The thread pool maintains a set of worker threads that consume and execute |
30 | | * tasks submitted through Submit(). Once started, tasks can be queued and |
31 | | * processed asynchronously until Stop() is called. |
32 | | * |
33 | | * ### Thread-safety and lifecycle |
34 | | * - `Start()` and `Stop()` must be called from a controller (non-worker) thread. |
35 | | * Calling `Stop()` from a worker thread will deadlock, as it waits for all |
36 | | * workers to join, including the current one. |
37 | | * |
38 | | * - `Submit()` can be called from any thread, including workers. It safely |
39 | | * enqueues new work for execution as long as the pool has active workers. |
40 | | * |
41 | | * - `Stop()` prevents further task submission and wakes all worker threads. |
42 | | * Workers finish processing all remaining queued tasks before exiting, |
43 | | * guaranteeing that no caller waits forever on a pending future. |
44 | | */ |
45 | | class ThreadPool { |
46 | | |
47 | | private: |
48 | | std::string m_name; |
49 | | Mutex m_mutex; |
50 | | std::queue<std::function<void()>> m_work_queue GUARDED_BY(m_mutex); |
51 | | std::condition_variable m_cv; |
52 | | // Note: m_interrupt must be modified while holding the same mutex used by threads waiting on the condition variable. |
53 | | // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals. |
54 | | // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable |
55 | | bool m_interrupt GUARDED_BY(m_mutex){false}; |
56 | | std::vector<std::thread> m_workers GUARDED_BY(m_mutex); |
57 | | |
58 | | void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
59 | 459 | { |
60 | 459 | WAIT_LOCK(m_mutex, wait_lock); Line | Count | Source | 265 | 459 | #define WAIT_LOCK(cs, name) UniqueLock name(LOCK_ARGS(cs)) Line | Count | Source | 263 | 459 | #define LOCK_ARGS(cs) MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__ |
|
|
61 | 100k | for (;;) { |
62 | 100k | std::function<void()> task; |
63 | 100k | { |
64 | | // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one. |
65 | 100k | if (!m_interrupt && m_work_queue.empty()100k ) { |
66 | | // Block until the pool is interrupted or a task is available. |
67 | 52.0k | m_cv.wait(wait_lock,[&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty()51.5k ; }); |
68 | 24.1k | } |
69 | | |
70 | | // If stopped and no work left, exit worker |
71 | 100k | if (m_interrupt && m_work_queue.empty()459 ) { |
72 | 459 | return; |
73 | 459 | } |
74 | | |
75 | 99.5k | task = std::move(m_work_queue.front()); |
76 | 99.5k | m_work_queue.pop(); |
77 | 99.5k | } |
78 | | |
79 | 0 | { |
80 | | // Execute the task without the lock |
81 | 99.5k | REVERSE_LOCK(wait_lock, m_mutex); Line | Count | Source | 245 | 99.5k | #define REVERSE_LOCK(g, cs) typename std::decay<decltype(g)>::type::reverse_lock UNIQUE_NAME(revlock)(g, cs, #cs, __FILE__, __LINE__) Line | Count | Source | 11 | 99.5k | #define UNIQUE_NAME(name) PASTE2(name, __COUNTER__) Line | Count | Source | 9 | 99.5k | #define PASTE2(x, y) PASTE(x, y) Line | Count | Source | 8 | 99.5k | #define PASTE(x, y) x ## y |
|
|
|
|
82 | 99.5k | task(); |
83 | 99.5k | } |
84 | 99.5k | } |
85 | 459 | } |
86 | | |
87 | | public: |
88 | 201 | explicit ThreadPool(const std::string& name) : m_name(name) {} |
89 | | |
90 | | ~ThreadPool() |
91 | 201 | { |
92 | 201 | Stop(); // In case it hasn't been stopped. |
93 | 201 | } |
94 | | |
95 | | /** |
96 | | * @brief Start worker threads. |
97 | | * |
98 | | * Creates and launches `num_workers` threads that begin executing tasks |
99 | | * from the queue. If the pool is already started, throws. |
100 | | * |
101 | | * Must be called from a controller (non-worker) thread. |
102 | | */ |
103 | | void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
104 | 201 | { |
105 | 201 | assert(num_workers > 0); |
106 | 201 | LOCK(m_mutex); Line | Count | Source | 259 | 201 | #define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__) Line | Count | Source | 11 | 201 | #define UNIQUE_NAME(name) PASTE2(name, __COUNTER__) Line | Count | Source | 9 | 201 | #define PASTE2(x, y) PASTE(x, y) Line | Count | Source | 8 | 201 | #define PASTE(x, y) x ## y |
|
|
|
|
107 | 201 | if (!m_workers.empty()) throw std::runtime_error("Thread pool already started")0 ; |
108 | 201 | m_interrupt = false; // Reset |
109 | | |
110 | | // Create workers |
111 | 660 | for (int i = 0; i < num_workers; i++459 ) { |
112 | 459 | m_workers.emplace_back(&util::TraceThread, m_name + "_pool_" + util::ToString(i), [this] { WorkerThread(); }); |
113 | 459 | } |
114 | 201 | } |
115 | | |
116 | | /** |
117 | | * @brief Stop all worker threads and wait for them to exit. |
118 | | * |
119 | | * Sets the interrupt flag, wakes all waiting workers, and joins them. |
120 | | * Any remaining tasks in the queue will be processed before returning. |
121 | | * |
122 | | * Must be called from a controller (non-worker) thread. |
123 | | */ |
124 | | void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
125 | 402 | { |
126 | | // Notify workers and join them. |
127 | 402 | std::vector<std::thread> threads_to_join; |
128 | 402 | { |
129 | 402 | LOCK(m_mutex); Line | Count | Source | 259 | 402 | #define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__) Line | Count | Source | 11 | 402 | #define UNIQUE_NAME(name) PASTE2(name, __COUNTER__) Line | Count | Source | 9 | 402 | #define PASTE2(x, y) PASTE(x, y) Line | Count | Source | 8 | 402 | #define PASTE(x, y) x ## y |
|
|
|
|
130 | 402 | m_interrupt = true; |
131 | 402 | threads_to_join.swap(m_workers); |
132 | 402 | } |
133 | 402 | m_cv.notify_all(); |
134 | 459 | for (auto& worker : threads_to_join) worker.join(); |
135 | 402 | { |
136 | | // Sanity cleanup: release any std::function captured shared_ptrs |
137 | 402 | LOCK(m_mutex); Line | Count | Source | 259 | 402 | #define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__) Line | Count | Source | 11 | 402 | #define UNIQUE_NAME(name) PASTE2(name, __COUNTER__) Line | Count | Source | 9 | 402 | #define PASTE2(x, y) PASTE(x, y) Line | Count | Source | 8 | 402 | #define PASTE(x, y) x ## y |
|
|
|
|
138 | 402 | std::queue<std::function<void()>> empty; |
139 | 402 | m_work_queue.swap(empty); |
140 | 402 | } |
141 | | // Note: m_interrupt is left true until next Start() |
142 | 402 | } |
143 | | |
144 | | /** |
145 | | * @brief Submit a new task for asynchronous execution. |
146 | | * |
147 | | * Enqueues a callable to be executed by one of the worker threads. |
148 | | * Returns a `std::future` that can be used to retrieve the task’s result. |
149 | | */ |
150 | | template<class T> EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
151 | | auto Submit(T task) -> std::future<decltype(task())> |
152 | 99.5k | { |
153 | 99.5k | using TaskType = std::packaged_task<decltype(task())()>; |
154 | 99.5k | auto ptr_task = std::make_shared<TaskType>(std::move(task)); |
155 | 99.5k | std::future<decltype(task())> future = ptr_task->get_future(); |
156 | 99.5k | { |
157 | 99.5k | LOCK(m_mutex); Line | Count | Source | 259 | 10.1k | #define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__) Line | Count | Source | 11 | 10.1k | #define UNIQUE_NAME(name) PASTE2(name, __COUNTER__) Line | Count | Source | 9 | 10.1k | #define PASTE2(x, y) PASTE(x, y) Line | Count | Source | 8 | 10.1k | #define PASTE(x, y) x ## y |
|
|
|
| LOCK(m_mutex); Line | Count | Source | 259 | 89.3k | #define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__) Line | Count | Source | 11 | 89.3k | #define UNIQUE_NAME(name) PASTE2(name, __COUNTER__) Line | Count | Source | 9 | 89.3k | #define PASTE2(x, y) PASTE(x, y) Line | Count | Source | 8 | 89.3k | #define PASTE(x, y) x ## y |
|
|
|
| LOCK(m_mutex); Line | Count | Source | 259 | 0 | #define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__) Line | Count | Source | 11 | 0 | #define UNIQUE_NAME(name) PASTE2(name, __COUNTER__) Line | Count | Source | 9 | 0 | #define PASTE2(x, y) PASTE(x, y) Line | Count | Source | 8 | 0 | #define PASTE(x, y) x ## y |
|
|
|
|
158 | 99.5k | if (m_workers.empty() || m_interrupt) { |
159 | 0 | throw std::runtime_error("No active workers; cannot accept new tasks"); |
160 | 0 | } |
161 | 99.5k | m_work_queue.emplace([ptr_task]() mutable { |
162 | 99.3k | (*ptr_task)(); |
163 | 99.3k | ptr_task.reset(); // Explicitly release packaged_task and the stored function obj. |
164 | 99.3k | }); std::__1::future<decltype(fp())> ThreadPool::Submit<ThrowTask>(ThrowTask)::'lambda'()::operator()() Line | Count | Source | 161 | 10.1k | m_work_queue.emplace([ptr_task]() mutable { | 162 | 10.1k | (*ptr_task)(); | 163 | 10.1k | ptr_task.reset(); // Explicitly release packaged_task and the stored function obj. | 164 | 10.1k | }); |
std::__1::future<decltype(fp())> ThreadPool::Submit<CounterTask>(CounterTask)::'lambda'()::operator()() Line | Count | Source | 161 | 89.1k | m_work_queue.emplace([ptr_task]() mutable { | 162 | 89.1k | (*ptr_task)(); | 163 | 89.1k | ptr_task.reset(); // Explicitly release packaged_task and the stored function obj. | 164 | 89.1k | }); |
Unexecuted instantiation: base.cpp:std::__1::future<decltype(fp())> ThreadPool::Submit<BaseIndex::Sync()::$_4>(BaseIndex::Sync()::$_4)::'lambda'()::operator()() |
165 | 99.5k | } |
166 | 0 | m_cv.notify_one(); |
167 | 99.5k | return future; |
168 | 99.5k | } std::__1::future<decltype(fp())> ThreadPool::Submit<ThrowTask>(ThrowTask) Line | Count | Source | 152 | 10.1k | { | 153 | 10.1k | using TaskType = std::packaged_task<decltype(task())()>; | 154 | 10.1k | auto ptr_task = std::make_shared<TaskType>(std::move(task)); | 155 | 10.1k | std::future<decltype(task())> future = ptr_task->get_future(); | 156 | 10.1k | { | 157 | 10.1k | LOCK(m_mutex); Line | Count | Source | 259 | 10.1k | #define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__) Line | Count | Source | 11 | 10.1k | #define UNIQUE_NAME(name) PASTE2(name, __COUNTER__) Line | Count | Source | 9 | 10.1k | #define PASTE2(x, y) PASTE(x, y) Line | Count | Source | 8 | 10.1k | #define PASTE(x, y) x ## y |
|
|
|
| 158 | 10.1k | if (m_workers.empty() || m_interrupt) { | 159 | 0 | throw std::runtime_error("No active workers; cannot accept new tasks"); | 160 | 0 | } | 161 | 10.1k | m_work_queue.emplace([ptr_task]() mutable { | 162 | 10.1k | (*ptr_task)(); | 163 | 10.1k | ptr_task.reset(); // Explicitly release packaged_task and the stored function obj. | 164 | 10.1k | }); | 165 | 10.1k | } | 166 | 0 | m_cv.notify_one(); | 167 | 10.1k | return future; | 168 | 10.1k | } |
std::__1::future<decltype(fp())> ThreadPool::Submit<CounterTask>(CounterTask) Line | Count | Source | 152 | 89.3k | { | 153 | 89.3k | using TaskType = std::packaged_task<decltype(task())()>; | 154 | 89.3k | auto ptr_task = std::make_shared<TaskType>(std::move(task)); | 155 | 89.3k | std::future<decltype(task())> future = ptr_task->get_future(); | 156 | 89.3k | { | 157 | 89.3k | LOCK(m_mutex); Line | Count | Source | 259 | 89.3k | #define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__) Line | Count | Source | 11 | 89.3k | #define UNIQUE_NAME(name) PASTE2(name, __COUNTER__) Line | Count | Source | 9 | 89.3k | #define PASTE2(x, y) PASTE(x, y) Line | Count | Source | 8 | 89.3k | #define PASTE(x, y) x ## y |
|
|
|
| 158 | 89.3k | if (m_workers.empty() || m_interrupt) { | 159 | 0 | throw std::runtime_error("No active workers; cannot accept new tasks"); | 160 | 0 | } | 161 | 89.3k | m_work_queue.emplace([ptr_task]() mutable { | 162 | 89.3k | (*ptr_task)(); | 163 | 89.3k | ptr_task.reset(); // Explicitly release packaged_task and the stored function obj. | 164 | 89.3k | }); | 165 | 89.3k | } | 166 | 0 | m_cv.notify_one(); | 167 | 89.3k | return future; | 168 | 89.3k | } |
Unexecuted instantiation: base.cpp:std::__1::future<decltype(fp())> ThreadPool::Submit<BaseIndex::Sync()::$_4>(BaseIndex::Sync()::$_4) |
169 | | |
170 | | /** |
171 | | * @brief Execute a single queued task synchronously. |
172 | | * Removes one task from the queue and executes it on the calling thread. |
173 | | */ |
174 | | void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
175 | 0 | { |
176 | 0 | std::function<void()> task; |
177 | 0 | { |
178 | 0 | LOCK(m_mutex); |
179 | 0 | if (m_work_queue.empty()) return; |
180 | 0 |
|
181 | 0 | // Pop the task |
182 | 0 | task = std::move(m_work_queue.front()); |
183 | 0 | m_work_queue.pop(); |
184 | 0 | } |
185 | 0 | task(); |
186 | 0 | } |
187 | | |
188 | | size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
189 | 402 | { |
190 | 402 | return WITH_LOCK(m_mutex, return m_work_queue.size()); Line | Count | Source | 290 | 402 | #define WITH_LOCK(cs, code) (MaybeCheckNotHeld(cs), [&]() -> decltype(auto) { LOCK(cs); code; }()) |
|
191 | 402 | } |
192 | | |
193 | | size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
194 | 201 | { |
195 | 201 | return WITH_LOCK(m_mutex, return m_workers.size()); Line | Count | Source | 290 | 201 | #define WITH_LOCK(cs, code) (MaybeCheckNotHeld(cs), [&]() -> decltype(auto) { LOCK(cs); code; }()) |
|
196 | 201 | } |
197 | | }; |
198 | | |
199 | | #endif // BITCOIN_UTIL_THREADPOOL_H |