Skip to content

Commit

Permalink
refacotr protocol state with pebble-based storage
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangchiqing committed Jul 10, 2024
1 parent 3905715 commit db380d1
Show file tree
Hide file tree
Showing 269 changed files with 4,380 additions and 5,413 deletions.
8 changes: 4 additions & 4 deletions admin/commands/storage/read_range_cluster_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"context"
"fmt"

"github.com/dgraph-io/badger/v2"
"github.com/cockroachdb/pebble"
"github.com/rs/zerolog/log"

"github.com/onflow/flow-go/admin"
"github.com/onflow/flow-go/admin/commands"
"github.com/onflow/flow-go/cmd/util/cmd/read-light-block"
"github.com/onflow/flow-go/model/flow"
storage "github.com/onflow/flow-go/storage/badger"
storage "github.com/onflow/flow-go/storage/pebble"
)

var _ commands.AdminCommand = (*ReadRangeClusterBlocksCommand)(nil)
Expand All @@ -21,12 +21,12 @@ var _ commands.AdminCommand = (*ReadRangeClusterBlocksCommand)(nil)
const Max_Range_Cluster_Block_Limit = uint64(10001)

type ReadRangeClusterBlocksCommand struct {
db *badger.DB
db *pebble.DB
headers *storage.Headers
payloads *storage.ClusterPayloads
}

func NewReadRangeClusterBlocksCommand(db *badger.DB, headers *storage.Headers, payloads *storage.ClusterPayloads) commands.AdminCommand {
func NewReadRangeClusterBlocksCommand(db *pebble.DB, headers *storage.Headers, payloads *storage.ClusterPayloads) commands.AdminCommand {
return &ReadRangeClusterBlocksCommand{
db: db,
headers: headers,
Expand Down
32 changes: 16 additions & 16 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ import (
"github.com/onflow/flow-go/network/underlay"
"github.com/onflow/flow-go/network/validator"
"github.com/onflow/flow-go/state/protocol"
badgerState "github.com/onflow/flow-go/state/protocol/badger"
"github.com/onflow/flow-go/state/protocol/blocktimer"
pebbleState "github.com/onflow/flow-go/state/protocol/pebble"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
pStorage "github.com/onflow/flow-go/storage/pebble"
pstorage "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/utils/grpcutils"
)

Expand Down Expand Up @@ -250,7 +250,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
scriptExecutorConfig: query.NewDefaultConfig(),
scriptExecMinBlock: 0,
scriptExecMaxBlock: math.MaxUint64,
registerCacheType: pStorage.CacheTypeTwoQueue.String(),
registerCacheType: pstorage.CacheTypeTwoQueue.String(),
registerCacheSize: 0,
programCacheSize: 0,
}
Expand Down Expand Up @@ -320,12 +320,12 @@ func (builder *FlowAccessNodeBuilder) buildFollowerState() *FlowAccessNodeBuilde
builder.Module("mutable follower state", func(node *cmd.NodeConfig) error {
// For now, we only support state implementations from package badger.
// If we ever support different implementations, the following can be replaced by a type-aware factory
state, ok := node.State.(*badgerState.State)
state, ok := node.State.(*pebbleState.State)
if !ok {
return fmt.Errorf("only implementations of type badger.State are currently supported but read-only state has type %T", node.State)
}

followerState, err := badgerState.NewFollowerState(
followerState, err := pebbleState.NewFollowerState(
node.Logger,
node.Tracer,
node.ProtocolEvents,
Expand Down Expand Up @@ -383,7 +383,7 @@ func (builder *FlowAccessNodeBuilder) buildFollowerCore() *FlowAccessNodeBuilder
builder.Component("follower core", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// create a finalizer that will handle updating the protocol
// state when the follower detects newly finalized blocks
final := finalizer.NewFinalizer(node.DB, node.Storage.Headers, builder.FollowerState, node.Tracer)
final := finalizer.NewFinalizerPebble(node.DB, node.Storage.Headers, builder.FollowerState, node.Tracer)

packer := signature.NewConsensusSigDataPacker(builder.Committee)
// initialize the verifier for the protocol consensus
Expand Down Expand Up @@ -725,27 +725,27 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
}).
Module("indexed block height consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the MAIN db since that is where indexed execution data is stored.
indexedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
indexedBlockHeight = pstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
return nil
}).
Module("transaction results storage", func(node *cmd.NodeConfig) error {
builder.Storage.LightTransactionResults = bstorage.NewLightTransactionResults(node.Metrics.Cache, node.DB, bstorage.DefaultCacheSize)
builder.Storage.LightTransactionResults = pstorage.NewLightTransactionResults(node.Metrics.Cache, node.DB, pstorage.DefaultCacheSize)
return nil
}).
DependableComponent("execution data indexer", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// Note: using a DependableComponent here to ensure that the indexer does not block
// other components from starting while bootstrapping the register db since it may
// take hours to complete.

pdb, err := pStorage.OpenRegisterPebbleDB(builder.registersDBPath)
pdb, err := pstorage.OpenRegisterPebbleDB(builder.registersDBPath)
if err != nil {
return nil, fmt.Errorf("could not open registers db: %w", err)
}
builder.ShutdownFunc(func() error {
return pdb.Close()
})

bootstrapped, err := pStorage.IsBootstrapped(pdb)
bootstrapped, err := pstorage.IsBootstrapped(pdb)
if err != nil {
return nil, fmt.Errorf("could not check if registers db is bootstrapped: %w", err)
}
Expand Down Expand Up @@ -777,7 +777,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
}

rootHash := ledger.RootHash(builder.RootSeal.FinalState)
bootstrap, err := pStorage.NewRegisterBootstrap(pdb, checkpointFile, checkpointHeight, rootHash, builder.Logger)
bootstrap, err := pstorage.NewRegisterBootstrap(pdb, checkpointFile, checkpointHeight, rootHash, builder.Logger)
if err != nil {
return nil, fmt.Errorf("could not create registers bootstrap: %w", err)
}
Expand All @@ -790,18 +790,18 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
}
}

registers, err := pStorage.NewRegisters(pdb)
registers, err := pstorage.NewRegisters(pdb)
if err != nil {
return nil, fmt.Errorf("could not create registers storage: %w", err)
}

if builder.registerCacheSize > 0 {
cacheType, err := pStorage.ParseCacheType(builder.registerCacheType)
cacheType, err := pstorage.ParseCacheType(builder.registerCacheType)
if err != nil {
return nil, fmt.Errorf("could not parse register cache type: %w", err)
}
cacheMetrics := metrics.NewCacheCollector(builder.RootChainID)
registersCache, err := pStorage.NewRegistersCache(registers, cacheType, builder.registerCacheSize, cacheMetrics)
registersCache, err := pstorage.NewRegistersCache(registers, cacheType, builder.registerCacheSize, cacheMetrics)
if err != nil {
return nil, fmt.Errorf("could not create registers cache: %w", err)
}
Expand Down Expand Up @@ -1406,7 +1406,7 @@ func (builder *FlowAccessNodeBuilder) Initialize() error {

builder.EnqueueTracer()
builder.PreInit(cmd.DynamicStartPreInit)
builder.ValidateRootSnapshot(badgerState.ValidRootSnapshotContainsEntityExpiryRange)
builder.ValidateRootSnapshot(pebbleState.ValidRootSnapshotContainsEntityExpiryRange)

return nil
}
Expand Down Expand Up @@ -1596,7 +1596,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil
}).
Module("events storage", func(node *cmd.NodeConfig) error {
builder.Storage.Events = bstorage.NewEvents(node.Metrics.Cache, node.DB)
builder.Storage.Events = pstorage.NewEvents(node.Metrics.Cache, node.DB)
return nil
}).
Module("events index", func(node *cmd.NodeConfig) error {
Expand Down
20 changes: 10 additions & 10 deletions cmd/collection/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ import (
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/network/channels"
"github.com/onflow/flow-go/state/protocol"
badgerState "github.com/onflow/flow-go/state/protocol/badger"
"github.com/onflow/flow-go/state/protocol/blocktimer"
"github.com/onflow/flow-go/state/protocol/events/gadgets"
"github.com/onflow/flow-go/storage/badger"
pebbleState "github.com/onflow/flow-go/state/protocol/pebble"
"github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/utils/grpcutils"
)

Expand Down Expand Up @@ -182,10 +182,10 @@ func main() {
nodeBuilder.
PreInit(cmd.DynamicStartPreInit).
AdminCommand("read-range-cluster-blocks", func(conf *cmd.NodeConfig) commands.AdminCommand {
clusterPayloads := badger.NewClusterPayloads(&metrics.NoopCollector{}, conf.DB)
headers, ok := conf.Storage.Headers.(*badger.Headers)
clusterPayloads := pebble.NewClusterPayloads(&metrics.NoopCollector{}, conf.DB)
headers, ok := conf.Storage.Headers.(*pebble.Headers)
if !ok {
panic("fail to initialize admin tool, conf.Storage.Headers can not be casted as badger headers")
panic("fail to initialize admin tool, conf.Storage.Headers can not be casted as pebble headers")
}
return storageCommands.NewReadRangeClusterBlocksCommand(conf.DB, headers, clusterPayloads)
}).
Expand All @@ -195,13 +195,13 @@ func main() {
return nil
}).
Module("mutable follower state", func(node *cmd.NodeConfig) error {
// For now, we only support state implementations from package badger.
// For now, we only support state implementations from package pebble.
// If we ever support different implementations, the following can be replaced by a type-aware factory
state, ok := node.State.(*badgerState.State)
state, ok := node.State.(*pebbleState.State)
if !ok {
return fmt.Errorf("only implementations of type badger.State are currently supported but read-only state has type %T", node.State)
return fmt.Errorf("only implementations of type pebble.State are currently supported but read-only state has type %T", node.State)
}
followerState, err = badgerState.NewFollowerState(
followerState, err = pebbleState.NewFollowerState(
node.Logger,
node.Tracer,
node.ProtocolEvents,
Expand Down Expand Up @@ -287,7 +287,7 @@ func main() {
Component("follower core", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// create a finalizer for updating the protocol
// state when the follower detects newly finalized blocks
finalizer := confinalizer.NewFinalizer(node.DB, node.Storage.Headers, followerState, node.Tracer)
finalizer := confinalizer.NewFinalizerPebble(node.DB, node.Storage.Headers, followerState, node.Tracer)
finalized, pending, err := recovery.FindLatest(node.State, node.Storage.Headers)
if err != nil {
return nil, fmt.Errorf("could not find latest finalized block and pending blocks to recover consensus follower: %w", err)
Expand Down
22 changes: 11 additions & 11 deletions cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ import (
"github.com/onflow/flow-go/module/validation"
"github.com/onflow/flow-go/network/channels"
"github.com/onflow/flow-go/state/protocol"
badgerState "github.com/onflow/flow-go/state/protocol/badger"
"github.com/onflow/flow-go/state/protocol/blocktimer"
"github.com/onflow/flow-go/state/protocol/events/gadgets"
pebbleState "github.com/onflow/flow-go/state/protocol/pebble"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
bstorage "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/utils/io"
)

Expand Down Expand Up @@ -209,7 +209,7 @@ func main() {

nodeBuilder.
PreInit(cmd.DynamicStartPreInit).
ValidateRootSnapshot(badgerState.ValidRootSnapshotContainsEntityExpiryRange).
ValidateRootSnapshot(pebbleState.ValidRootSnapshotContainsEntityExpiryRange).
Module("consensus node metrics", func(node *cmd.NodeConfig) error {
conMetrics = metrics.NewConsensusCollector(node.Tracer, node.MetricsRegisterer)
return nil
Expand Down Expand Up @@ -244,11 +244,11 @@ func main() {
return err
}).
Module("mutable follower state", func(node *cmd.NodeConfig) error {
// For now, we only support state implementations from package badger.
// For now, we only support state implementations from package pebble.
// If we ever support different implementations, the following can be replaced by a type-aware factory
state, ok := node.State.(*badgerState.State)
state, ok := node.State.(*pebbleState.State)
if !ok {
return fmt.Errorf("only implementations of type badger.State are currently supported but read-only state has type %T", node.State)
return fmt.Errorf("only implementations of type pebble.State are currently supported but read-only state has type %T", node.State)
}

chunkAssigner, err = chmodule.NewChunkAssigner(chunkAlpha, node.State)
Expand Down Expand Up @@ -278,7 +278,7 @@ func main() {
return err
}

mutableState, err = badgerState.NewFullConsensusState(
mutableState, err = pebbleState.NewFullConsensusState(
node.Logger,
node.Tracer,
node.ProtocolEvents,
Expand Down Expand Up @@ -559,12 +559,12 @@ func main() {
}).
Component("hotstuff modules", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// initialize the block finalizer
finalize := finalizer.NewFinalizer(
finalize := finalizer.NewFinalizerPebble(
node.DB,
node.Storage.Headers,
mutableState,
node.Tracer,
finalizer.WithCleanup(finalizer.CleanupMempools(
finalizer.WithCleanupPebble(finalizer.CleanupMempools(
node.Metrics.Mempool,
conMetrics,
node.Storage.Payloads,
Expand Down Expand Up @@ -605,7 +605,7 @@ func main() {
notifier.AddFollowerConsumer(followerDistributor)

// initialize the persister
persist := persister.New(node.DB, node.RootChainID)
persist := persister.NewPersisterPebble(node.DB, node.RootChainID)

finalizedBlock, err := node.State.Final().Head()
if err != nil {
Expand Down Expand Up @@ -722,7 +722,7 @@ func main() {
Component("consensus participant", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// initialize the block builder
var build module.Builder
build, err = builder.NewBuilder(
build, err = builder.NewBuilderPebble(
node.Metrics.Mempool,
node.DB,
mutableState,
Expand Down
4 changes: 2 additions & 2 deletions cmd/dynamic_startup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/model/bootstrap"
"github.com/onflow/flow-go/state/protocol"
badgerstate "github.com/onflow/flow-go/state/protocol/badger"
pebblestate "github.com/onflow/flow-go/state/protocol/pebble"
utilsio "github.com/onflow/flow-go/utils/io"

"github.com/onflow/flow-go/model/flow"
Expand Down Expand Up @@ -152,7 +152,7 @@ func DynamicStartPreInit(nodeConfig *NodeConfig) error {
log := nodeConfig.Logger.With().Str("component", "dynamic-startup").Logger()

// skip dynamic startup if the protocol state is bootstrapped
isBootstrapped, err := badgerstate.IsBootstrapped(nodeConfig.DB)
isBootstrapped, err := pebblestate.IsBootstrapped(nodeConfig.DB)
if err != nil {
return fmt.Errorf("could not check if state is boostrapped: %w", err)
}
Expand Down
Loading

0 comments on commit db380d1

Please sign in to comment.