From fa67bf3e85a68538d7d13d0dac201190ffbe6551 Mon Sep 17 00:00:00 2001 From: Vladimir Petrzhikovskii Date: Thu, 7 Nov 2024 14:43:46 +0100 Subject: [PATCH] feat: add transactions compression --- Cargo.lock | 31 +++++++++++++++- kafka-producer/Cargo.toml | 2 + kafka-producer/src/subscriber.rs | 64 ++++++++++++++++++-------------- 3 files changed, 68 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5972a24..6c43a45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1340,6 +1340,7 @@ dependencies = [ "tikv-jemalloc-ctl", "tikv-jemallocator", "tokio", + "ton-block-compressor", "tracing", "tycho-block-util", "tycho-core", @@ -2782,6 +2783,15 @@ dependencies = [ "winnow", ] +[[package]] +name = "ton-block-compressor" +version = "0.1.0" +source = "git+https://github.com/broxus/ton-block-compressor.git#625ba4b40aeb80d04d45f72cce4835ae4fc1b560" +dependencies = [ + "anyhow", + "zstd", +] + [[package]] name = "tower" version = "0.4.13" @@ -3159,7 +3169,7 @@ dependencies = [ "tracing-appender", "tracing-stackdriver", "tracing-subscriber", - "zstd-safe", + "zstd-safe 7.2.1", "zstd-sys", ] @@ -3601,6 +3611,25 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "zstd" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a27595e173641171fc74a1232b7b1c7a7cb6e18222c11e9dfb9888fa424c53c" +dependencies = [ + "zstd-safe 6.0.6", +] + +[[package]] +name = "zstd-safe" +version = "6.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee98ffd0b48ee95e6c5168188e44a54550b1564d9d530ee21d5f0eaed1069581" +dependencies = [ + "libc", + "zstd-sys", +] + [[package]] name = "zstd-safe" version = "7.2.1" diff --git a/kafka-producer/Cargo.toml b/kafka-producer/Cargo.toml index 9442cc8..a038969 100644 --- a/kafka-producer/Cargo.toml +++ b/kafka-producer/Cargo.toml @@ -24,6 +24,8 @@ serde = { workspace = true } rdkafka = { version = "0.36.2", features = ["tokio"] } futures-util = { workspace = true } +ton-block-compressor = { git = "https://github.com/broxus/ton-block-compressor.git" } + [features] ssl = ["rdkafka/ssl-vendored"] diff --git a/kafka-producer/src/subscriber.rs b/kafka-producer/src/subscriber.rs index aee6dfc..75d1d9b 100644 --- a/kafka-producer/src/subscriber.rs +++ b/kafka-producer/src/subscriber.rs @@ -54,39 +54,47 @@ impl KafkaProducer { Ok(Self { producer, config }) } async fn handle_block(&self, block_stuff: &BlockStuff) -> Result<()> { - let block_id = block_stuff.id(); + let block_id = *block_stuff.id(); let extra = block_stuff.load_extra()?; let account_blocks = extra.account_blocks.load()?; - let mut transactions = Vec::new(); - for account_block in account_blocks.iter() { - let (addr, _, block) = account_block?; - for transaction in block.transactions.iter() { - let (_, _, transaction) = transaction?; - let hash = CellBuilder::build_from(transaction.clone())? - .repr_hash() - .0 - .to_vec(); - let transaction = transaction.load()?; - let timestamp = transaction.now; - let data = BocRepr::encode(&transaction)?; - - let partition = if block_id.is_masterchain() { - 0 - } else { - // first 3 bits of the account id - (1 + (addr[0] >> 5)) & 0b111 - }; - - transactions.push(TransactionToKafka { - hash, - data, - partition, - timestamp, - }); + let transactions: anyhow::Result> = tokio::task::spawn_blocking(move || { + let mut compressor = ton_block_compressor::ZstdWrapper::with_level(3); + let mut transactions = Vec::new(); + + for account_block in account_blocks.iter() { + let (addr, _, block) = account_block?; + for transaction in block.transactions.iter() { + let (_, _, transaction) = transaction?; + let hash = CellBuilder::build_from(transaction.clone())? + .repr_hash() + .0 + .to_vec(); + let transaction = transaction.load()?; + let timestamp = transaction.now; + let data = BocRepr::encode(&transaction)?; + let data = compressor.compress(&data)?.to_vec(); + + let partition = if block_id.is_masterchain() { + 0 + } else { + // first 3 bits of the account id + (1 + (addr[0] >> 5)) & 0b111 + }; + + transactions.push(TransactionToKafka { + hash, + data, + partition, + timestamp, + }); + } } - } + Ok(transactions) + }) + .await?; + let transactions = transactions?; let mut futures: FuturesOrdered<_> = transactions .into_iter()