Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: catch-all loop timeout #29

Merged
merged 2 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 68 additions & 62 deletions script/bin/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,70 +221,64 @@ impl SP1BlobstreamOperator {
}
}

async fn run(&mut self) -> Result<()> {
info!("Starting SP1 Blobstream operator");
let mut fetcher = TendermintRPCClient::default();
let loop_interval_mins = get_loop_interval_mins();
async fn run(&self) -> Result<()> {
let fetcher = TendermintRPCClient::default();
let block_update_interval = get_block_update_interval();

loop {
let contract = SP1Blobstream::new(self.contract_address, self.wallet_filler.clone());

// Read the data commitment max from the contract.
let data_commitment_max = contract
.DATA_COMMITMENT_MAX()
.call()
.await?
.DATA_COMMITMENT_MAX;

// Get the latest block from the contract.
let current_block = contract.latestBlock().call().await?.latestBlock;

// Get the head of the chain.
let latest_tendermint_block_nb = fetcher.get_latest_block_height().await;

// Subtract 1 block to ensure the block is stable.
let latest_stable_tendermint_block = latest_tendermint_block_nb - 1;

// block_to_request is the closest interval of block_interval less than min(latest_stable_tendermint_block, data_commitment_max + current_block)
let max_block = std::cmp::min(
latest_stable_tendermint_block,
data_commitment_max + current_block,
);
let block_to_request = max_block - (max_block % block_update_interval);

// If block_to_request is greater than the current block in the contract, attempt to request.
if block_to_request > current_block {
// The next block the operator should request.
let max_end_block = block_to_request;

let target_block = fetcher
.find_block_to_request(current_block, max_end_block)
.await;

info!("Current block: {}", current_block);
info!("Attempting to step to block {}", target_block);

// Request a header range if the target block is not the next block.
match self.request_header_range(current_block, target_block).await {
Ok(proof) => {
let tx_hash = self.relay_header_range(proof).await?;
info!(
"Posted data commitment from block {} to block {}\nTransaction hash: {}",
current_block, target_block, tx_hash
);
}
Err(e) => {
error!("Header range request failed: {}", e);
continue;
}
};
} else {
info!("Next block to request is {} which is > the head of the Tendermint chain which is {}. Sleeping.", block_to_request + block_update_interval, latest_stable_tendermint_block);
}
let contract = SP1Blobstream::new(self.contract_address, self.wallet_filler.clone());

tokio::time::sleep(tokio::time::Duration::from_secs(60 * loop_interval_mins)).await;
// Read the data commitment max from the contract.
let data_commitment_max = contract
.DATA_COMMITMENT_MAX()
.call()
.await?
.DATA_COMMITMENT_MAX;

// Get the latest block from the contract.
let current_block = contract.latestBlock().call().await?.latestBlock;

// Get the head of the chain.
let latest_tendermint_block_nb = fetcher.get_latest_block_height().await;

// Subtract 1 block to ensure the block is stable.
let latest_stable_tendermint_block = latest_tendermint_block_nb - 1;

// block_to_request is the closest interval of block_interval less than min(latest_stable_tendermint_block, data_commitment_max + current_block)
let max_block = std::cmp::min(
latest_stable_tendermint_block,
data_commitment_max + current_block,
);
let block_to_request = max_block - (max_block % block_update_interval);

// If block_to_request is greater than the current block in the contract, attempt to request.
if block_to_request > current_block {
// The next block the operator should request.
let max_end_block = block_to_request;

let target_block = fetcher
.find_block_to_request(current_block, max_end_block)
.await;

info!("Current block: {}", current_block);
info!("Attempting to step to block {}", target_block);

// Request a header range if the target block is not the next block.
match self.request_header_range(current_block, target_block).await {
Ok(proof) => {
let tx_hash = self.relay_header_range(proof).await?;
info!(
"Posted data commitment from block {} to block {}\nTransaction hash: {}",
current_block, target_block, tx_hash
);
}
Err(e) => {
return Err(anyhow::anyhow!("Header range request failed: {}", e));
}
};
} else {
info!("Next block to request is {} which is > the head of the Tendermint chain which is {}. Sleeping.", block_to_request + block_update_interval, latest_stable_tendermint_block);
}
Ok(())
}
}

Expand Down Expand Up @@ -317,11 +311,23 @@ async fn main() {
dotenv::dotenv().ok();
env_logger::init();

let mut operator = SP1BlobstreamOperator::new().await;
let operator = SP1BlobstreamOperator::new().await;
operator.check_vkey().await.unwrap();

info!("Starting SP1 Blobstream operator");
const LOOP_TIMEOUT_MINS: u64 = 20;
loop {
if let Err(e) = operator.run().await {
let request_interval_mins = get_loop_interval_mins();
// If the operator takes longer than LOOP_TIMEOUT_MINS for a single invocation, or there's
// an error, sleep for the loop interval and try again.
if let Err(e) = tokio::time::timeout(
tokio::time::Duration::from_secs(60 * LOOP_TIMEOUT_MINS),
operator.run(),
)
.await
{
error!("Error running operator: {}", e);
}
tokio::time::sleep(tokio::time::Duration::from_secs(60 * request_interval_mins)).await;
}
}
2 changes: 1 addition & 1 deletion script/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl TendermintRPCClient {
}

// Search to find the greatest block number to request.
pub async fn find_block_to_request(&mut self, start_block: u64, max_end_block: u64) -> u64 {
pub async fn find_block_to_request(&self, start_block: u64, max_end_block: u64) -> u64 {
let mut curr_end_block = max_end_block;
loop {
if curr_end_block - start_block == 1 {
Expand Down
Loading