feat: make stage copiable and allow customizing the db prefix (#56)

* make stage copiable and allow customizing the db prefix

* add UTC conversion to all time.Unix call sites
beer-1 authored Dec 6, 2024
1 parent 7be59a6 commit f26cecd
Showing 17 changed files with 91 additions and 110 deletions.
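
The handler diffs below all follow the same call-site pattern: instead of temporarily swapping the stage's parent db inside an ExecuteFnWithDB callback, the caller now passes a copy of the stage whose key-prefix function has been overridden. As a condensed before/after, excerpted from the host handler diff below:

	// before: swap the stage's parent db for the duration of a callback
	err = h.stage.ExecuteFnWithDB(h.challenger.DB(), func() error {
		return challengerdb.SavePendingChallenges(h.stage, pendingChallenges)
	})

	// after: hand the callee a stage copy that prefixes keys with the
	// challenger db's prefix; the copy shares the underlying batch
	err = challengerdb.SavePendingChallenges(h.stage.WithPrefixedKey(h.challenger.DB().PrefixedKey), pendingChallenges)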
4 changes: 1 addition & 3 deletions challenger/child/handler.go
@@ -80,9 +80,7 @@ func (ch *Child) endBlockHandler(ctx types.Context, args nodetypes.EndBlockArgs)
 		return err
 	}
 
-	err = ch.stage.ExecuteFnWithDB(ch.challenger.DB(), func() error {
-		return challengerdb.SavePendingChallenges(ch.stage, pendingChallenges)
-	})
+	err = challengerdb.SavePendingChallenges(ch.stage.WithPrefixedKey(ch.challenger.DB().PrefixedKey), pendingChallenges)
 	if err != nil {
 		return errors.Wrap(err, "failed to save pending events on child db")
 	}
8 changes: 2 additions & 6 deletions challenger/host/handler.go
@@ -24,9 +24,7 @@ func (h *Host) endBlockHandler(_ types.Context, args nodetypes.EndBlockArgs) err
 	}
 
 	// save all pending events to child db
-	err = h.stage.ExecuteFnWithDB(h.child.DB(), func() error {
-		return eventhandler.SavePendingEvents(h.stage, h.eventQueue)
-	})
+	err = eventhandler.SavePendingEvents(h.stage.WithPrefixedKey(h.child.DB().PrefixedKey), h.eventQueue)
 	if err != nil {
 		return errors.Wrap(err, "failed to save pending events on child db")
 	}
@@ -59,9 +57,7 @@ func (h *Host) endBlockHandler(_ types.Context, args nodetypes.EndBlockArgs) err
 		return err
 	}
 
-	err = h.stage.ExecuteFnWithDB(h.challenger.DB(), func() error {
-		return challengerdb.SavePendingChallenges(h.stage, pendingChallenges)
-	})
+	err = challengerdb.SavePendingChallenges(h.stage.WithPrefixedKey(h.challenger.DB().PrefixedKey), pendingChallenges)
 	if err != nil {
 		return errors.Wrap(err, "failed to save pending events on child db")
 	}
2 changes: 1 addition & 1 deletion challenger/types/keys.go
@@ -71,5 +71,5 @@ func ParseChallenge(key []byte) (time.Time, ChallengeId, error) {
 	typeBz := key[cursor : cursor+1]
 	cursor += 1 + 1 // u8 + splitter
 	idBz := key[cursor:]
-	return time.Unix(0, types.MustUint64ToInt64(dbtypes.ToUint64Key(timeBz))), ChallengeId{Type: EventType(typeBz[0]), Id: dbtypes.ToUint64Key(idBz)}, nil
+	return time.Unix(0, types.MustUint64ToInt64(dbtypes.ToUint64Key(timeBz))).UTC(), ChallengeId{Type: EventType(typeBz[0]), Id: dbtypes.ToUint64Key(idBz)}, nil
 }
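
Background on the .UTC() additions in this commit: time.Unix returns a Time in the local time zone, so two values describing the same instant can still differ in their Location, which matters for struct-level comparisons such as those in the tests below. A small standalone illustration (not code from this repository):

package main

import (
	"fmt"
	"time"
)

func main() {
	local := time.Unix(0, 10000)     // instant in the local time zone
	utc := time.Unix(0, 10000).UTC() // same instant, normalized to UTC

	fmt.Println(local.Equal(utc)) // true: both denote the same instant
	fmt.Println(local == utc)     // false: the Location field differs,
	                              // which reflect.DeepEqual-style checks see
}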
5 changes: 1 addition & 4 deletions db/db.go
@@ -186,8 +186,5 @@ func (db LevelDB) GetPrefix() []byte {
 }
 
 func (db *LevelDB) NewStage() types.CommitDB {
-	return &Stage{
-		kvmap:  make(map[string][]byte),
-		parent: db,
-	}
+	return newStage(db)
 }
2 changes: 1 addition & 1 deletion db/db_test.go
@@ -364,7 +364,7 @@ func TestNewStage(t *testing.T) {
 	tstage := db.NewStage()
 	require.NotNil(t, tstage)
 
-	stage, ok := tstage.(*Stage)
+	stage, ok := tstage.(Stage)
 	require.True(t, ok)
 	require.Equal(t, stage.batch.Len(), 0)
 	require.Equal(t, len(stage.kvmap), 0)
55 changes: 27 additions & 28 deletions db/stage.go
@@ -1,73 +1,72 @@
 package db
 
 import (
-	dbtypes "github.com/initia-labs/opinit-bots/db/types"
 	"github.com/initia-labs/opinit-bots/types"
 	"github.com/syndtr/goleveldb/leveldb"
 	"golang.org/x/exp/maps"
 )
 
 type Stage struct {
-	batch  leveldb.Batch
+	batch  *leveldb.Batch
 	kvmap  map[string][]byte
 	parent *LevelDB
+
+	prefixedKey func(key []byte) []byte
+}
+
+func newStage(parent *LevelDB) Stage {
+	return Stage{
+		batch:  new(leveldb.Batch),
+		kvmap:  make(map[string][]byte),
+		parent: parent,
+
+		prefixedKey: parent.PrefixedKey,
+	}
+}
+
+func (s Stage) WithPrefixedKey(prefixedKey func(key []byte) []byte) types.CommitDB {
+	s.prefixedKey = prefixedKey
+	return s
 }
 
-var _ types.CommitDB = (*Stage)(nil)
+var _ types.CommitDB = Stage{}
 
-func (s *Stage) Set(key []byte, value []byte) error {
-	prefixedKey := s.parent.PrefixedKey(key)
+func (s Stage) Set(key []byte, value []byte) error {
+	prefixedKey := s.prefixedKey(key)
 	s.batch.Put(prefixedKey, value)
 	s.kvmap[string(prefixedKey)] = value
 	return nil
 }
 
 func (s Stage) Get(key []byte) ([]byte, error) {
-	prefixedKey := s.parent.PrefixedKey(key)
+	prefixedKey := s.prefixedKey(key)
 	value, ok := s.kvmap[string(prefixedKey)]
 	if ok {
 		return value, nil
 	}
 	return s.parent.Get(key)
 }
 
-func (s *Stage) Delete(key []byte) error {
-	prefixedKey := s.parent.PrefixedKey(key)
+func (s Stage) Delete(key []byte) error {
+	prefixedKey := s.prefixedKey(key)
 	s.batch.Delete(prefixedKey)
 	s.kvmap[string(prefixedKey)] = nil
 	return nil
 }
 
-func (s *Stage) Commit() error {
-	err := s.parent.db.Write(&s.batch, nil)
+func (s Stage) Commit() error {
+	err := s.parent.db.Write(s.batch, nil)
 	if err != nil {
 		return err
 	}
 	return nil
 }
 
-// ExecuteFnWithDB executes the given function with the given db.
-// It temporarily sets the given db as the parent db of the stage and restores the original parent db after the function execution.
-func (s *Stage) ExecuteFnWithDB(db types.DB, fn func() error) error {
-	existing := s.parent
-	defer func() {
-		s.parent = existing
-	}()
-
-	leveldb, ok := db.(*LevelDB)
-	if !ok {
-		return dbtypes.ErrInvalidParentDBType
-	}
-	s.parent = leveldb
-
-	return fn()
-}
-
 func (s Stage) Len() int {
 	return s.batch.Len()
 }
 
-func (s *Stage) Reset() {
+func (s Stage) Reset() {
 	s.batch.Reset()
 	maps.Clear(s.kvmap)
 }
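The stage is safe to copy because Stage now uses value receivers and its mutable state lives behind reference types: the *leveldb.Batch pointer and the kvmap map are shared by every copy, and WithPrefixedKey only swaps the prefix function on its copy. A minimal in-package sketch of that behavior, assuming the db package API shown above (NewMemDB, NewStage, WithPrefixedKey, PrefixedKey); the test name and scenario are illustrative, not part of the commit:

package db

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// TestStageCopySharesBatch shows that a copy returned by WithPrefixedKey
// swaps only the prefix function, while the batch pointer and the kvmap
// remain shared with the original stage.
func TestStageCopySharesBatch(t *testing.T) {
	db, err := NewMemDB()
	require.NoError(t, err)

	stage, ok := db.NewStage().(Stage)
	require.True(t, ok)

	// Write through a copy of the stage (here with the same prefix function).
	copied := stage.WithPrefixedKey(db.PrefixedKey)
	require.NoError(t, copied.Set([]byte("k"), []byte("v")))

	// The write is visible through the original stage: batch and kvmap are shared.
	require.Equal(t, 1, stage.Len())
	value, err := stage.Get([]byte("k"))
	require.NoError(t, err)
	require.Equal(t, []byte("v"), value)
}
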
23 changes: 10 additions & 13 deletions db/stage_test.go
@@ -7,14 +7,14 @@ import (
 	"golang.org/x/exp/maps"
 )
 
-func CreateTestStage(t *testing.T, db *LevelDB) (*LevelDB, *Stage, error) {
+func CreateTestStage(t *testing.T, db *LevelDB) (*LevelDB, Stage, error) {
 	var err error
 	if db == nil {
 		db, err = NewMemDB()
 		require.NoError(t, err)
 	}
 	tstage := db.NewStage()
-	stage, ok := tstage.(*Stage)
+	stage, ok := tstage.(Stage)
 	require.True(t, ok)
 	return db, stage, nil
 }
@@ -200,7 +200,7 @@ func TestStageCommit(t *testing.T) {
 	require.Error(t, err)
 }
 
-func TestExecuteFnWithDB(t *testing.T) {
+func TestWithPrefixedKey(t *testing.T) {
 	db, err := NewMemDB()
 	require.NoError(t, err)
 
@@ -214,14 +214,15 @@ func TestExecuteFnWithDB(t *testing.T) {
 	err = stage1.Set([]byte("key1"), []byte("value1"))
 	require.NoError(t, err)
 
-	err = stage1.ExecuteFnWithDB(db2, func() error {
-		err := stage1.Set([]byte("key1"), []byte("value2"))
-		require.NoError(t, err)
-		return err
-	})
+	err = stage1.WithPrefixedKey(db2.PrefixedKey).Set([]byte("key1"), []byte("value2"))
 	require.NoError(t, err)
 
+	// previous WithPrefixedKey should not affect the new one
+	err = stage1.Set([]byte("key2"), []byte("value2"))
+	require.NoError(t, err)
+
 	require.Equal(t, stage1.kvmap[string(db1.PrefixedKey([]byte("key1")))], []byte("value1"))
+	require.Equal(t, stage1.kvmap[string(db1.PrefixedKey([]byte("key2")))], []byte("value2"))
 	require.Equal(t, stage1.kvmap[string(db2.PrefixedKey([]byte("key1")))], []byte("value2"))
 }
 
@@ -239,11 +240,7 @@ func TestStageAll(t *testing.T) {
 	err = stage1.Set([]byte("key1"), []byte("value1"))
 	require.NoError(t, err)
 
-	err = stage1.ExecuteFnWithDB(db2, func() error {
-		err := stage1.Set([]byte("key1"), []byte("value2"))
-		require.NoError(t, err)
-		return err
-	})
+	err = stage1.WithPrefixedKey(db2.PrefixedKey).Set([]byte("key1"), []byte("value2"))
 	require.NoError(t, err)
 
 	allKVs := stage1.All()
32 changes: 16 additions & 16 deletions executor/batchsubmitter/batch_test.go
@@ -74,7 +74,7 @@ func TestPrepareBatch(t *testing.T) {
existingLocalBatchInfo: executortypes.LocalBatchInfo{
Start: 1,
End: 100,
LastSubmissionTime: time.Unix(0, 10000),
LastSubmissionTime: time.Unix(0, 10000).UTC(),
BatchSize: 100,
},
batchInfoQueue: []ophosttypes.BatchInfoWithOutput{
@@ -89,7 +89,7 @@ func TestPrepareBatch(t *testing.T) {
expectedLocalBatchInfo: executortypes.LocalBatchInfo{
Start: 101,
End: 0,
LastSubmissionTime: time.Unix(0, 10000),
LastSubmissionTime: time.Unix(0, 10000).UTC(),
BatchSize: 0,
},
expectedChanges: []types.KV{},
@@ -546,7 +546,7 @@ func TestCheckBatch(t *testing.T) {
name: "block time >= last submission time + 2/3 interval, block height == latest height",
localBatchInfo: &executortypes.LocalBatchInfo{
Start: 1,
LastSubmissionTime: time.Unix(0, 0),
LastSubmissionTime: time.Unix(0, 0).UTC(),
},
batchConfig: executortypes.BatchConfig{
MaxChunks: 100,
@@ -555,7 +555,7 @@
},
blockHeight: 10,
latestHeight: 10,
blockTime: time.Unix(0, 10001),
blockTime: time.Unix(0, 10001).UTC(),
expected: true,
},
{
@@ -571,14 +571,14 @@ func TestCheckBatch(t *testing.T) {
},
blockHeight: 10,
latestHeight: 20,
blockTime: time.Unix(0, 10001),
blockTime: time.Unix(0, 10001).UTC(),
expected: false,
},
{
name: "block time < last submission time + 2/3 interval, block height == latest height",
localBatchInfo: &executortypes.LocalBatchInfo{
Start: 1,
LastSubmissionTime: time.Unix(0, 10000),
LastSubmissionTime: time.Unix(0, 10000).UTC(),
},
batchConfig: executortypes.BatchConfig{
MaxChunks: 100,
@@ -587,14 +587,14 @@ func TestCheckBatch(t *testing.T) {
},
blockHeight: 10,
latestHeight: 10,
blockTime: time.Unix(0, 10001),
blockTime: time.Unix(0, 10001).UTC(),
expected: false,
},
{
name: "block time > last submission time + max submission time, block height == latest height",
localBatchInfo: &executortypes.LocalBatchInfo{
Start: 1,
LastSubmissionTime: time.Unix(0, 0),
LastSubmissionTime: time.Unix(0, 0).UTC(),
},
batchConfig: executortypes.BatchConfig{
MaxChunks: 100,
@@ -603,14 +603,14 @@ func TestCheckBatch(t *testing.T) {
},
blockHeight: 10,
latestHeight: 10,
blockTime: time.Unix(0, 1000*1000*1000+1),
blockTime: time.Unix(0, 1000*1000*1000+1).UTC(),
expected: true,
},
{
name: "block time > last submission time + max submission time, block height != latest height",
localBatchInfo: &executortypes.LocalBatchInfo{
Start: 1,
LastSubmissionTime: time.Unix(0, 0),
LastSubmissionTime: time.Unix(0, 0).UTC(),
},
batchConfig: executortypes.BatchConfig{
MaxChunks: 100,
@@ -619,14 +619,14 @@ func TestCheckBatch(t *testing.T) {
},
blockHeight: 10,
latestHeight: 20,
blockTime: time.Unix(0, 1000*1000*1000+1),
blockTime: time.Unix(0, 1000*1000*1000+1).UTC(),
expected: false,
},
{
name: "block time < last submission time + max submission time, block height == latest height",
localBatchInfo: &executortypes.LocalBatchInfo{
Start: 1,
LastSubmissionTime: time.Unix(0, 0),
LastSubmissionTime: time.Unix(0, 0).UTC(),
},
batchConfig: executortypes.BatchConfig{
MaxChunks: 100,
@@ -635,14 +635,14 @@ func TestCheckBatch(t *testing.T) {
},
blockHeight: 10,
latestHeight: 10,
blockTime: time.Unix(0, 1000*1000*1000),
blockTime: time.Unix(0, 1000*1000*1000).UTC(),
expected: true,
},
{
name: "batch size >= (max chunks - 1) * max chunk size",
localBatchInfo: &executortypes.LocalBatchInfo{
Start: 1,
LastSubmissionTime: time.Unix(0, 0),
LastSubmissionTime: time.Unix(0, 0).UTC(),
BatchSize: 1000,
},
batchConfig: executortypes.BatchConfig{
@@ -652,7 +652,7 @@ func TestCheckBatch(t *testing.T) {
},
blockHeight: 10,
latestHeight: 20,
blockTime: time.Unix(0, 500),
blockTime: time.Unix(0, 500).UTC(),
expected: true,
},
{
@@ -669,7 +669,7 @@ func TestCheckBatch(t *testing.T) {
},
blockHeight: 10,
latestHeight: 20,
blockTime: time.Unix(0, 500),
blockTime: time.Unix(0, 500).UTC(),
expected: false,
},
}
4 changes: 1 addition & 3 deletions executor/batchsubmitter/handler.go
@@ -70,9 +70,7 @@ func (bs *BatchSubmitter) rawBlockHandler(ctx types.Context, args nodetypes.RawB
 	}
 	if bs.da.HasBroadcaster() {
 		// save processed msgs to stage using host db
-		err := bs.stage.ExecuteFnWithDB(bs.da.DB(), func() error {
-			return broadcaster.SaveProcessedMsgsBatch(bs.stage, bs.da.Codec(), bs.processedMsgs)
-		})
+		err := broadcaster.SaveProcessedMsgsBatch(bs.stage.WithPrefixedKey(bs.da.DB().PrefixedKey), bs.da.Codec(), bs.processedMsgs)
 		if err != nil {
 			return errors.Wrap(err, "failed to save processed msgs")
 		}