Skip to content

Commit

Permalink
feat: add transactions compression
Browse files Browse the repository at this point in the history
  • Loading branch information
0xdeafbeef committed Nov 8, 2024
1 parent 3159d9c commit 1854447
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 30 deletions.
31 changes: 30 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions kafka-producer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ serde = { workspace = true }
rdkafka = { version = "0.36.2", features = ["tokio"] }
futures-util = { workspace = true }

ton-block-compressor = { git = "https://github.com/broxus/ton-block-compressor.git" }

[features]
ssl = ["rdkafka/ssl-vendored"]

Expand Down
62 changes: 33 additions & 29 deletions kafka-producer/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::time::Duration;

use anyhow::Result;
use everscale_types::boc::BocRepr;
use everscale_types::cell::CellBuilder;
use futures_util::future::BoxFuture;
use futures_util::stream::FuturesOrdered;
use futures_util::{FutureExt, StreamExt};
Expand Down Expand Up @@ -54,39 +53,44 @@ impl KafkaProducer {
Ok(Self { producer, config })
}
async fn handle_block(&self, block_stuff: &BlockStuff) -> Result<()> {
let block_id = block_stuff.id();
let block_id = *block_stuff.id();

let extra = block_stuff.load_extra()?;
let account_blocks = extra.account_blocks.load()?;

let mut transactions = Vec::new();
for account_block in account_blocks.iter() {
let (addr, _, block) = account_block?;
for transaction in block.transactions.iter() {
let (_, _, transaction) = transaction?;
let hash = CellBuilder::build_from(transaction.clone())?
.repr_hash()
.0
.to_vec();
let transaction = transaction.load()?;
let timestamp = transaction.now;
let data = BocRepr::encode(&transaction)?;

let partition = if block_id.is_masterchain() {
0
} else {
// first 3 bits of the account id
(1 + (addr[0] >> 5)) & 0b111
};

transactions.push(TransactionToKafka {
hash,
data,
partition,
timestamp,
});
let transactions: anyhow::Result<Vec<_>> = tokio::task::spawn_blocking(move || {
let mut compressor = ton_block_compressor::ZstdWrapper::with_level(3);
let mut transactions = Vec::new();

for account_block in account_blocks.iter() {
let (addr, _, block) = account_block?;
for transaction in block.transactions.iter() {
let (_, _, transaction) = transaction?;
let hash = transaction.inner().repr_hash().0.to_vec();
let transaction = transaction.load()?;
let timestamp = transaction.now;
let data = BocRepr::encode(&transaction)?;
let data = compressor.compress(&data)?.to_vec();

let partition = if block_id.is_masterchain() {
0
} else {
// first 3 bits of the account id
1 + (addr[0] >> 5)
};

transactions.push(TransactionToKafka {
hash,
data,
partition,
timestamp,
});
}
}
}
Ok(transactions)
})
.await?;
let transactions = transactions?;

let mut futures: FuturesOrdered<_> = transactions
.into_iter()
Expand Down

0 comments on commit 1854447

Please sign in to comment.