Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Review Version] Refactor Protocol State with Pebble-based Storage #6197

Closed

Conversation

zhangchiqing
Copy link
Member

@zhangchiqing zhangchiqing commented Jul 9, 2024

Close #6137

This PR refactor the protocol state from badger-based storage to pebble-based storage.

Since a lot of code are copied, in order to make it easy to see the actual code-diff, I added a commit that copies all the badger operations to the pebble folder, and created a base branch (leo/v0.33-pebble-base). And by comparing against this base branch, we can see the actual code-diff clearly.

That's why I called this PR "Review Version".

How to keep this branch update to date

If there is updates on v0.33 branch, we should first rebase leo/v0.33-pebble-base with v0.33, and then rebase leo/v0.33-pebble-storage-to-review with leo/v0.33-pebble-base

@@ -383,7 +383,7 @@ func (builder *FlowAccessNodeBuilder) buildFollowerCore() *FlowAccessNodeBuilder
builder.Component("follower core", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// create a finalizer that will handle updating the protocol
// state when the follower detects newly finalized blocks
final := finalizer.NewFinalizer(node.DB, node.Storage.Headers, builder.FollowerState, node.Tracer)
final := finalizer.NewFinalizerPebble(node.DB, node.Storage.Headers, builder.FollowerState, node.Tracer)
Copy link
Member Author

@zhangchiqing zhangchiqing Jul 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's hard to use a flag for switching between pebble and badger. I had to make a copy of the finalizer, and use a different name for the pebble version of the finalizer.

⚠️ This means if there is any change onto the badger Finalizer, we have to manually bring it to the pebble Finalizer.

Same for the collection finalizer

cmd/scaffold.go Outdated
Comment on lines 1080 to 1090
WithKeepL0InMemory(true).
WithLogger(log).

// the ValueLogFileSize option specifies how big the value of a
// key-value pair is allowed to be saved into badger.
// exceeding this limit, will fail with an error like this:
// could not store data: Value with size <xxxx> exceeded 1073741824 limit
// Maximum value size is 10G, needed by execution node
// TODO: finding a better max value for each node type
WithValueLogFileSize(128 << 23).
WithValueLogMaxEntries(100000) // Default is 1000000
Copy link
Member Author

@zhangchiqing zhangchiqing Jul 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have these find-tuned config for badger, however, I didn't do any for pebble, just used default options in this PR.

The default pebble options works well for execution node in Mainnet Test Execution Node. But might be good to benchmark and find some config to twisk.

Comment on lines 15 to 19
type ReaderBatchWriter struct {
db *pebble.DB
batch *pebble.Batch
callbacks []func()
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

badger supports transactions. However, pebble doesn't.
pebble only supports atomic batch updates. This is the key difference.

With badger transaction, we have a lot operations that create a badger transaction, then reads some data, and update some data depending on the result of the reads, and if the transaction is committed successfully, then we also use callback to update cache.

In order to implement a similar behavior as badger transaction in pebble, I create this ReaderBatchWriter struct. It contains the batch for updates. If we need to read something within the transaction, then the db is be used to read data.

When reading the data, there are chances where we need to take into consideration the writes that has not been committed, this is why I made the IndexedBatch method to return the Batch object, which holds all the write set.

And it also contains the callback that allows us to update cache after successfully committing the batch.

Copy link
Member

@AlexHentschel AlexHentschel Aug 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️

I think this is a highly unsafe construction for this refactoring, because it hides conceptually breaking changes for the business logic at a very low level. Further explanation:

  • Previously, reads and writes within the same transaction all operated atomically on a snapshot of the data base. For example, I could read the latest finalized bock within a transaction and decide to write something depending on what I found for the latest finalized bock.
  • The ReaderBatchWriter you are proposing here breaks this. I can read the latest finalized block and decide what I want to write in response, but by the time the write hits the data base, the latest finalized block might have already changed. While the ReaderBatchWriter exposes similar functionality as a badger transaction (reading and writing) it doesn't provide atomicity guarantees of reads at all.
  • what we need to do is to go through every piece of business logic that previously used a badger transaction, confirm that we can asynchronously do the reads upfront and then change the code.
  • I think it would be much safer to not provide ReaderBatchWriter and force ourselves to re-write every single occurrence where we previously used a badger transaction. I think that it would be much much more likely for us to find edge cases when we force ourselves to look at each occurrence of badger transaction with reads and refactor it. It is important to be aware that human brains are just not very good at exhaustively finding all occurrences of ReaderBatchWriter changes the guarantees for component A, which in turn changes guarantees for component B and C.

From my perspective, we are doing a low-level change like this, so that the compiler is happy with the higher-level logic, yet guarantees for the higher-level logic have changed and we relying on humans to identify those areas where something could go wrong. I think an approach like this has disproportional risks to overlook bugs that we cannot afford in a project like Flow.

... maybe I change my mind, when I have reviewed more of your PR.

Comment on lines +18 to +19
var upsert = insert
var update = insert
Copy link
Member Author

@zhangchiqing zhangchiqing Jul 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️
We are not able to support update with pebble.

The update operation in badger will first check if the key exists, and only update if the key does not exist.

However, if we implement the similar logic in pebble, it won't be concurrent-safe anyway, because a read can be dirty without transaction.

For instance, if one operation is to update a key with value 10, and another operation is to update the same key with value 20. And if these operations happen concurrently, then both of them might see the key does not exist, and continue to set the value, and in the end, both operation would succeed, and the value will end up being either 10 or 20 depending on which lose the race.

That's why I decided to not implement update and just use insert everywhere, the insert will simply set the value regardless the key exists or not.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with removing upsert and update at the low level. 👍

Before merging, I think we should remove these aliases and remove any usages of the functions (I only found one, in TestUpsertEntry).


it, err := r.NewIter(&pebble.IterOptions{
LowerBound: prefix,
UpperBound: append(prefix, ffBytes...),
Copy link
Member Author

@zhangchiqing zhangchiqing Jul 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The badger implementation maintains a global max key, so that when iterating from a key range with a start prefix and end prefix, it uses the max key to create the boundary for including the biggest key with the end prefix. :

		} else {
			// for forward iteration, add the 0xff-bytes suffix to the end
			// prefix, to ensure we include all keys with that prefix before
			// finishing.
			length := uint32(len(end))
			diff := max - length
			for i := uint32(0); i < diff; i++ {
				end = append(end, 0xff)
			}
		}

I found a way to simplify it by getting rid of this max value completely.

I noticed when creating the key range, we are usually just iterating identifiers, such as iterating the child block IDs, or iterating the event IDs of a block, or iterating transaction results of a block. They are either flow.Identifier or uint32, which takes 32 bytes. So I could just make a ffBytes appended with 32 FF bytes, and use it as the UpperBound for the key range for the inclusion of any keys with the end prefix.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

☠️ see my comment blow for details. Unfortunately, I think that change is unacceptable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with the worries about this change. The 0xff suffix approach to prefix-wise iteration depends on tracking database-wide key lengths to be correct. The Badger solution was a little hacky, but at least provided a fairly strong guarantee of correctness by forcing all writes through logic that tracked the max key length. Adding a constant number of bytes is less likely to be robust and correct over time because it does not account for changes to key lengths.

We have encountered very subtle bugs caused by not handling this correctly in the past. (The link on line 356 is supposed to link to this explanation of such a bug).

Change Suggestions

Pebble has an example for prefix-wise iteration in their documentation -- maybe we can use that approach here instead?

Alternatively, we could explicitly overshoot the endpoint of the iteration range, and exit early if we seek to a key with a prefix that doesn't match the supplied prefix.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from the IterOptions doc

// UpperBound specifies the largest key (exclusive) that the iterator will
// return during iteration. ...

I read this to mean UpperBound needs to be the first non-matching value. effectively prefix+1. it's probably easier and more accurate to omit upper bound and check if the iterator's current key starts with prefix.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the ideas, I made the changes in this PR in order to highlight the changes and the updated test cases.

it's probably easier and more accurate to omit upper bound and check if the iterator's current key starts with prefix.

This could work for traverse, but won't work for iterate which iterates keys between a start prefix and a end prefix. We would have to implement two solutions, whereas we could just use one solution for both traverse and iterate.

if err != nil {
return fmt.Errorf("failed to insert chunk locator: %w", err)
}

// read the latest index
var latest uint64
err = operation.RetrieveJobLatestIndex(JobQueueChunksQueue, &latest)(tx)
err = operation.RetrieveJobLatestIndex(JobQueueChunksQueue, &latest)(r)
if err != nil {
return fmt.Errorf("failed to retrieve job index for chunk locator queue: %w", err)
}

// insert to the next index
next := latest + 1
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to prevent dirty reads of the latest, we have to introduce a mutex lock (storing) here, so that it's concurrent-safe.

result, err = p.results.byID(meta.ResultID)(batch)
if errors.Is(err, storage.ErrNotFound) {
// if the result is not in the previous blocks, check storage
result, err = p.results.ByID(meta.ResultID)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When storing payloads, we require the receipt must refer to a result that exists. Given multiple receipts could refer to the same result, and we want the same result only be stored once, this makes this query a bit tricky: when verifying if the result has been stored, not long we need to look at the write-set in the current batch from previous writes, but also the database.

Badger does this two lookup automatically in a transaction, but pebble requires us to do these two queries separately.

@@ -284,6 +283,7 @@ func (state *State) bootstrapSealingSegment(segment *flow.SealingSegment, head *
if !ok {
return fmt.Errorf("missing latest seal for sealing segment block (id=%s)", blockID)
}
fmt.Println("height =====", height, latestSealID)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to remove

Comment on lines +54 to +64
if err == nil {
// QC for blockID already exists
return storage.ErrAlreadyExists
}
Copy link
Member Author

@zhangchiqing zhangchiqing Jul 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ This check is not concurrent safe. A mutex lock is added to protect.

@vishalchangrani
Copy link
Contributor

Thank you @zhangchiqing for putting this together!

We had a call about this change. Here is the read.ai link for the call.

As you review this PR, I request that you also think about the following:

  • The effort required to maintain two branches - master (deployed on mainnet after the spork) and a feature branch with pebbledb changes, possibly for several months till PebbleDB is rolled out completely.

  • The amount of time we should test this change on testnet before we are comfortable on rolling it out to mainnet.

This will help us determine how we want to roll this change out.

❤️


err := operation.InsertExecutedBlock(rootSeal.BlockID)(txn)
err := operation.InsertExecutedBlock(rootSeal.BlockID)(w)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The badger version of Insert will first check if the key exists, and error if not exist.

However, the pebble version of Insert will just overwrite the key without checking whether the key exists.

This means, we can't detect or prevent block from being executed twice at the database level. We just have to rely on the application level to prevent that. In the worst case, executing the same block twice will not cause serious problem to the protocol as long as it happens rarely. So I think it's OK to skip the check.

@@ -479,7 +479,7 @@ func (s *state) UpdateHighestExecutedBlockIfHigher(ctx context.Context, header *
defer span.End()
}

return operation.RetryOnConflict(s.db.Update, procedure.UpdateHighestExecutedBlockIfHigher(header))
return operation.WithReaderBatchWriter(s.db, procedure.UpdateHighestExecutedBlockIfHigher(header))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This UpdateHighestExecutedBlockIfHigher is no longer concurrent-safe, if two blocks at different height are executed concurrently, a dirty write might write with a wrong highest executed height.

But I think this is OK for the tradeoff of simplicity, because the highest height are only used by places where doesn't require accuracy, such as metrics or getting epoch counter, etc.

Copy link
Member

@AlexHentschel AlexHentschel Aug 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️

I disagree. While your argument might be true now, we want to write maintainable code - code that is correct in the future. I am not sure you will still remember this subtle edge case in 12 months from now -- I certainly won't.

So imagine there is an engineer who sees "highest executed block". Clearly, if that is correctly implemented this number can only ever go up. You argue that it is fine right now, but you are breaking the meaning of very clearly defined concepts in the code base. Adding subtle foot-guns here and there with the argument that it doesn't break anything now is exactly the strategy that turns a code base into a unmaintainable mess, where every little change breaks something at a very different part of the code base.
The complexity of our protocol is already high enough. For a high-assurance software we cannot afford leaving subtle edge cases in the code (where the algorithms behave factually incorrect) just with the argument that it doesn't break anything now.

In my opinion, this (and all similar occurrences) absolutely need to be fixed!

}
// retrieve the current finalized cluster state boundary
var boundary uint64
err = operation.RetrieveClusterFinalizedHeight(header.ChainID, &boundary)(f.db)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This read used to be in the transaction. However, with pebble, we have to read it before the batch write. So in theory there is chance for a dirty read if the MakeFinal is called concurrently. But since we know hotstuff finalizes block in a single thread, MakeFinal won't be called concurrently, which means the finalized height read from here won't become outdated.

Copy link
Member

@AlexHentschel AlexHentschel Aug 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️

While I agree with your argument, I am strongly of the opinion that needs to be reflected in the code base. Our goal is to not leave subtle foot guns in the code base.

The collection finalizer is a standalone component. It doesn't even say anything about not being concurrency safe -- but in fact it is not (anymore). If the finalizer is only to be executed by a specific component, that requirement needs to be reflected in the code base, so it is hard (or at least very obvious to engineers) when they are breaking this requirement. At the moment, I would classify this as a subtle foot-gun, hence as unacceptable for our high-assurance software.

Copy link
Member

@AlexHentschel AlexHentschel Aug 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think addressing this (Leo's comment; my concern) could be its own PR. Added item to issue #6337 (comment)

}

func (w *badgerWriterBatch) DeleteRange(start, end []byte) error {
return fmt.Errorf("not implemented")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to unify the API between badger.Transaction and pebble.Batch.

pebble.Batch has DeleteRange, which badger.Transaction doesn't support.

@@ -70,6 +70,10 @@ func (c *Commits) BatchStore(blockID flow.Identifier, commit flow.StateCommitmen
return operation.BatchIndexStateCommitment(blockID, commit)(writeBatch)
}

func (c *Commits) BatchStore2(blockID flow.Identifier, commit flow.StateCommitment, tx storage.BatchWriter) error {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be removed

@@ -7,9 +7,6 @@ import (
// Guarantees represents persistent storage for collection guarantees.
type Guarantees interface {

// Store inserts the collection guarantee.
Store(guarantee *flow.CollectionGuarantee) error
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A guarantee is never and should not stored alone. Usually it's stored along with the block. Removing the method from the interface level, but keep the method in the module implementation.

This comment was marked as resolved.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And later, once they finalize it, they add the CollectionGuarantee to their data base and send that guarantee to the Consensus nodes.

A collection guarantee is compound model derived from a cluster block, basically a cluster block QC and block payload. Collection Nodes store this data as HotStuff models (blocks, payloads, QCs), from which they can derive a collection guarantee at any time.

In the same way that Consensus Nodes don't need to separately store a "finalized block" in their database when they finalize a block, Collection Nodes don't need to separately store a collection guarantee once it is guaranteed.

In the context of the Consensus Follower, Leo is right that we do not want to store parts of a block independently; we store either the whole block and all its contents or nothing. So this change makes sense to me.

Copy link
Member Author

@zhangchiqing zhangchiqing Aug 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, my point is that collection nodes currently don't store guarantee, but builds them on the fly (see collection finalizer) and submit it to consensus nodes (see collection pusher). And consensus nodes also don't store the guarantee when receiving them, but save them in the mempool ( see consensus ingestion core).

The consensus nodes and other non-consensus nodes only store the guarantee when storing the block payload, which is implemented by Block.Store. That's why I removed the Store method on the Guarantees interface, for the same reason, I also removed Payloads.Store. This guarantees that if the database ever has a guarantee stored, then the block containing this guarantee must also exist in the database, since block and guarantee are stored atomically, and this is the only way to store guarantee.

indexOwnReceiptOps := transaction.WithTx(func(tx *badger.Txn) error {
err := operation.IndexOwnExecutionReceipt(blockID, receiptID)(tx)
// check if we are storing same receipt
if errors.Is(err, storage.ErrAlreadyExists) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't prevent dirty reads without transaction, have to skip this check.

Copy link
Member

@AlexHentschel AlexHentschel Aug 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️

Are you sure that is a wise idea? You are the expert, but if I remember correctly we previously had execution nodes crash telling us that they were attempting to override their own result with a different one. With this check removed, they wouldn't notice that anymore.

I think we have options here:

  • Wouldn't including a mutex suffice to prevent dirty reads? Then we can check first, because no other components would be able to write (ideally, we would move all the write operations related to ExecutionReceipts into this package not not export them.

func convertNotFoundError(err error) error {
if errors.Is(err, pebble.ErrNotFound) {
return storage.ErrNotFound
}
return err
}

// O(N) performance
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's hard to implement a general reverse iteration with pebble. Instead, I just iterate all keys and find the highest. For now, this function is only used by looking for the highest version beacon , which usually have very few.

}

return nil
// TODO: use transaction to avoid race condition
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ The method is not concurrent-safe at the database level. If two blocks are inserted concurrently, only one of them is stored.

We are relying on the consensus algorithm to ensure the method is called in a single thread, which it does right now.

Copy link
Member

@AlexHentschel AlexHentschel Aug 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

regarding your comment I think using a transaction provides inadequate guarantees for concurrent access. Please see my comment.

In my opinion, the only way to make this code safe is to delegate the sole authority for writing those indices to one single component. Consequently, this code here should be part of that higher-level component and not exported.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️

We are relying on the consensus algorithm to ensure the method is called in a single thread, which it does right now.

I am not sure that is correct ... at least it is not apparent, so we have notable surface to accidentally break this in the future:

Compliance engine and Block builder have different worker threads I think. Both call Extend, which in turn calls Mutator.insert, which calls IndexNewBlock. So we already have concurrent access in my opinion!

@@ -14,6 +14,9 @@ type QuorumCertificates interface {
// StoreTx stores a Quorum Certificate as part of database transaction QC is indexed by QC.BlockID.
// * storage.ErrAlreadyExists if any QC for blockID is already stored
StoreTx(qc *flow.QuorumCertificate) func(*transaction.Tx) error

// * storage.ErrAlreadyExists if any QC for blockID is already stored
StorePebble(qc *flow.QuorumCertificate) func(PebbleReaderBatchWriter) error
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's hard to unify the Store function for both badger and pebble. Because badger needs to support transaction and pebble needs to support batch update. And the API for transaction and batch update are different.

So I ended up create a new method StorePebble. badger will not implement StorePebble, and pebble will not implement StoreTx.

@zhangchiqing
Copy link
Member Author

Just passed benchnet testing with 20tps benchmarking.


type Transaction interface {
// TODO: rename to writer
type BatchWriter interface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me, the differentiation between BatchWriter and Transaction from Badger doesn't make sense to carry over to Pebble.

Besides IndexedBatchs, which it seems like we can mostly avoid using, all write operations to Pebble are equivalent to the WriteBatch API from Badger. With Pebble, there is no distinction between batch-inserting a resource, and "normal"-inserting a resource.

I think we should:

  • remove low-level storage operations that use the Batch* name scheme (example)
    • if applicable (no non-batch method already exists), modify these functions to accept pebble.Writer instead of storage.BatchWriter
  • remove the BatchWriter type

func findHighestAtOrBelow(
prefix []byte,
height uint64,
entity interface{},
) func(*badger.Txn) error {
return func(tx *badger.Txn) error {
) func(pebble.Reader) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's only used for finding the latest version beacon.

store heights in 1's compliment? then you can do a forward search.

Yeah that would work. We could also just index the latest version beacon (latest_beacon->beacon_id) when we index height->beacon_id. Same thing we do for latest finalized block.

}

secretsDB, err := bstorage.InitSecret(opts)
secretsDB, err := bstorage.InitSecret(fnb.BaseConfig.secretsdir, nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the secrets database contains private Staking and Random Beacon keys

Agree we should keep secrets DB encrypted. Just noting only Random Beacon keys are stored there (not Staking keys)

DeleteRange(start, end []byte) error
}

type Reader interface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this is unused. Can we use pebble.Reader instead of this interface?

}

type Reader interface {
Get(key []byte) ([]byte, error)
}

// BatchStorage serves as an abstraction over batch storage, adding ability to add ability to add extra
// callbacks which fire after the batch is successfully flushed.
type BatchStorage interface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general I feel like we can significantly trim down the number of these high-level generic storage interfaces. Ideally, I think we should have only:

  • Reader - read-only access to the committed state of the database (possibly we can use pebble.Reader directly in all cases)
  • BatchWriter - a pebble.WriteBatch wrapped with our OnSucceed callback tracking
    • This can replace both Badger concepts of *Transaction and *WriteBatch
    • Ideally we would not provide read access at all here, IMO. Though practically, to make the migration feasible, we need to provide read access. The isolation level of the reads will different, which is a major source of potential bugs. In addition to auditing our previous use of read+write badger transactions, I think we should name and document this Read API to make it very clear that it is reading global state.

I am guessing that some of these interfaces are necessary to make the migration process manageable, even if they won't be desirable once we complete the transition. My suggestion for this stage is:

  • Agree on the high-level storage interfaces we want to use, once the migration is complete. Define these in the code and make use of them as much as possible during the migration.
  • Mark "legacy" interfaces that are needed during the migration process as deprecated and add TODOs to remove them
  • Remove any left-over interfaces

"github.com/onflow/flow-go/storage"
)

// batchWriter wraps the storage.BatchWriter to make it compatible with pebble.Writer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this is a temporary crutch to get us through the migration, but would not be needed after the migration is complete. In particular because, with Pebble, there is no need to distinguish between "transactions" and "write batches".

If I'm understanding this correctly, could we mark these types and functions as deprecated and document why?

return val, nil
}

func (b *Batch) GetReader() storage.Reader {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is never called - can we remove it?

return func(tx *badger.Txn) error {
err := operation.IndexResultApproval(resultID, chunkIndex, approvalID)(tx)
func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.PebbleReaderBatchWriter) error {
return func(tx storage.PebbleReaderBatchWriter) error {
Copy link
Member

@jordanschalm jordanschalm Aug 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return func(tx storage.PebbleReaderBatchWriter) error {
return storage.OnlyWriter(operation.IndexResultApproval(resultID, chunkIndex, approvalID))

ErrAlreadyExists is no longer returned here, so we could just do this. To replicate the previous sanity check in full, we'd need a lock here.

// NOTE: SN nodes need to explicitly set --insecure-secrets-db to true in order to
// disable secrets database encryption
if fnb.NodeRole == flow.RoleConsensus.String() && fnb.InsecureSecretsDB {
fnb.Logger.Warn().Msg("starting with secrets database encryption disabled")
} else {
encryptionKey, err := loadSecretsEncryptionKey(fnb.BootstrapDir, fnb.NodeID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. Unfortunately Pebble doesn't support encryption from what I could tell. We would need to implement it on top of Pebble. (Or maybe just keep using Badger for only the secrets DB -- it's pretty tiny.)

Copy link
Member

@jordanschalm jordanschalm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of High Level Feedback & Suggestions

Mutually Exclusive Procedures (Isolation)

The difference in isolation levels between Badger and Pebble is the main source of potential bugs during this migration. In addition to finding and re-implementing instances where we rely on Badger's isolation level, I think we should separate and document low-level storage operations that need to be mutex-protected at the higher level:

  • For individual storage operations (eg. read/write one key) that are accessed in a mutex-protected higher-level component:
    • Document the mutex requirement
    • If possible, make them package-private (or internal to the storage/pebble subtree
  • Consolidate mutex-protected compound storage operations (multiple reads and writes, which together must be mutex-protected).
    • I think re-purposing the procedure package, so it (only) contains all these compound, mutex-protected storage methods would make sense

High-Level Storage API

Some of the high-level storage constructs we have don't make sense with a Pebble-backed storage implementation. In particular:

  • we do not need a distinction between "write batch" and "transaction"
  • we should avoid compound storage operations that intermingle reads and writes

Suggestions

Database Encryption

Pebble does not support encryption. I don't think we should ship a version that reverts encryption of the secrets database by default. We can:

  • Implement an encryption layer for storage methods operating on the secrets DB (my preference)
  • Continue using Badger only the secrets DB

Iterative Badger-Based Migration

I'm in favour of @AlexHentschel's suggestion to complete the re-implementation of storage methods that depend on Badger's isolation level iteratively, retaining Badger as the storage backend until our storage implementation is fully safe with a read-committed isolation level.

Comment on lines +358 to +360
if errors.Is(err, storage.ErrAlreadyExists) {
// some evidence about execution fork already stored;
// we only keep the first evidence => noting more to do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Pebble cannot check atomically that is hasn't already stored a value before possibly overwriting it. Thereby we break the implementation's guarantee. Off the top of my head, am not sure how important this guarantee is ... maybe its not important ... maybe we use it somewhere. Just something to investigate later and possibly update the documentation here or the implementation.

Comment on lines +89 to +99
func (t *Transaction) Set(key, value []byte) error {
return t.writer.Set(key, value, pebble.Sync)
}

func (t *Transaction) Delete(key []byte) error {
return t.writer.Delete(key, pebble.Sync)
}

func (t *Transaction) DeleteRange(start, end []byte) error {
return t.writer.DeleteRange(start, end, pebble.Sync)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

regarding the argument pebble.Sync that you are passing in: I noticed that none of Pebble's implementations of Batch.Set, or Batch.Delete or Batch.DeleteRange actually use this last value (its just ignored).

Comment on lines +30 to +32
func (b *Batch) GetWriter() storage.BatchWriter {
return &Transaction{b.writer}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me, it would make sense if our Batch object here would directly implement the BatchWriter interface.

Comment on lines +34 to +45
type reader struct {
db *pebble.DB
}

func (r *reader) Get(key []byte) ([]byte, error) {
val, closer, err := r.db.Get(key)
if err != nil {
return nil, err
}
defer closer.Close()
return val, nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the spirit of defensive programming, I think it would help if Batch provided no abilities to read the state. If the only functionality you have is to read directly from the data base while Batch is an independent object where you put deferred write operations into, I think it would be relatively clear that reads are not reflecting the latest values as of the pending writes.

Otherwise, I think it is very dangerous to confuse, because Pebble also provides an indexed batch (see pebble's NewIndexedBatch), which we are not using here. And even with an indexed Batch, reads could be stale unless the value is overwritten within the batch. I think the surface for possible problems and foot guns is very high with the current implementation offering also reads, while the extra convenience is probably quite little. The tradeoff is not worth it in my oppionion

Suggested change
type reader struct {
db *pebble.DB
}
func (r *reader) Get(key []byte) ([]byte, error) {
val, closer, err := r.db.Get(key)
if err != nil {
return nil, err
}
defer closer.Close()
return val, nil
}

See also Jordan's comment

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the Finalizer for the main consensus:

I think using a similar construction here would be beneficial. In addition, this would make it much easier to consolidate the code for the collectors and consensus finalization, which is algorithmically largely the same - it just deviates what we index upon finalization and the actions we take after successful finalization (mempool cleanup and publishing collection). The consensus implementation has already abstrations for this custom logic, so I think it would be relatively easy to consolidate both implementations.

Comment on lines +707 to +708
err = operation.WithReaderBatchWriter(m.db, func(rw storage.PebbleReaderBatchWriter) error {
_, tx := rw.ReaderWriter()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️

We need to protect these lines from accidental concurrent access:

err = operation.UpdateFinalizedHeight(header.Height)(tx)
if err != nil {
return fmt.Errorf("could not update finalized height: %w", err)
}
err = operation.UpdateSealedHeight(sealed.Height)(tx)
if err != nil {
return fmt.Errorf("could not update sealed height: %w", err)
}

The previous badger-based code would just fail at IndexBlockHeight right before, because badger checked that there was no value stored for the respective height. Now we will just overwrite the value and possibly decrement the latest finalized block height with an outdated value (thereby breaking the protocol state).

if err != nil {
return fmt.Errorf("could not index candidate seal: %w", err)
}

// index the child block for recovery
err = transaction.WithTx(procedure.IndexNewBlock(blockID, candidate.Header.ParentID))(tx)
err = procedure.IndexNewBlock(blockID, candidate.Header.ParentID)(tx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️

The method Extend is called concurrently by the compliance engine (for incoming blocks from other replicas) and the node's internal hotstuff (persisting its own proposal). Extend calls insert here. The function IndexNewBlock is not sufficiently concurrency safe, as it modifies the list of all children of a block, which could very well cause conflicting edits, i.e. loss of information and the corruption of the protocol state.

return transaction.WithTx(operation.InsertQuorumCertificate(qc))
func NewQuorumCertificates(collector module.CacheMetrics, db *pebble.DB, cacheSize uint) *QuorumCertificates {
store := func(_ flow.Identifier, qc *flow.QuorumCertificate) func(storage.PebbleReaderBatchWriter) error {
return storage.OnlyWriter(operation.InsertQuorumCertificate(qc))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️

Previously, badger guaranteed that we would only ever write one QC for a block into the database. In other words, a QC for a block that was stored in the database would never change.

This now changes:
If operation.InsertQuorumCertificate is ever called outside of the QuorumCertificates storage abstraction layer, we are risking to update (overwrite) the QC for a block. The reason is that you included a mutex here, which only protects dirty writes mediated by QuorumCertificates, but the lower-level operation.InsertQuorumCertificate remains easily accessible for any logic.

I think the way we are using QCs, overwriting the QC would be fine. We only require a valid QC, but my gut feeling is that it doesn't need to be consistently the same. Nevertheless, this requires careful confirmation and proper documentation.

@@ -30,7 +31,7 @@ type IndexerCore struct {
collections storage.Collections
transactions storage.Transactions
results storage.LightTransactionResults
batcher bstorage.BatchBuilder
batcher *pebble.DB
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while we are at it, could we rename this to database. Given the following line, I have a hard time making sense of the naming here:

batch := bstorage.NewBatch(c.batcher)

zhangchiqing and others added 6 commits September 13, 2024 11:07
…pproval

[Pebble Refactor] Making indexing approval concurrent-safe
[Pebble Refactor] Making finalization concurrent safe
…ndexer

[Pebble Storage] Refactor indexing new blocks
…ipts

[Pebble Storage] Make indexing own receipts concurrent safe
@zhangchiqing
Copy link
Member Author

Close for now.

We will start with the database operation abstraction, and then replace badger transactions operations with batch updates so that database operations could be database-implementation agnostic, and lastly replacing badger with pebble.

See the 4 phases here

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants