Skip to content

Commit

Permalink
Introduce shared context for all head tracker callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Aug 25, 2020
1 parent 81dd2fc commit 0312bc1
Show file tree
Hide file tree
Showing 20 changed files with 172 additions and 137 deletions.
4 changes: 3 additions & 1 deletion core/internal/cltest/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,9 @@ func (m *MockHeadTrackable) DisconnectedCount() int32 {
}

// OnNewLongestChain increases the OnNewLongestChainCount count by one
func (m *MockHeadTrackable) OnNewLongestChain(models.Head) { atomic.AddInt32(&m.onNewHeadCount, 1) }
func (m *MockHeadTrackable) OnNewLongestChain(context.Context, models.Head) {
atomic.AddInt32(&m.onNewHeadCount, 1)
}

// OnNewLongestChainCount returns the count of new heads, safely.
func (m *MockHeadTrackable) OnNewLongestChainCount() int32 {
Expand Down
16 changes: 9 additions & 7 deletions core/internal/mocks/head_trackable.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 9 additions & 7 deletions core/internal/mocks/job_subscriber.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions core/internal/mocks/tx_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions core/services/balance_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewBalanceMonitor(store *store.Store) BalanceMonitor {
// Connect complies with HeadTrackable
func (bm *balanceMonitor) Connect(_ *models.Head) error {
// Connect head can be out of date, so always query the latest balance
bm.checkBalance(nil)
bm.checkBalance(context.TODO(), nil)
return nil
}

Expand All @@ -49,11 +49,11 @@ func (bm *balanceMonitor) Disconnect() {}
const ethFetchTimeout = 2 * time.Second

// OnNewLongestChain checks the balance for each key
func (bm *balanceMonitor) OnNewLongestChain(head models.Head) {
bm.checkBalance(&head)
func (bm *balanceMonitor) OnNewLongestChain(ctx context.Context, head models.Head) {
bm.checkBalance(ctx, &head)
}

func (bm *balanceMonitor) checkBalance(head *models.Head) {
func (bm *balanceMonitor) checkBalance(parentCtx context.Context, head *models.Head) {
keys, err := bm.store.Keys()
if err != nil {
logger.Error("BalanceMonitor: error getting keys", err)
Expand All @@ -67,7 +67,7 @@ func (bm *balanceMonitor) checkBalance(head *models.Head) {
go func(k models.Key) {
defer wg.Done()

ctx, cancel := context.WithTimeout(context.Background(), ethFetchTimeout)
ctx, cancel := context.WithTimeout(parentCtx, ethFetchTimeout)
defer cancel()

var headNum *big.Int
Expand Down
13 changes: 7 additions & 6 deletions core/services/balance_monitor_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package services_test

import (
"context"
"math/big"
"testing"

Expand Down Expand Up @@ -132,7 +133,7 @@ func TestBalanceMonitor_OnNewLongestChain_UpdatesBalance(t *testing.T) {
gethClient.On("BalanceAt", mock.Anything, k1Addr, big.NewInt(head.Number)).Once().Return(k1bal, nil)

// Do the thing
bm.OnNewLongestChain(*head)
bm.OnNewLongestChain(context.TODO(), *head)

assert.Equal(t, k0bal, bm.GetEthBalance(k0Addr).ToInt())
assert.Equal(t, k1bal, bm.GetEthBalance(k1Addr).ToInt())
Expand All @@ -146,7 +147,7 @@ func TestBalanceMonitor_OnNewLongestChain_UpdatesBalance(t *testing.T) {
gethClient.On("BalanceAt", mock.Anything, k0Addr, big.NewInt(head.Number)).Once().Return(k0bal2, nil)
gethClient.On("BalanceAt", mock.Anything, k1Addr, big.NewInt(head.Number)).Once().Return(k1bal2, nil)

bm.OnNewLongestChain(*head)
bm.OnNewLongestChain(context.TODO(), *head)

assert.Equal(t, k0bal2, bm.GetEthBalance(k0Addr).ToInt())
assert.Equal(t, k1bal2, bm.GetEthBalance(k1Addr).ToInt())
Expand All @@ -169,27 +170,27 @@ func TestBalanceMonitor_OnNewLongestChain_UpdatesBalance(t *testing.T) {
k0bal := big.NewInt(42)
gethClient.On("BalanceAt", mock.Anything, k0Addr, big.NewInt(0)).Once().Return(k0bal, nil)
head := cltest.Head(0)
bm.OnNewLongestChain(*head)
bm.OnNewLongestChain(context.TODO(), *head)

assert.Equal(t, k0bal, bm.GetEthBalance(k0Addr).ToInt())

// If lagged head would be negative, just uses 0
k0bal = big.NewInt(43)
gethClient.On("BalanceAt", mock.Anything, k0Addr, big.NewInt(0)).Once().Return(k0bal, nil)
head = cltest.Head(1)
bm.OnNewLongestChain(*head)
bm.OnNewLongestChain(context.TODO(), *head)

// If lagged head is exactly 0, uses 0
k0bal = big.NewInt(44)
gethClient.On("BalanceAt", mock.Anything, k0Addr, big.NewInt(0)).Once().Return(k0bal, nil)
head = cltest.Head(2)
bm.OnNewLongestChain(*head)
bm.OnNewLongestChain(context.TODO(), *head)

// If lagged head is positive, uses it
k0bal = big.NewInt(44)
gethClient.On("BalanceAt", mock.Anything, k0Addr, big.NewInt(1)).Once().Return(k0bal, nil)
head = cltest.Head(3)
bm.OnNewLongestChain(*head)
bm.OnNewLongestChain(context.TODO(), *head)

gethClient.AssertExpectations(t)
})
Expand Down
6 changes: 3 additions & 3 deletions core/services/bulletprooftxmanager/bulletprooftxmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
const (
// maxEthNodeRequestTime is the worst case time we will wait for a response
// from the eth node before we consider it to be an error
maxEthNodeRequestTime = 2 * time.Minute
maxEthNodeRequestTime = 15 * time.Second
)

// SendEther creates a transaction that transfers the given value of ether
Expand Down Expand Up @@ -85,13 +85,13 @@ func signTx(keyStore strpkg.KeyStoreInterface, account gethAccounts.Account, tx

// send broadcasts the transaction to the ethereum network, writes any relevant
// data onto the attempt and returns an error (or nil) depending on the status
func sendTransaction(ethClient eth.Client, a models.EthTxAttempt) *sendError {
func sendTransaction(ctx context.Context, ethClient eth.Client, a models.EthTxAttempt) *sendError {
signedTx, err := a.GetSignedTx()
if err != nil {
return FatalSendError(err)
}

ctx, cancel := context.WithTimeout(context.Background(), maxEthNodeRequestTime)
ctx, cancel := context.WithTimeout(ctx, maxEthNodeRequestTime)
defer cancel()
err = ethClient.SendTransaction(ctx, signedTx)
err = errors.WithStack(err)
Expand Down
4 changes: 3 additions & 1 deletion core/services/bulletprooftxmanager/eth_broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,9 @@ func (eb *ethBroadcaster) handleInProgressEthTx(etx models.EthTx, attempt models
broadcastAt = etx.CreatedAt
}

sendError := sendTransaction(eb.ethClient, attempt)
ctx, cancel := context.WithTimeout(context.Background(), maxEthNodeRequestTime)
defer cancel()
sendError := sendTransaction(ctx, eb.ethClient, attempt)

if sendError.Fatal() {
etx.Error = sendError.StrPtr()
Expand Down
Loading

0 comments on commit 0312bc1

Please sign in to comment.