Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

swap, swap/chain, contracts/swap: add transaction queue #2124

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
422ae58
swap/chain: add persistent queue
ralph-pichler Mar 3, 2020
65c5899
swap/chain: add txqueue
ralph-pichler Mar 3, 2020
145097c
swap, swap/chain, contract/swap: use txqueue for cashout
ralph-pichler Mar 3, 2020
f6cdcab
swap, swap/chain, contracts/swap: address pr comments
ralph-pichler Mar 6, 2020
97ce00b
swap: fix a spelling mistake
ralph-pichler Mar 6, 2020
eea13b6
Merge remote-tracking branch 'origin/master' into swap_cashout_8
ralph-pichler Mar 6, 2020
8a3d48c
swap: don't export logger in cashout processor
ralph-pichler Mar 6, 2020
47ee895
swap/chain: don't export persistentQueue type
ralph-pichler Mar 7, 2020
7486071
swap/chain: rename queue function to enqueue
ralph-pichler Mar 10, 2020
255927d
swap/chain: add copyright headers
ralph-pichler Mar 10, 2020
8a9ea5e
contract/swap, swap, swap/chain: refactor gas estimation, estimate in…
ralph-pichler Mar 13, 2020
1a59564
swap: add comment about interface
ralph-pichler Mar 13, 2020
c872d34
swap: log transaction errors
ralph-pichler Mar 13, 2020
a67d6ea
swap: fix logs and log pending hash
ralph-pichler Mar 13, 2020
2deea57
swap/chain: update comment regarding id
ralph-pichler Mar 13, 2020
67fd670
Merge remote-tracking branch 'origin/master' into swap_cashout_8
ralph-pichler Mar 13, 2020
f936138
swap/chain: wait for start in handlers
ralph-pichler Mar 16, 2020
d9e54ba
swap/chain: fix linting issue
ralph-pichler Mar 17, 2020
40327f9
swap/chain: add line after copyright header
ralph-pichler Mar 18, 2020
be6d28c
swap/chain: add lock to persistentqueue test
ralph-pichler Mar 27, 2020
89ceeb5
Merge remote-tracking branch 'origin/master' into swap_cashout_8
ralph-pichler Mar 27, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions contracts/swap/swap.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ package swap
import (
"fmt"
"math/big"
"strings"

"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -37,8 +39,8 @@ type Contract interface {
Withdraw(auth *bind.TransactOpts, amount *big.Int) (*types.Receipt, error)
// Deposit sends a raw transaction to the chequebook, triggering the fallback—depositing amount
Deposit(auth *bind.TransactOpts, amout *big.Int) (*types.Receipt, error)
// CashChequeBeneficiaryStart sends the transaction to cash a cheque as the beneficiary
CashChequeBeneficiaryStart(opts *bind.TransactOpts, beneficiary common.Address, cumulativePayout *uint256.Uint256, ownerSig []byte) (*types.Transaction, error)
// CashChequeBeneficiaryRequest generates a TxRequest for a CashChequeBeneficiary transaction
Eknir marked this conversation as resolved.
Show resolved Hide resolved
CashChequeBeneficiaryRequest(beneficiary common.Address, cumulativePayout *uint256.Uint256, ownerSig []byte) (*chain.TxRequest, error)
// CashChequeBeneficiaryResult processes the receipt from a CashChequeBeneficiary transaction
CashChequeBeneficiaryResult(receipt *types.Receipt) *CashChequeResult
// LiquidBalance returns the LiquidBalance (total balance in ERC20-token - total hard deposits in ERC20-token) of the chequebook
Expand Down Expand Up @@ -75,19 +77,30 @@ type Params struct {

type simpleContract struct {
instance *contract.ERC20SimpleSwap
abi abi.ABI
mortelli marked this conversation as resolved.
Show resolved Hide resolved
address common.Address
backend chain.Backend
}

// InstanceAt creates a new instance of a contract at a specific address.
// It assumes that there is an existing contract instance at the given address, or an error is returned
// It assumes that there is an existing contract instance at the given address
// This function is needed to communicate with remote Swap contracts (e.g. sending a cheque)
func InstanceAt(address common.Address, backend chain.Backend) (Contract, error) {
instance, err := contract.NewERC20SimpleSwap(address, backend)
if err != nil {
Eknir marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}
c := simpleContract{instance: instance, address: address, backend: backend}

contractABI, err := abi.JSON(strings.NewReader(contract.ERC20SimpleSwapABI))
if err != nil {
return nil, err
}
c := simpleContract{
abi: contractABI,
instance: instance,
address: address,
backend: backend,
}
return c, err
}

Expand Down Expand Up @@ -130,15 +143,19 @@ func (s simpleContract) Deposit(auth *bind.TransactOpts, amount *big.Int) (*type
return chain.WaitMined(auth.Context, s.backend, tx.Hash())
}

// CashChequeBeneficiaryStart sends the transaction to cash a cheque as the beneficiary
func (s simpleContract) CashChequeBeneficiaryStart(opts *bind.TransactOpts, beneficiary common.Address, cumulativePayout *uint256.Uint256, ownerSig []byte) (*types.Transaction, error) {
// CashChequeBeneficiaryRequest generates a TxRequest for a CashChequeBeneficiary transaction
func (s simpleContract) CashChequeBeneficiaryRequest(beneficiary common.Address, cumulativePayout *uint256.Uint256, ownerSig []byte) (*chain.TxRequest, error) {
payout := cumulativePayout.Value()
// send a copy of cumulativePayout to instance as it modifies the supplied big int internally
tx, err := s.instance.CashChequeBeneficiary(opts, beneficiary, big.NewInt(0).Set(&payout), ownerSig)
callData, err := s.abi.Pack("cashChequeBeneficiary", beneficiary, big.NewInt(0).Set(&payout), ownerSig)
if err != nil {
return nil, err
}
return tx, nil

return &chain.TxRequest{
To: s.address,
Value: big.NewInt(0),
Data: callData,
}, nil
}

// CashChequeBeneficiaryResult processes the receipt from a CashChequeBeneficiary transaction
Expand Down
154 changes: 93 additions & 61 deletions swap/cashout.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
contract "github.com/ethersphere/swarm/contracts/swap"
"github.com/ethersphere/swarm/swap/chain"
Expand All @@ -31,58 +32,117 @@ import (
// CashChequeBeneficiaryTransactionCost is the expected gas cost of a CashChequeBeneficiary transaction
const CashChequeBeneficiaryTransactionCost = 50000

// CashoutProcessor holds all relevant fields needed for processing cashouts
type CashoutProcessor struct {
backend chain.Backend // ethereum backend to use
privateKey *ecdsa.PrivateKey // private key to use
Logger Logger
}
// CashoutRequestHandlerID is the handlerID used by the CashoutProcessor for CashoutRequests
const CashoutRequestHandlerID = "CashoutProcessor_CashoutRequest"

// CashoutRequest represents a request for a cashout operation
type CashoutRequest struct {
Cheque Cheque // cheque to be cashed
Destination common.Address // destination for the payout
Logger Logger
}

// ActiveCashout stores the necessary information for a cashout in progess
type ActiveCashout struct {
Request CashoutRequest // the request that caused this cashout
TransactionHash common.Hash // the hash of the current transaction for this request
Logger Logger
// CashoutProcessor holds all relevant fields needed for processing cashouts
type CashoutProcessor struct {
backend chain.Backend // ethereum backend to use
txScheduler chain.TxScheduler // transaction queue to use
cashoutResultHandler CashoutResultHandler
logger Logger
}

// CashoutResultHandler is an interface which accepts CashChequeResults from a CashoutProcessor
type CashoutResultHandler interface {
// Called by the CashoutProcessor when a CashoutRequest was successfully executed
// It will be called again if an error is returned
Comment on lines +54 to +55
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These comments seem to contradict each other

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They don't. The CashoutProcessor calls this when a CashoutRequest was successfully executed (during the NotifyReceipt handler). If an error is returned it will be called again, because the NotifyReceipt will run again at some point.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to be a very vital feature of the queue and deserves (in my opinion) a somewhat more explicit comment here.

I was following the code to actually verify that the txqueue keeps calling HandleCashoutResult if HandleCashoutResult returns an error, but could not find this logic:

  • processQueue calls processRequest (directly or via processActveRequest). (txqueue.go:634)
  • When the status is pending, processRequest calls txq.waitForActiveTransaction (txqueue.go:477)
  • waitForActiveTransaction blocks until the tx is mined (or txqueue/tx context times out). If tx is mined, it calls txq.finalizeRequestConfirmed (txqueue.go:610)
  • finalizeRequestConfirmed calls txq.notify, who on it's turn returns the trigger function. The trigger function is returned by notify, allowing the caller of notify to tell the queue that an item is waiting.
  • finalizeRequestConfirmed calles the trigger function, returned by txq.notify

And now I lost the track... Can you help me by explaining how the call proceeds from here? How and where is this trigger signal received and how/where can I see that indeed, HandleCashoutResult is retried if it failed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All notification queues are processed in their respective go routine started by the SetHandlers function. If a handler is registered the trigger will wake the key, err := notifyQueue.Next(txq.ctx, &item, &txq.lock) call on line 266.

HandleCashoutResult(request *CashoutRequest, result *contract.CashChequeResult, receipt *types.Receipt) error
}

// newCashoutProcessor creates a new instance of CashoutProcessor
func newCashoutProcessor(backend chain.Backend, privateKey *ecdsa.PrivateKey) *CashoutProcessor {
return &CashoutProcessor{
backend: backend,
privateKey: privateKey,
func newCashoutProcessor(txScheduler chain.TxScheduler, backend chain.Backend, privateKey *ecdsa.PrivateKey, cashoutResultHandler CashoutResultHandler, logger Logger) *CashoutProcessor {
c := &CashoutProcessor{
backend: backend,
txScheduler: txScheduler,
cashoutResultHandler: cashoutResultHandler,
Eknir marked this conversation as resolved.
Show resolved Hide resolved
logger: logger,
}
}

// cashCheque tries to cash the cheque specified in the request
// after the transaction is sent it waits on its success
func (c *CashoutProcessor) cashCheque(ctx context.Context, request *CashoutRequest) error {
cheque := request.Cheque
opts := bind.NewKeyedTransactor(c.privateKey)
opts.Context = ctx
txScheduler.SetHandlers(CashoutRequestHandlerID, &chain.TxRequestHandlers{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are only setting the handler for the NotifyReceipt. Can you explain to me why you decided to define all other handlers (NotifyPending, NotifyCancelled, NotifyStatusUnkown)?
Again: consider using all handlers which you define or else add them in later PRs (I don't think it is good style to define code-components and then don't use them).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are all used in tests at least. Technically all places which schedule transactions also should monitor at least NotifyCancelled and NotifyStatusUnkown. Those handlers will be added soon, once we have decided what we actually want to do in those scenarios (e.g. for cashing we might want to retry on Cancelled and log on StatusUnknown). Anyway I decided to leave this out to keep the PR smaller.

Anyway those handlers are important already as otherwise we cannot properly test the error conditions during the txqueue tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Maybe, for now, we can already register the handlers for now and just write a log output?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

NotifyReceipt: func(ctx context.Context, id uint64, notification *chain.TxReceiptNotification) error {
var request *CashoutRequest
err := c.txScheduler.GetExtraData(id, &request)
Eknir marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}

otherSwap, err := contract.InstanceAt(request.Cheque.Contract, c.backend)
if err != nil {
return err
}

receipt := &notification.Receipt
if receipt.Status == 0 {
c.logger.Error(CashChequeAction, "cheque cashing transaction reverted", "tx", receipt.TxHash)
return nil
}

result := otherSwap.CashChequeBeneficiaryResult(receipt)
return c.cashoutResultHandler.HandleCashoutResult(request, result, receipt)
},
NotifyPending: func(ctx context.Context, id uint64, notification *chain.TxPendingNotification) error {
c.logger.Debug(CashChequeAction, "cheque cashing transaction sent", "hash", notification.Transaction.Hash())
return nil
},
NotifyCancelled: func(ctx context.Context, id uint64, notification *chain.TxCancelledNotification) error {
c.logger.Warn(CashChequeAction, "cheque cashing transaction cancelled", "reason", notification.Reason)
return nil
},
NotifyStatusUnknown: func(ctx context.Context, id uint64, notification *chain.TxStatusUnknownNotification) error {
c.logger.Error(CashChequeAction, "cheque cashing transaction status unknown", "reason", notification.Reason)
return nil
},
})
return c
}

otherSwap, err := contract.InstanceAt(cheque.Contract, c.backend)
// submitCheque submits a cheque for cashout
// the cheque might not be cashed if it is not deemed profitable
func (c *CashoutProcessor) submitCheque(ctx context.Context, request *CashoutRequest) {
expectedPayout, transactionCosts, err := c.estimatePayout(ctx, &request.Cheque)
if err != nil {
return err
c.logger.Error(CashChequeAction, "could not estimate payout", "error", err)
return
}

tx, err := otherSwap.CashChequeBeneficiaryStart(opts, request.Destination, cheque.CumulativePayout, cheque.Signature)
costsMultiplier := uint256.FromUint64(2)
costThreshold, err := uint256.New().Mul(transactionCosts, costsMultiplier)
if err != nil {
return err
c.logger.Error(CashChequeAction, "overflow in transaction fee", "error", err)
return
}

// this blocks until the cashout has been successfully processed
return c.waitForAndProcessActiveCashout(&ActiveCashout{
Request: *request,
TransactionHash: tx.Hash(),
Logger: request.Logger,
})
// do a payout transaction if we get more than 2 times the gas costs
if expectedPayout.Cmp(costThreshold) == 1 {
c.logger.Info(CashChequeAction, "queueing cashout", "cheque", &request.Cheque)

cheque := request.Cheque
otherSwap, err := contract.InstanceAt(cheque.Contract, c.backend)
if err != nil {
c.logger.Error(CashChequeAction, "could not get swap instance", "error", err)
return
}

txRequest, err := otherSwap.CashChequeBeneficiaryRequest(cheque.Beneficiary, cheque.CumulativePayout, cheque.Signature)
if err != nil {
metrics.GetOrRegisterCounter("swap/cheques/cashed/errors", nil).Inc(1)
c.logger.Error(CashChequeAction, "cashing cheque:", "error", err)
return
}

_, err = c.txScheduler.ScheduleRequest(CashoutRequestHandlerID, *txRequest, request)
Eknir marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
metrics.GetOrRegisterCounter("swap/cheques/cashed/errors", nil).Inc(1)
c.logger.Error(CashChequeAction, "cashing cheque:", "error", err)
}
}
}

// estimatePayout estimates the payout for a given cheque as well as the transaction cost
Expand Down Expand Up @@ -128,31 +188,3 @@ func (c *CashoutProcessor) estimatePayout(ctx context.Context, cheque *Cheque) (

return expectedPayout, transactionCosts, nil
}

// waitForAndProcessActiveCashout waits for activeCashout to complete
func (c *CashoutProcessor) waitForAndProcessActiveCashout(activeCashout *ActiveCashout) error {
ctx, cancel := context.WithTimeout(context.Background(), DefaultTransactionTimeout)
defer cancel()

receipt, err := chain.WaitMined(ctx, c.backend, activeCashout.TransactionHash)
if err != nil {
return err
}

otherSwap, err := contract.InstanceAt(activeCashout.Request.Cheque.Contract, c.backend)
if err != nil {
return err
}

result := otherSwap.CashChequeBeneficiaryResult(receipt)

metrics.GetOrRegisterCounter("swap/cheques/cashed/honey", nil).Inc(result.TotalPayout.Int64())

if result.Bounced {
metrics.GetOrRegisterCounter("swap/cheques/cashed/bounced", nil).Inc(1)
activeCashout.Logger.Warn(CashChequeAction, "cheque bounced", "tx", receipt.TxHash)
}

activeCashout.Logger.Info(CashChequeAction, "cheque cashed", "honey", activeCashout.Request.Cheque.Honey)
return nil
}
Loading