diff --git a/consumer/src/backend.rs b/consumer/src/backend.rs index d24ff60..8a218f7 100644 --- a/consumer/src/backend.rs +++ b/consumer/src/backend.rs @@ -4,7 +4,7 @@ use holaplex_hub_nfts_solana_core::proto::{ }; use holaplex_hub_nfts_solana_entity::{collection_mints, collections, update_revisions}; use hub_core::prelude::*; -use solana_program::pubkey::Pubkey; +use solana_program::{hash::Hash, pubkey::Pubkey}; #[derive(Clone)] pub struct MasterEditionAddresses { pub metadata: Pubkey, @@ -77,6 +77,8 @@ pub struct TransferAssetAddresses { /// Represents a response from a transaction on the blockchain. This struct /// provides the serialized message and the signatures of the signed message. + +#[derive(Clone)] pub struct TransactionResponse { /// The serialized version of the message from the transaction. pub serialized_message: Vec, @@ -138,8 +140,12 @@ pub trait CollectionBackend { #[async_trait] pub trait MintBackend { - async fn mint(&self, collection: &collections::Model, txn: T) - -> Result>; + async fn mint( + &self, + collection: &collections::Model, + blockhash: Option, + txn: T, + ) -> Result>; } #[async_trait] @@ -147,6 +153,7 @@ pub trait TransferBackend { async fn transfer( &self, collection_mint: &M, + txn: TransferMetaplexAssetTransaction, ) -> Result>; } diff --git a/consumer/src/events.rs b/consumer/src/events.rs index 714d73a..d068e28 100644 --- a/consumer/src/events.rs +++ b/consumer/src/events.rs @@ -9,7 +9,8 @@ use holaplex_hub_nfts_solana_core::{ MetaplexMasterEditionTransaction, MintMetaplexEditionTransaction, MintMetaplexMetadataTransaction, SolanaCompletedMintTransaction, SolanaCompletedTransferTransaction, SolanaCompletedUpdateTransaction, - SolanaFailedTransaction, SolanaNftEventKey, SolanaNftEvents, SolanaPendingTransaction, + SolanaFailedTransaction, SolanaMintOpenDropBatchedPayload, SolanaMintPendingTransactions, + SolanaMintTransaction, SolanaNftEventKey, SolanaNftEvents, SolanaPendingTransaction, SolanaTransactionFailureReason, SwitchCollectionPayload, TransferMetaplexAssetTransaction, UpdateSolanaMintPayload, }, @@ -20,6 +21,7 @@ use holaplex_hub_nfts_solana_entity::{ collection_mints, collections, compression_leafs, update_revisions, }; use hub_core::{ + backon::{ExponentialBuilder, Retryable}, chrono::Utc, metrics::KeyValue, prelude::*, @@ -29,6 +31,7 @@ use hub_core::{ uuid, uuid::Uuid, }; +use solana_client::client_error::ClientError; use solana_program::pubkey::{ParsePubkeyError, Pubkey}; use solana_sdk::signature::Signature; @@ -39,6 +42,7 @@ use crate::{ }, metrics::Metrics, solana::{CompressedRef, EditionRef, Solana, SolanaAssetIdError, UncompressedRef}, + with_retry, }; #[derive(Debug, thiserror::Error, Triage)] @@ -126,6 +130,7 @@ pub enum EventKind { UpdateOpenDrop, RetryCreateOpenDrop, RetryMintOpenDrop, + MintOpenDropBatched, } impl EventKind { @@ -150,6 +155,7 @@ impl EventKind { Self::UpdateOpenDrop => "open drop update", Self::RetryCreateOpenDrop => "open drop creation retry", Self::RetryMintOpenDrop => "open drop mint retry", + Self::MintOpenDropBatched => "open drop mint batch", } } @@ -190,6 +196,7 @@ impl EventKind { SolanaNftEvent::RetryCreateOpenDropSigningRequested(tx) }, EventKind::RetryMintOpenDrop => SolanaNftEvent::RetryMintOpenDropSigningRequested(tx), + EventKind::MintOpenDropBatched => unreachable!(), } } @@ -390,6 +397,7 @@ impl EventKind { address: collection_mint.mint, }) }, + Self::MintOpenDropBatched => unreachable!(), }) } @@ -414,6 +422,7 @@ impl EventKind { Self::UpdateOpenDrop => SolanaNftEvent::UpdateOpenDropFailed(tx), Self::RetryCreateOpenDrop => SolanaNftEvent::RetryCreateOpenDropFailed(tx), Self::RetryMintOpenDrop => SolanaNftEvent::RetryMintOpenDropFailed(tx), + Self::MintOpenDropBatched => unreachable!(), } } } @@ -625,6 +634,15 @@ impl Processor { ) .await }, + Some(NftEvent::SolanaMintOpenDropBatched(payload)) => { + self.process_mint_batch(&key, payload).await.map_err(|e| { + ProcessorError::new( + e, + EventKind::MintOpenDropBatched, + ErrorSource::NftFailure, + ) + }) + }, _ => Ok(()), } }, @@ -714,6 +732,142 @@ impl Processor { } } + async fn process_mint_batch( + &self, + key: &SolanaNftEventKey, + payload: SolanaMintOpenDropBatchedPayload, + ) -> ProcessResult<()> { + let conn = self.db.get(); + let producer = self.producer.clone(); + + let collection_id = Uuid::parse_str(&payload.collection_id)?; + let collection = Collection::find_by_id(conn, collection_id) + .await? + .ok_or(ProcessorErrorKind::RecordNotFound)?; + + let signers_pubkeys = vec![ + self.solana().treasury_wallet().to_string(), + collection.owner.clone(), + ]; + let blockhash = with_retry!(self.solana().rpc().get_latest_blockhash()) + .await + .context("blockhash not found") + .map_err(ProcessorErrorKind::Solana)?; + + let send_event = |mint_transactions: Vec, + signers_pubkeys: Vec| async { + producer + .send( + Some(&SolanaNftEvents { + event: Some(SolanaNftEvent::MintOpenDropBatchedSigningRequested( + SolanaMintPendingTransactions { + signers_pubkeys, + mint_transactions, + }, + )), + }), + Some(key), + ) + .await + }; + + if payload.compressed { + let backend = &CompressedRef(self.solana()); + let mut leafs: Vec = Vec::new(); + let mut mint_transactions = Vec::new(); + + for mint_tx in payload.mint_open_drop_transactions.clone() { + let id = Uuid::from_str(&mint_tx.mint_id)?; + + let tx = backend + .mint( + &collection, + Some(blockhash), + MintMetaplexMetadataTransaction { + recipient_address: mint_tx.recipient_address, + metadata: mint_tx.metadata, + collection_id: payload.collection_id.clone(), + compressed: payload.compressed, + }, + ) + .await + .map_err(ProcessorErrorKind::Solana)?; + + mint_transactions.push(SolanaMintTransaction { + serialized_message: tx.serialized_message, + mint_id: mint_tx.mint_id, + signer_signature: None, + }); + + let compression_leaf = compression_leafs::Model { + id, + collection_id: collection.id, + merkle_tree: tx.addresses.merkle_tree.to_string(), + tree_authority: tx.addresses.tree_authority.to_string(), + tree_delegate: tx.addresses.tree_delegate.to_string(), + leaf_owner: tx.addresses.leaf_owner.to_string(), + created_at: Utc::now().naive_utc(), + ..Default::default() + }; + + leafs.push(compression_leaf.into()); + } + + compression_leafs::Entity::insert_many(leafs) + .exec(conn) + .await?; + + send_event(mint_transactions, signers_pubkeys).await?; + + return Ok(()); + } + + let backend = &UncompressedRef(self.solana()); + let mut mints: Vec = Vec::new(); + let mut mint_transactions = Vec::new(); + + for mint_tx in payload.mint_open_drop_transactions.clone() { + let id = Uuid::from_str(&mint_tx.mint_id)?; + let tx = backend + .mint( + &collection, + Some(blockhash), + MintMetaplexMetadataTransaction { + recipient_address: mint_tx.recipient_address, + metadata: mint_tx.metadata, + collection_id: payload.collection_id.clone(), + compressed: payload.compressed, + }, + ) + .await + .map_err(ProcessorErrorKind::Solana)?; + + mint_transactions.push(SolanaMintTransaction { + serialized_message: tx.serialized_message, + mint_id: mint_tx.mint_id, + signer_signature: tx.signatures_or_signers_public_keys.get(1).cloned(), + }); + + let collection_mint = collection_mints::Model { + id, + collection_id: collection.id, + owner: tx.addresses.recipient.to_string(), + mint: tx.addresses.mint.to_string(), + created_at: Utc::now().naive_utc(), + associated_token_account: tx.addresses.associated_token_account.to_string(), + }; + + mints.push(collection_mint.into()); + } + + collection_mints::Entity::insert_many(mints) + .exec(conn) + .await?; + + send_event(mint_transactions, signers_pubkeys).await?; + Ok(()) + } + async fn process_nft( &self, kind: EventKind, @@ -880,7 +1034,7 @@ impl Processor { let backend = &CompressedRef(self.solana()); let tx = backend - .mint(&collection, payload) + .mint(&collection, None, payload) .await .map_err(ProcessorErrorKind::Solana)?; @@ -911,7 +1065,7 @@ impl Processor { let backend = &UncompressedRef(self.solana()); let tx = backend - .mint(&collection, payload) + .mint(&collection, None, payload) .await .map_err(ProcessorErrorKind::Solana)?; @@ -951,7 +1105,7 @@ impl Processor { .ok_or(ProcessorErrorKind::RecordNotFound)?; let tx = backend - .mint(&collection, payload) + .mint(&collection, None, payload) .await .map_err(ProcessorErrorKind::Solana)?; @@ -1165,7 +1319,7 @@ impl Processor { let collection = collection.ok_or(ProcessorErrorKind::RecordNotFound)?; let tx = backend - .mint(&collection, payload) + .mint(&collection, None, payload) .await .map_err(ProcessorErrorKind::Solana)?; @@ -1205,7 +1359,7 @@ impl Processor { let backend = &CompressedRef(self.solana()); let tx = backend - .mint(&collection, payload) + .mint(&collection, None, payload) .await .map_err(ProcessorErrorKind::Solana)?; @@ -1228,7 +1382,7 @@ impl Processor { let backend = &UncompressedRef(self.solana()); let tx = backend - .mint(&collection, payload) + .mint(&collection, None, payload) .await .map_err(ProcessorErrorKind::Solana)?; diff --git a/consumer/src/solana.rs b/consumer/src/solana.rs index 3956ee8..768085c 100644 --- a/consumer/src/solana.rs +++ b/consumer/src/solana.rs @@ -65,7 +65,7 @@ use crate::{ UpdateMasterEditionAddresses, }, }; - +#[macro_export] macro_rules! with_retry { ($expr:expr) => {{ (|| async { $expr.await }) @@ -81,7 +81,7 @@ macro_rules! with_retry { }) }}; } - +pub use with_retry; const TOKEN_PROGRAM_PUBKEY: &str = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"; #[derive(Debug, clap::Args)] @@ -203,6 +203,10 @@ impl Solana { self.rpc_client.clone() } + pub fn treasury_wallet(&self) -> Pubkey { + self.treasury_wallet_address + } + /// Res /// /// # Errors @@ -756,6 +760,7 @@ impl<'a> MintBackend for E async fn mint( &self, collection: &collections::Model, + blockhash: Option, txn: MintMetaplexEditionTransaction, ) -> hub_core::prelude::Result> { let rpc = &self.0.rpc_client; @@ -837,7 +842,7 @@ impl<'a> MintBackend for E edition, )); - let blockhash = with_retry!(rpc.get_latest_blockhash()).await?; + let blockhash = blockhash.unwrap_or(with_retry!(rpc.get_latest_blockhash()).await?); let message = solana_program::message::Message::new_with_blockhash( &instructions, @@ -1036,6 +1041,7 @@ impl<'a> MintBackend, txn: MintMetaplexMetadataTransaction, ) -> hub_core::prelude::Result> { let MintMetaplexMetadataTransaction { @@ -1125,8 +1131,8 @@ impl<'a> MintBackend MintBackend async fn mint( &self, collection: &collections::Model, + blockhash: Option, txn: MintMetaplexMetadataTransaction, ) -> hub_core::prelude::Result> { let MintMetaplexMetadataTransaction { @@ -1190,7 +1197,11 @@ impl<'a> MintBackend let associated_token_account = get_associated_token_address(&recipient, &mint.pubkey()); let len = spl_token::state::Mint::LEN; let rent = with_retry!(rpc.get_minimum_balance_for_rent_exemption(len)).await?; - let blockhash = with_retry!(rpc.get_latest_blockhash()).await?; + let blockhash = if let Some(blockhash) = blockhash { + blockhash + } else { + with_retry!(rpc.get_latest_blockhash()).await? + }; let create_account_ins = solana_program::system_instruction::create_account( &payer, diff --git a/core/proto.lock b/core/proto.lock index 1c4fe4d..cf509b3 100644 --- a/core/proto.lock +++ b/core/proto.lock @@ -1,12 +1,12 @@ [[schemas]] subject = "nfts" -version = 29 -sha512 = "b3b2136bd6c7a136d317da84395661de5fc056e8270510575a3281d78884d99a0d89f444754ed02cb18ad26dcc7cd65300c1df73b9d74d2edc6bcc8d552465d0" +version = 31 +sha512 = "449574f8551ab8c17824af9e08b1658ad1b26ac80340230ddf02e7a1e0979d8a47025913a6598799cf83dd1a9cda87697ee87a13f404ebb52c95ea0084205767" [[schemas]] subject = "solana_nfts" -version = 11 -sha512 = "967fefde938a0f6ce05194e4fca15673e681caac54d8aeec114c5d38418632b9696dbaf5362345a15114e5abb49de55d0af8b9edcc0f2c91f9ef1ccc4ff55d68" +version = 12 +sha512 = "4f85496c50a82cb40faa097cf6d0cb23275b3b90cb561d01388f3e5a71282a8b8e1eea617b7d712b0e415d65af483209fac2db1591456fa814a1f41a1c457433" [[schemas]] subject = "treasury" diff --git a/core/proto.toml b/core/proto.toml index 62864f2..c784d59 100644 --- a/core/proto.toml +++ b/core/proto.toml @@ -2,6 +2,6 @@ endpoint = "https://schemas.holaplex.tools" [schemas] -nfts = 29 +nfts = 31 treasury = 23 -solana_nfts = 11 \ No newline at end of file +solana_nfts = 12 \ No newline at end of file