Skip to content

Commit

Permalink
Validate chunks in retrieval client (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Dec 5, 2023
1 parent 5c27140 commit 064f756
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 12 deletions.
45 changes: 34 additions & 11 deletions clients/retrieval_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ func (r *retrievalClient) RetrieveBlob(
return nil, fmt.Errorf("no quorum header for quorum %d", quorumID)
}

// Validate the blob length
err = r.encoder.VerifyBlobLength(blobHeader.BlobCommitments)
if err != nil {
return nil, err
}

assignements, info, err := r.assignmentCoordinator.GetAssignments(indexedOperatorState.OperatorState, quorumID, uint(quorumHeader.QuantizationFactor))
if err != nil {
return nil, fmt.Errorf("failed to get assignments")
Expand All @@ -126,36 +132,53 @@ func (r *retrievalClient) RetrieveBlob(
opInfo := indexedOperatorState.IndexedOperators[opID]
pool.Submit(func() {
r.nodeClient.GetChunks(ctx, opID, opInfo, batchHeaderHash, blobIndex, quorumID, chunksChan)
// TODO(ian-shim): validate chunks received from nodes
})
}

chunkLength, err := r.assignmentCoordinator.GetChunkLengthFromHeader(indexedOperatorState.OperatorState, quorumHeader)
if err != nil {
return nil, err
}

minChunkLength, err := r.assignmentCoordinator.GetMinimumChunkLength(uint(len(operators)), blobHeader.BlobCommitments.Length, quorumHeader.QuantizationFactor, quorumHeader.QuorumThreshold, quorumHeader.AdversaryThreshold)
if err != nil {
return nil, err
}

encodingParams, err := core.GetEncodingParams(minChunkLength, info.TotalChunks)
if err != nil {
return nil, err
}

if chunkLength != encodingParams.ChunkLength {
return nil, fmt.Errorf("chunk length mismatch: %d != %d", chunkLength, encodingParams.ChunkLength)
}

var chunks []*core.Chunk
var indices []core.ChunkNumber
// TODO(ian-shim): if we gathered enough chunks, cancel remaining RPC calls
for i := 0; i < len(operators); i++ {
reply := <-chunksChan
if reply.Err != nil {
r.logger.Error("failed to get chunks from operator", "operator", reply.OperatorID, "err", reply.Err)
continue
}
assignment, ok := assignements[reply.OperatorID]
if !ok {
return nil, fmt.Errorf("no assignment to operator %v", reply.OperatorID)
}

err = r.encoder.VerifyChunks(reply.Chunks, assignment.GetIndices(), blobHeader.BlobCommitments, encodingParams)
if err != nil {
r.logger.Error("failed to verify chunks from operator", "operator", reply.OperatorID, "err", err)
continue
} else {
r.logger.Info("verified chunks from operator", "operator", reply.OperatorID)
}

chunks = append(chunks, reply.Chunks...)
indices = append(indices, assignment.GetIndices()...)
}

chunkLength, err := r.assignmentCoordinator.GetChunkLengthFromHeader(indexedOperatorState.OperatorState, quorumHeader)
if err != nil {
return nil, err
}

encodingParams, err := core.GetEncodingParams(chunkLength, info.TotalChunks)
if err != nil {
return nil, err
}

return r.encoder.Decode(chunks, indices, encodingParams, uint64(blobHeader.Length)*bn254.BYTES_PER_COEFFICIENT)
}
3 changes: 2 additions & 1 deletion clients/tests/retrieval_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,10 @@ func setup(t *testing.T) {
SecurityParam: core.SecurityParam{
QuorumID: quorumID,
AdversaryThreshold: adversaryThreshold,
QuorumThreshold: quorumThreshold,
},
QuantizationFactor: quantizationFactor,
EncodedBlobLength: quantizationFactor * chunkLength * numOperators,
EncodedBlobLength: quantizationFactor * params.ChunkLength * numOperators,
}

blobHeader = &core.BlobHeader{
Expand Down

0 comments on commit 064f756

Please sign in to comment.