From 1f2a6adea340130ebb4658ecbaed47c6015d7a5e Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Fri, 5 Jan 2024 14:54:47 -0800 Subject: [PATCH] Add Test for Invalid RequestEncoding when no max blob fetch is set --- disperser/batcher/encoding_streamer.go | 7 +++- disperser/batcher/encoding_streamer_test.go | 44 +++++++++++++++++++-- 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go index 0ca0595ca2..cf082cb3bc 100644 --- a/disperser/batcher/encoding_streamer.go +++ b/disperser/batcher/encoding_streamer.go @@ -183,10 +183,9 @@ func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan stageTimer := time.Now() // pull new blobs and send to encoder e.mu.Lock() - // TODO: Get Limit from Config metadatas, newExclusiveStartKey, err := e.blobStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, int32(e.StreamerConfig.MaxBlobsToFetchFromStore), e.exclusiveStartKey) e.exclusiveStartKey = newExclusiveStartKey - e.mu.Lock() + e.mu.Unlock() if err != nil { return fmt.Errorf("error getting blob metadatas: %w", err) @@ -196,6 +195,10 @@ func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan return nil } + if len(metadatas) > e.StreamerConfig.MaxBlobsToFetchFromStore { + return fmt.Errorf("number of metadatas fetched from store is %d greater than configured max number of blobs to fetch from store: %d", len(metadatas), e.StreamerConfig.MaxBlobsToFetchFromStore) + } + // read lock to access e.ReferenceBlockNumber e.mu.RLock() referenceBlockNumber := e.ReferenceBlockNumber diff --git a/disperser/batcher/encoding_streamer_test.go b/disperser/batcher/encoding_streamer_test.go index 065c4c7508..146f5741b8 100644 --- a/disperser/batcher/encoding_streamer_test.go +++ b/disperser/batcher/encoding_streamer_test.go @@ -464,14 +464,52 @@ func TestPartialBlob(t *testing.T) { assert.Contains(t, batch.BlobMetadata, metadata1) } +func TestIncorrectRequestEncoding(t *testing.T) { + streamerConfig := batcher.StreamerConfig{ + SRSOrder: 3000, + EncodingRequestTimeout: 5 * time.Second, + EncodingQueueLimit: 100, + } + + encodingStreamer, c := createEncodingStreamer(t, 10, 200_000, streamerConfig) + + securityParams := []*core.SecurityParam{{ + QuorumID: 0, + AdversaryThreshold: 80, + QuorumThreshold: 100, + }} + blobData := []byte{1, 2, 3, 4, 5} + + numItems := 30 + for i := 0; i < numItems; i += 1 { + blob := core.Blob{ + RequestHeader: core.BlobRequestHeader{ + SecurityParams: securityParams, + }, + Data: blobData, + } + ctx := context.Background() + _, err := c.blobStore.StoreBlob(ctx, &blob, uint64(time.Now().UnixNano())) + assert.Nil(t, err) + } + + out := make(chan batcher.EncodingResultOrStatus) + // Request encoding + err := encodingStreamer.RequestEncoding(context.Background(), out) + assert.NotNil(t, err) + expectedErrMsg := "number of metadatas fetched from store is 30 greater than configured max number of blobs to fetch from store: 0" + assert.Equal(t, expectedErrMsg, err.Error()) +} + func TestIncorrectParameters(t *testing.T) { ctx := context.Background() streamerConfig := batcher.StreamerConfig{ - SRSOrder: 3000, - EncodingRequestTimeout: 5 * time.Second, - EncodingQueueLimit: 100, + SRSOrder: 3000, + EncodingRequestTimeout: 5 * time.Second, + EncodingQueueLimit: 100, + MaxBlobsToFetchFromStore: 10, } encodingStreamer, c := createEncodingStreamer(t, 0, 1e12, streamerConfig)