Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add cancellation handling in proving #638

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 71 additions & 24 deletions storage-provider/server/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ pub enum PipelineError {
SendError(#[from] SendError<PipelineMessage>),
#[error("failed to schedule windowed PoSt")]
SchedulingError,
#[error("Proving cancelled")]
ProvingCancelled,
#[error("Custom error: {0}")]
CustomError(String),
}
Expand Down Expand Up @@ -124,8 +126,18 @@ pub async fn start_pipeline(
trait PipelineOperations {
fn add_piece(&self, state: Arc<PipelineState>, msg: AddPieceMessage, token: CancellationToken);
fn precommit(&self, state: Arc<PipelineState>, msg: PreCommitMessage);
fn prove_commit(&self, state: Arc<PipelineState>, msg: ProveCommitMessage);
fn submit_windowed_post(&self, state: Arc<PipelineState>, msg: SubmitWindowedPoStMessage);
fn prove_commit(
&self,
state: Arc<PipelineState>,
msg: ProveCommitMessage,
token: CancellationToken,
);
fn submit_windowed_post(
&self,
state: Arc<PipelineState>,
msg: SubmitWindowedPoStMessage,
token: CancellationToken,
);
fn schedule_posts(&self, state: Arc<PipelineState>);
}

Expand Down Expand Up @@ -174,11 +186,15 @@ impl PipelineOperations for TaskTracker {
});
}

fn prove_commit(&self, state: Arc<PipelineState>, msg: ProveCommitMessage) {
fn prove_commit(
&self,
state: Arc<PipelineState>,
msg: ProveCommitMessage,
token: CancellationToken,
) {
let ProveCommitMessage { sector_number } = msg;
self.spawn(async move {
// ProveCommit is not cancellation safe.
match prove_commit(state, sector_number).await {
match prove_commit(state, sector_number, token).await {
Ok(_) => {
tracing::info!(
"ProveCommit for sector {} finished successfully.",
Expand All @@ -192,19 +208,31 @@ impl PipelineOperations for TaskTracker {
});
}

fn submit_windowed_post(&self, state: Arc<PipelineState>, msg: SubmitWindowedPoStMessage) {
fn submit_windowed_post(
&self,
state: Arc<PipelineState>,
msg: SubmitWindowedPoStMessage,
token: CancellationToken,
) {
let SubmitWindowedPoStMessage { deadline_index } = msg;
self.spawn(async move {
// SubmitWindowedPoSt is not cancellation safe.
match submit_windowed_post(state, deadline_index).await {
Ok(_) => {
tracing::info!(
"SubmitWindowedPoSt for deadline {} finished successfully.",
deadline_index
)
}
Err(err) => {
tracing::error!(%err, "SubmitWindowedPoSt failed for deadline: {}", deadline_index)
tokio::select! {
// SubmitWindowedPoSt is not cancellation safe.
res = submit_windowed_post(state, deadline_index) => {
match res {
Ok(_) => {
tracing::info!(
"SubmitWindowedPoSt for deadline {} finished successfully.",
deadline_index
)
}
Err(err) => {
tracing::error!(%err, "SubmitWindowedPoSt failed for deadline: {}", deadline_index)
}
}
},
() = token.cancelled() => {
tracing::warn!("submit_windowed_post for deadline {} has been cancelled.", deadline_index);
}
}
});
Expand Down Expand Up @@ -233,9 +261,11 @@ fn process(
match msg {
PipelineMessage::AddPiece(msg) => tracker.add_piece(state.clone(), msg, token.clone()),
PipelineMessage::PreCommit(msg) => tracker.precommit(state.clone(), msg),
PipelineMessage::ProveCommit(msg) => tracker.prove_commit(state.clone(), msg),
PipelineMessage::ProveCommit(msg) => {
tracker.prove_commit(state.clone(), msg, token.clone())
}
PipelineMessage::SubmitWindowedPoStMessage(msg) => {
tracker.submit_windowed_post(state.clone(), msg)
tracker.submit_windowed_post(state.clone(), msg, token.clone())
}
PipelineMessage::SchedulePoSts => tracker.schedule_posts(state.clone()),
}
Expand Down Expand Up @@ -446,10 +476,11 @@ async fn precommit(
Ok(())
}

#[tracing::instrument(skip(state))]
#[tracing::instrument(skip(state, token))]
async fn prove_commit(
state: Arc<PipelineState>,
sector_number: SectorNumber,
token: CancellationToken,
) -> Result<(), PipelineError> {
tracing::info!("Starting prove commit");

Expand Down Expand Up @@ -485,10 +516,16 @@ async fn prove_commit(
let prove_commit_block = sector.precommit_block + PRECOMMIT_CHALLENGE_DELAY;

tracing::info!("Wait for block {} to get randomness", prove_commit_block);
state
.xt_client
.wait_for_height(prove_commit_block, true)
.await?;
tokio::select! {
res = state.xt_client.wait_for_height(prove_commit_block, true) => {
res?;
},
() = token.cancelled() => {
tracing::warn!("Cancelled while waiting to get randomness at block {}", prove_commit_block);
return Err(PipelineError::ProvingCancelled);
}
};

let Some(digest) = state.xt_client.get_randomness(prove_commit_block).await? else {
tracing::error!("Randomness for the block not available.");
return Err(PipelineError::RandomnessNotAvailable);
Expand Down Expand Up @@ -527,7 +564,17 @@ async fn prove_commit(
)
})
};
let proofs = sealing_handle.await??;

let proofs = tokio::select! {
// Up to this point everything is retryable.
// Pipeline ends up being in an inconsistent state if we prove commit to the chain, and don't wait for it, so the sector's not persisted in the DB.
res = sealing_handle => {
res??
},
() = token.cancelled() => {
return Err(PipelineError::ProvingCancelled);
}
};

// We use sector size 2KiB only at this point, which guarantees to have 1 proof, because it has 1 partition in the config.
// That's why `prove_commit` will always generate a 1 proof.
Expand Down
Loading