Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Review Version] Refactor Protocol State with Pebble-based Storage #6197

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
5af77d0
refacotr protocol state with pebble-based storage
zhangchiqing Jul 9, 2024
28f96be
fix iteration
zhangchiqing Jul 10, 2024
e98b073
update utils with pebble
zhangchiqing Jul 10, 2024
f724039
rename to read pebble
zhangchiqing Jul 10, 2024
a7d08c5
refactor iterator
zhangchiqing Jul 10, 2024
2bd8329
add comment
zhangchiqing Jul 10, 2024
9182dbf
add delete range operation to batch writer
zhangchiqing Jul 10, 2024
05e758d
update git ignore
zhangchiqing Jul 10, 2024
5af2b0c
fix util
zhangchiqing Jul 10, 2024
dd6c183
remove truncate
zhangchiqing Jul 10, 2024
2efb925
preventing dirty reads when storing qcs
zhangchiqing Jul 10, 2024
e2ecc2c
small refactor
zhangchiqing Jul 11, 2024
b777c5e
update mocks
zhangchiqing Jul 11, 2024
c3cd924
removing some outdated comments and whitespaces
Aug 10, 2024
51969ec
Merge branch 'leo/v0.33-pebble-base' into alex/v0.33-pebble-storage-t…
Aug 10, 2024
b277372
refactor iterate
zhangchiqing Aug 10, 2024
d5d8b21
refactor using keyUpperBound
zhangchiqing Aug 15, 2024
80f1a2f
making indexing own receipts concurrent safe
zhangchiqing Aug 15, 2024
26865bb
address review comments
zhangchiqing Aug 15, 2024
aad84b1
Apply suggestions from code review
zhangchiqing Aug 15, 2024
c2ad1e9
address review comments
zhangchiqing Aug 15, 2024
d2c1016
refactor getStartEndKeys with prefixUpperBound
zhangchiqing Aug 15, 2024
789d2bb
refactor indexing new block
zhangchiqing Aug 15, 2024
148c877
refactor new follower state
zhangchiqing Aug 16, 2024
2ea5698
add test case to events
zhangchiqing Aug 16, 2024
77b64c0
not releasing the lock until the pebblw batch update is committed
zhangchiqing Aug 16, 2024
c904ec4
fix concurrency
zhangchiqing Aug 16, 2024
95043aa
making finalization concurrency safe
zhangchiqing Aug 16, 2024
f9436d8
adding test cases
zhangchiqing Aug 16, 2024
afd8526
add concurrency safety tests
zhangchiqing Aug 19, 2024
9737a92
making it concurrent-safe for indexing approval
zhangchiqing Aug 19, 2024
a01d317
update test cases for Index
zhangchiqing Aug 19, 2024
8eb0f74
Merge pull request #6374 from onflow/leo/v0.33-pebble-storage-index-a…
zhangchiqing Sep 13, 2024
29d78d8
Merge pull request #6373 from onflow/leo/v0.33-pebble-storage-finalizing
zhangchiqing Sep 13, 2024
ce2775e
Merge pull request #6360 from onflow/leo/v0.33-pebble-storage-block-i…
zhangchiqing Sep 13, 2024
d527912
Apply suggestions from code review
zhangchiqing Sep 13, 2024
7fff0ee
Merge pull request #6354 from onflow/leo/v0.33-pebble-storage-my-rece…
zhangchiqing Sep 13, 2024
38dd0c8
Merge pull request #6316 from onflow/leo/v0.33-pebble-storage-iterate
zhangchiqing Sep 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ language/tools/vscode-extension/out/*
**/gomock_reflect*/*

# command line tool
read-pebble
read-badger
read-protocol-state
remove-execution-fork
Expand Down
8 changes: 4 additions & 4 deletions admin/commands/storage/read_range_cluster_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"context"
"fmt"

"github.com/dgraph-io/badger/v2"
"github.com/cockroachdb/pebble"
"github.com/rs/zerolog/log"

"github.com/onflow/flow-go/admin"
"github.com/onflow/flow-go/admin/commands"
"github.com/onflow/flow-go/cmd/util/cmd/read-light-block"
"github.com/onflow/flow-go/model/flow"
storage "github.com/onflow/flow-go/storage/badger"
storage "github.com/onflow/flow-go/storage/pebble"
)

var _ commands.AdminCommand = (*ReadRangeClusterBlocksCommand)(nil)
Expand All @@ -21,12 +21,12 @@ var _ commands.AdminCommand = (*ReadRangeClusterBlocksCommand)(nil)
const Max_Range_Cluster_Block_Limit = uint64(10001)

type ReadRangeClusterBlocksCommand struct {
db *badger.DB
db *pebble.DB
headers *storage.Headers
payloads *storage.ClusterPayloads
}

func NewReadRangeClusterBlocksCommand(db *badger.DB, headers *storage.Headers, payloads *storage.ClusterPayloads) commands.AdminCommand {
func NewReadRangeClusterBlocksCommand(db *pebble.DB, headers *storage.Headers, payloads *storage.ClusterPayloads) commands.AdminCommand {
return &ReadRangeClusterBlocksCommand{
db: db,
headers: headers,
Expand Down
32 changes: 16 additions & 16 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ import (
"github.com/onflow/flow-go/network/underlay"
"github.com/onflow/flow-go/network/validator"
"github.com/onflow/flow-go/state/protocol"
badgerState "github.com/onflow/flow-go/state/protocol/badger"
"github.com/onflow/flow-go/state/protocol/blocktimer"
pebbleState "github.com/onflow/flow-go/state/protocol/pebble"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
pStorage "github.com/onflow/flow-go/storage/pebble"
pstorage "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/utils/grpcutils"
)

Expand Down Expand Up @@ -250,7 +250,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
scriptExecutorConfig: query.NewDefaultConfig(),
scriptExecMinBlock: 0,
scriptExecMaxBlock: math.MaxUint64,
registerCacheType: pStorage.CacheTypeTwoQueue.String(),
registerCacheType: pstorage.CacheTypeTwoQueue.String(),
registerCacheSize: 0,
programCacheSize: 0,
}
Expand Down Expand Up @@ -320,12 +320,12 @@ func (builder *FlowAccessNodeBuilder) buildFollowerState() *FlowAccessNodeBuilde
builder.Module("mutable follower state", func(node *cmd.NodeConfig) error {
// For now, we only support state implementations from package badger.
// If we ever support different implementations, the following can be replaced by a type-aware factory
state, ok := node.State.(*badgerState.State)
state, ok := node.State.(*pebbleState.State)
if !ok {
return fmt.Errorf("only implementations of type badger.State are currently supported but read-only state has type %T", node.State)
}

followerState, err := badgerState.NewFollowerState(
followerState, err := pebbleState.NewFollowerState(
node.Logger,
node.Tracer,
node.ProtocolEvents,
Expand Down Expand Up @@ -383,7 +383,7 @@ func (builder *FlowAccessNodeBuilder) buildFollowerCore() *FlowAccessNodeBuilder
builder.Component("follower core", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// create a finalizer that will handle updating the protocol
// state when the follower detects newly finalized blocks
final := finalizer.NewFinalizer(node.DB, node.Storage.Headers, builder.FollowerState, node.Tracer)
final := finalizer.NewFinalizerPebble(node.DB, node.Storage.Headers, builder.FollowerState, node.Tracer)
Copy link
Member Author

@zhangchiqing zhangchiqing Jul 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's hard to use a flag for switching between pebble and badger. I had to make a copy of the finalizer, and use a different name for the pebble version of the finalizer.

⚠️ This means if there is any change onto the badger Finalizer, we have to manually bring it to the pebble Finalizer.

Same for the collection finalizer


packer := signature.NewConsensusSigDataPacker(builder.Committee)
// initialize the verifier for the protocol consensus
Expand Down Expand Up @@ -725,27 +725,27 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
}).
Module("indexed block height consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the MAIN db since that is where indexed execution data is stored.
indexedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
indexedBlockHeight = pstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
return nil
}).
Module("transaction results storage", func(node *cmd.NodeConfig) error {
builder.Storage.LightTransactionResults = bstorage.NewLightTransactionResults(node.Metrics.Cache, node.DB, bstorage.DefaultCacheSize)
builder.Storage.LightTransactionResults = pstorage.NewLightTransactionResults(node.Metrics.Cache, node.DB, pstorage.DefaultCacheSize)
return nil
}).
DependableComponent("execution data indexer", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// Note: using a DependableComponent here to ensure that the indexer does not block
// other components from starting while bootstrapping the register db since it may
// take hours to complete.

pdb, err := pStorage.OpenRegisterPebbleDB(builder.registersDBPath)
pdb, err := pstorage.OpenRegisterPebbleDB(builder.registersDBPath)
if err != nil {
return nil, fmt.Errorf("could not open registers db: %w", err)
}
builder.ShutdownFunc(func() error {
return pdb.Close()
})

bootstrapped, err := pStorage.IsBootstrapped(pdb)
bootstrapped, err := pstorage.IsBootstrapped(pdb)
if err != nil {
return nil, fmt.Errorf("could not check if registers db is bootstrapped: %w", err)
}
Expand Down Expand Up @@ -777,7 +777,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
}

rootHash := ledger.RootHash(builder.RootSeal.FinalState)
bootstrap, err := pStorage.NewRegisterBootstrap(pdb, checkpointFile, checkpointHeight, rootHash, builder.Logger)
bootstrap, err := pstorage.NewRegisterBootstrap(pdb, checkpointFile, checkpointHeight, rootHash, builder.Logger)
if err != nil {
return nil, fmt.Errorf("could not create registers bootstrap: %w", err)
}
Expand All @@ -790,18 +790,18 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
}
}

registers, err := pStorage.NewRegisters(pdb)
registers, err := pstorage.NewRegisters(pdb)
if err != nil {
return nil, fmt.Errorf("could not create registers storage: %w", err)
}

if builder.registerCacheSize > 0 {
cacheType, err := pStorage.ParseCacheType(builder.registerCacheType)
cacheType, err := pstorage.ParseCacheType(builder.registerCacheType)
if err != nil {
return nil, fmt.Errorf("could not parse register cache type: %w", err)
}
cacheMetrics := metrics.NewCacheCollector(builder.RootChainID)
registersCache, err := pStorage.NewRegistersCache(registers, cacheType, builder.registerCacheSize, cacheMetrics)
registersCache, err := pstorage.NewRegistersCache(registers, cacheType, builder.registerCacheSize, cacheMetrics)
if err != nil {
return nil, fmt.Errorf("could not create registers cache: %w", err)
}
Expand Down Expand Up @@ -1406,7 +1406,7 @@ func (builder *FlowAccessNodeBuilder) Initialize() error {

builder.EnqueueTracer()
builder.PreInit(cmd.DynamicStartPreInit)
builder.ValidateRootSnapshot(badgerState.ValidRootSnapshotContainsEntityExpiryRange)
builder.ValidateRootSnapshot(pebbleState.ValidRootSnapshotContainsEntityExpiryRange)

return nil
}
Expand Down Expand Up @@ -1596,7 +1596,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil
}).
Module("events storage", func(node *cmd.NodeConfig) error {
builder.Storage.Events = bstorage.NewEvents(node.Metrics.Cache, node.DB)
builder.Storage.Events = pstorage.NewEvents(node.Metrics.Cache, node.DB)
return nil
}).
Module("events index", func(node *cmd.NodeConfig) error {
Expand Down
20 changes: 10 additions & 10 deletions cmd/collection/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ import (
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/network/channels"
"github.com/onflow/flow-go/state/protocol"
badgerState "github.com/onflow/flow-go/state/protocol/badger"
"github.com/onflow/flow-go/state/protocol/blocktimer"
"github.com/onflow/flow-go/state/protocol/events/gadgets"
"github.com/onflow/flow-go/storage/badger"
pebbleState "github.com/onflow/flow-go/state/protocol/pebble"
"github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/utils/grpcutils"
)

Expand Down Expand Up @@ -182,10 +182,10 @@ func main() {
nodeBuilder.
PreInit(cmd.DynamicStartPreInit).
AdminCommand("read-range-cluster-blocks", func(conf *cmd.NodeConfig) commands.AdminCommand {
clusterPayloads := badger.NewClusterPayloads(&metrics.NoopCollector{}, conf.DB)
headers, ok := conf.Storage.Headers.(*badger.Headers)
clusterPayloads := pebble.NewClusterPayloads(&metrics.NoopCollector{}, conf.DB)
headers, ok := conf.Storage.Headers.(*pebble.Headers)
if !ok {
panic("fail to initialize admin tool, conf.Storage.Headers can not be casted as badger headers")
panic("fail to initialize admin tool, conf.Storage.Headers can not be casted as pebble headers")
}
return storageCommands.NewReadRangeClusterBlocksCommand(conf.DB, headers, clusterPayloads)
}).
Expand All @@ -195,13 +195,13 @@ func main() {
return nil
}).
Module("mutable follower state", func(node *cmd.NodeConfig) error {
// For now, we only support state implementations from package badger.
// For now, we only support state implementations from package pebble.
// If we ever support different implementations, the following can be replaced by a type-aware factory
state, ok := node.State.(*badgerState.State)
state, ok := node.State.(*pebbleState.State)
if !ok {
return fmt.Errorf("only implementations of type badger.State are currently supported but read-only state has type %T", node.State)
return fmt.Errorf("only implementations of type pebble.State are currently supported but read-only state has type %T", node.State)
}
followerState, err = badgerState.NewFollowerState(
followerState, err = pebbleState.NewFollowerState(
node.Logger,
node.Tracer,
node.ProtocolEvents,
Expand Down Expand Up @@ -287,7 +287,7 @@ func main() {
Component("follower core", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// create a finalizer for updating the protocol
// state when the follower detects newly finalized blocks
finalizer := confinalizer.NewFinalizer(node.DB, node.Storage.Headers, followerState, node.Tracer)
finalizer := confinalizer.NewFinalizerPebble(node.DB, node.Storage.Headers, followerState, node.Tracer)
finalized, pending, err := recovery.FindLatest(node.State, node.Storage.Headers)
if err != nil {
return nil, fmt.Errorf("could not find latest finalized block and pending blocks to recover consensus follower: %w", err)
Expand Down
26 changes: 15 additions & 11 deletions cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ import (
"github.com/onflow/flow-go/module/validation"
"github.com/onflow/flow-go/network/channels"
"github.com/onflow/flow-go/state/protocol"
badgerState "github.com/onflow/flow-go/state/protocol/badger"
"github.com/onflow/flow-go/state/protocol/blocktimer"
"github.com/onflow/flow-go/state/protocol/events/gadgets"
pebbleState "github.com/onflow/flow-go/state/protocol/pebble"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
bstorage "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/storage/pebble/procedure"
"github.com/onflow/flow-go/utils/io"
)

Expand Down Expand Up @@ -132,6 +133,7 @@ func main() {
getSealingConfigs module.SealingConfigsGetter
)
var deprecatedFlagBlockRateDelay time.Duration
blockIndexer := procedure.NewBlockIndexer()

nodeBuilder := cmd.FlowNode(flow.RoleConsensus.String())
nodeBuilder.ExtraFlags(func(flags *pflag.FlagSet) {
Expand Down Expand Up @@ -209,7 +211,7 @@ func main() {

nodeBuilder.
PreInit(cmd.DynamicStartPreInit).
ValidateRootSnapshot(badgerState.ValidRootSnapshotContainsEntityExpiryRange).
ValidateRootSnapshot(pebbleState.ValidRootSnapshotContainsEntityExpiryRange).
Module("consensus node metrics", func(node *cmd.NodeConfig) error {
conMetrics = metrics.NewConsensusCollector(node.Tracer, node.MetricsRegisterer)
return nil
Expand Down Expand Up @@ -244,11 +246,11 @@ func main() {
return err
}).
Module("mutable follower state", func(node *cmd.NodeConfig) error {
// For now, we only support state implementations from package badger.
// For now, we only support state implementations from package pebble.
// If we ever support different implementations, the following can be replaced by a type-aware factory
state, ok := node.State.(*badgerState.State)
state, ok := node.State.(*pebbleState.State)
if !ok {
return fmt.Errorf("only implementations of type badger.State are currently supported but read-only state has type %T", node.State)
return fmt.Errorf("only implementations of type pebble.State are currently supported but read-only state has type %T", node.State)
}

chunkAssigner, err = chmodule.NewChunkAssigner(chunkAlpha, node.State)
Expand Down Expand Up @@ -278,13 +280,14 @@ func main() {
return err
}

mutableState, err = badgerState.NewFullConsensusState(
mutableState, err = pebbleState.NewFullConsensusState(
node.Logger,
node.Tracer,
node.ProtocolEvents,
state,
node.Storage.Index,
node.Storage.Payloads,
blockIndexer,
blockTimer,
receiptValidator,
sealValidator,
Expand Down Expand Up @@ -559,12 +562,12 @@ func main() {
}).
Component("hotstuff modules", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// initialize the block finalizer
finalize := finalizer.NewFinalizer(
finalize := finalizer.NewFinalizerPebble(
node.DB,
node.Storage.Headers,
mutableState,
node.Tracer,
finalizer.WithCleanup(finalizer.CleanupMempools(
finalizer.WithCleanupPebble(finalizer.CleanupMempools(
node.Metrics.Mempool,
conMetrics,
node.Storage.Payloads,
Expand Down Expand Up @@ -605,7 +608,7 @@ func main() {
notifier.AddFollowerConsumer(followerDistributor)

// initialize the persister
persist := persister.New(node.DB, node.RootChainID)
persist := persister.NewPersisterPebble(node.DB, node.RootChainID)

finalizedBlock, err := node.State.Final().Head()
if err != nil {
Expand Down Expand Up @@ -722,14 +725,15 @@ func main() {
Component("consensus participant", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// initialize the block builder
var build module.Builder
build, err = builder.NewBuilder(
build, err = builder.NewBuilderPebble(
node.Metrics.Mempool,
node.DB,
mutableState,
node.Storage.Headers,
node.Storage.Seals,
node.Storage.Index,
node.Storage.Blocks,
blockIndexer,
node.Storage.Results,
node.Storage.Receipts,
guarantees,
Expand Down
4 changes: 2 additions & 2 deletions cmd/dynamic_startup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/model/bootstrap"
"github.com/onflow/flow-go/state/protocol"
badgerstate "github.com/onflow/flow-go/state/protocol/badger"
pebblestate "github.com/onflow/flow-go/state/protocol/pebble"
utilsio "github.com/onflow/flow-go/utils/io"

"github.com/onflow/flow-go/model/flow"
Expand Down Expand Up @@ -152,7 +152,7 @@ func DynamicStartPreInit(nodeConfig *NodeConfig) error {
log := nodeConfig.Logger.With().Str("component", "dynamic-startup").Logger()

// skip dynamic startup if the protocol state is bootstrapped
isBootstrapped, err := badgerstate.IsBootstrapped(nodeConfig.DB)
isBootstrapped, err := pebblestate.IsBootstrapped(nodeConfig.DB)
if err != nil {
return fmt.Errorf("could not check if state is boostrapped: %w", err)
}
Expand Down
Loading
Loading