diff --git a/catalyst-gateway/bin/src/cardano/mod.rs b/catalyst-gateway/bin/src/cardano/mod.rs index 4656961a416..b9cd68cc67b 100644 --- a/catalyst-gateway/bin/src/cardano/mod.rs +++ b/catalyst-gateway/bin/src/cardano/mod.rs @@ -12,7 +12,7 @@ use tracing::{debug, error, info, warn}; use crate::{ db::index::{ - block::index_block, + block::{index_block, roll_forward}, queries::sync_status::{ get::{get_sync_status, SyncStatus}, update::update_sync_status, @@ -453,14 +453,11 @@ impl SyncTask { }, } - // TODO: IF there is only 1 chain follower left in sync_tasks, then all - // immutable followers have finished. - // When this happens we need to purge the live index of any records that exist - // before the current immutable tip. - // Note: to prevent a data race when multiple nodes are syncing, we probably - // want to put a gap in this, so that there are X slots of overlap - // between the live chain and immutable chain. This gap should be - // a parameter. + if self.sync_tasks.len() == 1 { + if let Err(error) = roll_forward::purge_live_index(self.immutable_tip_slot).await { + error!(chain=%self.cfg.chain, error=%error, "BUG: Purging volatile data task failed."); + } + } } error!(chain=%self.cfg.chain,"BUG: Sync tasks have all stopped. This is an unexpected error!"); diff --git a/catalyst-gateway/bin/src/db/index/block/mod.rs b/catalyst-gateway/bin/src/db/index/block/mod.rs index 2096bedc4e9..327e0682f2f 100644 --- a/catalyst-gateway/bin/src/db/index/block/mod.rs +++ b/catalyst-gateway/bin/src/db/index/block/mod.rs @@ -4,6 +4,7 @@ pub(crate) mod certs; pub(crate) mod cip36; pub(crate) mod rbac509; +pub(crate) mod roll_forward; pub(crate) mod txi; pub(crate) mod txo; diff --git a/catalyst-gateway/bin/src/db/index/block/roll_forward.rs b/catalyst-gateway/bin/src/db/index/block/roll_forward.rs new file mode 100644 index 00000000000..46994b5d4c7 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/block/roll_forward.rs @@ -0,0 +1,314 @@ +//! Immutable Roll Forward logic. + +use std::{collections::HashSet, sync::Arc}; + +use futures::StreamExt; + +use crate::{ + db::index::{block::CassandraSession, queries::purge}, + settings::Settings, +}; + +/// Purge Data from Live Index +pub(crate) async fn purge_live_index(purge_slot: u64) -> anyhow::Result<()> { + let persistent = false; // get volatile session + let Some(session) = CassandraSession::get(persistent) else { + anyhow::bail!("Failed to acquire db session"); + }; + + // Purge data up to this slot + let purge_to_slot: num_bigint::BigInt = purge_slot + .saturating_sub(Settings::purge_slot_buffer()) + .into(); + + let txn_hashes = purge_txi_by_hash(&session, &purge_to_slot).await?; + purge_chain_root_for_role0_key(&session, &purge_to_slot).await?; + purge_chain_root_for_stake_address(&session, &purge_to_slot).await?; + purge_chain_root_for_txn_id(&session, &txn_hashes).await?; + purge_cip36_registration(&session, &purge_to_slot).await?; + purge_cip36_registration_for_vote_key(&session, &purge_to_slot).await?; + purge_cip36_registration_invalid(&session, &purge_to_slot).await?; + purge_rbac509_registration(&session, &purge_to_slot).await?; + purge_stake_registration(&session, &purge_to_slot).await?; + purge_txo_ada(&session, &purge_to_slot).await?; + purge_txo_assets(&session, &purge_to_slot).await?; + purge_unstaked_txo_ada(&session, &purge_to_slot).await?; + purge_unstaked_txo_assets(&session, &purge_to_slot).await?; + + Ok(()) +} + +/// Purge data from `chain_root_for_role0_key`. +async fn purge_chain_root_for_role0_key( + session: &Arc, purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + use purge::chain_root_for_role0_key::{DeleteQuery, Params, PrimaryKeyQuery}; + + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + let params: Params = primary_key.into(); + if ¶ms.slot_no <= purge_to_slot { + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; + Ok(()) +} + +/// Purge data from `chain_root_for_stake_address`. +async fn purge_chain_root_for_stake_address( + session: &Arc, purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + use purge::chain_root_for_stake_address::{DeleteQuery, Params, PrimaryKeyQuery}; + + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + let params: Params = primary_key.into(); + if ¶ms.slot_no <= purge_to_slot { + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; + Ok(()) +} + +/// Purge data from `chain_root_for_txn_id`. +async fn purge_chain_root_for_txn_id( + session: &Arc, txn_hashes: &HashSet>, +) -> anyhow::Result<()> { + use purge::chain_root_for_txn_id::{DeleteQuery, Params, PrimaryKeyQuery}; + + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + let params: Params = primary_key.into(); + if txn_hashes.contains(¶ms.transaction_id) { + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; + Ok(()) +} + +/// Purge data from `cip36_registration`. +async fn purge_cip36_registration( + session: &Arc, purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + use purge::cip36_registration::{DeleteQuery, Params, PrimaryKeyQuery}; + + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + let params: Params = primary_key.into(); + if ¶ms.slot_no <= purge_to_slot { + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; + Ok(()) +} + +/// Purge data from `cip36_registration_for_vote_key`. +async fn purge_cip36_registration_for_vote_key( + session: &Arc, purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + use purge::cip36_registration_for_vote_key::{DeleteQuery, Params, PrimaryKeyQuery}; + + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + let params: Params = primary_key.into(); + if ¶ms.slot_no <= purge_to_slot { + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; + Ok(()) +} + +/// Purge data from `cip36_registration_invalid`. +async fn purge_cip36_registration_invalid( + session: &Arc, purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + use purge::cip36_registration_invalid::{DeleteQuery, Params, PrimaryKeyQuery}; + + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + let params: Params = primary_key.into(); + if ¶ms.slot_no <= purge_to_slot { + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; + Ok(()) +} + +/// Purge data from `rbac509_registration`. +async fn purge_rbac509_registration( + session: &Arc, purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + use purge::rbac509_registration::{DeleteQuery, Params, PrimaryKeyQuery}; + + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + let params: Params = primary_key.into(); + if ¶ms.slot_no <= purge_to_slot { + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; + Ok(()) +} + +/// Purge data from `stake_registration`. +async fn purge_stake_registration( + session: &Arc, purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + use purge::stake_registration::{DeleteQuery, Params, PrimaryKeyQuery}; + + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + let params: Params = primary_key.into(); + if ¶ms.slot_no <= purge_to_slot { + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; + Ok(()) +} + +/// Purge data from `txi_by_hash`. +async fn purge_txi_by_hash( + session: &Arc, purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result>> { + use purge::txi_by_hash::{DeleteQuery, Params, PrimaryKeyQuery}; + + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + let mut txn_hashes: HashSet> = HashSet::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + if &primary_key.2 <= purge_to_slot { + let params: Params = primary_key.into(); + txn_hashes.insert(params.txn_hash.clone()); + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; + Ok(txn_hashes) +} + +/// Purge data from `txo_ada`. +async fn purge_txo_ada( + session: &Arc, purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + use purge::txo_ada::{DeleteQuery, Params, PrimaryKeyQuery}; + + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + let params: Params = primary_key.into(); + if ¶ms.slot_no <= purge_to_slot { + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; + Ok(()) +} + +/// Purge data from `txo_assets`. +async fn purge_txo_assets( + session: &Arc, purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + use purge::txo_assets::{DeleteQuery, Params, PrimaryKeyQuery}; + + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + let params: Params = primary_key.into(); + if ¶ms.slot_no <= purge_to_slot { + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; + Ok(()) +} + +/// Purge data from `unstaked_txo_ada`. +async fn purge_unstaked_txo_ada( + session: &Arc, purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + use purge::unstaked_txo_ada::{DeleteQuery, Params, PrimaryKeyQuery}; + + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + if &primary_key.2 <= purge_to_slot { + let params: Params = primary_key.into(); + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; + Ok(()) +} + +/// Purge data from `unstaked_txo_assets`. +async fn purge_unstaked_txo_assets( + session: &Arc, purge_to_slot: &num_bigint::BigInt, +) -> anyhow::Result<()> { + use purge::unstaked_txo_assets::{DeleteQuery, Params, PrimaryKeyQuery}; + + // Get all keys + let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?; + // Filter + let mut delete_params: Vec = Vec::new(); + while let Some(Ok(primary_key)) = primary_keys_stream.next().await { + let params: Params = primary_key.into(); + if ¶ms.slot_no <= purge_to_slot { + delete_params.push(params); + } + } + // Delete filtered keys + DeleteQuery::execute(session, delete_params).await?; + Ok(()) +} diff --git a/catalyst-gateway/bin/src/db/index/queries/mod.rs b/catalyst-gateway/bin/src/db/index/queries/mod.rs index a759fb5a27e..1bff2bd55dc 100644 --- a/catalyst-gateway/bin/src/db/index/queries/mod.rs +++ b/catalyst-gateway/bin/src/db/index/queries/mod.rs @@ -2,6 +2,7 @@ //! //! This improves query execution time. +pub(crate) mod purge; pub(crate) mod rbac; pub(crate) mod registrations; pub(crate) mod staked_ada; @@ -169,6 +170,7 @@ pub(crate) type FallibleQueryTasks = Vec, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { @@ -193,7 +195,7 @@ impl PreparedQueries { let chain_root_by_stake_address = GetChainRootQuery::prepare(session.clone()).await; let registrations_by_chain_root = GetRegistrationsByChainRootQuery::prepare(session.clone()).await; - let chain_root_by_role0_key = GetRole0ChainRootQuery::prepare(session).await; + let chain_root_by_role0_key = GetRole0ChainRootQuery::prepare(session.clone()).await; let ( txo_insert_queries, @@ -332,11 +334,7 @@ impl PreparedQueries { }, PreparedSelectQuery::ChainRootByRole0Key => &self.chain_root_by_role0_key_query, }; - - session - .execute_iter(prepared_stmt.clone(), params) - .await - .map_err(|e| anyhow::anyhow!(e)) + session_execute_iter(session, prepared_stmt, params).await } /// Execute a Batch query with the given parameters. @@ -376,35 +374,62 @@ impl PreparedQueries { &self.chain_root_for_stake_address_insert_queries }, }; + session_execute_batch(session, query_map, cfg, query, values).await + } +} - let mut results: Vec = Vec::new(); - - let chunks = values.chunks(cfg.max_batch_size.try_into().unwrap_or(1)); - let mut query_failed = false; - let query_str = format!("{query}"); - - for chunk in chunks { - let chunk_size: u16 = chunk.len().try_into()?; - let Some(batch_query) = query_map.get(&chunk_size) else { - // This should not actually occur. - bail!("No batch query found for size {}", chunk_size); - }; - let batch_query_statements = batch_query.value().clone(); - match session.batch(&batch_query_statements, chunk).await { - Ok(result) => results.push(result), - Err(err) => { - let chunk_str = format!("{chunk:?}"); - error!(error=%err, query=query_str, chunk=chunk_str, "Query Execution Failed"); - query_failed = true; - // Defer failure until all batches have been processed. - }, - } - } - - if query_failed { - bail!("Query Failed: {query_str}!"); +/// Execute a Batch query with the given parameters. +/// +/// Values should be a Vec of values which implement `SerializeRow` and they MUST be +/// the same, and must match the query being executed. +/// +/// This will divide the batch into optimal sized chunks and execute them until all +/// values have been executed or the first error is encountered. +async fn session_execute_batch( + session: Arc, query_map: &SizedBatch, cfg: Arc, query: Q, + values: Vec, +) -> FallibleQueryResults { + let mut results: Vec = Vec::new(); + + let chunks = values.chunks(cfg.max_batch_size.try_into().unwrap_or(1)); + let mut query_failed = false; + let query_str = format!("{query}"); + + for chunk in chunks { + let chunk_size: u16 = chunk.len().try_into()?; + let Some(batch_query) = query_map.get(&chunk_size) else { + // This should not actually occur. + bail!("No batch query found for size {}", chunk_size); + }; + let batch_query_statements = batch_query.value().clone(); + match session.batch(&batch_query_statements, chunk).await { + Ok(result) => results.push(result), + Err(err) => { + let chunk_str = format!("{chunk:?}"); + error!(error=%err, query=query_str, chunk=chunk_str, "Query Execution Failed"); + query_failed = true; + // Defer failure until all batches have been processed. + }, } + } - Ok(results) + if query_failed { + bail!("Query Failed: {query_str}!"); } + + Ok(results) +} + +/// Executes a select query with the given parameters. +/// +/// Returns an iterator that iterates over all the result pages that the query +/// returns. +pub(crate) async fn session_execute_iter

( + session: Arc, prepared_stmt: &PreparedStatement, params: P, +) -> anyhow::Result +where P: SerializeRow { + session + .execute_iter(prepared_stmt.clone(), params) + .await + .map_err(|e| anyhow::anyhow!(e)) } diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/chain_root_for_role0_key.rs b/catalyst-gateway/bin/src/db/index/queries/purge/chain_root_for_role0_key.rs new file mode 100644 index 00000000000..b13a4103e69 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/chain_root_for_role0_key.rs @@ -0,0 +1,128 @@ +//! Chain Root For Role0 Key (RBAC 509 registrations) Queries used in purging data. +use std::{fmt::Debug, sync::Arc}; + +use scylla::{ + prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow, + Session, +}; +use tracing::error; + +use crate::{ + db::index::{ + queries::{ + purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery}, + FallibleQueryResults, SizedBatch, + }, + session::CassandraSession, + }, + settings::cassandra_db, +}; + +pub(crate) mod result { + //! Return values for Chain Root For Role0 Key registration purge queries. + + /// Primary Key Row + pub(crate) type PrimaryKey = (Vec, num_bigint::BigInt, i16); +} + +/// Select primary keys for Chain Root For Role0 Key registration. +const SELECT_QUERY: &str = include_str!("./cql/get_chain_root_for_role0_key.cql"); + +/// Primary Key Value. +#[derive(SerializeRow)] +pub(crate) struct Params { + /// Role0 Key - Binary 16 bytes. + pub(crate) role0_key: Vec, + /// Block Slot Number + pub(crate) slot_no: num_bigint::BigInt, + /// Transaction Offset inside the block. + pub(crate) txn: i16, +} + +impl Debug for Params { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Params") + .field("role0_key", &self.role0_key) + .field("slot_no", &self.slot_no) + .field("txn", &self.txn) + .finish() + } +} + +impl From for Params { + fn from(value: result::PrimaryKey) -> Self { + Self { + role0_key: value.0, + slot_no: value.1, + txn: value.2, + } + } +} +/// Get primary key for Chain Root For Role0 Key registration query. +pub(crate) struct PrimaryKeyQuery; + +impl PrimaryKeyQuery { + /// Prepares a query to get all Chain Root For Role0 Key registration primary keys. + pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { + let select_primary_key = PreparedQueries::prepare( + session.clone(), + SELECT_QUERY, + scylla::statement::Consistency::All, + true, + ) + .await; + + if let Err(ref error) = select_primary_key { + error!(error=%error, "Failed to prepare get Chain Root For Role0 Key registration primary key query"); + }; + + select_primary_key + } + + /// Executes a query to get all Chain Root For Role0 Key registration primary keys. + pub(crate) async fn execute( + session: &CassandraSession, + ) -> anyhow::Result> { + let iter = session + .purge_execute_iter(PreparedSelectQuery::ChainRootForRole0Key) + .await? + .rows_stream::()?; + + Ok(iter) + } +} + +/// Delete Chain Root For Role0 Key registration +const DELETE_QUERY: &str = include_str!("./cql/delete_chain_root_for_role0_key.cql"); + +/// Delete Chain Root For Role0 Key registration Query +pub(crate) struct DeleteQuery; + +impl DeleteQuery { + /// Prepare Batch of Delete Queries + pub(crate) async fn prepare_batch( + session: &Arc, cfg: &cassandra_db::EnvVars, + ) -> anyhow::Result { + let delete_queries = PreparedQueries::prepare_batch( + session.clone(), + DELETE_QUERY, + cfg, + scylla::statement::Consistency::Any, + true, + false, + ) + .await?; + Ok(delete_queries) + } + + /// Executes a DELETE Query + pub(crate) async fn execute( + session: &CassandraSession, params: Vec, + ) -> FallibleQueryResults { + let results = session + .purge_execute_batch(PreparedDeleteQuery::ChainRootForRole0Key, params) + .await?; + + Ok(results) + } +} diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/chain_root_for_stake_address.rs b/catalyst-gateway/bin/src/db/index/queries/purge/chain_root_for_stake_address.rs new file mode 100644 index 00000000000..ac925a2c350 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/chain_root_for_stake_address.rs @@ -0,0 +1,130 @@ +//! Chain Root For Stake Address (RBAC 509 registrations) Queries used in purging data. +use std::{fmt::Debug, sync::Arc}; + +use scylla::{ + prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow, + Session, +}; +use tracing::error; + +use crate::{ + db::index::{ + queries::{ + purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery}, + FallibleQueryResults, SizedBatch, + }, + session::CassandraSession, + }, + settings::cassandra_db, +}; + +pub(crate) mod result { + //! Return values for Chain Root For Stake Address registration purge queries. + + /// Primary Key Row + pub(crate) type PrimaryKey = (Vec, num_bigint::BigInt, i16); +} + +/// Select primary keys for Chain Root For Stake Address registration. +const SELECT_QUERY: &str = include_str!("./cql/get_chain_root_for_stake_addr.cql"); + +/// Primary Key Value. +#[derive(SerializeRow)] +pub(crate) struct Params { + /// Stake Address - Binary 32 bytes. + pub(crate) stake_addr: Vec, + /// Block Slot Number + pub(crate) slot_no: num_bigint::BigInt, + /// Transaction Offset inside the block. + pub(crate) txn: i16, +} + +impl Debug for Params { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Params") + .field("stake_addr", &hex::encode(&self.stake_addr)) + .field("slot_no", &self.slot_no) + .field("txn", &self.txn) + .finish() + } +} + +impl From for Params { + fn from(value: result::PrimaryKey) -> Self { + Self { + stake_addr: value.0, + slot_no: value.1, + txn: value.2, + } + } +} +/// Get primary key for Chain Root For Stake Address registration query. +pub(crate) struct PrimaryKeyQuery; + +impl PrimaryKeyQuery { + /// Prepares a query to get all Chain Root For Stake Address registration primary + /// keys. + pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { + let select_primary_key = PreparedQueries::prepare( + session.clone(), + SELECT_QUERY, + scylla::statement::Consistency::All, + true, + ) + .await; + + if let Err(ref error) = select_primary_key { + error!(error=%error, "Failed to prepare get Chain Root For Stake Address registration primary key query"); + }; + + select_primary_key + } + + /// Executes a query to get all Chain Root For Stake Address registration primary + /// keys. + pub(crate) async fn execute( + session: &CassandraSession, + ) -> anyhow::Result> { + let iter = session + .purge_execute_iter(PreparedSelectQuery::ChainRootForStakeAddress) + .await? + .rows_stream::()?; + + Ok(iter) + } +} + +/// Delete Chain Root For Stake Address registration +const DELETE_QUERY: &str = include_str!("./cql/delete_chain_root_for_stake_addr.cql"); + +/// Delete Chain Root For Stake Address registration Query +pub(crate) struct DeleteQuery; + +impl DeleteQuery { + /// Prepare Batch of Delete Queries + pub(crate) async fn prepare_batch( + session: &Arc, cfg: &cassandra_db::EnvVars, + ) -> anyhow::Result { + let delete_queries = PreparedQueries::prepare_batch( + session.clone(), + DELETE_QUERY, + cfg, + scylla::statement::Consistency::Any, + true, + false, + ) + .await?; + Ok(delete_queries) + } + + /// Executes a DELETE Query + pub(crate) async fn execute( + session: &CassandraSession, params: Vec, + ) -> FallibleQueryResults { + let results = session + .purge_execute_batch(PreparedDeleteQuery::ChainRootForStakeAddress, params) + .await?; + + Ok(results) + } +} diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/chain_root_for_txn_id.rs b/catalyst-gateway/bin/src/db/index/queries/purge/chain_root_for_txn_id.rs new file mode 100644 index 00000000000..aeb258daf56 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/chain_root_for_txn_id.rs @@ -0,0 +1,125 @@ +//! Chain Root For TX ID (RBAC 509 registrations) Queries used in purging data. +use std::{fmt::Debug, sync::Arc}; + +use scylla::{ + prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow, + Session, +}; +use tracing::error; + +use crate::{ + db::index::{ + queries::{ + purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery}, + FallibleQueryResults, SizedBatch, + }, + session::CassandraSession, + }, + settings::cassandra_db, +}; + +pub(crate) mod result { + //! Return values for Chain Root For TX ID registration purge queries. + use scylla::DeserializeRow; + + /// Primary Key Row + #[derive(DeserializeRow)] + pub(crate) struct PrimaryKey { + /// TXN ID HASH - Binary 32 bytes. + pub(crate) transaction_id: Vec, + } +} + +/// Select primary keys for Chain Root For TX ID registration. +const SELECT_QUERY: &str = include_str!("./cql/get_chain_root_for_txn_id.cql"); + +/// Primary Key Value. +#[derive(SerializeRow)] +pub(crate) struct Params { + /// TXN ID HASH - Binary 32 bytes. + pub(crate) transaction_id: Vec, +} + +impl Debug for Params { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Params") + .field("transaction_id", &hex::encode(&self.transaction_id)) + .finish() + } +} + +impl From for Params { + fn from(value: result::PrimaryKey) -> Self { + Self { + transaction_id: value.transaction_id, + } + } +} +/// Get primary key for Chain Root For TX ID registration query. +pub(crate) struct PrimaryKeyQuery; + +impl PrimaryKeyQuery { + /// Prepares a query to get all Chain Root For TX ID registration primary keys. + pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { + let select_primary_key = PreparedQueries::prepare( + session.clone(), + SELECT_QUERY, + scylla::statement::Consistency::All, + true, + ) + .await; + + if let Err(ref error) = select_primary_key { + error!(error=%error, "Failed to prepare get Chain Root For TX ID registration primary key query"); + }; + + select_primary_key + } + + /// Executes a query to get all Chain Root For TX ID registration primary keys. + pub(crate) async fn execute( + session: &CassandraSession, + ) -> anyhow::Result> { + let iter = session + .purge_execute_iter(PreparedSelectQuery::ChainRootForTxnId) + .await? + .rows_stream::()?; + + Ok(iter) + } +} + +/// Delete Chain Root For TX ID registration +const DELETE_QUERY: &str = include_str!("./cql/delete_chain_root_for_txn_id.cql"); + +/// Delete Chain Root For TX ID registration Query +pub(crate) struct DeleteQuery; + +impl DeleteQuery { + /// Prepare Batch of Delete Queries + pub(crate) async fn prepare_batch( + session: &Arc, cfg: &cassandra_db::EnvVars, + ) -> anyhow::Result { + let delete_queries = PreparedQueries::prepare_batch( + session.clone(), + DELETE_QUERY, + cfg, + scylla::statement::Consistency::Any, + true, + false, + ) + .await?; + Ok(delete_queries) + } + + /// Executes a DELETE Query + pub(crate) async fn execute( + session: &CassandraSession, params: Vec, + ) -> FallibleQueryResults { + let results = session + .purge_execute_batch(PreparedDeleteQuery::ChainRootForTxnId, params) + .await?; + + Ok(results) + } +} diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration.rs b/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration.rs new file mode 100644 index 00000000000..3ba4b2b4d83 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration.rs @@ -0,0 +1,132 @@ +//! CIP-36 registration Queries used in purging data. +use std::{fmt::Debug, sync::Arc}; + +use scylla::{ + prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow, + Session, +}; +use tracing::error; + +use crate::{ + db::index::{ + queries::{ + purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery}, + FallibleQueryResults, SizedBatch, + }, + session::CassandraSession, + }, + settings::cassandra_db, +}; + +pub(crate) mod result { + //! Return values for CIP-36 registration purge queries. + + /// Primary Key Row + pub(crate) type PrimaryKey = (Vec, num_bigint::BigInt, num_bigint::BigInt, i16); +} + +/// Select primary keys for CIP-36 registration. +const SELECT_QUERY: &str = include_str!("./cql/get_cip36_registration.cql"); + +/// Primary Key Value. +#[derive(SerializeRow)] +pub(crate) struct Params { + /// Stake Address - Binary 28 bytes. 0 bytes = not staked. + pub(crate) stake_address: Vec, + /// Nonce that has been slot corrected. + pub(crate) nonce: num_bigint::BigInt, + /// Block Slot Number + pub(crate) slot_no: num_bigint::BigInt, + /// Transaction Offset inside the block. + pub(crate) txn: i16, +} + +impl Debug for Params { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Params") + .field("stake_address", &self.stake_address) + .field("nonce", &self.nonce) + .field("slot_no", &self.slot_no) + .field("txn", &self.txn) + .finish() + } +} + +impl From for Params { + fn from(value: result::PrimaryKey) -> Self { + Self { + stake_address: value.0, + nonce: value.1, + slot_no: value.2, + txn: value.3, + } + } +} +/// Get primary key for CIP-36 registration query. +pub(crate) struct PrimaryKeyQuery; + +impl PrimaryKeyQuery { + /// Prepares a query to get all CIP-36 registration primary keys. + pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { + let select_primary_key = PreparedQueries::prepare( + session.clone(), + SELECT_QUERY, + scylla::statement::Consistency::All, + true, + ) + .await; + + if let Err(ref error) = select_primary_key { + error!(error=%error, "Failed to prepare get CIP-36 registration primary key query"); + }; + + select_primary_key + } + + /// Executes a query to get all CIP-36 registration primary keys. + pub(crate) async fn execute( + session: &CassandraSession, + ) -> anyhow::Result> { + let iter = session + .purge_execute_iter(PreparedSelectQuery::Cip36Registration) + .await? + .rows_stream::()?; + + Ok(iter) + } +} + +/// Delete CIP-36 registration +const DELETE_QUERY: &str = include_str!("./cql/delete_cip36_registration.cql"); + +/// Delete CIP-36 registration Query +pub(crate) struct DeleteQuery; + +impl DeleteQuery { + /// Prepare Batch of Delete Queries + pub(crate) async fn prepare_batch( + session: &Arc, cfg: &cassandra_db::EnvVars, + ) -> anyhow::Result { + let delete_queries = PreparedQueries::prepare_batch( + session.clone(), + DELETE_QUERY, + cfg, + scylla::statement::Consistency::Any, + true, + false, + ) + .await?; + Ok(delete_queries) + } + + /// Executes a DELETE Query + pub(crate) async fn execute( + session: &CassandraSession, params: Vec, + ) -> FallibleQueryResults { + let results = session + .purge_execute_batch(PreparedDeleteQuery::Cip36Registration, params) + .await?; + + Ok(results) + } +} diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration_for_vote_key.rs b/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration_for_vote_key.rs new file mode 100644 index 00000000000..7f4dbd02dd7 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration_for_vote_key.rs @@ -0,0 +1,136 @@ +//! CIP-36 registration by Vote Key Queries used in purging data. +use std::{fmt::Debug, sync::Arc}; + +use scylla::{ + prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow, + Session, +}; +use tracing::error; + +use crate::{ + db::index::{ + queries::{ + purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery}, + FallibleQueryResults, SizedBatch, + }, + session::CassandraSession, + }, + settings::cassandra_db, +}; + +pub(crate) mod result { + //! Return values for CIP-36 registration by Vote key purge queries. + + /// Primary Key Row + pub(crate) type PrimaryKey = (Vec, Vec, num_bigint::BigInt, i16, bool); +} + +/// Select primary keys for CIP-36 registration by Vote key. +const SELECT_QUERY: &str = include_str!("./cql/get_cip36_registration_for_vote_key.cql"); + +/// Primary Key Value. +#[derive(SerializeRow)] +pub(crate) struct Params { + /// Vote key - Binary 28 bytes. + pub(crate) vote_key: Vec, + /// Stake Address - Binary 28 bytes. 0 bytes = not staked. + pub(crate) stake_address: Vec, + /// Block Slot Number + pub(crate) slot_no: num_bigint::BigInt, + /// Transaction Offset inside the block. + pub(crate) txn: i16, + /// True if registration is valid. + pub(crate) valid: bool, +} + +impl Debug for Params { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Params") + .field("vote_key", &self.vote_key) + .field("stake_address", &self.stake_address) + .field("slot_no", &self.slot_no) + .field("txn", &self.txn) + .field("valid", &self.valid) + .finish() + } +} + +impl From for Params { + fn from(value: result::PrimaryKey) -> Self { + Self { + vote_key: value.0, + stake_address: value.1, + slot_no: value.2, + txn: value.3, + valid: value.4, + } + } +} +/// Get primary key for CIP-36 registration by Vote key query. +pub(crate) struct PrimaryKeyQuery; + +impl PrimaryKeyQuery { + /// Prepares a query to get all CIP-36 registration by Vote key primary keys. + pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { + let select_primary_key = PreparedQueries::prepare( + session.clone(), + SELECT_QUERY, + scylla::statement::Consistency::All, + true, + ) + .await; + + if let Err(ref error) = select_primary_key { + error!(error=%error, "Failed to prepare get CIP-36 registration by Vote key primary key query"); + }; + + select_primary_key + } + + /// Executes a query to get all CIP-36 registration by Vote key primary keys. + pub(crate) async fn execute( + session: &CassandraSession, + ) -> anyhow::Result> { + let iter = session + .purge_execute_iter(PreparedSelectQuery::Cip36RegistrationForVoteKey) + .await? + .rows_stream::()?; + + Ok(iter) + } +} + +/// Delete CIP-36 registration +const DELETE_QUERY: &str = include_str!("./cql/delete_cip36_registration_for_vote_key.cql"); + +/// Delete CIP-36 registration by Vote key Query +pub(crate) struct DeleteQuery; + +impl DeleteQuery { + /// Prepare Batch of Delete Queries + pub(crate) async fn prepare_batch( + session: &Arc, cfg: &cassandra_db::EnvVars, + ) -> anyhow::Result { + let delete_queries = PreparedQueries::prepare_batch( + session.clone(), + DELETE_QUERY, + cfg, + scylla::statement::Consistency::Any, + true, + false, + ) + .await?; + Ok(delete_queries) + } + + /// Executes a DELETE Query + pub(crate) async fn execute( + session: &CassandraSession, params: Vec, + ) -> FallibleQueryResults { + let results = session + .purge_execute_batch(PreparedDeleteQuery::Cip36RegistrationForVoteKey, params) + .await?; + + Ok(results) + } +} diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration_invalid.rs b/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration_invalid.rs new file mode 100644 index 00000000000..13e7edd70ad --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration_invalid.rs @@ -0,0 +1,128 @@ +//! CIP-36 Registration (Invalid) Queries used in purging data. +use std::{fmt::Debug, sync::Arc}; + +use scylla::{ + prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow, + Session, +}; +use tracing::error; + +use crate::{ + db::index::{ + queries::{ + purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery}, + FallibleQueryResults, SizedBatch, + }, + session::CassandraSession, + }, + settings::cassandra_db, +}; + +pub(crate) mod result { + //! Return values for CIP-36 invalid registration purge queries. + + /// Primary Key Row + pub(crate) type PrimaryKey = (Vec, num_bigint::BigInt, i16); +} + +/// Select primary keys for CIP-36 invalid registration. +const SELECT_QUERY: &str = include_str!("./cql/get_cip36_registration_invalid.cql"); + +/// Primary Key Value. +#[derive(SerializeRow)] +pub(crate) struct Params { + /// Stake Address - Binary 28 bytes. 0 bytes = not staked. + pub(crate) stake_address: Vec, + /// Block Slot Number + pub(crate) slot_no: num_bigint::BigInt, + /// Transaction Offset inside the block. + pub(crate) txn: i16, +} + +impl Debug for Params { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Params") + .field("stake_address", &self.stake_address) + .field("slot_no", &self.slot_no) + .field("txn", &self.txn) + .finish() + } +} + +impl From for Params { + fn from(value: result::PrimaryKey) -> Self { + Self { + stake_address: value.0, + slot_no: value.1, + txn: value.2, + } + } +} +/// Get primary key for CIP-36 invalid registration query. +pub(crate) struct PrimaryKeyQuery; + +impl PrimaryKeyQuery { + /// Prepares a query to get all CIP-36 invalid registration primary keys. + pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { + let select_primary_key = PreparedQueries::prepare( + session.clone(), + SELECT_QUERY, + scylla::statement::Consistency::All, + true, + ) + .await; + + if let Err(ref error) = select_primary_key { + error!(error=%error, "Failed to prepare get CIP-36 invalid registration primary key query"); + }; + + select_primary_key + } + + /// Executes a query to get all CIP-36 invalid registration primary keys. + pub(crate) async fn execute( + session: &CassandraSession, + ) -> anyhow::Result> { + let iter = session + .purge_execute_iter(PreparedSelectQuery::Cip36RegistrationInvalid) + .await? + .rows_stream::()?; + + Ok(iter) + } +} + +/// Delete CIP-36 invalid registration +const DELETE_QUERY: &str = include_str!("./cql/delete_cip36_registration_invalid.cql"); + +/// Delete CIP-36 invalid registration Query +pub(crate) struct DeleteQuery; + +impl DeleteQuery { + /// Prepare Batch of Delete Queries + pub(crate) async fn prepare_batch( + session: &Arc, cfg: &cassandra_db::EnvVars, + ) -> anyhow::Result { + let delete_queries = PreparedQueries::prepare_batch( + session.clone(), + DELETE_QUERY, + cfg, + scylla::statement::Consistency::Any, + true, + false, + ) + .await?; + Ok(delete_queries) + } + + /// Executes a DELETE Query + pub(crate) async fn execute( + session: &CassandraSession, params: Vec, + ) -> FallibleQueryResults { + let results = session + .purge_execute_batch(PreparedDeleteQuery::Cip36RegistrationInvalid, params) + .await?; + + Ok(results) + } +} diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_chain_root_for_role0_key.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_chain_root_for_role0_key.cql new file mode 100644 index 00000000000..dfffe6d83d7 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_chain_root_for_role0_key.cql @@ -0,0 +1,5 @@ +-- Delete Chain Root For Role0 Key (RBAC 509 registrations). +DELETE FROM chain_root_for_role0_key +WHERE role0_key = :role0_key + AND slot_no = :slot_no + AND txn = :txn diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_chain_root_for_stake_addr.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_chain_root_for_stake_addr.cql new file mode 100644 index 00000000000..d964bb11812 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_chain_root_for_stake_addr.cql @@ -0,0 +1,5 @@ +-- Delete Chain Root For Stake Address (RBAC 509 registrations). +DELETE FROM chain_root_for_stake_addr +WHERE stake_addr = :stake_addr + AND slot_no = :slot_no + AND txn = :txn diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_chain_root_for_txn_id.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_chain_root_for_txn_id.cql new file mode 100644 index 00000000000..5a4ca4d773c --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_chain_root_for_txn_id.cql @@ -0,0 +1,3 @@ +-- Delete Chain Root For TX ID (RBAC 509 registrations). +DELETE FROM chain_root_for_txn_id +WHERE transaction_id = :transaction_id diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_cip36_registration.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_cip36_registration.cql new file mode 100644 index 00000000000..2ab7b51b7cc --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_cip36_registration.cql @@ -0,0 +1,6 @@ +-- Delete CIP-36 registration. +DELETE FROM cip36_registration +WHERE stake_address = :stake_address + AND nonce = :nonce + AND slot_no = :slot_no + AND txn = :txn diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_cip36_registration_for_vote_key.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_cip36_registration_for_vote_key.cql new file mode 100644 index 00000000000..6cfa670f048 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_cip36_registration_for_vote_key.cql @@ -0,0 +1,7 @@ +-- Delete CIP-36 registration by Stake Address. +DELETE FROM cip36_registration_for_vote_key +WHERE vote_key = :vote_key + AND stake_address = :stake_address + AND slot_no = :slot_no + AND txn = :txn + AND valid = :valid diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_cip36_registration_invalid.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_cip36_registration_invalid.cql new file mode 100644 index 00000000000..ada3acdc2a7 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_cip36_registration_invalid.cql @@ -0,0 +1,5 @@ +-- Delete invalid CIP-36 registration. +DELETE FROM cip36_registration_invalid +WHERE stake_address = :stake_address + AND slot_no = :slot_no + AND txn = :txn diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_rbac509_registration.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_rbac509_registration.cql new file mode 100644 index 00000000000..53eef2da725 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_rbac509_registration.cql @@ -0,0 +1,7 @@ +-- Delete RBAC 509 registration. +DELETE FROM rbac509_registration +WHERE chain_root = :chain_root + AND slot_no = :slot_no + AND txn = :txn + AND transaction_id = :transaction_id + AND purpose = :purpose diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_stake_registration.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_stake_registration.cql new file mode 100644 index 00000000000..03c85302892 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_stake_registration.cql @@ -0,0 +1,6 @@ +-- Delete Stake Registration. +DELETE FROM stake_registration +WHERE stake_hash = :stake_hash + AND script = :script + AND slot_no = :slot_no + AND txn = :txn diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_txi_by_txn_hashes.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_txi_by_txn_hashes.cql new file mode 100644 index 00000000000..87bf0a8b7a2 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_txi_by_txn_hashes.cql @@ -0,0 +1,5 @@ +-- Delete ADA or a native asset being spent. +-- This can represent a spend on either immutable data or volatile data. +DELETE FROM txi_by_txn_hash +WHERE txn_hash = :txn_hash + AND txo = :txo diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_txo_assets_by_stake_addr.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_txo_assets_by_stake_addr.cql new file mode 100644 index 00000000000..c46df7011bb --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_txo_assets_by_stake_addr.cql @@ -0,0 +1,8 @@ +-- Delete Transaction Output Assets by Stake Address. +DELETE FROM txo_assets_by_stake +WHERE stake_address = :stake_address + AND slot_no = :slot_no + AND txn = :txn + AND txo = :txo + AND policy_id = :policy_id + AND policy_name = :policy_name diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_txo_by_stake_address.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_txo_by_stake_address.cql new file mode 100644 index 00000000000..1b0c1fdbdf9 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_txo_by_stake_address.cql @@ -0,0 +1,6 @@ +-- Delete Transaction Output by Stake Address. +DELETE FROM txo_by_stake +WHERE stake_address = :stake_address + AND slot_no = :slot_no + AND txn = :txn + AND txo = :txo diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_unstaked_txo_assets_by_txn_hash.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_unstaked_txo_assets_by_txn_hash.cql new file mode 100644 index 00000000000..8b975bf963d --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_unstaked_txo_assets_by_txn_hash.cql @@ -0,0 +1,6 @@ +-- Delete Primary Keys from Unstaked Transaction Outputs (Native Assets) by their transaction hash. +DELETE FROM unstaked_txo_assets_by_txn_hash +WHERE txn_hash = :txn_hash + AND txo = :txo + AND policy_id = :policy_id + AND policy_name = :policy_name diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_unstaked_txo_by_txn_hash.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_unstaked_txo_by_txn_hash.cql new file mode 100644 index 00000000000..ae1cf76c88e --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/delete_unstaked_txo_by_txn_hash.cql @@ -0,0 +1,4 @@ +-- Delete Unstaked Transaction Output Assets by Transaction Hash. +DELETE FROM unstaked_txo_by_txn_hash +WHERE txn_hash = :txn_hash + AND txo = :txo diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_chain_root_for_role0_key.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_chain_root_for_role0_key.cql new file mode 100644 index 00000000000..6f64ffc05e7 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_chain_root_for_role0_key.cql @@ -0,0 +1,6 @@ +-- Get all primary keys from Chain Root For Role0 Key (RBAC 509 registrations). +SELECT + role0_key, + slot_no, + txn +FROM chain_root_for_role0_key diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_chain_root_for_stake_addr.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_chain_root_for_stake_addr.cql new file mode 100644 index 00000000000..fdc872b488d --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_chain_root_for_stake_addr.cql @@ -0,0 +1,6 @@ +-- Get all primary keys from Chain Root For Stake Address (RBAC 509 registrations). +SELECT + stake_addr, + slot_no, + txn +FROM chain_root_for_stake_addr diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_chain_root_for_txn_id.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_chain_root_for_txn_id.cql new file mode 100644 index 00000000000..176a29975f5 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_chain_root_for_txn_id.cql @@ -0,0 +1,3 @@ +-- Get all primary keys from Chain Root For TX ID (RBAC 509 Registrations). +SELECT transaction_id +FROM chain_root_for_txn_id diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_cip36_registration.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_cip36_registration.cql new file mode 100644 index 00000000000..2c0212301bb --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_cip36_registration.cql @@ -0,0 +1,7 @@ +-- Get all primary keys from CIP-36 registration. +SELECT + stake_address, + nonce, + slot_no, + txn +FROM cip36_registration diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_cip36_registration_for_vote_key.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_cip36_registration_for_vote_key.cql new file mode 100644 index 00000000000..795f00c415b --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_cip36_registration_for_vote_key.cql @@ -0,0 +1,8 @@ +-- Get all primary keys from CIP-36 registration by Vote Key. +SELECT + vote_key, + stake_address, + slot_no, + txn, + valid +FROM cip36_registration_for_vote_key diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_cip36_registration_invalid.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_cip36_registration_invalid.cql new file mode 100644 index 00000000000..7883352bca7 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_cip36_registration_invalid.cql @@ -0,0 +1,6 @@ +-- Get all primary keys from CIP-36 Invalid Registration by Stake Address. +SELECT + stake_address, + slot_no, + txn, +FROM cip36_registration_invalid diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_rbac509_registration.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_rbac509_registration.cql new file mode 100644 index 00000000000..3b88a5d8023 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_rbac509_registration.cql @@ -0,0 +1,8 @@ +-- Get all primary keys from RBAC 509 registration. +SELECT + chain_root, + slot_no, + txn, + transaction_id, + purpose +FROM rbac509_registration diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_stake_registration.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_stake_registration.cql new file mode 100644 index 00000000000..a0f16522b5a --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_stake_registration.cql @@ -0,0 +1,7 @@ +-- Get all primary keys from Stake Registration. +SELECT + stake_hash, + script, + slot_no, + txn +FROM stake_registration diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_txi_by_txn_hashes.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_txi_by_txn_hashes.cql new file mode 100644 index 00000000000..062f3388847 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_txi_by_txn_hashes.cql @@ -0,0 +1,5 @@ +-- Get all primary keys from ADA or a native asset being spent. +SELECT + txn_hash, + txo +FROM txi_by_txn_hash diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_txo_assets_by_stake_addr.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_txo_assets_by_stake_addr.cql new file mode 100644 index 00000000000..07de857bd5a --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_txo_assets_by_stake_addr.cql @@ -0,0 +1,9 @@ +-- Get all primary keys from Transaction Output Assets by Stake Address. +SELECT + stake_address, + slot_no, + txn, + txo, + policy_id, + policy_name +FROM txo_assets_by_stake diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_txo_by_stake_address.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_txo_by_stake_address.cql new file mode 100644 index 00000000000..d83329ca7c6 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_txo_by_stake_address.cql @@ -0,0 +1,7 @@ +-- Get all primary keys from Transaction Output by Stake Address. +SELECT + stake_address, + slot_no, + txn, + txo +FROM txo_by_stake diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_unstaked_txo_assets_by_txn_hash.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_unstaked_txo_assets_by_txn_hash.cql new file mode 100644 index 00000000000..45ae2ef93b1 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_unstaked_txo_assets_by_txn_hash.cql @@ -0,0 +1,7 @@ +-- Get all primary keys from Transaction Outputs (Native Assets) by Transaction Hash. +SELECT + txn_hash, + txo, + policy_id, + policy_name +FROM unstaked_txo_assets_by_txn_hash diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_unstaked_txo_by_txn_hash.cql b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_unstaked_txo_by_txn_hash.cql new file mode 100644 index 00000000000..dffba316e50 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cql/get_unstaked_txo_by_txn_hash.cql @@ -0,0 +1,5 @@ +-- Get ALL Primary Keys from Unstaked Transaction Outputs (Native Assets) per stake address. +SELECT + txn_hash, + txo +FROM unstaked_txo_by_txn_hash diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/mod.rs b/catalyst-gateway/bin/src/db/index/queries/purge/mod.rs new file mode 100644 index 00000000000..6958c96b748 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/mod.rs @@ -0,0 +1,298 @@ +//! Queries for purging volatile data. + +pub(crate) mod chain_root_for_role0_key; +pub(crate) mod chain_root_for_stake_address; +pub(crate) mod chain_root_for_txn_id; +pub(crate) mod cip36_registration; +pub(crate) mod cip36_registration_for_vote_key; +pub(crate) mod cip36_registration_invalid; +pub(crate) mod rbac509_registration; +pub(crate) mod stake_registration; +pub(crate) mod txi_by_hash; +pub(crate) mod txo_ada; +pub(crate) mod txo_assets; +pub(crate) mod unstaked_txo_ada; +pub(crate) mod unstaked_txo_assets; + +use std::{fmt::Debug, sync::Arc}; + +use scylla::{ + prepared_statement::PreparedStatement, serialize::row::SerializeRow, + transport::iterator::QueryPager, Session, +}; + +use super::{FallibleQueryResults, SizedBatch}; +use crate::settings::cassandra_db; + +/// No parameters +const NO_PARAMS: () = (); + +/// All prepared DELETE query statements (purge DB table rows). +#[derive(strum_macros::Display)] +pub(crate) enum PreparedDeleteQuery { + /// TXO Delete query. + TxoAda, + /// TXO Assets Delete query. + TxoAssets, + /// Unstaked TXO Delete query. + UnstakedTxoAda, + /// Unstaked TXO Asset Delete query. + UnstakedTxoAsset, + /// TXI by TXN Hash Delete query. + Txi, + /// Stake Registration Delete query. + StakeRegistration, + /// CIP 36 Registration Delete Query. + Cip36Registration, + /// CIP 36 Registration Invalid Delete query. + Cip36RegistrationInvalid, + /// CIP 36 Registration for vote key Delete query. + Cip36RegistrationForVoteKey, + /// RBAC 509 Registration Delete query. + Rbac509, + /// Chain Root For Transaction ID Delete query. + ChainRootForTxnId, + /// Chain Root For Role0 Key Delete query. + ChainRootForRole0Key, + /// Chain Root For Stake Address Delete query. + ChainRootForStakeAddress, +} + +/// All prepared SELECT query statements (primary keys from table). +#[derive(strum_macros::Display)] +pub(crate) enum PreparedSelectQuery { + /// TXO Select query. + TxoAda, + /// TXO Asset Select query. + TxoAssets, + /// Unstaked TXO Select query. + UnstakedTxoAda, + /// Unstaked TXO Asset Select query. + UnstakedTxoAsset, + /// TXI by TXN Hash Select query. + Txi, + /// Stake Registration Select query. + StakeRegistration, + /// CIP 36 Registration Select Query. + Cip36Registration, + /// CIP 36 Registration Invalid Select query. + Cip36RegistrationInvalid, + /// CIP 36 Registration for vote key Select query. + Cip36RegistrationForVoteKey, + /// RBAC 509 Registration Select query. + Rbac509, + /// Chain Root For Transaction ID Select query. + ChainRootForTxnId, + /// Chain Root For Role0 Key Select query. + ChainRootForRole0Key, + /// Chain Root For Stake Address Select query. + ChainRootForStakeAddress, +} + +/// All prepared purge queries for a session. +#[allow(dead_code)] +pub(crate) struct PreparedQueries { + /// TXO ADA Primary Key Query. + select_txo_ada: PreparedStatement, + /// TXO Delete Query. + delete_txo_ada: SizedBatch, + /// TXO Asset Primary Key Query. + select_txo_assets: PreparedStatement, + /// TXO Assets Delete Query. + delete_txo_assets: SizedBatch, + /// Unstaked TXO ADA Primary Key Query. + select_unstaked_txo_ada: PreparedStatement, + /// Unstaked TXO ADA Delete Query. + delete_unstaked_txo_ada: SizedBatch, + /// Unstaked TXO Assets Primary Key Query. + select_unstaked_txo_assets: PreparedStatement, + /// Unstaked TXO Asset Delete Query. + delete_unstaked_txo_assets: SizedBatch, + /// TXI by TXN Hash by TXN Hash Primary Key Query. + select_txi_by_hash: PreparedStatement, + /// TXI by TXN Hash Delete Query. + delete_txi_by_hash: SizedBatch, + /// Stake Registration Primary Key Query. + select_stake_registration: PreparedStatement, + /// Stake Registration Delete Query. + delete_stake_registration: SizedBatch, + /// CIP36 Registrations Primary Key Query. + select_cip36_registration: PreparedStatement, + /// CIP36 Registrations Delete Query. + delete_cip36_registration: SizedBatch, + /// CIP36 Registration Invalid Primary Key Query. + select_cip36_registration_invalid: PreparedStatement, + /// CIP36 Registration Invalid Delete Query. + delete_cip36_registration_invalid: SizedBatch, + /// CIP36 Registration for Vote Key Primary Key Query. + select_cip36_registration_for_vote_key: PreparedStatement, + /// CIP36 Registration for Vote Key Delete Query. + delete_cip36_registration_for_vote_key: SizedBatch, + /// RBAC 509 Registrations Primary Key Query. + select_rbac509_registration: PreparedStatement, + /// RBAC 509 Registrations Delete Query. + delete_rbac509_registration: SizedBatch, + /// Chain Root for TX ID Primary Key Query.. + select_chain_root_for_txn_id: PreparedStatement, + /// Chain Root for TX ID Delete Query.. + delete_chain_root_for_txn_id: SizedBatch, + /// Chain Root for Role 0 Key Primary Key Query.. + select_chain_root_for_role0_key: PreparedStatement, + /// Chain Root for Role 0 Key Delete Query.. + delete_chain_root_for_role0_key: SizedBatch, + /// Chain Root for Stake Address Primary Key Query.. + select_chain_root_for_stake_address: PreparedStatement, + /// Chain Root for Stake Address Delete Query.. + delete_chain_root_for_stake_address: SizedBatch, +} + +impl PreparedQueries { + /// Create new prepared queries for a given session. + pub(crate) async fn new( + session: Arc, cfg: &cassandra_db::EnvVars, + ) -> anyhow::Result { + // We initialize like this, so that all errors preparing querys get shown before aborting. + Ok(Self { + select_txo_ada: txo_ada::PrimaryKeyQuery::prepare(&session).await?, + delete_txo_ada: txo_ada::DeleteQuery::prepare_batch(&session, cfg).await?, + select_txo_assets: txo_assets::PrimaryKeyQuery::prepare(&session).await?, + delete_txo_assets: txo_assets::DeleteQuery::prepare_batch(&session, cfg).await?, + select_unstaked_txo_ada: unstaked_txo_ada::PrimaryKeyQuery::prepare(&session).await?, + delete_unstaked_txo_ada: unstaked_txo_ada::DeleteQuery::prepare_batch(&session, cfg) + .await?, + select_unstaked_txo_assets: unstaked_txo_assets::PrimaryKeyQuery::prepare(&session) + .await?, + delete_unstaked_txo_assets: unstaked_txo_assets::DeleteQuery::prepare_batch( + &session, cfg, + ) + .await?, + select_txi_by_hash: txi_by_hash::PrimaryKeyQuery::prepare(&session).await?, + delete_txi_by_hash: txi_by_hash::DeleteQuery::prepare_batch(&session, cfg).await?, + select_stake_registration: stake_registration::PrimaryKeyQuery::prepare(&session) + .await?, + delete_stake_registration: stake_registration::DeleteQuery::prepare_batch( + &session, cfg, + ) + .await?, + select_cip36_registration: cip36_registration::PrimaryKeyQuery::prepare(&session) + .await?, + delete_cip36_registration: cip36_registration::DeleteQuery::prepare_batch( + &session, cfg, + ) + .await?, + select_cip36_registration_invalid: + cip36_registration_invalid::PrimaryKeyQuery::prepare(&session).await?, + delete_cip36_registration_invalid: + cip36_registration_invalid::DeleteQuery::prepare_batch(&session, cfg).await?, + select_cip36_registration_for_vote_key: + cip36_registration_for_vote_key::PrimaryKeyQuery::prepare(&session).await?, + delete_cip36_registration_for_vote_key: + cip36_registration_for_vote_key::DeleteQuery::prepare_batch(&session, cfg).await?, + select_rbac509_registration: rbac509_registration::PrimaryKeyQuery::prepare(&session) + .await?, + delete_rbac509_registration: rbac509_registration::DeleteQuery::prepare_batch( + &session, cfg, + ) + .await?, + select_chain_root_for_role0_key: chain_root_for_role0_key::PrimaryKeyQuery::prepare( + &session, + ) + .await?, + delete_chain_root_for_role0_key: chain_root_for_role0_key::DeleteQuery::prepare_batch( + &session, cfg, + ) + .await?, + select_chain_root_for_txn_id: chain_root_for_txn_id::PrimaryKeyQuery::prepare(&session) + .await?, + delete_chain_root_for_txn_id: chain_root_for_txn_id::DeleteQuery::prepare_batch( + &session, cfg, + ) + .await?, + select_chain_root_for_stake_address: + chain_root_for_stake_address::PrimaryKeyQuery::prepare(&session).await?, + delete_chain_root_for_stake_address: + chain_root_for_stake_address::DeleteQuery::prepare_batch(&session, cfg).await?, + }) + } + + /// Prepares a statement. + pub(crate) async fn prepare( + session: Arc, query: &str, consistency: scylla::statement::Consistency, + idempotent: bool, + ) -> anyhow::Result { + super::PreparedQueries::prepare(session, query, consistency, idempotent).await + } + + /// Prepares all permutations of the batch from 1 to max. + /// It is necessary to do this because batches are pre-sized, they can not be dynamic. + /// Preparing the batches in advance is a very larger performance increase. + pub(crate) async fn prepare_batch( + session: Arc, query: &str, cfg: &cassandra_db::EnvVars, + consistency: scylla::statement::Consistency, idempotent: bool, logged: bool, + ) -> anyhow::Result { + super::PreparedQueries::prepare_batch(session, query, cfg, consistency, idempotent, logged) + .await + } + + /// Executes a select query with the given parameters. + /// + /// Returns an iterator that iterates over all the result pages that the query + /// returns. + pub(crate) async fn execute_iter( + &self, session: Arc, select_query: PreparedSelectQuery, + ) -> anyhow::Result { + let prepared_stmt = match select_query { + PreparedSelectQuery::TxoAda => &self.select_txo_ada, + PreparedSelectQuery::TxoAssets => &self.select_txo_assets, + PreparedSelectQuery::UnstakedTxoAda => &self.select_unstaked_txo_ada, + PreparedSelectQuery::UnstakedTxoAsset => &self.select_unstaked_txo_assets, + PreparedSelectQuery::Txi => &self.select_txi_by_hash, + PreparedSelectQuery::StakeRegistration => &self.select_stake_registration, + PreparedSelectQuery::Cip36Registration => &self.select_cip36_registration, + PreparedSelectQuery::Cip36RegistrationInvalid => { + &self.select_cip36_registration_invalid + }, + PreparedSelectQuery::Cip36RegistrationForVoteKey => { + &self.select_cip36_registration_for_vote_key + }, + PreparedSelectQuery::Rbac509 => &self.select_rbac509_registration, + PreparedSelectQuery::ChainRootForTxnId => &self.select_chain_root_for_txn_id, + PreparedSelectQuery::ChainRootForRole0Key => &self.select_chain_root_for_role0_key, + PreparedSelectQuery::ChainRootForStakeAddress => { + &self.select_chain_root_for_stake_address + }, + }; + + super::session_execute_iter(session, prepared_stmt, NO_PARAMS).await + } + + /// Execute a purge query with the given parameters. + pub(crate) async fn execute_batch( + &self, session: Arc, cfg: Arc, query: PreparedDeleteQuery, + values: Vec, + ) -> FallibleQueryResults { + let query_map = match query { + PreparedDeleteQuery::TxoAda => &self.delete_txo_ada, + PreparedDeleteQuery::TxoAssets => &self.delete_txo_assets, + PreparedDeleteQuery::UnstakedTxoAda => &self.delete_unstaked_txo_ada, + PreparedDeleteQuery::UnstakedTxoAsset => &self.delete_unstaked_txo_assets, + PreparedDeleteQuery::Txi => &self.delete_txi_by_hash, + PreparedDeleteQuery::StakeRegistration => &self.delete_stake_registration, + PreparedDeleteQuery::Cip36Registration => &self.delete_cip36_registration, + PreparedDeleteQuery::Cip36RegistrationInvalid => { + &self.delete_cip36_registration_invalid + }, + PreparedDeleteQuery::Cip36RegistrationForVoteKey => { + &self.delete_cip36_registration_for_vote_key + }, + PreparedDeleteQuery::Rbac509 => &self.delete_rbac509_registration, + PreparedDeleteQuery::ChainRootForTxnId => &self.delete_chain_root_for_txn_id, + PreparedDeleteQuery::ChainRootForRole0Key => &self.delete_chain_root_for_role0_key, + PreparedDeleteQuery::ChainRootForStakeAddress => { + &self.delete_chain_root_for_stake_address + }, + }; + + super::session_execute_batch(session, query_map, cfg, query, values).await + } +} diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/rbac509_registration.rs b/catalyst-gateway/bin/src/db/index/queries/purge/rbac509_registration.rs new file mode 100644 index 00000000000..4ca10bb3a55 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/rbac509_registration.rs @@ -0,0 +1,136 @@ +//! RBAC 509 Registration Queries used in purging data. +use std::{fmt::Debug, sync::Arc}; + +use scylla::{ + prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow, + Session, +}; +use tracing::error; + +use crate::{ + db::index::{ + queries::{ + purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery}, + FallibleQueryResults, SizedBatch, + }, + session::CassandraSession, + }, + settings::cassandra_db, +}; + +pub(crate) mod result { + //! Return values for RBAC 509 registration purge queries. + + /// Primary Key Row + pub(crate) type PrimaryKey = (Vec, num_bigint::BigInt, i16, Vec, Vec); +} + +/// Select primary keys for RBAC 509 registration. +const SELECT_QUERY: &str = include_str!("./cql/get_rbac509_registration.cql"); + +/// Primary Key Value. +#[derive(SerializeRow)] +pub(crate) struct Params { + /// Chain Root - Binary 32 bytes. + pub(crate) chain_root: Vec, + /// Block Slot Number + pub(crate) slot_no: num_bigint::BigInt, + /// Transaction Offset inside the block. + pub(crate) txn: i16, + /// Transaction Hash ID - Binary 32 bytes. + transaction_id: Vec, + /// `UUIDv4` Purpose - Binary 16 bytes. + purpose: Vec, +} + +impl Debug for Params { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Params") + .field("chain_root", &self.chain_root) + .field("slot_no", &self.slot_no) + .field("txn", &self.txn) + .field("transaction_id", &self.transaction_id) + .field("purpose", &self.purpose) + .finish() + } +} + +impl From for Params { + fn from(value: result::PrimaryKey) -> Self { + Self { + chain_root: value.0, + slot_no: value.1, + txn: value.2, + transaction_id: value.3, + purpose: value.4, + } + } +} +/// Get primary key for RBAC 509 registration query. +pub(crate) struct PrimaryKeyQuery; + +impl PrimaryKeyQuery { + /// Prepares a query to get all RBAC 509 registration primary keys. + pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { + let select_primary_key = PreparedQueries::prepare( + session.clone(), + SELECT_QUERY, + scylla::statement::Consistency::All, + true, + ) + .await; + + if let Err(ref error) = select_primary_key { + error!(error=%error, "Failed to prepare get RBAC 509 registration primary key query"); + }; + + select_primary_key + } + + /// Executes a query to get all RBAC 509 registration primary keys. + pub(crate) async fn execute( + session: &CassandraSession, + ) -> anyhow::Result> { + let iter = session + .purge_execute_iter(PreparedSelectQuery::Rbac509) + .await? + .rows_stream::()?; + + Ok(iter) + } +} + +/// Delete RBAC 509 registration +const DELETE_QUERY: &str = include_str!("./cql/delete_rbac509_registration.cql"); + +/// Delete RBAC 509 registration Query +pub(crate) struct DeleteQuery; + +impl DeleteQuery { + /// Prepare Batch of Delete Queries + pub(crate) async fn prepare_batch( + session: &Arc, cfg: &cassandra_db::EnvVars, + ) -> anyhow::Result { + let delete_queries = PreparedQueries::prepare_batch( + session.clone(), + DELETE_QUERY, + cfg, + scylla::statement::Consistency::Any, + true, + false, + ) + .await?; + Ok(delete_queries) + } + + /// Executes a DELETE Query + pub(crate) async fn execute( + session: &CassandraSession, params: Vec, + ) -> FallibleQueryResults { + let results = session + .purge_execute_batch(PreparedDeleteQuery::Rbac509, params) + .await?; + + Ok(results) + } +} diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/stake_registration.rs b/catalyst-gateway/bin/src/db/index/queries/purge/stake_registration.rs new file mode 100644 index 00000000000..ad5e2270c6f --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/stake_registration.rs @@ -0,0 +1,128 @@ +//! Stake Registration Queries used in purging data. +use std::{fmt::Debug, sync::Arc}; + +use scylla::{ + prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow, + Session, +}; +use tracing::error; + +use crate::{ + db::index::{ + queries::{ + purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery}, + FallibleQueryResults, SizedBatch, + }, + session::CassandraSession, + }, + settings::cassandra_db, +}; + +pub(crate) mod result { + //! Return values for Stake Registration purge queries. + + /// Primary Key Row + pub(crate) type PrimaryKey = (Vec, num_bigint::BigInt, i16); +} + +/// Select primary keys for Stake Registration. +const SELECT_QUERY: &str = include_str!("./cql/get_stake_registration.cql"); + +/// Primary Key Value. +#[derive(SerializeRow)] +pub(crate) struct Params { + /// Stake Address - Binary 28 bytes. 0 bytes = not staked. + pub(crate) stake_hash: Vec, + /// Block Slot Number + pub(crate) slot_no: num_bigint::BigInt, + /// Transaction Offset inside the block. + pub(crate) txn: i16, +} + +impl Debug for Params { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Params") + .field("stake_hash", &self.stake_hash) + .field("slot_no", &self.slot_no) + .field("txn", &self.txn) + .finish() + } +} + +impl From for Params { + fn from(value: result::PrimaryKey) -> Self { + Self { + stake_hash: value.0, + slot_no: value.1, + txn: value.2, + } + } +} +/// Get primary key for Stake Registration query. +pub(crate) struct PrimaryKeyQuery; + +impl PrimaryKeyQuery { + /// Prepares a query to get all Stake Registration primary keys. + pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { + let select_primary_key = PreparedQueries::prepare( + session.clone(), + SELECT_QUERY, + scylla::statement::Consistency::All, + true, + ) + .await; + + if let Err(ref error) = select_primary_key { + error!(error=%error, "Failed to prepare get Stake Registration primary key query"); + }; + + select_primary_key + } + + /// Executes a query to get all Stake Registration primary keys. + pub(crate) async fn execute( + session: &CassandraSession, + ) -> anyhow::Result> { + let iter = session + .purge_execute_iter(PreparedSelectQuery::StakeRegistration) + .await? + .rows_stream::()?; + + Ok(iter) + } +} + +/// Delete Stake Registration +const DELETE_QUERY: &str = include_str!("./cql/delete_stake_registration.cql"); + +/// Delete Stake Registration Query +pub(crate) struct DeleteQuery; + +impl DeleteQuery { + /// Prepare Batch of Delete Queries + pub(crate) async fn prepare_batch( + session: &Arc, cfg: &cassandra_db::EnvVars, + ) -> anyhow::Result { + let delete_queries = PreparedQueries::prepare_batch( + session.clone(), + DELETE_QUERY, + cfg, + scylla::statement::Consistency::Any, + true, + false, + ) + .await?; + Ok(delete_queries) + } + + /// Executes a DELETE Query + pub(crate) async fn execute( + session: &CassandraSession, params: Vec, + ) -> FallibleQueryResults { + let results = session + .purge_execute_batch(PreparedDeleteQuery::StakeRegistration, params) + .await?; + + Ok(results) + } +} diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/txi_by_hash.rs b/catalyst-gateway/bin/src/db/index/queries/purge/txi_by_hash.rs new file mode 100644 index 00000000000..002f8be8a5c --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/txi_by_hash.rs @@ -0,0 +1,124 @@ +//! Transaction Inputs (ADA or a native asset) queries used in purging data. +use std::{fmt::Debug, sync::Arc}; + +use scylla::{ + prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow, + Session, +}; +use tracing::error; + +use crate::{ + db::index::{ + queries::{ + purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery}, + FallibleQueryResults, SizedBatch, + }, + session::CassandraSession, + }, + settings::cassandra_db, +}; + +pub(crate) mod result { + //! Return values for TXI by hash purge queries. + + /// Primary Key Row + pub(crate) type PrimaryKey = (Vec, i16, num_bigint::BigInt); +} + +/// Select primary keys for TXI by hash. +const SELECT_QUERY: &str = include_str!("./cql/get_txi_by_txn_hashes.cql"); + +/// Primary Key Value. +#[derive(SerializeRow)] +pub(crate) struct Params { + /// 32 byte hash of this transaction. + pub(crate) txn_hash: Vec, + /// Transaction Output Offset inside the transaction. + pub(crate) txo: i16, +} + +impl Debug for Params { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Params") + .field("txn_hash", &hex::encode(&self.txn_hash)) + .field("txo", &self.txo) + .finish() + } +} + +impl From for Params { + fn from(value: result::PrimaryKey) -> Self { + Self { + txn_hash: value.0, + txo: value.1, + } + } +} +/// Get primary key for TXI by hash query. +pub(crate) struct PrimaryKeyQuery; + +impl PrimaryKeyQuery { + /// Prepares a query to get all TXI by hash primary keys. + pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { + let select_primary_key = PreparedQueries::prepare( + session.clone(), + SELECT_QUERY, + scylla::statement::Consistency::All, + true, + ) + .await; + + if let Err(ref error) = select_primary_key { + error!(error=%error, "Failed to prepare get TXI by hash primary key query"); + }; + + select_primary_key + } + + /// Executes a query to get all TXI by hash primary keys. + pub(crate) async fn execute( + session: &CassandraSession, + ) -> anyhow::Result> { + let iter = session + .purge_execute_iter(PreparedSelectQuery::Txi) + .await? + .rows_stream::()?; + + Ok(iter) + } +} + +/// Delete TXI by hash Query +const DELETE_QUERY: &str = include_str!("./cql/delete_txi_by_txn_hashes.cql"); + +/// Delete TXI by hash Query +pub(crate) struct DeleteQuery; + +impl DeleteQuery { + /// Prepare Batch of Delete Queries + pub(crate) async fn prepare_batch( + session: &Arc, cfg: &cassandra_db::EnvVars, + ) -> anyhow::Result { + let delete_queries = PreparedQueries::prepare_batch( + session.clone(), + DELETE_QUERY, + cfg, + scylla::statement::Consistency::Any, + true, + false, + ) + .await?; + Ok(delete_queries) + } + + /// Executes a DELETE Query + pub(crate) async fn execute( + session: &CassandraSession, params: Vec, + ) -> FallibleQueryResults { + let results = session + .purge_execute_batch(PreparedDeleteQuery::Txi, params) + .await?; + + Ok(results) + } +} diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/txo_ada.rs b/catalyst-gateway/bin/src/db/index/queries/purge/txo_ada.rs new file mode 100644 index 00000000000..e2ae31d8637 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/txo_ada.rs @@ -0,0 +1,132 @@ +//! TXO by Stake Address Queries used in purging data. +use std::{fmt::Debug, sync::Arc}; + +use scylla::{ + prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow, + Session, +}; +use tracing::error; + +use crate::{ + db::index::{ + queries::{ + purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery}, + FallibleQueryResults, SizedBatch, + }, + session::CassandraSession, + }, + settings::cassandra_db, +}; + +pub(crate) mod result { + //! Return values for TXO by Stake Address purge queries. + + /// Primary Key Row + pub(crate) type PrimaryKey = (Vec, num_bigint::BigInt, i16, i16); +} + +/// Select primary keys for TXO by Stake Address. +const SELECT_QUERY: &str = include_str!("./cql/get_txo_by_stake_address.cql"); + +/// Primary Key Value. +#[derive(SerializeRow)] +pub(crate) struct Params { + /// Stake Address - Binary 28 bytes. 0 bytes = not staked. + pub(crate) stake_address: Vec, + /// Block Slot Number + pub(crate) slot_no: num_bigint::BigInt, + /// Transaction Offset inside the block. + pub(crate) txn: i16, + /// Transaction Output Offset inside the transaction. + pub(crate) txo: i16, +} + +impl Debug for Params { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Params") + .field("stake_address", &self.stake_address) + .field("slot_no", &self.slot_no) + .field("txn", &self.txn) + .field("txo", &self.txo) + .finish() + } +} + +impl From for Params { + fn from(value: result::PrimaryKey) -> Self { + Self { + stake_address: value.0, + slot_no: value.1, + txn: value.2, + txo: value.3, + } + } +} +/// Get primary key for TXO by Stake Address query. +pub(crate) struct PrimaryKeyQuery; + +impl PrimaryKeyQuery { + /// Prepares a query to get all TXO by stake address primary keys. + pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { + let select_primary_key = PreparedQueries::prepare( + session.clone(), + SELECT_QUERY, + scylla::statement::Consistency::All, + true, + ) + .await; + + if let Err(ref error) = select_primary_key { + error!(error=%error, "Failed to prepare get TXO by stake address primary key query"); + }; + + select_primary_key + } + + /// Executes a query to get all TXO by stake address primary keys. + pub(crate) async fn execute( + session: &CassandraSession, + ) -> anyhow::Result> { + let iter = session + .purge_execute_iter(PreparedSelectQuery::TxoAda) + .await? + .rows_stream::()?; + + Ok(iter) + } +} + +/// Delete TXO by Stake Address +const DELETE_QUERY: &str = include_str!("./cql/delete_txo_by_stake_address.cql"); + +/// Delete TXO by Stake Address Query +pub(crate) struct DeleteQuery; + +impl DeleteQuery { + /// Prepare Batch of Delete Queries + pub(crate) async fn prepare_batch( + session: &Arc, cfg: &cassandra_db::EnvVars, + ) -> anyhow::Result { + let delete_queries = PreparedQueries::prepare_batch( + session.clone(), + DELETE_QUERY, + cfg, + scylla::statement::Consistency::Any, + true, + false, + ) + .await?; + Ok(delete_queries) + } + + /// Executes a DELETE Query + pub(crate) async fn execute( + session: &CassandraSession, params: Vec, + ) -> FallibleQueryResults { + let results = session + .purge_execute_batch(PreparedDeleteQuery::TxoAda, params) + .await?; + + Ok(results) + } +} diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/txo_assets.rs b/catalyst-gateway/bin/src/db/index/queries/purge/txo_assets.rs new file mode 100644 index 00000000000..50df27451aa --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/txo_assets.rs @@ -0,0 +1,141 @@ +//! TXO Assets by Stake Address Queries used in purging data. +use std::{fmt::Debug, sync::Arc}; + +use scylla::{ + prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow, + Session, +}; +use tracing::error; + +use crate::{ + db::index::{ + queries::{ + purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery}, + FallibleQueryResults, SizedBatch, + }, + session::CassandraSession, + }, + settings::cassandra_db, +}; + +pub(crate) mod result { + //! Return values for TXO Assets by Stake Address purge queries. + + /// Primary Key Row + pub(crate) type PrimaryKey = (Vec, num_bigint::BigInt, i16, i16, Vec, String); +} + +/// Select primary keys for TXO Assets by Stake Address. +const SELECT_QUERY: &str = include_str!("./cql/get_txo_by_stake_address.cql"); + +/// Primary Key Value. +#[derive(SerializeRow)] +pub(crate) struct Params { + /// Stake Address - Binary 28 bytes. 0 bytes = not staked. + pub(crate) stake_address: Vec, + /// Block Slot Number + pub(crate) slot_no: num_bigint::BigInt, + /// Transaction Offset inside the block. + pub(crate) txn: i16, + /// Transaction Output Offset inside the transaction. + pub(crate) txo: i16, + /// Asset Policy Hash - Binary 28 bytes. + policy_id: Vec, + /// Name of the Policy (UTF8) + // TODO: https://github.com/input-output-hk/catalyst-voices/issues/1121 + policy_name: String, +} + +impl Debug for Params { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Params") + .field("stake_address", &self.stake_address) + .field("slot_no", &self.slot_no) + .field("txn", &self.txn) + .field("txo", &self.txo) + .field("policy_id", &self.policy_id) + .field("policy_name", &self.policy_name) + .finish() + } +} + +impl From for Params { + fn from(value: result::PrimaryKey) -> Self { + Self { + stake_address: value.0, + slot_no: value.1, + txn: value.2, + txo: value.3, + policy_id: value.4, + policy_name: value.5, + } + } +} +/// Get primary key for TXO Assets by Stake Address query. +pub(crate) struct PrimaryKeyQuery; + +impl PrimaryKeyQuery { + /// Prepares a query to get all TXO Assets by stake address primary keys. + pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { + let select_primary_key = PreparedQueries::prepare( + session.clone(), + SELECT_QUERY, + scylla::statement::Consistency::All, + true, + ) + .await; + + if let Err(ref error) = select_primary_key { + error!(error=%error, "Failed to prepare get TXO Assets by stake address primary key query"); + }; + + select_primary_key + } + + /// Executes a query to get all TXO Assets by stake address primary keys. + pub(crate) async fn execute( + session: &CassandraSession, + ) -> anyhow::Result> { + let iter = session + .purge_execute_iter(PreparedSelectQuery::TxoAssets) + .await? + .rows_stream::()?; + + Ok(iter) + } +} + +/// Delete TXO Assets by Stake Address +const DELETE_QUERY: &str = include_str!("./cql/delete_txo_by_stake_address.cql"); + +/// Delete TXO Assets by Stake Address Query +pub(crate) struct DeleteQuery; + +impl DeleteQuery { + /// Prepare Batch of Delete Queries + pub(crate) async fn prepare_batch( + session: &Arc, cfg: &cassandra_db::EnvVars, + ) -> anyhow::Result { + let delete_queries = PreparedQueries::prepare_batch( + session.clone(), + DELETE_QUERY, + cfg, + scylla::statement::Consistency::Any, + true, + false, + ) + .await?; + Ok(delete_queries) + } + + /// Executes a DELETE Query + pub(crate) async fn execute( + session: &CassandraSession, params: Vec, + ) -> FallibleQueryResults { + let results = session + .purge_execute_batch(PreparedDeleteQuery::TxoAssets, params) + .await?; + + Ok(results) + } +} diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/unstaked_txo_ada.rs b/catalyst-gateway/bin/src/db/index/queries/purge/unstaked_txo_ada.rs new file mode 100644 index 00000000000..99c4d5da22f --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/unstaked_txo_ada.rs @@ -0,0 +1,125 @@ +//! Unstaked Transaction Outputs (ADA), by their transaction hash, queries used in purging +//! data. +use std::{fmt::Debug, sync::Arc}; + +use scylla::{ + prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow, + Session, +}; +use tracing::error; + +use crate::{ + db::index::{ + queries::{ + purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery}, + FallibleQueryResults, SizedBatch, + }, + session::CassandraSession, + }, + settings::cassandra_db, +}; + +pub(crate) mod result { + //! Return values for Unstaked TXO ADA purge queries. + + /// Primary Key Row + pub(crate) type PrimaryKey = (Vec, i16, num_bigint::BigInt); +} + +/// Select primary keys for Unstaked TXO ADA. +const SELECT_QUERY: &str = include_str!("./cql/get_unstaked_txo_by_txn_hash.cql"); + +/// Primary Key Value. +#[derive(SerializeRow)] +pub(crate) struct Params { + /// 32 byte hash of this transaction. + pub(crate) txn_hash: Vec, + /// Transaction Output Offset inside the transaction. + pub(crate) txo: i16, +} + +impl Debug for Params { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Params") + .field("txn_hash", &self.txn_hash) + .field("txo", &self.txo) + .finish() + } +} + +impl From for Params { + fn from(value: result::PrimaryKey) -> Self { + Self { + txn_hash: value.0, + txo: value.1, + } + } +} +/// Get primary key for Unstaked TXO ADA query. +pub(crate) struct PrimaryKeyQuery; + +impl PrimaryKeyQuery { + /// Prepares a query to get all Unstaked TXO ADA primary keys. + pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { + let select_primary_key = PreparedQueries::prepare( + session.clone(), + SELECT_QUERY, + scylla::statement::Consistency::All, + true, + ) + .await; + + if let Err(ref error) = select_primary_key { + error!(error=%error, "Failed to prepare get Unstaked TXO ADA primary key query"); + }; + + select_primary_key + } + + /// Executes a query to get all Unstaked TXO ADA primary keys. + pub(crate) async fn execute( + session: &CassandraSession, + ) -> anyhow::Result> { + let iter = session + .purge_execute_iter(PreparedSelectQuery::UnstakedTxoAda) + .await? + .rows_stream::()?; + + Ok(iter) + } +} + +/// Delete Unstaked TXO by Stake Address +const DELETE_QUERY: &str = include_str!("./cql/delete_unstaked_txo_by_txn_hash.cql"); + +/// Delete TXO by Stake Address Query +pub(crate) struct DeleteQuery; + +impl DeleteQuery { + /// Prepare Batch of Delete Queries + pub(crate) async fn prepare_batch( + session: &Arc, cfg: &cassandra_db::EnvVars, + ) -> anyhow::Result { + let delete_queries = PreparedQueries::prepare_batch( + session.clone(), + DELETE_QUERY, + cfg, + scylla::statement::Consistency::Any, + true, + false, + ) + .await?; + Ok(delete_queries) + } + + /// Executes a DELETE Query + pub(crate) async fn execute( + session: &CassandraSession, params: Vec, + ) -> FallibleQueryResults { + let results = session + .purge_execute_batch(PreparedDeleteQuery::UnstakedTxoAda, params) + .await?; + + Ok(results) + } +} diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/unstaked_txo_assets.rs b/catalyst-gateway/bin/src/db/index/queries/purge/unstaked_txo_assets.rs new file mode 100644 index 00000000000..89f277056ea --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/purge/unstaked_txo_assets.rs @@ -0,0 +1,141 @@ +//! TXO Assets by TXN Hash Queries used in purging data. +use std::{fmt::Debug, sync::Arc}; + +use scylla::{ + prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow, + Session, +}; +use tracing::error; + +use crate::{ + db::index::{ + queries::{ + purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery}, + FallibleQueryResults, SizedBatch, + }, + session::CassandraSession, + }, + settings::cassandra_db, +}; + +pub(crate) mod result { + //! Return values for TXO Assets by TXN Hash purge queries. + + /// Primary Key Row + pub(crate) type PrimaryKey = (Vec, num_bigint::BigInt, i16, i16, Vec, String); +} + +/// Select primary keys for TXO Assets by TXN Hash. +const SELECT_QUERY: &str = include_str!("./cql/get_unstaked_txo_assets_by_txn_hash.cql"); + +/// Primary Key Value. +#[derive(SerializeRow)] +pub(crate) struct Params { + /// Stake Address - Binary 28 bytes. 0 bytes = not staked. + pub(crate) stake_address: Vec, + /// Block Slot Number + pub(crate) slot_no: num_bigint::BigInt, + /// Transaction Offset inside the block. + pub(crate) txn: i16, + /// Transaction Output Offset inside the transaction. + pub(crate) txo: i16, + /// Asset Policy Hash - Binary 28 bytes. + policy_id: Vec, + /// Name of the Policy (UTF8) + // TODO: https://github.com/input-output-hk/catalyst-voices/issues/1121 + policy_name: String, +} + +impl Debug for Params { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Params") + .field("stake_address", &self.stake_address) + .field("slot_no", &self.slot_no) + .field("txn", &self.txn) + .field("txo", &self.txo) + .field("policy_id", &self.policy_id) + .field("policy_name", &self.policy_name) + .finish() + } +} + +impl From for Params { + fn from(value: result::PrimaryKey) -> Self { + Self { + stake_address: value.0, + slot_no: value.1, + txn: value.2, + txo: value.3, + policy_id: value.4, + policy_name: value.5, + } + } +} +/// Get primary key for TXO Assets by TXN Hash query. +pub(crate) struct PrimaryKeyQuery; + +impl PrimaryKeyQuery { + /// Prepares a query to get all TXO Assets by TXN Hash primary keys. + pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { + let select_primary_key = PreparedQueries::prepare( + session.clone(), + SELECT_QUERY, + scylla::statement::Consistency::All, + true, + ) + .await; + + if let Err(ref error) = select_primary_key { + error!(error=%error, "Failed to prepare get TXO Assets by TXN Hash primary key query"); + }; + + select_primary_key + } + + /// Executes a query to get all TXO Assets by TXN Hash primary keys. + pub(crate) async fn execute( + session: &CassandraSession, + ) -> anyhow::Result> { + let iter = session + .purge_execute_iter(PreparedSelectQuery::UnstakedTxoAsset) + .await? + .rows_stream::()?; + + Ok(iter) + } +} + +/// Delete TXO Assets by TXN Hash +const DELETE_QUERY: &str = include_str!("./cql/delete_unstaked_txo_assets_by_txn_hash.cql"); + +/// Delete TXO Assets by TXN Hash Query +pub(crate) struct DeleteQuery; + +impl DeleteQuery { + /// Prepare Batch of Delete Queries + pub(crate) async fn prepare_batch( + session: &Arc, cfg: &cassandra_db::EnvVars, + ) -> anyhow::Result { + let delete_queries = PreparedQueries::prepare_batch( + session.clone(), + DELETE_QUERY, + cfg, + scylla::statement::Consistency::Any, + true, + false, + ) + .await?; + Ok(delete_queries) + } + + /// Executes a DELETE Query + pub(crate) async fn execute( + session: &CassandraSession, params: Vec, + ) -> FallibleQueryResults { + let results = session + .purge_execute_batch(PreparedDeleteQuery::UnstakedTxoAsset, params) + .await?; + + Ok(results) + } +} diff --git a/catalyst-gateway/bin/src/db/index/schema/cql/chain_root_for_role0_key.cql b/catalyst-gateway/bin/src/db/index/schema/cql/chain_root_for_role0_key.cql index 6d5ff854621..fb34c2a4131 100644 --- a/catalyst-gateway/bin/src/db/index/schema/cql/chain_root_for_role0_key.cql +++ b/catalyst-gateway/bin/src/db/index/schema/cql/chain_root_for_role0_key.cql @@ -1,9 +1,11 @@ --- Index of Chain Root For Role0 Key. RBAC 509 registrations. +-- Index of Chain Root For Role0 Key. RBAC 509 registrations. CREATE TABLE IF NOT EXISTS chain_root_for_role0_key ( -- Primary Key Data role0_key blob, -- 16 Bytes of Role0 Key. slot_no varint, -- slot number when the key_was_registered. txn smallint, -- Index of the TX which holds the registration data. + + -- Non-primary Key Data chain_root blob, -- 32 Bytes of Chain Root. PRIMARY KEY (role0_key, slot_no, txn) diff --git a/catalyst-gateway/bin/src/db/index/schema/cql/chain_root_for_stake_addr.cql b/catalyst-gateway/bin/src/db/index/schema/cql/chain_root_for_stake_addr.cql index 0bf88a7321e..e1f23ac0b8f 100644 --- a/catalyst-gateway/bin/src/db/index/schema/cql/chain_root_for_stake_addr.cql +++ b/catalyst-gateway/bin/src/db/index/schema/cql/chain_root_for_stake_addr.cql @@ -1,9 +1,11 @@ --- Index of Chain Root For Stake Address. RBAC 509 registrations. +-- Index of Chain Root For Stake Address (RBAC 509 registrations). CREATE TABLE IF NOT EXISTS chain_root_for_stake_addr ( -- Primary Key Data stake_addr blob, -- 32 Bytes of Stake Address. slot_no varint, -- slot number when the key_was_registered. txn smallint, -- Index of the TX which holds the registration data. + + -- Non-primary Key Data chain_root blob, -- 32 Bytes of Chain Root. PRIMARY KEY (stake_addr, slot_no, txn) diff --git a/catalyst-gateway/bin/src/db/index/schema/cql/chain_root_for_txn_id.cql b/catalyst-gateway/bin/src/db/index/schema/cql/chain_root_for_txn_id.cql index 4c8324b0883..a051b76d76e 100644 --- a/catalyst-gateway/bin/src/db/index/schema/cql/chain_root_for_txn_id.cql +++ b/catalyst-gateway/bin/src/db/index/schema/cql/chain_root_for_txn_id.cql @@ -1,4 +1,4 @@ --- Index of Chain Root For TX ID. RBAC 509 registrations. +-- Index of Chain Root For TX ID (RBAC 509 registrations). CREATE TABLE IF NOT EXISTS chain_root_for_txn_id ( -- Primary Key Data transaction_id blob, -- 32 Bytes of Transaction Hash. diff --git a/catalyst-gateway/bin/src/db/index/schema/cql/cip36_registration_for_vote_key.cql b/catalyst-gateway/bin/src/db/index/schema/cql/cip36_registration_for_vote_key.cql index c3ba5f6dfce..066245e2672 100644 --- a/catalyst-gateway/bin/src/db/index/schema/cql/cip36_registration_for_vote_key.cql +++ b/catalyst-gateway/bin/src/db/index/schema/cql/cip36_registration_for_vote_key.cql @@ -1,5 +1,5 @@ --- Index of CIP-36 registrations searchable by Stake Address. --- Full registration data needs to be queried from the man cip36 registration tables. +-- Index of CIP-36 registrations searchable by Vote Key. +-- Full registration data needs to be queried from the main CIP-36 registration tables. -- Includes both Valid and Invalid registrations. CREATE TABLE IF NOT EXISTS cip36_registration_for_vote_key ( -- Primary Key Data diff --git a/catalyst-gateway/bin/src/db/index/schema/cql/unstaked_txo_assets_by_txn_hash.cql b/catalyst-gateway/bin/src/db/index/schema/cql/unstaked_txo_assets_by_txn_hash.cql index 4628e236ec4..c3dffcfbd75 100644 --- a/catalyst-gateway/bin/src/db/index/schema/cql/unstaked_txo_assets_by_txn_hash.cql +++ b/catalyst-gateway/bin/src/db/index/schema/cql/unstaked_txo_assets_by_txn_hash.cql @@ -1,4 +1,4 @@ --- Transaction Outputs (Native Assets) per stake address. +-- Transaction Outputs (Native Assets) by their transaction hash. CREATE TABLE IF NOT EXISTS unstaked_txo_assets_by_txn_hash ( -- Primary Key Fields txn_hash blob, -- 32 byte hash of this transaction. diff --git a/catalyst-gateway/bin/src/db/index/session.rs b/catalyst-gateway/bin/src/db/index/session.rs index 6193a628dac..a5531664984 100644 --- a/catalyst-gateway/bin/src/db/index/session.rs +++ b/catalyst-gateway/bin/src/db/index/session.rs @@ -17,15 +17,13 @@ use tracing::{error, info}; use super::{ queries::{ + purge::{self, PreparedDeleteQuery}, FallibleQueryResults, PreparedQueries, PreparedQuery, PreparedSelectQuery, PreparedUpsertQuery, }, schema::create_schema, }; -use crate::{ - db::index::queries, - settings::{cassandra_db, Settings}, -}; +use crate::settings::{cassandra_db, Settings}; /// Configuration Choices for compression #[derive(Clone, strum::EnumString, strum::Display, strum::VariantNames)] @@ -63,6 +61,8 @@ pub(crate) struct CassandraSession { session: Arc, /// All prepared queries we can use on this session. queries: Arc, + /// All prepared purge queries we can use on this session. + purge_queries: Arc, } /// Persistent DB Session. @@ -76,8 +76,10 @@ impl CassandraSession { pub(crate) fn init() { let (persistent, volatile) = Settings::cassandra_db_cfg(); - let _join_handle = tokio::task::spawn(async move { retry_init(persistent, true).await }); - let _join_handle = tokio::task::spawn(async move { retry_init(volatile, false).await }); + let _join_handle = + tokio::task::spawn(async move { Box::pin(retry_init(persistent, true)).await }); + let _join_handle = + tokio::task::spawn(async move { Box::pin(retry_init(volatile, false)).await }); } /// Check to see if the Cassandra Indexing DB is ready for use @@ -147,6 +149,48 @@ impl CassandraSession { queries.execute_upsert(session, query, value).await } + /// Execute a purge query with the given parameters. + /// + /// Values should be a Vec of values which implement `SerializeRow` and they MUST be + /// the same, and must match the query being executed. + /// + /// This will divide the batch into optimal sized chunks and execute them until all + /// values have been executed or the first error is encountered. + /// + /// NOTE: This is currently only used to purge volatile data. + pub(crate) async fn purge_execute_batch( + &self, query: PreparedDeleteQuery, values: Vec, + ) -> FallibleQueryResults { + // Only execute purge queries on the volatile session + let persistent = false; + let Some(volatile_db) = Self::get(persistent) else { + // This should never happen + anyhow::bail!("Volatile DB Session not found"); + }; + let cfg = self.cfg.clone(); + let queries = self.purge_queries.clone(); + let session = volatile_db.session.clone(); + + queries.execute_batch(session, cfg, query, values).await + } + + /// Execute a select query to gather primary keys for purging. + pub(crate) async fn purge_execute_iter( + &self, query: purge::PreparedSelectQuery, + ) -> anyhow::Result { + // Only execute purge queries on the volatile session + let persistent = false; + let Some(volatile_db) = Self::get(persistent) else { + // This should never happen + anyhow::bail!("Volatile DB Session not found"); + }; + let queries = self.purge_queries.clone(); + + queries + .execute_iter(volatile_db.session.clone(), query) + .await + } + /// Get underlying Raw Cassandra Session. pub(crate) fn get_raw_session(&self) -> Arc { self.session.clone() @@ -269,7 +313,7 @@ async fn retry_init(cfg: cassandra_db::EnvVars, persistent: bool) { continue; } - let queries = match queries::PreparedQueries::new(session.clone(), &cfg).await { + let queries = match PreparedQueries::new(session.clone(), &cfg).await { Ok(queries) => Arc::new(queries), Err(error) => { error!( @@ -281,11 +325,25 @@ async fn retry_init(cfg: cassandra_db::EnvVars, persistent: bool) { }, }; + let purge_queries = match Box::pin(purge::PreparedQueries::new(session.clone(), &cfg)).await + { + Ok(queries) => Arc::new(queries), + Err(error) => { + error!( + db_type = db_type, + error = %error, + "Failed to Create Cassandra Prepared Purge Queries" + ); + continue; + }, + }; + let cassandra_session = CassandraSession { persistent, cfg: Arc::new(cfg), session, queries, + purge_queries, }; // Save the session so we can execute queries on the DB diff --git a/catalyst-gateway/bin/src/settings/mod.rs b/catalyst-gateway/bin/src/settings/mod.rs index 6be03945050..5705246c00a 100644 --- a/catalyst-gateway/bin/src/settings/mod.rs +++ b/catalyst-gateway/bin/src/settings/mod.rs @@ -56,6 +56,9 @@ const CHECK_CONFIG_TICK_DEFAULT: &str = "5s"; const EVENT_DB_URL_DEFAULT: &str = "postgresql://postgres:postgres@localhost/catalyst_events?sslmode=disable"; +/// Default number of slots used as overlap when purging Live Index data. +const PURGE_SLOT_BUFFER_DEFAULT: u64 = 100; + /// Hash the Public IPv4 and IPv6 address of the machine, and convert to a 128 bit V4 /// UUID. fn calculate_service_uuid() -> String { @@ -144,6 +147,9 @@ struct EnvVars { /// Tick every N seconds until config exists in db #[allow(unused)] check_config_tick: Duration, + + /// Slot buffer used as overlap when purging Live Index data. + purge_slot_buffer: u64, } // Lazy initialization of all env vars which are not command line parameters. @@ -169,6 +175,8 @@ static ENV_VARS: LazyLock = LazyLock::new(|| { Duration::from_secs(5) }, }; + let purge_slot_buffer = + StringEnvVar::new_as("PURGE_SLOT_BUFFER", PURGE_SLOT_BUFFER_DEFAULT, 0, u64::MAX); EnvVars { github_repo_owner: StringEnvVar::new("GITHUB_REPO_OWNER", GITHUB_REPO_OWNER_DEFAULT.into()), @@ -195,6 +203,7 @@ static ENV_VARS: LazyLock = LazyLock::new(|| { chain_follower: chain_follower::EnvVars::new(), internal_api_key: StringEnvVar::new_optional("INTERNAL_API_KEY", true), check_config_tick, + purge_slot_buffer, } }); @@ -370,6 +379,11 @@ impl Settings { false } } + + /// Slot buffer used as overlap when purging Live Index data. + pub(crate) fn purge_slot_buffer() -> u64 { + ENV_VARS.purge_slot_buffer + } } /// Transform a string list of host names into a vec of host names.