From 1cebb6d4279c797627dd240df2513df42a7a478e Mon Sep 17 00:00:00 2001 From: evoskuil Date: Sun, 25 Feb 2024 10:23:41 -0500 Subject: [PATCH] WIP on parallel download. --- include/bitcoin/node/chasers/chaser_check.hpp | 16 +++-- .../protocols/protocol_block_in_31800.hpp | 20 +++--- src/chasers/chaser_check.cpp | 11 ++-- src/protocols/protocol_block_in_31800.cpp | 63 +++++++++++++------ 4 files changed, 71 insertions(+), 39 deletions(-) diff --git a/include/bitcoin/node/chasers/chaser_check.hpp b/include/bitcoin/node/chasers/chaser_check.hpp index 668d76d6..4cc651ab 100644 --- a/include/bitcoin/node/chasers/chaser_check.hpp +++ b/include/bitcoin/node/chasers/chaser_check.hpp @@ -20,6 +20,8 @@ #define LIBBITCOIN_NODE_CHASERS_CHASER_CHECK_HPP #include +#include +#include #include #include #include @@ -34,6 +36,11 @@ class BCN_API chaser_check : public chaser { public: + typedef std::unordered_map hashmap; + typedef std::shared_ptr hashmap_ptr; + typedef std::function handler; + DELETE_COPY_MOVE(chaser_check); chaser_check(full_node& node) NOEXCEPT; @@ -41,17 +48,16 @@ class BCN_API chaser_check virtual code start() NOEXCEPT; - virtual void get_hashes(network::result_handler&& handler) NOEXCEPT; - virtual void put_hashes(network::result_handler&& handler) NOEXCEPT; + virtual void get_hashes(handler&& handler) NOEXCEPT; + virtual void put_hashes(handler&& handler) NOEXCEPT; protected: - /// Handlers. virtual void handle_header(height_t branch_point) NOEXCEPT; virtual void handle_event(const code& ec, chase event_, link value) NOEXCEPT; - virtual void do_get_hashes(const network::result_handler& handler) NOEXCEPT; - virtual void do_put_hashes(const network::result_handler& handler) NOEXCEPT; + virtual void do_get_hashes(const handler& handler) NOEXCEPT; + virtual void do_put_hashes(const handler& handler) NOEXCEPT; private: void do_handle_event(const code& ec, chase event_, link value) NOEXCEPT; diff --git a/include/bitcoin/node/protocols/protocol_block_in_31800.hpp b/include/bitcoin/node/protocols/protocol_block_in_31800.hpp index 960b1873..c6328fcd 100644 --- a/include/bitcoin/node/protocols/protocol_block_in_31800.hpp +++ b/include/bitcoin/node/protocols/protocol_block_in_31800.hpp @@ -19,7 +19,10 @@ #ifndef LIBBITCOIN_NODE_PROTOCOLS_PROTOCOL_BLOCK_IN_31800_HPP #define LIBBITCOIN_NODE_PROTOCOLS_PROTOCOL_BLOCK_IN_31800_HPP +#include +#include #include +#include #include #include @@ -55,9 +58,9 @@ class BCN_API protocol_block_in_31800 void start() NOEXCEPT override; void stopping(const code& ec) NOEXCEPT override; - /// Check and store any registered block in any order of arrival. - virtual void check(const system::chain::block::cptr& block_ptr, - network::result_handler&& handler) NOEXCEPT; + /// Manage download queue. + virtual void get_hashes(chaser_check::handler&& handler) NOEXCEPT; + virtual void put_hashes(chaser_check::handler&& handler) NOEXCEPT; protected: @@ -71,12 +74,13 @@ class BCN_API protocol_block_in_31800 /// Handle result of performance reporting. virtual void handle_performance(const code& ec) NOEXCEPT; - /// Handle check result. - virtual void handle_check(const code& ec) NOEXCEPT; + /// Manage download queue. + virtual void handle_put_hashes(const code& ec) NOEXCEPT; + virtual void handle_get_hashes(const code& ec, + const chaser_check::hashmap_ptr& hashes) NOEXCEPT; private: - network::messages::get_data create_get_data( - const system::hashes& hashes) const NOEXCEPT; + network::messages::get_data create_get_data() const NOEXCEPT; void do_handle_performance(const code& ec) NOEXCEPT; @@ -86,7 +90,7 @@ class BCN_API protocol_block_in_31800 // These are protected by strand. uint64_t bytes_{ zero }; - system::chain::checkpoint top_{}; + chaser_check::hashmap_ptr hashes_{}; network::steady_clock::time_point start_{}; network::deadline::ptr performance_timer_; }; diff --git a/src/chasers/chaser_check.cpp b/src/chasers/chaser_check.cpp index 594e6622..e44407f2 100644 --- a/src/chasers/chaser_check.cpp +++ b/src/chasers/chaser_check.cpp @@ -77,9 +77,6 @@ void chaser_check::do_handle_event(const code&, chase event_, { BC_ASSERT_MSG(stranded(), "chaser_check"); - if (event_ == chase::stop) - return; - if (event_ == chase::header) { BC_ASSERT(std::holds_alternative(value)); @@ -87,14 +84,14 @@ void chaser_check::do_handle_event(const code&, chase event_, } } -void chaser_check::get_hashes(result_handler&& handler) NOEXCEPT +void chaser_check::get_hashes(handler&& handler) NOEXCEPT { boost::asio::post(strand(), std::bind(&chaser_check::do_get_hashes, this, std::move(handler))); } -void chaser_check::put_hashes(result_handler&& handler) NOEXCEPT +void chaser_check::put_hashes(handler&& handler) NOEXCEPT { boost::asio::post(strand(), std::bind(&chaser_check::do_put_hashes, @@ -104,12 +101,12 @@ void chaser_check::put_hashes(result_handler&& handler) NOEXCEPT // protected // ---------------------------------------------------------------------------- -void chaser_check::do_get_hashes(const result_handler&) NOEXCEPT +void chaser_check::do_get_hashes(const handler&) NOEXCEPT { BC_ASSERT_MSG(stranded(), "chaser_check"); } -void chaser_check::do_put_hashes(const result_handler&) NOEXCEPT +void chaser_check::do_put_hashes(const handler&) NOEXCEPT { BC_ASSERT_MSG(stranded(), "chaser_check"); } diff --git a/src/protocols/protocol_block_in_31800.cpp b/src/protocols/protocol_block_in_31800.cpp index 68ce1cb8..09bdeb19 100644 --- a/src/protocols/protocol_block_in_31800.cpp +++ b/src/protocols/protocol_block_in_31800.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -111,30 +112,51 @@ void protocol_block_in_31800::start() NOEXCEPT if (started()) return; - const auto& query = archive(); - const auto top = query.get_top_candidate(); - top_ = { query.get_header_key(query.to_candidate(top)), top }; - if (report_performance_) { start_ = steady_clock::now(); performance_timer_->start(BIND1(handle_performance_timer, _1)); } - SUBSCRIBE_CHANNEL2(block, handle_receive_block, _1, _2); + get_hashes(BIND2(handle_get_hashes, _1, hashes_)); protocol::start(); } void protocol_block_in_31800::stopping(const code& ec) NOEXCEPT { BC_ASSERT_MSG(stranded(), "protocol_block_in_31800"); + performance_timer_->stop(); + put_hashes(BIND1(handle_put_hashes, _1)); + protocol::stopping(ec); } // Inbound (blocks). // ---------------------------------------------------------------------------- +void protocol_block_in_31800::handle_get_hashes(const code& ec, + const chaser_check::hashmap_ptr&) NOEXCEPT +{ + if (ec) + { + stop(ec); + return; + } + + SUBSCRIBE_CHANNEL2(block, handle_receive_block, _1, _2); + + // TODO: send if not empty, send when new headers (subscrive to header). + SEND1(create_get_data(), handle_send, _1); + stop(ec); +} + +void protocol_block_in_31800::handle_put_hashes(const code& ec) NOEXCEPT +{ + if (ec) + stop(ec); +} + bool protocol_block_in_31800::handle_receive_block(const code& ec, const block::cptr& message) NOEXCEPT { @@ -143,10 +165,20 @@ bool protocol_block_in_31800::handle_receive_block(const code& ec, if (stopped(ec)) return false; + const auto hash = message->block_ptr->hash(); + if (is_zero(hashes_->erase(hash))) + { + // Zero erased implies not found (not requested of peer). + LOGR("Unrequested block [" << encode_hash(hash) << "]."); + return true; + } + + archive().set_link(*message->block_ptr); + // Asynchronous organization serves all channels. // A job backlog will occur when organize is slower than download. // This should not be a material issue given lack of validation here. - check(message->block_ptr, BIND1(handle_check, _1)); + get_hashes(BIND2(handle_get_hashes, _1, hashes_)); bytes_ += message->cached_size; @@ -154,25 +186,18 @@ bool protocol_block_in_31800::handle_receive_block(const code& ec, return true; } -void protocol_block_in_31800::handle_check(const code& ec) NOEXCEPT -{ - if (ec == network::error::service_stopped) - return; - - // TODO: log result (LOGR/LOGP). - - stop(ec); -} - // private // ---------------------------------------------------------------------------- -get_data protocol_block_in_31800::create_get_data( - const hashes&) const NOEXCEPT +get_data protocol_block_in_31800::create_get_data() const NOEXCEPT { get_data getter{}; + getter.items.reserve(hashes_->size()); - // TODO: generate get_data request. + // clang emplace_back bug (no matching constructor), using push_back. + // bip144: get_data uses witness constant but inventory does not. + for (const auto& item: *hashes_) + getter.items.push_back({ block_type_, item.first }); return getter; }