Skip to content

Commit

Permalink
move txn manager out of batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Apr 26, 2024
1 parent 8f24e8a commit 939bf42
Show file tree
Hide file tree
Showing 8 changed files with 514 additions and 78 deletions.
File renamed without changes.
426 changes: 426 additions & 0 deletions common/txn_manager.go

Large diffs are not rendered by default.

77 changes: 77 additions & 0 deletions common/txn_manager_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package common

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

type TxnManagerMetrics struct {
Latency *prometheus.SummaryVec
GasUsed prometheus.Gauge
SpeedUps prometheus.Gauge
TxQueue prometheus.Gauge
NumTx *prometheus.CounterVec
}

func NewTxnManagerMetrics(namespace string, reg *prometheus.Registry) *TxnManagerMetrics {
return &TxnManagerMetrics{
Latency: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "txn_manager_latency_ms",
Help: "transaction confirmation latency summary in milliseconds",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
},
[]string{"stage"},
),
GasUsed: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "gas_used",
Help: "gas used for onchain batch confirmation",
},
),
SpeedUps: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "speed_ups",
Help: "number of times the gas price was increased",
},
),
TxQueue: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "tx_queue",
Help: "number of transactions in transaction queue",
},
),
NumTx: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "tx_total",
Help: "number of transactions processed",
},
[]string{"state"},
),
}
}

func (t *TxnManagerMetrics) ObserveLatency(stage string, latencyMs float64) {
t.Latency.WithLabelValues(stage).Observe(latencyMs)
}

func (t *TxnManagerMetrics) UpdateGasUsed(gasUsed uint64) {
t.GasUsed.Set(float64(gasUsed))
}

func (t *TxnManagerMetrics) UpdateSpeedUps(speedUps int) {
t.SpeedUps.Set(float64(speedUps))
}

func (t *TxnManagerMetrics) UpdateTxQueue(txQueue int) {
t.TxQueue.Set(float64(txQueue))
}

func (t *TxnManagerMetrics) IncrementTxnCount(state string) {
t.NumTx.WithLabelValues(state).Inc()
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package batcher_test
package common_test

import (
"context"
Expand Down
5 changes: 2 additions & 3 deletions disperser/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
coremock "github.com/Layr-Labs/eigenda/core/mock"
"github.com/Layr-Labs/eigenda/disperser"
bat "github.com/Layr-Labs/eigenda/disperser/batcher"
"github.com/Layr-Labs/eigenda/disperser/batcher/mock"
batchermock "github.com/Layr-Labs/eigenda/disperser/batcher/mock"
"github.com/Layr-Labs/eigenda/disperser/common/inmem"
dmock "github.com/Layr-Labs/eigenda/disperser/mock"
Expand All @@ -38,7 +37,7 @@ var (

type batcherComponents struct {
transactor *coremock.MockTransactor
txnManager *batchermock.MockTxnManager
txnManager *cmock.MockTxnManager
blobStore disperser.BlobStore
encoderClient *disperser.LocalEncoderClient
encodingStreamer *bat.EncodingStreamer
Expand Down Expand Up @@ -121,7 +120,7 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time.
encoderClient := disperser.NewLocalEncoderClient(p)
finalizer := batchermock.NewFinalizer()
ethClient := &cmock.MockEthClient{}
txnManager := mock.NewTxnManager()
txnManager := cmock.NewTxnManager()

b, err := bat.NewBatcher(config, timeoutConfig, blobStore, dispatcher, cst, asgn, encoderClient, agg, ethClient, finalizer, transactor, txnManager, logger, metrics, handleBatchLivenessChan)
assert.NoError(t, err)
Expand Down
74 changes: 4 additions & 70 deletions disperser/batcher/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net/http"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigensdk-go/logging"
Expand Down Expand Up @@ -35,14 +36,6 @@ type EncodingStreamerMetrics struct {
EncodedBlobs *prometheus.GaugeVec
}

type TxnManagerMetrics struct {
Latency *prometheus.SummaryVec
GasUsed prometheus.Gauge
SpeedUps prometheus.Gauge
TxQueue prometheus.Gauge
NumTx *prometheus.CounterVec
}

type FinalizerMetrics struct {
NumBlobs *prometheus.CounterVec
LastSeenFinalizedBlock prometheus.Gauge
Expand All @@ -55,7 +48,7 @@ type DispatcherMetrics struct {

type Metrics struct {
*EncodingStreamerMetrics
*TxnManagerMetrics
*common.TxnManagerMetrics
*FinalizerMetrics
*DispatcherMetrics

Expand Down Expand Up @@ -88,46 +81,7 @@ func NewMetrics(httpPort string, logger logging.Logger) *Metrics {
),
}

txnManagerMetrics := TxnManagerMetrics{
Latency: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "txn_manager_latency_ms",
Help: "transaction confirmation latency summary in milliseconds",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
},
[]string{"stage"},
),
GasUsed: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "gas_used",
Help: "gas used for onchain batch confirmation",
},
),
SpeedUps: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "speed_ups",
Help: "number of times the gas price was increased",
},
),
TxQueue: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "tx_queue",
Help: "number of transactions in transaction queue",
},
),
NumTx: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "tx_total",
Help: "number of transactions processed",
},
[]string{"state"},
),
}
txnManagerMetrics := common.NewTxnManagerMetrics(namespace, reg)

finalizerMetrics := FinalizerMetrics{
NumBlobs: promauto.With(reg).NewCounterVec(
Expand Down Expand Up @@ -170,7 +124,7 @@ func NewMetrics(httpPort string, logger logging.Logger) *Metrics {

metrics := &Metrics{
EncodingStreamerMetrics: &encodingStreamerMetrics,
TxnManagerMetrics: &txnManagerMetrics,
TxnManagerMetrics: txnManagerMetrics,
FinalizerMetrics: &finalizerMetrics,
DispatcherMetrics: &dispatcherMatrics,
Blob: promauto.With(reg).NewCounterVec(
Expand Down Expand Up @@ -314,26 +268,6 @@ func (e *EncodingStreamerMetrics) UpdateEncodedBlobs(count int, size uint64) {
e.EncodedBlobs.WithLabelValues("number").Set(float64(count))
}

func (t *TxnManagerMetrics) ObserveLatency(stage string, latencyMs float64) {
t.Latency.WithLabelValues(stage).Observe(latencyMs)
}

func (t *TxnManagerMetrics) UpdateGasUsed(gasUsed uint64) {
t.GasUsed.Set(float64(gasUsed))
}

func (t *TxnManagerMetrics) UpdateSpeedUps(speedUps int) {
t.SpeedUps.Set(float64(speedUps))
}

func (t *TxnManagerMetrics) UpdateTxQueue(txQueue int) {
t.TxQueue.Set(float64(txQueue))
}

func (t *TxnManagerMetrics) IncrementTxnCount(state string) {
t.NumTx.WithLabelValues(state).Inc()
}

func (f *FinalizerMetrics) IncrementNumBlobs(state string) {
f.NumBlobs.WithLabelValues(state).Inc()
}
Expand Down
4 changes: 2 additions & 2 deletions disperser/batcher/txn_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ type txnManager struct {
queueSize int
txnBroadcastTimeout time.Duration
txnRefreshInterval time.Duration
metrics *TxnManagerMetrics
metrics *common.TxnManagerMetrics
}

var _ TxnManager = (*txnManager)(nil)

func NewTxnManager(ethClient common.EthClient, wallet walletsdk.Wallet, numConfirmations, queueSize int, txnBroadcastTimeout time.Duration, txnRefreshInterval time.Duration, logger logging.Logger, metrics *TxnManagerMetrics) TxnManager {
func NewTxnManager(ethClient common.EthClient, wallet walletsdk.Wallet, numConfirmations, queueSize int, txnBroadcastTimeout time.Duration, txnRefreshInterval time.Duration, logger logging.Logger, metrics *common.TxnManagerMetrics) TxnManager {
return &txnManager{
ethClient: ethClient,
wallet: wallet,
Expand Down
4 changes: 2 additions & 2 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ type TestDisperser struct {
server *apiserver.DispersalServer
encoderServer *encoder.Server
transactor *coremock.MockTransactor
txnManager *batchermock.MockTxnManager
txnManager *commonmock.MockTxnManager
}

func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser.BlobStore, logger logging.Logger) TestDisperser {
Expand Down Expand Up @@ -169,7 +169,7 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser
finalizer := batchermock.NewFinalizer()

disperserMetrics := disperser.NewMetrics("9100", logger)
txnManager := batchermock.NewTxnManager()
txnManager := commonmock.NewTxnManager()

batcher, err := batcher.NewBatcher(batcherConfig, timeoutConfig, store, dispatcher, cst, asn, encoderClient, agg, &commonmock.MockEthClient{}, finalizer, transactor, txnManager, logger, batcherMetrics, handleBatchLivenessChan)
if err != nil {
Expand Down

0 comments on commit 939bf42

Please sign in to comment.