Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(da-indexer): update celestia integration #1132

Merged
merged 4 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
414 changes: 305 additions & 109 deletions da-indexer/Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions da-indexer/da-indexer-logic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@ sea-orm = { version = "0.12.2", features = [
"postgres-array",
] }

celestia-rpc = "0.1.1"
celestia-types = "0.1.1"
celestia-rpc = "0.7.0"
celestia-types = "0.7.0"
tokio = { version = "1", features = ["full"] }
hex = "0.4.3"
lazy_static = "1.4.0"
sha3 = "0.10.8"
futures = "0.3"
jsonrpsee = { version = "0.20", features = ["client-core", "macros"] }
jsonrpsee = { version = "0.24.7", features = ["client-core", "macros"] }
serde = "1.0"
serde_with = "3.6.1"
serde_json = "1.0.96"
async-trait = "0.1"
http = "0.2.9"
http = "1.1.0"
tonic = { version = "0.7", features = ["tls", "tls-roots"] }
prost = "0.10"
ethabi = "18.0"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use celestia_rpc::{Error, Result};
use http::{header, HeaderValue};
use jsonrpsee::{
http_client::{HeaderMap, HttpClientBuilder},
ws_client::WsClientBuilder,
};
pub mod share;

use celestia_rpc::{Client, Error, Result};
use http::{header, HeaderMap, HeaderValue};
use jsonrpsee::{http_client::HttpClientBuilder, ws_client::WsClientBuilder};

/// The maximum request size in the default client in celestia_rpc is not sufficient for some blocks,
/// therefore, we need to customize client initialization
Expand All @@ -12,7 +11,7 @@ pub async fn new_celestia_client(
auth_token: Option<&str>,
max_request_size: u32,
max_response_size: u32,
) -> Result<celestia_rpc::Client> {
) -> Result<Client> {
let mut headers = HeaderMap::new();

if let Some(token) = auth_token {
Expand Down
45 changes: 45 additions & 0 deletions da-indexer/da-indexer-logic/src/celestia/client/share.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use std::future::Future;

use celestia_types::{AppVersion, ExtendedDataSquare};
use jsonrpsee::core::client::{ClientT, Error};

// celestia_rpc::Client doesn't support new version of share.GetEDS method
// so we need to implement it manually
mod rpc {
use celestia_types::eds::RawExtendedDataSquare;
use jsonrpsee::proc_macros::rpc;

#[rpc(client)]
pub trait Share {
#[method(name = "share.GetEDS")]
async fn share_get_eds(&self, height: u64) -> Result<RawExtendedDataSquare, client::Error>;
}
}

pub trait ShareClient: ClientT {
/// GetEDS gets the full EDS identified by the given root.
fn share_get_eds<'a, 'b, 'fut>(
&'a self,
height: u64,
app_version: u64,
) -> impl Future<Output = Result<ExtendedDataSquare, Error>> + Send + 'fut
where
'a: 'fut,
'b: 'fut,
Self: Sized + Sync + 'fut,
{
async move {
let app_version = AppVersion::from_u64(app_version).ok_or_else(|| {
let e = format!("Invalid or unsupported AppVersion: {app_version}");
Error::Custom(e)
})?;

let raw_eds = rpc::ShareClient::share_get_eds(self, height).await?;

ExtendedDataSquare::from_raw(raw_eds, app_version)
.map_err(|e| Error::Custom(e.to_string()))
}
}
}

impl<T> ShareClient for T where T: ClientT {}
30 changes: 20 additions & 10 deletions da-indexer/da-indexer-logic/src/celestia/da.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
use crate::{
celestia::{client, repository::blobs},
indexer::{Job, DA},
};
use anyhow::Result;
use async_trait::async_trait;
use celestia_rpc::{Client, HeaderClient, ShareClient};
use celestia_rpc::{Client, HeaderClient};
use celestia_types::{Blob, ExtendedHeader};
use sea_orm::{DatabaseConnection, TransactionTrait};
use std::sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
};

use crate::{
celestia::{repository::blobs, rpc_client},
indexer::{Job, DA},
use super::{
client::share::ShareClient, job::CelestiaJob, parser, repository::blocks,
settings::IndexerSettings,
};

use super::{job::CelestiaJob, parser, repository::blocks, settings::IndexerSettings};

pub struct CelestiaDA {
client: Client,
db: Arc<DatabaseConnection>,
Expand All @@ -25,7 +27,7 @@ pub struct CelestiaDA {

impl CelestiaDA {
pub async fn new(db: Arc<DatabaseConnection>, settings: IndexerSettings) -> Result<Self> {
let client = rpc_client::new_celestia_client(
let client = client::new_celestia_client(
&settings.rpc.url,
settings.rpc.auth_token.as_deref(),
settings.rpc.max_request_size,
Expand Down Expand Up @@ -54,11 +56,14 @@ impl CelestiaDA {

async fn get_blobs_by_height(&self, height: u64) -> Result<(ExtendedHeader, Vec<Blob>)> {
let header = self.client.header_get_by_height(height).await?;
let mut blobs = vec![];

let mut blobs = vec![];
if parser::maybe_contains_blobs(&header.dah) {
let eds = self.client.share_get_eds(&header).await?;
blobs = parser::parse_eds(&eds, header.dah.square_len())?;
let eds = self
.client
.share_get_eds(height, header.header.version.app)
.await?;
blobs = parser::parse_eds(&eds, header.header.version.app)?;
}

Ok((header, blobs))
Expand Down Expand Up @@ -103,6 +108,11 @@ impl DA for CelestiaDA {
let height = self.client.header_local_head().await?.header.height.value();
tracing::info!(height, "latest block");

if height <= self.last_known_height.load(Ordering::Acquire) {
tracing::info!("latest block is below last known height, skipping...");
return Ok(vec![]);
}

let from = self.last_known_height.swap(height, Ordering::AcqRel) + 1;
Ok((from..=height)
.map(|height| Job::Celestia(CelestiaJob { height }))
Expand Down
2 changes: 1 addition & 1 deletion da-indexer/da-indexer-logic/src/celestia/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
mod client;
pub mod da;
pub mod job;
pub mod l2_router;
mod parser;
pub mod repository;
mod rpc_client;
pub mod settings;
#[cfg(test)]
pub mod tests;
83 changes: 10 additions & 73 deletions da-indexer/da-indexer-logic/src/celestia/parser.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use anyhow::{Error, Result};
use anyhow::Result;
use celestia_types::{
blob::Blob, consts::appconsts, nmt::Namespace, Commitment, DataAvailabilityHeader,
ExtendedDataSquare, Share,
blob::Blob, nmt::Namespace, AppVersion, DataAvailabilityHeader, ExtendedDataSquare,
};

lazy_static! {
Expand All @@ -17,81 +16,19 @@ lazy_static! {

/// Checks if the DataAvailabilityHeader might contain blobs.
pub fn maybe_contains_blobs(dah: &DataAvailabilityHeader) -> bool {
dah.row_roots.iter().any(|row| {
dah.row_roots().iter().any(|row| {
*PAY_FOR_BLOB_NAMESPACE >= row.min_namespace().into()
&& *PAY_FOR_BLOB_NAMESPACE <= row.max_namespace().into()
})
}

/// Extracts blobs from the ExtendedDataSquare.
/// The format described here: https://github.com/celestiaorg/celestia-app/blob/main/specs/src/specs/shares.md
pub fn parse_eds(eds: &ExtendedDataSquare, width: usize) -> Result<Vec<Blob>> {
// sanity check
if width * width != eds.data_square.len() {
return Err(Error::msg("data square length mismatch"));
}
pub fn parse_eds(eds: &ExtendedDataSquare, app_version: u64) -> Result<Vec<Blob>> {
let app_version = AppVersion::from_u64(app_version)
.ok_or_else(|| anyhow::anyhow!("invalid or unsupported app_version: {app_version}"))?;

let mut blobs: Vec<Blob> = vec![];
let mut sequence_length = 0;
let mut parsed_length = 0;

for row in eds.data_square.chunks(width).take(width / 2) {
for share in row.iter().take(width / 2) {
let share = Share::from_raw(share)?;
let ns = share.namespace();

if ns == *TAIL_PADDING_NAMESPACE {
break;
}

if ns.is_reserved_on_celestia() {
continue;
}

let info_byte = share.info_byte();

let mut share_data;
if info_byte.is_sequence_start() {
assert!(parsed_length == sequence_length);

sequence_length = share.sequence_length().unwrap() as usize;
parsed_length = 0;

if sequence_length == 0
&& blobs.last().is_some()
&& blobs.last().unwrap().namespace == ns
{
// Namespace Padding Share, should be ignored
continue;
}

blobs.push(Blob {
namespace: ns,
data: vec![0; sequence_length],
share_version: info_byte.version(),
commitment: Commitment([0; 32]),
});

// first share: skip info byte and sequence length
share_data = &share.data()[1 + appconsts::SEQUENCE_LEN_BYTES..];
} else {
// continuation share: skip info byte
share_data = &share.data()[1..];
}

let data_length = share_data.len().min(sequence_length - parsed_length);
share_data = &share_data[..data_length];

let last_blob = blobs.last_mut().unwrap();
last_blob.data[parsed_length..(parsed_length + data_length)]
.copy_from_slice(share_data);
parsed_length += data_length;

if parsed_length == sequence_length {
last_blob.commitment =
Commitment::from_blob(ns, info_byte.version(), &last_blob.data)?;
}
}
}
Ok(blobs)
Blob::reconstruct_all(eds.data_square(), app_version).map_err(|err| {
tracing::error!(err = ?err, "failed to parse EDS");
anyhow::anyhow!(err)
})
}
14 changes: 12 additions & 2 deletions da-indexer/da-indexer-logic/src/celestia/tests/blobs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use celestia_types::{nmt::Namespace, Blob as CelestiaBlob, Commitment};
use celestia_types::{
consts::appconsts::subtree_root_threshold, nmt::Namespace, AppVersion, Blob as CelestiaBlob,
Commitment,
};

use crate::celestia::{
repository::{blobs, blocks},
Expand Down Expand Up @@ -49,12 +52,19 @@ fn celestia_blob(seed: u32) -> CelestiaBlob {
Namespace::new(0, &[&[0_u8; 18], &sha3("namespace", seed)[..10]].concat()).unwrap();
let data = sha3("data", seed).to_vec();
let share_version = 0;
let commitment = Commitment::from_blob(namespace, share_version, &data).unwrap();
let commitment = Commitment::from_blob(
namespace,
&data,
share_version,
subtree_root_threshold(AppVersion::latest()),
)
.unwrap();
CelestiaBlob {
namespace,
data,
share_version,
commitment,
index: None,
}
}

Expand Down

This file was deleted.

This file was deleted.

1 change: 0 additions & 1 deletion da-indexer/da-indexer-logic/src/celestia/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
pub mod blobs;
pub mod blocks;
pub mod l2_router;
pub mod parser;

use blockscout_service_launcher::test_database::TestDbGuard;

Expand Down
45 changes: 0 additions & 45 deletions da-indexer/da-indexer-logic/src/celestia/tests/parser.rs

This file was deleted.

Loading