Skip to content

Commit

Permalink
merge develop
Browse files Browse the repository at this point in the history
  • Loading branch information
MicaiahReid committed Jul 12, 2023
1 parent 83c937a commit 9efa7c9
Show file tree
Hide file tree
Showing 34 changed files with 79 additions and 5,429 deletions.
61 changes: 2 additions & 59 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion components/chainhook-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ hex = "0.4.3"
rand = "0.8.5"
# tikv-client = { git = "https://github.com/tikv/client-rust.git", rev = "8f54e6114227718e256027df2577bbacdf425f86" }
# raft-proto = { git = "https://github.com/tikv/raft-rs", rev="f73766712a538c2f6eb135b455297ad6c03fc58d", version = "0.7.0"}
chainhook-sdk = { version = "0.5.0", default-features = false, features = ["ordinals", "zeromq"], path = "../chainhook-sdk" }
chainhook-sdk = { version = "0.6.0", default-features = false, features = ["zeromq"], path = "../chainhook-sdk" }
chainhook-types = { version = "1.0.6", path = "../chainhook-types-rs" }
clarinet-files = "1.0.1"
hiro-system-kit = "0.1.0"
Expand All @@ -41,6 +41,10 @@ threadpool = "1.8.1"
rocket_okapi = { version = "0.8.0-rc.3", git = "https://github.com/MicaiahReid/okapi.git", branch = "feat-chainhook-fixes"}
rocket = { version = "=0.5.0-rc.3", features = ["json"] }

[dependencies.rocksdb]
version = "0.20.1"
default-features = false
features = ["lz4", "snappy"]

[dev-dependencies]
criterion = "0.3"
Expand Down
142 changes: 1 addition & 141 deletions components/chainhook-cli/src/archive/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::config::Config;
use chainhook_sdk::utils::Context;
use chainhook_types::{BitcoinNetwork, StacksNetwork};
use chainhook_types::StacksNetwork;
use clarinet_files::FileLocation;
use flate2::read::GzDecoder;
use futures_util::StreamExt;
Expand All @@ -16,14 +16,6 @@ pub fn default_tsv_sha_file_path(network: &StacksNetwork) -> String {
format!("{:?}-stacks-events.sha256", network).to_lowercase()
}

pub fn default_sqlite_file_path(_network: &BitcoinNetwork) -> String {
format!("hord.sqlite").to_lowercase()
}

pub fn default_sqlite_sha_file_path(_network: &BitcoinNetwork) -> String {
format!("hord.sqlite.sha256").to_lowercase()
}

pub async fn download_tsv_file(config: &Config) -> Result<(), String> {
let mut destination_path = config.expected_cache_path();
std::fs::create_dir_all(&destination_path).unwrap_or_else(|e| {
Expand Down Expand Up @@ -84,68 +76,6 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> {
Ok(())
}

pub async fn download_sqlite_file(config: &Config) -> Result<(), String> {
let mut destination_path = config.expected_cache_path();
std::fs::create_dir_all(&destination_path).unwrap_or_else(|e| {
println!("{}", e.to_string());
});

let remote_sha_url = config.expected_remote_ordinals_sqlite_sha256();
let res = reqwest::get(&remote_sha_url)
.await
.or(Err(format!("Failed to GET from '{}'", &remote_sha_url)))?
.bytes()
.await
.or(Err(format!("Failed to GET from '{}'", &remote_sha_url)))?;

let mut local_sha_file_path = destination_path.clone();
local_sha_file_path.push(default_sqlite_sha_file_path(
&config.network.bitcoin_network,
));

let local_sha_file = FileLocation::from_path(local_sha_file_path);
let _ = local_sha_file.write_content(&res.to_vec());

let file_url = config.expected_remote_ordinals_sqlite_url();
let res = reqwest::get(&file_url)
.await
.or(Err(format!("Failed to GET from '{}'", &file_url)))?;

// Download chunks
let (tx, rx) = flume::bounded(0);
destination_path.push(default_sqlite_file_path(&config.network.bitcoin_network));

let decoder_thread = std::thread::spawn(move || {
let input = ChannelRead::new(rx);
let mut decoder = GzDecoder::new(input);
let mut content = Vec::new();
let _ = decoder.read_to_end(&mut content);
let mut file = fs::File::create(&destination_path).unwrap();
if let Err(e) = file.write_all(&content[..]) {
println!("unable to write file: {}", e.to_string());
std::process::exit(1);
}
});

if res.status() == reqwest::StatusCode::OK {
let mut stream = res.bytes_stream();
while let Some(item) = stream.next().await {
let chunk = item.or(Err(format!("Error while downloading file")))?;
tx.send_async(chunk.to_vec())
.await
.map_err(|e| format!("unable to download stacks event: {}", e.to_string()))?;
}
drop(tx);
}

tokio::task::spawn_blocking(|| decoder_thread.join())
.await
.unwrap()
.unwrap();

Ok(())
}

// Wrap a channel into something that impls `io::Read`
struct ChannelRead {
rx: flume::Receiver<Vec<u8>>,
Expand Down Expand Up @@ -239,73 +169,3 @@ pub async fn download_stacks_dataset_if_required(config: &mut Config, ctx: &Cont
false
}
}

pub async fn download_ordinals_dataset_if_required(config: &Config, ctx: &Context) -> bool {
if config.is_initial_ingestion_required() {
// Download default tsv.
if config.rely_on_remote_ordinals_sqlite()
&& config.should_download_remote_ordinals_sqlite()
{
let url = config.expected_remote_ordinals_sqlite_url();
let mut sqlite_file_path = config.expected_cache_path();
sqlite_file_path.push(default_sqlite_file_path(&config.network.bitcoin_network));
let mut tsv_sha_file_path = config.expected_cache_path();
tsv_sha_file_path.push(default_sqlite_sha_file_path(
&config.network.bitcoin_network,
));

// Download archive if not already present in cache
// Load the local
let local_sha_file = FileLocation::from_path(tsv_sha_file_path).read_content();
let sha_url = config.expected_remote_ordinals_sqlite_sha256();

let remote_sha_file = match reqwest::get(&sha_url).await {
Ok(response) => response.bytes().await,
Err(e) => Err(e),
};
let should_download = match (local_sha_file, remote_sha_file) {
(Ok(local), Ok(remote_response)) => {
let cache_not_expired = remote_response.starts_with(&local[0..32]) == false;
if cache_not_expired {
info!(
ctx.expect_logger(),
"More recent Stacks archive file detected"
);
}
cache_not_expired == false
}
(_, _) => {
info!(
ctx.expect_logger(),
"Unable to retrieve Stacks archive file locally"
);
true
}
};
if should_download {
info!(ctx.expect_logger(), "Downloading {}", url);
match download_sqlite_file(&config).await {
Ok(_) => {}
Err(e) => {
error!(ctx.expect_logger(), "{}", e);
std::process::exit(1);
}
}
} else {
info!(
ctx.expect_logger(),
"Basing ordinals evaluation on database {}",
sqlite_file_path.display()
);
}
// config.add_local_ordinals_sqlite_source(&sqlite_file_path);
}
true
} else {
info!(
ctx.expect_logger(),
"Streaming blocks from bitcoind {}", config.network.bitcoind_rpc_url
);
false
}
}
Loading

0 comments on commit 9efa7c9

Please sign in to comment.