Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
swap, swap/txqueue: add transaction queue
Browse files Browse the repository at this point in the history
  • Loading branch information
ralph-pichler committed Feb 6, 2020
1 parent 4dd76ef commit 2725351
Show file tree
Hide file tree
Showing 9 changed files with 1,114 additions and 93 deletions.
135 changes: 85 additions & 50 deletions swap/cashout.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package swap
import (
"context"
"crypto/ecdsa"
"errors"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -32,8 +33,9 @@ const CashChequeBeneficiaryTransactionCost = 50000

// CashoutProcessor holds all relevant fields needed for processing cashouts
type CashoutProcessor struct {
backend txqueue.Backend // ethereum backend to use
privateKey *ecdsa.PrivateKey // private key to use
backend txqueue.Backend // ethereum backend to use
transactionQueue *txqueue.TxQueue // transaction queue to use
chequeCashedChan chan *CashoutRequest
}

// CashoutRequest represents a request for a cashout operation
Expand All @@ -42,42 +44,26 @@ type CashoutRequest struct {
Destination common.Address // destination for the payout
}

// ActiveCashout stores the necessary information for a cashout in progess
type ActiveCashout struct {
Request CashoutRequest // the request that caused this cashout
TransactionHash common.Hash // the hash of the current transaction for this request
}

// newCashoutProcessor creates a new instance of CashoutProcessor
func newCashoutProcessor(backend txqueue.Backend, privateKey *ecdsa.PrivateKey) *CashoutProcessor {
return &CashoutProcessor{
backend: backend,
privateKey: privateKey,
func newCashoutProcessor(transactionQueue *txqueue.TxQueue, backend txqueue.Backend, privateKey *ecdsa.PrivateKey) *CashoutProcessor {
c := &CashoutProcessor{
backend: backend,
transactionQueue: transactionQueue,
}

transactionQueue.SetComponent("cashout", c)
return c
}

func (c *CashoutProcessor) SetChequeCashedChan(chequeCashedChan chan *CashoutRequest) {
c.chequeCashedChan = chequeCashedChan
}

// cashCheque tries to cash the cheque specified in the request
// after the transaction is sent it waits on its success
func (c *CashoutProcessor) cashCheque(ctx context.Context, request *CashoutRequest) error {
cheque := request.Cheque
opts := bind.NewKeyedTransactor(c.privateKey)
opts.Context = ctx

otherSwap, err := contract.InstanceAt(cheque.Contract, c.backend)
if err != nil {
return err
}

tx, err := otherSwap.CashChequeBeneficiaryStart(opts, request.Destination, cheque.CumulativePayout, cheque.Signature)
if err != nil {
return err
}

// this blocks until the cashout has been successfully processed
return c.waitForAndProcessActiveCashout(&ActiveCashout{
Request: *request,
TransactionHash: tx.Hash(),
})
_, err := c.transactionQueue.QueueRequest("cashout", "CashoutRequest", request)
return err
}

// estimatePayout estimates the payout for a given cheque as well as the transaction cost
Expand Down Expand Up @@ -124,30 +110,79 @@ func (c *CashoutProcessor) estimatePayout(ctx context.Context, cheque *Cheque) (
return expectedPayout, transactionCosts, nil
}

// waitForAndProcessActiveCashout waits for activeCashout to complete
func (c *CashoutProcessor) waitForAndProcessActiveCashout(activeCashout *ActiveCashout) error {
ctx, cancel := context.WithTimeout(context.Background(), DefaultTransactionTimeout)
defer cancel()

receipt, err := txqueue.WaitFunc(ctx, c.backend, activeCashout.TransactionHash)
if err != nil {
return err
func (*CashoutProcessor) GetTypeInstance(requestType string) txqueue.TransactionRequest {
switch requestType {
case "CashoutRequest":
return &CashoutRequest{}
default:
return nil
}
}

otherSwap, err := contract.InstanceAt(activeCashout.Request.Cheque.Contract, c.backend)
if err != nil {
return err
}
func (*CashoutProcessor) SendTransactionRequest(id uint64, request txqueue.TransactionRequest, backend txqueue.Backend, opts *bind.TransactOpts) (common.Hash, error) {
switch request := request.(type) {
case *CashoutRequest:
cheque := request.Cheque

result := otherSwap.CashChequeBeneficiaryResult(receipt)
otherSwap, err := contract.InstanceAt(cheque.Contract, backend)
if err != nil {
return common.Hash{}, err
}

metrics.GetOrRegisterCounter("swap.cheques.cashed.honey", nil).Inc(result.TotalPayout.Int64())
tx, err := otherSwap.CashChequeBeneficiaryStart(opts, request.Destination, cheque.CumulativePayout, cheque.Signature)
if err != nil {
return common.Hash{}, err
}

if result.Bounced {
metrics.GetOrRegisterCounter("swap.cheques.cashed.bounced", nil).Inc(1)
swapLog.Warn("cheque bounced", "tx", receipt.TxHash)
return tx.Hash(), nil
default:
return common.Hash{}, errors.New("unknown type")
}
}

swapLog.Info("cheque cashed", "honey", activeCashout.Request.Cheque.Honey)
return nil
func (c *CashoutProcessor) HandleNotification(id uint64, notification interface{}) error {
switch notification.(type) {
case *txqueue.TransactionReceiptNotification:
notification := notification.(*txqueue.TransactionReceiptNotification)
requestInfo, err := c.transactionQueue.GetRequestInfo(id)
if err != nil {
return err
}

cashoutRequest, ok := requestInfo.Request.(*CashoutRequest)
if !ok {
return nil
}
otherSwap, err := contract.InstanceAt(cashoutRequest.Cheque.Contract, c.backend)
if err != nil {
return err
}

receipt := &notification.Receipt

if receipt.Status == 0 {
swapLog.Error("cheque cashing transaction reverted", "tx", receipt.TxHash)
return nil
}

result := otherSwap.CashChequeBeneficiaryResult(receipt)

metrics.GetOrRegisterCounter("swap.cheques.cashed.honey", nil).Inc(result.TotalPayout.Int64())

if result.Bounced {
metrics.GetOrRegisterCounter("swap.cheques.cashed.bounced", nil).Inc(1)
swapLog.Warn("cheque bounced", "tx", receipt.TxHash)
}

swapLog.Info("cheque cashed", "honey", cashoutRequest.Cheque.Honey)

select {
case c.chequeCashedChan <- cashoutRequest:
default:
}

return nil
default:
return nil
}
}
25 changes: 23 additions & 2 deletions swap/cashout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package swap
import (
"context"
"testing"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/log"
"github.com/ethersphere/swarm/state"
"github.com/ethersphere/swarm/swap/txqueue"
"github.com/ethersphere/swarm/uint256"
)
Expand Down Expand Up @@ -120,7 +122,12 @@ func TestCashCheque(t *testing.T) {
reset := setupContractTest()
defer reset()

cashoutProcessor := newCashoutProcessor(backend, ownerKey)
store := state.NewInmemoryStore()
defer store.Close()
transactionQueue := txqueue.NewTxQueue(store, "queue", backend, ownerKey)
transactionQueue.Start()
defer transactionQueue.Stop()
cashoutProcessor := newCashoutProcessor(transactionQueue, backend, ownerKey)
payout := uint256.FromUint64(42)

chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout)
Expand All @@ -133,6 +140,10 @@ func TestCashCheque(t *testing.T) {
t.Fatal(err)
}

backend.cashChequeDone = make(chan *CashoutRequest)
defer close(backend.cashChequeDone)
cashoutProcessor.SetChequeCashedChan(backend.cashChequeDone)

err = cashoutProcessor.cashCheque(context.Background(), &CashoutRequest{
Cheque: *testCheque,
Destination: ownerAddress,
Expand All @@ -141,6 +152,11 @@ func TestCashCheque(t *testing.T) {
t.Fatal(err)
}

select {
case <-backend.cashChequeDone:
case <-time.After(5 * time.Second):
}

paidOut, err := chequebook.PaidOut(nil, ownerAddress)
if err != nil {
t.Fatal(err)
Expand All @@ -158,7 +174,12 @@ func TestEstimatePayout(t *testing.T) {
reset := setupContractTest()
defer reset()

cashoutProcessor := newCashoutProcessor(backend, ownerKey)
store := state.NewInmemoryStore()
defer store.Close()
transactionQueue := txqueue.NewTxQueue(store, "queue", backend, ownerKey)
transactionQueue.Start()
defer transactionQueue.Stop()
cashoutProcessor := newCashoutProcessor(transactionQueue, backend, ownerKey)
payout := uint256.FromUint64(42)

chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout)
Expand Down
25 changes: 3 additions & 22 deletions swap/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type swapTestBackend struct {
factoryAddress common.Address // address of the SimpleSwapFactory in the simulated network
tokenAddress common.Address // address of the token in the simulated network
// the async cashing go routine needs synchronization for tests
cashDone chan struct{}
cashChequeDone chan *CashoutRequest
}

func (b *swapTestBackend) Close() error {
Expand Down Expand Up @@ -133,6 +133,7 @@ func newTestSwap(t *testing.T, key *ecdsa.PrivateKey, backend *swapTestBackend)
usedBackend = newTestBackend(t)
}
swap, dir := newBaseTestSwap(t, key, usedBackend)
swap.transactionQueue.Start()
clean := func() {
swap.Close()
// only close if created by newTestSwap to avoid double close
Expand Down Expand Up @@ -213,19 +214,6 @@ func newRandomTestCheque() *Cheque {
return cheque
}

// During tests, because the cashing in of cheques is async, we should wait for the function to be returned
// Otherwise if we call `handleEmitChequeMsg` manually, it will return before the TX has been committed to the `SimulatedBackend`,
// causing subsequent TX to possibly fail due to nonce mismatch
func testCashCheque(s *Swap, cheque *Cheque) {
cashCheque(s, cheque)
// send to the channel, signals to clients that this function actually finished
if stb, ok := s.backend.(*swapTestBackend); ok {
if stb.cashDone != nil {
stb.cashDone <- struct{}{}
}
}
}

// when testing, we don't need to wait for a transaction to be mined
func testWaitForTx(ctx context.Context, backend txqueue.Backend, txHash common.Hash) (*types.Receipt, error) {

Expand All @@ -240,9 +228,6 @@ func testWaitForTx(ctx context.Context, backend txqueue.Backend, txHash common.H
if err != nil {
return nil, err
}
if receipt.Status != types.ReceiptStatusSuccessful {
return nil, txqueue.ErrTransactionReverted
}
return receipt, nil
}

Expand All @@ -251,16 +236,12 @@ func testWaitForTx(ctx context.Context, backend txqueue.Backend, txHash common.H
func setupContractTest() func() {
// we overwrite the waitForTx function with one which the simulated backend
// immediately commits
currentWaitFunc := txqueue.WaitFunc
// we also need to store the previous cashCheque function in case this is called multiple times
currentCashCheque := defaultCashCheque
defaultCashCheque = testCashCheque
currentWaitFunc := txqueue.WaitFunc
// overwrite only for the duration of the test, so...
txqueue.WaitFunc = testWaitForTx
return func() {
// ...we need to set it back to original when done
txqueue.WaitFunc = currentWaitFunc
defaultCashCheque = currentCashCheque
}
}

Expand Down
5 changes: 3 additions & 2 deletions swap/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ func TestEmitCheque(t *testing.T) {
cleanup := setupContractTest()
defer cleanup()
// now we need to create the channel...
testBackend.cashDone = make(chan struct{})
testBackend.cashChequeDone = make(chan *CashoutRequest)
creditorSwap.cashoutProcessor.SetChequeCashedChan(testBackend.cashChequeDone)

log.Debug("deploy to simulated backend")

Expand Down Expand Up @@ -319,7 +320,7 @@ func TestEmitCheque(t *testing.T) {

// we wait until the cashCheque is actually terminated (ensures proper nounce count)
select {
case <-creditorSwap.backend.(*swapTestBackend).cashDone:
case <-testBackend.cashChequeDone:
log.Debug("cash transaction completed and committed")
case <-time.After(4 * time.Second):
t.Fatalf("Timeout waiting for cash transaction to complete")
Expand Down
20 changes: 14 additions & 6 deletions swap/simulations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ For integration tests, run test cluster deployments with all integration moduele
(blockchains, oracles, etc.)
*/
// swapSimulationParams allows to avoid global variables for the test

type swapSimulationParams struct {
swaps map[int]*Swap
dirs map[int]string
Expand Down Expand Up @@ -164,9 +165,10 @@ func newSimServiceMap(params *swapSimulationParams) map[string]simulation.Servic
if err != nil {
return nil, nil, err
}
ts.swap.cashoutProcessor.transactionQueue.Start()

cleanup = func() {
ts.swap.store.Close()
ts.swap.Close()
os.RemoveAll(dir)
}

Expand Down Expand Up @@ -268,8 +270,8 @@ func TestPingPongChequeSimulation(t *testing.T) {
cleanup := setupContractTest()
defer cleanup()

params.backend.cashDone = make(chan struct{}, 1)
defer close(params.backend.cashDone)
params.backend.cashChequeDone = make(chan *CashoutRequest, 1)
defer close(params.backend.cashChequeDone)

// initialize the simulation
sim := simulation.NewBzzInProc(newSimServiceMap(params), false)
Expand Down Expand Up @@ -299,6 +301,9 @@ func TestPingPongChequeSimulation(t *testing.T) {
ts1 := sim.Service("swap", p1).(*testService)
ts2 := sim.Service("swap", p2).(*testService)

ts1.swap.cashoutProcessor.SetChequeCashedChan(params.backend.cashChequeDone)
ts2.swap.cashoutProcessor.SetChequeCashedChan(params.backend.cashChequeDone)

var ts1Len, ts2Len, ts1sLen, ts2sLen int
timeout := time.After(10 * time.Second)

Expand Down Expand Up @@ -398,8 +403,8 @@ func TestMultiChequeSimulation(t *testing.T) {
cleanup := setupContractTest()
defer cleanup()

params.backend.cashDone = make(chan struct{}, 1)
defer close(params.backend.cashDone)
params.backend.cashChequeDone = make(chan *CashoutRequest, 1)
defer close(params.backend.cashChequeDone)
// initialize the simulation
sim := simulation.NewBzzInProc(newSimServiceMap(params), false)
defer sim.Close()
Expand Down Expand Up @@ -431,6 +436,9 @@ func TestMultiChequeSimulation(t *testing.T) {
// get the testService for the creditor
creditorSvc := sim.Service("swap", creditor).(*testService)

debitorSvc.swap.cashoutProcessor.SetChequeCashedChan(params.backend.cashChequeDone)
creditorSvc.swap.cashoutProcessor.SetChequeCashedChan(params.backend.cashChequeDone)

var debLen, credLen, debSwapLen, credSwapLen int
timeout := time.After(10 * time.Second)
for {
Expand Down Expand Up @@ -726,7 +734,7 @@ func waitForChequeProcessed(t *testing.T, backend *swapTestBackend, counter metr
lock.Unlock()
wg.Done()
return
case <-backend.cashDone:
case <-backend.cashChequeDone:
wg.Done()
return
}
Expand Down
Loading

0 comments on commit 2725351

Please sign in to comment.