Skip to content

Commit

Permalink
WIP on parallel download.
Browse files Browse the repository at this point in the history
  • Loading branch information
evoskuil committed Feb 25, 2024
1 parent 5de786b commit 1cebb6d
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 39 deletions.
16 changes: 11 additions & 5 deletions include/bitcoin/node/chasers/chaser_check.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#define LIBBITCOIN_NODE_CHASERS_CHASER_CHECK_HPP

#include <functional>
#include <memory>
#include <unordered_map>
#include <bitcoin/network.hpp>
#include <bitcoin/node/define.hpp>
#include <bitcoin/node/chasers/chaser.hpp>
Expand All @@ -34,24 +36,28 @@ class BCN_API chaser_check
: public chaser
{
public:
typedef std::unordered_map<system::hash_digest,
system::chain::context> hashmap;
typedef std::shared_ptr<hashmap> hashmap_ptr;
typedef std::function<void(const code&, const hashmap_ptr&)> handler;

DELETE_COPY_MOVE(chaser_check);

chaser_check(full_node& node) NOEXCEPT;
virtual ~chaser_check() NOEXCEPT;

virtual code start() NOEXCEPT;

virtual void get_hashes(network::result_handler&& handler) NOEXCEPT;
virtual void put_hashes(network::result_handler&& handler) NOEXCEPT;
virtual void get_hashes(handler&& handler) NOEXCEPT;
virtual void put_hashes(handler&& handler) NOEXCEPT;

protected:
/// Handlers.
virtual void handle_header(height_t branch_point) NOEXCEPT;
virtual void handle_event(const code& ec, chase event_,
link value) NOEXCEPT;

virtual void do_get_hashes(const network::result_handler& handler) NOEXCEPT;
virtual void do_put_hashes(const network::result_handler& handler) NOEXCEPT;
virtual void do_get_hashes(const handler& handler) NOEXCEPT;
virtual void do_put_hashes(const handler& handler) NOEXCEPT;

private:
void do_handle_event(const code& ec, chase event_, link value) NOEXCEPT;
Expand Down
20 changes: 12 additions & 8 deletions include/bitcoin/node/protocols/protocol_block_in_31800.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
#ifndef LIBBITCOIN_NODE_PROTOCOLS_PROTOCOL_BLOCK_IN_31800_HPP
#define LIBBITCOIN_NODE_PROTOCOLS_PROTOCOL_BLOCK_IN_31800_HPP

#include <functional>
#include <memory>
#include <bitcoin/network.hpp>
#include <bitcoin/node/chasers/chasers.hpp>
#include <bitcoin/node/define.hpp>
#include <bitcoin/node/protocols/protocol.hpp>

Expand Down Expand Up @@ -55,9 +58,9 @@ class BCN_API protocol_block_in_31800
void start() NOEXCEPT override;
void stopping(const code& ec) NOEXCEPT override;

/// Check and store any registered block in any order of arrival.
virtual void check(const system::chain::block::cptr& block_ptr,
network::result_handler&& handler) NOEXCEPT;
/// Manage download queue.
virtual void get_hashes(chaser_check::handler&& handler) NOEXCEPT;
virtual void put_hashes(chaser_check::handler&& handler) NOEXCEPT;

protected:

Expand All @@ -71,12 +74,13 @@ class BCN_API protocol_block_in_31800
/// Handle result of performance reporting.
virtual void handle_performance(const code& ec) NOEXCEPT;

/// Handle check result.
virtual void handle_check(const code& ec) NOEXCEPT;
/// Manage download queue.
virtual void handle_put_hashes(const code& ec) NOEXCEPT;
virtual void handle_get_hashes(const code& ec,
const chaser_check::hashmap_ptr& hashes) NOEXCEPT;

private:
network::messages::get_data create_get_data(
const system::hashes& hashes) const NOEXCEPT;
network::messages::get_data create_get_data() const NOEXCEPT;

void do_handle_performance(const code& ec) NOEXCEPT;

Expand All @@ -86,7 +90,7 @@ class BCN_API protocol_block_in_31800

// These are protected by strand.
uint64_t bytes_{ zero };
system::chain::checkpoint top_{};
chaser_check::hashmap_ptr hashes_{};
network::steady_clock::time_point start_{};
network::deadline::ptr performance_timer_;
};
Expand Down
11 changes: 4 additions & 7 deletions src/chasers/chaser_check.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,24 +77,21 @@ void chaser_check::do_handle_event(const code&, chase event_,
{
BC_ASSERT_MSG(stranded(), "chaser_check");

if (event_ == chase::stop)
return;

if (event_ == chase::header)
{
BC_ASSERT(std::holds_alternative<height_t>(value));
handle_header(std::get<height_t>(value));
}
}

void chaser_check::get_hashes(result_handler&& handler) NOEXCEPT
void chaser_check::get_hashes(handler&& handler) NOEXCEPT
{
boost::asio::post(strand(),
std::bind(&chaser_check::do_get_hashes,
this, std::move(handler)));
}

void chaser_check::put_hashes(result_handler&& handler) NOEXCEPT
void chaser_check::put_hashes(handler&& handler) NOEXCEPT
{
boost::asio::post(strand(),
std::bind(&chaser_check::do_put_hashes,
Expand All @@ -104,12 +101,12 @@ void chaser_check::put_hashes(result_handler&& handler) NOEXCEPT
// protected
// ----------------------------------------------------------------------------

void chaser_check::do_get_hashes(const result_handler&) NOEXCEPT
void chaser_check::do_get_hashes(const handler&) NOEXCEPT
{
BC_ASSERT_MSG(stranded(), "chaser_check");
}

void chaser_check::do_put_hashes(const result_handler&) NOEXCEPT
void chaser_check::do_put_hashes(const handler&) NOEXCEPT
{
BC_ASSERT_MSG(stranded(), "chaser_check");
}
Expand Down
63 changes: 44 additions & 19 deletions src/protocols/protocol_block_in_31800.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <bitcoin/system.hpp>
#include <bitcoin/database.hpp>
#include <bitcoin/network.hpp>
#include <bitcoin/node/chasers/chasers.hpp>
#include <bitcoin/node/define.hpp>
#include <bitcoin/node/error.hpp>

Expand Down Expand Up @@ -111,30 +112,51 @@ void protocol_block_in_31800::start() NOEXCEPT
if (started())
return;

const auto& query = archive();
const auto top = query.get_top_candidate();
top_ = { query.get_header_key(query.to_candidate(top)), top };

if (report_performance_)
{
start_ = steady_clock::now();
performance_timer_->start(BIND1(handle_performance_timer, _1));
}

SUBSCRIBE_CHANNEL2(block, handle_receive_block, _1, _2);
get_hashes(BIND2(handle_get_hashes, _1, hashes_));
protocol::start();
}

void protocol_block_in_31800::stopping(const code& ec) NOEXCEPT
{
BC_ASSERT_MSG(stranded(), "protocol_block_in_31800");

performance_timer_->stop();
put_hashes(BIND1(handle_put_hashes, _1));

protocol::stopping(ec);
}

// Inbound (blocks).
// ----------------------------------------------------------------------------

void protocol_block_in_31800::handle_get_hashes(const code& ec,
const chaser_check::hashmap_ptr&) NOEXCEPT
{
if (ec)
{
stop(ec);
return;
}

SUBSCRIBE_CHANNEL2(block, handle_receive_block, _1, _2);

// TODO: send if not empty, send when new headers (subscrive to header).
SEND1(create_get_data(), handle_send, _1);
stop(ec);
}

void protocol_block_in_31800::handle_put_hashes(const code& ec) NOEXCEPT
{
if (ec)
stop(ec);
}

bool protocol_block_in_31800::handle_receive_block(const code& ec,
const block::cptr& message) NOEXCEPT
{
Expand All @@ -143,36 +165,39 @@ bool protocol_block_in_31800::handle_receive_block(const code& ec,
if (stopped(ec))
return false;

const auto hash = message->block_ptr->hash();
if (is_zero(hashes_->erase(hash)))
{
// Zero erased implies not found (not requested of peer).
LOGR("Unrequested block [" << encode_hash(hash) << "].");
return true;
}

archive().set_link(*message->block_ptr);

// Asynchronous organization serves all channels.
// A job backlog will occur when organize is slower than download.
// This should not be a material issue given lack of validation here.
check(message->block_ptr, BIND1(handle_check, _1));
get_hashes(BIND2(handle_get_hashes, _1, hashes_));

bytes_ += message->cached_size;

// TODO: return true only if there are more blocks outstanding.
return true;
}

void protocol_block_in_31800::handle_check(const code& ec) NOEXCEPT
{
if (ec == network::error::service_stopped)
return;

// TODO: log result (LOGR/LOGP).

stop(ec);
}

// private
// ----------------------------------------------------------------------------

get_data protocol_block_in_31800::create_get_data(
const hashes&) const NOEXCEPT
get_data protocol_block_in_31800::create_get_data() const NOEXCEPT
{
get_data getter{};
getter.items.reserve(hashes_->size());

// TODO: generate get_data request.
// clang emplace_back bug (no matching constructor), using push_back.
// bip144: get_data uses witness constant but inventory does not.
for (const auto& item: *hashes_)
getter.items.push_back({ block_type_, item.first });

return getter;
}
Expand Down

0 comments on commit 1cebb6d

Please sign in to comment.