From a4c852f20e3343eb2a4eb55ca81b88a2160940bd Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Wed, 8 May 2024 00:36:37 +0000 Subject: [PATCH 1/5] optimize --- node/store.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/node/store.go b/node/store.go index 9768da8ec1..0c1c2b6345 100644 --- a/node/store.go +++ b/node/store.go @@ -233,6 +233,20 @@ func (s *Store) StoreBatch(ctx context.Context, header *core.BatchHeader, blobs keys = append(keys, blobHeaderKey) values = append(values, blobHeaderBytes) + // Get raw chunks + rawBlob := blobsProto[idx] + if len(rawBlob.GetBundles()) != len(blob.Bundles) { + return nil, errors.New("internal error: the number of bundles in pased blob must be the same as in raw blob") + } + rawChunks := make(map[core.QuorumID][][]byte) + for i, chunks := range rawBlob.GetBundles() { + quorumID := uint8(rawBlob.GetHeader().GetQuorumHeaders()[i].GetQuorumId()) + rawChunks[quorumID] = make([][]byte, len(chunks.GetChunks())) + for j, chunk := range chunks.GetChunks() { + rawChunks[quorumID][j] = chunk + } + } + // blob chunks for quorumID, bundle := range blob.Bundles { key, err := EncodeBlobKey(batchHeaderHash, idx, quorumID) @@ -242,12 +256,8 @@ func (s *Store) StoreBatch(ctx context.Context, header *core.BatchHeader, blobs } bundleRaw := make([][]byte, len(bundle)) - for i, chunk := range bundle { - bundleRaw[i], err = chunk.Serialize() - if err != nil { - log.Error("Cannot serialize chunk:", "err", err) - return nil, err - } + for i := 0; i < len(bundle); i++ { + bundleRaw[i] = rawChunks[quorumID][i] } chunkBytes, err := encodeChunks(bundleRaw) if err != nil { From 768fd8764a4fcdbd85284b7a663cd66ac87331b8 Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Wed, 8 May 2024 02:49:56 +0000 Subject: [PATCH 2/5] test --- node/store_test.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/node/store_test.go b/node/store_test.go index 176345dd93..5b77c6da82 100644 --- a/node/store_test.go +++ b/node/store_test.go @@ -74,6 +74,11 @@ func CreateBatch(t *testing.T) (*core.BatchHeader, []*core.BlobMessage, []*pb.Bl Proof: commitment, Coeffs: []encoding.Symbol{encoding.ONE}, } + chunk1bytes, err := chunk1.Serialize() + assert.Nil(t, err) + for i := 0; i < 10000; i++ { + chunk1.Coeffs = append(chunk1.Coeffs, encoding.ONE) + } blobMessage := []*core.BlobMessage{ { @@ -163,12 +168,21 @@ func CreateBatch(t *testing.T) (*core.BatchHeader, []*core.BlobMessage, []*pb.Bl Length: uint32(50), QuorumHeaders: []*pb.BlobQuorumInfo{quorumHeaderProto}, } + bundles := []*pb.Bundle{ + { + Chunks: [][]byte{ + chunk1bytes, + }, + }, + } blobs := []*pb.Blob{ { - Header: blobHeaderProto0, + Header: blobHeaderProto0, + Bundles: bundles, }, { - Header: blobHeaderProto1, + Header: blobHeaderProto1, + Bundles: bundles, }, } return &batchHeader, blobMessage, blobs From 27a1a2ee387563df971e0dbccb8b6c0dfc590d50 Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Wed, 8 May 2024 02:52:56 +0000 Subject: [PATCH 3/5] test --- node/store_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/node/store_test.go b/node/store_test.go index 5b77c6da82..552e116863 100644 --- a/node/store_test.go +++ b/node/store_test.go @@ -76,9 +76,6 @@ func CreateBatch(t *testing.T) (*core.BatchHeader, []*core.BlobMessage, []*pb.Bl } chunk1bytes, err := chunk1.Serialize() assert.Nil(t, err) - for i := 0; i < 10000; i++ { - chunk1.Coeffs = append(chunk1.Coeffs, encoding.ONE) - } blobMessage := []*core.BlobMessage{ { From a9c4ebad0168ce8d5abc9aa2e0e29f7f9ade67a1 Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Wed, 8 May 2024 03:39:33 +0000 Subject: [PATCH 4/5] test --- node/store.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/node/store.go b/node/store.go index 0c1c2b6345..763fdad982 100644 --- a/node/store.go +++ b/node/store.go @@ -218,6 +218,7 @@ func (s *Store) StoreBatch(ctx context.Context, header *core.BatchHeader, blobs // Generate key/value pairs for all blob headers and blob chunks . size := int64(0) + var serializationDuration, encodingDuration time.Duration for idx, blob := range blobs { // blob header blobHeaderKey, err := EncodeBlobHeaderKey(batchHeaderHash, idx) @@ -234,6 +235,7 @@ func (s *Store) StoreBatch(ctx context.Context, header *core.BatchHeader, blobs values = append(values, blobHeaderBytes) // Get raw chunks + start := time.Now() rawBlob := blobsProto[idx] if len(rawBlob.GetBundles()) != len(blob.Bundles) { return nil, errors.New("internal error: the number of bundles in pased blob must be the same as in raw blob") @@ -246,6 +248,8 @@ func (s *Store) StoreBatch(ctx context.Context, header *core.BatchHeader, blobs rawChunks[quorumID][j] = chunk } } + serializationDuration += time.Since(start) + start = time.Now() // blob chunks for quorumID, bundle := range blob.Bundles { @@ -267,16 +271,18 @@ func (s *Store) StoreBatch(ctx context.Context, header *core.BatchHeader, blobs keys = append(keys, key) values = append(values, chunkBytes) - } + encodingDuration += time.Since(start) } // Write all the key/value pairs to the local database atomically. + start := time.Now() err = s.db.WriteBatch(keys, values) if err != nil { log.Error("Failed to write the batch into local database:", "err", err) return nil, err } + log.Debug("StoreBatch succeeded", "chunk serialization duration", serializationDuration, "bytes encoding duration", encodingDuration, "write batch duration", time.Since(start), "total bytes", size) return &keys, nil } From a65c410467deb8d1fa8fb7aa30c460c7ec38d2c4 Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Wed, 8 May 2024 03:50:00 +0000 Subject: [PATCH 5/5] test --- node/store.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/node/store.go b/node/store.go index 763fdad982..d5bf8f68d8 100644 --- a/node/store.go +++ b/node/store.go @@ -218,7 +218,6 @@ func (s *Store) StoreBatch(ctx context.Context, header *core.BatchHeader, blobs // Generate key/value pairs for all blob headers and blob chunks . size := int64(0) - var serializationDuration, encodingDuration time.Duration for idx, blob := range blobs { // blob header blobHeaderKey, err := EncodeBlobHeaderKey(batchHeaderHash, idx) @@ -235,10 +234,9 @@ func (s *Store) StoreBatch(ctx context.Context, header *core.BatchHeader, blobs values = append(values, blobHeaderBytes) // Get raw chunks - start := time.Now() rawBlob := blobsProto[idx] if len(rawBlob.GetBundles()) != len(blob.Bundles) { - return nil, errors.New("internal error: the number of bundles in pased blob must be the same as in raw blob") + return nil, errors.New("internal error: the number of bundles in parsed blob must be the same as in raw blob") } rawChunks := make(map[core.QuorumID][][]byte) for i, chunks := range rawBlob.GetBundles() { @@ -248,8 +246,6 @@ func (s *Store) StoreBatch(ctx context.Context, header *core.BatchHeader, blobs rawChunks[quorumID][j] = chunk } } - serializationDuration += time.Since(start) - start = time.Now() // blob chunks for quorumID, bundle := range blob.Bundles { @@ -272,17 +268,14 @@ func (s *Store) StoreBatch(ctx context.Context, header *core.BatchHeader, blobs keys = append(keys, key) values = append(values, chunkBytes) } - encodingDuration += time.Since(start) } // Write all the key/value pairs to the local database atomically. - start := time.Now() err = s.db.WriteBatch(keys, values) if err != nil { log.Error("Failed to write the batch into local database:", "err", err) return nil, err } - log.Debug("StoreBatch succeeded", "chunk serialization duration", serializationDuration, "bytes encoding duration", encodingDuration, "write batch duration", time.Since(start), "total bytes", size) return &keys, nil }