From 75c34a88c14b61f14265b2973c4de0fa9c9ca570 Mon Sep 17 00:00:00 2001
From: Marcos Nicolau <76252340+MarcosNicolau@users.noreply.github.com>
Date: Tue, 10 Dec 2024 11:19:13 -0300
Subject: [PATCH 1/5] refactor: move `createTask` simulation before s3 upload
(#1567)
Co-authored-by: Uriel Mihura <43704209+uri-99@users.noreply.github.com>
---
batcher/aligned-batcher/src/lib.rs | 90 +++++++++++--------
.../src/retry/batcher_retryables.rs | 8 +-
2 files changed, 59 insertions(+), 39 deletions(-)
diff --git a/batcher/aligned-batcher/src/lib.rs b/batcher/aligned-batcher/src/lib.rs
index b760d9c82..40438c8d2 100644
--- a/batcher/aligned-batcher/src/lib.rs
+++ b/batcher/aligned-batcher/src/lib.rs
@@ -1428,28 +1428,12 @@ impl Batcher {
let batch_merkle_root_hex = hex::encode(batch_merkle_root);
info!("Batch merkle root: 0x{}", batch_merkle_root_hex);
let file_name = batch_merkle_root_hex.clone() + ".json";
-
- info!("Uploading batch to S3...");
- self.upload_batch_to_s3(batch_bytes, &file_name).await?;
-
- if let Err(e) = self
- .telemetry
- .task_uploaded_to_s3(&batch_merkle_root_hex)
- .await
- {
- warn!("Failed to send task status to telemetry: {:?}", e);
- };
- info!("Batch sent to S3 with name: {}", file_name);
-
- info!("Uploading batch to contract");
let batch_data_pointer: String = "".to_owned() + &self.download_endpoint + "/" + &file_name;
let num_proofs_in_batch = leaves.len();
-
let gas_per_proof = (CONSTANT_GAS_COST
+ ADDITIONAL_SUBMISSION_GAS_COST_PER_PROOF * num_proofs_in_batch as u128)
/ num_proofs_in_batch as u128;
-
let fee_per_proof = U256::from(gas_per_proof) * gas_price;
let fee_for_aggregator = (U256::from(AGGREGATOR_GAS_COST)
* gas_price
@@ -1465,12 +1449,31 @@ impl Batcher {
respond_to_task_fee_limit,
);
- let proof_submitters = finalized_batch.iter().map(|entry| entry.sender).collect();
+ let proof_submitters: Vec<Address> =
+ finalized_batch.iter().map(|entry| entry.sender).collect();
+
+ self.simulate_create_new_task(
+ *batch_merkle_root,
+ batch_data_pointer.clone(),
+ proof_submitters.clone(),
+ fee_params.clone(),
+ )
+ .await?;
self.metrics
.gas_price_used_on_latest_batch
.set(gas_price.as_u64() as i64);
+ info!("Uploading batch to S3...");
+ self.upload_batch_to_s3(batch_bytes, &file_name).await?;
+ if let Err(e) = self
+ .telemetry
+ .task_uploaded_to_s3(&batch_merkle_root_hex)
+ .await
+ {
+ warn!("Failed to send task status to telemetry: {:?}", e);
+ };
+ info!("Batch sent to S3 with name: {}", file_name);
if let Err(e) = self
.telemetry
.task_created(
@@ -1483,6 +1486,7 @@ impl Batcher {
warn!("Failed to send task status to telemetry: {:?}", e);
};
+ info!("Submitting batch to contract");
match self
.create_new_task(
*batch_merkle_root,
@@ -1520,27 +1524,6 @@ impl Batcher {
proof_submitters: Vec<Address>,
fee_params: CreateNewTaskFeeParams,
) -> Result<TransactionReceipt, BatcherError> {
- // First, we simulate the tx
- retry_function(
- || {
- simulate_create_new_task_retryable(
- batch_merkle_root,
- batch_data_pointer.clone(),
- proof_submitters.clone(),
- fee_params.clone(),
- &self.payment_service,
- &self.payment_service_fallback,
- )
- },
- ETHEREUM_CALL_MIN_RETRY_DELAY,
- ETHEREUM_CALL_BACKOFF_FACTOR,
- ETHEREUM_CALL_MAX_RETRIES,
- ETHEREUM_CALL_MAX_RETRY_DELAY,
- )
- .await
- .map_err(|e| e.inner())?;
-
- // Then, we send the real tx
let result = retry_function(
|| {
create_new_task_retryable(
@@ -1579,6 +1562,37 @@ impl Batcher {
}
}
+ /// Simulates the `create_new_task` transaction by sending an `eth_call` to the RPC node.
+ /// This function does not mutate the state but verifies if it will revert under the given conditions.
+ async fn simulate_create_new_task(
+ &self,
+ batch_merkle_root: [u8; 32],
+ batch_data_pointer: String,
+ proof_submitters: Vec<Address>,
+ fee_params: CreateNewTaskFeeParams,
+ ) -> Result<(), BatcherError> {
+ retry_function(
+ || {
+ simulate_create_new_task_retryable(
+ batch_merkle_root,
+ batch_data_pointer.clone(),
+ proof_submitters.clone(),
+ fee_params.clone(),
+ &self.payment_service,
+ &self.payment_service_fallback,
+ )
+ },
+ ETHEREUM_CALL_MIN_RETRY_DELAY,
+ ETHEREUM_CALL_BACKOFF_FACTOR,
+ ETHEREUM_CALL_MAX_RETRIES,
+ ETHEREUM_CALL_MAX_RETRY_DELAY,
+ )
+ .await
+ .map_err(|e| e.inner())?;
+
+ Ok(())
+ }
+
/// Sends a transaction to Ethereum with the same nonce as the previous one to override it.
/// Retries on recoverable errors with exponential backoff.
/// Bumps the fee if not included in 6 blocks, using `calculate_bumped_gas_price`.
diff --git a/batcher/aligned-batcher/src/retry/batcher_retryables.rs b/batcher/aligned-batcher/src/retry/batcher_retryables.rs
index 677698dd3..d5f1aaef0 100644
--- a/batcher/aligned-batcher/src/retry/batcher_retryables.rs
+++ b/batcher/aligned-batcher/src/retry/batcher_retryables.rs
@@ -197,7 +197,13 @@ pub async fn simulate_create_new_task_retryable(
.gas_price(fee_params.gas_price);
// sends an `eth_call` request to the node
match simulation.call().await {
- Ok(_) => Ok(()),
+ Ok(_) => {
+ info!(
+ "Simulation task for: 0x{} succeeded.",
+ hex::encode(batch_merkle_root)
+ );
+ Ok(())
+ }
Err(ContractError::Revert(err)) => {
// Since transaction was reverted, we don't want to retry with fallback.
warn!("Simulated transaction reverted {:?}", err);
From a4e8e312f414165d18d7bd3d0eed2a0e340e5ec5 Mon Sep 17 00:00:00 2001
From: Julian Ventura <43799596+JulianVentura@users.noreply.github.com>
Date: Tue, 10 Dec 2024 12:38:47 -0300
Subject: [PATCH 2/5] feat: add aggregator quorum reached and task responded
latency gauges (#1565)
Co-authored-by: Julian Ventura
---
aggregator/pkg/aggregator.go | 17 ++
.../aligned/aggregator_batcher.json | 230 +++++++++++++++++-
metrics/metrics.go | 20 ++
3 files changed, 261 insertions(+), 6 deletions(-)
diff --git a/aggregator/pkg/aggregator.go b/aggregator/pkg/aggregator.go
index 467b3b52b..daffcb04b 100644
--- a/aggregator/pkg/aggregator.go
+++ b/aggregator/pkg/aggregator.go
@@ -67,6 +67,9 @@ type Aggregator struct {
// Stores the TaskResponse for each batch by batchIdentifierHash
batchDataByIdentifierHash map[[32]byte]BatchData
+ // Stores the start time for each batch of the aggregator by task index
+ batchStartTimeByIdx map[uint32]time.Time
+
// This task index is to communicate with the local BLS
// Service.
// Note: In case of a reboot it can start from 0 again
@@ -78,6 +81,7 @@ type Aggregator struct {
// - batchCreatedBlockByIdx
// - batchDataByIdentifierHash
// - nextBatchIndex
+ // - batchStartTimeByIdx
taskMutex *sync.Mutex
// Mutex to protect ethereum wallet
@@ -124,6 +128,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
batchesIdxByIdentifierHash := make(map[[32]byte]uint32)
batchDataByIdentifierHash := make(map[[32]byte]BatchData)
batchCreatedBlockByIdx := make(map[uint32]uint64)
+ batchStartTimeByIdx := make(map[uint32]time.Time)
chainioConfig := sdkclients.BuildAllConfig{
EthHttpUrl: aggregatorConfig.BaseConfig.EthRpcUrl,
@@ -172,6 +177,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
batchesIdxByIdentifierHash: batchesIdxByIdentifierHash,
batchDataByIdentifierHash: batchDataByIdentifierHash,
batchCreatedBlockByIdx: batchCreatedBlockByIdx,
+ batchStartTimeByIdx: batchStartTimeByIdx,
nextBatchIndex: nextBatchIndex,
taskMutex: &sync.Mutex{},
walletMutex: &sync.Mutex{},
@@ -233,6 +239,7 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
batchIdentifierHash := agg.batchesIdentifierHashByIdx[blsAggServiceResp.TaskIndex]
batchData := agg.batchDataByIdentifierHash[batchIdentifierHash]
taskCreatedBlock := agg.batchCreatedBlockByIdx[blsAggServiceResp.TaskIndex]
+ taskCreatedAt := agg.batchStartTimeByIdx[blsAggServiceResp.TaskIndex]
agg.taskMutex.Unlock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Fetching task data")
@@ -266,6 +273,9 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
agg.telemetry.LogQuorumReached(batchData.BatchMerkleRoot)
+ // Only observe quorum reached if successful
+ agg.metrics.ObserveTaskQuorumReached(time.Since(taskCreatedAt))
+
agg.logger.Info("Threshold reached", "taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
@@ -320,6 +330,8 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
agg.metrics.IncBumpedGasPriceForAggregatedResponse()
agg.telemetry.BumpedTaskGasPrice(batchMerkleRoot, bumpedGasPrice.String())
}
+
+ startTime := time.Now()
receipt, err := agg.avsWriter.SendAggregatedResponse(
batchIdentifierHash,
batchMerkleRoot,
@@ -338,6 +350,9 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
return nil, err
}
+ // We only send the latency metric if the response is successful
+ agg.metrics.ObserveLatencyForRespondToTask(time.Since(startTime))
+
agg.walletMutex.Unlock()
agg.logger.Infof("- Unlocked Wallet Resources: Sending aggregated response for batch %s", hex.EncodeToString(batchIdentifierHash[:]))
@@ -383,6 +398,7 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
BatchMerkleRoot: batchMerkleRoot,
SenderAddress: senderAddress,
}
+ agg.batchStartTimeByIdx[batchIndex] = time.Now()
agg.logger.Info(
"Task Info added in aggregator:",
"Task", batchIndex,
@@ -447,6 +463,7 @@ func (agg *Aggregator) ClearTasksFromMaps() {
delete(agg.batchCreatedBlockByIdx, i)
delete(agg.batchesIdentifierHashByIdx, i)
delete(agg.batchDataByIdentifierHash, batchIdentifierHash)
+ delete(agg.batchStartTimeByIdx, i)
} else {
agg.logger.Warn("Task not found in maps", "taskIndex", i)
}
diff --git a/grafana/provisioning/dashboards/aligned/aggregator_batcher.json b/grafana/provisioning/dashboards/aligned/aggregator_batcher.json
index 929ff29ea..52de76921 100644
--- a/grafana/provisioning/dashboards/aligned/aggregator_batcher.json
+++ b/grafana/provisioning/dashboards/aligned/aggregator_batcher.json
@@ -18,7 +18,7 @@
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
- "id": 4,
+ "id": 2,
"links": [],
"liveNow": false,
"panels": [
@@ -153,7 +153,6 @@
},
{
"datasource": {
- "default": true,
"type": "prometheus",
"uid": "prometheus"
},
@@ -2451,7 +2450,32 @@
]
}
},
- "overrides": []
+ "overrides": [
+ {
+ "__systemRef": "hideSeriesFrom",
+ "matcher": {
+ "id": "byNames",
+ "options": {
+ "mode": "exclude",
+ "names": [
+ "{bot=\"aggregator\", instance=\"host.docker.internal:9091\", job=\"aligned-aggregator\"}"
+ ],
+ "prefix": "All except:",
+ "readOnly": true
+ }
+ },
+ "properties": [
+ {
+ "id": "custom.hideFrom",
+ "value": {
+ "legend": false,
+ "tooltip": false,
+ "viz": true
+ }
+ }
+ ]
+ }
+ ]
},
"gridPos": {
"h": 7,
@@ -2625,9 +2649,203 @@
}
],
"type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "prometheus"
+ },
+ "description": "",
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ },
+ "unit": "s"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 12,
+ "y": 61
+ },
+ "id": 43,
+ "interval": "1s",
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "right",
+ "showLegend": false
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "prometheus"
+ },
+ "editorMode": "code",
+ "expr": "aligned_aggregator_respond_to_task_latency{bot=\"aggregator\"}",
+ "hide": false,
+ "instant": false,
+ "legendFormat": "Latest latency",
+ "range": true,
+ "refId": "Latency"
+ }
+ ],
+ "title": "Latest respond to task latency",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "prometheus"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 12,
+ "y": 69
+ },
+ "id": 44,
+ "interval": "1s",
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "right",
+ "showLegend": false
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "prometheus"
+ },
+ "editorMode": "code",
+ "expr": "aligned_aggregator_task_quorum_reached_latency{bot=\"aggregator\"}",
+ "hide": false,
+ "instant": false,
+ "legendFormat": "Latest latency",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "Latest quorum reached latency",
+ "type": "timeseries"
}
],
- "refresh": "5s",
+ "refresh": "",
"schemaVersion": 38,
"style": "dark",
"tags": [],
@@ -2642,6 +2860,6 @@
"timezone": "browser",
"title": "System Data",
"uid": "aggregator",
- "version": 9,
+ "version": 19,
"weekStart": ""
-}
\ No newline at end of file
+}
diff --git a/metrics/metrics.go b/metrics/metrics.go
index dda2f7a04..ba0c187fc 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -21,6 +21,8 @@ type Metrics struct {
aggregatorGasCostPaidForBatcherTotal prometheus.Gauge
aggregatorNumTimesPaidForBatcher prometheus.Counter
numBumpedGasPriceForAggregatedResponse prometheus.Counter
+ aggregatorRespondToTaskLatency prometheus.Gauge
+ aggregatorTaskQuorumReachedLatency prometheus.Gauge
}
const alignedNamespace = "aligned"
@@ -59,6 +61,16 @@ func NewMetrics(ipPortAddress string, reg prometheus.Registerer, logger logging.
Name: "respond_to_task_gas_price_bumped_count",
Help: "Number of times gas price was bumped while sending aggregated response",
}),
+ aggregatorRespondToTaskLatency: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
+ Namespace: alignedNamespace,
+ Name: "aggregator_respond_to_task_latency",
+ Help: "Latency of last call to respondToTask on Aligned Service Manager",
+ }),
+ aggregatorTaskQuorumReachedLatency: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
+ Namespace: alignedNamespace,
+ Name: "aggregator_task_quorum_reached_latency",
+ Help: "Time it takes for a task to reach quorum",
+ }),
}
}
@@ -116,3 +128,11 @@ func (m *Metrics) AddAggregatorGasPaidForBatcher(value float64) {
func (m *Metrics) IncBumpedGasPriceForAggregatedResponse() {
m.numBumpedGasPriceForAggregatedResponse.Inc()
}
+
+func (m *Metrics) ObserveLatencyForRespondToTask(elapsed time.Duration) {
+ m.aggregatorRespondToTaskLatency.Set(elapsed.Seconds())
+}
+
+func (m *Metrics) ObserveTaskQuorumReached(elapsed time.Duration) {
+ m.aggregatorTaskQuorumReachedLatency.Set(elapsed.Seconds())
+}
From 5b6aca11925de1b9db97585ad8a3932ff0c4940c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Avila=20Gast=C3=B3n?=
<72628438+avilagaston9@users.noreply.github.com>
Date: Tue, 10 Dec 2024 15:50:33 -0300
Subject: [PATCH 3/5] feat: add latency metrics to batcher (#1578)
Co-authored-by: Marcos Nicolau
---
batcher/aligned-batcher/src/lib.rs | 31 ++-
batcher/aligned-batcher/src/metrics.rs | 18 ++
.../aligned/aggregator_batcher.json | 262 +++++++++++++++++-
3 files changed, 298 insertions(+), 13 deletions(-)
diff --git a/batcher/aligned-batcher/src/lib.rs b/batcher/aligned-batcher/src/lib.rs
index 40438c8d2..e5588d4e3 100644
--- a/batcher/aligned-batcher/src/lib.rs
+++ b/batcher/aligned-batcher/src/lib.rs
@@ -12,7 +12,7 @@ use retry::batcher_retryables::{
user_balance_is_unlocked_retryable,
};
use retry::{retry_function, RetryError};
-use tokio::time::timeout;
+use tokio::time::{timeout, Instant};
use types::batch_state::BatchState;
use types::user_state::UserState;
@@ -1524,6 +1524,7 @@ impl Batcher {
proof_submitters: Vec<Address>,
fee_params: CreateNewTaskFeeParams,
) -> Result<TransactionReceipt, BatcherError> {
+ let start = Instant::now();
let result = retry_function(
|| {
create_new_task_retryable(
@@ -1542,6 +1543,11 @@ impl Batcher {
ETHEREUM_CALL_MAX_RETRY_DELAY,
)
.await;
+ self.metrics
+ .create_new_task_duration
+ .set(start.elapsed().as_millis() as i64);
+ // Reset the cancel duration to zero: the cancel transaction is not always executed, so a stale value must not linger
+ self.metrics.cancel_create_new_task_duration.set(0);
match result {
Ok(receipt) => {
if let Err(e) = self
@@ -1600,10 +1606,11 @@ impl Batcher {
/// After 2 hours (attempt 13), retries occur hourly for 1 day (33 retries).
pub async fn cancel_create_new_task_tx(&self, old_tx_gas_price: U256) {
info!("Cancelling createNewTask transaction...");
+ let start = Instant::now();
let iteration = Arc::new(Mutex::new(0));
let previous_gas_price = Arc::new(Mutex::new(old_tx_gas_price));
- if let Err(e) = retry_function(
+ match retry_function(
|| async {
let mut iteration = iteration.lock().await;
let mut previous_gas_price = previous_gas_price.lock().await;
@@ -1639,11 +1646,12 @@ impl Batcher {
)
.await
{
- error!("Could not cancel createNewTask transaction: {e}");
- return;
+ Ok(_) => info!("createNewTask transaction successfully canceled"),
+ Err(e) => error!("Could not cancel createNewTask transaction: {e}"),
};
-
- info!("createNewTask transaction successfully canceled");
+ self.metrics
+ .cancel_create_new_task_duration
+ .set(start.elapsed().as_millis() as i64);
}
/// Only relevant for testing and for users to easily use Aligned
@@ -1785,7 +1793,8 @@ impl Batcher {
batch_bytes: &[u8],
file_name: &str,
) -> Result<(), BatcherError> {
- retry_function(
+ let start = Instant::now();
+ let result = retry_function(
|| {
Self::upload_batch_to_s3_retryable(
batch_bytes,
@@ -1800,7 +1809,13 @@ impl Batcher {
ETHEREUM_CALL_MAX_RETRY_DELAY,
)
.await
- .map_err(|e| BatcherError::BatchUploadError(e.to_string()))
+ .map_err(|e| BatcherError::BatchUploadError(e.to_string()));
+
+ self.metrics
+ .s3_duration
+ .set(start.elapsed().as_micros() as i64);
+
+ result
}
async fn upload_batch_to_s3_retryable(
diff --git a/batcher/aligned-batcher/src/metrics.rs b/batcher/aligned-batcher/src/metrics.rs
index a7c6f26e3..dccab58f3 100644
--- a/batcher/aligned-batcher/src/metrics.rs
+++ b/batcher/aligned-batcher/src/metrics.rs
@@ -19,6 +19,9 @@ pub struct BatcherMetrics {
pub batcher_started: IntCounter,
pub gas_price_used_on_latest_batch: IntGauge,
pub broken_ws_connections: IntCounter,
+ pub s3_duration: IntGauge,
+ pub create_new_task_duration: IntGauge,
+ pub cancel_create_new_task_duration: IntGauge,
}
impl BatcherMetrics {
@@ -46,6 +49,15 @@ impl BatcherMetrics {
"broken_ws_connections_count",
"Broken websocket connections"
))?;
+ let s3_duration = register_int_gauge!(opts!("s3_duration", "S3 Duration"))?;
+ let create_new_task_duration = register_int_gauge!(opts!(
+ "create_new_task_duration",
+ "Create New Task Duration"
+ ))?;
+ let cancel_create_new_task_duration = register_int_gauge!(opts!(
+ "cancel_create_new_task_duration",
+ "Cancel create New Task Duration"
+ ))?;
registry.register(Box::new(open_connections.clone()))?;
registry.register(Box::new(received_proofs.clone()))?;
@@ -56,6 +68,9 @@ impl BatcherMetrics {
registry.register(Box::new(gas_price_used_on_latest_batch.clone()))?;
registry.register(Box::new(batcher_started.clone()))?;
registry.register(Box::new(broken_ws_connections.clone()))?;
+ registry.register(Box::new(s3_duration.clone()))?;
+ registry.register(Box::new(create_new_task_duration.clone()))?;
+ registry.register(Box::new(cancel_create_new_task_duration.clone()))?;
let metrics_route = warp::path!("metrics")
.and(warp::any().map(move || registry.clone()))
@@ -77,6 +92,9 @@ impl BatcherMetrics {
batcher_started,
gas_price_used_on_latest_batch,
broken_ws_connections,
+ s3_duration,
+ create_new_task_duration,
+ cancel_create_new_task_duration,
})
}
diff --git a/grafana/provisioning/dashboards/aligned/aggregator_batcher.json b/grafana/provisioning/dashboards/aligned/aggregator_batcher.json
index 52de76921..71167098a 100644
--- a/grafana/provisioning/dashboards/aligned/aggregator_batcher.json
+++ b/grafana/provisioning/dashboards/aligned/aggregator_batcher.json
@@ -2650,6 +2650,126 @@
],
"type": "timeseries"
},
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "prometheus"
+ },
+ "gridPos": {
+ "h": 2,
+ "w": 24,
+ "x": 0,
+ "y": 69
+ },
+ "id": 46,
+ "options": {
+ "code": {
+ "language": "plaintext",
+ "showLineNumbers": false,
+ "showMiniMap": false
+ },
+ "content": "\n Latency\n
",
+ "mode": "html"
+ },
+ "pluginVersion": "10.1.10",
+ "transparent": true,
+ "type": "text"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "prometheus"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 48,
+ "gradientMode": "opacity",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "smooth",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
+ }
+ ]
+ },
+ "unit": "ms"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 0,
+ "y": 71
+ },
+ "id": 47,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": false
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "prometheus"
+ },
+ "disableTextWrap": false,
+ "editorMode": "code",
+ "expr": "s3_duration * 10 ^ (-3)",
+ "fullMetaSearch": false,
+ "includeNullMetadata": true,
+ "instant": false,
+ "legendFormat": "__auto",
+ "range": true,
+ "refId": "A",
+ "useBackend": false
+ }
+ ],
+ "title": "Upload Batch to S3 Duration",
+ "type": "timeseries"
+ },
{
"datasource": {
"type": "prometheus",
@@ -2714,7 +2834,7 @@
"h": 8,
"w": 12,
"x": 12,
- "y": 61
+ "y": 71
},
"id": 43,
"interval": "1s",
@@ -2748,6 +2868,138 @@
"title": "Latest respond to task latency",
"type": "timeseries"
},
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "prometheus"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 48,
+ "gradientMode": "opacity",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "smooth",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ },
+ "unit": "s"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 0,
+ "y": 79
+ },
+ "id": 45,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "right",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "prometheus"
+ },
+ "disableTextWrap": false,
+ "editorMode": "code",
+ "expr": "cancel_create_new_task_duration * 10 ^ (-3)",
+ "fullMetaSearch": false,
+ "includeNullMetadata": true,
+ "instant": false,
+ "legendFormat": "cancel_new_task",
+ "range": true,
+ "refId": "A",
+ "useBackend": false
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "prometheus"
+ },
+ "disableTextWrap": false,
+ "editorMode": "code",
+ "expr": "create_new_task_duration * 10 ^(-3)",
+ "fullMetaSearch": false,
+ "hide": false,
+ "includeNullMetadata": true,
+ "instant": false,
+ "legendFormat": "create_new_task",
+ "range": true,
+ "refId": "B",
+ "useBackend": false
+ }
+ ],
+ "title": "CreateNewTask Duration",
+ "transformations": [
+ {
+ "id": "calculateField",
+ "options": {
+ "alias": "total",
+ "mode": "reduceRow",
+ "reduce": {
+ "include": [
+ "cancel_new_task",
+ "create_new_task"
+ ],
+ "reducer": "sum"
+ }
+ }
+ }
+ ],
+ "type": "timeseries"
+ },
{
"datasource": {
"type": "prometheus",
@@ -2810,7 +3062,7 @@
"h": 8,
"w": 12,
"x": 12,
- "y": 69
+ "y": 79
},
"id": 44,
"interval": "1s",
@@ -2853,13 +3105,13 @@
"list": []
},
"time": {
- "from": "now-30m",
+ "from": "now-5m",
"to": "now"
},
"timepicker": {},
"timezone": "browser",
"title": "System Data",
"uid": "aggregator",
- "version": 19,
+ "version": 7,
"weekStart": ""
-}
+}
\ No newline at end of file
From 010e93d2c95d51a6b9f87ebbe382a45171fa9d62 Mon Sep 17 00:00:00 2001
From: Julian Ventura <43799596+JulianVentura@users.noreply.github.com>
Date: Tue, 10 Dec 2024 18:21:35 -0300
Subject: [PATCH 4/5] feat: aggregator total gas paid metric (#1592)
Co-authored-by: Julian Ventura
---
core/chainio/avs_writer.go | 15 ++--
.../aligned/aggregator_batcher.json | 73 ++++++++++++++++++-
metrics/metrics.go | 10 +++
3 files changed, 91 insertions(+), 7 deletions(-)
diff --git a/core/chainio/avs_writer.go b/core/chainio/avs_writer.go
index cd379ee23..050fa8b3b 100644
--- a/core/chainio/avs_writer.go
+++ b/core/chainio/avs_writer.go
@@ -148,7 +148,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
if receipt == nil {
receipt, _ = w.ClientFallback.TransactionReceipt(context.Background(), tx.Hash())
if receipt != nil {
- w.checkIfAggregatorHadToPaidForBatcher(tx, batchIdentifierHash)
+ w.updateAggregatorGasCostMetrics(tx, batchIdentifierHash)
return receipt, nil
}
}
@@ -183,7 +183,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
w.logger.Infof("Transaction sent, waiting for receipt", "merkle root", batchMerkleRootHashString)
receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, realTx.Hash(), retry.WaitForTxRetryParams(timeToWaitBeforeBump))
if receipt != nil {
- w.checkIfAggregatorHadToPaidForBatcher(realTx, batchIdentifierHash)
+ w.updateAggregatorGasCostMetrics(realTx, batchIdentifierHash)
return receipt, nil
}
@@ -204,10 +204,10 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
return retry.RetryWithData(respondToTaskV2Func, retry.RespondToTaskV2())
}
-// Calculates the transaction cost from the receipt and compares it with the batcher respondToTaskFeeLimit
-// if the tx cost was higher, then it means the aggregator has paid the difference for the batcher (txCost - respondToTaskFeeLimit) and so metrics are updated accordingly.
-// otherwise nothing is done.
-func (w *AvsWriter) checkIfAggregatorHadToPaidForBatcher(tx *types.Transaction, batchIdentifierHash [32]byte) {
+// Calculates the transaction cost from the receipt and updates the total amount paid by the aggregator metric
+// Then, it compares that tx cost with the batcher respondToTaskFeeLimit.
+// If the tx cost was higher, it means the aggregator has paid the difference for the batcher (txCost - respondToTaskFeeLimit) and so metrics are updated accordingly.
+func (w *AvsWriter) updateAggregatorGasCostMetrics(tx *types.Transaction, batchIdentifierHash [32]byte) {
batchState, err := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash, retry.NetworkRetryParams())
if err != nil {
return
@@ -217,6 +217,9 @@ func (w *AvsWriter) checkIfAggregatorHadToPaidForBatcher(tx *types.Transaction,
// NOTE we are not using tx.Cost() because tx.Cost() includes tx.Value()
txCost := new(big.Int).Mul(big.NewInt(int64(tx.Gas())), tx.GasPrice())
+ txCostInEth := utils.WeiToEth(txCost)
+ w.metrics.AddAggregatorGasCostPaidTotal(txCostInEth)
+
if respondToTaskFeeLimit.Cmp(txCost) < 0 {
aggregatorDifferencePaid := new(big.Int).Sub(txCost, respondToTaskFeeLimit)
aggregatorDifferencePaidInEth := utils.WeiToEth(aggregatorDifferencePaid)
diff --git a/grafana/provisioning/dashboards/aligned/aggregator_batcher.json b/grafana/provisioning/dashboards/aligned/aggregator_batcher.json
index 71167098a..e11a694a3 100644
--- a/grafana/provisioning/dashboards/aligned/aggregator_batcher.json
+++ b/grafana/provisioning/dashboards/aligned/aggregator_batcher.json
@@ -788,6 +788,77 @@
"title": "Accumulated Aggregator Extra Cost Paid [ETH]",
"type": "stat"
},
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "prometheus"
+ },
+ "description": "Accumulated gas cost in ETH the Aggregator paid when sending RespondToTask transactions.",
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "thresholds"
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
+ }
+ ]
+ },
+ "unit": "ETH"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 3,
+ "w": 7,
+ "x": 7,
+ "y": 14
+ },
+ "id": 48,
+ "options": {
+ "colorMode": "value",
+ "graphMode": "none",
+ "justifyMode": "auto",
+ "orientation": "auto",
+ "percentChangeColorMode": "standard",
+ "reduceOptions": {
+ "calcs": [
+ "lastNotNull"
+ ],
+ "fields": "",
+ "values": false
+ },
+ "showPercentChange": false,
+ "textMode": "auto",
+ "wideLayout": true
+ },
+ "pluginVersion": "10.1.10",
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "prometheus"
+ },
+ "disableTextWrap": false,
+ "editorMode": "code",
+ "expr": "increase(aligned_aggregator_gas_cost_paid_total_count{bot=\"aggregator\"}[10y])",
+ "fullMetaSearch": false,
+ "includeNullMetadata": true,
+ "instant": false,
+ "legendFormat": "__auto",
+ "range": true,
+ "refId": "A",
+ "useBackend": false
+ }
+ ],
+ "title": "Accumulated Aggregator Gas Cost Paid [ETH]",
+ "type": "stat"
+ },
{
"datasource": {
"type": "prometheus",
@@ -3114,4 +3185,4 @@
"uid": "aggregator",
"version": 7,
"weekStart": ""
-}
\ No newline at end of file
+}
diff --git a/metrics/metrics.go b/metrics/metrics.go
index ba0c187fc..743e7862d 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -21,6 +21,7 @@ type Metrics struct {
aggregatorGasCostPaidForBatcherTotal prometheus.Gauge
aggregatorNumTimesPaidForBatcher prometheus.Counter
numBumpedGasPriceForAggregatedResponse prometheus.Counter
+ aggregatorGasCostPaidTotal prometheus.Counter
aggregatorRespondToTaskLatency prometheus.Gauge
aggregatorTaskQuorumReachedLatency prometheus.Gauge
}
@@ -56,6 +57,11 @@ func NewMetrics(ipPortAddress string, reg prometheus.Registerer, logger logging.
Name: "aggregator_num_times_paid_for_batcher_count",
Help: "Number of times the aggregator paid for the batcher when the tx cost was higher than the respondToTaskFeeLimit",
}),
+ aggregatorGasCostPaidTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
+ Namespace: alignedNamespace,
+ Name: "aggregator_gas_cost_paid_total_count",
+ Help: "Total amount of gas paid by the aggregator while responding to tasks",
+ }),
numBumpedGasPriceForAggregatedResponse: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Namespace: alignedNamespace,
Name: "respond_to_task_gas_price_bumped_count",
@@ -125,6 +131,10 @@ func (m *Metrics) AddAggregatorGasPaidForBatcher(value float64) {
m.aggregatorGasCostPaidForBatcherTotal.Add(value)
}
+func (m *Metrics) AddAggregatorGasCostPaidTotal(value float64) {
+ m.aggregatorGasCostPaidTotal.Add(value)
+}
+
func (m *Metrics) IncBumpedGasPriceForAggregatedResponse() {
m.numBumpedGasPriceForAggregatedResponse.Inc()
}
From 062eeb9f7991fa84c2941e0ad70ba3226ad1ab40 Mon Sep 17 00:00:00 2001
From: Uriel Mihura <43704209+uri-99@users.noreply.github.com>
Date: Fri, 13 Dec 2024 14:09:10 -0300
Subject: [PATCH 5/5] fix: zkquiz proof submitions (#1597)
---
batcher/aligned-sdk/src/core/constants.rs | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/batcher/aligned-sdk/src/core/constants.rs b/batcher/aligned-sdk/src/core/constants.rs
index d45189000..b7a253f7a 100644
--- a/batcher/aligned-sdk/src/core/constants.rs
+++ b/batcher/aligned-sdk/src/core/constants.rs
@@ -8,7 +8,7 @@ pub const CONSTANT_GAS_COST: u128 =
+ BATCHER_SUBMISSION_BASE_GAS_COST;
pub const DEFAULT_MAX_FEE_PER_PROOF: u128 =
ADDITIONAL_SUBMISSION_GAS_COST_PER_PROOF * 100_000_000_000; // gas_price = 100 Gwei = 0.0000001 ether (high gas price)
-pub const CONNECTION_TIMEOUT: u64 = 5; // 5 secs
+pub const CONNECTION_TIMEOUT: u64 = 30; // 30 secs
// % modifiers: (100% is x1, 10% is x0.1, 1000% is x10)
pub const RESPOND_TO_TASK_FEE_LIMIT_PERCENTAGE_MULTIPLIER: u128 = 250; // fee_for_aggregator -> respondToTaskFeeLimit modifier