Skip to content

Commit

Permalink
Revert "Save snapshots only to the file"
Browse files Browse the repository at this point in the history
This reverts commit 77e92ea.
  • Loading branch information
lumos42 committed Feb 26, 2024
1 parent a068e37 commit c429e49
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 31 deletions.
80 changes: 66 additions & 14 deletions plugin/evm/limit_order.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
hu "github.com/ava-labs/subnet-evm/plugin/evm/orderbook/hubbleutils"
"github.com/ava-labs/subnet-evm/utils"

"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/snow"
"github.com/ethereum/go-ethereum/log"
)

const (
snapshotInterval uint64 = 10 // save snapshot every 1000 blocks
memoryDBSnapshotKey string = "memoryDBSnapshot"
snapshotInterval uint64 = 10 // save snapshot every 1000 blocks
)

type LimitOrderProcesser interface {
Expand All @@ -50,6 +52,7 @@ type limitOrderProcesser struct {
contractEventProcessor *orderbook.ContractEventsProcessor
matchingPipeline *orderbook.MatchingPipeline
filterAPI *filters.FilterAPI
hubbleDB database.Database
configService orderbook.IConfigService
blockBuilder *blockBuilder
isValidator bool
Expand All @@ -60,7 +63,7 @@ type limitOrderProcesser struct {
tradingAPI *orderbook.TradingAPI
}

func NewLimitOrderProcesser(ctx *snow.Context, txPool *txpool.TxPool, shutdownChan <-chan struct{}, shutdownWg *sync.WaitGroup, backend *eth.EthAPIBackend, blockChain *core.BlockChain, validatorPrivateKey string, config Config) LimitOrderProcesser {
func NewLimitOrderProcesser(ctx *snow.Context, txPool *txpool.TxPool, shutdownChan <-chan struct{}, shutdownWg *sync.WaitGroup, backend *eth.EthAPIBackend, blockChain *core.BlockChain, hubbleDB database.Database, validatorPrivateKey string, config Config) LimitOrderProcesser {
log.Info("**** NewLimitOrderProcesser")
configService := orderbook.NewConfigService(blockChain)
memoryDb := orderbook.NewInMemoryDatabase(configService)
Expand Down Expand Up @@ -98,6 +101,7 @@ func NewLimitOrderProcesser(ctx *snow.Context, txPool *txpool.TxPool, shutdownCh
shutdownWg: shutdownWg,
backend: backend,
memoryDb: memoryDb,
hubbleDB: hubbleDB,
blockChain: blockChain,
limitOrderTxProcessor: lotp,
contractEventProcessor: contractEventProcessor,
Expand All @@ -121,7 +125,7 @@ func (lop *limitOrderProcesser) ListenAndProcessTransactions(blockBuilder *block

if lop.loadFromSnapshotEnabled {
// first load the last snapshot containing finalised data till block x and query the logs of [x+1, latest]
acceptedBlockNumber, err := lop.loadMemoryDBSnapshotFromFile()
acceptedBlockNumber, err := lop.loadMemoryDBSnapshot()
if err != nil {
log.Error("ListenAndProcessTransactions - error in loading snapshot", "err", err)
} else {
Expand Down Expand Up @@ -197,7 +201,7 @@ func (lop *limitOrderProcesser) GetTradingAPI() *orderbook.TradingAPI {
}

func (lop *limitOrderProcesser) GetTestingAPI() *orderbook.TestingAPI {
return orderbook.NewTestingAPI(lop.memoryDb, lop.backend, lop.configService, lop.snapshotFilePath)
return orderbook.NewTestingAPI(lop.memoryDb, lop.backend, lop.configService, lop.hubbleDB)
}

func (lop *limitOrderProcesser) listenAndStoreLimitOrderTransactions() {
Expand Down Expand Up @@ -264,9 +268,8 @@ func (lop *limitOrderProcesser) listenAndStoreLimitOrderTransactions() {
log.Info("Saving memory DB snapshot", "snapshotBlockNumber", snapshotBlockNumber, "current blockNumber", blockNumber, "blockNumberFloor", blockNumberFloor)
snapshotBlock := lop.blockChain.GetBlockByNumber(snapshotBlockNumber)
lop.memoryDb.Accept(snapshotBlockNumber, snapshotBlock.Timestamp())
err := lop.saveMemoryDBSnapshotToFile(big.NewInt(int64(snapshotBlockNumber)))
err := lop.saveMemoryDBSnapshot(big.NewInt(int64(snapshotBlockNumber)))
if err != nil {
orderbook.SnapshotWriteFailuresCounter.Inc(1)
log.Error("Error in saving memory DB snapshot", "err", err, "snapshotBlockNumber", snapshotBlockNumber, "current blockNumber", blockNumber, "blockNumberFloor", blockNumberFloor)
}
}
Expand Down Expand Up @@ -347,6 +350,51 @@ func (lop *limitOrderProcesser) runMatchingTimer() {
}, orderbook.RunMatchingPipelinePanicMessage, orderbook.RunMatchingPipelinePanicsCounter)
}

func (lop *limitOrderProcesser) loadMemoryDBSnapshot() (acceptedBlockNumber uint64, err error) {
// logging is done in the respective functions
acceptedBlockNumber, err = lop.loadMemoryDBSnapshotFromHubbleDB()
if err != nil || acceptedBlockNumber == 0 {
acceptedBlockNumber, err = lop.loadMemoryDBSnapshotFromFile()
}
return acceptedBlockNumber, err
}

func (lop *limitOrderProcesser) loadMemoryDBSnapshotFromHubbleDB() (acceptedBlockNumber uint64, err error) {
snapshotFound, err := lop.hubbleDB.Has([]byte(memoryDBSnapshotKey))
if err != nil {
return acceptedBlockNumber, fmt.Errorf("Error in checking snapshot in hubbleDB: err=%v", err)
}

if !snapshotFound {
return acceptedBlockNumber, nil
}

memorySnapshotBytes, err := lop.hubbleDB.Get([]byte(memoryDBSnapshotKey))
if err != nil {
return acceptedBlockNumber, fmt.Errorf("Error in fetching snapshot from hubbleDB; err=%v", err)
}

buf := bytes.NewBuffer(memorySnapshotBytes)
var snapshot orderbook.Snapshot
err = gob.NewDecoder(buf).Decode(&snapshot)
if err != nil {
return acceptedBlockNumber, fmt.Errorf("Error in snapshot parsing from hubbleDB; err=%v", err)
}

if snapshot.AcceptedBlockNumber != nil && snapshot.AcceptedBlockNumber.Uint64() > 0 {
err = lop.memoryDb.LoadFromSnapshot(snapshot)
if err != nil {
return acceptedBlockNumber, fmt.Errorf("Error in loading snapshot from hubbleDB: err=%v", err)
} else {
log.Info("ListenAndProcessTransactions - memory DB snapshot loaded from hubbleDB", "acceptedBlockNumber", acceptedBlockNumber)
}

return snapshot.AcceptedBlockNumber.Uint64(), nil
} else {
return acceptedBlockNumber, nil
}
}

func (lop *limitOrderProcesser) loadMemoryDBSnapshotFromFile() (acceptedBlockNumber uint64, err error) {
if lop.snapshotFilePath == "" {
return acceptedBlockNumber, fmt.Errorf("snapshot file path not set")
Expand Down Expand Up @@ -379,16 +427,13 @@ func (lop *limitOrderProcesser) loadMemoryDBSnapshotFromFile() (acceptedBlockNum
}

// assumes that memory DB lock is held
func (lop *limitOrderProcesser) saveMemoryDBSnapshotToFile(acceptedBlockNumber *big.Int) error {
if lop.snapshotFilePath == "" {
return fmt.Errorf("snapshot file path not set")
}
func (lop *limitOrderProcesser) saveMemoryDBSnapshot(acceptedBlockNumber *big.Int) error {
start := time.Now()
currentHeadBlock := lop.blockChain.CurrentBlock()

memoryDBCopy, err := lop.memoryDb.GetOrderBookDataCopy()
if err != nil {
return fmt.Errorf("error in getting memory DB copy: err=%v", err)
return fmt.Errorf("Error in getting memory DB copy: err=%v", err)
}
if currentHeadBlock.Number.Cmp(acceptedBlockNumber) == 1 {
// if current head is ahead of the accepted block, then certain events(OrderBook)
Expand Down Expand Up @@ -429,10 +474,17 @@ func (lop *limitOrderProcesser) saveMemoryDBSnapshotToFile(acceptedBlockNumber *

snapshotBytes := buf.Bytes()

// write to snapshot file
err = os.WriteFile(lop.snapshotFilePath, snapshotBytes, 0644)
err = lop.hubbleDB.Put([]byte(memoryDBSnapshotKey), snapshotBytes)
if err != nil {
return fmt.Errorf("Error in writing to snapshot file: err=%v", err)
return fmt.Errorf("Error in saving to DB: err=%v", err)
}

// write to snapshot file
if lop.snapshotFilePath != "" {
err = os.WriteFile(lop.snapshotFilePath, snapshotBytes, 0644)
if err != nil {
return fmt.Errorf("Error in writing to snapshot file: err=%v", err)
}
}

lop.snapshotSavedBlockNumber = acceptedBlockNumber.Uint64()
Expand Down
3 changes: 0 additions & 3 deletions plugin/evm/orderbook/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,4 @@ var (

// makerbook write failures
makerBookWriteFailuresCounter = metrics.NewRegisteredCounter("makerbook_write_failures", nil)

// snapshot write failures
SnapshotWriteFailuresCounter = metrics.NewRegisteredCounter("snapshot_write_failures", nil)
)
28 changes: 14 additions & 14 deletions plugin/evm/orderbook/testing_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,27 @@ import (
"encoding/gob"
"fmt"
"math/big"
"os"

"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/subnet-evm/eth"
"github.com/ava-labs/subnet-evm/precompile/contracts/bibliophile"
"github.com/ava-labs/subnet-evm/rpc"
"github.com/ethereum/go-ethereum/common"
)

type TestingAPI struct {
db LimitOrderDatabase
backend *eth.EthAPIBackend
configService IConfigService
snapshotFilePath string
db LimitOrderDatabase
backend *eth.EthAPIBackend
configService IConfigService
hubbleDB database.Database
}

func NewTestingAPI(database LimitOrderDatabase, backend *eth.EthAPIBackend, configService IConfigService, snapshotFilePath string) *TestingAPI {
func NewTestingAPI(database LimitOrderDatabase, backend *eth.EthAPIBackend, configService IConfigService, hubbleDB database.Database) *TestingAPI {
return &TestingAPI{
db: database,
backend: backend,
configService: configService,
snapshotFilePath: snapshotFilePath,
db: database,
backend: backend,
configService: configService,
hubbleDB: hubbleDB,
}
}

Expand Down Expand Up @@ -60,16 +60,16 @@ func (api *TestingAPI) GetOrderBookVars(ctx context.Context, traderAddress strin

func (api *TestingAPI) GetSnapshot(ctx context.Context) (Snapshot, error) {
var snapshot Snapshot

memorySnapshotBytes, err := os.ReadFile(api.snapshotFilePath)
memoryDBSnapshotKey := "memoryDBSnapshot"
memorySnapshotBytes, err := api.hubbleDB.Get([]byte(memoryDBSnapshotKey))
if err != nil {
return snapshot, fmt.Errorf("error in fetching snapshot from hubbleDB; err=%v", err)
return snapshot, fmt.Errorf("Error in fetching snapshot from hubbleDB; err=%v", err)
}

buf := bytes.NewBuffer(memorySnapshotBytes)
err = gob.NewDecoder(buf).Decode(&snapshot)
if err != nil {
return snapshot, fmt.Errorf("error in snapshot parsing; err=%v", err)
return snapshot, fmt.Errorf("Error in snapshot parsing; err=%v", err)
}

return snapshot, nil
Expand Down
1 change: 1 addition & 0 deletions plugin/evm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,7 @@ func (vm *VM) NewLimitOrderProcesser() LimitOrderProcesser {
&vm.shutdownWg,
vm.eth.APIBackend,
vm.blockChain,
vm.hubbleDB,
validatorPrivateKey,
vm.config,
)
Expand Down

0 comments on commit c429e49

Please sign in to comment.