Skip to content

Commit

Permalink
Merge pull request #2485 from smartcontractkit/release/0.7.8
Browse files Browse the repository at this point in the history
Release/0.7.8
  • Loading branch information
samsondav authored Mar 18, 2020
2 parents c02a0f5 + 2491629 commit 7ad8e51
Show file tree
Hide file tree
Showing 18 changed files with 436 additions and 123 deletions.
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ workflows:
- develop
- /^release\/.*/
- master
- /^hotfix\/.*/
- go-sqlite:
filters: # all branches, and /^v../ tags for build-publish...
tags:
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.7.7
0.7.8
48 changes: 3 additions & 45 deletions core/adapters/eth_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@ package adapters
import (
"encoding/json"
"fmt"
"net"
"regexp"

"chainlink/core/eth"
"chainlink/core/logger"
"chainlink/core/store"
strpkg "chainlink/core/store"
"chainlink/core/store/models"
"chainlink/core/utils"
Expand Down Expand Up @@ -92,10 +89,8 @@ func createTxRunResult(
gasPrice.ToInt(),
gasLimit,
)
if IsClientRetriable(err) {
return pendingConfirmationsOrConnection(input)
} else if err != nil {
return models.NewRunOutputError(err)
if err != nil {
return models.NewRunOutputPendingConfirmationsWithData(models.JSON{})
}

output, err := models.JSON{}.Add("result", tx.Hash.String())
Expand All @@ -105,12 +100,8 @@ func createTxRunResult(

txAttempt := tx.Attempts[0]
receipt, state, err := store.TxManager.CheckAttempt(txAttempt, tx.SentAt)
if IsClientRetriable(err) {
return models.NewRunOutputPendingConnectionWithData(output)
} else if IsClientEmptyError(err) {
if err != nil {
return models.NewRunOutputPendingConfirmationsWithData(output)
} else if err != nil {
return models.NewRunOutputError(err)
}

logger.Debugw(
Expand Down Expand Up @@ -138,12 +129,6 @@ func ensureTxRunResult(input models.RunInput, str *strpkg.Store) models.RunOutpu
hash := common.HexToHash(val)
receipt, state, err := str.TxManager.BumpGasUntilSafe(hash)
if err != nil {
if IsClientEmptyError(err) {
return models.NewRunOutputPendingConfirmations()
} else if state == strpkg.Unknown {
return models.NewRunOutputError(err)
}

// We failed to get one of the TxAttempt receipts, so we won't mark this
// run as errored in order to try again
logger.Warn("EthTx Adapter Perform Resuming: ", err)
Expand Down Expand Up @@ -209,33 +194,6 @@ func addReceiptToResult(
return models.NewRunOutputComplete(data)
}

// IsClientRetriable does its best effort to see if an error indicates one that
// might have a different outcome if we retried the operation
func IsClientRetriable(err error) bool {
if err == nil {
return false
}

if err, ok := err.(net.Error); ok {
return err.Timeout() || err.Temporary()
} else if errors.Cause(err) == store.ErrPendingConnection {
return true
}

return false
}

var (
parityEmptyResponseRegex = regexp.MustCompile("Error cause was EmptyResponse")
)

// Parity light clients can return an EmptyResponse error when they don't have
// access to the transaction in the mempool. If we wait long enough it should
// eventually return a transaction receipt.
func IsClientEmptyError(err error) bool {
return err != nil && parityEmptyResponseRegex.MatchString(err.Error())
}

func pendingConfirmationsOrConnection(input models.RunInput) models.RunOutput {
// If the input is not pending confirmations next time
// then it may submit a new transaction.
Expand Down
36 changes: 6 additions & 30 deletions core/adapters/eth_tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func TestEthTxAdapter_Perform_WithError(t *testing.T) {
adapter := adapters.EthTx{}
input := cltest.NewRunInputWithResult("0x9786856756")
output := adapter.Perform(input, store)
assert.EqualError(t, output.Error(), "Cannot connect to node")
assert.NoError(t, output.Error())

txManager.AssertExpectations(t)
}
Expand All @@ -269,8 +269,8 @@ func TestEthTxAdapter_Perform_PendingConfirmations_WithFatalErrorInTxManager(t *
)
output := adapter.Perform(input, store)

assert.Equal(t, models.RunStatusErrored, output.Status())
assert.NotNil(t, output.Error())
assert.Equal(t, models.RunStatusPendingConfirmations, output.Status())
assert.NoError(t, output.Error())

txManager.AssertExpectations(t)
}
Expand Down Expand Up @@ -360,7 +360,7 @@ func TestEthTxAdapter_Perform_CreateTxWithGasErrorTreatsAsNotConnected(t *testin
data := adapter.Perform(models.RunInput{}, store)

require.NoError(t, data.Error())
assert.Equal(t, models.RunStatusPendingConnection, data.Status())
assert.Equal(t, models.RunStatusPendingConfirmations, data.Status())

txManager.AssertExpectations(t)
}
Expand Down Expand Up @@ -389,7 +389,7 @@ func TestEthTxAdapter_Perform_CheckAttemptErrorTreatsAsNotConnected(t *testing.T
data := adapter.Perform(models.RunInput{}, store)

require.NoError(t, data.Error())
assert.Equal(t, models.RunStatusPendingConnection, data.Status())
assert.Equal(t, models.RunStatusPendingConfirmations, data.Status())

txManager.AssertExpectations(t)
}
Expand Down Expand Up @@ -458,7 +458,7 @@ func TestEthTxAdapter_Perform_NoDoubleSpendOnSendTransactionFail(t *testing.T) {
adapter := adapters.EthTx{}
input := cltest.NewRunInputWithResult("0x9786856756")
result := adapter.Perform(input, store)
require.Error(t, result.Error())
require.NoError(t, result.Error())

txAttempt := &models.TxAttempt{}
tx := &models.Tx{Attempts: []*models.TxAttempt{txAttempt}}
Expand All @@ -477,27 +477,3 @@ func TestEthTxAdapter_Perform_NoDoubleSpendOnSendTransactionFail(t *testing.T) {

txManager.AssertExpectations(t)
}

func TestEthTxAdapter_IsClientRetriable(t *testing.T) {
t.Parallel()

tests := []struct {
name string
error error
retriable bool
}{
{"nil error", nil, false},
{"http invalid method", errors.New("net/http: invalid method SGET"), false},
{"syscall.ECONNRESET", syscall.ECONNRESET, false},
{"syscall.ECONNABORTED", syscall.ECONNABORTED, false},
{"syscall.EWOULDBLOCK", syscall.EWOULDBLOCK, true},
{"syscall.ETIMEDOUT", syscall.ETIMEDOUT, true},
}

for _, tt := range tests {
test := tt
t.Run(test.name, func(t *testing.T) {
require.Equal(t, test.retriable, adapters.IsClientRetriable(test.error))
})
}
}
2 changes: 1 addition & 1 deletion core/adapters/no_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ type NoOpPend struct{}
// Perform on this adapter type returns an empty RunResult with an
// added field for the status to indicate the task is Pending.
func (noa *NoOpPend) Perform(_ models.RunInput, _ *store.Store) models.RunOutput {
return models.NewRunOutputPendingConfirmations()
return models.NewRunOutputPendingConfirmationsWithData(models.JSON{})
}
27 changes: 27 additions & 0 deletions core/cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,33 @@ func NewApp(client *Client) *cli.App {
Usage: "Run the chainlink node",
Action: client.RunNode,
},
{
Name: "rebroadcast-transactions",
Usage: "manually rebroadcast txs matching nonce range with the specified gas price. This is useful in emergencies e.g. high gas prices and/or network congestion to forcibly clear out the pending TX queue",
Action: client.RebroadcastTransactions,
Flags: []cli.Flag{
cli.Uint64Flag{
Name: "beginningNonce",
Usage: "beginning of nonce range to rebroadcast",
},
cli.Uint64Flag{
Name: "endingNonce",
Usage: "end of nonce range to rebroadcast (inclusive)",
},
cli.Uint64Flag{
Name: "gasPriceWei",
Usage: "gas price (in Wei) to rebroadcast transactions at",
},
cli.StringFlag{
Name: "password, p",
Usage: "text file holding the password for the node's account",
},
cli.Uint64Flag{
Name: "gasLimit",
Usage: "OPTIONAL: gas limit to use for each transaction ",
},
},
},
},
},

Expand Down
96 changes: 96 additions & 0 deletions core/cmd/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math/big"
"os"
"strings"

Expand Down Expand Up @@ -138,6 +139,101 @@ func logConfigVariables(store *strpkg.Store) error {
return nil
}

// RebroadcastTransactions run locally to force manual rebroadcasting of
// transactions in a given nonce range. This MUST NOT be run concurrently with
// the node. Currently the advisory lock in FindAllTxsInNonceRange prevents
// this.
func (cli *Client) RebroadcastTransactions(c *clipkg.Context) error {
beginningNonce := c.Uint("beginningNonce")
endingNonce := c.Uint("endingNonce")
gasPriceWei := c.Uint64("gasPriceWei")
overrideGasLimit := c.Uint64("gasLimit")

logger.SetLogger(cli.Config.CreateProductionLogger())
app := cli.AppFactory.NewApplication(cli.Config)
defer app.Stop()

store := app.GetStore()

pwd, err := passwordFromFile(c.String("password"))
if err != nil {
return cli.errorOut(fmt.Errorf("error reading password: %+v", err))
}
_, err = cli.KeyStoreAuthenticator.Authenticate(store, pwd)
if err != nil {
return cli.errorOut(fmt.Errorf("error authenticating keystore: %+v", err))
}

err = store.Start()
if err != nil {
return err
}

lastHead, err := store.LastHead()
if err != nil {
return err
}
err = store.TxManager.Connect(lastHead)
if err != nil {
return err
}

transactions, err := store.FindAllTxsInNonceRange(beginningNonce, endingNonce)
if err != nil {
return err
}
n := len(transactions)
for i, tx := range transactions {
var gasLimit uint64
if overrideGasLimit == 0 {
gasLimit = tx.GasLimit
} else {
gasLimit = overrideGasLimit
}
logger.Infow("Rebroadcasting transaction", "idx", i, "of", n, "nonce", tx.Nonce, "id", tx.ID)

gasPrice := big.NewInt(int64(gasPriceWei))
rawTx, err := store.TxManager.SignedRawTxWithBumpedGas(tx, gasLimit, *gasPrice)
if err != nil {
logger.Error(err)
continue
}

hash, err := store.TxManager.SendRawTx(rawTx)
if err != nil {
logger.Error(err)
continue
}

logger.Infow("Sent transaction", "idx", i, "of", n, "nonce", tx.Nonce, "id", tx.ID, "hash", hash)

jobRunID, err := models.NewIDFromString(tx.SurrogateID.ValueOrZero())
if err != nil {
logger.Errorw("could not get UUID from surrogate ID", "SurrogateID", tx.SurrogateID.ValueOrZero())
continue
}
jobRun, err := store.FindJobRun(jobRunID)
if err != nil {
logger.Errorw("could not find job run", "id", jobRunID)
continue
}
for taskIndex := range jobRun.TaskRuns {
taskRun := &jobRun.TaskRuns[taskIndex]
if taskRun.Status == models.RunStatusPendingConfirmations {
taskRun.Status = models.RunStatusErrored
}
}
jobRun.Status = models.RunStatusErrored

err = store.ORM.SaveJobRun(&jobRun)
if err != nil {
logger.Errorw("error saving job run", "id", jobRunID)
continue
}
}
return nil
}

// DeleteUser is run locally to remove the User row from the node's database.
func (cli *Client) DeleteUser(c *clipkg.Context) error {
logger.SetLogger(cli.Config.CreateProductionLogger())
Expand Down
9 changes: 5 additions & 4 deletions core/internal/cltest/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,21 +222,22 @@ func CreateTx(
from common.Address,
sentAt uint64,
) *models.Tx {
return CreateTxWithNonce(t, store, from, sentAt, 0)
return CreateTxWithNonceAndGasPrice(t, store, from, sentAt, 0, 1)
}

// CreateTxWithNonce creates a Tx from a specified address, sentAt, and nonce
func CreateTxWithNonce(
// CreateTxWithNonceAndGasPrice creates a Tx from a specified address, sentAt, nonce and gas price
func CreateTxWithNonceAndGasPrice(
t testing.TB,
store *strpkg.Store,
from common.Address,
sentAt uint64,
nonce uint64,
gasPrice int64,
) *models.Tx {
data := make([]byte, 36)
binary.LittleEndian.PutUint64(data, sentAt)

transaction := types.NewTransaction(nonce, common.Address{}, big.NewInt(0), 250000, big.NewInt(1), data)
transaction := types.NewTransaction(nonce, common.Address{}, big.NewInt(0), 250000, big.NewInt(gasPrice), data)
tx := &models.Tx{
From: from,
SentAt: sentAt,
Expand Down
21 changes: 21 additions & 0 deletions core/internal/mocks/tx_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 0 additions & 6 deletions core/store/models/run_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ func NewRunOutputComplete(data JSON) RunOutput {
return RunOutput{status: RunStatusCompleted, data: data}
}

// NewRunOutputPendingConfirmations returns a new RunOutput that indicates the
// task is pending confirmations
func NewRunOutputPendingConfirmations() RunOutput {
return RunOutput{status: RunStatusPendingConfirmations}
}

// NewRunOutputPendingConfirmationsWithData returns a new RunOutput that
// indicates the task is pending confirmations but also has some data that
// needs to be fed in on next invocation
Expand Down
Loading

0 comments on commit 7ad8e51

Please sign in to comment.