Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
swap, swap/chain: use txqueue for cashout
Browse files Browse the repository at this point in the history
  • Loading branch information
ralph-pichler committed Feb 25, 2020
1 parent 2e02111 commit 5c9e792
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 181 deletions.
136 changes: 80 additions & 56 deletions swap/cashout.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
contract "github.com/ethersphere/swarm/contracts/swap"
"github.com/ethersphere/swarm/swap/chain"
Expand All @@ -30,10 +31,9 @@ import (
// CashChequeBeneficiaryTransactionCost is the expected gas cost of a CashChequeBeneficiary transaction
const CashChequeBeneficiaryTransactionCost = 50000

// CashoutProcessor holds all relevant fields needed for processing cashouts
type CashoutProcessor struct {
backend chain.Backend // ethereum backend to use
privateKey *ecdsa.PrivateKey // private key to use
var CashoutRequestTypeID = chain.TxRequestTypeID{
Handler: "cashout",
RequestType: "CashoutRequest",
}

// CashoutRequest represents a request for a cashout operation
Expand All @@ -42,42 +42,94 @@ type CashoutRequest struct {
Destination common.Address // destination for the payout
}

// ActiveCashout stores the necessary information for a cashout in progess
type ActiveCashout struct {
Request CashoutRequest // the request that caused this cashout
TransactionHash common.Hash // the hash of the current transaction for this request
// CashoutProcessor holds all relevant fields needed for processing cashouts
type CashoutProcessor struct {
backend chain.Backend // ethereum backend to use
txScheduler chain.TxScheduler // transaction queue to use
cashoutResultHandler CashoutResultHandler
cashoutDone chan *CashoutRequest
}

type CashoutResultHandler interface {
HandleCashoutResult(request *CashoutRequest, result *contract.CashChequeResult, receipt *types.Receipt) error
}

// newCashoutProcessor creates a new instance of CashoutProcessor
func newCashoutProcessor(backend chain.Backend, privateKey *ecdsa.PrivateKey) *CashoutProcessor {
return &CashoutProcessor{
backend: backend,
privateKey: privateKey,
func newCashoutProcessor(txScheduler chain.TxScheduler, backend chain.Backend, privateKey *ecdsa.PrivateKey, cashoutResultHandler CashoutResultHandler) *CashoutProcessor {
c := &CashoutProcessor{
backend: backend,
txScheduler: txScheduler,
cashoutResultHandler: cashoutResultHandler,
}
}

// cashCheque tries to cash the cheque specified in the request
// after the transaction is sent it waits on its success
func (c *CashoutProcessor) cashCheque(ctx context.Context, request *CashoutRequest) error {
cheque := request.Cheque
opts := bind.NewKeyedTransactor(c.privateKey)
opts.Context = ctx
txScheduler.SetHandlers(CashoutRequestTypeID, &chain.TxRequestHandlers{
Send: func(id uint64, backend chain.Backend, opts *bind.TransactOpts) (common.Hash, error) {
var request CashoutRequest
if err := c.txScheduler.GetRequest(id, &request); err != nil {
return common.Hash{}, err
}

cheque := request.Cheque

otherSwap, err := contract.InstanceAt(cheque.Contract, backend)
if err != nil {
return common.Hash{}, err
}

tx, err := otherSwap.CashChequeBeneficiaryStart(opts, request.Destination, cheque.CumulativePayout, cheque.Signature)
if err != nil {
return common.Hash{}, err
}
return tx.Hash(), nil
},
NotifyReceipt: func(ctx context.Context, id uint64, notification *chain.TxReceiptNotification) error {
var request *CashoutRequest
err := c.txScheduler.GetRequest(id, &request)
if err != nil {
return err
}

otherSwap, err := contract.InstanceAt(request.Cheque.Contract, c.backend)
if err != nil {
return err
}

receipt := &notification.Receipt
if receipt.Status == 0 {
swapLog.Error("cheque cashing transaction reverted", "tx", receipt.TxHash)
return nil
}

result := otherSwap.CashChequeBeneficiaryResult(receipt)
return c.cashoutResultHandler.HandleCashoutResult(request, result, receipt)
},
})
return c
}

otherSwap, err := contract.InstanceAt(cheque.Contract, c.backend)
func (c *CashoutProcessor) submitCheque(ctx context.Context, request *CashoutRequest) {
expectedPayout, transactionCosts, err := c.estimatePayout(ctx, &request.Cheque)
if err != nil {
return err
swapLog.Error("could not estimate payout", "error", err)
return
}

tx, err := otherSwap.CashChequeBeneficiaryStart(opts, request.Destination, cheque.CumulativePayout, cheque.Signature)
costsMultiplier := uint256.FromUint64(2)
costThreshold, err := uint256.New().Mul(transactionCosts, costsMultiplier)
if err != nil {
return err
swapLog.Error("overflow in transaction fee", "error", err)
return
}

// this blocks until the cashout has been successfully processed
return c.waitForAndProcessActiveCashout(&ActiveCashout{
Request: *request,
TransactionHash: tx.Hash(),
})
// do a payout transaction if we get 2 times the gas costs
if expectedPayout.Cmp(costThreshold) == 1 {
swapLog.Info("queueing cashout", "cheque", &request.Cheque)
_, err := c.txScheduler.ScheduleRequest(CashoutRequestTypeID, request)
if err != nil {
metrics.GetOrRegisterCounter("swap.cheques.cashed.errors", nil).Inc(1)
swapLog.Error("cashing cheque:", "error", err)
}
}
}

// estimatePayout estimates the payout for a given cheque as well as the transaction cost
Expand Down Expand Up @@ -123,31 +175,3 @@ func (c *CashoutProcessor) estimatePayout(ctx context.Context, cheque *Cheque) (

return expectedPayout, transactionCosts, nil
}

// waitForAndProcessActiveCashout waits for activeCashout to complete
func (c *CashoutProcessor) waitForAndProcessActiveCashout(activeCashout *ActiveCashout) error {
ctx, cancel := context.WithTimeout(context.Background(), DefaultTransactionTimeout)
defer cancel()

receipt, err := chain.WaitMined(ctx, c.backend, activeCashout.TransactionHash)
if err != nil {
return err
}

otherSwap, err := contract.InstanceAt(activeCashout.Request.Cheque.Contract, c.backend)
if err != nil {
return err
}

result := otherSwap.CashChequeBeneficiaryResult(receipt)

metrics.GetOrRegisterCounter("swap.cheques.cashed.honey", nil).Inc(result.TotalPayout.Int64())

if result.Bounced {
metrics.GetOrRegisterCounter("swap.cheques.cashed.bounced", nil).Inc(1)
swapLog.Warn("cheque bounced", "tx", receipt.TxHash)
}

swapLog.Info("cheque cashed", "honey", activeCashout.Request.Cheque.Honey)
return nil
}
42 changes: 29 additions & 13 deletions swap/cashout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package swap
import (
"context"
"testing"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/log"
"github.com/ethersphere/swarm/state"
"github.com/ethersphere/swarm/swap/chain"
"github.com/ethersphere/swarm/uint256"
)
Expand All @@ -33,8 +35,7 @@ import (
// afterwards it attempts to cash-in a bouncing cheque
func TestContractIntegration(t *testing.T) {
backend := newTestBackend(t)
reset := setupContractTest()
defer reset()
defer backend.Close()

payout := uint256.FromUint64(42)
chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout)
Expand Down Expand Up @@ -116,11 +117,18 @@ func TestContractIntegration(t *testing.T) {
// TestCashCheque creates a valid cheque and feeds it to cashoutProcessor.cashCheque
func TestCashCheque(t *testing.T) {
backend := newTestBackend(t)
reset := setupContractTest()
defer reset()
defer backend.Close()

cashoutProcessor := newCashoutProcessor(backend, ownerKey)
payout := uint256.FromUint64(42)
store := state.NewInmemoryStore()
defer store.Close()

transactionQueue := chain.NewTxQueue(store, "queue", backend, ownerKey)
transactionQueue.Start()
defer transactionQueue.Stop()

cashoutHandler := newTestCashoutResultHandler(nil)
cashoutProcessor := newCashoutProcessor(transactionQueue, backend, ownerKey, cashoutHandler)
payout := uint256.FromUint64(CashChequeBeneficiaryTransactionCost*2 + 1)

chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout)
if err != nil {
Expand All @@ -132,12 +140,14 @@ func TestCashCheque(t *testing.T) {
t.Fatal(err)
}

err = cashoutProcessor.cashCheque(context.Background(), &CashoutRequest{
cashoutProcessor.submitCheque(context.Background(), &CashoutRequest{
Cheque: *testCheque,
Destination: ownerAddress,
})
if err != nil {
t.Fatal(err)

select {
case <-cashoutHandler.cashChequeDone:
case <-time.After(5 * time.Second):
}

paidOut, err := chequebook.PaidOut(nil, ownerAddress)
Expand All @@ -154,12 +164,18 @@ func TestCashCheque(t *testing.T) {
// TestEstimatePayout creates a valid cheque and feeds it to cashoutProcessor.estimatePayout
func TestEstimatePayout(t *testing.T) {
backend := newTestBackend(t)
reset := setupContractTest()
defer reset()
defer backend.Close()

cashoutProcessor := newCashoutProcessor(backend, ownerKey)
payout := uint256.FromUint64(42)
store := state.NewInmemoryStore()
defer store.Close()

transactionQueue := chain.NewTxQueue(store, "queue", backend, ownerKey)
transactionQueue.Start()
defer transactionQueue.Stop()

cashoutProcessor := newCashoutProcessor(transactionQueue, backend, ownerKey, &testCashoutResultHandler{})

payout := uint256.FromUint64(42)
chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout)
if err != nil {
t.Fatal(err)
Expand Down
77 changes: 44 additions & 33 deletions swap/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind/backends"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
contractFactory "github.com/ethersphere/go-sw3/contracts-v0-2-0/simpleswapfactory"
contract "github.com/ethersphere/swarm/contracts/swap"
cswap "github.com/ethersphere/swarm/contracts/swap"
"github.com/ethersphere/swarm/network"
"github.com/ethersphere/swarm/p2p/protocols"
Expand All @@ -34,8 +36,6 @@ type swapTestBackend struct {
*mock.TestBackend
factoryAddress common.Address // address of the SimpleSwapFactory in the simulated network
tokenAddress common.Address // address of the token in the simulated network
// the async cashing go routine needs synchronization for tests
cashDone chan struct{}
}

var defaultBackend = backends.NewSimulatedBackend(core.GenesisAlloc{
Expand Down Expand Up @@ -67,7 +67,6 @@ func newTestBackend(t *testing.T) *swapTestBackend {
TestBackend: backend,
factoryAddress: factoryAddress,
tokenAddress: tokenAddress,
cashDone: make(chan struct{}),
}
}

Expand Down Expand Up @@ -105,7 +104,9 @@ func newBaseTestSwapWithParams(t *testing.T, key *ecdsa.PrivateKey, params *Para
if err != nil {
t.Fatal(err)
}
swap := newSwapInstance(stateStore, owner, backend, 10, params, factory)

txqueue := chain.NewTxQueue(stateStore, "chain", backend, owner.privateKey)
swap := newSwapInstance(stateStore, owner, backend, 10, params, factory, txqueue)
return swap, dir
}

Expand All @@ -126,6 +127,7 @@ func newTestSwap(t *testing.T, key *ecdsa.PrivateKey, backend *swapTestBackend)
usedBackend = newTestBackend(t)
}
swap, dir := newBaseTestSwap(t, key, usedBackend)
swap.txScheduler.Start()
clean := func() {
swap.Close()
// only close if created by newTestSwap to avoid double close
Expand Down Expand Up @@ -206,32 +208,6 @@ func newRandomTestCheque() *Cheque {
return cheque
}

// During tests, because the cashing in of cheques is async, we should wait for the function to be returned
// Otherwise if we call `handleEmitChequeMsg` manually, it will return before the TX has been committed to the `SimulatedBackend`,
// causing subsequent TX to possibly fail due to nonce mismatch
func testCashCheque(s *Swap, cheque *Cheque) {
cashCheque(s, cheque)
// send to the channel, signals to clients that this function actually finished
if stb, ok := s.backend.(*swapTestBackend); ok {
if stb.cashDone != nil {
stb.cashDone <- struct{}{}
}
}
}

// setupContractTest is a helper function for setting up the
// blockchain wait function for testing
func setupContractTest() func() {
// we also need to store the previous cashCheque function in case this is called multiple times
currentCashCheque := defaultCashCheque
defaultCashCheque = testCashCheque
// overwrite only for the duration of the test, so...
return func() {
// ...we need to set it back to original when done
defaultCashCheque = currentCashCheque
}
}

// deploy for testing (needs simulated backend commit)
func testDeployWithPrivateKey(ctx context.Context, backend chain.Backend, privateKey *ecdsa.PrivateKey, ownerAddress common.Address, depositAmount *uint256.Uint256) (cswap.Contract, error) {
opts := bind.NewKeyedTransactor(privateKey)
Expand All @@ -248,9 +224,6 @@ func testDeployWithPrivateKey(ctx context.Context, backend chain.Backend, privat
return nil, err
}

// setup the wait for mined transaction function for testing
cleanup := setupContractTest()
defer cleanup()
contract, err := factory.DeploySimpleSwap(opts, ownerAddress, big.NewInt(int64(defaultHarddepositTimeoutDuration)))
if err != nil {
return nil, err
Expand Down Expand Up @@ -315,3 +288,41 @@ func (d *dummyMsgRW) ReadMsg() (p2p.Msg, error) {
func (d *dummyMsgRW) WriteMsg(msg p2p.Msg) error {
return nil
}

type cashChequeDoneData struct {
request *CashoutRequest
result *contract.CashChequeResult
receipt *types.Receipt
}

type testCashoutResultHandler struct {
swap *Swap
cashChequeDone chan cashChequeDoneData
}

func newTestCashoutResultHandler(swap *Swap) *testCashoutResultHandler {
return &testCashoutResultHandler{
swap: swap,
cashChequeDone: make(chan cashChequeDoneData),
}
}

func (h *testCashoutResultHandler) HandleCashoutResult(request *CashoutRequest, result *contract.CashChequeResult, receipt *types.Receipt) error {
if h.swap != nil {
if err := h.swap.HandleCashoutResult(request, result, receipt); err != nil {
return err
}
}
h.cashChequeDone <- cashChequeDoneData{
request: request,
result: result,
receipt: receipt,
}
return nil
}

func overrideCashoutResultHandler(swap *Swap) *testCashoutResultHandler {
cashoutResultHandler := newTestCashoutResultHandler(swap)
swap.cashoutProcessor.cashoutResultHandler = cashoutResultHandler
return cashoutResultHandler
}
Loading

0 comments on commit 5c9e792

Please sign in to comment.