From ea558f71931b98875cba90eded33441995cb0990 Mon Sep 17 00:00:00 2001
From: Jian Xiao <99709935+jianoaix@users.noreply.github.com>
Date: Wed, 8 May 2024 09:16:15 -0700
Subject: [PATCH 1/2] Optimize the node store writing perf (#547)

---
 node/store.go      | 23 ++++++++++++++++-------
 node/store_test.go | 15 +++++++++++++--
 2 files changed, 29 insertions(+), 9 deletions(-)

diff --git a/node/store.go b/node/store.go
index 9768da8ec1..d5bf8f68d8 100644
--- a/node/store.go
+++ b/node/store.go
@@ -233,6 +233,20 @@ func (s *Store) StoreBatch(ctx context.Context, header *core.BatchHeader, blobs
 		keys = append(keys, blobHeaderKey)
 		values = append(values, blobHeaderBytes)
 
+		// Get raw chunks
+		rawBlob := blobsProto[idx]
+		if len(rawBlob.GetBundles()) != len(blob.Bundles) {
+			return nil, errors.New("internal error: the number of bundles in parsed blob must be the same as in raw blob")
+		}
+		rawChunks := make(map[core.QuorumID][][]byte)
+		for i, chunks := range rawBlob.GetBundles() {
+			quorumID := uint8(rawBlob.GetHeader().GetQuorumHeaders()[i].GetQuorumId())
+			rawChunks[quorumID] = make([][]byte, len(chunks.GetChunks()))
+			for j, chunk := range chunks.GetChunks() {
+				rawChunks[quorumID][j] = chunk
+			}
+		}
+
 		// blob chunks
 		for quorumID, bundle := range blob.Bundles {
 			key, err := EncodeBlobKey(batchHeaderHash, idx, quorumID)
@@ -242,12 +256,8 @@ func (s *Store) StoreBatch(ctx context.Context, header *core.BatchHeader, blobs
 			}
 
 			bundleRaw := make([][]byte, len(bundle))
-			for i, chunk := range bundle {
-				bundleRaw[i], err = chunk.Serialize()
-				if err != nil {
-					log.Error("Cannot serialize chunk:", "err", err)
-					return nil, err
-				}
+			for i := 0; i < len(bundle); i++ {
+				bundleRaw[i] = rawChunks[quorumID][i]
 			}
 			chunkBytes, err := encodeChunks(bundleRaw)
 			if err != nil {
@@ -257,7 +267,6 @@ func (s *Store) StoreBatch(ctx context.Context, header *core.BatchHeader, blobs
 
 			keys = append(keys, key)
 			values = append(values, chunkBytes)
-
 		}
 	}
 
diff --git a/node/store_test.go b/node/store_test.go
index 176345dd93..552e116863 100644
--- a/node/store_test.go
+++ b/node/store_test.go
@@ -74,6 +74,8 @@ func CreateBatch(t *testing.T) (*core.BatchHeader, []*core.BlobMessage, []*pb.Bl
 		Proof:  commitment,
 		Coeffs: []encoding.Symbol{encoding.ONE},
 	}
+	chunk1bytes, err := chunk1.Serialize()
+	assert.Nil(t, err)
 
 	blobMessage := []*core.BlobMessage{
 		{
@@ -163,12 +165,21 @@ func CreateBatch(t *testing.T) (*core.BatchHeader, []*core.BlobMessage, []*pb.Bl
 		Length:        uint32(50),
 		QuorumHeaders: []*pb.BlobQuorumInfo{quorumHeaderProto},
 	}
+	bundles := []*pb.Bundle{
+		{
+			Chunks: [][]byte{
+				chunk1bytes,
+			},
+		},
+	}
 	blobs := []*pb.Blob{
 		{
-			Header: blobHeaderProto0,
+			Header:  blobHeaderProto0,
+			Bundles: bundles,
 		},
 		{
-			Header: blobHeaderProto1,
+			Header:  blobHeaderProto1,
+			Bundles: bundles,
 		},
 	}
 	return &batchHeader, blobMessage, blobs

From d2712a86ce33f070f268154c3a6f4b4ca81b5842 Mon Sep 17 00:00:00 2001
From: Jian Xiao <99709935+jianoaix@users.noreply.github.com>
Date: Wed, 8 May 2024 10:36:05 -0700
Subject: [PATCH 2/2] Instrument the store batch (#546)

---
 node/store.go | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/node/store.go b/node/store.go
index d5bf8f68d8..6f0ed428fc 100644
--- a/node/store.go
+++ b/node/store.go
@@ -167,6 +167,8 @@ func (s *Store) deleteNBatches(currentTimeUnixSec int64, numBatches int) (int, e
 //
 // These entries will be stored atomically, i.e. either all or none entries will be stored.
 func (s *Store) StoreBatch(ctx context.Context, header *core.BatchHeader, blobs []*core.BlobMessage, blobsProto []*node.Blob) (*[][]byte, error) {
+	storeBatchStart := time.Now()
+
 	log := s.logger
 	batchHeaderHash, err := header.GetBatchHeaderHash()
 	if err != nil {
@@ -218,6 +220,7 @@ func (s *Store) StoreBatch(ctx context.Context, header *core.BatchHeader, blobs
 
 	// Generate key/value pairs for all blob headers and blob chunks .
 	size := int64(0)
+	var serializationDuration, encodingDuration time.Duration
 	for idx, blob := range blobs {
 		// blob header
 		blobHeaderKey, err := EncodeBlobHeaderKey(batchHeaderHash, idx)
@@ -234,6 +237,7 @@ func (s *Store) StoreBatch(ctx context.Context, header *core.BatchHeader, blobs
 		values = append(values, blobHeaderBytes)
 
 		// Get raw chunks
+		start := time.Now()
 		rawBlob := blobsProto[idx]
 		if len(rawBlob.GetBundles()) != len(blob.Bundles) {
 			return nil, errors.New("internal error: the number of bundles in parsed blob must be the same as in raw blob")
@@ -246,7 +250,8 @@ func (s *Store) StoreBatch(ctx context.Context, header *core.BatchHeader, blobs
 				rawChunks[quorumID][j] = chunk
 			}
 		}
-
+		serializationDuration += time.Since(start)
+		start = time.Now()
 		// blob chunks
 		for quorumID, bundle := range blob.Bundles {
 			key, err := EncodeBlobKey(batchHeaderHash, idx, quorumID)
@@ -254,6 +259,9 @@ func (s *Store) StoreBatch(ctx context.Context, header *core.BatchHeader, blobs
 				log.Error("Cannot generate the key for storing blob:", "err", err)
 				return nil, err
 			}
+			if len(rawChunks[quorumID]) != len(bundle) {
+				return nil, errors.New("internal error: the number of chunks in parsed blob bundle must be the same as in raw blob bundle")
+			}
 
 			bundleRaw := make([][]byte, len(bundle))
 			for i := 0; i < len(bundle); i++ {
@@ -268,14 +276,17 @@ func (s *Store) StoreBatch(ctx context.Context, header *core.BatchHeader, blobs
 			keys = append(keys, key)
 			values = append(values, chunkBytes)
 		}
+		encodingDuration += time.Since(start)
 	}
 
+	start := time.Now()
 	// Write all the key/value pairs to the local database atomically.
 	err = s.db.WriteBatch(keys, values)
 	if err != nil {
 		log.Error("Failed to write the batch into local database:", "err", err)
 		return nil, err
 	}
+	log.Debug("StoreBatch succeeded", "chunk serialization duration", serializationDuration, "bytes encoding duration", encodingDuration, "write batch duration", time.Since(start), "total store batch duration", time.Since(storeBatchStart), "total bytes", size)
 
 	return &keys, nil
 }
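
Note on PATCH 1/2: the optimization is that StoreBatch no longer calls chunk.Serialize() for every parsed chunk; it builds a quorumID -> raw chunk bytes map from the gRPC request (blobsProto) and reuses those already-serialized bytes when writing. The following is a minimal, self-contained Go sketch of that idea; the types rawBundle and parsedBundle and the function rawChunksByQuorum are illustrative stand-ins, not the repository's actual node.Blob / core.BlobMessage API.

package main

import (
	"errors"
	"fmt"
)

// Illustrative stand-ins for the node's protobuf and core types; the real
// code works with node.Blob (raw gRPC request) and core.BlobMessage (parsed).
type rawBundle struct {
	quorumID uint8
	chunks   [][]byte // chunk bytes exactly as they arrived on the wire
}

type parsedBundle struct {
	quorumID  uint8
	numChunks int
}

// rawChunksByQuorum builds a quorumID -> raw chunk bytes map so the store
// path can reuse the already-serialized wire bytes instead of re-serializing
// every parsed chunk before writing it to the database.
func rawChunksByQuorum(raw []rawBundle, parsed []parsedBundle) (map[uint8][][]byte, error) {
	if len(raw) != len(parsed) {
		return nil, errors.New("parsed and raw blobs must have the same number of bundles")
	}
	out := make(map[uint8][][]byte, len(raw))
	for _, b := range raw {
		out[b.quorumID] = b.chunks
	}
	// Sanity-check chunk counts per quorum, mirroring the patch's guard.
	for _, p := range parsed {
		if len(out[p.quorumID]) != p.numChunks {
			return nil, errors.New("chunk count mismatch between parsed and raw bundle")
		}
	}
	return out, nil
}

func main() {
	raw := []rawBundle{{quorumID: 0, chunks: [][]byte{{0x01}, {0x02}}}}
	parsed := []parsedBundle{{quorumID: 0, numChunks: 2}}
	chunks, err := rawChunksByQuorum(raw, parsed)
	fmt.Println(chunks, err)
}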
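Note on PATCH 2/2: the instrumentation accumulates per-phase durations across the per-blob loop and emits a single debug log after the atomic write. A minimal sketch of that timing pattern follows, assuming a hypothetical processBatch with placeholder phase bodies; only the time.Now()/time.Since accumulation mirrors the patch.

package main

import (
	"log"
	"time"
)

// processBatch shows the timing pattern added by the patch: accumulate
// per-phase durations across the loop and emit a single summary log after
// the batched write. The phase bodies are placeholders, not the node's code.
func processBatch(items [][]byte) {
	storeBatchStart := time.Now()
	var serializationDuration, encodingDuration time.Duration

	for range items {
		start := time.Now()
		// ... gather the raw chunk bytes for this item ...
		serializationDuration += time.Since(start)

		start = time.Now()
		// ... encode the chunks into the value that will be written ...
		encodingDuration += time.Since(start)
	}

	start := time.Now()
	// ... write all key/value pairs atomically ...
	writeDuration := time.Since(start)

	log.Printf("store batch: serialize=%v encode=%v write=%v total=%v",
		serializationDuration, encodingDuration, writeDuration, time.Since(storeBatchStart))
}

func main() {
	processBatch([][]byte{{0x01}, {0x02}})
}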