Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
swap, swap/chain: use txqueue for cashout
Browse files Browse the repository at this point in the history
  • Loading branch information
ralph-pichler committed Feb 24, 2020
1 parent 7264e48 commit b930ce1
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 181 deletions.
136 changes: 80 additions & 56 deletions swap/cashout.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
contract "github.com/ethersphere/swarm/contracts/swap"
"github.com/ethersphere/swarm/swap/chain"
Expand All @@ -30,10 +31,9 @@ import (
// CashChequeBeneficiaryTransactionCost is the expected gas cost of a CashChequeBeneficiary transaction
const CashChequeBeneficiaryTransactionCost = 50000

// CashoutProcessor holds all relevant fields needed for processing cashouts
type CashoutProcessor struct {
backend chain.Backend // ethereum backend to use
privateKey *ecdsa.PrivateKey // private key to use
var CashoutRequestTypeID = chain.TxRequestTypeID{
Handler: "cashout",
RequestType: "CashoutRequest",
}

// CashoutRequest represents a request for a cashout operation
Expand All @@ -42,42 +42,94 @@ type CashoutRequest struct {
Destination common.Address // destination for the payout
}

// ActiveCashout stores the necessary information for a cashout in progess
type ActiveCashout struct {
Request CashoutRequest // the request that caused this cashout
TransactionHash common.Hash // the hash of the current transaction for this request
// CashoutProcessor holds all relevant fields needed for processing cashouts
type CashoutProcessor struct {
backend chain.Backend // ethereum backend to use
txScheduler chain.TxScheduler // transaction queue to use
cashoutResultHandler CashoutResultHandler
cashoutDone chan *CashoutRequest
}

type CashoutResultHandler interface {
HandleCashoutResult(request *CashoutRequest, result *contract.CashChequeResult, receipt *types.Receipt) error
}

// newCashoutProcessor creates a new instance of CashoutProcessor
func newCashoutProcessor(backend chain.Backend, privateKey *ecdsa.PrivateKey) *CashoutProcessor {
return &CashoutProcessor{
backend: backend,
privateKey: privateKey,
func newCashoutProcessor(txScheduler chain.TxScheduler, backend chain.Backend, privateKey *ecdsa.PrivateKey, cashoutResultHandler CashoutResultHandler) *CashoutProcessor {
c := &CashoutProcessor{
backend: backend,
txScheduler: txScheduler,
cashoutResultHandler: cashoutResultHandler,
}
}

// cashCheque tries to cash the cheque specified in the request
// after the transaction is sent it waits on its success
func (c *CashoutProcessor) cashCheque(ctx context.Context, request *CashoutRequest) error {
cheque := request.Cheque
opts := bind.NewKeyedTransactor(c.privateKey)
opts.Context = ctx
txScheduler.SetHandlers(CashoutRequestTypeID, &chain.TxRequestHandlers{
Send: func(id uint64, backend chain.Backend, opts *bind.TransactOpts) (common.Hash, error) {
var request CashoutRequest
if err := c.txScheduler.GetRequest(id, &request); err != nil {
return common.Hash{}, err
}

cheque := request.Cheque

otherSwap, err := contract.InstanceAt(cheque.Contract, backend)
if err != nil {
return common.Hash{}, err
}

tx, err := otherSwap.CashChequeBeneficiaryStart(opts, request.Destination, cheque.CumulativePayout, cheque.Signature)
if err != nil {
return common.Hash{}, err
}
return tx.Hash(), nil
},
NotifyReceipt: func(ctx context.Context, id uint64, notification *chain.TxReceiptNotification) error {
var request *CashoutRequest
err := c.txScheduler.GetRequest(id, &request)
if err != nil {
return err
}

otherSwap, err := contract.InstanceAt(request.Cheque.Contract, c.backend)
if err != nil {
return err
}

receipt := &notification.Receipt
if receipt.Status == 0 {
swapLog.Error("cheque cashing transaction reverted", "tx", receipt.TxHash)
return nil
}

result := otherSwap.CashChequeBeneficiaryResult(receipt)
return c.cashoutResultHandler.HandleCashoutResult(request, result, receipt)
},
})
return c
}

otherSwap, err := contract.InstanceAt(cheque.Contract, c.backend)
func (c *CashoutProcessor) submitCheque(ctx context.Context, request *CashoutRequest) {
expectedPayout, transactionCosts, err := c.estimatePayout(ctx, &request.Cheque)
if err != nil {
return err
swapLog.Error("could not estimate payout", "error", err)
return
}

tx, err := otherSwap.CashChequeBeneficiaryStart(opts, request.Destination, cheque.CumulativePayout, cheque.Signature)
costsMultiplier := uint256.FromUint64(2)
costThreshold, err := uint256.New().Mul(transactionCosts, costsMultiplier)
if err != nil {
return err
swapLog.Error("overflow in transaction fee", "error", err)
return
}

// this blocks until the cashout has been successfully processed
return c.waitForAndProcessActiveCashout(&ActiveCashout{
Request: *request,
TransactionHash: tx.Hash(),
})
// do a payout transaction if we get 2 times the gas costs
if expectedPayout.Cmp(costThreshold) == 1 {
swapLog.Info("queueing cashout", "cheque", &request.Cheque)
_, err := c.txScheduler.ScheduleRequest(CashoutRequestTypeID, request)
if err != nil {
metrics.GetOrRegisterCounter("swap.cheques.cashed.errors", nil).Inc(1)
swapLog.Error("cashing cheque:", "error", err)
}
}
}

// estimatePayout estimates the payout for a given cheque as well as the transaction cost
Expand Down Expand Up @@ -123,31 +175,3 @@ func (c *CashoutProcessor) estimatePayout(ctx context.Context, cheque *Cheque) (

return expectedPayout, transactionCosts, nil
}

// waitForAndProcessActiveCashout waits for activeCashout to complete
func (c *CashoutProcessor) waitForAndProcessActiveCashout(activeCashout *ActiveCashout) error {
ctx, cancel := context.WithTimeout(context.Background(), DefaultTransactionTimeout)
defer cancel()

receipt, err := chain.WaitMined(ctx, c.backend, activeCashout.TransactionHash)
if err != nil {
return err
}

otherSwap, err := contract.InstanceAt(activeCashout.Request.Cheque.Contract, c.backend)
if err != nil {
return err
}

result := otherSwap.CashChequeBeneficiaryResult(receipt)

metrics.GetOrRegisterCounter("swap.cheques.cashed.honey", nil).Inc(result.TotalPayout.Int64())

if result.Bounced {
metrics.GetOrRegisterCounter("swap.cheques.cashed.bounced", nil).Inc(1)
swapLog.Warn("cheque bounced", "tx", receipt.TxHash)
}

swapLog.Info("cheque cashed", "honey", activeCashout.Request.Cheque.Honey)
return nil
}
42 changes: 29 additions & 13 deletions swap/cashout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package swap
import (
"context"
"testing"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/log"
"github.com/ethersphere/swarm/state"
"github.com/ethersphere/swarm/swap/chain"
"github.com/ethersphere/swarm/uint256"
)
Expand All @@ -33,8 +35,7 @@ import (
// afterwards it attempts to cash-in a bouncing cheque
func TestContractIntegration(t *testing.T) {
backend := newTestBackend(t)
reset := setupContractTest()
defer reset()
defer backend.Close()

payout := uint256.FromUint64(42)
chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout)
Expand Down Expand Up @@ -116,11 +117,18 @@ func TestContractIntegration(t *testing.T) {
// TestCashCheque creates a valid cheque and feeds it to cashoutProcessor.cashCheque
func TestCashCheque(t *testing.T) {
backend := newTestBackend(t)
reset := setupContractTest()
defer reset()
defer backend.Close()

cashoutProcessor := newCashoutProcessor(backend, ownerKey)
payout := uint256.FromUint64(42)
store := state.NewInmemoryStore()
defer store.Close()

transactionQueue := chain.NewTxQueue(store, "queue", backend, ownerKey)
transactionQueue.Start()
defer transactionQueue.Stop()

cashoutHandler := newTestCashoutResultHandler(nil)
cashoutProcessor := newCashoutProcessor(transactionQueue, backend, ownerKey, cashoutHandler)
payout := uint256.FromUint64(CashChequeBeneficiaryTransactionCost*2 + 1)

chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout)
if err != nil {
Expand All @@ -132,12 +140,14 @@ func TestCashCheque(t *testing.T) {
t.Fatal(err)
}

err = cashoutProcessor.cashCheque(context.Background(), &CashoutRequest{
cashoutProcessor.submitCheque(context.Background(), &CashoutRequest{
Cheque: *testCheque,
Destination: ownerAddress,
})
if err != nil {
t.Fatal(err)

select {
case <-cashoutHandler.cashChequeDone:
case <-time.After(5 * time.Second):
}

paidOut, err := chequebook.PaidOut(nil, ownerAddress)
Expand All @@ -154,12 +164,18 @@ func TestCashCheque(t *testing.T) {
// TestEstimatePayout creates a valid cheque and feeds it to cashoutProcessor.estimatePayout
func TestEstimatePayout(t *testing.T) {
backend := newTestBackend(t)
reset := setupContractTest()
defer reset()
defer backend.Close()

cashoutProcessor := newCashoutProcessor(backend, ownerKey)
payout := uint256.FromUint64(42)
store := state.NewInmemoryStore()
defer store.Close()

transactionQueue := chain.NewTxQueue(store, "queue", backend, ownerKey)
transactionQueue.Start()
defer transactionQueue.Stop()

cashoutProcessor := newCashoutProcessor(transactionQueue, backend, ownerKey, &testCashoutResultHandler{})

payout := uint256.FromUint64(42)
chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout)
if err != nil {
t.Fatal(err)
Expand Down
2 changes: 2 additions & 0 deletions swap/chain/txqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ func (txq *TxQueue) Start() {
return
}

log.Info("starting transaction queue", "queue", txq.prefix)

txq.running = true
txq.wg.Add(1)
go func() {
Expand Down
Loading

0 comments on commit b930ce1

Please sign in to comment.