Skip to content

Commit

Permalink
feat: stop events listener when postage contract is paused (ethersphe…
Browse files Browse the repository at this point in the history
  • Loading branch information
martinconic authored Aug 4, 2024
1 parent 7f9207c commit 6744499
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 0 deletions.
10 changes: 10 additions & 0 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,15 @@ func NewBee(

if batchSvc != nil && chainEnabled {
logger.Info("waiting to sync postage contract data, this may take a while... more info available in Debug loglevel")

paused, err := postageStampContractService.Paused(ctx)
if paused {
return nil, fmt.Errorf("postage contract is paused: %w", err)
}
if err != nil {
logger.Error(err, "Error checking postage contract is paused")
}

if o.FullNodeMode {
err = batchSvc.Start(ctx, postageSyncStart, initBatchState)
syncStatus.Store(true)
Expand All @@ -767,6 +776,7 @@ func NewBee(
}
}()
}

}

minThreshold := big.NewInt(2 * refreshRate)
Expand Down
5 changes: 5 additions & 0 deletions pkg/postage/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type listener struct {
batchTopUpTopic common.Hash
batchDepthIncreaseTopic common.Hash
priceUpdateTopic common.Hash
pausedTopic common.Hash
}

func New(
Expand Down Expand Up @@ -94,6 +95,7 @@ func New(
batchTopUpTopic: postageStampContractABI.Events["BatchTopUp"].ID,
batchDepthIncreaseTopic: postageStampContractABI.Events["BatchDepthIncrease"].ID,
priceUpdateTopic: postageStampContractABI.Events["PriceUpdate"].ID,
pausedTopic: postageStampContractABI.Events["Paused"].ID,
}
}

Expand Down Expand Up @@ -172,6 +174,9 @@ func (l *listener) processEvent(e types.Log, updater postage.EventUpdater) error
c.Price,
e.TxHash,
)
case l.pausedTopic:
l.logger.Warning("Postage contract is paused.")
return context.Canceled
default:
l.metrics.EventErrors.Inc()
return errors.New("unknown event")
Expand Down
31 changes: 31 additions & 0 deletions pkg/postage/postagecontract/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Interface interface {
CreateBatch(ctx context.Context, initialBalance *big.Int, depth uint8, immutable bool, label string) (common.Hash, []byte, error)
TopUpBatch(ctx context.Context, batchID []byte, topupBalance *big.Int) (common.Hash, error)
DiluteBatch(ctx context.Context, batchID []byte, newDepth uint8) (common.Hash, error)
Paused(ctx context.Context) (bool, error)
PostageBatchExpirer
}

Expand Down Expand Up @@ -492,6 +493,32 @@ func (c *postageContract) DiluteBatch(ctx context.Context, batchID []byte, newDe
return
}

func (c *postageContract) Paused(ctx context.Context) (bool, error) {
callData, err := c.postageStampContractABI.Pack("paused")
if err != nil {
return false, err
}

result, err := c.transactionService.Call(ctx, &transaction.TxRequest{
To: &c.postageStampContractAddress,
Data: callData,
})
if err != nil {
return false, err
}

results, err := c.postageStampContractABI.Unpack("paused", result)
if err != nil {
return false, err
}

if len(results) == 0 {
return false, errors.New("unexpected empty results")
}

return results[0].(bool), nil
}

type batchCreatedEvent struct {
BatchId [32]byte
TotalAmount *big.Int
Expand All @@ -514,6 +541,10 @@ func (m *noOpPostageContract) DiluteBatch(context.Context, []byte, uint8) (commo
return common.Hash{}, ErrChainDisabled
}

func (m *noOpPostageContract) Paused(context.Context) (bool, error) {
return false, nil
}

func (m *noOpPostageContract) ExpireBatches(context.Context) error {
return ErrChainDisabled
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/postage/postagecontract/mock/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type contractMock struct {
topupBatch func(ctx context.Context, id []byte, amount *big.Int) (common.Hash, error)
diluteBatch func(ctx context.Context, id []byte, newDepth uint8) (common.Hash, error)
expireBatches func(ctx context.Context) error
paused func(ctx context.Context) (bool, error)
}

func (c *contractMock) CreateBatch(ctx context.Context, initialBalance *big.Int, depth uint8, immutable bool, label string) (common.Hash, []byte, error) {
Expand All @@ -35,6 +36,10 @@ func (c *contractMock) ExpireBatches(ctx context.Context) error {
return c.expireBatches(ctx)
}

func (s *contractMock) Paused(ctx context.Context) (bool, error) {
return s.paused(ctx)
}

// Option is a an option passed to New
type Option func(*contractMock)

Expand Down Expand Up @@ -72,3 +77,9 @@ func WithExpiresBatchesFunc(f func(ctx context.Context) error) Option {
m.expireBatches = f
}
}

func WithPaused(f func(ctx context.Context) (bool, error)) Option {
return func(mock *contractMock) {
mock.paused = f
}
}

0 comments on commit 6744499

Please sign in to comment.