Bitcoin Core Fuzz Coverage Report for #26966

Coverage Report

Created: 2025-10-10 09:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/Users/brunogarcia/projects/bitcoin-core-dev/src/index/base.cpp
Line
Count
Source
1
// Copyright (c) 2017-present The Bitcoin Core developers
2
// Distributed under the MIT software license, see the accompanying
3
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5
#include <chainparams.h>
6
#include <common/args.h>
7
#include <index/base.h>
8
#include <interfaces/chain.h>
9
#include <kernel/chain.h>
10
#include <logging.h>
11
#include <node/abort.h>
12
#include <node/blockstorage.h>
13
#include <node/context.h>
14
#include <node/database_args.h>
15
#include <node/interface_ui.h>
16
#include <util/threadpool.h>
17
#include <tinyformat.h>
18
#include <undo.h>
19
#include <util/string.h>
20
#include <util/thread.h>
21
#include <util/translation.h>
22
#include <validation.h>
23
24
#include <algorithm>
25
#include <chrono>
26
#include <memory>
27
#include <optional>
28
#include <stdexcept>
29
#include <string>
30
#include <thread>
31
#include <utility>
32
33
constexpr uint8_t DB_BEST_BLOCK{'B'};
34
35
constexpr auto SYNC_LOG_INTERVAL{30s};
36
constexpr auto SYNC_LOCATOR_WRITE_INTERVAL{30s};
37
38
template <typename... Args>
39
void BaseIndex::FatalErrorf(util::ConstevalFormatString<sizeof...(Args)> fmt, const Args&... args)
40
0
{
41
0
    auto message = tfm::format(fmt, args...);
42
0
    node::AbortNode(m_chain->context()->shutdown_request, m_chain->context()->exit_status, Untranslated(message), m_chain->context()->warnings.get());
43
0
}
Unexecuted instantiation: void BaseIndex::FatalErrorf<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char>>>(util::ConstevalFormatString<sizeof...(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char>>)>, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char>> const&)
Unexecuted instantiation: void BaseIndex::FatalErrorf<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char>>, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char>>>(util::ConstevalFormatString<sizeof...(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char>>, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char>>)>, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char>> const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char>> const&)
Unexecuted instantiation: void BaseIndex::FatalErrorf<int>(util::ConstevalFormatString<sizeof...(int)>, int const&)
44
45
CBlockLocator GetLocator(interfaces::Chain& chain, const uint256& block_hash)
46
0
{
47
0
    CBlockLocator locator;
48
0
    bool found = chain.findBlock(block_hash, interfaces::FoundBlock().locator(locator));
49
0
    assert(found);
50
0
    assert(!locator.IsNull());
51
0
    return locator;
52
0
}
53
54
BaseIndex::DB::DB(const fs::path& path, size_t n_cache_size, bool f_memory, bool f_wipe, bool f_obfuscate) :
55
0
    CDBWrapper{DBParams{
56
0
        .path = path,
57
0
        .cache_bytes = n_cache_size,
58
0
        .memory_only = f_memory,
59
0
        .wipe_data = f_wipe,
60
0
        .obfuscate = f_obfuscate,
61
0
        .options = [] { DBOptions options; node::ReadDatabaseArgs(gArgs, options); return options; }()}}
62
0
{}
63
64
bool BaseIndex::DB::ReadBestBlock(CBlockLocator& locator) const
65
0
{
66
0
    bool success = Read(DB_BEST_BLOCK, locator);
67
0
    if (!success) {
68
0
        locator.SetNull();
69
0
    }
70
0
    return success;
71
0
}
72
73
void BaseIndex::DB::WriteBestBlock(CDBBatch& batch, const CBlockLocator& locator)
74
0
{
75
0
    batch.Write(DB_BEST_BLOCK, locator);
76
0
}
77
78
BaseIndex::BaseIndex(std::unique_ptr<interfaces::Chain> chain, std::string name)
79
0
    : m_chain{std::move(chain)}, m_name{std::move(name)} {}
80
81
BaseIndex::~BaseIndex()
82
0
{
83
0
    Interrupt();
84
0
    Stop();
85
0
}
86
87
bool BaseIndex::Init()
88
0
{
89
0
    AssertLockNotHeld(cs_main);
Line
Count
Source
142
0
#define AssertLockNotHeld(cs) AssertLockNotHeldInline(#cs, __FILE__, __LINE__, &cs)
90
91
    // May need reset if index is being restarted.
92
0
    m_interrupt.reset();
93
94
    // m_chainstate member gives indexing code access to node internals. It is
95
    // removed in followup https://github.com/bitcoin/bitcoin/pull/24230
96
0
    m_chainstate = WITH_LOCK(::cs_main,
Line
Count
Source
290
0
#define WITH_LOCK(cs, code) (MaybeCheckNotHeld(cs), [&]() -> decltype(auto) { LOCK(cs); code; }())
97
0
        return &m_chain->context()->chainman->GetChainstateForIndexing());
98
    // Register to validation interface before setting the 'm_synced' flag, so that
99
    // callbacks are not missed once m_synced is true.
100
0
    m_chain->context()->validation_signals->RegisterValidationInterface(this);
101
102
0
    CBlockLocator locator;
103
0
    if (!GetDB().ReadBestBlock(locator)) {
104
0
        locator.SetNull();
105
0
    }
106
107
0
    LOCK(cs_main);
Line
Count
Source
259
0
#define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__)
Line
Count
Source
11
0
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
0
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
0
#define PASTE(x, y) x ## y
108
0
    CChain& index_chain = m_chainstate->m_chain;
109
110
0
    if (locator.IsNull()) {
111
0
        SetBestBlockIndex(nullptr);
112
0
    } else {
113
        // Setting the best block to the locator's top block. If it is not part of the
114
        // best chain, we will rewind to the fork point during index sync
115
0
        const CBlockIndex* locator_index{m_chainstate->m_blockman.LookupBlockIndex(locator.vHave.at(0))};
116
0
        if (!locator_index) {
117
0
            return InitError(Untranslated(strprintf("best block of %s not found. Please rebuild the index.", GetName())));
Line
Count
Source
1172
0
#define strprintf tfm::format
118
0
        }
119
0
        SetBestBlockIndex(locator_index);
120
0
    }
121
122
    // Child init
123
0
    const CBlockIndex* start_block = m_best_block_index.load();
124
0
    if (!CustomInit(start_block ? std::make_optional(interfaces::BlockRef{start_block->GetBlockHash(), start_block->nHeight}) : std::nullopt)) {
125
0
        return false;
126
0
    }
127
128
    // Note: this will latch to true immediately if the user starts up with an empty
129
    // datadir and an index enabled. If this is the case, indexation will happen solely
130
    // via `BlockConnected` signals until, possibly, the next restart.
131
0
    m_synced = start_block == index_chain.Tip();
132
0
    m_init = true;
133
0
    return true;
134
0
}
135
136
static const CBlockIndex* NextSyncBlock(const CBlockIndex* pindex_prev, CChain& chain) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
137
0
{
138
0
    AssertLockHeld(cs_main);
Line
Count
Source
137
0
#define AssertLockHeld(cs) AssertLockHeldInternal(#cs, __FILE__, __LINE__, &cs)
139
140
0
    if (!pindex_prev) {
141
0
        return chain.Genesis();
142
0
    }
143
144
0
    const CBlockIndex* pindex = chain.Next(pindex_prev);
145
0
    if (pindex) {
146
0
        return pindex;
147
0
    }
148
149
    // Since block is not in the chain, return the next block in the chain AFTER the last common ancestor.
150
    // Caller will be responsible for rewinding back to the common ancestor.
151
0
    return chain.Next(chain.FindFork(pindex_prev));
152
0
}
153
154
std::any BaseIndex::ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data)
155
0
{
156
0
    interfaces::BlockInfo block_info = kernel::MakeBlockInfo(pindex, block_data);
157
158
0
    CBlock block;
159
0
    if (!block_data) { // disk lookup if block data wasn't provided
160
0
        if (!m_chainstate->m_blockman.ReadBlock(block, *pindex)) {
161
0
            FatalErrorf("Failed to read block %s from disk",
162
0
                        pindex->GetBlockHash().ToString());
163
0
            return {};
164
0
        }
165
0
        block_info.data = &block;
166
0
    }
167
168
0
    CBlockUndo block_undo;
169
0
    if (CustomOptions().connect_undo_data) {
170
0
        if (pindex->nHeight > 0 && !m_chainstate->m_blockman.ReadBlockUndo(block_undo, *pindex)) {
171
0
            FatalErrorf("Failed to read undo block data %s from disk",
172
0
                        pindex->GetBlockHash().ToString());
173
0
            return {};
174
0
        }
175
0
        block_info.undo_data = &block_undo;
176
0
    }
177
178
0
    const auto& any_obj = CustomProcessBlock(block_info);
179
0
    if (!any_obj.has_value()) {
180
0
        FatalErrorf("Failed to process block %s for index %s",
181
0
                    pindex->GetBlockHash().GetHex(), GetName());
182
0
        return {};
183
0
    }
184
0
    return any_obj;
185
0
}
186
187
std::vector<std::any> BaseIndex::ProcessBlocks(bool process_in_order, const CBlockIndex* start, const CBlockIndex* end)
188
0
{
189
0
    std::vector<std::any> results;
190
0
    if (process_in_order) {
191
        // When ordering is required, collect all block indexes from [end..start] in order
192
0
        std::vector<const CBlockIndex*> ordered_blocks;
193
0
        for (const CBlockIndex* block = end; block && start->pprev != block; block = block->pprev) {
194
0
            ordered_blocks.emplace_back(block);
195
0
        }
196
197
        // And process blocks in forward order: from start to end
198
0
        for (auto it = ordered_blocks.rbegin(); it != ordered_blocks.rend(); ++it) {
199
0
            auto op_res = ProcessBlock(*it);
200
0
            if (!op_res.has_value()) return {};
201
0
            results.emplace_back(std::move(op_res));
202
0
        }
203
0
        return results;
204
0
    }
205
206
    // If ordering is not required, process blocks directly from end to start
207
0
    for (const CBlockIndex* block = end; block && start->pprev != block; block = block->pprev) {
208
0
        auto op_res = ProcessBlock(block);
209
0
        if (!op_res.has_value()) return {};
210
0
        results.emplace_back(std::move(op_res));
211
0
    }
212
213
0
    return results;
214
0
}
215
216
struct Task {
217
    int id;
218
    const CBlockIndex* start_index;
219
    const CBlockIndex* end_index;
220
    std::vector<std::any> result;
221
222
    Task(int task_id, const CBlockIndex* start, const CBlockIndex* end)
223
0
            : id(task_id), start_index(start), end_index(end) {}
224
225
    // Disallow copy
226
    Task(const Task&) = delete;
227
    Task& operator=(const Task&) = delete;
228
0
    Task(Task&&) noexcept = default;
229
};
230
231
// Context shared across the initial sync workers
232
struct SyncContext {
233
    Mutex mutex_pending_tasks;
234
    std::queue<Task> pending_tasks GUARDED_BY(mutex_pending_tasks);
235
236
    Mutex mutex_processed_tasks;
237
    std::map<int, Task> processed_tasks GUARDED_BY(mutex_processed_tasks);
238
239
    std::atomic<int> next_id_to_process{0};
240
};
241
242
// Synchronizes the index with the active chain.
243
//
244
// If parallel sync is enabled, this method uses WorkersCount()+1 threads (including the current thread)
245
// to process block ranges concurrently. Each worker handles up to 'm_blocks_per_worker' blocks each time
246
// (this is called a "task"), which are processed via CustomProcessBlock calls. Results are stored in the
247
// SyncContext's 'processed_tasks' map so they can be sequentially post-processed later.
248
//
249
// After completing a task, workers opportunistically post-process completed tasks *in order* using
250
// CustomPostProcessBlocks. This continues until all blocks have been fully processed and committed.
251
//
252
// Reorgs are detected and handled before syncing begins, ensuring the index starts aligned with the active chain.
253
void BaseIndex::Sync()
254
0
{
255
0
    if (m_synced) return; // we are synced, nothing to do
256
257
    // Before anything, verify we are in the active chain
258
0
    const CBlockIndex* pindex = m_best_block_index.load();
259
0
    const int tip_height = WITH_LOCK(cs_main, return m_chainstate->m_chain.Height());
Line
Count
Source
290
0
#define WITH_LOCK(cs, code) (MaybeCheckNotHeld(cs), [&]() -> decltype(auto) { LOCK(cs); code; }())
260
    // Note: be careful, could return null if there is no more work to do or if pindex is not found (erased blocks dir).
261
0
    const CBlockIndex* pindex_next = WITH_LOCK(cs_main, return NextSyncBlock(pindex, m_chainstate->m_chain));
Line
Count
Source
290
0
#define WITH_LOCK(cs, code) (MaybeCheckNotHeld(cs), [&]() -> decltype(auto) { LOCK(cs); code; }())
262
0
    if (!pindex_next) {
263
0
        m_synced = true;
264
0
        return;
265
0
    }
266
267
    // Handle potential reorgs; if the next block's parent doesn't match our current tip,
268
    // rewind our index state to match the chain and resume from there.
269
0
    if (pindex_next->pprev != pindex && !Rewind(pindex, pindex_next->pprev)) {
270
0
        FatalErrorf("Failed to rewind index %s to a previous chain tip", GetName());
271
0
        return;
272
0
    }
273
274
    // Compute tasks ranges
275
0
    const int blocks_to_sync = tip_height - pindex_next->nHeight;
276
0
    const int num_tasks = blocks_to_sync / m_blocks_per_worker;
277
0
    const int remaining_blocks = blocks_to_sync % m_blocks_per_worker;
278
0
    const bool process_in_order = !AllowParallelSync();
279
280
0
    std::shared_ptr<SyncContext> ctx = std::make_shared<SyncContext>();
281
0
    {
282
0
        LOCK2(ctx->mutex_pending_tasks, ::cs_main);
Line
Count
Source
261
0
    UniqueLock criticalblock1(MaybeCheckNotHeld(cs1), #cs1, __FILE__, __LINE__); \
262
0
    UniqueLock criticalblock2(MaybeCheckNotHeld(cs2), #cs2, __FILE__, __LINE__)
283
        // Create fixed-size tasks
284
0
        const CBlockIndex* it_start = pindex;
285
0
        const CBlockIndex* it_end;
286
0
        for (int id = 0; id < num_tasks; ++id) {
287
0
            it_start = Assert(NextSyncBlock(it_start, m_chainstate->m_chain));
Line
Count
Source
106
0
#define Assert(val) inline_assertion_check<true>(val, __FILE__, __LINE__, __func__, #val)
288
0
            it_end = m_chainstate->m_chain[it_start->nHeight + m_blocks_per_worker - 1];
289
0
            ctx->pending_tasks.emplace(id, it_start, it_end);
290
0
            it_start = it_end;
291
0
        }
292
293
        // Add final task with the remaining blocks, if any
294
0
        if (remaining_blocks > 0) {
295
0
            it_start = Assert(NextSyncBlock(it_start, m_chainstate->m_chain));
Line
Count
Source
106
0
#define Assert(val) inline_assertion_check<true>(val, __FILE__, __LINE__, __func__, #val)
296
0
            it_end = m_chainstate->m_chain[it_start->nHeight + remaining_blocks];
297
0
            ctx->pending_tasks.emplace(/*task_id=*/num_tasks, it_start, it_end);
298
0
        }
299
0
    }
300
301
    // Returns nullopt only when there are no pending tasks
302
0
    const auto& try_get_task = [](auto& ctx) -> std::optional<Task> {
303
0
        LOCK(ctx->mutex_pending_tasks);
Line
Count
Source
259
0
#define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__)
Line
Count
Source
11
0
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
0
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
0
#define PASTE(x, y) x ## y
        LOCK(ctx->mutex_pending_tasks);
Line
Count
Source
259
0
#define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__)
Line
Count
Source
11
0
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
0
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
0
#define PASTE(x, y) x ## y
304
0
        if (ctx->pending_tasks.empty()) return std::nullopt;
305
0
        Task t = std::move(ctx->pending_tasks.front());
306
0
        ctx->pending_tasks.pop();
307
0
        return t;
308
0
    };
Unexecuted instantiation: base.cpp:_ZZN9BaseIndex4SyncEvENK3$_2clIKNSt3__110shared_ptrI11SyncContextEEEENS2_8optionalI4TaskEERT_
Unexecuted instantiation: base.cpp:_ZZN9BaseIndex4SyncEvENK3$_2clINSt3__110shared_ptrI11SyncContextEEEENS2_8optionalI4TaskEERT_
309
310
0
    enum class WorkerStatus { ABORT, PROCESSING, FINISHED };
311
312
0
    const auto& func_worker = [this, try_get_task, process_in_order](auto& ctx) -> WorkerStatus {
313
0
        if (m_interrupt) return WorkerStatus::FINISHED;
314
315
        // Try to obtain a task and process it
316
0
        if (std::optional<Task> maybe_task = try_get_task(ctx)) {
317
0
            Task task = std::move(*maybe_task);
318
0
            task.result = ProcessBlocks(process_in_order, task.start_index, task.end_index);
319
0
            if (task.result.empty()) {
320
                // Empty result indicates an internal error (logged internally).
321
0
                m_interrupt();  // notify other workers and abort.
322
0
                return WorkerStatus::ABORT;
323
0
            }
324
325
0
            LOCK(ctx->mutex_processed_tasks);
Line
Count
Source
259
0
#define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__)
Line
Count
Source
11
0
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
0
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
0
#define PASTE(x, y) x ## y
            LOCK(ctx->mutex_processed_tasks);
Line
Count
Source
259
0
#define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__)
Line
Count
Source
11
0
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
0
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
0
#define PASTE(x, y) x ## y
326
0
            ctx->processed_tasks.emplace(task.id, std::move(task));
327
0
        } else {
328
            // No pending tasks — might be finished
329
            // If we still have processed tasks to consume, proceed to finalize them.
330
0
            LOCK(ctx->mutex_processed_tasks);
Line
Count
Source
259
0
#define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__)
Line
Count
Source
11
0
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
0
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
0
#define PASTE(x, y) x ## y
            LOCK(ctx->mutex_processed_tasks);
Line
Count
Source
259
0
#define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__)
Line
Count
Source
11
0
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
0
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
0
#define PASTE(x, y) x ## y
331
0
            if (ctx->processed_tasks.empty()) return WorkerStatus::FINISHED;
332
0
        }
333
334
        // Post-process completed tasks opportunistically
335
0
        std::vector<Task> to_process;
336
0
        {
337
0
            TRY_LOCK(ctx->mutex_processed_tasks, locked);
Line
Count
Source
264
0
#define TRY_LOCK(cs, name) UniqueLock name(LOCK_ARGS(cs), true)
Line
Count
Source
263
0
#define LOCK_ARGS(cs) MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__
            TRY_LOCK(ctx->mutex_processed_tasks, locked);
Line
Count
Source
264
0
#define TRY_LOCK(cs, name) UniqueLock name(LOCK_ARGS(cs), true)
Line
Count
Source
263
0
#define LOCK_ARGS(cs) MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__
338
0
            if (!locked) return WorkerStatus::PROCESSING;
339
340
            // Collect ready-to-process tasks in order
341
0
            int next_id = ctx->next_id_to_process.load();
342
0
            while (true) {
343
0
                auto it = ctx->processed_tasks.find(next_id);
344
0
                if (it == ctx->processed_tasks.end()) break;
345
0
                to_process.push_back(std::move(it->second));
346
0
                ctx->processed_tasks.erase(it);
347
0
                ++next_id;
348
0
            }
349
350
            // Nothing to process right now, keep processing
351
0
            if (to_process.empty()) return WorkerStatus::PROCESSING;
352
0
        }
353
354
        // Post-Process tasks
355
0
        for (const Task& task : to_process) {
356
0
            for (auto it = task.result.rbegin(); it != task.result.rend(); ++it) {
357
0
                if (!CustomPostProcessBlocks(*it)) { // error logged internally
358
0
                    m_interrupt();
359
0
                    FatalErrorf("Index %s: Failed to post process blocks", GetName());
360
0
                    return WorkerStatus::ABORT;
361
0
                }
362
0
            }
363
            // Update progress
364
0
            SetBestBlockIndex(task.end_index);
365
0
            ctx->next_id_to_process.store(task.id + 1);
366
0
        }
367
368
        // Check if there's anything left to do
369
0
        LOCK2(ctx->mutex_pending_tasks, ctx->mutex_processed_tasks);
Line
Count
Source
261
0
    UniqueLock criticalblock1(MaybeCheckNotHeld(cs1), #cs1, __FILE__, __LINE__); \
262
0
    UniqueLock criticalblock2(MaybeCheckNotHeld(cs2), #cs2, __FILE__, __LINE__)
        LOCK2(ctx->mutex_pending_tasks, ctx->mutex_processed_tasks);
Line
Count
Source
261
0
    UniqueLock criticalblock1(MaybeCheckNotHeld(cs1), #cs1, __FILE__, __LINE__); \
262
0
    UniqueLock criticalblock2(MaybeCheckNotHeld(cs2), #cs2, __FILE__, __LINE__)
370
0
        if (ctx->pending_tasks.empty() && ctx->processed_tasks.empty()) {
371
0
            return WorkerStatus::FINISHED;
372
0
        }
373
374
0
        return WorkerStatus::PROCESSING;
375
0
    };
Unexecuted instantiation: base.cpp:_ZZN9BaseIndex4SyncEvENK3$_3clIKNSt3__110shared_ptrI11SyncContextEEEEZNS_4SyncEvE12WorkerStatusRT_
Unexecuted instantiation: base.cpp:_ZZN9BaseIndex4SyncEvENK3$_3clINSt3__110shared_ptrI11SyncContextEEEEZNS_4SyncEvE12WorkerStatusRT_
376
377
    // Process task in parallel if enabled
378
0
    std::vector<std::future<void>> workers_job;
379
0
    if (m_thread_pool) {
380
0
        for (size_t i = 0; i < m_thread_pool->WorkersCount(); ++i) {
381
0
            workers_job.emplace_back(m_thread_pool->Submit([this, ctx, func_worker]() {
382
0
                WorkerStatus status{WorkerStatus::PROCESSING};
383
0
                while (!m_synced && status == WorkerStatus::PROCESSING) {
384
0
                    status = func_worker(ctx);
385
0
                    if (m_interrupt) return;
386
0
                }
387
0
            }));
388
0
        }
389
0
    }
390
391
    // Main index thread
392
    // Active-wait: we process blocks in this thread too.
393
0
    auto last_log_time{NodeClock::now()};
394
0
    auto last_locator_write_time{last_log_time};
395
396
0
    while (!m_synced) {
397
0
        const WorkerStatus status{func_worker(ctx)};
398
0
        if (m_interrupt || status == WorkerStatus::ABORT) {
399
0
            m_interrupt();
400
            // Ensure all workers are interrupted before returning.
401
            // This avoids accessing any local variable post-destruction.
402
0
            for (const auto& job : workers_job) job.wait();
403
0
            return;
404
0
        }
405
406
0
        if (status == WorkerStatus::FINISHED) {
407
            // No more tasks to process; wait for all workers to finish their current tasks
408
0
            for (const auto& job : workers_job) job.wait();
409
            // No need to handle errors in Commit. If it fails, the error will already be
410
            // logged. The best way to recover is to continue, as index cannot be corrupted by
411
            // a missed commit to disk for an advanced index state.
412
0
            Commit();
413
414
            // Before finishing, check if any new blocks were connected while we were syncing.
415
            // If so, process them synchronously before exiting.
416
            //
417
            // Note: it is important for cs_main to be locked while setting m_synced = true,
418
            // otherwise a new block could be attached while m_synced is still false, and
419
            // it would not be indexed.
420
0
            LOCK2(ctx->mutex_pending_tasks, ::cs_main);
Line
Count
Source
261
0
    UniqueLock criticalblock1(MaybeCheckNotHeld(cs1), #cs1, __FILE__, __LINE__); \
262
0
    UniqueLock criticalblock2(MaybeCheckNotHeld(cs2), #cs2, __FILE__, __LINE__)
421
0
            const CBlockIndex* curr_tip{m_best_block_index.load()};
422
0
            pindex_next = NextSyncBlock(curr_tip, m_chainstate->m_chain);
423
            // If the next block is null, it means we are done!
424
0
            if (!pindex_next) {
425
0
                m_synced = true;
426
0
                break;
427
0
            }
428
429
            // New blocks arrived during sync.
430
            // Handle potential reorgs; if the next block's parent doesn't match our tip,
431
            // rewind index state to the correct chain, then resume.
432
0
            if (pindex_next->pprev != curr_tip && !Rewind(curr_tip, pindex_next->pprev)) {
433
0
                FatalErrorf("Failed to rewind index %s to a previous chain tip", GetName());
434
0
                return;
435
0
            }
436
437
            // Queue the final range of blocks to process.
438
0
            ctx->pending_tasks.emplace(ctx->next_id_to_process.load(),
439
0
                                       /*start_index=*/pindex_next,
440
0
                                       /*end_index=*/m_chainstate->m_chain.Tip());
441
0
        }
442
443
0
        auto current_time{NodeClock::now()};
444
        // Log progress every so often
445
0
        if (current_time - last_log_time >= SYNC_LOG_INTERVAL) {
446
0
            LogInfo("Syncing %s with block chain from height %d\n",
Line
Count
Source
356
0
#define LogInfo(...) LogPrintLevel_(BCLog::LogFlags::ALL, BCLog::Level::Info, /*should_ratelimit=*/true, __VA_ARGS__)
Line
Count
Source
350
0
#define LogPrintLevel_(category, level, should_ratelimit, ...) LogPrintFormatInternal(std::source_location::current(), category, level, should_ratelimit, __VA_ARGS__)
447
0
                    GetName(), m_best_block_index.load()->nHeight);
448
0
            last_log_time = current_time;
449
0
        }
450
451
        // Commit changes every so often
452
0
        if (current_time - last_locator_write_time >= SYNC_LOCATOR_WRITE_INTERVAL) {
453
0
            Commit(); // No need to handle errors in Commit. See rationale above.
454
0
            last_locator_write_time = current_time;
455
0
        }
456
0
    }
457
458
0
    LogInfo("%s is enabled at height %d\n", GetName(), (m_best_block_index) ? m_best_block_index.load()->nHeight : 0);
Line
Count
Source
356
0
#define LogInfo(...) LogPrintLevel_(BCLog::LogFlags::ALL, BCLog::Level::Info, /*should_ratelimit=*/true, __VA_ARGS__)
Line
Count
Source
350
0
#define LogPrintLevel_(category, level, should_ratelimit, ...) LogPrintFormatInternal(std::source_location::current(), category, level, should_ratelimit, __VA_ARGS__)
459
0
}
460
461
bool BaseIndex::Commit()
462
0
{
463
    // Don't commit anything if we haven't indexed any block yet
464
    // (this could happen if init is interrupted).
465
0
    bool ok = m_best_block_index != nullptr;
466
0
    if (ok) {
467
0
        CDBBatch batch(GetDB());
468
0
        ok = CustomCommit(batch);
469
0
        if (ok) {
470
0
            GetDB().WriteBestBlock(batch, GetLocator(*m_chain, m_best_block_index.load()->GetBlockHash()));
471
0
            ok = GetDB().WriteBatch(batch);
472
0
        }
473
0
    }
474
0
    if (!ok) {
475
0
        LogError("Failed to commit latest %s state", GetName());
Line
Count
Source
358
0
#define LogError(...) LogPrintLevel_(BCLog::LogFlags::ALL, BCLog::Level::Error, /*should_ratelimit=*/true, __VA_ARGS__)
Line
Count
Source
350
0
#define LogPrintLevel_(category, level, should_ratelimit, ...) LogPrintFormatInternal(std::source_location::current(), category, level, should_ratelimit, __VA_ARGS__)
476
0
        return false;
477
0
    }
478
0
    return true;
479
0
}
480
481
bool BaseIndex::Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip)
482
0
{
483
0
    assert(current_tip == m_best_block_index);
484
0
    assert(current_tip->GetAncestor(new_tip->nHeight) == new_tip);
485
486
0
    CBlock block;
487
0
    CBlockUndo block_undo;
488
489
0
    for (const CBlockIndex* iter_tip = current_tip; iter_tip != new_tip; iter_tip = iter_tip->pprev) {
490
0
        interfaces::BlockInfo block_info = kernel::MakeBlockInfo(iter_tip);
491
0
        if (CustomOptions().disconnect_data) {
492
0
            if (!m_chainstate->m_blockman.ReadBlock(block, *iter_tip)) {
493
0
                LogError("Failed to read block %s from disk",
Line
Count
Source
358
0
#define LogError(...) LogPrintLevel_(BCLog::LogFlags::ALL, BCLog::Level::Error, /*should_ratelimit=*/true, __VA_ARGS__)
Line
Count
Source
350
0
#define LogPrintLevel_(category, level, should_ratelimit, ...) LogPrintFormatInternal(std::source_location::current(), category, level, should_ratelimit, __VA_ARGS__)
494
0
                         iter_tip->GetBlockHash().ToString());
495
0
                return false;
496
0
            }
497
0
            block_info.data = &block;
498
0
        }
499
0
        if (CustomOptions().disconnect_undo_data && iter_tip->nHeight > 0) {
500
0
            if (!m_chainstate->m_blockman.ReadBlockUndo(block_undo, *iter_tip)) {
501
0
                return false;
502
0
            }
503
0
            block_info.undo_data = &block_undo;
504
0
        }
505
0
        if (!CustomRemove(block_info)) {
506
0
            return false;
507
0
        }
508
0
    }
509
510
    // Don't commit here - the committed index state must never be ahead of the
511
    // flushed chainstate, otherwise unclean restarts would lead to index corruption.
512
    // Pruning has a minimum of 288 blocks-to-keep and getting the index
513
    // out of sync may be possible but a user's fault.
514
    // In case we reorg beyond the pruned depth, ReadBlock would
515
    // throw and lead to a graceful shutdown
516
0
    SetBestBlockIndex(new_tip);
517
0
    return true;
518
0
}
519
520
// Validation-interface callback: index a newly connected block once the
// initial sync has caught up. Fatal errors (unexpected first block, failed
// rewind) abort the node via FatalErrorf; transient ordering glitches from
// the notification queue backlog are logged and skipped.
void BaseIndex::BlockConnected(ChainstateRole role, const std::shared_ptr<const CBlock>& block, const CBlockIndex* pindex)
{
    // Ignore events from the assumed-valid chain; we will process its blocks
    // (sequentially) after it is fully verified by the background chainstate. This
    // is to avoid any out-of-order indexing.
    //
    // TODO at some point we could parameterize whether a particular index can be
    // built out of order, but for now just do the conservative simple thing.
    if (role == ChainstateRole::ASSUMEDVALID) {
        return;
    }

    // Ignore BlockConnected signals until we have fully indexed the chain.
    if (!m_synced) {
        return;
    }

    const CBlockIndex* best_block_index = m_best_block_index.load();
    if (!best_block_index) {
        // With no best block recorded yet, the only acceptable first block is
        // the genesis block (height 0).
        if (pindex->nHeight != 0) {
            FatalErrorf("First block connected is not the genesis block (height=%d)",
                       pindex->nHeight);
            return;
        }
    } else {
        // Ensure block connects to an ancestor of the current best block. This should be the case
        // most of the time, but may not be immediately after the sync thread catches up and sets
        // m_synced. Consider the case where there is a reorg and the blocks on the stale branch are
        // in the ValidationInterface queue backlog even after the sync thread has caught up to the
        // new chain tip. In this unlikely event, log a warning and let the queue clear.
        if (best_block_index->GetAncestor(pindex->nHeight - 1) != pindex->pprev) {
            LogWarning("Block %s does not connect to an ancestor of "
                      "known best chain (tip=%s); not updating index",
                      pindex->GetBlockHash().ToString(),
                      best_block_index->GetBlockHash().ToString());
            return;
        }
        // A reorg: unwind index state back to the fork point before indexing
        // the new block. Rewind failure is fatal for the node.
        if (best_block_index != pindex->pprev && !Rewind(best_block_index, pindex->pprev)) {
            FatalErrorf("Failed to rewind %s to a previous chain tip",
                       GetName());
            return;
        }
    }

    // Dispatch block to child class; errors are logged internally and abort the node.
    if (CustomPostProcessBlocks(ProcessBlock(pindex, block.get()))) {
        // Setting the best block index is intentionally the last step of this
        // function, so BlockUntilSyncedToCurrentChain callers waiting for the
        // best block index to be updated can rely on the block being fully
        // processed, and the index object being safe to delete.
        SetBestBlockIndex(pindex);
    }
}
573
574
// Validation-interface callback: after the chainstate has been flushed to
// disk, persist the index's best-block locator (via Commit) so the index and
// the chainstate cannot get ahead of each other on restart. Stale locators
// from the notification backlog are logged and ignored.
void BaseIndex::ChainStateFlushed(ChainstateRole role, const CBlockLocator& locator)
{
    // Ignore events from the assumed-valid chain; we will process its blocks
    // (sequentially) after it is fully verified by the background chainstate.
    if (role == ChainstateRole::ASSUMEDVALID) {
        return;
    }

    // Before the initial sync finishes, the sync thread owns locator writes.
    if (!m_synced) {
        return;
    }

    const uint256& locator_tip_hash = locator.vHave.front();
    const CBlockIndex* locator_tip_index;
    {
        // LookupBlockIndex requires cs_main; keep the critical section minimal.
        LOCK(cs_main);
        locator_tip_index = m_chainstate->m_blockman.LookupBlockIndex(locator_tip_hash);
    }

    if (!locator_tip_index) {
        FatalErrorf("First block (hash=%s) in locator was not found",
                   locator_tip_hash.ToString());
        return;
    }

    // This checks that ChainStateFlushed callbacks are received after BlockConnected. The check may fail
    // immediately after the sync thread catches up and sets m_synced. Consider the case where
    // there is a reorg and the blocks on the stale branch are in the ValidationInterface queue
    // backlog even after the sync thread has caught up to the new chain tip. In this unlikely
    // event, log a warning and let the queue clear.
    const CBlockIndex* best_block_index = m_best_block_index.load();
    if (best_block_index->GetAncestor(locator_tip_index->nHeight) != locator_tip_index) {
        LogWarning("Locator contains block (hash=%s) not on known best "
                  "chain (tip=%s); not writing index locator",
                  locator_tip_hash.ToString(),
                  best_block_index->GetBlockHash().ToString());
        return;
    }

    // No need to handle errors in Commit. If it fails, the error will already be logged. The
    // best way to recover is to continue, as index cannot be corrupted by a missed commit to disk
    // for an advanced index state.
    Commit();
}
618
619
// Block the calling thread until the index has processed notifications up to
// the current chain tip. Returns false immediately if the initial sync has
// not finished; otherwise returns true once caught up. Must be called
// without cs_main held, since it takes the lock itself.
bool BaseIndex::BlockUntilSyncedToCurrentChain() const
{
    AssertLockNotHeld(cs_main);

    if (!m_synced) {
        return false;
    }

    {
        // Skip the queue-draining stuff if we know we're caught up with
        // m_chain.Tip().
        LOCK(cs_main);
        const CBlockIndex* chain_tip = m_chainstate->m_chain.Tip();
        const CBlockIndex* best_block_index = m_best_block_index.load();
        // If the chain tip is an ancestor of (or equal to) our best block, every
        // connected block has already been indexed; no need to drain the queue.
        if (best_block_index->GetAncestor(chain_tip->nHeight) == chain_tip) {
            return true;
        }
    }

    // Slow path: wait for the validation interface queue to drain, which
    // guarantees all pending BlockConnected notifications have been delivered.
    LogInfo("%s is catching up on block notifications", GetName());
    m_chain->context()->validation_signals->SyncWithValidationInterfaceQueue();
    return true;
}
642
643
// Request the background sync thread to stop at its next interruption point.
// Does not wait for the thread to exit; call Stop() for that.
void BaseIndex::Interrupt()
{
    m_interrupt();
}
647
648
bool BaseIndex::StartBackgroundSync()
649
0
{
650
0
    if (!m_init) throw std::logic_error("Error: Cannot start a non-initialized index");
651
652
0
    m_thread_sync = std::thread(&util::TraceThread, GetName(), [this] { Sync(); });
653
0
    return true;
654
0
}
655
656
void BaseIndex::Stop()
657
0
{
658
0
    if (m_chain->context()->validation_signals) {
659
0
        m_chain->context()->validation_signals->UnregisterValidationInterface(this);
660
0
    }
661
662
0
    if (m_thread_sync.joinable()) {
663
0
        m_thread_sync.join();
664
0
    }
665
0
}
666
667
// Produce a snapshot of the index state: its name, whether the initial sync
// has completed, and the height/hash of the best indexed block. Falls back
// to the genesis block when nothing has been indexed yet.
IndexSummary BaseIndex::GetSummary() const
{
    IndexSummary summary{};
    summary.name = GetName();
    summary.synced = m_synced;

    const CBlockIndex* best{m_best_block_index.load()};
    if (best == nullptr) {
        // Nothing indexed yet; report genesis as the baseline position.
        summary.best_block_height = 0;
        summary.best_block_hash = m_chain->getBlockHash(0);
    } else {
        summary.best_block_height = best->nHeight;
        summary.best_block_hash = best->GetBlockHash();
    }
    return summary;
}
681
682
// Record the best block the index has fully processed. For prunable-chain
// indexes, also raise the prune lock so blocks at or above this height are
// kept on disk. The atomic member assignment is intentionally last (see
// comment below); do not reorder.
void BaseIndex::SetBestBlockIndex(const CBlockIndex* block)
{
    // An index that cannot tolerate pruning must never run on a pruned node.
    assert(!m_chainstate->m_blockman.IsPruneMode() || AllowPrune());

    if (AllowPrune() && block) {
        // Prevent blocks from this height upward from being pruned, since the
        // index may still need to read them (e.g. during a reorg rewind).
        node::PruneLockInfo prune_lock;
        prune_lock.height_first = block->nHeight;
        WITH_LOCK(::cs_main, m_chainstate->m_blockman.UpdatePruneLock(GetName(), prune_lock));
    }

    // Intentionally set m_best_block_index as the last step in this function,
    // after updating prune locks above, and after making any other references
    // to *this, so the BlockUntilSyncedToCurrentChain function (which checks
    // m_best_block_index as an optimization) can be used to wait for the last
    // BlockConnected notification and safely assume that prune locks are
    // updated and that the index object is safe to delete.
    m_best_block_index = block;
}