Skip to content

Commit

Permalink
update encoded blob size accounting
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Mar 30, 2024
1 parent 4db2480 commit 9dcc124
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 24 deletions.
10 changes: 5 additions & 5 deletions core/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ type BlobMessage struct {
Bundles Bundles
}

func (b Bundle) Size() int64 {
size := int64(0)
func (b Bundle) Size() uint64 {
size := uint64(0)
for _, chunk := range b {
size += int64(chunk.Size())
size += chunk.Size()
}
return size
}
Expand All @@ -181,8 +181,8 @@ func (cb Bundles) Serialize() (map[uint32][][]byte, error) {
}

// Returns the size of the bundles in bytes.
func (cb Bundles) Size() int64 {
size := int64(0)
func (cb Bundles) Size() uint64 {
size := uint64(0)
for _, bundle := range cb {
size += bundle.Size()
}
Expand Down
2 changes: 1 addition & 1 deletion disperser/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func TestBatcherIterations(t *testing.T) {
assert.NoError(t, err)
count, size := components.encodingStreamer.EncodedBlobstore.GetEncodedResultSize()
assert.Equal(t, 2, count)
assert.Equal(t, uint64(197632), size)
assert.Equal(t, uint64(27904), size)

txn := types.NewTransaction(0, gethcommon.Address{}, big.NewInt(0), 0, big.NewInt(0), nil)
components.transactor.On("BuildConfirmBatchTxn").Return(txn, nil)
Expand Down
10 changes: 3 additions & 7 deletions disperser/batcher/encoded_blob_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type EncodingResult struct {
ReferenceBlockNumber uint
BlobQuorumInfo *core.BlobQuorumInfo
Commitment *encoding.BlobCommitments
Chunks []*encoding.Frame
Chunks core.Bundle
Assignments map[core.OperatorID]core.Assignment
Status status
}
Expand Down Expand Up @@ -191,11 +191,7 @@ func getRequestID(key disperser.BlobKey, quorumID core.QuorumID) requestID {
return requestID(fmt.Sprintf("%s-%d", key.String(), quorumID))
}

// getChunksSize returns the total size of all the chunks in the encoded result in bytes
func getChunksSize(result *EncodingResult) uint64 {
var size uint64

for _, chunk := range result.Chunks {
size += uint64(len(chunk.Coeffs) * 256) // 256 bytes per symbol
}
return size + 256*2 // + 256 * 2 bytes for proof
return result.Chunks.Size()
}
12 changes: 6 additions & 6 deletions disperser/batcher/encoding_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func TestBatchTrigger(t *testing.T) {
assert.Nil(t, err)
count, size := encodingStreamer.EncodedBlobstore.GetEncodedResultSize()
assert.Equal(t, count, 1)
assert.Equal(t, size, uint64(131584))
assert.Equal(t, size, uint64(17920))

// try encode the same blobs again at different block (this happens when the blob is retried)
encodingStreamer.ReferenceBlockNumber = 11
Expand All @@ -163,7 +163,7 @@ func TestBatchTrigger(t *testing.T) {

count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize()
assert.Equal(t, count, 1)
assert.Equal(t, size, uint64(131584))
assert.Equal(t, size, uint64(17920))

// don't notify yet
select {
Expand All @@ -182,7 +182,7 @@ func TestBatchTrigger(t *testing.T) {

count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize()
assert.Equal(t, count, 2)
assert.Equal(t, size, uint64(131584)*2)
assert.Equal(t, size, uint64(17920)*2)

// notify
select {
Expand Down Expand Up @@ -243,7 +243,7 @@ func TestStreamingEncoding(t *testing.T) {
assert.True(t, isRequested)
count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize()
assert.Equal(t, count, 1)
assert.Equal(t, size, uint64(131584))
assert.Equal(t, size, uint64(17920))

// Cancel previous blob so it doesn't get reencoded.
err = c.blobStore.MarkBlobFailed(ctx, metadataKey)
Expand Down Expand Up @@ -273,7 +273,7 @@ func TestStreamingEncoding(t *testing.T) {
assert.True(t, isRequested)
count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize()
assert.Equal(t, count, 1)
assert.Equal(t, size, uint64(131584))
assert.Equal(t, size, uint64(17920))

// Request the same blob, which should be dedupped
_, err = c.blobStore.StoreBlob(ctx, &blob, requestedAt)
Expand All @@ -284,7 +284,7 @@ func TestStreamingEncoding(t *testing.T) {
// It should not have been added to the encoded blob store
count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize()
assert.Equal(t, count, 1)
assert.Equal(t, size, uint64(131584))
assert.Equal(t, size, uint64(17920))
}

func TestEncodingFailure(t *testing.T) {
Expand Down
5 changes: 3 additions & 2 deletions encoding/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ func (f *Frame) Length() int {
}

// Returns the size of chunk in bytes.
func (f *Frame) Size() int {
return f.Length() * BYTES_PER_COEFFICIENT
func (f *Frame) Size() uint64 {
proofSize := uint64(2 * 32)
return uint64(f.Length()*BYTES_PER_COEFFICIENT) + proofSize
}

// Sample is a chunk with associated metadata used by the Universal Batch Verifier
Expand Down
4 changes: 2 additions & 2 deletions node/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,13 @@ func (g *Metrics) RemoveNCurrentBatch(numBatches int, totalBatchSize int64) {
g.AccuRemovedBatches.WithLabelValues("size").Add(float64(totalBatchSize))
}

func (g *Metrics) AcceptBlobs(quorumId core.QuorumID, blobSize int64) {
func (g *Metrics) AcceptBlobs(quorumId core.QuorumID, blobSize uint64) {
quorum := strconv.Itoa(int(quorumId))
g.AccuBlobs.WithLabelValues("number", quorum).Inc()
g.AccuBlobs.WithLabelValues("size", quorum).Add(float64(blobSize))
}

func (g *Metrics) AcceptBatches(status string, batchSize int64) {
func (g *Metrics) AcceptBatches(status string, batchSize uint64) {
g.AccuBatches.WithLabelValues("number", status).Inc()
g.AccuBatches.WithLabelValues("size", status).Add(float64(batchSize))
}
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs
log.Debug("Processing batch", "num of blobs", len(blobs))

// Measure num batches received and its size in bytes
batchSize := int64(0)
batchSize := uint64(0)
for _, blob := range blobs {
for quorumID, bundle := range blob.Bundles {
n.Metrics.AcceptBlobs(quorumID, bundle.Size())
Expand Down

0 comments on commit 9dcc124

Please sign in to comment.