Skip to content

Commit

Permalink
move txn manager out of batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Apr 26, 2024
1 parent 8f24e8a commit c044a7a
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,31 @@ package mock
import (
"context"

"github.com/Layr-Labs/eigenda/disperser/batcher"
"github.com/Layr-Labs/eigenda/common"
"github.com/stretchr/testify/mock"
)

type MockTxnManager struct {
mock.Mock

Requests []*batcher.TxnRequest
Requests []*common.TxnRequest
}

var _ batcher.TxnManager = (*MockTxnManager)(nil)
var _ common.TxnManager = (*MockTxnManager)(nil)

func NewTxnManager() *MockTxnManager {
return &MockTxnManager{}
}

func (b *MockTxnManager) Start(ctx context.Context) {}

func (b *MockTxnManager) ProcessTransaction(ctx context.Context, req *batcher.TxnRequest) error {
func (b *MockTxnManager) ProcessTransaction(ctx context.Context, req *common.TxnRequest) error {
args := b.Called()
b.Requests = append(b.Requests, req)
return args.Error(0)
}

func (b *MockTxnManager) ReceiptChan() chan *batcher.ReceiptOrErr {
func (b *MockTxnManager) ReceiptChan() chan *common.ReceiptOrErr {
args := b.Called()
return args.Get(0).(chan *batcher.ReceiptOrErr)
return args.Get(0).(chan *common.ReceiptOrErr)
}
7 changes: 3 additions & 4 deletions disperser/batcher/txn_manager.go → common/txn_manager.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package batcher
package common

import (
"context"
Expand All @@ -9,7 +9,6 @@ import (
"sync"
"time"

"github.com/Layr-Labs/eigenda/common"
walletsdk "github.com/Layr-Labs/eigensdk-go/chainio/clients/wallet"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/ethereum/go-ethereum"
Expand Down Expand Up @@ -67,7 +66,7 @@ type ReceiptOrErr struct {
type txnManager struct {
mu sync.Mutex

ethClient common.EthClient
ethClient EthClient
wallet walletsdk.Wallet
numConfirmations int
requestChan chan *TxnRequest
Expand All @@ -82,7 +81,7 @@ type txnManager struct {

var _ TxnManager = (*txnManager)(nil)

func NewTxnManager(ethClient common.EthClient, wallet walletsdk.Wallet, numConfirmations, queueSize int, txnBroadcastTimeout time.Duration, txnRefreshInterval time.Duration, logger logging.Logger, metrics *TxnManagerMetrics) TxnManager {
func NewTxnManager(ethClient EthClient, wallet walletsdk.Wallet, numConfirmations, queueSize int, txnBroadcastTimeout time.Duration, txnRefreshInterval time.Duration, logger logging.Logger, metrics *TxnManagerMetrics) TxnManager {
return &txnManager{
ethClient: ethClient,
wallet: wallet,
Expand Down
77 changes: 77 additions & 0 deletions common/txn_manager_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package common

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

type TxnManagerMetrics struct {
Latency *prometheus.SummaryVec
GasUsed prometheus.Gauge
SpeedUps prometheus.Gauge
TxQueue prometheus.Gauge
NumTx *prometheus.CounterVec
}

func NewTxnManagerMetrics(namespace string, reg *prometheus.Registry) *TxnManagerMetrics {
return &TxnManagerMetrics{
Latency: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "txn_manager_latency_ms",
Help: "transaction confirmation latency summary in milliseconds",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
},
[]string{"stage"},
),
GasUsed: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "gas_used",
Help: "gas used for onchain batch confirmation",
},
),
SpeedUps: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "speed_ups",
Help: "number of times the gas price was increased",
},
),
TxQueue: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "tx_queue",
Help: "number of transactions in transaction queue",
},
),
NumTx: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "tx_total",
Help: "number of transactions processed",
},
[]string{"state"},
),
}
}

func (t *TxnManagerMetrics) ObserveLatency(stage string, latencyMs float64) {
t.Latency.WithLabelValues(stage).Observe(latencyMs)
}

func (t *TxnManagerMetrics) UpdateGasUsed(gasUsed uint64) {
t.GasUsed.Set(float64(gasUsed))
}

func (t *TxnManagerMetrics) UpdateSpeedUps(speedUps int) {
t.SpeedUps.Set(float64(speedUps))
}

func (t *TxnManagerMetrics) UpdateTxQueue(txQueue int) {
t.TxQueue.Set(float64(txQueue))
}

func (t *TxnManagerMetrics) IncrementTxnCount(state string) {
t.NumTx.WithLabelValues(state).Inc()
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package batcher_test
package common_test

import (
"context"
Expand All @@ -7,6 +7,7 @@ import (
"testing"
"time"

dacommon "github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/mock"
"github.com/Layr-Labs/eigenda/disperser/batcher"
sdkmock "github.com/Layr-Labs/eigensdk-go/chainio/clients/mocks"
Expand All @@ -24,7 +25,7 @@ func TestProcessTransaction(t *testing.T) {
w := sdkmock.NewMockWallet(ctrl)
logger := logging.NewNoopLogger()
metrics := batcher.NewMetrics("9100", logger)
txnManager := batcher.NewTxnManager(ethClient, w, 0, 5, 100*time.Millisecond, 100*time.Millisecond, logger, metrics.TxnManagerMetrics)
txnManager := dacommon.NewTxnManager(ethClient, w, 0, 5, 100*time.Millisecond, 100*time.Millisecond, logger, metrics.TxnManagerMetrics)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
txnManager.Start(ctx)
Expand All @@ -40,7 +41,7 @@ func TestProcessTransaction(t *testing.T) {
}, nil).Times(2),
)

err := txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{
err := txnManager.ProcessTransaction(ctx, &dacommon.TxnRequest{
Tx: txn,
Tag: "test transaction",
Value: nil,
Expand All @@ -56,7 +57,7 @@ func TestProcessTransaction(t *testing.T) {
w.EXPECT().GetTransactionReceipt(gomock.Any(), gomock.Any()).Return(nil, randomErr).AnyTimes()
w.EXPECT().SendTransaction(gomock.Any(), gomock.Any()).Return("", randomErr).AnyTimes()

err = txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{
err = txnManager.ProcessTransaction(ctx, &dacommon.TxnRequest{
Tx: txn,
Tag: "test transaction",
Value: nil,
Expand All @@ -74,7 +75,7 @@ func TestReplaceGasFee(t *testing.T) {
w := sdkmock.NewMockWallet(ctrl)
logger := logging.NewNoopLogger()
metrics := batcher.NewMetrics("9100", logger)
txnManager := batcher.NewTxnManager(ethClient, w, 0, 5, 100*time.Millisecond, 100*time.Millisecond, logger, metrics.TxnManagerMetrics)
txnManager := dacommon.NewTxnManager(ethClient, w, 0, 5, 100*time.Millisecond, 100*time.Millisecond, logger, metrics.TxnManagerMetrics)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
txnManager.Start(ctx)
Expand All @@ -93,7 +94,7 @@ func TestReplaceGasFee(t *testing.T) {
BlockNumber: new(big.Int).SetUint64(1),
}, nil)

err := txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{
err := txnManager.ProcessTransaction(ctx, &dacommon.TxnRequest{
Tx: txn,
Tag: "test transaction",
Value: nil,
Expand All @@ -110,7 +111,7 @@ func TestTransactionReplacementFailure(t *testing.T) {
w := sdkmock.NewMockWallet(ctrl)
logger := logging.NewNoopLogger()
metrics := batcher.NewMetrics("9100", logger)
txnManager := batcher.NewTxnManager(ethClient, w, 0, 5, time.Second, 48*time.Second, logger, metrics.TxnManagerMetrics)
txnManager := dacommon.NewTxnManager(ethClient, w, 0, 5, time.Second, 48*time.Second, logger, metrics.TxnManagerMetrics)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
txnManager.Start(ctx)
Expand All @@ -126,7 +127,7 @@ func TestTransactionReplacementFailure(t *testing.T) {
w.EXPECT().SendTransaction(gomock.Any(), gomock.Any()).Return(badTxID, nil)
w.EXPECT().GetTransactionReceipt(gomock.Any(), badTxID).Return(nil, errors.New("blah")).AnyTimes()

err := txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{
err := txnManager.ProcessTransaction(ctx, &dacommon.TxnRequest{
Tx: txn,
Tag: "test transaction",
Value: nil,
Expand All @@ -143,7 +144,7 @@ func TestSendTransactionReceiptRetry(t *testing.T) {
w := sdkmock.NewMockWallet(ctrl)
logger := logging.NewNoopLogger()
metrics := batcher.NewMetrics("9100", logger)
txnManager := batcher.NewTxnManager(ethClient, w, 0, 5, time.Second, 48*time.Second, logger, metrics.TxnManagerMetrics)
txnManager := dacommon.NewTxnManager(ethClient, w, 0, 5, time.Second, 48*time.Second, logger, metrics.TxnManagerMetrics)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
txnManager.Start(ctx)
Expand All @@ -161,7 +162,7 @@ func TestSendTransactionReceiptRetry(t *testing.T) {
BlockNumber: new(big.Int).SetUint64(1),
}, nil)

err := txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{
err := txnManager.ProcessTransaction(ctx, &dacommon.TxnRequest{
Tx: txn,
Tag: "test transaction",
Value: nil,
Expand All @@ -181,7 +182,7 @@ func TestSendTransactionRetrySuccess(t *testing.T) {
w := sdkmock.NewMockWallet(ctrl)
logger := logging.NewNoopLogger()
metrics := batcher.NewMetrics("9100", logger)
txnManager := batcher.NewTxnManager(ethClient, w, 0, 5, time.Second, 48*time.Second, logger, metrics.TxnManagerMetrics)
txnManager := dacommon.NewTxnManager(ethClient, w, 0, 5, time.Second, 48*time.Second, logger, metrics.TxnManagerMetrics)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
txnManager.Start(ctx)
Expand All @@ -203,7 +204,7 @@ func TestSendTransactionRetrySuccess(t *testing.T) {
BlockNumber: new(big.Int).SetUint64(1),
}, nil)

err := txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{
err := txnManager.ProcessTransaction(ctx, &dacommon.TxnRequest{
Tx: txn,
Tag: "test transaction",
Value: nil,
Expand All @@ -223,7 +224,7 @@ func TestSendTransactionRetryFailure(t *testing.T) {
w := sdkmock.NewMockWallet(ctrl)
logger := logging.NewNoopLogger()
metrics := batcher.NewMetrics("9100", logger)
txnManager := batcher.NewTxnManager(ethClient, w, 0, 5, time.Second, 48*time.Second, logger, metrics.TxnManagerMetrics)
txnManager := dacommon.NewTxnManager(ethClient, w, 0, 5, time.Second, 48*time.Second, logger, metrics.TxnManagerMetrics)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
txnManager.Start(ctx)
Expand All @@ -241,7 +242,7 @@ func TestSendTransactionRetryFailure(t *testing.T) {
// assume that the transaction is not mined within the timeout
w.EXPECT().GetTransactionReceipt(gomock.Any(), txID).Return(nil, walletsdk.ErrReceiptNotYetAvailable).AnyTimes()

err := txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{
err := txnManager.ProcessTransaction(ctx, &dacommon.TxnRequest{
Tx: txn,
Tag: "test transaction",
Value: nil,
Expand All @@ -261,7 +262,7 @@ func TestTransactionNotBroadcasted(t *testing.T) {
w := sdkmock.NewMockWallet(ctrl)
logger := logging.NewNoopLogger()
metrics := batcher.NewMetrics("9100", logger)
txnManager := batcher.NewTxnManager(ethClient, w, 0, 5, 100*time.Millisecond, 48*time.Second, logger, metrics.TxnManagerMetrics)
txnManager := dacommon.NewTxnManager(ethClient, w, 0, 5, 100*time.Millisecond, 48*time.Second, logger, metrics.TxnManagerMetrics)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
txnManager.Start(ctx)
Expand All @@ -275,14 +276,14 @@ func TestTransactionNotBroadcasted(t *testing.T) {
// assume that the transaction does not get broadcasted to the network
w.EXPECT().GetTransactionReceipt(gomock.Any(), txID).Return(nil, walletsdk.ErrNotYetBroadcasted).AnyTimes()

err := txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{
err := txnManager.ProcessTransaction(ctx, &dacommon.TxnRequest{
Tx: txn,
Tag: "test transaction",
Value: nil,
})
<-ctx.Done()
assert.NoError(t, err)
res := <-txnManager.ReceiptChan()
assert.ErrorAs(t, res.Err, &batcher.ErrTransactionNotBroadcasted)
assert.ErrorAs(t, res.Err, &dacommon.ErrTransactionNotBroadcasted)
assert.Nil(t, res.Receipt)
}
8 changes: 4 additions & 4 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type Batcher struct {
Aggregator core.SignatureAggregator
EncodingStreamer *EncodingStreamer
Transactor core.Transactor
TransactionManager TxnManager
TransactionManager common.TxnManager
Metrics *Metrics
HeartbeatChan chan time.Time

Expand All @@ -101,7 +101,7 @@ func NewBatcher(
ethClient common.EthClient,
finalizer Finalizer,
transactor core.Transactor,
txnManager TxnManager,
txnManager common.TxnManager,
logger logging.Logger,
metrics *Metrics,
heartbeatChan chan time.Time,
Expand Down Expand Up @@ -319,7 +319,7 @@ func (b *Batcher) updateConfirmationInfo(
return blobsToRetry, nil
}

func (b *Batcher) ProcessConfirmedBatch(ctx context.Context, receiptOrErr *ReceiptOrErr) error {
func (b *Batcher) ProcessConfirmedBatch(ctx context.Context, receiptOrErr *common.ReceiptOrErr) error {
if receiptOrErr.Metadata == nil {
return errors.New("failed to process confirmed batch: no metadata from transaction manager response")
}
Expand Down Expand Up @@ -492,7 +492,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailConfirmBatch)
return fmt.Errorf("HandleSingleBatch: error building confirmBatch transaction: %w", err)
}
err = b.TransactionManager.ProcessTransaction(ctx, NewTxnRequest(txn, "confirmBatch", big.NewInt(0), confirmationMetadata{
err = b.TransactionManager.ProcessTransaction(ctx, common.NewTxnRequest(txn, "confirmBatch", big.NewInt(0), confirmationMetadata{
batchHeader: batch.BatchHeader,
blobs: batch.BlobMetadata,
blobHeaders: batch.BlobHeaders,
Expand Down
Loading

0 comments on commit c044a7a

Please sign in to comment.