From 82024c71d3cb3ade1c90b91a98575adddf0a1623 Mon Sep 17 00:00:00 2001 From: Apoorv Sadana <95699312+apoorvsadana@users.noreply.github.com> Date: Fri, 18 Oct 2024 11:40:09 +0530 Subject: [PATCH] snos release patches (#162) * feat : added snos failing job handling * chore : refactor and state update worker updated * merge * feat : added snos latest block env * feat : updated env vars * feat : updated workers * feat : updated and fixed workers and jobs * fix : tests * feat : rmeoved hacky code * refactor * update snos version is ci * add block time to anvil testing * changes is settlement client * fix blob test * increase visibility timeout * fixes * fix prio fees and verif q bug * coverage fix * Add no fail fast * fixes * fix comment * fix clippy --------- Co-authored-by: Arun Jangra --- .env.example | 1 + .env.test | 6 +- .github/workflows/coverage.yml | 2 +- .github/workflows/e2e-test.yml | 4 +- .github/workflows/linters-cargo.yml | 2 +- .github/workflows/rust-build.yml | 2 +- CHANGELOG.md | 1 + Cargo.lock | 12 +- Cargo.toml | 4 +- Dockerfile | 5 + .../orchestrator/src/database/mongodb/mod.rs | 6 +- crates/orchestrator/src/jobs/da_job/mod.rs | 2 +- .../src/jobs/state_update_job/mod.rs | 22 +- crates/orchestrator/src/queue/job_queue.rs | 5 +- .../src/tests/jobs/state_update_job/mod.rs | 14 +- .../src/tests/workers/snos/mod.rs | 34 +- .../src/tests/workers/update_state/mod.rs | 290 +++++++++++------- .../src/workers/data_submission_worker.rs | 60 +--- crates/orchestrator/src/workers/mod.rs | 6 +- .../src/workers/proof_registration.rs | 3 +- crates/orchestrator/src/workers/proving.rs | 10 +- crates/orchestrator/src/workers/snos.rs | 68 ++-- .../orchestrator/src/workers/update_state.rs | 203 ++++++------ .../ethereum/src/conversion.rs | 1 - crates/settlement-clients/ethereum/src/lib.rs | 63 ++-- .../ethereum/src/tests/mod.rs | 17 +- e2e-tests/src/localstack.rs | 10 +- e2e-tests/tests.rs | 7 +- 28 files changed, 475 insertions(+), 385 deletions(-) diff --git a/.env.example b/.env.example index 74f4cd69..e4dd70aa 100644 --- a/.env.example +++ b/.env.example @@ -2,6 +2,7 @@ HOST= PORT= +MAX_BLOCK_TO_PROCESS= ##### AWS CONFIG ##### diff --git a/.env.test b/.env.test index 583b0d41..c4ae2c32 100644 --- a/.env.test +++ b/.env.test @@ -2,6 +2,8 @@ HOST=127.0.0.1 PORT=3000 +MAX_BLOCK_TO_PROCESS= +MIN_BLOCK_TO_PROCESS= ##### AWS CONFIG ##### @@ -31,7 +33,7 @@ SQS_PROVING_JOB_VERIFICATION_QUEUE_URL="http://sqs.us-east-1.localhost.localstac SQS_DATA_SUBMISSION_JOB_PROCESSING_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_data_submission_job_processing_queue" SQS_DATA_SUBMISSION_JOB_VERIFICATION_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_data_submission_job_verification_queue" -SQS_UPDATE_STATE_JOB_PROCESSING_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_update_state_job_verification_queue" +SQS_UPDATE_STATE_JOB_PROCESSING_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_update_state_job_processing_queue" SQS_UPDATE_STATE_JOB_VERIFICATION_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_update_state_job_verification_queue" SQS_JOB_HANDLE_FAILURE_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_handle_failure_queue" @@ -52,7 +54,7 @@ DATABASE_NAME="orchestrator" PROVER_SERVICE="sharp" SHARP_CUSTOMER_ID="sharp_consumer_id" -SHARP_URL="http://127.0.0.1:5000" +SHARP_URL="http://127.0.0.1:6000" # [IMP!!!] These are test certificates (they don't work) SHARP_USER_CRT="LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUR4ekNDQXErZ0F3SUJBZ0lVTjBSK0xpb1MzL2ZadUZsK291RjZNNFk2RnRZd0RRWUpLb1pJaHZjTkFRRUwKQlFBd2N6RUxNQWtHQTFVRUJoTUNTVTR4RXpBUkJnTlZCQWdNQ2xOdmJXVXRVM1JoZEdVeElUQWZCZ05WQkFvTQpHRWx1ZEdWeWJtVjBJRmRwWkdkcGRITWdVSFI1SUV4MFpERU5NQXNHQTFVRUF3d0VVMVJTU3pFZE1Cc0dDU3FHClNJYjNEUUVKQVJZT1lXSmpRR3RoY201dmRDNTRlWG93SGhjTk1qUXdPREV6TVRNd05UTTBXaGNOTWpVd09ERXoKTVRNd05UTTBXakJ6TVFzd0NRWURWUVFHRXdKSlRqRVRNQkVHQTFVRUNBd0tVMjl0WlMxVGRHRjBaVEVoTUI4RwpBMVVFQ2d3WVNXNTBaWEp1WlhRZ1YybGtaMmwwY3lCUWRIa2dUSFJrTVEwd0N3WURWUVFEREFSVFZGSkxNUjB3Ckd3WUpLb1pJaHZjTkFRa0JGZzVoWW1OQWEyRnlibTkwTG5oNWVqQ0NBU0l3RFFZSktvWklodmNOQVFFQkJRQUQKZ2dFUEFEQ0NBUW9DZ2dFQkFOSEtaUGRqWSs4QWo4ZFV2V0xReEl5NTNrK1BHY001T2FlYnpTV3FER0xGSlBOdgpkVzJvWjFCSnNEb2hobWZFSCt5ZEFoQXEvbzc4NDljblg2VDJTOVhta25wdnNud2dRckU5Z3lqSmV3MUxBRzNHCm10U0lOMWJJSm9peWJ3QUR5NGxPd0xrVzUzdFdueHBSazVVVmZUU1hLYVRRTnlHd2o3Q2xMSGthcnlZYVk3OVkKOXlHMFJ2RkFkb1IzczBveWthNkFLV0d1WjhOdWd4NTY2bysyWllRenJteWVNU1NGYkhNdW1aUkxYb0hpazhBSgpLZXJ0bnNBRC9LMVJRYm80Y21ubHFoTVRhQktiTEFVVjVteFVvMlpveFBJVU9tREE5N3IyMmRTYkRkRlVjeC9kCjhQcDB6VXNycXdQckJlcW5SMXdLOE80MUlHajUzRnUzVmxDeS94MENBd0VBQWFOVE1GRXdIUVlEVlIwT0JCWUUKRkc0T0lvKzcvckJyZlR4S2FFMGx2L1dwRDJ3UE1COEdBMVVkSXdRWU1CYUFGRzRPSW8rNy9yQnJmVHhLYUUwbAp2L1dwRDJ3UE1BOEdBMVVkRXdFQi93UUZNQU1CQWY4d0RRWUpLb1pJaHZjTkFRRUxCUUFEZ2dFQkFEMURDZkR3CnpoSXRGMWd5YVdhWURZRHErZjJSUHBFRWVaWk1BSDdJV0ZTajRrTzhmVHN1RnN6bFoyNXNlR3ZHYW4xQ3F4alQKYnJ3MXliVlJQeGZMUWgxRlZMMGhFeDZWYXhGditxMmtqUmlCQmZURFBxWGxYcmpaaUYrZTNPS3lKSVhnNkpIUAppbVpBV0dyRFBHNkorQi90bHRaQ3VLZVhLK1FUcnRSOVVCL29hOWVaQWc5RXNkOVJsZDRNeVo5b0NtdUNPU1hmCnk1THFkVlgrNENpTnJXQ3BwM1B2M2MyL28rZ0RMQjUzZ252R056RjR6Q1FIZ0RtN0RNZnpmZlY1TUMwV1MvWXkKVnpyUG11Sys0Y0tSK3dMOFZITVNEeC9ybTFhYnh0dEN2VW92MUw5dVZ1QUNGc29yNmdsR0N1RDNNQ0dIa0pNNgpxaS8rM1haeHhxeGw1Rzg9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K" SHARP_USER_KEY="LS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tCk1JSUV2UUlCQURBTkJna3Foa2lHOXcwQkFRRUZBQVNDQktjd2dnU2pBZ0VBQW9JQkFRRFJ5bVQzWTJQdkFJL0gKVkwxaTBNU011ZDVQanhuRE9UbW5tODBscWd4aXhTVHpiM1Z0cUdkUVNiQTZJWVpueEIvc25RSVFLdjZPL09QWApKMStrOWt2VjVwSjZiN0o4SUVLeFBZTW95WHNOU3dCdHhwclVpRGRXeUNhSXNtOEFBOHVKVHNDNUZ1ZDdWcDhhClVaT1ZGWDAwbHltazBEY2hzSSt3cFN4NUdxOG1HbU8vV1BjaHRFYnhRSGFFZDdOS01wR3VnQ2xocm1mRGJvTWUKZXVxUHRtV0VNNjVzbmpFa2hXeHpMcG1VUzE2QjRwUEFDU25xN1o3QUEveXRVVUc2T0hKcDVhb1RFMmdTbXl3RgpGZVpzVktObWFNVHlGRHBnd1BlNjl0blVtdzNSVkhNZjNmRDZkTTFMSzZzRDZ3WHFwMGRjQ3ZEdU5TQm8rZHhiCnQxWlFzdjhkQWdNQkFBRUNnZ0VBQU9mcDFiT2xLOVFKeXVlUHhjeDIvTkNVcUMxTEJDL01FdkEyUzVKWGFWbkcKbGhLR0pFb1U0Q0RoVk83dUlLYVZLTFZvMjk4RHFHUnBLM1d0RVE1TE40bytXYTcveTA5c1drMlVzbWxrVWFOZwpSaGtVZEJSK2dLNXVsQ3FKRml2dUJoTEQvRWlnQ1VWUGZKS2JtNG96TnpYcjVSMU5ENlV1aWFtODdtenlFcTBLCmZsVXlhR0RZNGdIdFNBOVBENVBFYlUveFpKeitKaHk5T2l3aVRXV0MrSHoyb2c3UWRDRDE2RlhGcit2VHpQN0MKb2tFb0VDZFNPRWlMalVENjBhS2ZxRmFCVm5MTkVudC9QSytmY1RBM05mNGtSMnFDNk9ZWjVFb09zYm1ka29ZTgpyU3NJZW9XblMxOEhvekZud2w3Z05wTUtjNmRzQzRBTldOVDFsTkhCb1FLQmdRRHlaUDFJSlppZUh6NlExaUVTCm5zd2tnblZCQUQ0SlVLR1ZDMHA3dk4yclNDZXh4c05ZZXFPTEEyZGZCUGpOVjd3blFKcUgxT05XellOMUJVSUUKeThLTCtFZVl6Q3RZa21LL21wSGJIMzNjd2tJODBuMHJROU1BalZMTlJ2YVVEOWp1NFBsRzFqaEFZUVVyTkViZQpKRlVpSk83aDVQa1llZG50SitqSHFpQnRoUUtCZ1FEZGtPbndmL0szYk4xenR0bXZQd0VicjhkVWJjRVh5NDFOCkl5VWwrZW1WSlgzYktKM0duNDZnQ2RsTTdkYmpwS3JVZ3oxL2JsZTgvMkVFckJvSEFRNkMrU2pEaGhvL01CbnIKekZheTBoK3YxbjBnZnNNVzRoOEF4cEFwc25OYnh6K2g1Wm5uSnRTd0srUjB3U0VJVVEzRjAxL2hMWWhLQ2l5OApwbW5HQi9hU3VRS0JnRzdxd1cvVExGd214ZlYyMXBsenFzeUdHZXVObGRXalhOMGIxcEI2b3lDdW11TmhwYUFHCk5uSDFNOGNxT2tPVWd4ZWZHMWRPbGx6eEc5ZGZlWTlDUWhyVW1NYVZucndmK0NuZkxDRU43d1VtcXpLenl1MFMKVXlwc2dOaElRYXNNK1dLTjllTnhRVHBNYXhZVERONjMxM0VSWDNKazJZdFdydDh6cFBSQXFDZ1ZBb0dCQU54egpUa0NMbmJ6aFphbTNlZm9DenlCMEVma3dSdHBkSGxkc3E0NlFqTmRuK1VSd3NpTXBLR2lWeEE3bDZsU1B4NlV3CmU2VHA3Z1JQZUlHRWwxVDJ1VENacGZSODNtcVdlb1FCeVJXZE9nZmplcFkxYWZpL3ZhY3c2Y21ERTRKeXloNVUKYTMveFE5ZVJwSHFDbWxKREMxZ1V5eVlwL3B2a2FjUytNeW5sVEhHSkFvR0FQekdTSzdXOHBUYldSVEFoaTVrSQpwZk5kWk1tcnRodUxNT3F6TGhyRjZublpldk9OdTBoYXVhZktlVElFd2w0clhYZHFKQlJBaWZKMFFsLzZKWFFkCmd1VzFrZWk1Ui8rUFZ5eUhab042c3NXSTNWYklwUUloUmt6UENnTDZhbHEwSzFpT1dlV1lIOHdORGRRdlB1T2UKRkZPOEovSzNxV0NtWjU0ODBBbTNhT0U9Ci0tLS0tRU5EIFBSSVZBVEUgS0VZLS0tLS0K" diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index ceef0c79..dc3465c4 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -128,7 +128,7 @@ jobs: # Navigating to snos cd snos-59fe8329bb16fe65 # Navigating to the build - cd 3bd95bf + cd af74c75 # Activating the venv source ~/cairo_venv/bin/activate # Building the cairo lang repo requirements diff --git a/.github/workflows/e2e-test.yml b/.github/workflows/e2e-test.yml index 5456a01a..02427b1b 100644 --- a/.github/workflows/e2e-test.yml +++ b/.github/workflows/e2e-test.yml @@ -71,7 +71,7 @@ jobs: then echo "Anvil is installed. Version information:" anvil --version - anvil & + anvil --block-time 6 & else echo "Anvil is not installed or not in PATH" exit 1 @@ -86,7 +86,7 @@ jobs: # Navigating to snos cd snos-59fe8329bb16fe65 # Navigating to the build - cd 3bd95bf + cd af74c75 # Activating the venv source ~/cairo_venv/bin/activate # Building the cairo lang repo requirements diff --git a/.github/workflows/linters-cargo.yml b/.github/workflows/linters-cargo.yml index 88cf2813..56fd9a07 100644 --- a/.github/workflows/linters-cargo.yml +++ b/.github/workflows/linters-cargo.yml @@ -43,7 +43,7 @@ jobs: # Navigating to snos cd snos-59fe8329bb16fe65 # Navigating to the build - cd 3bd95bf + cd af74c75 # Activating the venv source ~/cairo_venv/bin/activate # Building the cairo lang repo requirements diff --git a/.github/workflows/rust-build.yml b/.github/workflows/rust-build.yml index 34f096ec..467dc5a2 100644 --- a/.github/workflows/rust-build.yml +++ b/.github/workflows/rust-build.yml @@ -45,7 +45,7 @@ jobs: # Navigating to snos cd snos-59fe8329bb16fe65 # Navigating to the build - cd 3bd95bf + cd af74c75 # Activating the venv source ~/cairo_venv/bin/activate # Building the cairo lang repo requirements diff --git a/CHANGELOG.md b/CHANGELOG.md index af109bad..3f39afb9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -65,6 +65,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## Fixed +- fixes after sepolia testing - all failed jobs should move to failed state - Fixes all unwraps() in code to improve error logging - Simplified Update_Job for Database. diff --git a/Cargo.lock b/Cargo.lock index 74f78aed..e759c245 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3346,7 +3346,7 @@ dependencies = [ [[package]] name = "cairo-type-derive" version = "0.1.0" -source = "git+https://github.com/keep-starknet-strange/snos?rev=3bd95bfb315a596519b5b1d3c81114fbfa06f8a6#3bd95bfb315a596519b5b1d3c81114fbfa06f8a6" +source = "git+https://github.com/keep-starknet-strange/snos?rev=af74c7599231c25ad9c342f6888668680a9782d9#af74c7599231c25ad9c342f6888668680a9782d9" dependencies = [ "proc-macro2", "quote", @@ -7731,7 +7731,7 @@ dependencies = [ [[package]] name = "prove_block" version = "0.1.0" -source = "git+https://github.com/keep-starknet-strange/snos?rev=3bd95bfb315a596519b5b1d3c81114fbfa06f8a6#3bd95bfb315a596519b5b1d3c81114fbfa06f8a6" +source = "git+https://github.com/keep-starknet-strange/snos?rev=af74c7599231c25ad9c342f6888668680a9782d9#af74c7599231c25ad9c342f6888668680a9782d9" dependencies = [ "anyhow", "blockifier", @@ -8191,7 +8191,7 @@ dependencies = [ [[package]] name = "rpc-client" version = "0.1.0" -source = "git+https://github.com/keep-starknet-strange/snos?rev=3bd95bfb315a596519b5b1d3c81114fbfa06f8a6#3bd95bfb315a596519b5b1d3c81114fbfa06f8a6" +source = "git+https://github.com/keep-starknet-strange/snos?rev=af74c7599231c25ad9c342f6888668680a9782d9#af74c7599231c25ad9c342f6888668680a9782d9" dependencies = [ "log", "reqwest 0.11.27", @@ -8206,7 +8206,7 @@ dependencies = [ [[package]] name = "rpc-replay" version = "0.1.0" -source = "git+https://github.com/keep-starknet-strange/snos?rev=3bd95bfb315a596519b5b1d3c81114fbfa06f8a6#3bd95bfb315a596519b5b1d3c81114fbfa06f8a6" +source = "git+https://github.com/keep-starknet-strange/snos?rev=af74c7599231c25ad9c342f6888668680a9782d9#af74c7599231c25ad9c342f6888668680a9782d9" dependencies = [ "blockifier", "cairo-lang-starknet-classes", @@ -9580,7 +9580,7 @@ dependencies = [ [[package]] name = "starknet-os" version = "0.1.0" -source = "git+https://github.com/keep-starknet-strange/snos?rev=3bd95bfb315a596519b5b1d3c81114fbfa06f8a6#3bd95bfb315a596519b5b1d3c81114fbfa06f8a6" +source = "git+https://github.com/keep-starknet-strange/snos?rev=af74c7599231c25ad9c342f6888668680a9782d9#af74c7599231c25ad9c342f6888668680a9782d9" dependencies = [ "anyhow", "ark-ec", @@ -9629,7 +9629,7 @@ dependencies = [ [[package]] name = "starknet-os-types" version = "0.1.0" -source = "git+https://github.com/keep-starknet-strange/snos?rev=3bd95bfb315a596519b5b1d3c81114fbfa06f8a6#3bd95bfb315a596519b5b1d3c81114fbfa06f8a6" +source = "git+https://github.com/keep-starknet-strange/snos?rev=af74c7599231c25ad9c342f6888668680a9782d9#af74c7599231c25ad9c342f6888668680a9782d9" dependencies = [ "blockifier", "cairo-lang-starknet-classes", diff --git a/Cargo.toml b/Cargo.toml index ee6e9aa5..3bb08d75 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -109,8 +109,8 @@ cairo-vm = { git = "https://github.com/Moonsong-Labs/cairo-vm", branch = "notles # Snos & Sharp (Starkware) # TODO: Update branch to main once the PR is merged (PR #368 in the snos repository) -starknet-os = { git = "https://github.com/keep-starknet-strange/snos", rev = "3bd95bfb315a596519b5b1d3c81114fbfa06f8a6" } -prove_block = { git = "https://github.com/keep-starknet-strange/snos", rev = "3bd95bfb315a596519b5b1d3c81114fbfa06f8a6" } +starknet-os = { git = "https://github.com/keep-starknet-strange/snos", rev = "af74c7599231c25ad9c342f6888668680a9782d9" } +prove_block = { git = "https://github.com/keep-starknet-strange/snos", rev = "af74c7599231c25ad9c342f6888668680a9782d9" } # Madara prover API madara-prover-common = { git = "https://github.com/Moonsong-Labs/madara-prover-api", branch = "od/use-latest-cairo-vm" } diff --git a/Dockerfile b/Dockerfile index 937decce..303dd0a6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -81,6 +81,11 @@ COPY --from=builder /usr/src/madara-orchestrator/package.json . COPY --from=builder /usr/src/madara-orchestrator/migrate-mongo-config.js . COPY --from=builder /usr/src/madara-orchestrator/migrations ./migrations +# To be fixed by this https://github.com/keep-starknet-strange/snos/issues/404 +RUN mkdir -p /usr/local/cargo/git/checkouts/snos-59fe8329bb16fe65/af74c75/crates/starknet-os/kzg +COPY ./crates/da-clients/ethereum/trusted_setup.txt /usr/local/cargo/git/checkouts/snos-59fe8329bb16fe65/af74c75/crates/starknet-os/kzg/trusted_setup.txt +COPY ./crates/da-clients/ethereum/trusted_setup.txt /usr/src/madara-orchestrator/crates/settlement-clients/ethereum/src/trusted_setup.txt + # Create a startup script RUN echo '#!/bin/bash\n\ npm run migrate up\n\ diff --git a/crates/orchestrator/src/database/mongodb/mod.rs b/crates/orchestrator/src/database/mongodb/mod.rs index 51ec471d..1ab69cf7 100644 --- a/crates/orchestrator/src/database/mongodb/mod.rs +++ b/crates/orchestrator/src/database/mongodb/mod.rs @@ -39,7 +39,11 @@ impl MongoDb { // Get a handle to the cluster let client = Client::with_options(client_options).expect("Failed to create MongoDB client"); // Ping the server to see if you can connect to the cluster - client.database("admin").run_command(doc! {"ping": 1}, None).await.expect("Failed to ping MongoDB deployment"); + client + .database("orchestrator") + .run_command(doc! {"ping": 1}, None) + .await + .expect("Failed to ping MongoDB deployment"); tracing::debug!("Pinged your deployment. You successfully connected to MongoDB!"); Self { client, database_name: mongo_db_settings.database_name } diff --git a/crates/orchestrator/src/jobs/da_job/mod.rs b/crates/orchestrator/src/jobs/da_job/mod.rs index 2c51ace8..c660b1ce 100644 --- a/crates/orchestrator/src/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/jobs/da_job/mod.rs @@ -203,7 +203,7 @@ impl Job for DaJob { } } -#[tracing::instrument(skip(elements), ret)] +#[tracing::instrument(skip(elements))] pub fn fft_transformation(elements: Vec) -> Vec { let xs: Vec = (0..*BLOB_LEN) .map(|i| { diff --git a/crates/orchestrator/src/jobs/state_update_job/mod.rs b/crates/orchestrator/src/jobs/state_update_job/mod.rs index 9d288c52..1ac7bdc9 100644 --- a/crates/orchestrator/src/jobs/state_update_job/mod.rs +++ b/crates/orchestrator/src/jobs/state_update_job/mod.rs @@ -130,16 +130,19 @@ impl Job for StateUpdateJob { let mut sent_tx_hashes: Vec = Vec::with_capacity(block_numbers.len()); for block_no in block_numbers.iter() { tracing::debug!(job_id = %job.internal_id, block_no = %block_no, "Processing block"); + let snos = self.fetch_snos_for_block(*block_no, config.clone()).await; - let tx_hash = self.update_state_for_block(config.clone(), *block_no, snos, nonce).await.map_err(|e| { - tracing::error!(job_id = %job.internal_id, block_no = %block_no, error = %e, "Error updating state for block"); - job.metadata.insert(JOB_METADATA_STATE_UPDATE_LAST_FAILED_BLOCK_NO.into(), block_no.to_string()); - self.insert_attempts_into_metadata(job, &attempt_no, &sent_tx_hashes); - StateUpdateError::Other(OtherError(eyre!( - "Block #{block_no} - Error occurred during the state update: {e}" - ))) - })?; - sent_tx_hashes.push(tx_hash); + let txn_hash = self + .update_state_for_block(config.clone(), *block_no, snos, nonce) + .await + .map_err(|e| { + tracing::error!(job_id = %job.internal_id, block_no = %block_no, error = %e, "Error updating state for block"); + job.metadata.insert(JOB_METADATA_STATE_UPDATE_LAST_FAILED_BLOCK_NO.into(), block_no.to_string()); + self.insert_attempts_into_metadata(job, &attempt_no, &sent_tx_hashes); + OtherError(eyre!("Block #{block_no} - Error occurred during the state update: {e}")); + }) + .unwrap(); + sent_tx_hashes.push(txn_hash); nonce += 1; } @@ -318,6 +321,7 @@ impl StateUpdateJob { .map_err(|e| JobError::Other(OtherError(e)))?; let program_output = self.fetch_program_output_for_block(block_no, config.clone()).await; + // TODO : // Fetching nonce before the transaction is run // Sending update_state transaction from the settlement client diff --git a/crates/orchestrator/src/queue/job_queue.rs b/crates/orchestrator/src/queue/job_queue.rs index d8ee8e2f..616c3b62 100644 --- a/crates/orchestrator/src/queue/job_queue.rs +++ b/crates/orchestrator/src/queue/job_queue.rs @@ -405,7 +405,10 @@ pub async fn init_consumers(config: Arc) -> Result<(), JobError> { /// To spawn the worker by passing the worker struct async fn spawn_worker(worker: Box, config: Arc) -> color_eyre::Result<()> { - worker.run_worker_if_enabled(config).await.expect("Error in running the worker."); + if let Err(e) = worker.run_worker_if_enabled(config).await { + log::error!("Failed to spawn worker. Error: {}", e); + return Err(e); + } Ok(()) } async fn add_job_to_queue(id: Uuid, queue: String, delay: Option, config: Arc) -> EyreResult<()> { diff --git a/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs b/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs index 9a74e35f..1c0acf01 100644 --- a/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; +use std::fs; use std::fs::read_to_string; use std::path::PathBuf; -use std::{env, fs}; use assert_matches::assert_matches; use bytes::Bytes; @@ -27,7 +27,7 @@ use crate::jobs::state_update_job::{StateUpdateError, StateUpdateJob}; use crate::jobs::types::{JobStatus, JobType}; use crate::jobs::{Job, JobError}; use crate::tests::common::default_job_item; -use crate::tests::config::TestConfigBuilder; +use crate::tests::config::{ConfigType, TestConfigBuilder}; lazy_static! { pub static ref CURRENT_PATH: PathBuf = std::env::current_dir().expect("Failed to get Current Path"); @@ -60,18 +60,8 @@ async fn test_process_job_works( #[case] blocks_to_process: String, #[case] processing_start_index: u8, ) { - // Will be used by storage client which we call while storing the data. - - use crate::tests::config::ConfigType; - - let aws_region = env::var("AWS_REGION").unwrap(); - println!("AWS_REGION: {}", aws_region); - dotenvy::from_filename("../.env.test").expect("Failed to load the .env file"); - let aws_region = env::var("AWS_REGION").unwrap(); - println!("AWS_REGION: {}", aws_region); - // Mocking the settlement client. let mut settlement_client = MockSettlementClient::new(); diff --git a/crates/orchestrator/src/tests/workers/snos/mod.rs b/crates/orchestrator/src/tests/workers/snos/mod.rs index 9cd47d29..2c99a80d 100644 --- a/crates/orchestrator/src/tests/workers/snos/mod.rs +++ b/crates/orchestrator/src/tests/workers/snos/mod.rs @@ -13,7 +13,7 @@ use uuid::Uuid; use crate::database::MockDatabase; use crate::jobs::job_handler_factory::mock_factory; -use crate::jobs::types::{JobStatus, JobType}; +use crate::jobs::types::JobType; use crate::jobs::{Job, MockJob}; use crate::queue::job_queue::SNOS_JOB_PROCESSING_QUEUE; use crate::queue::MockQueueProvider; @@ -39,20 +39,30 @@ async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box> { // Mocking db function expectations if !db_val { - db.expect_get_latest_job_by_type_and_status() - .times(1) - .with(eq(JobType::SnosRun), eq(JobStatus::Completed)) + db.expect_get_latest_job_by_type().with(eq(JobType::SnosRun)).returning(|_| Ok(None)); + db.expect_get_job_by_internal_id_and_type() + .with(eq(0.to_string()), eq(JobType::SnosRun)) + .returning(|_, _| Ok(None)); + db.expect_get_job_by_internal_id_and_type() + .with(eq(1.to_string()), eq(JobType::SnosRun)) .returning(|_, _| Ok(None)); - start_job_index = 1; + + start_job_index = 0; block = 5; } else { let uuid_temp = Uuid::new_v4(); - - db.expect_get_latest_job_by_type_and_status() - .with(eq(JobType::SnosRun), eq(JobStatus::Completed)) + db.expect_get_latest_job_by_type() + .with(eq(JobType::SnosRun)) + .returning(move |_| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp)))); + db.expect_get_job_by_internal_id_and_type() + .with(eq(0.to_string()), eq(JobType::SnosRun)) + .returning(move |_, _| Ok(Some(get_job_item_mock_by_id("0".to_string(), uuid_temp)))); + db.expect_get_job_by_internal_id_and_type() + .with(eq(1.to_string()), eq(JobType::SnosRun)) .returning(move |_, _| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp)))); - block = 6; + start_job_index = 2; + block = 6; } for i in start_job_index..block + 1 { @@ -65,12 +75,10 @@ async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box> { let job_item = get_job_item_mock_by_id(i.clone().to_string(), uuid); let job_item_cloned = job_item.clone(); - - job_handler.expect_create_job().times(1).returning(move |_, _, _| Ok(job_item_cloned.clone())); + job_handler.expect_create_job().returning(move |_, _, _| Ok(job_item_cloned.clone())); // creating jobs call expectations db.expect_create_job() - .times(1) .withf(move |item| item.internal_id == i.clone().to_string()) .returning(move |_| Ok(job_item.clone())); } @@ -78,7 +86,7 @@ async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box> { let job_handler: Arc> = Arc::new(Box::new(job_handler)); let ctx = mock_factory::get_job_handler_context(); // Mocking the `get_job_handler` call in create_job function. - ctx.expect().times(5).with(eq(JobType::SnosRun)).returning(move |_| Arc::clone(&job_handler)); + ctx.expect().with(eq(JobType::SnosRun)).returning(move |_| Arc::clone(&job_handler)); // Queue function call simulations queue diff --git a/crates/orchestrator/src/tests/workers/update_state/mod.rs b/crates/orchestrator/src/tests/workers/update_state/mod.rs index f1f51e2a..d569e92d 100644 --- a/crates/orchestrator/src/tests/workers/update_state/mod.rs +++ b/crates/orchestrator/src/tests/workers/update_state/mod.rs @@ -1,123 +1,207 @@ -use std::error::Error; +use std::collections::HashMap; use std::sync::Arc; -use da_client_interface::MockDaClient; -use httpmock::MockServer; use mockall::predicate::eq; -use rstest::rstest; -use starknet::providers::jsonrpc::HttpTransport; -use starknet::providers::JsonRpcClient; -use url::Url; +use rstest::*; use uuid::Uuid; -use crate::database::MockDatabase; +use crate::jobs::constants::JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY; use crate::jobs::job_handler_factory::mock_factory; +use crate::jobs::state_update_job::StateUpdateJob; use crate::jobs::types::{JobStatus, JobType}; -use crate::jobs::{Job, MockJob}; -use crate::queue::job_queue::UPDATE_STATE_JOB_PROCESSING_QUEUE; -use crate::queue::MockQueueProvider; -use crate::tests::config::TestConfigBuilder; -use crate::tests::workers::utils::{get_job_by_mock_id_vector, get_job_item_mock_by_id}; +use crate::tests::config::{ConfigType, TestConfigBuilder}; +use crate::tests::workers::utils::get_job_item_mock_by_id; use crate::workers::update_state::UpdateStateWorker; use crate::workers::Worker; #[rstest] -#[case(false, 0)] -#[case(true, 5)] #[tokio::test] -async fn test_update_state_worker( - #[case] last_successful_job_exists: bool, - #[case] number_of_processed_jobs: usize, -) -> Result<(), Box> { - let server = MockServer::start(); - let da_client = MockDaClient::new(); - let mut db = MockDatabase::new(); - let mut queue = MockQueueProvider::new(); - - // Mocking the get_job_handler function. - let mut job_handler = MockJob::new(); - - // Mocking db function expectations - // If no successful state update jobs exist - if !last_successful_job_exists { - db.expect_get_latest_job_by_type_and_status() - .with(eq(JobType::StateTransition), eq(JobStatus::Completed)) - .times(1) - .returning(|_, _| Ok(None)); - - db.expect_get_jobs_without_successor() - .with(eq(JobType::DataSubmission), eq(JobStatus::Completed), eq(JobType::StateTransition)) - .times(1) - .returning(|_, _, _| Ok(vec![])); - } else { - // if successful state update job exists - - // mocking the return value of first function call (getting last successful jobs): - db.expect_get_latest_job_by_type_and_status() - .with(eq(JobType::StateTransition), eq(JobStatus::Completed)) - .times(1) - .returning(|_, _| Ok(Some(get_job_item_mock_by_id("1".to_string(), Uuid::new_v4())))); - - // mocking the return values of second function call (getting completed proving worker jobs) - let job_vec = - get_job_by_mock_id_vector(JobType::ProofCreation, JobStatus::Completed, number_of_processed_jobs as u64, 2); - let job_vec_clone = job_vec.clone(); - db.expect_get_jobs_after_internal_id_by_job_type() - .with(eq(JobType::DataSubmission), eq(JobStatus::Completed), eq("1".to_string())) - .returning(move |_, _, _| Ok(job_vec.clone())); - db.expect_get_jobs_without_successor() - .with(eq(JobType::DataSubmission), eq(JobStatus::Completed), eq(JobType::StateTransition)) - .returning(move |_, _, _| Ok(job_vec_clone.clone())); - - // mocking getting of the jobs (when there is a safety check for any pre-existing job during job - // creation) - let completed_jobs = - get_job_by_mock_id_vector(JobType::ProofCreation, JobStatus::Completed, number_of_processed_jobs as u64, 2); - db.expect_get_job_by_internal_id_and_type() - .times(1) - .with(eq(completed_jobs[0].internal_id.to_string()), eq(JobType::StateTransition)) - .returning(|_, _| Ok(None)); - - // mocking the creation of jobs - let job_item = get_job_item_mock_by_id("1".to_string(), Uuid::new_v4()); - let job_item_cloned = job_item.clone(); - - job_handler.expect_create_job().times(1).returning(move |_, _, _| Ok(job_item.clone())); - - db.expect_create_job() - .times(1) - .withf(move |item| item.internal_id == *"1".to_string()) - .returning(move |_| Ok(job_item_cloned.clone())); - } - - let y: Arc> = Arc::new(Box::new(job_handler)); +async fn update_state_worker_with_pending_jobs() { + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; + + let unique_id = Uuid::new_v4(); + let mut job_item = get_job_item_mock_by_id("1".to_string(), unique_id); + job_item.status = JobStatus::PendingVerification; + job_item.job_type = JobType::StateTransition; + services.config.database().create_job(job_item).await.unwrap(); + + let update_state_worker = UpdateStateWorker {}; + assert!(update_state_worker.run_worker(services.config.clone()).await.is_ok()); + + let latest_job = + services.config.database().get_latest_job_by_type(JobType::StateTransition).await.unwrap().unwrap(); + assert_eq!(latest_job.status, JobStatus::PendingVerification); + assert_eq!(latest_job.job_type, JobType::StateTransition); + assert_eq!(latest_job.id, unique_id); +} + +#[rstest] +#[tokio::test] +async fn update_state_worker_first_block() { + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; + + let unique_id = Uuid::new_v4(); + let mut job_item = get_job_item_mock_by_id("0".to_string(), unique_id); + job_item.status = JobStatus::Completed; + job_item.job_type = JobType::DataSubmission; + services.config.database().create_job(job_item).await.unwrap(); + let ctx = mock_factory::get_job_handler_context(); - // Mocking the `get_job_handler` call in create_job function. - if last_successful_job_exists { - ctx.expect().times(1).with(eq(JobType::StateTransition)).returning(move |_| Arc::clone(&y)); - } - - // Queue function call simulations - queue - .expect_send_message_to_queue() - .returning(|_, _, _| Ok(())) - .withf(|queue, _payload, _delay| queue == UPDATE_STATE_JOB_PROCESSING_QUEUE); - - let provider = JsonRpcClient::new(HttpTransport::new( - Url::parse(format!("http://localhost:{}", server.port()).as_str()).expect("Failed to parse URL"), - )); - - // mock block number (madara) : 5 + ctx.expect().with(eq(JobType::StateTransition)).returning(move |_| Arc::new(Box::new(StateUpdateJob))); + + let update_state_worker = UpdateStateWorker {}; + assert!(update_state_worker.run_worker(services.config.clone()).await.is_ok()); + + let latest_job = + services.config.database().get_latest_job_by_type(JobType::StateTransition).await.unwrap().unwrap(); + assert_eq!(latest_job.status, JobStatus::Created); + assert_eq!(latest_job.job_type, JobType::StateTransition); + assert_eq!(latest_job.metadata.get(JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY).unwrap(), "0"); +} + +#[rstest] +#[tokio::test] +async fn update_state_worker_first_block_missing() { let services = TestConfigBuilder::new() - .configure_starknet_client(provider.into()) - .configure_database(db.into()) - .configure_queue_client(queue.into()) - .configure_da_client(da_client.into()) + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) .build() .await; + // skiip first block from DA completion + let mut job_item = get_job_item_mock_by_id("1".to_string(), Uuid::new_v4()); + job_item.status = JobStatus::Completed; + job_item.job_type = JobType::DataSubmission; + services.config.database().create_job(job_item).await.unwrap(); + + let ctx = mock_factory::get_job_handler_context(); + ctx.expect().with(eq(JobType::StateTransition)).returning(move |_| Arc::new(Box::new(StateUpdateJob))); + + let update_state_worker = UpdateStateWorker {}; + assert!(update_state_worker.run_worker(services.config.clone()).await.is_ok()); + + // update state worker should not create any job + assert!(services.config.database().get_latest_job_by_type(JobType::StateTransition).await.unwrap().is_none()); +} + +#[rstest] +#[tokio::test] +async fn update_state_worker_selects_consective_blocks() { + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; + + let mut job_item_one = get_job_item_mock_by_id("0".to_string(), Uuid::new_v4()); + job_item_one.status = JobStatus::Completed; + job_item_one.job_type = JobType::DataSubmission; + services.config.database().create_job(job_item_one).await.unwrap(); + + let mut job_item_two = get_job_item_mock_by_id("1".to_string(), Uuid::new_v4()); + job_item_two.status = JobStatus::Completed; + job_item_two.job_type = JobType::DataSubmission; + services.config.database().create_job(job_item_two).await.unwrap(); + + // skip block 3 + let mut job_item_three = get_job_item_mock_by_id("3".to_string(), Uuid::new_v4()); + job_item_three.status = JobStatus::Completed; + job_item_three.job_type = JobType::DataSubmission; + services.config.database().create_job(job_item_three).await.unwrap(); + + let ctx = mock_factory::get_job_handler_context(); + ctx.expect().with(eq(JobType::StateTransition)).returning(move |_| Arc::new(Box::new(StateUpdateJob))); + + let update_state_worker = UpdateStateWorker {}; + assert!(update_state_worker.run_worker(services.config.clone()).await.is_ok()); + + let latest_job = + services.config.database().get_latest_job_by_type(JobType::StateTransition).await.unwrap().unwrap(); + // update state worker should not create any job + assert_eq!(latest_job.status, JobStatus::Created); + assert_eq!(latest_job.job_type, JobType::StateTransition); + assert_eq!(latest_job.metadata.get(JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY).unwrap(), "0,1"); +} + +#[rstest] +#[tokio::test] +async fn update_state_worker_continues_from_previous_state_update() { + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; + + // add DA completion job for block 5 + let mut job_item = get_job_item_mock_by_id("5".to_string(), Uuid::new_v4()); + job_item.status = JobStatus::Completed; + job_item.job_type = JobType::DataSubmission; + services.config.database().create_job(job_item).await.unwrap(); + + // add state transition job for blocks 0-4 + let mut job_item = get_job_item_mock_by_id("0".to_string(), Uuid::new_v4()); + job_item.status = JobStatus::Completed; + job_item.job_type = JobType::StateTransition; + let mut metadata = HashMap::new(); + metadata.insert(JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY.to_string(), "0,1,2,3,4".to_string()); + job_item.metadata = metadata; + services.config.database().create_job(job_item).await.unwrap(); + + let ctx = mock_factory::get_job_handler_context(); + ctx.expect().with(eq(JobType::StateTransition)).returning(move |_| Arc::new(Box::new(StateUpdateJob))); + + let update_state_worker = UpdateStateWorker {}; + assert!(update_state_worker.run_worker(services.config.clone()).await.is_ok()); + + let latest_job = + services.config.database().get_latest_job_by_type(JobType::StateTransition).await.unwrap().unwrap(); + println!("latest job item {:?}", latest_job); + // update state worker should not create any job + assert_eq!(latest_job.status, JobStatus::Created); + assert_eq!(latest_job.job_type, JobType::StateTransition); + assert_eq!(latest_job.metadata.get(JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY).unwrap(), "5"); +} + +#[rstest] +#[tokio::test] +async fn update_state_worker_next_block_missing() { + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; + + // add DA completion job for block 5 + let mut job_item = get_job_item_mock_by_id("6".to_string(), Uuid::new_v4()); + job_item.status = JobStatus::Completed; + job_item.job_type = JobType::DataSubmission; + services.config.database().create_job(job_item).await.unwrap(); + + // add state transition job for blocks 0-4 + let unique_id = Uuid::new_v4(); + let mut job_item = get_job_item_mock_by_id("0".to_string(), unique_id); + job_item.status = JobStatus::Completed; + job_item.job_type = JobType::StateTransition; + let mut metadata = HashMap::new(); + metadata.insert(JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY.to_string(), "0,1,2,3,4".to_string()); + job_item.metadata = metadata; + services.config.database().create_job(job_item).await.unwrap(); + + let ctx = mock_factory::get_job_handler_context(); + ctx.expect().with(eq(JobType::StateTransition)).returning(move |_| Arc::new(Box::new(StateUpdateJob))); + let update_state_worker = UpdateStateWorker {}; - update_state_worker.run_worker(services.config).await?; + assert!(update_state_worker.run_worker(services.config.clone()).await.is_ok()); - Ok(()) + let latest_job = + services.config.database().get_latest_job_by_type(JobType::StateTransition).await.unwrap().unwrap(); + assert_eq!(latest_job.id, unique_id); } diff --git a/crates/orchestrator/src/workers/data_submission_worker.rs b/crates/orchestrator/src/workers/data_submission_worker.rs index aae03644..08c6068c 100644 --- a/crates/orchestrator/src/workers/data_submission_worker.rs +++ b/crates/orchestrator/src/workers/data_submission_worker.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::error::Error; use std::sync::Arc; use async_trait::async_trait; @@ -14,61 +13,18 @@ pub struct DataSubmissionWorker; #[async_trait] impl Worker for DataSubmissionWorker { // 0. All ids are assumed to be block numbers. - // 1. Fetch the latest completed Proving job. - // 2. Fetch the latest DA job creation. - // 3. Create jobs from after the lastest DA job already created till latest completed proving job. - async fn run_worker(&self, config: Arc) -> Result<(), Box> { + // 1. Fetch the latest completed Proving jobs without Data Submission jobs as successor jobs + // 2. Create jobs. + async fn run_worker(&self, config: Arc) -> color_eyre::Result<()> { tracing::trace!(log_type = "starting", category = "DataSubmissionWorker", "DataSubmissionWorker started."); - // provides latest completed proof creation job id - let latest_proven_job_id = config + let successful_proving_jobs = config .database() - .get_latest_job_by_type_and_status(JobType::ProofCreation, JobStatus::Completed) - .await? - .map(|item| item.internal_id) - .unwrap_or("0".to_string()); + .get_jobs_without_successor(JobType::ProofCreation, JobStatus::Completed, JobType::DataSubmission) + .await?; - tracing::debug!(latest_proven_job_id, "Fetched latest completed ProofCreation job"); - - // provides latest triggered data submission job id - let latest_data_submission_job_id = config - .database() - .get_latest_job_by_type(JobType::DataSubmission) - .await? - .map(|item| item.internal_id) - .unwrap_or("0".to_string()); - - tracing::debug!(latest_data_submission_job_id, "Fetched latest DataSubmission job"); - - let latest_data_submission_id: u64 = match latest_data_submission_job_id.parse() { - Ok(id) => id, - Err(e) => { - tracing::error!(error = ?e, "Failed to parse latest_data_submission_job_id"); - return Err(Box::new(e)); - } - }; - - let latest_proven_id: u64 = match latest_proven_job_id.parse() { - Ok(id) => id, - Err(e) => { - tracing::error!(error = ?e, "Failed to parse latest_proven_job_id"); - return Err(Box::new(e)); - } - }; - - tracing::debug!(latest_data_submission_id, latest_proven_id, "Parsed job IDs"); - - // creating data submission jobs for latest blocks that don't have existing data submission jobs - // yet. - for new_job_id in latest_data_submission_id + 1..latest_proven_id + 1 { - tracing::debug!(new_job_id, "Creating new DataSubmission job"); - match create_job(JobType::DataSubmission, new_job_id.to_string(), HashMap::new(), config.clone()).await { - Ok(_) => tracing::info!(job_id = new_job_id, "Successfully created DataSubmission job"), - Err(e) => { - tracing::error!(job_id = new_job_id, error = ?e, "Failed to create DataSubmission job"); - return Err(Box::new(e)); - } - } + for job in successful_proving_jobs { + create_job(JobType::DataSubmission, job.internal_id, HashMap::new(), config.clone()).await?; } tracing::trace!(log_type = "completed", category = "DataSubmissionWorker", "DataSubmissionWorker completed."); diff --git a/crates/orchestrator/src/workers/mod.rs b/crates/orchestrator/src/workers/mod.rs index 73703859..1a909a8b 100644 --- a/crates/orchestrator/src/workers/mod.rs +++ b/crates/orchestrator/src/workers/mod.rs @@ -27,14 +27,14 @@ pub enum WorkerError { #[async_trait] pub trait Worker: Send + Sync { - async fn run_worker_if_enabled(&self, config: Arc) -> Result<(), Box> { + async fn run_worker_if_enabled(&self, config: Arc) -> color_eyre::Result<()> { if !self.is_worker_enabled(config.clone()).await? { return Ok(()); } self.run_worker(config).await } - async fn run_worker(&self, config: Arc) -> Result<(), Box>; + async fn run_worker(&self, config: Arc) -> color_eyre::Result<()>; // Assumption // If say a job for block X fails, we don't want the worker to respawn another job for the same @@ -46,7 +46,7 @@ pub trait Worker: Send + Sync { // Checks if any of the jobs have failed // Failure : JobStatus::VerificationFailed, JobStatus::VerificationTimeout, JobStatus::Failed // Halts any new job creation till all the count of failed jobs is not Zero. - async fn is_worker_enabled(&self, config: Arc) -> Result> { + async fn is_worker_enabled(&self, config: Arc) -> color_eyre::Result { let failed_jobs = config .database() .get_jobs_by_statuses(vec![JobStatus::VerificationFailed, JobStatus::VerificationTimeout], Some(1)) diff --git a/crates/orchestrator/src/workers/proof_registration.rs b/crates/orchestrator/src/workers/proof_registration.rs index 70b79630..21e46753 100644 --- a/crates/orchestrator/src/workers/proof_registration.rs +++ b/crates/orchestrator/src/workers/proof_registration.rs @@ -1,4 +1,3 @@ -use std::error::Error; use std::sync::Arc; use async_trait::async_trait; @@ -13,7 +12,7 @@ impl Worker for ProofRegistrationWorker { /// 1. Fetch all blocks with a successful proving job run /// 2. Group blocks that have the same proof /// 3. For each group, create a proof registration job with from and to block in metadata - async fn run_worker(&self, _config: Arc) -> Result<(), Box> { + async fn run_worker(&self, _config: Arc) -> color_eyre::Result<()> { todo!() } } diff --git a/crates/orchestrator/src/workers/proving.rs b/crates/orchestrator/src/workers/proving.rs index 464e684d..523f310d 100644 --- a/crates/orchestrator/src/workers/proving.rs +++ b/crates/orchestrator/src/workers/proving.rs @@ -1,4 +1,3 @@ -use std::error::Error; use std::sync::Arc; use async_trait::async_trait; @@ -14,7 +13,7 @@ pub struct ProvingWorker; impl Worker for ProvingWorker { /// 1. Fetch all successful SNOS job runs that don't have a proving job /// 2. Create a proving job for each SNOS job run - async fn run_worker(&self, config: Arc) -> Result<(), Box> { + async fn run_worker(&self, config: Arc) -> color_eyre::Result<()> { tracing::trace!(log_type = "starting", category = "ProvingWorker", "ProvingWorker started."); let successful_snos_jobs = config @@ -26,12 +25,7 @@ impl Worker for ProvingWorker { for job in successful_snos_jobs { tracing::debug!(job_id = %job.internal_id, "Creating proof creation job for SNOS job"); - match create_job(JobType::ProofCreation, job.internal_id.to_string(), job.metadata.clone(), config.clone()) - .await - { - Ok(_) => tracing::info!(block_no = %job.internal_id, "Successfully created proof creation job"), - Err(e) => tracing::error!(job_id = %job.internal_id, error = %e, "Failed to create proof creation job"), - } + create_job(JobType::ProofCreation, job.internal_id.to_string(), job.metadata, config.clone()).await?; } tracing::trace!(log_type = "completed", category = "ProvingWorker", "ProvingWorker completed."); diff --git a/crates/orchestrator/src/workers/snos.rs b/crates/orchestrator/src/workers/snos.rs index b0f11742..ccff21ed 100644 --- a/crates/orchestrator/src/workers/snos.rs +++ b/crates/orchestrator/src/workers/snos.rs @@ -1,13 +1,13 @@ use std::collections::HashMap; -use std::error::Error; use std::sync::Arc; use async_trait::async_trait; use starknet::providers::Provider; +use utils::env_utils::get_env_var_or_default; use crate::config::Config; use crate::jobs::create_job; -use crate::jobs::types::{JobStatus, JobType}; +use crate::jobs::types::JobType; use crate::workers::Worker; pub struct SnosWorker; @@ -17,55 +17,43 @@ impl Worker for SnosWorker { /// 1. Fetch the latest completed block from the Starknet chain /// 2. Fetch the last block that had a SNOS job run. /// 3. Create SNOS run jobs for all the remaining blocks - async fn run_worker(&self, config: Arc) -> Result<(), Box> { + async fn run_worker(&self, config: Arc) -> color_eyre::Result<()> { tracing::trace!(log_type = "starting", category = "SnosWorker", "SnosWorker started."); let provider = config.starknet_client(); - let latest_block_number = provider.block_number().await?; - tracing::debug!(latest_block_number = %latest_block_number, "Fetched latest block number from Starknet"); + let block_number_provider = &provider.block_number().await?; - let latest_block_processed_data = config - .database() - .get_latest_job_by_type_and_status(JobType::SnosRun, JobStatus::Completed) - .await? - .map(|item| item.internal_id) - .unwrap_or("0".to_string()); - tracing::debug!(latest_processed_block = %latest_block_processed_data, "Fetched latest processed block from database"); + let latest_block_number = + get_env_var_or_default_block_number("MAX_BLOCK_TO_PROCESS", &block_number_provider.to_string())?; + tracing::debug!(latest_block_number = %latest_block_number, "Fetched latest block number from starknet"); - // Check if job does not exist - // TODO: fetching all SNOS jobs with internal id > latest_block_processed_data - // can be done in one DB call - let job_in_db = config - .database() - .get_job_by_internal_id_and_type(&latest_block_number.to_string(), &JobType::SnosRun) - .await?; + let latest_job_in_db = config.database().get_latest_job_by_type(JobType::SnosRun).await?; - if job_in_db.is_some() { - tracing::trace!(block_number = %latest_block_number, "SNOS job already exists for the latest block"); - return Ok(()); - } - - let latest_block_processed: u64 = match latest_block_processed_data.parse() { - Ok(block) => block, - Err(e) => { - tracing::error!(error = %e, block_no = %latest_block_processed_data, "Failed to parse latest processed block number"); - return Err(Box::new(e)); - } + let latest_job_id = match latest_job_in_db { + Some(job) => job.internal_id, + None => "0".to_string(), }; - let block_diff = latest_block_number - latest_block_processed; - tracing::debug!(block_diff = %block_diff, "Calculated block difference"); - - // if all blocks are processed - if block_diff == 0 { - tracing::info!("All blocks are already processed"); - return Ok(()); - } + // To be used when testing in specific block range + let block_start = get_env_var_or_default_block_number("MIN_BLOCK_TO_PROCESS", &latest_job_id)?; - for x in latest_block_processed + 1..latest_block_number + 1 { - create_job(JobType::SnosRun, x.to_string(), HashMap::new(), config.clone()).await?; + for block_num in block_start..latest_block_number + 1 { + match create_job(JobType::SnosRun, block_num.to_string(), HashMap::new(), config.clone()).await { + Ok(_) => {} + Err(e) => { + log::warn!("Failed to create job: {:?}", e); + } + } } tracing::trace!(log_type = "completed", category = "SnosWorker", "SnosWorker completed."); Ok(()) } } + +fn get_env_var_or_default_block_number(env_var_name: &str, default_block_number: &str) -> color_eyre::Result { + if get_env_var_or_default(env_var_name, default_block_number) == *"" { + Ok(default_block_number.to_string().parse::()?) + } else { + Ok(get_env_var_or_default(env_var_name, default_block_number).parse::()?) + } +} diff --git a/crates/orchestrator/src/workers/update_state.rs b/crates/orchestrator/src/workers/update_state.rs index 1675eabf..961d2522 100644 --- a/crates/orchestrator/src/workers/update_state.rs +++ b/crates/orchestrator/src/workers/update_state.rs @@ -1,4 +1,4 @@ -use std::error::Error; +use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; @@ -6,131 +6,148 @@ use async_trait::async_trait; use crate::config::Config; use crate::jobs::constants::JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY; use crate::jobs::create_job; -use crate::jobs::types::{JobItem, JobStatus, JobType}; +use crate::jobs::types::{JobStatus, JobType}; use crate::workers::Worker; pub struct UpdateStateWorker; #[async_trait] impl Worker for UpdateStateWorker { - /// 1. Fetch the last successful state update job - /// 2. Fetch all successful proving jobs covering blocks after the last state update - /// 3. Create state updates for all the blocks that don't have a state update job - async fn run_worker(&self, config: Arc) -> Result<(), Box> { + async fn run_worker(&self, config: Arc) -> color_eyre::Result<()> { tracing::trace!(log_type = "starting", category = "UpdateStateWorker", "UpdateStateWorker started."); - let latest_successful_job = - config.database().get_latest_job_by_type_and_status(JobType::StateTransition, JobStatus::Completed).await?; + let latest_job = config.database().get_latest_job_by_type(JobType::StateTransition).await?; - match latest_successful_job { + let (completed_da_jobs, last_block_processed_in_last_job) = match latest_job { Some(job) => { - tracing::debug!(job_id = %job.id, "Found latest successful state transition job"); - let successful_da_jobs_without_successor = config - .database() - .get_jobs_without_successor(JobType::DataSubmission, JobStatus::Completed, JobType::StateTransition) - .await?; - - if successful_da_jobs_without_successor.is_empty() { - tracing::debug!("No new data submission jobs to process"); + if job.status != JobStatus::Completed { + log::warn!( + "There's already a pending update state job. Parallel jobs can cause nonce issues or can \ + completely fail as the update logic needs to be strictly ordered. Returning safely..." + ); return Ok(()); } - tracing::debug!( - count = successful_da_jobs_without_successor.len(), - "Found data submission jobs without state transition" - ); - - let mut metadata = job.metadata; - let blocks_to_settle = - Self::parse_job_items_into_block_number_list(successful_da_jobs_without_successor.clone()); - metadata.insert(JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY.to_string(), blocks_to_settle.clone()); - - tracing::trace!(blocks_to_settle = %blocks_to_settle, "Prepared blocks to settle for state transition"); - - // Creating a single job for all the pending blocks. - let new_job_id = successful_da_jobs_without_successor[0].internal_id.clone(); - match create_job(JobType::StateTransition, new_job_id.clone(), metadata, config).await { - Ok(_) => tracing::info!(job_id = %new_job_id, "Successfully created new state transition job"), - Err(e) => { - tracing::error!(job_id = %new_job_id, error = %e, "Failed to create new state transition job"); - return Err(e.into()); - } - } - - tracing::trace!(log_type = "completed", category = "UpdateStateWorker", "UpdateStateWorker completed."); - Ok(()) + let mut blocks_processed_in_last_job: Vec = job + .metadata + .get(JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY) + .unwrap() + .split(',') + .filter_map(|s| s.parse().ok()) + .collect(); + + // ideally it's already sorted, but just to be safe + blocks_processed_in_last_job.sort(); + + let last_block_processed_in_last_job = + blocks_processed_in_last_job[blocks_processed_in_last_job.len() - 1]; + + ( + config + .database() + .get_jobs_after_internal_id_by_job_type( + JobType::DataSubmission, + JobStatus::Completed, + last_block_processed_in_last_job.to_string(), + ) + .await?, + Some(last_block_processed_in_last_job), + ) } None => { tracing::warn!("No previous state transition job found, fetching latest data submission job"); // Getting latest DA job in case no latest state update job is present - let latest_successful_jobs_without_successor = config - .database() - .get_jobs_without_successor(JobType::DataSubmission, JobStatus::Completed, JobType::StateTransition) - .await?; - - if latest_successful_jobs_without_successor.is_empty() { - tracing::debug!("No data submission jobs found to process"); - return Ok(()); - } - - let job = latest_successful_jobs_without_successor[0].clone(); - let mut metadata = job.metadata; + ( + config + .database() + .get_jobs_without_successor( + JobType::DataSubmission, + JobStatus::Completed, + JobType::StateTransition, + ) + .await?, + None, + ) + } + }; - let blocks_to_settle = - Self::parse_job_items_into_block_number_list(latest_successful_jobs_without_successor.clone()); - metadata.insert(JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY.to_string(), blocks_to_settle.clone()); + let mut blocks_to_process: Vec = + completed_da_jobs.iter().map(|j| j.internal_id.parse::().unwrap()).collect(); + blocks_to_process.sort(); - tracing::trace!(job_id = %job.id, blocks_to_settle = %blocks_to_settle, "Prepared blocks to settle for initial state transition"); + // no DA jobs completed after the last settled block + if blocks_to_process.is_empty() { + log::warn!("No DA jobs completed after the last settled block. Returning safely..."); + return Ok(()); + } - match create_job(JobType::StateTransition, job.internal_id.clone(), metadata, config).await { - Ok(_) => tracing::info!(job_id = %job.id, "Successfully created initial state transition job"), - Err(e) => { - tracing::error!(job_id = %job.id, error = %e, "Failed to create initial state transition job"); - return Err(e.into()); - } + match last_block_processed_in_last_job { + Some(last_block_processed_in_last_job) => { + // DA job for the block just after the last settled block + // is not yet completed + if blocks_to_process[0] != last_block_processed_in_last_job + 1 { + log::warn!( + "DA job for the block just after the last settled block is not yet completed. Returning \ + safely..." + ); + return Ok(()); + } + } + None => { + if blocks_to_process[0] != 0 { + log::warn!("DA job for the first block is not yet completed. Returning safely..."); + return Ok(()); } + } + } - tracing::trace!(log_type = "completed", category = "UpdateStateWorker", "UpdateStateWorker completed."); - Ok(()) + let blocks_to_process: Vec = find_successive_blocks_in_vector(blocks_to_process); + + let mut metadata = HashMap::new(); + metadata.insert( + JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY.to_string(), + blocks_to_process.iter().map(|ele| ele.to_string()).collect::>().join(","), + ); + + // Creating a single job for all the pending blocks. + let new_job_id = blocks_to_process[0].to_string(); + match create_job(JobType::StateTransition, new_job_id.clone(), metadata, config.clone()).await { + Ok(_) => tracing::info!(job_id = %new_job_id, "Successfully created new state transition job"), + Err(e) => { + tracing::error!(job_id = %new_job_id, error = %e, "Failed to create new state transition job"); + return Err(e.into()); } } + + tracing::trace!(log_type = "completed", category = "UpdateStateWorker", "UpdateStateWorker completed."); + Ok(()) } } -impl UpdateStateWorker { - /// To parse the block numbers from the vector of jobs. - pub fn parse_job_items_into_block_number_list(job_items: Vec) -> String { - job_items.iter().map(|j| j.internal_id.clone()).collect::>().join(",") - } +/// Gets the successive list of blocks from all the blocks processed in previous jobs +/// Eg : input_vec : [1,2,3,4,7,8,9,11] +/// We will take the first 4 block numbers and send it for processing +pub fn find_successive_blocks_in_vector(block_numbers: Vec) -> Vec { + block_numbers + .iter() + .enumerate() + .take_while(|(index, block_number)| **block_number == (block_numbers[0] + *index as u64)) + .map(|(_, block_number)| *block_number) + .collect() } #[cfg(test)] mod test_update_state_worker_utils { - use chrono::{SubsecRound, Utc}; use rstest::rstest; - use uuid::Uuid; - - use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType}; - use crate::workers::update_state::UpdateStateWorker; #[rstest] - fn test_parse_job_items_into_block_number_list() { - let mut job_vec = Vec::new(); - for i in 0..3 { - job_vec.push(JobItem { - id: Uuid::new_v4(), - internal_id: i.to_string(), - job_type: JobType::ProofCreation, - status: JobStatus::Completed, - external_id: ExternalId::Number(0), - metadata: Default::default(), - version: 0, - created_at: Utc::now().round_subsecs(0), - updated_at: Utc::now().round_subsecs(0), - }); - } - - let block_string = UpdateStateWorker::parse_job_items_into_block_number_list(job_vec); - assert_eq!(block_string, String::from("0,1,2")); + #[case(vec![], vec![])] + #[case(vec![1], vec![1])] + #[case(vec![1, 2, 3, 4, 5], vec![1, 2, 3, 4, 5])] + #[case(vec![1, 2, 3, 4, 7, 8, 9, 11], vec![1, 2, 3, 4])] + #[case(vec![1, 3, 5, 7, 9], vec![1])] + fn test_find_successive_blocks(#[case] input: Vec, #[case] expected: Vec) { + let result = super::find_successive_blocks_in_vector(input); + assert_eq!(result, expected); } } diff --git a/crates/settlement-clients/ethereum/src/conversion.rs b/crates/settlement-clients/ethereum/src/conversion.rs index 7f22eb96..607fbd66 100644 --- a/crates/settlement-clients/ethereum/src/conversion.rs +++ b/crates/settlement-clients/ethereum/src/conversion.rs @@ -276,7 +276,6 @@ mod tests { // Blob Data let blob_data_file_path = format!("{}{}{}{}", current_path.clone(), "/src/test_data/blob_data/", fork_block_no, ".txt"); - println!("{}", blob_data_file_path); let blob_data = fs::read_to_string(blob_data_file_path).expect("Failed to read the blob data txt file"); // Blob Commitment diff --git a/crates/settlement-clients/ethereum/src/lib.rs b/crates/settlement-clients/ethereum/src/lib.rs index 1994c90c..2c325f3a 100644 --- a/crates/settlement-clients/ethereum/src/lib.rs +++ b/crates/settlement-clients/ethereum/src/lib.rs @@ -5,13 +5,15 @@ use std::sync::Arc; use alloy::consensus::{ BlobTransactionSidecar, SignableTransaction, TxEip4844, TxEip4844Variant, TxEip4844WithSidecar, TxEnvelope, }; +#[cfg(not(feature = "testing"))] +use alloy::eips::eip2718::Encodable2718; use alloy::eips::eip2930::AccessList; use alloy::eips::eip4844::BYTES_PER_BLOB; use alloy::hex; use alloy::network::EthereumWallet; use alloy::primitives::{Address, B256, U256}; use alloy::providers::{PendingTransactionConfig, Provider, ProviderBuilder}; -use alloy::rpc::types::{TransactionReceipt, TransactionRequest}; +use alloy::rpc::types::TransactionReceipt; use alloy::signers::local::PrivateKeySigner; use alloy_primitives::Bytes; use async_trait::async_trait; @@ -180,7 +182,7 @@ impl SettlementClient for EthereumSettlementClient { &self, program_output: Vec<[u8; 32]>, state_diff: Vec>, - nonce: u64, + _nonce: u64, ) -> Result { tracing::info!( log_type = "starting", @@ -194,10 +196,7 @@ impl SettlementClient for EthereumSettlementClient { let eip1559_est = self.provider.estimate_eip1559_fees(None).await?; let chain_id: u64 = self.provider.get_chain_id().await?.to_string().parse()?; - let mut max_fee_per_blob_gas: u128 = self.provider.get_blob_base_fee().await?.to_string().parse()?; - // TODO: need to send more than current gas price. - max_fee_per_blob_gas += 12; - let max_priority_fee_per_gas: u128 = self.provider.get_max_priority_fee_per_gas().await?.to_string().parse()?; + let max_fee_per_blob_gas: u128 = self.provider.get_blob_base_fee().await?.to_string().parse()?; // x_0_value : program_output[10] // Updated with starknet 0.13.2 spec @@ -210,17 +209,27 @@ impl SettlementClient for EthereumSettlementClient { let input_bytes = get_input_data_for_eip_4844(program_output, kzg_proof)?; + let nonce = self.provider.get_transaction_count(self.wallet_address).await?.to_string().parse()?; + + // add a safety margin to the gas price to handle fluctuations + let add_safety_margin = |n: u128, div_factor: u128| n + n / div_factor; + + let max_fee_per_gas: u128 = eip1559_est.max_fee_per_gas.to_string().parse()?; + let max_priority_fee_per_gas: u128 = eip1559_est.max_priority_fee_per_gas.to_string().parse()?; + let tx: TxEip4844 = TxEip4844 { chain_id, nonce, - gas_limit: 30_000_000, - max_fee_per_gas: eip1559_est.max_fee_per_gas.to_string().parse()?, - max_priority_fee_per_gas, + // we noticed Starknet uses the same limit on mainnet + // https://etherscan.io/tx/0x8a58b936faaefb63ee1371991337ae3b99d74cb3504d73868615bf21fa2f25a1 + gas_limit: 5_500_000, + max_fee_per_gas: add_safety_margin(max_fee_per_gas, 5), + max_priority_fee_per_gas: add_safety_margin(max_priority_fee_per_gas, 5), to: self.core_contract_client.contract_address(), value: U256::from(0), access_list: AccessList(vec![]), blob_versioned_hashes: sidecar.versioned_hashes().collect(), - max_fee_per_blob_gas, + max_fee_per_blob_gas: add_safety_margin(max_fee_per_blob_gas, 5), input: Bytes::from(hex::decode(input_bytes)?), }; @@ -231,24 +240,34 @@ impl SettlementClient for EthereumSettlementClient { let tx_signed = variant.into_signed(signature); let tx_envelope: TxEnvelope = tx_signed.into(); - #[cfg(not(feature = "testing"))] - let txn_request = { - let txn_request: TransactionRequest = tx_envelope.clone().into(); - txn_request + #[cfg(feature = "testing")] + let pending_transaction = { + let txn_request = { + test_config::configure_transaction(self.provider.clone(), tx_envelope, self.impersonate_account).await + }; + self.provider.send_transaction(txn_request).await? }; - #[cfg(feature = "testing")] - let txn_request = - { test_config::configure_transaction(self.provider.clone(), tx_envelope, self.impersonate_account).await }; + #[cfg(not(feature = "testing"))] + let pending_transaction = { + let encoded = tx_envelope.encoded_2718(); + self.provider.send_raw_transaction(encoded.as_slice()).await? + }; - let pending_transaction = self.provider.send_transaction(txn_request).await?; tracing::info!( log_type = "completed", category = "update_state", function_type = "blobs", "State updated with blobs." ); - return Ok(pending_transaction.tx_hash().to_string()); + + log::warn!("⏳ Waiting for txn finality......."); + + // Prod feature only (may cause issues while testing with anvil) + let txn_hash = pending_transaction.tx_hash().to_string(); + self.wait_for_tx_finality(&txn_hash).await?; + + Ok(txn_hash) } /// Should verify the inclusion of a tx in the settlement layer @@ -300,7 +319,9 @@ impl SettlementClient for EthereumSettlementClient { /// Wait for a pending tx to achieve finality async fn wait_for_tx_finality(&self, tx_hash: &str) -> Result<()> { let tx_hash = B256::from_str(tx_hash)?; - self.provider.watch_pending_transaction(PendingTransactionConfig::new(tx_hash)).await?; + self.provider + .watch_pending_transaction(PendingTransactionConfig::new(tx_hash).with_required_confirmations(1)) + .await?; Ok(()) } @@ -319,9 +340,11 @@ impl SettlementClient for EthereumSettlementClient { #[cfg(feature = "testing")] mod test_config { use alloy::network::TransactionBuilder; + use alloy::rpc::types::TransactionRequest; use super::*; + #[allow(dead_code)] pub async fn configure_transaction( provider: Arc>>, tx_envelope: TxEnvelope, diff --git a/crates/settlement-clients/ethereum/src/tests/mod.rs b/crates/settlement-clients/ethereum/src/tests/mod.rs index 373e8ac0..72d6e97a 100644 --- a/crates/settlement-clients/ethereum/src/tests/mod.rs +++ b/crates/settlement-clients/ethereum/src/tests/mod.rs @@ -71,6 +71,8 @@ pub struct EthereumTest { pub rpc_url: Url, } +const BLOCK_TIME: u64 = 6; + #[allow(clippy::new_without_default)] impl EthereumTestBuilder { pub fn new() -> Self { @@ -93,10 +95,13 @@ impl EthereumTestBuilder { // Setup Anvil let anvil = match self.fork_block { - Some(fork_block) => { - Anvil::new().fork(&*ETH_RPC).fork_block_number(fork_block).try_spawn().expect("Could not spawn Anvil.") - } - None => Anvil::new().try_spawn().expect("Could not spawn Anvil."), + Some(fork_block) => Anvil::new() + .fork(&*ETH_RPC) + .fork_block_number(fork_block) + .block_time(BLOCK_TIME) + .try_spawn() + .expect("Could not spawn Anvil."), + None => Anvil::new().block_time(BLOCK_TIME).try_spawn().expect("Could not spawn Anvil."), }; // Setup Provider @@ -130,7 +135,7 @@ mod settlement_client_tests { use settlement_client_interface::{SettlementClient, SettlementVerificationStatus}; use tokio::time::sleep; - use super::ENV_FILE_PATH; + use super::{BLOCK_TIME, ENV_FILE_PATH}; use crate::conversion::to_padded_hex; use crate::tests::{ DummyCoreContract, EthereumTestBuilder, Pipe, CURRENT_PATH, STARKNET_CORE_CONTRACT, @@ -187,7 +192,7 @@ mod settlement_client_tests { assert_eq!(txn.to.unwrap(), *contract.address()); // Testing verify_tx_inclusion - sleep(Duration::from_secs(2)).await; + sleep(Duration::from_secs(BLOCK_TIME + 2)).await; ethereum_settlement_client .wait_for_tx_finality(update_state_result.as_str()) .await diff --git a/e2e-tests/src/localstack.rs b/e2e-tests/src/localstack.rs index 5252900c..c93d6418 100644 --- a/e2e-tests/src/localstack.rs +++ b/e2e-tests/src/localstack.rs @@ -1,9 +1,11 @@ use std::collections::HashMap; use std::sync::Arc; +use aws_config::environment::EnvironmentVariableCredentialsProvider; use aws_config::meta::region::RegionProviderChain; -use aws_config::Region; +use aws_config::{from_env, Region}; use aws_sdk_eventbridge::types::{InputTransformer, RuleState, Target}; +use aws_sdk_s3::config::ProvideCredentials; use aws_sdk_sqs::types::QueueAttributeName; use aws_sdk_sqs::types::QueueAttributeName::VisibilityTimeout; use orchestrator::config::ProviderConfig; @@ -29,7 +31,9 @@ pub struct LocalStack { impl LocalStack { pub async fn new() -> Self { let region_provider = Region::new(get_env_var_or_panic("AWS_REGION")); - let config = aws_config::from_env().region(region_provider).load().await; + + let creds = EnvironmentVariableCredentialsProvider::new().provide_credentials().await.unwrap(); + let config = from_env().region(region_provider).credentials_provider(creds).load().await; let provider_config = Arc::new(ProviderConfig::AWS(Box::from(config.clone()))); Self { @@ -58,7 +62,7 @@ impl LocalStack { // Creating SQS queues let mut queue_attributes = HashMap::new(); - queue_attributes.insert(VisibilityTimeout, "1".into()); + queue_attributes.insert(VisibilityTimeout, "10000".into()); let queue_names = vec![ DATA_SUBMISSION_JOB_PROCESSING_QUEUE, diff --git a/e2e-tests/tests.rs b/e2e-tests/tests.rs index 5e3630ed..2c8448da 100644 --- a/e2e-tests/tests.rs +++ b/e2e-tests/tests.rs @@ -13,7 +13,7 @@ use e2e_tests::utils::{get_mongo_db_client, read_state_update_from_file, vec_u8_ use e2e_tests::{MongoDbServer, Orchestrator}; use mongodb::bson::doc; use orchestrator::data_storage::DataStorage; -use orchestrator::jobs::constants::JOB_METADATA_SNOS_BLOCK; +use orchestrator::jobs::constants::{JOB_METADATA_SNOS_BLOCK, JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY}; use orchestrator::jobs::types::{ExternalId, JobItem, JobStatus, JobType}; use orchestrator::queue::job_queue::{JobQueueMessage, WorkerTriggerType}; use rstest::rstest; @@ -397,7 +397,10 @@ pub async fn put_job_data_in_db_update_state(mongo_db: &MongoDbServer, l2_block_ job_type: JobType::StateTransition, status: JobStatus::Completed, external_id: ExternalId::Number(0), - metadata: HashMap::new(), + metadata: HashMap::from([( + JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY.to_string(), + (l2_block_number.parse::().unwrap() - 1).to_string(), + )]), version: 0, created_at: Utc::now().round_subsecs(0), updated_at: Utc::now().round_subsecs(0),