Skip to content

Commit

Permalink
feat(listener): skip proofs with wrong nonce (#2365)
Browse files Browse the repository at this point in the history
  • Loading branch information
justprosh authored Sep 12, 2024
1 parent 3147450 commit fe0ec7c
Showing 1 changed file with 30 additions and 36 deletions.
66 changes: 30 additions & 36 deletions crates/chain-listener/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ impl ChainListener {
let result = tokio::task::Builder::new()
.name("ChainListener")
.spawn(async move {

if let Err(err) = self.set_utility_core().await {
tracing::error!(target: "chain-listener", "Failed to set utility core: {err}; Stopping...");
exit(1);
Expand Down Expand Up @@ -427,16 +426,16 @@ impl ChainListener {
.cloned()
.ok_or(eyre::eyre!("No utility core id"))?;
measured_request(&self.metrics,
retry(ExponentialBackoff::default(), || async {
ccp_client
.realloc_utility_cores(vec![utility_core])
.await
.map_err(|err| {
tracing::warn!(target: "chain-listener", "Error reallocating utility core {utility_core} to CCP, error: {err}. Retrying...");
eyre::eyre!("Error reallocating utility core {utility_core} to CCP, error: {err}")
})?;
Ok(())
})
retry(ExponentialBackoff::default(), || async {
ccp_client
.realloc_utility_cores(vec![utility_core])
.await
.map_err(|err| {
tracing::warn!(target: "chain-listener", "Error reallocating utility core {utility_core} to CCP, error: {err}. Retrying...");
eyre::eyre!("Error reallocating utility core {utility_core} to CCP, error: {err}")
})?;
Ok(())
}),
).await?;

tracing::info!("Utility core {utility_core} successfully reallocated");
Expand Down Expand Up @@ -611,17 +610,18 @@ impl ChainListener {
params: ArrayParams,
) -> Result<Subscription<JsonValue>, client::Error> {
let sub = retry(ExponentialBackoff::default(), || async {
self
self
.ws_client
.subscribe("eth_subscribe", params.clone(), "eth_unsubscribe")
.await.map_err(|err| {
.await.map_err(|err| {
if let client::Error::RestartNeeded(_) = err {
tracing::error!(target: "chain-listener", "Failed to subscribe to {method}: {err};");
Permanent(err)
} else {
tracing::warn!(target: "chain-listener", "Failed to subscribe to {method}: {err}; Retrying...");
backoff::Error::transient(err)
}})
}
})
}).await?;

Ok(sub)
Expand Down Expand Up @@ -1073,7 +1073,7 @@ impl ChainListener {
tracing::info!(target: "chain-listener", "Stopping current commitment");
if let Some(ref ccp_client) = self.ccp_client {
measured_request(&self.metrics,
ccp_client.on_no_active_commitment()
ccp_client.on_no_active_commitment(),
).await.map_err(|err| {
tracing::error!(target: "chain-listener", "Failed to send no active commitment to CCP: {err}");
eyre::eyre!("Failed to send no active commitment to CCP: {err}")
Expand Down Expand Up @@ -1158,30 +1158,22 @@ impl ChainListener {
.map_err(|err| eyre::eyre!("Failed to poll batched proofs from ccp: {err}"))?
};

// TODO: maybe filter out proofs that are not related to current epoch
// // Filter proofs related to current epoch only
// let proof_batches: Vec<BatchResponse> = proofs
// .into_iter()
// .for_each(move |p| {
// p.proof_batches
// .into_iter()
// .filter(|p| p.id.global_nonce == self.global_nonce)
// .collect()
// })
// .collect();

if !proof_batches.is_empty() {
let total_proofs = proof_batches
.iter()
.map(|p| p.proof_batches.len())
.sum::<usize>();
tracing::info!(target: "chain-listener", "Found {} proofs in {} batches from polling", total_proofs, proof_batches.len());

let mut unit_ids = Vec::new();
let mut local_nonces = Vec::new();
let mut result_hashes = Vec::new();
let batch_count = proof_batches.len();
let mut skipped_proofs_count = 0;
for batch in proof_batches.into_iter() {
for proof in batch.proof_batches.into_iter() {
if proof.id.global_nonce != self.global_nonce
|| proof.id.difficulty != self.difficulty
{
tracing::debug!(target: "chain-listener", "Proof (id={}, global nonce={}, difficulty={}) doesn't match current nonce {} and/or difficulty {}. Skipping..", proof.id.idx, proof.id.global_nonce, proof.id.difficulty, self.global_nonce, self.difficulty);
skipped_proofs_count += 1;
continue;
}

unit_ids.push(proof.cu_id);
local_nonces.push(proof.local_nonce);
result_hashes.push(proof.result_hash);
Expand All @@ -1191,6 +1183,8 @@ impl ChainListener {
}
}

tracing::info!(target: "chain-listener", "Found {} proofs in {} batches from polling, skipped {} proofs", result_hashes.len(), batch_count, skipped_proofs_count);

self.submit_proofs(unit_ids, local_nonces, result_hashes)
.await?;
} else {
Expand All @@ -1210,14 +1204,14 @@ impl ChainListener {
self.chain_connector.submit_proofs(unit_ids.clone(), local_nonces.clone(), result_hashes.clone()).await.map_err(|err| {
match err {
ConnectorError::RpcCallError { .. } => { Permanent(err) }
_ => {
_ => {
tracing::warn!(target: "chain-listener", "Failed to submit proof: {err}. Retrying..");
backoff::Error::transient(err)
}
}
})
})
.await;
.await;

match submit {
Err(err) => {
Expand Down Expand Up @@ -1278,7 +1272,7 @@ impl ChainListener {

Ok(s)
})
.await?;
.await?;

for (status, (deal_id, worker)) in statuses
.into_iter()
Expand Down

0 comments on commit fe0ec7c

Please sign in to comment.