diff --git a/Cargo.lock b/Cargo.lock index 311fea06c..fef7d5510 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8737,6 +8737,7 @@ dependencies = [ "frame-benchmarking", "frame-support", "frame-system", + "hex", "log", "multihash-codetable", "pallet-balances", @@ -9497,8 +9498,10 @@ dependencies = [ "cid 0.11.1", "clap", "futures", + "hex", "jsonrpsee 0.23.2", "mater", + "parity-scale-codec", "polka-storage-proofs", "polka-storage-provider-common", "primitives-commitment", diff --git a/cli/artifacts/metadata.scale b/cli/artifacts/metadata.scale index dbce6c54b..62c06839d 100644 Binary files a/cli/artifacts/metadata.scale and b/cli/artifacts/metadata.scale differ diff --git a/cli/polka-storage-provider/server/Cargo.toml b/cli/polka-storage-provider/server/Cargo.toml index 2f02f2f67..13c67e25b 100644 --- a/cli/polka-storage-provider/server/Cargo.toml +++ b/cli/polka-storage-provider/server/Cargo.toml @@ -10,7 +10,7 @@ version = "0.1.0" [dependencies] # "Homegrown" crates mater = { workspace = true } -polka-storage-proofs = { workspace = true, features = ["std"] } +polka-storage-proofs = { workspace = true, features = ["std", "substrate"] } polka-storage-provider-common = { workspace = true } primitives-commitment = { workspace = true, features = ["serde", "std"] } primitives-proofs = { workspace = true, features = ["clap"] } @@ -20,7 +20,9 @@ async-trait = { workspace = true } axum = { workspace = true, features = ["macros", "multipart"] } cid = { workspace = true, features = ["serde", "std"] } clap = { workspace = true, features = ["derive"] } +codec = { workspace = true } futures = { workspace = true } +hex = { workspace = true, features = ["std"] } jsonrpsee = { workspace = true, features = ["http-client", "macros", "server", "ws-client"] } rand = { workspace = true } rocksdb = { workspace = true } diff --git a/cli/polka-storage-provider/server/src/db.rs b/cli/polka-storage-provider/server/src/db.rs index 18911ff35..705a30b3a 100644 --- a/cli/polka-storage-provider/server/src/db.rs +++ b/cli/polka-storage-provider/server/src/db.rs @@ -5,10 +5,9 @@ use std::{ use primitives_proofs::SectorNumber; use rocksdb::{ColumnFamily, ColumnFamilyDescriptor, Options as DBOptions, DB as RocksDB}; +use serde::{de::DeserializeOwned, Serialize}; use storagext::types::market::{ConversionError, DealProposal}; -use crate::pipeline::types::Sector; - #[derive(Debug, thiserror::Error)] pub enum DBError { #[error(transparent)] @@ -25,6 +24,9 @@ pub enum DBError { #[error(transparent)] Json(#[from] serde_json::Error), + + #[error("unexpected data when trying to serialize given sector type: {0}")] + InvalidSectorData(serde_json::Error), } const ACCEPTED_DEAL_PROPOSALS_CF: &str = "accepted_deal_proposals"; @@ -118,25 +120,30 @@ impl DealDB { )?) } - pub fn get_sector(&self, sector_id: SectorNumber) -> Result, DBError> { + pub fn get_sector( + &self, + sector_number: SectorNumber, + ) -> Result, DBError> { let Some(sector_slice) = self .database - .get_pinned_cf(self.cf_handle(SECTORS_CF), sector_id.to_le_bytes())? + .get_pinned_cf(self.cf_handle(SECTORS_CF), sector_number.to_le_bytes())? else { return Ok(None); }; let sector = serde_json::from_reader(sector_slice.as_ref()) - // SAFETY: this should never fail since the API sets a sector - // if this happens, it means that someone wrote it from a side channel - .expect("invalid content was placed in the database from outside this API"); + .map_err(|e| DBError::InvalidSectorData(e))?; Ok(Some(sector)) } - pub fn save_sector(&self, sector: &Sector) -> Result<(), DBError> { + pub fn save_sector( + &self, + sector_number: SectorNumber, + sector: &SectorType, + ) -> Result<(), DBError> { let cf_handle = self.cf_handle(SECTORS_CF); - let key = sector.sector_number.to_le_bytes(); + let key = sector_number.to_le_bytes(); let json = serde_json::to_vec(§or)?; self.database.put_cf(cf_handle, key, json)?; diff --git a/cli/polka-storage-provider/server/src/main.rs b/cli/polka-storage-provider/server/src/main.rs index 379798477..b7d4c44a9 100644 --- a/cli/polka-storage-provider/server/src/main.rs +++ b/cli/polka-storage-provider/server/src/main.rs @@ -11,6 +11,7 @@ use std::{env::temp_dir, net::SocketAddr, path::PathBuf, sync::Arc, time::Durati use clap::Parser; use pipeline::types::PipelineMessage; +use polka_storage_proofs::porep::{self, PoRepParameters}; use polka_storage_provider_common::rpc::ServerInfo; use primitives_proofs::{RegisteredPoStProof, RegisteredSealProof}; use rand::Rng; @@ -118,6 +119,9 @@ pub enum ServerError { #[error("proof sectors sizes do not match")] SectorSizeMismatch, + #[error("failed to load PoRep parameters from: {0}, because: {1}")] + InvalidPoRepParameters(std::path::PathBuf, porep::PoRepError), + #[error("FromEnv error: {0}")] EnvFilter(#[from] tracing_subscriber::filter::FromEnvError), @@ -194,13 +198,22 @@ pub struct ServerArguments { /// Proof of Spacetime proof type. #[arg(long)] post_proof: RegisteredPoStProof, + + /// Proving Parameters for PoRep proof, corresponding to given `seal_proof` sector size. + /// They are shared across all of the nodes in the network, as the chain stores corresponding Verifying Key parameters. + /// Shared parameters available to get in the [root repo](http://github.com/eigerco/polka-storage/README.md#Parameters). + /// + /// Testing/temporary parameters can be also generated via `polka-storage-provider-client proofs porep-params` command. + /// Note that when you generate keys, for local testnet, + /// **they need to be set** via an extrinsic pallet-proofs::set_porep_verifyingkey. + #[arg(long)] + porep_parameters: PathBuf, } /// A valid server configuration. To be created using [`ServerConfiguration::try_from`]. /// /// The main difference to [`Server`] is that this structure only contains validated and /// ready to use parameters. -#[derive(Debug)] pub struct ServerConfiguration { /// Storage server listen address. upload_listen_address: SocketAddr, @@ -226,6 +239,10 @@ pub struct ServerConfiguration { /// Proof of Spacetime proof type. post_proof: RegisteredPoStProof, + + /// Proving Parameters for PoRep proof. + /// For 2KiB sectors they're ~1GiB of data. + porep_parameters: PoRepParameters, } impl TryFrom for ServerConfiguration { @@ -264,6 +281,9 @@ impl TryFrom for ServerConfiguration { }); std::fs::create_dir_all(&storage_directory)?; + let porep_parameters = porep::load_groth16_parameters(value.porep_parameters.clone()) + .map_err(|e| ServerError::InvalidPoRepParameters(value.porep_parameters, e))?; + Ok(Self { upload_listen_address: value.upload_listen_address, rpc_listen_address: value.rpc_listen_address, @@ -273,6 +293,7 @@ impl TryFrom for ServerConfiguration { storage_directory, seal_proof: value.seal_proof, post_proof: value.post_proof, + porep_parameters, }) } } @@ -395,6 +416,7 @@ impl ServerConfiguration { unsealed_sectors_dir: unsealed_sector_storage_dir, sealed_sectors_dir: sealed_sector_storage_dir, sealing_cache_dir, + porep_parameters: Arc::new(self.porep_parameters), xt_client, xt_keypair: self.multi_pair_signer, pipeline_sender: pipeline_tx, diff --git a/cli/polka-storage-provider/server/src/pipeline/mod.rs b/cli/polka-storage-provider/server/src/pipeline/mod.rs index 2a7b98595..b900f4216 100644 --- a/cli/polka-storage-provider/server/src/pipeline/mod.rs +++ b/cli/polka-storage-provider/server/src/pipeline/mod.rs @@ -3,35 +3,39 @@ pub mod types; use std::{path::PathBuf, sync::Arc}; use polka_storage_proofs::porep::{ - sealer::{prepare_piece, PreCommitOutput, Sealer}, - PoRepError, + sealer::{prepare_piece, BlstrsProof, PreCommitOutput, Sealer, SubstrateProof}, + PoRepError, PoRepParameters, }; use polka_storage_provider_common::rpc::ServerInfo; use primitives_commitment::Commitment; -use primitives_proofs::{derive_prover_id, SectorNumber}; +use primitives_proofs::{ + derive_prover_id, + randomness::{draw_randomness, DomainSeparationTag}, + SectorNumber, +}; use storagext::{ - types::{market::DealProposal, storage_provider::SectorPreCommitInfo}, - StorageProviderClientExt, SystemClientExt, + types::{ + market::DealProposal, + storage_provider::{ProveCommitSector, SectorPreCommitInfo}, + }, + RandomnessClientExt, StorageProviderClientExt, SystemClientExt, }; -use subxt::tx::Signer; +use subxt::{ext::codec::Encode, tx::Signer}; use tokio::{ sync::mpsc::{error::SendError, UnboundedReceiver, UnboundedSender}, task::{JoinError, JoinHandle}, }; use tokio_util::{sync::CancellationToken, task::TaskTracker}; -use types::{AddPieceMessage, PipelineMessage, PreCommitMessage}; - -use self::types::Sector; -use crate::{ - db::{DBError, DealDB}, - pipeline::types::SectorState, +use types::{ + AddPieceMessage, PipelineMessage, PreCommitMessage, PreCommittedSector, ProveCommitMessage, + ProvenSector, UnsealedSector, }; -// PLACEHOLDERS!!!!! -// TODO(@th7nder,29/10/2024): get from pallet randomness -const TICKET: [u8; 32] = [12u8; 32]; -// const SEED: [u8; 32] = [13u8; 32]; +use crate::db::{DBError, DealDB}; + const SECTOR_EXPIRATION_MARGIN: u64 = 20; +/// TODO(@th7nder,[#457, #458], 06/11/2024): Blockchain cannot give randomness until block 82. +const MINIMUM_BLOCK_WITH_RANDOMNESS_AVAILABLE: u64 = 82; #[derive(Debug, thiserror::Error)] pub enum PipelineError { @@ -46,7 +50,9 @@ pub enum PipelineError { #[error(transparent)] DBError(#[from] DBError), #[error("sector does not exist")] - NotExistentSector, + SectorNotFound, + #[error("precommit scheduled too early, randomness not available")] + RandomnessNotAvailable, #[error(transparent)] SendError(#[from] SendError), } @@ -57,6 +63,7 @@ pub struct PipelineState { pub unsealed_sectors_dir: Arc, pub sealed_sectors_dir: Arc, pub sealing_cache_dir: Arc, + pub porep_parameters: Arc, pub xt_client: Arc, pub xt_keypair: storagext::multipair::MultiPairSigner, @@ -98,63 +105,97 @@ pub async fn start_pipeline( Ok(()) } -fn process( - tracker: &TaskTracker, - msg: PipelineMessage, - state: Arc, - token: CancellationToken, -) { - match msg { - PipelineMessage::AddPiece(AddPieceMessage { +trait PipelineOperations { + fn add_piece(&self, state: Arc, msg: AddPieceMessage, token: CancellationToken); + fn precommit(&self, state: Arc, msg: PreCommitMessage); + fn prove_commit(&self, state: Arc, msg: ProveCommitMessage); +} + +impl PipelineOperations for TaskTracker { + fn add_piece(&self, state: Arc, msg: AddPieceMessage, token: CancellationToken) { + let AddPieceMessage { deal, published_deal_id, piece_path, piece_cid, - }) => { - tracker.spawn(async move { - tokio::select! { - // AddPiece is cancellation safe, as it can be retried and the state will be fine. - res = add_piece(state, piece_path, piece_cid, deal, published_deal_id) => { - match res { - Ok(_) => tracing::info!("Add Piece for piece {:?}, deal id {}, finished successfully.", piece_cid, published_deal_id), - Err(err) => tracing::error!(%err, "Add Piece for piece {:?}, deal id {}, failed!", piece_cid, published_deal_id), - } - }, - () = token.cancelled() => { - tracing::warn!("AddPiece has been cancelled."); + } = msg; + self.spawn(async move { + tokio::select! { + // AddPiece is cancellation safe, as it can be retried and the state will be fine. + res = add_piece(state, piece_path, piece_cid, deal, published_deal_id) => { + match res { + Ok(_) => tracing::info!("Add Piece for piece {:?}, deal id {}, finished successfully.", piece_cid, published_deal_id), + Err(err) => tracing::error!(%err, "Add Piece for piece {:?}, deal id {}, failed!", piece_cid, published_deal_id), } + }, + () = token.cancelled() => { + tracing::warn!("AddPiece has been cancelled."); } - }); - } - PipelineMessage::PreCommit(PreCommitMessage { sector_number }) => { - tracker.spawn(async move { - // Precommit is not cancellation safe. - // TODO(@th7nder,#501, 04/11/2024): when it's cancelled, it can hang and user will have to wait for it to finish. - // If they don't the state can be corrupted, we could improve that situation. - // One of the ideas is to store state as 'Precommitting' so then we know we can retry that after some time. - match precommit(state, sector_number).await { - Ok(_) => { - tracing::info!( - "Precommit for sector {} finished successfully.", - sector_number - ) - } - Err(err) => { - tracing::error!(%err, "Failed PreCommit for Sector: {}", sector_number) - } + } + }); + } + + fn precommit(&self, state: Arc, msg: PreCommitMessage) { + let PreCommitMessage { sector_number } = msg; + self.spawn(async move { + // Precommit is not cancellation safe. + // TODO(@th7nder,#501, 04/11/2024): when it's cancelled, it can hang and user will have to wait for it to finish. + // If they don't the state can be corrupted, we could improve that situation. + // One of the ideas is to store state as 'Precommitting' so then we know we can retry that after some time. + match precommit(state, sector_number).await { + Ok(_) => { + tracing::info!( + "Precommit for sector {} finished successfully.", + sector_number + ) } - }); - } + Err(err) => { + tracing::error!(%err, "Failed PreCommit for Sector: {}", sector_number) + } + } + }); + } + + fn prove_commit(&self, state: Arc, msg: ProveCommitMessage) { + let ProveCommitMessage { sector_number } = msg; + self.spawn(async move { + // ProveCommit is not cancellation safe. + match prove_commit(state, sector_number).await { + Ok(_) => { + tracing::info!( + "ProveCommit for sector {} finished successfully.", + sector_number + ) + } + Err(err) => { + tracing::error!(%err, "Failed ProveCommit for Sector: {}", sector_number) + } + } + }); } } -async fn find_sector_for_piece(state: &Arc) -> Result { +fn process( + tracker: &TaskTracker, + msg: PipelineMessage, + state: Arc, + token: CancellationToken, +) { + match msg { + PipelineMessage::AddPiece(msg) => tracker.add_piece(state.clone(), msg, token.clone()), + PipelineMessage::PreCommit(msg) => tracker.precommit(state.clone(), msg), + PipelineMessage::ProveCommit(msg) => tracker.prove_commit(state.clone(), msg), + } +} + +async fn find_sector_for_piece( + state: &Arc, +) -> Result { // TODO(@th7nder,30/10/2024): simplification, we're always creating a new sector for storing a piece. // It should not work like that, sectors should be filled with pieces according to *some* algorithm. let sector_number = state.db.next_sector_number(); let unsealed_path = state.unsealed_sectors_dir.join(sector_number.to_string()); - let sealed_path = state.sealed_sectors_dir.join(sector_number.to_string()); - let sector = Sector::create_unsealed(sector_number, unsealed_path, sealed_path).await?; + let sector = UnsealedSector::create(sector_number, unsealed_path).await?; Ok(sector) } @@ -177,7 +218,7 @@ async fn add_piece( tracing::info!("Adding a piece..."); let sealer = Sealer::new(state.server_info.seal_proof); - let handle: JoinHandle> = + let handle: JoinHandle> = tokio::task::spawn_blocking(move || { let unsealed_sector = std::fs::File::options() .append(true) @@ -198,10 +239,10 @@ async fn add_piece( Ok(sector) }); - let sector: Sector = handle.await??; + let sector: UnsealedSector = handle.await??; tracing::info!("Finished adding a piece"); - state.db.save_sector(§or)?; + state.db.save_sector(sector.sector_number, §or)?; // TODO(@th7nder,30/10/2024): simplification, as we're always scheduling a precommit just after adding a piece and creating a new sector. // Ideally sector won't be finalized after one piece has been added and the precommit will depend on the start_block? @@ -228,21 +269,57 @@ async fn precommit( tracing::info!("Starting pre-commit"); let sealer = Sealer::new(state.server_info.seal_proof); - let Some(mut sector) = state.db.get_sector(sector_number)? else { + let Some(mut sector) = state.db.get_sector::(sector_number)? else { tracing::error!("Tried to precommit non-existing sector"); - return Err(PipelineError::NotExistentSector); + return Err(PipelineError::SectorNotFound); }; // Pad sector so CommD can be properly calculated. sector.piece_infos = sealer.pad_sector(§or.piece_infos, sector.occupied_sector_space)?; tracing::debug!("piece_infos: {:?}", sector.piece_infos); - tracing::info!("Padded sector, commencing pre-commit."); + tracing::info!("Padded sector, commencing pre-commit and getting last finalized block"); + + let mut current_block = state.xt_client.height(true).await?; + tracing::info!("Current block: {current_block}"); + + if current_block < MINIMUM_BLOCK_WITH_RANDOMNESS_AVAILABLE { + tracing::info!( + "Waiting for randomness to be available at block: {}", + MINIMUM_BLOCK_WITH_RANDOMNESS_AVAILABLE + ); + state + .xt_client + .wait_for_height(MINIMUM_BLOCK_WITH_RANDOMNESS_AVAILABLE, true) + .await?; + current_block = MINIMUM_BLOCK_WITH_RANDOMNESS_AVAILABLE; + } + + let digest = state + .xt_client + .get_randomness(current_block) + .await? + .expect("randomness to be available as we wait for it"); + + let entropy = state.xt_keypair.account_id().encode(); + // Must match pallet's logic or otherwise proof won't be verified: + // https://github.com/eigerco/polka-storage/blob/af51a9b121c9b02e0bf6f02f5e835091ab46af76/pallets/storage-provider/src/lib.rs#L1539 + let ticket = draw_randomness( + &digest, + DomainSeparationTag::SealRandomness, + current_block, + &entropy, + ); + + let sealed_path = state.sealed_sectors_dir.join(sector_number.to_string()); + tokio::fs::File::create_new(&sealed_path).await?; + // TODO(@th7nder,31/10/2024): what happens if some of the process fails? SP will be slashed, and there is no error reporting? what about retries? let sealing_handle: JoinHandle> = { let prover_id = derive_prover_id(state.xt_keypair.account_id()); let cache_dir = state.sealing_cache_dir.clone(); let unsealed_path = sector.unsealed_path.clone(); - let sealed_path = sector.sealed_path.clone(); + let sealed_path = sealed_path.clone(); + let piece_infos = sector.piece_infos.clone(); tokio::task::spawn_blocking(move || { sealer.precommit_sector( @@ -251,7 +328,7 @@ async fn precommit( sealed_path, prover_id, sector_number, - TICKET, + ticket, &piece_infos, ) }) @@ -259,12 +336,7 @@ async fn precommit( let sealing_output = sealing_handle.await??; tracing::info!("Created sector's replica: {:?}", sealing_output); - sector.state = SectorState::Sealed; - state.db.save_sector(§or)?; - - let current_block = state.xt_client.height(false).await?; tracing::debug!("Precommiting at block: {}", current_block); - let result = state .xt_client .pre_commit_sectors( @@ -297,9 +369,6 @@ async fn precommit( .await? .expect("we're waiting for the result"); - sector.state = SectorState::Precommitted; - state.db.save_sector(§or)?; - let precommited_sectors = result .events .find::() @@ -309,10 +378,144 @@ async fn precommit( .map(|result| result.map_err(|err| subxt::Error::from(err))) .collect::, _>>()?; + let sector = PreCommittedSector::create( + sector, + sealed_path, + Commitment::replica(sealing_output.comm_r), + Commitment::data(sealing_output.comm_d), + current_block, + precommited_sectors[0].block, + ) + .await?; + state.db.save_sector(sector.sector_number, §or)?; + tracing::info!( "Successfully pre-commited sectors on-chain: {:?}", precommited_sectors ); + state + .pipeline_sender + .send(PipelineMessage::ProveCommit(ProveCommitMessage { + sector_number: sector.sector_number, + }))?; + + Ok(()) +} + +#[tracing::instrument(skip_all, fields(sector_number))] +async fn prove_commit( + state: Arc, + sector_number: SectorNumber, +) -> Result<(), PipelineError> { + tracing::info!("Starting prove commit"); + + let sealer = Sealer::new(state.server_info.seal_proof); + let Some(sector) = state.db.get_sector::(sector_number)? else { + tracing::error!("Tried to precommit non-existing sector"); + return Err(PipelineError::SectorNotFound); + }; + + let seal_randomness_height = sector.seal_randomness_height; + let Some(digest) = state + .xt_client + .get_randomness(seal_randomness_height) + .await? + else { + tracing::error!("Out-of-the-state transition, this SHOULD NOT happen"); + return Err(PipelineError::RandomnessNotAvailable); + }; + let entropy = state.xt_keypair.account_id().encode(); + // Must match pallet's logic or otherwise proof won't be verified: + // https://github.com/eigerco/polka-storage/blob/af51a9b121c9b02e0bf6f02f5e835091ab46af76/pallets/storage-provider/src/lib.rs#L1539 + let ticket = draw_randomness( + &digest, + DomainSeparationTag::SealRandomness, + seal_randomness_height, + &entropy, + ); + + // TODO(@th7nder,04/11/2024): + // https://github.com/eigerco/polka-storage/blob/5edd4194f08f29d769c277577ccbb70bb6ff63bc/runtime/src/configs/mod.rs#L360 + // 10 blocks = 1 minute, only testnet + const PRECOMMIT_CHALLENGE_DELAY: u64 = 10; + let prove_commit_block = sector.precommit_block + PRECOMMIT_CHALLENGE_DELAY; + + tracing::info!("Wait for block {} to get randomness", prove_commit_block); + state + .xt_client + .wait_for_height(prove_commit_block, true) + .await?; + let Some(digest) = state.xt_client.get_randomness(prove_commit_block).await? else { + tracing::error!("Randomness for the block not available."); + return Err(PipelineError::RandomnessNotAvailable); + }; + let seed = draw_randomness( + &digest, + DomainSeparationTag::InteractiveSealChallengeSeed, + prove_commit_block, + &entropy, + ); + + let prover_id = derive_prover_id(state.xt_keypair.account_id()); + tracing::debug!("Performing prove commit for, seal_randomness_height {}, pre_commit_block: {}, prove_commit_block: {}, entropy: {}, ticket: {}, seed: {}, prover id: {}, sector_number: {}", + seal_randomness_height, sector.precommit_block, prove_commit_block, hex::encode(entropy), hex::encode(ticket), hex::encode(seed), hex::encode(prover_id), sector_number); + + let sealing_handle: JoinHandle, _>> = { + let porep_params = state.porep_parameters.clone(); + let cache_dir = state.sealing_cache_dir.clone(); + let sealed_path = sector.sealed_path.clone(); + let piece_infos = sector.piece_infos.clone(); + let comm_r = sector.comm_r.raw(); + let comm_d = sector.comm_d.raw(); + tokio::task::spawn_blocking(move || { + sealer.prove_sector( + porep_params.as_ref(), + cache_dir.as_ref(), + sealed_path, + prover_id, + sector_number, + ticket, + Some(seed), + PreCommitOutput { comm_r, comm_d }, + &piece_infos, + ) + }) + }; + let proofs = sealing_handle.await??; + + // We use sector size 2KiB only at this point, which guarantees to have 1 proof, because it has 1 partition in the config. + // That's why `prove_commit` will always generate a 1 proof. + let proof: SubstrateProof = proofs[0] + .clone() + .try_into() + .expect("converstion between rust-fil-proofs and polka-storage-proofs to work"); + let proof = codec::Encode::encode(&proof); + tracing::info!("Proven sector: {}", sector_number); + + let result = state + .xt_client + .prove_commit_sectors( + &state.xt_keypair, + vec![ProveCommitSector { + sector_number, + proof, + }], + true, + ) + .await? + .expect("waiting for finalization should always give results"); + + let proven_sectors = result + .events + .find::() + .map(|result| result.map_err(|err| subxt::Error::from(err))) + .collect::, _>>()?; + + tracing::info!("Successfully proven sectors on-chain: {:?}", proven_sectors); + + let sector = ProvenSector::create(sector); + state.db.save_sector(sector.sector_number, §or)?; + Ok(()) } diff --git a/cli/polka-storage-provider/server/src/pipeline/types.rs b/cli/polka-storage-provider/server/src/pipeline/types.rs index 96de7e487..34a5cc12d 100644 --- a/cli/polka-storage-provider/server/src/pipeline/types.rs +++ b/cli/polka-storage-provider/server/src/pipeline/types.rs @@ -8,10 +8,12 @@ use storagext::types::market::DealProposal; /// Represents a task to be executed on the Storage Provider Pipeline #[derive(Debug)] pub enum PipelineMessage { - /// Adds a deal to a sector selected by the storage provider + /// Adds a deal to a sector selected by the storage provider. AddPiece(AddPieceMessage), - /// Pads, seals the sector and pre-commits it on chain + /// Pads, seals a sector and pre-commits it on-chain. PreCommit(PreCommitMessage), + /// Generates a PoRep for a sector and verifies the proof on-chain. + ProveCommit(ProveCommitMessage), } /// Deal to be added to a sector with its contents. @@ -34,31 +36,57 @@ pub struct PreCommitMessage { pub sector_number: SectorNumber, } -/// Sector State serialized and stored in the RocksDB database -/// It is used for tracking the sector lifetime, precommiting and proving. +#[derive(Debug)] +pub struct ProveCommitMessage { + /// Number of an existing, pre-committed sector + pub sector_number: SectorNumber, +} + +/// Unsealed Sector which still accepts deals and pieces. +/// When sealed it's converted into [`PreCommittedSector`]. #[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] -pub struct Sector { +pub struct UnsealedSector { /// [`SectorNumber`] which identifies a sector in the Storage Provider. /// /// It *should be centrally generated* by the Storage Provider, currently by [`crate::db::DealDB::next_sector_number`]. pub sector_number: SectorNumber, - /// Initially the sector is in [`SectorState::Unsealed`] state, should be changed after each of the sealing steps. - pub state: SectorState, + /// Tracks how much bytes have been written into [`Sector::unsealed_path`] /// by [`polka_storage_proofs::porep::sealer::Sealer::add_piece`] which adds padding. /// /// It is used before precomit to calculate padding /// with zero pieces by [`polka_storage_proofs::porep::sealer::Sealer::pad_sector`]. pub occupied_sector_space: u64, + /// Tracks all of the pieces that has been added to the sector. /// Indexes match with corresponding deals in [`Sector::deals`]. pub piece_infos: Vec, + /// Tracks all of the deals that have been added to the sector. pub deals: Vec<(DealId, DealProposal)>, + /// Path of an existing file where the pieces unsealed and padded data is stored. /// /// File at this path is created when the sector is created by [`Sector::create`]. pub unsealed_path: std::path::PathBuf, +} + +/// Sector which has been sealed and pre-committed on-chain. +/// When proven, it's converted into [`ProvenSector`]. +#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] +pub struct PreCommittedSector { + /// [`SectorNumber`] which identifies a sector in the Storage Provider. + /// + /// It *should be centrally generated* by the Storage Provider, currently by [`crate::db::DealDB::next_sector_number`]. + pub sector_number: SectorNumber, + + /// Tracks all of the pieces that has been added to the sector. + /// Indexes match with corresponding deals in [`Sector::deals`]. + pub piece_infos: Vec, + + /// Tracks all of the deals that have been added to the sector. + pub deals: Vec<(DealId, DealProposal)>, + /// Path of an existing file where the sealed sector data is stored. /// /// File at this path is initially created by [`Sector::create`], however it's empty. @@ -66,42 +94,110 @@ pub struct Sector { /// Only after pipeline [`PipelineMessage::PreCommit`], /// the file has contents which should not be touched and are used for later steps. pub sealed_path: std::path::PathBuf, + + /// CID of the sealed sector. + pub comm_r: Commitment, + + /// CID of the unsealed data of the sector. + pub comm_d: Commitment, + + /// Block at which randomness has been fetched to perform [`PipelineMessage::PreCommit`]. + /// + /// It is used as a randomness seed to create a replica. + /// Available at [`SectorState::Sealed`] and later. + pub seal_randomness_height: u64, + + /// Block at which the sector was precommitted (extrinsic submitted on-chain). + /// + /// It is used as a randomness seed to create a PoRep. + /// Available at [`SectorState::Precommitted`] and later. + pub precommit_block: u64, } -impl Sector { - /// Creates a new sector and empty files at the provided paths. +impl UnsealedSector { + /// Creates a new sector and empty file at the provided path. /// /// Sector Number must be unique - generated by [`crate::db::DealDB::next_sector_number`] /// otherwise the data will be overwritten. - pub async fn create_unsealed( + pub async fn create( sector_number: SectorNumber, unsealed_path: std::path::PathBuf, - sealed_path: std::path::PathBuf, - ) -> Result { + ) -> Result { tokio::fs::File::create_new(&unsealed_path).await?; - tokio::fs::File::create_new(&sealed_path).await?; Ok(Self { sector_number, - state: SectorState::Unsealed, occupied_sector_space: 0, piece_infos: vec![], deals: vec![], unsealed_path, + }) + } +} + +impl PreCommittedSector { + /// Transforms [`UnsealedSector`] and removes it's underlying data. + /// + /// Expects that file at `sealed_path` contains sealed_data. + /// Should only be called after sealing and pre-commit process has ended. + pub async fn create( + unsealed: UnsealedSector, + sealed_path: std::path::PathBuf, + comm_r: Commitment, + comm_d: Commitment, + seal_randomness_height: u64, + precommit_block: u64, + ) -> Result { + tokio::fs::remove_file(unsealed.unsealed_path).await?; + + Ok(Self { + sector_number: unsealed.sector_number, + piece_infos: unsealed.piece_infos, + deals: unsealed.deals, sealed_path, + comm_r, + comm_d, + seal_randomness_height, + precommit_block, }) } } -/// Represents a Sector's lifetime, sequentially. +/// Sector which has been sealed, precommitted and proven on-chain. #[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] -pub enum SectorState { - /// When sector still has remaining space to add pieces or has not been sealed yet. - Unsealed, - /// After sector has been filled with pieces, padded and a replica with CommR has been created out of it. - Sealed, - /// Sealed sector has been published on-chain, so now the PoRep must be generated for it. - Precommitted, - /// After a PoRep for a sector has been created and publish on-chain. - Proven, +pub struct ProvenSector { + /// [`SectorNumber`] which identifies a sector in the Storage Provider. + /// + /// It *should be centrally generated* by the Storage Provider, currently by [`crate::db::DealDB::next_sector_number`]. + pub sector_number: SectorNumber, + + /// Tracks all of the pieces that has been added to the sector. + /// Indexes match with corresponding deals in [`Sector::deals`]. + pub piece_infos: Vec, + + /// Tracks all of the deals that have been added to the sector. + pub deals: Vec<(DealId, DealProposal)>, + + /// Path of an existing file where the sealed sector data is stored. + pub sealed_path: std::path::PathBuf, + + /// CID of the sealed sector. + pub comm_r: Commitment, + + /// CID of the unsealed data of the sector. + pub comm_d: Commitment, +} + +impl ProvenSector { + /// Creates a [`ProvenSector`] from a [`PreCommittedSector`]. + pub fn create(sector: PreCommittedSector) -> Self { + Self { + sector_number: sector.sector_number, + piece_infos: sector.piece_infos, + deals: sector.deals, + sealed_path: sector.sealed_path, + comm_r: sector.comm_r, + comm_d: sector.comm_d, + } + } } diff --git a/cli/polka-storage/storagext-cli/src/cmd/storage_provider.rs b/cli/polka-storage/storagext-cli/src/cmd/storage_provider.rs index f3927df87..dd8187470 100644 --- a/cli/polka-storage/storagext-cli/src/cmd/storage_provider.rs +++ b/cli/polka-storage/storagext-cli/src/cmd/storage_provider.rs @@ -5,10 +5,7 @@ use primitives_proofs::{RegisteredPoStProof, SectorNumber}; use storagext::{ deser::DeserializablePath, multipair::MultiPairSigner, - runtime::{ - runtime_types::pallet_storage_provider::sector::ProveCommitSector as RuntimeProveCommitSector, - SubmissionResult, - }, + runtime::SubmissionResult, types::storage_provider::{ FaultDeclaration as SxtFaultDeclaration, ProveCommitSector as SxtProveCommitSector, RecoveryDeclaration as SxtRecoveryDeclaration, @@ -320,17 +317,14 @@ impl StorageProviderCommand { where Client: StorageProviderClientExt, { - let (sector_numbers, prove_commit_sectors): ( - Vec, - Vec, - ) = prove_commit_sectors - .into_iter() - .map(|s| { - let sector_number = s.sector_number; - (sector_number, s.into()) - }) - .unzip(); - + let (sector_numbers, prove_commit_sectors): (Vec, Vec) = + prove_commit_sectors + .into_iter() + .map(|s| { + let sector_number = s.sector_number; + (sector_number, s.into()) + }) + .unzip(); let submission_result = client .prove_commit_sectors( &account_keypair, diff --git a/cli/polka-storage/storagext/src/clients/storage_provider.rs b/cli/polka-storage/storagext/src/clients/storage_provider.rs index 1b5bdf56c..790740d7a 100644 --- a/cli/polka-storage/storagext/src/clients/storage_provider.rs +++ b/cli/polka-storage/storagext/src/clients/storage_provider.rs @@ -13,13 +13,13 @@ use crate::{ bounded_vec::IntoBoundedByteVec, client::SubmissionResult, runtime_types::pallet_storage_provider::{ - proofs::SubmitWindowedPoStParams, sector::ProveCommitSector, - storage_provider::StorageProviderState, + proofs::SubmitWindowedPoStParams, storage_provider::StorageProviderState, }, storage_provider::calls::types::register_storage_provider::PeerId, }, types::storage_provider::{ - FaultDeclaration, RecoveryDeclaration, SectorPreCommitInfo, TerminationDeclaration, + FaultDeclaration, ProveCommitSector, RecoveryDeclaration, SectorPreCommitInfo, + TerminationDeclaration, }, BlockNumber, Currency, PolkaStorageConfig, }; diff --git a/cli/polka-storage/storagext/src/lib.rs b/cli/polka-storage/storagext/src/lib.rs index f7feb217d..54186b246 100644 --- a/cli/polka-storage/storagext/src/lib.rs +++ b/cli/polka-storage/storagext/src/lib.rs @@ -8,7 +8,7 @@ pub mod types; pub mod deser; pub use crate::{ - clients::{MarketClientExt, StorageProviderClientExt, SystemClientExt}, + clients::{MarketClientExt, RandomnessClientExt, StorageProviderClientExt, SystemClientExt}, runtime::{bounded_vec::IntoBoundedByteVec, client::Client}, }; diff --git a/cli/polka-storage/storagext/src/runtime/display/storage_provider.rs b/cli/polka-storage/storagext/src/runtime/display/storage_provider.rs index 47eebd15a..3b96f2f2c 100644 --- a/cli/polka-storage/storagext/src/runtime/display/storage_provider.rs +++ b/cli/polka-storage/storagext/src/runtime/display/storage_provider.rs @@ -75,9 +75,13 @@ impl std::fmt::Display for Event { "Storage Provider Registered: {{ owner: {}, info: {}, proving_period_start: {} }}", owner, info, proving_period_start, )), - Event::SectorsPreCommitted { owner, sectors } => f.write_fmt(format_args!( - "Sectors Pre-Committed: {{ owner: {}, sector_number: {:?} }}", - owner, sectors, + Event::SectorsPreCommitted { + block, + owner, + sectors, + } => f.write_fmt(format_args!( + "Sectors Pre-Committed: {{ block: {}, owner: {}, sector_number: {:?} }}", + block, owner, sectors, )), Event::SectorsProven { owner, sectors } => f.write_fmt(format_args!( "Sectors Proven: {{ owner: {}, sectors: {:?} }}", diff --git a/examples/rpc_publish.sh b/examples/rpc_publish.sh index bc8e2c000..37ec46c20 100755 --- a/examples/rpc_publish.sh +++ b/examples/rpc_publish.sh @@ -57,7 +57,7 @@ DEAL_JSON=$( ) SIGNED_DEAL_JSON="$(RUST_LOG=error target/release/polka-storage-provider-client sign-deal --sr25519-key "$CLIENT" "$DEAL_JSON")" -(RUST_LOG=debug target/release/polka-storage-provider-server --sr25519-key "$PROVIDER" --seal-proof "2KiB" --post-proof "2KiB") & +(RUST_LOG=debug target/release/polka-storage-provider-server --sr25519-key "$PROVIDER" --seal-proof "2KiB" --post-proof "2KiB" --porep-parameters 2KiB.porep.params) & sleep 5 # gives time for the server to start DEAL_CID="$(RUST_LOG=error target/release/polka-storage-provider-client propose-deal "$DEAL_JSON")" diff --git a/lib/polka-storage-proofs/src/porep/mod.rs b/lib/polka-storage-proofs/src/porep/mod.rs index da0c34f72..828c4b2e8 100644 --- a/lib/polka-storage-proofs/src/porep/mod.rs +++ b/lib/polka-storage-proofs/src/porep/mod.rs @@ -13,6 +13,8 @@ use storage_proofs_porep::stacked::StackedDrg; use crate::types::Commitment; +pub type PoRepParameters = groth16::MappedParameters; + /// Generates parameters for proving and verifying PoRep. /// It should be called once and then reused across provers and the verifier. /// Verifying Key is only needed for verification (no_std), rest of the params are required for proving (std). @@ -35,9 +37,7 @@ pub fn generate_random_groth16_parameters( /// Loads Groth16 parameters from the specified path. /// Parameters needed to be serialized with [`groth16::Paramters::::write_bytes`]. -pub fn load_groth16_parameters( - path: std::path::PathBuf, -) -> Result, PoRepError> { +pub fn load_groth16_parameters(path: std::path::PathBuf) -> Result { groth16::Parameters::::build_mapped_parameters(path.clone(), false) .map_err(|e| PoRepError::FailedToLoadGrothParameters(path, e)) } diff --git a/lib/polka-storage-proofs/src/porep/sealer.rs b/lib/polka-storage-proofs/src/porep/sealer.rs index eefc066ca..29a04ba0d 100644 --- a/lib/polka-storage-proofs/src/porep/sealer.rs +++ b/lib/polka-storage-proofs/src/porep/sealer.rs @@ -22,6 +22,16 @@ use crate::{ ZeroPaddingReader, }; +/// Proof using [`blstrs::Bls12`] as finite field elements. +/// +/// It's the output of PoRep and PoSt proof generation and CANNOT be used in no_std. +pub type BlstrsProof = groth16::Proof; +/// Proof using [`bls12_381::Bls12`] as finite field elements. +/// +/// It is reexported so we don't need to add a dependencies in `polka-storage-provider-*` crates. +/// Used to convert into `no_std` version of proof, to call extrinsic to verify proof. +pub type SubstrateProof = crate::Proof; + /// Prepares an arbitrary piece to be used by [`Sealer::create_sector`]. /// /// It does so by calculating the proper size for the padded reader diff --git a/pallets/proofs/src/lib.rs b/pallets/proofs/src/lib.rs index 4b17b0412..d7b67de73 100644 --- a/pallets/proofs/src/lib.rs +++ b/pallets/proofs/src/lib.rs @@ -147,6 +147,7 @@ pub mod pallet { let proof_scheme = porep::ProofScheme::setup(seal_proof); let vkey = PoRepVerifyingKey::::get().ok_or(Error::::MissingPoRepVerifyingKey)?; + log::info!(target: LOG_TARGET, "Verifying PoRep proof for sector: {}...", sector); proof_scheme .verify( &comm_r, &comm_d, &prover_id, sector, &ticket, &seed, vkey, &proof, diff --git a/pallets/storage-provider/Cargo.toml b/pallets/storage-provider/Cargo.toml index e1d5b109c..c9e88f193 100644 --- a/pallets/storage-provider/Cargo.toml +++ b/pallets/storage-provider/Cargo.toml @@ -17,6 +17,7 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] cid = { workspace = true, features = ["alloc"] } codec = { workspace = true, default-features = false, features = ["derive"] } +hex = { workspace = true, default-features = false, features = ["alloc"] } log = { workspace = true, features = ["kv"] } pallet-insecure-randomness-collective-flip = { workspace = true, default-features = false } pallet-proofs = { workspace = true, default-features = false } @@ -28,6 +29,7 @@ sp-arithmetic = { workspace = true, default-features = false } sp-core = { workspace = true, default-features = false } sp-runtime = { workspace = true, default-features = false } + # Frame deps frame-benchmarking = { workspace = true, default-features = false, optional = true } frame-support = { workspace = true, default-features = false } diff --git a/pallets/storage-provider/src/lib.rs b/pallets/storage-provider/src/lib.rs index aba3a6ee4..538054d7d 100644 --- a/pallets/storage-provider/src/lib.rs +++ b/pallets/storage-provider/src/lib.rs @@ -260,6 +260,9 @@ pub mod pallet { }, /// Emitted when a storage provider pre commits some sectors. SectorsPreCommitted { + /// Block at which sectors have been precommitted. + /// It is used for `interactive_randomness` generation in `ProveCommit`. + block: BlockNumberFor, owner: T::AccountId, sectors: BoundedVec>, ConstU32>, @@ -555,6 +558,7 @@ pub mod pallet { })?; Self::deposit_event(Event::SectorsPreCommitted { + block: current_block, owner, sectors: pre_committed_sectors, }); @@ -1510,9 +1514,11 @@ pub mod pallet { let current_block_number = >::block_number(); + // https://github.com/filecoin-project/builtin-actors/blob/a45fb87910bca74d62215b0d58ed90cf78b6c8ff/actors/miner/src/lib.rs#L4865 // Check if we are too early with the proof submit let interactive_block_number = precommit.pre_commit_block_number + T::PreCommitChallengeDelay::get(); + if current_block_number < interactive_block_number { log::error!(target: LOG_TARGET, "too early to prove sector: current_block_number: {current_block_number:?} < interactive_block_number: {interactive_block_number:?}"); return Err(Error::::InvalidProof)?; @@ -1546,6 +1552,10 @@ pub mod pallet { let prover_id = derive_prover_id(owner); + log::debug!(target: LOG_TARGET, "Performing prove commit for, seal_randomness_height {:?}, pre_commit_block: {:?}, prove_commit_block: {:?}, entropy: {}, ticket: {}, seed: {}", + precommit.info.seal_randomness_height, precommit.pre_commit_block_number, interactive_block_number, hex::encode(entropy), hex::encode(randomness), hex::encode(interactive_randomness)); + log::debug!(target: LOG_TARGET, "Prover Id: {}, Sector Number: {}", hex::encode(prover_id), precommit.info.sector_number); + // Verify the porep proof T::ProofVerification::verify_porep( prover_id, diff --git a/pallets/storage-provider/src/tests/pre_commit_sectors.rs b/pallets/storage-provider/src/tests/pre_commit_sectors.rs index 70fad39bf..44ff6a83b 100644 --- a/pallets/storage-provider/src/tests/pre_commit_sectors.rs +++ b/pallets/storage-provider/src/tests/pre_commit_sectors.rs @@ -50,6 +50,7 @@ fn successfully_precommited() { amount: 1 },), RuntimeEvent::StorageProvider(Event::::SectorsPreCommitted { + block: 1, owner: account(storage_provider), sectors: bounded_vec![sector], }) @@ -97,6 +98,7 @@ fn successfully_precommited_no_deals() { amount: 1 },), RuntimeEvent::StorageProvider(Event::::SectorsPreCommitted { + block: 1, owner: account(storage_provider), sectors: bounded_vec![sector], }) @@ -160,6 +162,7 @@ fn successfully_precommited_batch() { amount: SECTORS_TO_PRECOMMIT },), RuntimeEvent::StorageProvider(Event::::SectorsPreCommitted { + block: 1, owner: account(storage_provider), sectors, }) diff --git a/primitives/commitment/src/lib.rs b/primitives/commitment/src/lib.rs index e62c9aa39..2b0ee0b00 100644 --- a/primitives/commitment/src/lib.rs +++ b/primitives/commitment/src/lib.rs @@ -80,6 +80,16 @@ impl Commitment { Self::new(commitment, CommitmentKind::Piece) } + /// Create a new data commitment. + pub fn data(commitment: [u8; 32]) -> Self { + Self::new(commitment, CommitmentKind::Data) + } + + /// Create a new replica commitment. + pub fn replica(commitment: [u8; 32]) -> Self { + Self::new(commitment, CommitmentKind::Replica) + } + /// Creates a new `Commitment` from bytes of a valid CID. /// Returns an error if the bytes passed do not represent a valid commitment. pub fn from_cid_bytes(bytes: &[u8], kind: CommitmentKind) -> Result {