Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add aggregator quorum reached and task responded latency gauges #1565

Merged
merged 2 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions aggregator/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type Aggregator struct {
// Stores the TaskResponse for each batch by batchIdentifierHash
batchDataByIdentifierHash map[[32]byte]BatchData

// Stores the start time for each batch of the aggregator by task index
batchStartTimeByIdx map[uint32]time.Time

// This task index is to communicate with the local BLS
// Service.
// Note: In case of a reboot it can start from 0 again
Expand All @@ -78,6 +81,7 @@ type Aggregator struct {
// - batchCreatedBlockByIdx
// - batchDataByIdentifierHash
// - nextBatchIndex
// - batchStartTimeByIdx
taskMutex *sync.Mutex

// Mutex to protect ethereum wallet
Expand Down Expand Up @@ -124,6 +128,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
batchesIdxByIdentifierHash := make(map[[32]byte]uint32)
batchDataByIdentifierHash := make(map[[32]byte]BatchData)
batchCreatedBlockByIdx := make(map[uint32]uint64)
batchStartTimeByIdx := make(map[uint32]time.Time)

chainioConfig := sdkclients.BuildAllConfig{
EthHttpUrl: aggregatorConfig.BaseConfig.EthRpcUrl,
Expand Down Expand Up @@ -172,6 +177,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
batchesIdxByIdentifierHash: batchesIdxByIdentifierHash,
batchDataByIdentifierHash: batchDataByIdentifierHash,
batchCreatedBlockByIdx: batchCreatedBlockByIdx,
batchStartTimeByIdx: batchStartTimeByIdx,
nextBatchIndex: nextBatchIndex,
taskMutex: &sync.Mutex{},
walletMutex: &sync.Mutex{},
Expand Down Expand Up @@ -233,6 +239,7 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
batchIdentifierHash := agg.batchesIdentifierHashByIdx[blsAggServiceResp.TaskIndex]
batchData := agg.batchDataByIdentifierHash[batchIdentifierHash]
taskCreatedBlock := agg.batchCreatedBlockByIdx[blsAggServiceResp.TaskIndex]
taskCreatedAt := agg.batchStartTimeByIdx[blsAggServiceResp.TaskIndex]
agg.taskMutex.Unlock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Fetching task data")

Expand Down Expand Up @@ -266,6 +273,9 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA

agg.telemetry.LogQuorumReached(batchData.BatchMerkleRoot)

// Only observe quorum reached if successful
agg.metrics.ObserveTaskQuorumReached(time.Since(taskCreatedAt))

agg.logger.Info("Threshold reached", "taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

Expand Down Expand Up @@ -320,6 +330,8 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
agg.metrics.IncBumpedGasPriceForAggregatedResponse()
agg.telemetry.BumpedTaskGasPrice(batchMerkleRoot, bumpedGasPrice.String())
}

startTime := time.Now()
receipt, err := agg.avsWriter.SendAggregatedResponse(
batchIdentifierHash,
batchMerkleRoot,
Expand All @@ -338,6 +350,9 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
return nil, err
}

// We only send the latency metric if the response is successful
agg.metrics.ObserveLatencyForRespondToTask(time.Since(startTime))

agg.walletMutex.Unlock()
agg.logger.Infof("- Unlocked Wallet Resources: Sending aggregated response for batch %s", hex.EncodeToString(batchIdentifierHash[:]))

Expand Down Expand Up @@ -383,6 +398,7 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
BatchMerkleRoot: batchMerkleRoot,
SenderAddress: senderAddress,
}
agg.batchStartTimeByIdx[batchIndex] = time.Now()
agg.logger.Info(
"Task Info added in aggregator:",
"Task", batchIndex,
Expand Down Expand Up @@ -447,6 +463,7 @@ func (agg *Aggregator) ClearTasksFromMaps() {
delete(agg.batchCreatedBlockByIdx, i)
delete(agg.batchesIdentifierHashByIdx, i)
delete(agg.batchDataByIdentifierHash, batchIdentifierHash)
delete(agg.batchStartTimeByIdx, i)
} else {
agg.logger.Warn("Task not found in maps", "taskIndex", i)
}
Expand Down
230 changes: 224 additions & 6 deletions grafana/provisioning/dashboards/aligned/aggregator_batcher.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": 4,
"id": 2,
"links": [],
"liveNow": false,
"panels": [
Expand Down Expand Up @@ -153,7 +153,6 @@
},
{
"datasource": {
"default": true,
"type": "prometheus",
"uid": "prometheus"
},
Expand Down Expand Up @@ -2451,7 +2450,32 @@
]
}
},
"overrides": []
"overrides": [
{
"__systemRef": "hideSeriesFrom",
"matcher": {
"id": "byNames",
"options": {
"mode": "exclude",
"names": [
"{bot=\"aggregator\", instance=\"host.docker.internal:9091\", job=\"aligned-aggregator\"}"
],
"prefix": "All except:",
"readOnly": true
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": false,
"tooltip": false,
"viz": true
}
}
]
}
]
},
"gridPos": {
"h": 7,
Expand Down Expand Up @@ -2625,9 +2649,203 @@
}
],
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "prometheus"
},
"description": "",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "s"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 61
},
"id": 43,
"interval": "1s",
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "right",
"showLegend": false
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "prometheus"
},
"editorMode": "code",
"expr": "aligned_aggregator_respond_to_task_latency{bot=\"aggregator\"}",
"hide": false,
"instant": false,
"legendFormat": "Latest latency",
"range": true,
"refId": "Latency"
}
],
"title": "Latest respond to task latency",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "prometheus"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 69
},
"id": 44,
"interval": "1s",
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "right",
"showLegend": false
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "prometheus"
},
"editorMode": "code",
"expr": "aligned_aggregator_task_quorum_reached_latency{bot=\"aggregator\"}",
"hide": false,
"instant": false,
"legendFormat": "Latest latency",
"range": true,
"refId": "A"
}
],
"title": "Latest quorum reached latency",
"type": "timeseries"
}
],
"refresh": "5s",
"refresh": "",
"schemaVersion": 38,
"style": "dark",
"tags": [],
Expand All @@ -2642,6 +2860,6 @@
"timezone": "browser",
"title": "System Data",
"uid": "aggregator",
"version": 9,
"version": 19,
"weekStart": ""
}
}
20 changes: 20 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type Metrics struct {
aggregatorGasCostPaidForBatcherTotal prometheus.Gauge
aggregatorNumTimesPaidForBatcher prometheus.Counter
numBumpedGasPriceForAggregatedResponse prometheus.Counter
aggregatorRespondToTaskLatency prometheus.Gauge
aggregatorTaskQuorumReachedLatency prometheus.Gauge
}

const alignedNamespace = "aligned"
Expand Down Expand Up @@ -59,6 +61,16 @@ func NewMetrics(ipPortAddress string, reg prometheus.Registerer, logger logging.
Name: "respond_to_task_gas_price_bumped_count",
Help: "Number of times gas price was bumped while sending aggregated response",
}),
aggregatorRespondToTaskLatency: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Namespace: alignedNamespace,
Name: "aggregator_respond_to_task_latency",
Help: "Latency of last call to respondToTask on Aligned Service Manager",
}),
aggregatorTaskQuorumReachedLatency: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Namespace: alignedNamespace,
Name: "aggregator_task_quorum_reached_latency",
Help: "Time it takes for a task to reach quorum",
}),
}
}

Expand Down Expand Up @@ -116,3 +128,11 @@ func (m *Metrics) AddAggregatorGasPaidForBatcher(value float64) {
// IncBumpedGasPriceForAggregatedResponse bumps the
// numBumpedGasPriceForAggregatedResponse counter by one.
func (m *Metrics) IncBumpedGasPriceForAggregatedResponse() {
	bumpCounter := m.numBumpedGasPriceForAggregatedResponse
	bumpCounter.Inc()
}

// ObserveLatencyForRespondToTask records elapsed, converted to seconds, on the
// aggregatorRespondToTaskLatency gauge. The gauge is set (not accumulated), so
// it reflects only the value passed in the latest call.
func (m *Metrics) ObserveLatencyForRespondToTask(elapsed time.Duration) {
	latencySeconds := elapsed.Seconds()
	m.aggregatorRespondToTaskLatency.Set(latencySeconds)
}

// ObserveTaskQuorumReached records elapsed, converted to seconds, on the
// aggregatorTaskQuorumReachedLatency gauge. The gauge is set (not
// accumulated), so it reflects only the value passed in the latest call.
func (m *Metrics) ObserveTaskQuorumReached(elapsed time.Duration) {
	quorumSeconds := elapsed.Seconds()
	m.aggregatorTaskQuorumReachedLatency.Set(quorumSeconds)
}
Loading