diff --git a/clients/retrieval_client.go b/clients/retrieval_client.go index bffd4caf95..6f03e76831 100644 --- a/clients/retrieval_client.go +++ b/clients/retrieval_client.go @@ -113,6 +113,12 @@ func (r *retrievalClient) RetrieveBlob( return nil, fmt.Errorf("no quorum header for quorum %d", quorumID) } + // Validate the blob length + err = r.encoder.VerifyBlobLength(blobHeader.BlobCommitments) + if err != nil { + return nil, err + } + assignements, info, err := r.assignmentCoordinator.GetAssignments(indexedOperatorState.OperatorState, quorumID, uint(quorumHeader.QuantizationFactor)) if err != nil { return nil, fmt.Errorf("failed to get assignments") @@ -126,16 +132,35 @@ func (r *retrievalClient) RetrieveBlob( opInfo := indexedOperatorState.IndexedOperators[opID] pool.Submit(func() { r.nodeClient.GetChunks(ctx, opID, opInfo, batchHeaderHash, blobIndex, quorumID, chunksChan) - // TODO(ian-shim): validate chunks received from nodes }) } + chunkLength, err := r.assignmentCoordinator.GetChunkLengthFromHeader(indexedOperatorState.OperatorState, quorumHeader) + if err != nil { + return nil, err + } + + minChunkLength, err := r.assignmentCoordinator.GetMinimumChunkLength(uint(len(operators)), blobHeader.BlobCommitments.Length, quorumHeader.QuantizationFactor, quorumHeader.QuorumThreshold, quorumHeader.AdversaryThreshold) + if err != nil { + return nil, err + } + + encodingParams, err := core.GetEncodingParams(minChunkLength, info.TotalChunks) + if err != nil { + return nil, err + } + + if chunkLength != encodingParams.ChunkLength { + return nil, fmt.Errorf("chunk length mismatch: %d != %d", chunkLength, encodingParams.ChunkLength) + } + var chunks []*core.Chunk var indices []core.ChunkNumber // TODO(ian-shim): if we gathered enough chunks, cancel remaining RPC calls for i := 0; i < len(operators); i++ { reply := <-chunksChan if reply.Err != nil { + r.logger.Error("failed to get chunks from operator", "operator", reply.OperatorID, "err", reply.Err) continue } assignment, ok := assignements[reply.OperatorID] @@ -143,19 +168,17 @@ func (r *retrievalClient) RetrieveBlob( return nil, fmt.Errorf("no assignment to operator %v", reply.OperatorID) } + err = r.encoder.VerifyChunks(reply.Chunks, assignment.GetIndices(), blobHeader.BlobCommitments, encodingParams) + if err != nil { + r.logger.Error("failed to verify chunks from operator", "operator", reply.OperatorID, "err", err) + continue + } else { + r.logger.Info("verified chunks from operator", "operator", reply.OperatorID) + } + chunks = append(chunks, reply.Chunks...) indices = append(indices, assignment.GetIndices()...) } - chunkLength, err := r.assignmentCoordinator.GetChunkLengthFromHeader(indexedOperatorState.OperatorState, quorumHeader) - if err != nil { - return nil, err - } - - encodingParams, err := core.GetEncodingParams(chunkLength, info.TotalChunks) - if err != nil { - return nil, err - } - return r.encoder.Decode(chunks, indices, encodingParams, uint64(blobHeader.Length)*bn254.BYTES_PER_COEFFICIENT) } diff --git a/clients/tests/retrieval_client_test.go b/clients/tests/retrieval_client_test.go index 7f01c9eba9..9709f454a6 100644 --- a/clients/tests/retrieval_client_test.go +++ b/clients/tests/retrieval_client_test.go @@ -122,9 +122,10 @@ func setup(t *testing.T) { SecurityParam: core.SecurityParam{ QuorumID: quorumID, AdversaryThreshold: adversaryThreshold, + QuorumThreshold: quorumThreshold, }, QuantizationFactor: quantizationFactor, - EncodedBlobLength: quantizationFactor * chunkLength * numOperators, + EncodedBlobLength: quantizationFactor * params.ChunkLength * numOperators, } blobHeader = &core.BlobHeader{