Skip to content

Commit

Permalink
feat(publisher): Transform FuelCore types to Stream types before publ…
Browse files Browse the repository at this point in the history
…ishing
  • Loading branch information
Jurshsmith committed Nov 23, 2024
1 parent 1b8a41b commit ed646cd
Show file tree
Hide file tree
Showing 24 changed files with 825 additions and 1,003 deletions.
8 changes: 7 additions & 1 deletion benches/nats-publisher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod utils;

use clap::Parser;
use fuel_core_importer::ports::ImporterDatabase;
use fuel_streams_core::prelude::*;
use utils::{blocks::BlockHelper, nats::NatsHelper, tx::TxHelper};

#[derive(Parser)]
Expand Down Expand Up @@ -38,13 +39,16 @@ async fn main() -> anyhow::Result<()> {
// OLD BLOCKS
// ------------------------------------------------------------------------
tokio::task::spawn({
let database = database.clone();
let block_helper = block_helper.clone();
let _tx_helper = tx_helper.clone();
let last_height = database.on_chain().latest_block_height()?.unwrap();
async move {
for height in 0..*last_height {
let height = height.into();
let block = block_helper.find_by_height(height);
let block = Block::new(&block, Consensus::default());

block_helper.publish(&block).await?;
// for (index, tx) in block.transactions().iter().enumerate() {
// tx_helper.publish(&block, tx, index).await?;
Expand All @@ -61,7 +65,9 @@ async fn main() -> anyhow::Result<()> {
while let Ok(result) = subscription.recv().await {
let result = &**result;
let block = &result.sealed_block.entity;
block_helper.publish(block).await?;
let block = Block::new(block, Consensus::default());

block_helper.publish(&block).await?;
// for (index, tx) in block.transactions().iter().enumerate() {
// tx_helper.publish(block, tx, index).await?;
// }
Expand Down
16 changes: 4 additions & 12 deletions benches/nats-publisher/src/utils/blocks.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use async_nats::jetstream::context::Publish;
use fuel_core::combined_database::CombinedDatabase;
use fuel_core_storage::transactional::AtomicView;
use fuel_core_types::{blockchain::block::Block, fuel_types::BlockHeight};
use fuel_streams_core::{blocks::BlocksSubject, prelude::IntoSubject};
use fuel_streams_core::prelude::*;
use tokio::try_join;
use tracing::info;

Expand All @@ -22,7 +21,7 @@ impl BlockHelper {
}
}

pub fn find_by_height(&self, height: BlockHeight) -> Block {
pub fn find_by_height(&self, height: FuelCoreBlockHeight) -> FuelCoreBlock {
self.database
.on_chain()
.latest_view()
Expand Down Expand Up @@ -58,7 +57,7 @@ impl BlockHelper {
Ok(())
}
async fn publish_encoded(&self, block: &Block) -> anyhow::Result<()> {
let height = self.get_height(block);
let height = block.height;
let subject: BlocksSubject = block.into();
let payload = self.nats.data_parser().encode(block).await?;
let nats_payload = Publish::build()
Expand All @@ -79,7 +78,7 @@ impl BlockHelper {
}

async fn publish_to_kv(&self, block: &Block) -> anyhow::Result<()> {
let height = self.get_height(block);
let height = block.height;
let subject: BlocksSubject = block.into();

let payload = self.nats.data_parser().encode(block).await?;
Expand All @@ -92,10 +91,3 @@ impl BlockHelper {
Ok(())
}
}

/// Getters
impl BlockHelper {
fn get_height(&self, block: &Block) -> u32 {
*block.header().consensus().height
}
}
41 changes: 6 additions & 35 deletions benches/nats-publisher/src/utils/tx.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,7 @@
use async_nats::jetstream::context::Publish;
use fuel_core::combined_database::CombinedDatabase;
use fuel_core_storage::transactional::AtomicView;
use fuel_core_types::{
blockchain::block::Block,
fuel_tx::{Transaction, UniqueIdentifier},
fuel_types::ChainId,
services::txpool::TransactionStatus as TxPoolTransactionStatus,
};
use fuel_streams_core::{
blocks::types::BlockHeight,
prelude::IntoSubject,
transactions::TransactionsSubject,
};
use fuel_core_types::fuel_types::ChainId;
use fuel_streams_core::prelude::*;
use tokio::try_join;
use tracing::info;

Expand Down Expand Up @@ -78,7 +68,7 @@ impl TxHelper {
tx: &Transaction,
index: usize,
) -> anyhow::Result<()> {
let tx_id = self.get_id(tx);
let tx_id = &tx.id;
let subject = self.get_subject(tx, block, index);
let payload = self.nats.data_parser().encode(block).await?;
let nats_payload = Publish::build()
Expand All @@ -104,7 +94,7 @@ impl TxHelper {
tx: &Transaction,
index: usize,
) -> anyhow::Result<()> {
let tx_id = self.get_id(tx);
let tx_id = &tx.id;
let subject = self.get_subject(tx, block, index);
let payload = self.nats.data_parser().encode(block).await?;
self.nats
Expand Down Expand Up @@ -132,27 +122,8 @@ impl TxHelper {
let mut subject: TransactionsSubject = tx.into();
subject = subject
.with_index(Some(index))
.with_block_height(Some(BlockHeight::from(self.get_height(block))))
.with_status(self.get_status(tx).map(Into::into));
.with_block_height(Some(BlockHeight::from(block.height)))
.with_status(Some(tx.status.clone()));
subject
}

fn get_id(&self, tx: &Transaction) -> String {
let id = tx.id(&self.chain_id).to_string();
format!("0x{id}")
}

fn get_height(&self, block: &Block) -> u32 {
*block.header().consensus().height
}

fn get_status(&self, tx: &Transaction) -> Option<TxPoolTransactionStatus> {
self.database
.off_chain()
.latest_view()
.unwrap()
.get_tx_status(&tx.id(&self.chain_id))
.ok()
.flatten()
}
}
8 changes: 4 additions & 4 deletions crates/fuel-indexer/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use fuel_streams_core::{
nats::{NatsClient, NatsClientOpts},
types::{Block, ChainId, DeliverPolicy, Transaction, UniqueIdentifier},
nats::{types::DeliverPolicy, NatsClient, NatsClientOpts},
types::{Block, Transaction},
StreamEncoder,
Streamable,
SubscribeConsumerConfig,
Expand Down Expand Up @@ -73,7 +73,7 @@ async fn sync_blocks(
while let Some(msg) = subscription.next().await {
let msg = msg?;
let block = Block::decode(msg.payload.clone().into()).await;
let height = *block.header().consensus().height;
let height = block.height;
let id = height.to_string();
let key = ("block".to_string(), id.clone());
let record: Option<BlockRecord> = db
Expand Down Expand Up @@ -106,7 +106,7 @@ async fn sync_transactions(
while let Some(msg) = subscription.next().await {
let msg = msg?;
let transaction = Transaction::decode(msg.payload.clone().into()).await;
let tx_id = transaction.id(&ChainId::default());
let tx_id = &transaction.id;
let id = format!("0x{}", tx_id);
let key = ("transaction".to_string(), id.clone());
let record: Option<TransactionRecord> = db
Expand Down
95 changes: 72 additions & 23 deletions crates/fuel-streams-publisher/src/publisher/fuel_core_like.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,40 @@ use std::sync::Arc;

use fuel_core::{
combined_database::CombinedDatabase,
database::database_description::DatabaseHeight,
database::{
database_description::{on_chain::OnChain, DatabaseHeight},
Database,
},
fuel_core_graphql_api::ports::DatabaseBlocks,
state::{
generic_database::GenericDatabase,
iterable_key_value_view::IterableKeyValueViewWrapper,
},
};
use fuel_core_bin::FuelService;
use fuel_core_importer::ports::ImporterDatabase;
use fuel_core_storage::transactional::AtomicView;
use fuel_core_types::{
blockchain::consensus::Sealed,
fuel_tx::Bytes32,
blockchain::consensus::{Consensus, Sealed},
fuel_tx::{AssetId, Bytes32},
fuel_types::BlockHeight,
tai64::Tai64,
};
use fuel_streams_core::types::{
Address,
Block,
ChainId,
FuelCoreBlock,
FuelCoreReceipt,
FuelCoreTransactionStatus,
Receipt,
};
use tokio::sync::broadcast::Receiver;

pub type OffchainDatabase = GenericDatabase<
IterableKeyValueViewWrapper<
fuel_core::fuel_core_graphql_api::storage::Column,
>,
>;

/// Interface for `fuel-core` related logic.
/// This was introduced to simplify mocking and testing the `fuel-streams-publisher` crate.
#[async_trait::async_trait]
Expand All @@ -29,44 +44,74 @@ pub trait FuelCoreLike: Sync + Send {
fn is_started(&self) -> bool;
async fn stop(&self);

fn base_asset_id(&self) -> &AssetId;
fn chain_id(&self) -> &ChainId;

fn database(&self) -> &CombinedDatabase;
fn onchain_database(&self) -> &Database<OnChain> {
self.database().on_chain()
}
fn offchain_database(&self) -> anyhow::Result<Arc<OffchainDatabase>> {
Ok(Arc::new(self.database().off_chain().latest_view()?))
}

fn blocks_subscription(
&self,
) -> Receiver<fuel_core_importer::ImporterResult>;

fn get_latest_block_height(&self) -> anyhow::Result<Option<u64>> {
Ok(self
.database()
.on_chain()
.onchain_database()
.latest_block_height()?
.map(|h| h.as_u64()))
}

fn get_receipts(
&self,
tx_id: &Bytes32,
) -> anyhow::Result<Option<Vec<Receipt>>>;
) -> anyhow::Result<Option<Vec<FuelCoreReceipt>>>;

fn get_block_and_producer_by_height(
&self,
height: u64,
) -> anyhow::Result<(Block, Address)> {
height: u32,
) -> anyhow::Result<(FuelCoreBlock, Address)> {
let sealed_block = self
.database()
.on_chain()
.onchain_database()
.latest_view()?
.get_sealed_block_by_height(&(height as u32).into())?
.get_sealed_block_by_height(&(height).into())?
.expect("NATS Publisher: no block at height {height}");

Ok(self.get_block_and_producer(&sealed_block))
}

#[cfg(not(feature = "test-helpers"))]
fn get_consensus(
&self,
block_height: &BlockHeight,
) -> anyhow::Result<Consensus> {
Ok(self
.onchain_database()
.latest_view()?
.consensus(block_height)?)
}

#[cfg(feature = "test-helpers")]
fn get_consensus(
&self,
block_height: &BlockHeight,
) -> anyhow::Result<Consensus> {
Ok(self
.onchain_database()
.latest_view()?
.consensus(block_height)
.unwrap_or_default())
}

#[cfg(not(feature = "test-helpers"))]
fn get_block_and_producer(
&self,
sealed_block: &Sealed<Block>,
) -> (Block, Address) {
sealed_block: &Sealed<FuelCoreBlock>,
) -> (FuelCoreBlock, Address) {
let block = sealed_block.entity.clone();
let block_producer = sealed_block
.consensus
Expand All @@ -79,8 +124,8 @@ pub trait FuelCoreLike: Sync + Send {
#[cfg(feature = "test-helpers")]
fn get_block_and_producer(
&self,
sealed_block: &Sealed<Block>,
) -> (Block, Address) {
sealed_block: &Sealed<FuelCoreBlock>,
) -> (FuelCoreBlock, Address) {
let block = sealed_block.entity.clone();
let block_producer = sealed_block
.consensus
Expand All @@ -90,9 +135,8 @@ pub trait FuelCoreLike: Sync + Send {
(block, block_producer.into())
}

fn get_sealed_block_by_height(&self, height: u32) -> Sealed<Block> {
self.database()
.on_chain()
fn get_sealed_block_by_height(&self, height: u32) -> Sealed<FuelCoreBlock> {
self.onchain_database()
.latest_view()
.expect("failed to get latest db view")
.get_sealed_block_by_height(&height.into())
Expand All @@ -101,8 +145,7 @@ pub trait FuelCoreLike: Sync + Send {
}

fn get_sealed_block_time_by_height(&self, height: u32) -> Tai64 {
self.database()
.on_chain()
self.onchain_database()
.latest_view()
.expect("failed to get latest db view")
.get_sealed_block_header(&height.into())
Expand All @@ -117,6 +160,7 @@ pub trait FuelCoreLike: Sync + Send {
pub struct FuelCore {
pub fuel_service: Arc<FuelService>,
chain_id: ChainId,
base_asset_id: AssetId,
database: CombinedDatabase,
}

Expand All @@ -125,12 +169,14 @@ impl From<FuelService> for FuelCore {
let chain_config =
fuel_service.shared.config.snapshot_reader.chain_config();
let chain_id = chain_config.consensus_parameters.chain_id();
let base_asset_id = *chain_config.consensus_parameters.base_asset_id();

let database = fuel_service.shared.database.clone();

Self {
fuel_service: Arc::new(fuel_service),
chain_id,
base_asset_id,
database,
}
}
Expand Down Expand Up @@ -191,6 +237,9 @@ impl FuelCoreLike for FuelCore {
}
}

fn base_asset_id(&self) -> &AssetId {
&self.base_asset_id
}
fn chain_id(&self) -> &ChainId {
&self.chain_id
}
Expand All @@ -212,7 +261,7 @@ impl FuelCoreLike for FuelCore {
fn get_receipts(
&self,
tx_id: &Bytes32,
) -> anyhow::Result<Option<Vec<Receipt>>> {
) -> anyhow::Result<Option<Vec<FuelCoreReceipt>>> {
let off_chain_database = self.database().off_chain().latest_view()?;
let receipts = off_chain_database
.get_tx_status(tx_id)?
Expand Down
Loading

0 comments on commit ed646cd

Please sign in to comment.