Bitcoin Core Fuzz Coverage Report for #26966

Coverage Report

Created: 2025-10-10 09:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/Users/brunogarcia/projects/bitcoin-core-dev/src/util/threadpool.h
Line
Count
Source
1
// Copyright (c) 2024-present The Bitcoin Core developers
2
// Distributed under the MIT software license, see the accompanying
3
// file COPYING or https://www.opensource.org/licenses/mit-license.php.
4
5
#ifndef BITCOIN_UTIL_THREADPOOL_H
6
#define BITCOIN_UTIL_THREADPOOL_H
7
8
#include <sync.h>
9
#include <util/string.h>
10
#include <util/thread.h>
11
#include <util/threadinterrupt.h>
12
13
#include <algorithm>
14
#include <atomic>
15
#include <condition_variable>
16
#include <cstddef>
17
#include <functional>
18
#include <future>
19
#include <memory>
20
#include <stdexcept>
21
#include <utility>
22
#include <queue>
23
#include <thread>
24
#include <vector>
25
26
/**
27
 * @brief Fixed-size thread pool for running arbitrary tasks concurrently.
28
 *
29
 * The thread pool maintains a set of worker threads that consume and execute
30
 * tasks submitted through Submit(). Once started, tasks can be queued and
31
 * processed asynchronously until Stop() is called.
32
 *
33
 * ### Thread-safety and lifecycle
34
 * - `Start()` and `Stop()` must be called from a controller (non-worker) thread.
35
 *   Calling `Stop()` from a worker thread will deadlock, as it waits for all
36
 *   workers to join, including the current one.
37
 *
38
 * - `Submit()` can be called from any thread, including workers. It safely
39
 *   enqueues new work for execution as long as the pool has active workers.
40
 *
41
 * - `Stop()` prevents further task submission and wakes all worker threads.
42
 *   Workers finish processing all remaining queued tasks before exiting,
43
 *   guaranteeing that no caller waits forever on a pending future.
44
 */
45
class ThreadPool {
46
47
private:
48
    std::string m_name;
49
    Mutex m_mutex;
50
    std::queue<std::function<void()>> m_work_queue GUARDED_BY(m_mutex);
51
    std::condition_variable m_cv;
52
    // Note: m_interrupt must be modified while holding the same mutex used by threads waiting on the condition variable.
53
    // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
54
    // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
55
    bool m_interrupt GUARDED_BY(m_mutex){false};
56
    std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
57
58
    void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
59
459
    {
60
459
        WAIT_LOCK(m_mutex, wait_lock);
Line
Count
Source
265
459
#define WAIT_LOCK(cs, name) UniqueLock name(LOCK_ARGS(cs))
Line
Count
Source
263
459
#define LOCK_ARGS(cs) MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__
61
100k
        for (;;) {
62
100k
            std::function<void()> task;
63
100k
            {
64
                // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
65
100k
                if (!m_interrupt && 
m_work_queue.empty()100k
) {
66
                    // Block until the pool is interrupted or a task is available.
67
52.0k
                    m_cv.wait(wait_lock,[&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || 
!m_work_queue.empty()51.5k
; });
68
24.1k
                }
69
70
                // If stopped and no work left, exit worker
71
100k
                if (m_interrupt && 
m_work_queue.empty()459
) {
72
459
                    return;
73
459
                }
74
75
99.5k
                task = std::move(m_work_queue.front());
76
99.5k
                m_work_queue.pop();
77
99.5k
            }
78
79
0
            {
80
                // Execute the task without the lock
81
99.5k
                REVERSE_LOCK(wait_lock, m_mutex);
Line
Count
Source
245
99.5k
#define REVERSE_LOCK(g, cs) typename std::decay<decltype(g)>::type::reverse_lock UNIQUE_NAME(revlock)(g, cs, #cs, __FILE__, __LINE__)
Line
Count
Source
11
99.5k
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
99.5k
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
99.5k
#define PASTE(x, y) x ## y
82
99.5k
                task();
83
99.5k
            }
84
99.5k
        }
85
459
    }
86
87
public:
88
201
    explicit ThreadPool(const std::string& name) : m_name(name) {}
89
90
    ~ThreadPool()
91
201
    {
92
201
        Stop(); // In case it hasn't been stopped.
93
201
    }
94
95
    /**
96
     * @brief Start worker threads.
97
     *
98
     * Creates and launches `num_workers` threads that begin executing tasks
99
     * from the queue. If the pool is already started, throws.
100
     *
101
     * Must be called from a controller (non-worker) thread.
102
     */
103
    void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
104
201
    {
105
201
        assert(num_workers > 0);
106
201
        LOCK(m_mutex);
Line
Count
Source
259
201
#define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__)
Line
Count
Source
11
201
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
201
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
201
#define PASTE(x, y) x ## y
107
201
        if (!m_workers.empty()) 
throw std::runtime_error("Thread pool already started")0
;
108
201
        m_interrupt = false; // Reset
109
110
        // Create workers
111
660
        for (int i = 0; i < num_workers; 
i++459
) {
112
459
            m_workers.emplace_back(&util::TraceThread, m_name + "_pool_" + util::ToString(i), [this] { WorkerThread(); });
113
459
        }
114
201
    }
115
116
    /**
117
     * @brief Stop all worker threads and wait for them to exit.
118
     *
119
     * Sets the interrupt flag, wakes all waiting workers, and joins them.
120
     * Any remaining tasks in the queue will be processed before returning.
121
     *
122
     * Must be called from a controller (non-worker) thread.
123
     */
124
    void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
125
402
    {
126
        // Notify workers and join them.
127
402
        std::vector<std::thread> threads_to_join;
128
402
        {
129
402
            LOCK(m_mutex);
Line
Count
Source
259
402
#define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__)
Line
Count
Source
11
402
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
402
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
402
#define PASTE(x, y) x ## y
130
402
            m_interrupt = true;
131
402
            threads_to_join.swap(m_workers);
132
402
        }
133
402
        m_cv.notify_all();
134
459
        for (auto& worker : threads_to_join) worker.join();
135
402
        {
136
            // Sanity cleanup: release any std::function captured shared_ptrs
137
402
            LOCK(m_mutex);
Line
Count
Source
259
402
#define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__)
Line
Count
Source
11
402
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
402
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
402
#define PASTE(x, y) x ## y
138
402
            std::queue<std::function<void()>> empty;
139
402
            m_work_queue.swap(empty);
140
402
        }
141
        // Note: m_interrupt is left true until next Start()
142
402
    }
143
144
    /**
145
     * @brief Submit a new task for asynchronous execution.
146
     *
147
     * Enqueues a callable to be executed by one of the worker threads.
148
     * Returns a `std::future` that can be used to retrieve the task’s result.
149
     */
150
    template<class T> EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
151
    auto Submit(T task) -> std::future<decltype(task())>
152
99.5k
    {
153
99.5k
        using TaskType = std::packaged_task<decltype(task())()>;
154
99.5k
        auto ptr_task = std::make_shared<TaskType>(std::move(task));
155
99.5k
        std::future<decltype(task())> future = ptr_task->get_future();
156
99.5k
        {
157
99.5k
            LOCK(m_mutex);
Line
Count
Source
259
10.1k
#define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__)
Line
Count
Source
11
10.1k
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
10.1k
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
10.1k
#define PASTE(x, y) x ## y
            LOCK(m_mutex);
Line
Count
Source
259
89.3k
#define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__)
Line
Count
Source
11
89.3k
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
89.3k
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
89.3k
#define PASTE(x, y) x ## y
            LOCK(m_mutex);
Line
Count
Source
259
0
#define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__)
Line
Count
Source
11
0
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
0
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
0
#define PASTE(x, y) x ## y
158
99.5k
            if (m_workers.empty() || m_interrupt) {
159
0
                throw std::runtime_error("No active workers; cannot accept new tasks");
160
0
            }
161
99.5k
            m_work_queue.emplace([ptr_task]() mutable {
162
99.3k
                (*ptr_task)();
163
99.3k
                ptr_task.reset(); // Explicitly release packaged_task and the stored function obj.
164
99.3k
            });
std::__1::future<decltype(fp())> ThreadPool::Submit<ThrowTask>(ThrowTask)::'lambda'()::operator()()
Line
Count
Source
161
10.1k
            m_work_queue.emplace([ptr_task]() mutable {
162
10.1k
                (*ptr_task)();
163
10.1k
                ptr_task.reset(); // Explicitly release packaged_task and the stored function obj.
164
10.1k
            });
std::__1::future<decltype(fp())> ThreadPool::Submit<CounterTask>(CounterTask)::'lambda'()::operator()()
Line
Count
Source
161
89.1k
            m_work_queue.emplace([ptr_task]() mutable {
162
89.1k
                (*ptr_task)();
163
89.1k
                ptr_task.reset(); // Explicitly release packaged_task and the stored function obj.
164
89.1k
            });
Unexecuted instantiation: base.cpp:std::__1::future<decltype(fp())> ThreadPool::Submit<BaseIndex::Sync()::$_4>(BaseIndex::Sync()::$_4)::'lambda'()::operator()()
165
99.5k
        }
166
0
        m_cv.notify_one();
167
99.5k
        return future;
168
99.5k
    }
std::__1::future<decltype(fp())> ThreadPool::Submit<ThrowTask>(ThrowTask)
Line
Count
Source
152
10.1k
    {
153
10.1k
        using TaskType = std::packaged_task<decltype(task())()>;
154
10.1k
        auto ptr_task = std::make_shared<TaskType>(std::move(task));
155
10.1k
        std::future<decltype(task())> future = ptr_task->get_future();
156
10.1k
        {
157
10.1k
            LOCK(m_mutex);
Line
Count
Source
259
10.1k
#define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__)
Line
Count
Source
11
10.1k
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
10.1k
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
10.1k
#define PASTE(x, y) x ## y
158
10.1k
            if (m_workers.empty() || m_interrupt) {
159
0
                throw std::runtime_error("No active workers; cannot accept new tasks");
160
0
            }
161
10.1k
            m_work_queue.emplace([ptr_task]() mutable {
162
10.1k
                (*ptr_task)();
163
10.1k
                ptr_task.reset(); // Explicitly release packaged_task and the stored function obj.
164
10.1k
            });
165
10.1k
        }
166
0
        m_cv.notify_one();
167
10.1k
        return future;
168
10.1k
    }
std::__1::future<decltype(fp())> ThreadPool::Submit<CounterTask>(CounterTask)
Line
Count
Source
152
89.3k
    {
153
89.3k
        using TaskType = std::packaged_task<decltype(task())()>;
154
89.3k
        auto ptr_task = std::make_shared<TaskType>(std::move(task));
155
89.3k
        std::future<decltype(task())> future = ptr_task->get_future();
156
89.3k
        {
157
89.3k
            LOCK(m_mutex);
Line
Count
Source
259
89.3k
#define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__)
Line
Count
Source
11
89.3k
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
89.3k
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
89.3k
#define PASTE(x, y) x ## y
158
89.3k
            if (m_workers.empty() || m_interrupt) {
159
0
                throw std::runtime_error("No active workers; cannot accept new tasks");
160
0
            }
161
89.3k
            m_work_queue.emplace([ptr_task]() mutable {
162
89.3k
                (*ptr_task)();
163
89.3k
                ptr_task.reset(); // Explicitly release packaged_task and the stored function obj.
164
89.3k
            });
165
89.3k
        }
166
0
        m_cv.notify_one();
167
89.3k
        return future;
168
89.3k
    }
Unexecuted instantiation: base.cpp:std::__1::future<decltype(fp())> ThreadPool::Submit<BaseIndex::Sync()::$_4>(BaseIndex::Sync()::$_4)
169
170
    /**
171
     * @brief Execute a single queued task synchronously.
172
     * Removes one task from the queue and executes it on the calling thread.
173
     */
174
    void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
175
0
    {
176
0
        std::function<void()> task;
177
0
        {
178
0
            LOCK(m_mutex);
179
0
            if (m_work_queue.empty()) return;
180
0
181
0
            // Pop the task
182
0
            task = std::move(m_work_queue.front());
183
0
            m_work_queue.pop();
184
0
        }
185
0
        task();
186
0
    }
187
188
    size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
189
402
    {
190
402
        return WITH_LOCK(m_mutex, return m_work_queue.size());
Line
Count
Source
290
402
#define WITH_LOCK(cs, code) (MaybeCheckNotHeld(cs), [&]() -> decltype(auto) { LOCK(cs); code; }())
191
402
    }
192
193
    size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
194
201
    {
195
201
        return WITH_LOCK(m_mutex, return m_workers.size());
Line
Count
Source
290
201
#define WITH_LOCK(cs, code) (MaybeCheckNotHeld(cs), [&]() -> decltype(auto) { LOCK(cs); code; }())
196
201
    }
197
};
198
199
#endif // BITCOIN_UTIL_THREADPOOL_H