From 7786b18b8428bdc47c8ee417100fbf748b660506 Mon Sep 17 00:00:00 2001 From: Konrad Stepniak Date: Fri, 13 Dec 2024 16:34:25 +0100 Subject: [PATCH] fix: add cancellation handling in proving --- storage-provider/server/src/pipeline/mod.rs | 95 +++++++++++++++------ 1 file changed, 71 insertions(+), 24 deletions(-) diff --git a/storage-provider/server/src/pipeline/mod.rs b/storage-provider/server/src/pipeline/mod.rs index 0448f47f..83cda540 100644 --- a/storage-provider/server/src/pipeline/mod.rs +++ b/storage-provider/server/src/pipeline/mod.rs @@ -68,6 +68,8 @@ pub enum PipelineError { SendError(#[from] SendError), #[error("failed to schedule windowed PoSt")] SchedulingError, + #[error("Proving cancelled")] + ProvingCancelled, #[error("Custom error: {0}")] CustomError(String), } @@ -124,8 +126,18 @@ pub async fn start_pipeline( trait PipelineOperations { fn add_piece(&self, state: Arc, msg: AddPieceMessage, token: CancellationToken); fn precommit(&self, state: Arc, msg: PreCommitMessage); - fn prove_commit(&self, state: Arc, msg: ProveCommitMessage); - fn submit_windowed_post(&self, state: Arc, msg: SubmitWindowedPoStMessage); + fn prove_commit( + &self, + state: Arc, + msg: ProveCommitMessage, + token: CancellationToken, + ); + fn submit_windowed_post( + &self, + state: Arc, + msg: SubmitWindowedPoStMessage, + token: CancellationToken, + ); fn schedule_posts(&self, state: Arc); } @@ -174,11 +186,15 @@ impl PipelineOperations for TaskTracker { }); } - fn prove_commit(&self, state: Arc, msg: ProveCommitMessage) { + fn prove_commit( + &self, + state: Arc, + msg: ProveCommitMessage, + token: CancellationToken, + ) { let ProveCommitMessage { sector_number } = msg; self.spawn(async move { - // ProveCommit is not cancellation safe. - match prove_commit(state, sector_number).await { + match prove_commit(state, sector_number, token).await { Ok(_) => { tracing::info!( "ProveCommit for sector {} finished successfully.", @@ -192,19 +208,31 @@ impl PipelineOperations for TaskTracker { }); } - fn submit_windowed_post(&self, state: Arc, msg: SubmitWindowedPoStMessage) { + fn submit_windowed_post( + &self, + state: Arc, + msg: SubmitWindowedPoStMessage, + token: CancellationToken, + ) { let SubmitWindowedPoStMessage { deadline_index } = msg; self.spawn(async move { - // SubmitWindowedPoSt is not cancellation safe. - match submit_windowed_post(state, deadline_index).await { - Ok(_) => { - tracing::info!( - "SubmitWindowedPoSt for deadline {} finished successfully.", - deadline_index - ) - } - Err(err) => { - tracing::error!(%err, "SubmitWindowedPoSt failed for deadline: {}", deadline_index) + tokio::select! { + // SubmitWindowedPoSt is not cancellation safe. + res = submit_windowed_post(state, deadline_index) => { + match res { + Ok(_) => { + tracing::info!( + "SubmitWindowedPoSt for deadline {} finished successfully.", + deadline_index + ) + } + Err(err) => { + tracing::error!(%err, "SubmitWindowedPoSt failed for deadline: {}", deadline_index) + } + } + }, + () = token.cancelled() => { + tracing::warn!("submit_windowed_post for deadline {} has been cancelled.", deadline_index); } } }); @@ -233,9 +261,11 @@ fn process( match msg { PipelineMessage::AddPiece(msg) => tracker.add_piece(state.clone(), msg, token.clone()), PipelineMessage::PreCommit(msg) => tracker.precommit(state.clone(), msg), - PipelineMessage::ProveCommit(msg) => tracker.prove_commit(state.clone(), msg), + PipelineMessage::ProveCommit(msg) => { + tracker.prove_commit(state.clone(), msg, token.clone()) + } PipelineMessage::SubmitWindowedPoStMessage(msg) => { - tracker.submit_windowed_post(state.clone(), msg) + tracker.submit_windowed_post(state.clone(), msg, token.clone()) } PipelineMessage::SchedulePoSts => tracker.schedule_posts(state.clone()), } @@ -446,10 +476,11 @@ async fn precommit( Ok(()) } -#[tracing::instrument(skip(state))] +#[tracing::instrument(skip(state, token))] async fn prove_commit( state: Arc, sector_number: SectorNumber, + token: CancellationToken, ) -> Result<(), PipelineError> { tracing::info!("Starting prove commit"); @@ -485,10 +516,16 @@ async fn prove_commit( let prove_commit_block = sector.precommit_block + PRECOMMIT_CHALLENGE_DELAY; tracing::info!("Wait for block {} to get randomness", prove_commit_block); - state - .xt_client - .wait_for_height(prove_commit_block, true) - .await?; + tokio::select! { + res = state.xt_client.wait_for_height(prove_commit_block, true) => { + res?; + }, + () = token.cancelled() => { + tracing::warn!("Cancelled while waiting to get randomness at block {}", prove_commit_block); + return Err(PipelineError::ProvingCancelled); + } + }; + let Some(digest) = state.xt_client.get_randomness(prove_commit_block).await? else { tracing::error!("Randomness for the block not available."); return Err(PipelineError::RandomnessNotAvailable); @@ -527,7 +564,17 @@ async fn prove_commit( ) }) }; - let proofs = sealing_handle.await??; + + let proofs = tokio::select! { + // Up to this point everything is retryable. + // Pipeline ends up being in an inconsistent state if we prove commit to the chain, and don't wait for it, so the sector's not persisted in the DB. + res = sealing_handle => { + res?? + }, + () = token.cancelled() => { + return Err(PipelineError::ProvingCancelled); + } + }; // We use sector size 2KiB only at this point, which guarantees to have 1 proof, because it has 1 partition in the config. // That's why `prove_commit` will always generate a 1 proof.