Skip to content

Commit

Permalink
initial savepoints and rollbacks implementation
Browse files Browse the repository at this point in the history
* updates the TreeStore to follow the Submodule pattern
* adds module_test.go for testing TreeStore module creation
  • Loading branch information
dylanlott committed Jun 29, 2023
1 parent fd30526 commit 6030ada
Show file tree
Hide file tree
Showing 12 changed files with 566 additions and 61 deletions.
2 changes: 1 addition & 1 deletion persistence/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (p *PostgresContext) GetHeight() (int64, error) {
return p.Height, nil
}

// Creates a block protobuf object using the schema defined in the persistence module
// Fetches all transactions for the current height and creates a block protobuf object using the schema defined in the persistence module.
func (p *PostgresContext) prepareBlock(proposerAddr, quorumCert []byte) (*coreTypes.Block, error) {
// Retrieve the previous block hash
var prevBlockHash string
Expand Down
19 changes: 11 additions & 8 deletions persistence/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,22 @@ type PostgresContext struct {
networkId string
}

func (p *PostgresContext) NewSavePoint(bytes []byte) error {
p.logger.Info().Bool("TODO", true).Msg("NewSavePoint not implemented")
// NewSavePoint generates a new Savepoint for this context.
func (p *PostgresContext) NewSavePoint() error {
if err := p.stateTrees.Savepoint(); err != nil {
return err
}
return nil
}

// TECHDEBT(#327): Guarantee atomicity betweens `prepareBlock`, `insertBlock` and `storeBlock` for save points & rollbacks.
func (p *PostgresContext) RollbackToSavePoint(bytes []byte) error {
p.logger.Info().Bool("TODO", true).Msg("RollbackToSavePoint not fully implemented")
return p.tx.Rollback(context.TODO())
// RollbackToSavepoint triggers a rollback for the current pgx transaction and the underylying submodule stores.
func (p *PostgresContext) RollbackToSavePoint() error {
ctx := context.TODO()
err := p.tx.Rollback(ctx)
p.stateTrees.Rollback()
return err
}

// IMPROVE(#361): Guarantee the integrity of the state
// Full details in the thread from the PR review: https://github.com/pokt-network/pocket/pull/285#discussion_r1018471719
func (p *PostgresContext) ComputeStateHash() (string, error) {
stateHash, err := p.stateTrees.Update(p.tx, uint64(p.Height))
Expand All @@ -58,7 +62,6 @@ func (p *PostgresContext) ComputeStateHash() (string, error) {
return p.stateHash, nil
}

// TECHDEBT(#327): Make sure these operations are atomic
func (p *PostgresContext) Commit(proposerAddr, quorumCert []byte) error {
p.logger.Info().Int64("height", p.Height).Msg("About to commit block & context")

Expand Down
10 changes: 8 additions & 2 deletions persistence/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package persistence

import (
"context"
"errors"
"fmt"

"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -105,18 +106,23 @@ func (*persistenceModule) Create(bus modules.Bus, options ...modules.ModuleOptio
treeModule, err := trees.Create(
bus,
trees.WithTreeStoreDirectory(persistenceCfg.TreesStoreDir),
trees.WithLogger(m.logger))
trees.WithLogger(m.logger),
trees.WithTxIndexer(txIndexer))
if err != nil {
return nil, err
}
treeStoreModule, ok := treeModule.(modules.TreeStoreModule)
if !ok {
return nil, errors.New("error casting TreeStoreModule")
}

m.config = persistenceCfg
m.genesisState = genesisState
m.networkId = runtimeMgr.GetConfig().NetworkId

m.blockStore = blockStore
m.txIndexer = txIndexer
m.stateTrees = treeModule
m.stateTrees = treeStoreModule

// TECHDEBT: reconsider if this is the best place to call `populateGenesisState`. Note that
// this forces the genesis state to be reloaded on every node startup until state
Expand Down
40 changes: 32 additions & 8 deletions persistence/trees/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,26 @@ package trees
import (
"fmt"

"github.com/pokt-network/pocket/persistence/indexer"
"github.com/pokt-network/pocket/persistence/kvstore"
"github.com/pokt-network/pocket/shared/modules"
"github.com/pokt-network/smt"
)

func (*treeStore) Create(bus modules.Bus, options ...modules.TreeStoreOption) (modules.TreeStoreModule, error) {
var _ modules.Module = &treeStore{}

func (*treeStore) Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
m := &treeStore{}

bus.RegisterModule(m)

for _, option := range options {
option(m)
}

m.SetBus(bus)
if m.txi == nil {
m.txi = bus.GetPersistenceModule().GetTxIndexer()
}

if err := m.setupTrees(); err != nil {
return nil, err
Expand All @@ -24,13 +31,13 @@ func (*treeStore) Create(bus modules.Bus, options ...modules.TreeStoreOption) (m
return m, nil
}

func Create(bus modules.Bus, options ...modules.TreeStoreOption) (modules.TreeStoreModule, error) {
func Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
return new(treeStore).Create(bus, options...)
}

// WithLogger assigns a logger for the tree store
func WithLogger(logger *modules.Logger) modules.TreeStoreOption {
return func(m modules.TreeStoreModule) {
func WithLogger(logger *modules.Logger) modules.ModuleOption {
return func(m modules.InitializableModule) {
if mod, ok := m.(*treeStore); ok {
mod.logger = logger
}
Expand All @@ -39,14 +46,31 @@ func WithLogger(logger *modules.Logger) modules.TreeStoreOption {

// WithTreeStoreDirectory assigns the path where the tree store
// saves its data.
func WithTreeStoreDirectory(path string) modules.TreeStoreOption {
return func(m modules.TreeStoreModule) {
if mod, ok := m.(*treeStore); ok {
func WithTreeStoreDirectory(path string) modules.ModuleOption {
return func(m modules.InitializableModule) {
mod, ok := m.(*treeStore)
if ok {
mod.treeStoreDir = path
}
}
}

// WithTxIndexer assigns a TxIndexer for use during operation.
func WithTxIndexer(txi indexer.TxIndexer) modules.ModuleOption {
return func(m modules.InitializableModule) {
mod, ok := m.(*treeStore)
if ok {
mod.txi = txi
}
}
}

func (t *treeStore) GetModuleName() string { return modules.TreeStoreModuleName }
func (t *treeStore) Start() error { return nil }
func (t *treeStore) Stop() error { return nil }
func (t *treeStore) GetBus() modules.Bus { return t.bus }
func (t *treeStore) SetBus(bus modules.Bus) { t.bus = bus }

func (t *treeStore) setupTrees() error {
if t.treeStoreDir == ":memory:" {
return t.setupInMemory()
Expand Down
101 changes: 101 additions & 0 deletions persistence/trees/module_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package trees_test

import (
"fmt"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"

"github.com/pokt-network/pocket/internal/testutil"
"github.com/pokt-network/pocket/persistence/trees"
"github.com/pokt-network/pocket/runtime/genesis"
"github.com/pokt-network/pocket/runtime/test_artifacts"
coreTypes "github.com/pokt-network/pocket/shared/core/types"
cryptoPocket "github.com/pokt-network/pocket/shared/crypto"
"github.com/pokt-network/pocket/shared/modules"
mockModules "github.com/pokt-network/pocket/shared/modules/mocks"
)

const (
serviceURLFormat = "node%d.consensus:42069"
)

func TestTreeStore_Create(t *testing.T) {
ctrl := gomock.NewController(t)
mockRuntimeMgr := mockModules.NewMockRuntimeMgr(ctrl)
mockBus := createMockBus(t, mockRuntimeMgr)

genesisStateMock := createMockGenesisState(nil)
persistenceMock := preparePersistenceMock(t, mockBus, genesisStateMock)

mockBus.EXPECT().GetPersistenceModule().Return(persistenceMock).AnyTimes()
persistenceMock.EXPECT().GetBus().AnyTimes().Return(mockBus)
persistenceMock.EXPECT().NewRWContext(int64(0)).AnyTimes()
persistenceMock.EXPECT().GetTxIndexer().AnyTimes()

treemod, err := trees.Create(mockBus,
trees.WithTreeStoreDirectory(":memory:"))
assert.NoError(t, err)
got := treemod.GetBus()
assert.Equal(t, got, mockBus)
}

func TestTreeStore_DebugClearAll(t *testing.T) {
// TODO: Write test case for the DebugClearAll method
t.Skip("TODO: Write test case for DebugClearAll method")
}

// createMockGenesisState configures and returns a mocked GenesisState
func createMockGenesisState(valKeys []cryptoPocket.PrivateKey) *genesis.GenesisState {
genesisState := new(genesis.GenesisState)
validators := make([]*coreTypes.Actor, len(valKeys))
for i, valKey := range valKeys {
addr := valKey.Address().String()
mockActor := &coreTypes.Actor{
ActorType: coreTypes.ActorType_ACTOR_TYPE_VAL,
Address: addr,
PublicKey: valKey.PublicKey().String(),
ServiceUrl: validatorId(i + 1),
StakedAmount: test_artifacts.DefaultStakeAmountString,
PausedHeight: int64(0),
UnstakingHeight: int64(0),
Output: addr,
}
validators[i] = mockActor
}
genesisState.Validators = validators

return genesisState
}

// Persistence mock - only needed for validatorMap access
func preparePersistenceMock(t *testing.T, busMock *mockModules.MockBus, genesisState *genesis.GenesisState) *mockModules.MockPersistenceModule {
ctrl := gomock.NewController(t)

persistenceModuleMock := mockModules.NewMockPersistenceModule(ctrl)
readCtxMock := mockModules.NewMockPersistenceReadContext(ctrl)

readCtxMock.EXPECT().GetAllValidators(gomock.Any()).Return(genesisState.GetValidators(), nil).AnyTimes()
readCtxMock.EXPECT().GetAllStakedActors(gomock.Any()).DoAndReturn(func(height int64) ([]*coreTypes.Actor, error) {
return testutil.Concatenate[*coreTypes.Actor](
genesisState.GetValidators(),
genesisState.GetServicers(),
genesisState.GetFishermen(),
genesisState.GetApplications(),
), nil
}).AnyTimes()
persistenceModuleMock.EXPECT().NewReadContext(gomock.Any()).Return(readCtxMock, nil).AnyTimes()
readCtxMock.EXPECT().Release().AnyTimes()

persistenceModuleMock.EXPECT().GetBus().Return(busMock).AnyTimes()
persistenceModuleMock.EXPECT().SetBus(busMock).AnyTimes()
persistenceModuleMock.EXPECT().GetModuleName().Return(modules.PersistenceModuleName).AnyTimes()
busMock.RegisterModule(persistenceModuleMock)

return persistenceModuleMock
}

func validatorId(i int) string {
return fmt.Sprintf(serviceURLFormat, i)
}
Loading

0 comments on commit 6030ada

Please sign in to comment.