Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update accounting logic for total encoded blob size #421

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions core/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ type BlobMessage struct {
Bundles Bundles
}

func (b Bundle) Size() int64 {
size := int64(0)
func (b Bundle) Size() uint64 {
size := uint64(0)
for _, chunk := range b {
size += int64(chunk.Size())
size += chunk.Size()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for your information, referring to yesterdays discussion, all chunk will now contains 32bytes. Nothing wrong here

}
return size
}
Expand All @@ -181,8 +181,8 @@ func (cb Bundles) Serialize() (map[uint32][][]byte, error) {
}

// Returns the size of the bundles in bytes.
func (cb Bundles) Size() int64 {
size := int64(0)
func (cb Bundles) Size() uint64 {
size := uint64(0)
for _, bundle := range cb {
size += bundle.Size()
}
Expand Down
2 changes: 1 addition & 1 deletion disperser/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func TestBatcherIterations(t *testing.T) {
assert.NoError(t, err)
count, size := components.encodingStreamer.EncodedBlobstore.GetEncodedResultSize()
assert.Equal(t, 2, count)
assert.Equal(t, uint64(197632), size)
assert.Equal(t, uint64(23808), size)

txn := types.NewTransaction(0, gethcommon.Address{}, big.NewInt(0), 0, big.NewInt(0), nil)
components.transactor.On("BuildConfirmBatchTxn").Return(txn, nil)
Expand Down
8 changes: 2 additions & 6 deletions disperser/batcher/encoded_blob_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,7 @@ func getRequestID(key disperser.BlobKey, quorumID core.QuorumID) requestID {
return requestID(fmt.Sprintf("%s-%d", key.String(), quorumID))
}

// getChunksSize returns the total size of all the chunks in the encoded
// result in bytes. It delegates to core.Bundle.Size so the accounting stays
// consistent with how bundle sizes are computed elsewhere.
func getChunksSize(result *EncodingResult) uint64 {
	return core.Bundle(result.Chunks).Size()
}
14 changes: 7 additions & 7 deletions disperser/batcher/encoding_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestEncodingQueueLimit(t *testing.T) {
}

func TestBatchTrigger(t *testing.T) {
encodingStreamer, c := createEncodingStreamer(t, 10, 200_000, streamerConfig)
encodingStreamer, c := createEncodingStreamer(t, 10, 20_000, streamerConfig)

blob := makeTestBlob([]*core.SecurityParam{{
QuorumID: 0,
Expand All @@ -152,7 +152,7 @@ func TestBatchTrigger(t *testing.T) {
assert.Nil(t, err)
count, size := encodingStreamer.EncodedBlobstore.GetEncodedResultSize()
assert.Equal(t, count, 1)
assert.Equal(t, size, uint64(131584))
assert.Equal(t, size, uint64(15872))

// try encode the same blobs again at different block (this happens when the blob is retried)
encodingStreamer.ReferenceBlockNumber = 11
Expand All @@ -163,7 +163,7 @@ func TestBatchTrigger(t *testing.T) {

count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize()
assert.Equal(t, count, 1)
assert.Equal(t, size, uint64(131584))
assert.Equal(t, size, uint64(15872))

// don't notify yet
select {
Expand All @@ -182,7 +182,7 @@ func TestBatchTrigger(t *testing.T) {

count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize()
assert.Equal(t, count, 2)
assert.Equal(t, size, uint64(131584)*2)
assert.Equal(t, size, uint64(15872)*2)

// notify
select {
Expand Down Expand Up @@ -243,7 +243,7 @@ func TestStreamingEncoding(t *testing.T) {
assert.True(t, isRequested)
count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize()
assert.Equal(t, count, 1)
assert.Equal(t, size, uint64(131584))
assert.Equal(t, size, uint64(15872))

// Cancel previous blob so it doesn't get reencoded.
err = c.blobStore.MarkBlobFailed(ctx, metadataKey)
Expand Down Expand Up @@ -273,7 +273,7 @@ func TestStreamingEncoding(t *testing.T) {
assert.True(t, isRequested)
count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize()
assert.Equal(t, count, 1)
assert.Equal(t, size, uint64(131584))
assert.Equal(t, size, uint64(15872))

// Request the same blob, which should be dedupped
_, err = c.blobStore.StoreBlob(ctx, &blob, requestedAt)
Expand All @@ -284,7 +284,7 @@ func TestStreamingEncoding(t *testing.T) {
// It should not have been added to the encoded blob store
count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize()
assert.Equal(t, count, 1)
assert.Equal(t, size, uint64(131584))
assert.Equal(t, size, uint64(15872))
}

func TestEncodingFailure(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions encoding/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ func (f *Frame) Length() int {
return len(f.Coeffs)
}

// Returns the size of chunk in bytes.
func (f *Frame) Size() int {
return f.Length() * BYTES_PER_COEFFICIENT
// Size return the size of chunks in bytes.
func (f *Frame) Size() uint64 {
return uint64(f.Length() * BYTES_PER_COEFFICIENT)
}

// Sample is a chunk with associated metadata used by the Universal Batch Verifier
Expand Down
4 changes: 2 additions & 2 deletions node/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,13 @@ func (g *Metrics) RemoveNCurrentBatch(numBatches int, totalBatchSize int64) {
g.AccuRemovedBatches.WithLabelValues("size").Add(float64(totalBatchSize))
}

func (g *Metrics) AcceptBlobs(quorumId core.QuorumID, blobSize int64) {
func (g *Metrics) AcceptBlobs(quorumId core.QuorumID, blobSize uint64) {
quorum := strconv.Itoa(int(quorumId))
g.AccuBlobs.WithLabelValues("number", quorum).Inc()
g.AccuBlobs.WithLabelValues("size", quorum).Add(float64(blobSize))
}

func (g *Metrics) AcceptBatches(status string, batchSize int64) {
func (g *Metrics) AcceptBatches(status string, batchSize uint64) {
g.AccuBatches.WithLabelValues("number", status).Inc()
g.AccuBatches.WithLabelValues("size", status).Add(float64(batchSize))
}
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs
log.Debug("Processing batch", "num of blobs", len(blobs))

// Measure num batches received and its size in bytes
batchSize := int64(0)
batchSize := uint64(0)
for _, blob := range blobs {
for quorumID, bundle := range blob.Bundles {
n.Metrics.AcceptBlobs(quorumID, bundle.Size())
Expand Down
Loading