From 9dcc1248e2cf395217423ef88b4c7f7446fea3e8 Mon Sep 17 00:00:00 2001 From: Ian Shim Date: Sat, 30 Mar 2024 14:41:05 -0700 Subject: [PATCH] update encoded blob size accounting --- core/data.go | 10 +++++----- disperser/batcher/batcher_test.go | 2 +- disperser/batcher/encoded_blob_store.go | 10 +++------- disperser/batcher/encoding_streamer_test.go | 12 ++++++------ encoding/data.go | 5 +++-- node/metrics.go | 4 ++-- node/node.go | 2 +- 7 files changed, 21 insertions(+), 24 deletions(-) diff --git a/core/data.go b/core/data.go index aaa8975ce8..e4b8f9bfc6 100644 --- a/core/data.go +++ b/core/data.go @@ -157,10 +157,10 @@ type BlobMessage struct { Bundles Bundles } -func (b Bundle) Size() int64 { - size := int64(0) +func (b Bundle) Size() uint64 { + size := uint64(0) for _, chunk := range b { - size += int64(chunk.Size()) + size += chunk.Size() } return size } @@ -181,8 +181,8 @@ func (cb Bundles) Serialize() (map[uint32][][]byte, error) { } // Returns the size of the bundles in bytes. -func (cb Bundles) Size() int64 { - size := int64(0) +func (cb Bundles) Size() uint64 { + size := uint64(0) for _, bundle := range cb { size += bundle.Size() } diff --git a/disperser/batcher/batcher_test.go b/disperser/batcher/batcher_test.go index 9b5749b2b2..7ed3ef04d0 100644 --- a/disperser/batcher/batcher_test.go +++ b/disperser/batcher/batcher_test.go @@ -213,7 +213,7 @@ func TestBatcherIterations(t *testing.T) { assert.NoError(t, err) count, size := components.encodingStreamer.EncodedBlobstore.GetEncodedResultSize() assert.Equal(t, 2, count) - assert.Equal(t, uint64(197632), size) + assert.Equal(t, uint64(27904), size) txn := types.NewTransaction(0, gethcommon.Address{}, big.NewInt(0), 0, big.NewInt(0), nil) components.transactor.On("BuildConfirmBatchTxn").Return(txn, nil) diff --git a/disperser/batcher/encoded_blob_store.go b/disperser/batcher/encoded_blob_store.go index bdf56f8bbd..4fa944708a 100644 --- a/disperser/batcher/encoded_blob_store.go +++ b/disperser/batcher/encoded_blob_store.go @@ -35,7 +35,7 @@ type EncodingResult struct { ReferenceBlockNumber uint BlobQuorumInfo *core.BlobQuorumInfo Commitment *encoding.BlobCommitments - Chunks []*encoding.Frame + Chunks core.Bundle Assignments map[core.OperatorID]core.Assignment Status status } @@ -191,11 +191,7 @@ func getRequestID(key disperser.BlobKey, quorumID core.QuorumID) requestID { return requestID(fmt.Sprintf("%s-%d", key.String(), quorumID)) } +// getChunksSize returns the total size of all the chunks in the encoded result in bytes func getChunksSize(result *EncodingResult) uint64 { - var size uint64 - - for _, chunk := range result.Chunks { - size += uint64(len(chunk.Coeffs) * 256) // 256 bytes per symbol - } - return size + 256*2 // + 256 * 2 bytes for proof + return result.Chunks.Size() } diff --git a/disperser/batcher/encoding_streamer_test.go b/disperser/batcher/encoding_streamer_test.go index 0277ec5ffa..be886495e1 100644 --- a/disperser/batcher/encoding_streamer_test.go +++ b/disperser/batcher/encoding_streamer_test.go @@ -152,7 +152,7 @@ func TestBatchTrigger(t *testing.T) { assert.Nil(t, err) count, size := encodingStreamer.EncodedBlobstore.GetEncodedResultSize() assert.Equal(t, count, 1) - assert.Equal(t, size, uint64(131584)) + assert.Equal(t, size, uint64(17920)) // try encode the same blobs again at different block (this happens when the blob is retried) encodingStreamer.ReferenceBlockNumber = 11 @@ -163,7 +163,7 @@ func TestBatchTrigger(t *testing.T) { count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize() assert.Equal(t, count, 1) - assert.Equal(t, size, uint64(131584)) + assert.Equal(t, size, uint64(17920)) // don't notify yet select { @@ -182,7 +182,7 @@ func TestBatchTrigger(t *testing.T) { count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize() assert.Equal(t, count, 2) - assert.Equal(t, size, uint64(131584)*2) + assert.Equal(t, size, uint64(17920)*2) // notify select { @@ -243,7 +243,7 @@ func TestStreamingEncoding(t *testing.T) { assert.True(t, isRequested) count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize() assert.Equal(t, count, 1) - assert.Equal(t, size, uint64(131584)) + assert.Equal(t, size, uint64(17920)) // Cancel previous blob so it doesn't get reencoded. err = c.blobStore.MarkBlobFailed(ctx, metadataKey) @@ -273,7 +273,7 @@ func TestStreamingEncoding(t *testing.T) { assert.True(t, isRequested) count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize() assert.Equal(t, count, 1) - assert.Equal(t, size, uint64(131584)) + assert.Equal(t, size, uint64(17920)) // Request the same blob, which should be dedupped _, err = c.blobStore.StoreBlob(ctx, &blob, requestedAt) @@ -284,7 +284,7 @@ func TestStreamingEncoding(t *testing.T) { // It should not have been added to the encoded blob store count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize() assert.Equal(t, count, 1) - assert.Equal(t, size, uint64(131584)) + assert.Equal(t, size, uint64(17920)) } func TestEncodingFailure(t *testing.T) { diff --git a/encoding/data.go b/encoding/data.go index 2bcc2bfc21..de637514ca 100644 --- a/encoding/data.go +++ b/encoding/data.go @@ -41,8 +41,9 @@ func (f *Frame) Length() int { } // Returns the size of chunk in bytes. -func (f *Frame) Size() int { - return f.Length() * BYTES_PER_COEFFICIENT +func (f *Frame) Size() uint64 { + proofSize := uint64(2 * 32) + return uint64(f.Length()*BYTES_PER_COEFFICIENT) + proofSize } // Sample is a chunk with associated metadata used by the Universal Batch Verifier diff --git a/node/metrics.go b/node/metrics.go index 1208661b46..4844ecae96 100644 --- a/node/metrics.go +++ b/node/metrics.go @@ -140,13 +140,13 @@ func (g *Metrics) RemoveNCurrentBatch(numBatches int, totalBatchSize int64) { g.AccuRemovedBatches.WithLabelValues("size").Add(float64(totalBatchSize)) } -func (g *Metrics) AcceptBlobs(quorumId core.QuorumID, blobSize int64) { +func (g *Metrics) AcceptBlobs(quorumId core.QuorumID, blobSize uint64) { quorum := strconv.Itoa(int(quorumId)) g.AccuBlobs.WithLabelValues("number", quorum).Inc() g.AccuBlobs.WithLabelValues("size", quorum).Add(float64(blobSize)) } -func (g *Metrics) AcceptBatches(status string, batchSize int64) { +func (g *Metrics) AcceptBatches(status string, batchSize uint64) { g.AccuBatches.WithLabelValues("number", status).Inc() g.AccuBatches.WithLabelValues("size", status).Add(float64(batchSize)) } diff --git a/node/node.go b/node/node.go index ddfce30d96..4e26a61309 100644 --- a/node/node.go +++ b/node/node.go @@ -268,7 +268,7 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs log.Debug("Processing batch", "num of blobs", len(blobs)) // Measure num batches received and its size in bytes - batchSize := int64(0) + batchSize := uint64(0) for _, blob := range blobs { for quorumID, bundle := range blob.Bundles { n.Metrics.AcceptBlobs(quorumID, bundle.Size())