diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go
index ae18404cba..25741d7d64 100644
--- a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go
+++ b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go
@@ -119,7 +119,7 @@ func TestBlobMetadataStoreCerts(t *testing.T) {
 	}
 	fragmentInfo := &encoding.FragmentInfo{
 		TotalChunkSizeBytes: 100,
-		NumFragments:        10,
+		FragmentSizeBytes:   1024 * 1024 * 4,
 	}
 	err := blobMetadataStore.PutBlobCertificate(ctx, blobCert, fragmentInfo)
 	assert.NoError(t, err)
diff --git a/disperser/controller/encoding_manager_test.go b/disperser/controller/encoding_manager_test.go
index 916490d5e4..cf4c063877 100644
--- a/disperser/controller/encoding_manager_test.go
+++ b/disperser/controller/encoding_manager_test.go
@@ -119,7 +119,7 @@ func TestHandleBatch(t *testing.T) {
 	c := newTestComponents(t)
 	c.EncodingClient.On("EncodeBlob", mock.Anything, mock.Anything, mock.Anything).Return(&encoding.FragmentInfo{
 		TotalChunkSizeBytes: 100,
-		NumFragments:        5,
+		FragmentSizeBytes:   1024 * 1024 * 4,
 	}, nil)
 
 	err = c.EncodingManager.HandleBatch(ctx)
@@ -139,7 +139,7 @@ func TestHandleBatch(t *testing.T) {
 		assert.Contains(t, c.EncodingManager.AvailableRelays, relayKey)
 	}
 	assert.Equal(t, fetchedFragmentInfo.TotalChunkSizeBytes, uint32(100))
-	assert.Equal(t, fetchedFragmentInfo.NumFragments, uint32(5))
+	assert.Equal(t, fetchedFragmentInfo.FragmentSizeBytes, uint32(1024*1024*4))
 }
 
 func TestHandleBatchNoBlobs(t *testing.T) {
@@ -178,7 +178,7 @@ func TestHandleBatchRetrySuccess(t *testing.T) {
 	c.EncodingClient.On("EncodeBlob", mock.Anything, mock.Anything, mock.Anything).Return(nil, assert.AnError).Once()
 	c.EncodingClient.On("EncodeBlob", mock.Anything, mock.Anything, mock.Anything).Return(&encoding.FragmentInfo{
 		TotalChunkSizeBytes: 100,
-		NumFragments:        5,
+		FragmentSizeBytes:   1024 * 1024 * 4,
 	}, nil)
 
 	err = c.EncodingManager.HandleBatch(ctx)
@@ -198,7 +198,7 @@ func TestHandleBatchRetrySuccess(t *testing.T) {
 		assert.Contains(t, c.EncodingManager.AvailableRelays, relayKey)
 	}
 	assert.Equal(t, fetchedFragmentInfo.TotalChunkSizeBytes, uint32(100))
-	assert.Equal(t, fetchedFragmentInfo.NumFragments, uint32(5))
+	assert.Equal(t, fetchedFragmentInfo.FragmentSizeBytes, uint32(1024*1024*4))
 
 	c.EncodingClient.AssertNumberOfCalls(t, "EncodeBlob", 2)
 }
diff --git a/encoding/data.go b/encoding/data.go
index 2c43561456..39a1106c25 100644
--- a/encoding/data.go
+++ b/encoding/data.go
@@ -88,7 +88,11 @@ type SubBatch struct {
 
 type ChunkNumber = uint
 
+// FragmentInfo contains metadata describing how a blob's chunk coefficients file is stored.
 type FragmentInfo struct {
+	// TotalChunkSizeBytes is the total size of the file containing all chunk coefficients for the blob.
 	TotalChunkSizeBytes uint32
+	// FragmentSizeBytes is the maximum fragment size used to store the chunk coefficients;
+	// the file is split into ceil(TotalChunkSizeBytes/FragmentSizeBytes) fragments.
-	NumFragments        uint32
+	FragmentSizeBytes uint32
 }
diff --git a/encoding/rs/frame.go b/encoding/rs/frame.go
index 4b4ba0bed0..ea10e7fe27 100644
--- a/encoding/rs/frame.go
+++ b/encoding/rs/frame.go
@@ -2,8 +2,11 @@ package rs
 
 import (
 	"bytes"
+	"encoding/binary"
 	"encoding/gob"
+	"fmt"
 
+	"github.com/Layr-Labs/eigenda/encoding"
 	"github.com/consensys/gnark-crypto/ecc/bn254/fr"
 )
 
@@ -13,6 +16,9 @@ type Frame struct {
 	Coeffs []fr.Element
 }
 
+// Encode serializes the frame into a byte slice.
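+// Gob encoding is used here; GnarkEncodeFrames (below) provides a more compact
+// binary serialization of the same data.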
 func (f *Frame) Encode() ([]byte, error) {
 	var buf bytes.Buffer
 	enc := gob.NewEncoder(&buf)
@@ -23,6 +29,7 @@ func (f *Frame) Encode() ([]byte, error) {
 	return buf.Bytes(), nil
 }
 
+// Decode deserializes a byte slice into a frame.
 func Decode(b []byte) (Frame, error) {
 	var f Frame
 	buf := bytes.NewBuffer(b)
@@ -33,3 +40,114 @@ func Decode(b []byte) (Frame, error) {
 	}
 	return f, nil
 }
+
+// GnarkEncodeFrames serializes a slice of frames into a byte slice.
+//
+// Serialization format:
+// [number of frames: 4 byte uint32]
+// [number of coefficients in frame 1: 4 byte uint32][frame 1's coefficients]
+// [number of coefficients in frame 2: 4 byte uint32][frame 2's coefficients]
+// ...
+// [number of coefficients in frame n: 4 byte uint32][frame n's coefficients]
+//
+// Where relevant, big endian encoding is used.
+func GnarkEncodeFrames(frames []*Frame) ([]byte, error) {
+
+	// Count the number of bytes.
+	encodedSize := uint32(4) // stores the number of frames
+	for _, frame := range frames {
+		encodedSize += 4                     // stores the number of coefficients in the frame
+		encodedSize += GnarkFrameSize(frame) // size of the frame's coefficients
+	}
+
+	serializedBytes := make([]byte, encodedSize)
+	binary.BigEndian.PutUint32(serializedBytes, uint32(len(frames)))
+	index := uint32(4)
+
+	for _, frame := range frames {
+		index += GnarkEncodeFrame(frame, serializedBytes[index:])
+	}
+
+	if index != encodedSize {
+		// Sanity check, this should never happen.
+		return nil, fmt.Errorf("encoded size mismatch: expected %d, got %d", encodedSize, index)
+	}
+
+	return serializedBytes, nil
+}
+
+// GnarkEncodeFrame serializes a frame into a target byte slice. Returns the number of bytes written.
+func GnarkEncodeFrame(frame *Frame, target []byte) uint32 {
+	binary.BigEndian.PutUint32(target, uint32(len(frame.Coeffs)))
+	index := uint32(4)
+
+	for _, coeff := range frame.Coeffs {
+		serializedCoeff := coeff.Marshal()
+		copy(target[index:], serializedCoeff)
+		index += uint32(len(serializedCoeff))
+	}
+
+	return index
+}
+
+// GnarkFrameSize returns the size of a frame's coefficients in bytes, excluding the 4 byte length prefix.
+func GnarkFrameSize(frame *Frame) uint32 {
+	return uint32(encoding.BYTES_PER_SYMBOL * len(frame.Coeffs))
+}
+
+// GnarkDecodeFrames deserializes a byte slice into a slice of frames.
+func GnarkDecodeFrames(serializedFrames []byte) ([]*Frame, error) {
+	if len(serializedFrames) < 4 {
+		return nil, fmt.Errorf("serialized data too short to contain a frame count: %d bytes", len(serializedFrames))
+	}
+
+	frameCount := binary.BigEndian.Uint32(serializedFrames)
+	index := uint32(4)
+
+	frames := make([]*Frame, frameCount)
+
+	for i := 0; i < int(frameCount); i++ {
+		frame, bytesRead, err := GnarkDecodeFrame(serializedFrames[index:])
+		if err != nil {
+			return nil, fmt.Errorf("failed to decode frame %d: %w", i, err)
+		}
+
+		frames[i] = frame
+		index += bytesRead
+	}
+
+	if index != uint32(len(serializedFrames)) {
+		return nil, fmt.Errorf("decoded size mismatch: expected %d, got %d", len(serializedFrames), index)
+	}
+
+	return frames, nil
+}
+
+// GnarkDecodeFrame deserializes a byte slice into a frame. Returns the frame and the number of bytes read.
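+//
+// Expected layout (big endian): [number of coefficients: 4 byte uint32] followed by
+// that many 32 byte (encoding.BYTES_PER_SYMBOL) field elements.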
+func GnarkDecodeFrame(serializedFrame []byte) (*Frame, uint32, error) {
+	if len(serializedFrame) < 4 {
+		return nil, 0, fmt.Errorf("invalid frame size: %d", len(serializedFrame))
+	}
+
+	coeffCount := binary.BigEndian.Uint32(serializedFrame)
+	index := uint32(4)
+
+	if len(serializedFrame) < int(index+coeffCount*encoding.BYTES_PER_SYMBOL) {
+		return nil, 0, fmt.Errorf("invalid frame size: %d", len(serializedFrame))
+	}
+
+	coeffs := make([]fr.Element, coeffCount)
+	for i := 0; i < int(coeffCount); i++ {
+		coeff := fr.Element{}
+		coeff.Unmarshal(serializedFrame[index : index+encoding.BYTES_PER_SYMBOL])
+		coeffs[i] = coeff
+		index += uint32(encoding.BYTES_PER_SYMBOL)
+	}
+
+	frame := &Frame{Coeffs: coeffs}
+
+	return frame, index, nil
+}
diff --git a/encoding/rs/frame_test.go b/encoding/rs/frame_test.go
index 54335786c8..ccf664af15 100644
--- a/encoding/rs/frame_test.go
+++ b/encoding/rs/frame_test.go
@@ -47,3 +47,79 @@
 
 	assert.Equal(t, frame, frames[0])
 }
+
+func TestGnarkEncodeDecodeFrame_AreInverses(t *testing.T) {
+	teardownSuite := setupSuite(t)
+	defer teardownSuite(t)
+
+	params := encoding.ParamsFromSysPar(numSys, numPar, uint64(len(GETTYSBURG_ADDRESS_BYTES)))
+	enc, _ := rs.NewEncoder(params, true)
+
+	n := uint8(math.Log2(float64(enc.NumEvaluations())))
+	if enc.ChunkLength == 1 {
+		n = uint8(math.Log2(float64(2 * enc.NumChunks)))
+	}
+	fs := fft.NewFFTSettings(n)
+
+	RsComputeDevice := &rs_cpu.RsCpuComputeDevice{
+		Fs:             fs,
+		EncodingParams: params,
+	}
+
+	enc.Computer = RsComputeDevice
+	require.NotNil(t, enc)
+
+	frames, _, err := enc.EncodeBytes(GETTYSBURG_ADDRESS_BYTES)
+	require.Nil(t, err)
+	require.NotNil(t, frames)
+
+	serializedSize := rs.GnarkFrameSize(&frames[0]) + 4
+	bytes := make([]byte, serializedSize)
+	rs.GnarkEncodeFrame(&frames[0], bytes)
+
+	deserializedFrame, bytesRead, err := rs.GnarkDecodeFrame(bytes)
+	assert.NoError(t, err)
+	assert.Equal(t, bytesRead, serializedSize)
+	assert.Equal(t, &frames[0], deserializedFrame)
+}
+
+func TestGnarkEncodeDecodeFrames_AreInverses(t *testing.T) {
+	teardownSuite := setupSuite(t)
+	defer teardownSuite(t)
+
+	params := encoding.ParamsFromSysPar(numSys, numPar, uint64(len(GETTYSBURG_ADDRESS_BYTES)))
+	enc, _ := rs.NewEncoder(params, true)
+
+	n := uint8(math.Log2(float64(enc.NumEvaluations())))
+	if enc.ChunkLength == 1 {
+		n = uint8(math.Log2(float64(2 * enc.NumChunks)))
+	}
+	fs := fft.NewFFTSettings(n)
+
+	RsComputeDevice := &rs_cpu.RsCpuComputeDevice{
+		Fs:             fs,
+		EncodingParams: params,
+	}
+
+	enc.Computer = RsComputeDevice
+	require.NotNil(t, enc)
+
+	frames, _, err := enc.EncodeBytes(GETTYSBURG_ADDRESS_BYTES)
+	assert.NoError(t, err)
+
+	framesPointers := make([]*rs.Frame, len(frames))
+	for i := range frames {
+		framesPointers[i] = &frames[i]
+	}
+
+	encodedFrames, err := rs.GnarkEncodeFrames(framesPointers)
+	assert.NoError(t, err)
+
+	decodedFrames, err := rs.GnarkDecodeFrames(encodedFrames)
+	assert.NoError(t, err)
+
+	assert.Equal(t, len(framesPointers), len(decodedFrames))
+	for i := range framesPointers {
+		assert.Equal(t, *framesPointers[i], *decodedFrames[i])
+	}
+}
diff --git a/relay/chunkstore/chunk_reader.go b/relay/chunkstore/chunk_reader.go
new file mode 100644
index 0000000000..843a0276d8
--- /dev/null
+++ b/relay/chunkstore/chunk_reader.go
@@ -0,0 +1,116 @@
+package chunkstore
+
+import (
+	"context"
+	"fmt"
+
"github.com/Layr-Labs/eigenda/common/aws/s3" + v2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/disperser/common/blobstore" + "github.com/Layr-Labs/eigenda/encoding" + "github.com/Layr-Labs/eigenda/encoding/rs" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/consensys/gnark-crypto/ecc/bn254" +) + +// ChunkReader reads chunks written by ChunkWriter. +type ChunkReader interface { + // GetChunkProofs reads a slice of proofs from the chunk store. + GetChunkProofs(ctx context.Context, blobKey v2.BlobKey) ([]*encoding.Proof, error) + // GetChunkCoefficients reads a slice of frames from the chunk store. The metadata parameter + // should match the metadata returned by PutChunkCoefficients. + GetChunkCoefficients( + ctx context.Context, + blobKey v2.BlobKey, + fragmentInfo *encoding.FragmentInfo) ([]*rs.Frame, error) +} + +var _ ChunkReader = (*chunkReader)(nil) + +type chunkReader struct { + logger logging.Logger + metadataStore *blobstore.BlobMetadataStore + client s3.Client + bucket string + shards []uint32 +} + +// NewChunkReader creates a new ChunkReader. +// +// This chunk reader will only return data for the shards specified in the shards parameter. +// If empty, it will return data for all shards. (Note: shard feature is not yet implemented.) +func NewChunkReader( + logger logging.Logger, + metadataStore *blobstore.BlobMetadataStore, + s3Client s3.Client, + bucketName string, + shards []uint32) ChunkReader { + + return &chunkReader{ + logger: logger, + metadataStore: metadataStore, + client: s3Client, + bucket: bucketName, + shards: shards, + } +} + +func (r *chunkReader) GetChunkProofs( + ctx context.Context, + blobKey v2.BlobKey) ([]*encoding.Proof, error) { + + s3Key := blobKey.Hex() + + bytes, err := r.client.DownloadObject(ctx, r.bucket, s3Key) + if err != nil { + r.logger.Error("Failed to download chunks from S3: %v", err) + return nil, fmt.Errorf("failed to download chunks from S3: %w", err) + } + + if len(bytes)%bn254.SizeOfG1AffineCompressed != 0 { + r.logger.Error("Invalid proof size") + return nil, fmt.Errorf("invalid proof size: %w", err) + } + + proofCount := len(bytes) / bn254.SizeOfG1AffineCompressed + proofs := make([]*encoding.Proof, proofCount) + + for i := 0; i < proofCount; i++ { + proof := encoding.Proof{} + err := proof.Unmarshal(bytes[i*bn254.SizeOfG1AffineCompressed:]) + if err != nil { + r.logger.Error("Failed to unmarshal proof: %v", err) + return nil, fmt.Errorf("failed to unmarshal proof: %w", err) + } + proofs[i] = &proof + } + + return proofs, nil +} + +func (r *chunkReader) GetChunkCoefficients( + ctx context.Context, + blobKey v2.BlobKey, + fragmentInfo *encoding.FragmentInfo) ([]*rs.Frame, error) { + + s3Key := blobKey.Hex() + + bytes, err := r.client.FragmentedDownloadObject( + ctx, + r.bucket, + s3Key, + int(fragmentInfo.TotalChunkSizeBytes), + int(fragmentInfo.FragmentSizeBytes)) + + if err != nil { + r.logger.Error("Failed to download chunks from S3: %v", err) + return nil, fmt.Errorf("failed to download chunks from S3: %w", err) + } + + frames, err := rs.GnarkDecodeFrames(bytes) + if err != nil { + r.logger.Error("Failed to decode frames: %v", err) + return nil, fmt.Errorf("failed to decode frames: %w", err) + } + + return frames, nil +} diff --git a/relay/chunkstore/chunk_store_test.go b/relay/chunkstore/chunk_store_test.go new file mode 100644 index 0000000000..dfac33e117 --- /dev/null +++ b/relay/chunkstore/chunk_store_test.go @@ -0,0 +1,269 @@ +package chunkstore + +import ( + "context" + 
"github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/common/aws" + "github.com/Layr-Labs/eigenda/common/aws/s3" + "github.com/Layr-Labs/eigenda/common/mock" + tu "github.com/Layr-Labs/eigenda/common/testutils" + v2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/encoding" + "github.com/Layr-Labs/eigenda/encoding/fft" + "github.com/Layr-Labs/eigenda/encoding/rs" + rs_cpu "github.com/Layr-Labs/eigenda/encoding/rs/cpu" + "github.com/Layr-Labs/eigenda/encoding/utils/codec" + "github.com/Layr-Labs/eigenda/inabox/deploy" + "github.com/consensys/gnark-crypto/ecc/bn254/fp" + "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/require" + "math" + "math/rand" + "os" + "testing" +) + +var ( + dockertestPool *dockertest.Pool + dockertestResource *dockertest.Resource +) + +const ( + localstackPort = "4570" + localstackHost = "http://0.0.0.0:4570" + bucket = "eigen-test" +) + +type clientBuilder struct { + // This method is called at the beginning of the test. + start func() error + // This method is called to build a new client. + build func() (s3.Client, error) + // This method is called at the end of the test when all operations are done. + finish func() error +} + +var clientBuilders = []*clientBuilder{ + { + start: func() error { + return nil + }, + build: func() (s3.Client, error) { + return mock.NewS3Client(), nil + }, + finish: func() error { + return nil + }, + }, + { + start: func() error { + return setupLocalstack() + }, + build: func() (s3.Client, error) { + + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + if err != nil { + return nil, err + } + + config := aws.DefaultClientConfig() + config.EndpointURL = localstackHost + config.Region = "us-east-1" + + err = os.Setenv("AWS_ACCESS_KEY_ID", "localstack") + if err != nil { + return nil, err + } + err = os.Setenv("AWS_SECRET_ACCESS_KEY", "localstack") + if err != nil { + return nil, err + } + + client, err := s3.NewClient(context.Background(), *config, logger) + if err != nil { + return nil, err + } + + err = client.CreateBucket(context.Background(), bucket) + if err != nil { + return nil, err + } + + return client, nil + }, + finish: func() error { + teardownLocalstack() + return nil + }, + }, +} + +func setupLocalstack() error { + deployLocalStack := !(os.Getenv("DEPLOY_LOCALSTACK") == "false") + + if deployLocalStack { + var err error + dockertestPool, dockertestResource, err = deploy.StartDockertestWithLocalstackContainer(localstackPort) + if err != nil && err.Error() == "container already exists" { + teardownLocalstack() + return err + } + } + return nil +} + +func teardownLocalstack() { + deployLocalStack := !(os.Getenv("DEPLOY_LOCALSTACK") == "false") + + if deployLocalStack { + deploy.PurgeDockertestResources(dockertestPool, dockertestResource) + } +} + +func getProofs(t *testing.T, count int) []*encoding.Proof { + proofs := make([]*encoding.Proof, count) + + // Note from Cody: I'd rather use randomized proofs here, but I'm not sure how to generate them. + // Using random data breaks since the deserialization logic rejects invalid proofs. 
+	var x, y fp.Element
+	_, err := x.SetString("21661178944771197726808973281966770251114553549453983978976194544185382599016")
+	require.NoError(t, err)
+	_, err = y.SetString("9207254729396071334325696286939045899948985698134704137261649190717970615186")
+	require.NoError(t, err)
+
+	for i := 0; i < count; i++ {
+		proof := encoding.Proof{
+			X: x,
+			Y: y,
+		}
+		proofs[i] = &proof
+	}
+
+	return proofs
+}
+
+func RandomProofsTest(t *testing.T, client s3.Client) {
+	logger, err := common.NewLogger(common.DefaultLoggerConfig())
+	require.NoError(t, err)
+
+	fragmentSize := rand.Intn(1024) + 100 // ignored since we aren't writing coefficients
+
+	writer := NewChunkWriter(logger, client, bucket, fragmentSize)
+	reader := NewChunkReader(logger, nil, client, bucket, make([]uint32, 0))
+
+	expectedValues := make(map[v2.BlobKey][]*encoding.Proof)
+
+	// Write data
+	for i := 0; i < 100; i++ {
+		key := v2.BlobKey(tu.RandomBytes(32))
+
+		proofs := getProofs(t, rand.Intn(100)+100)
+		expectedValues[key] = proofs
+
+		err := writer.PutChunkProofs(context.Background(), key, proofs)
+		require.NoError(t, err)
+	}
+
+	// Read data
+	for key, expectedProofs := range expectedValues {
+		proofs, err := reader.GetChunkProofs(context.Background(), key)
+		require.NoError(t, err)
+		require.Equal(t, expectedProofs, proofs)
+	}
+}
+
+func TestRandomProofs(t *testing.T) {
+	tu.InitializeRandom()
+	for _, builder := range clientBuilders {
+		err := builder.start()
+		require.NoError(t, err)
+
+		client, err := builder.build()
+		require.NoError(t, err)
+		RandomProofsTest(t, client)
+
+		err = builder.finish()
+		require.NoError(t, err)
+	}
+}
+
+func generateRandomFrames(t *testing.T, encoder *rs.Encoder, size int) []*rs.Frame {
+	frames, _, err := encoder.EncodeBytes(codec.ConvertByPaddingEmptyByte(tu.RandomBytes(size)))
+	require.NoError(t, err)
+
+	result := make([]*rs.Frame, len(frames))
+	for i := 0; i < len(frames); i++ {
+		result[i] = &frames[i]
+	}
+
+	return result
+}
+
+func RandomCoefficientsTest(t *testing.T, client s3.Client) {
+	logger, err := common.NewLogger(common.DefaultLoggerConfig())
+	require.NoError(t, err)
+
+	chunkSize := uint64(rand.Intn(1024) + 100)
+	fragmentSize := int(chunkSize / 2)
+
+	params := encoding.ParamsFromSysPar(3, 1, chunkSize)
+	encoder, _ := rs.NewEncoder(params, true)
+
+	n := uint8(math.Log2(float64(encoder.NumEvaluations())))
+	if encoder.ChunkLength == 1 {
+		n = uint8(math.Log2(float64(2 * encoder.NumChunks)))
+	}
+	fs := fft.NewFFTSettings(n)
+
+	RsComputeDevice := &rs_cpu.RsCpuComputeDevice{
+		Fs:             fs,
+		EncodingParams: params,
+	}
+
+	encoder.Computer = RsComputeDevice
+	require.NotNil(t, encoder)
+
+	writer := NewChunkWriter(logger, client, bucket, fragmentSize)
+	reader := NewChunkReader(logger, nil, client, bucket, make([]uint32, 0))
+
+	expectedValues := make(map[v2.BlobKey][]*rs.Frame)
+	metadataMap := make(map[v2.BlobKey]*encoding.FragmentInfo)
+
+	// Write data
+	for i := 0; i < 100; i++ {
+		key := v2.BlobKey(tu.RandomBytes(32))
+
+		coefficients := generateRandomFrames(t, encoder, int(chunkSize))
+		expectedValues[key] = coefficients
+
+		metadata, err := writer.PutChunkCoefficients(context.Background(), key, coefficients)
+		require.NoError(t, err)
+		metadataMap[key] = metadata
+	}
+
+	// Read data
+	for key, expectedCoefficients := range expectedValues {
+		coefficients, err := reader.GetChunkCoefficients(context.Background(), key, metadataMap[key])
+		require.NoError(t, err)
+		require.Equal(t, len(expectedCoefficients), len(coefficients))
+		for i := 0; i < len(expectedCoefficients); i++ {
+			require.Equal(t, *expectedCoefficients[i], *coefficients[i])
+		}
+	}
+}
+
+func TestRandomCoefficients(t *testing.T) {
+	tu.InitializeRandom()
+	for _, builder := range clientBuilders {
+		err := builder.start()
+		require.NoError(t, err)
+
+		client, err := builder.build()
+		require.NoError(t, err)
+		RandomCoefficientsTest(t, client)
+
+		err = builder.finish()
+		require.NoError(t, err)
+	}
+}
diff --git a/relay/chunkstore/chunk_writer.go b/relay/chunkstore/chunk_writer.go
new file mode 100644
index 0000000000..ea54f28b19
--- /dev/null
+++ b/relay/chunkstore/chunk_writer.go
@@ -0,0 +1,91 @@
+package chunkstore
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/Layr-Labs/eigenda/common/aws/s3"
+	v2 "github.com/Layr-Labs/eigenda/core/v2"
+	"github.com/Layr-Labs/eigenda/encoding"
+	"github.com/Layr-Labs/eigenda/encoding/rs"
+	"github.com/Layr-Labs/eigensdk-go/logging"
+	"github.com/consensys/gnark-crypto/ecc/bn254"
+)
+
+// ChunkWriter writes chunks that can be read by ChunkReader.
+type ChunkWriter interface {
+	// PutChunkProofs writes a slice of proofs to the chunk store.
+	PutChunkProofs(ctx context.Context, blobKey v2.BlobKey, proofs []*encoding.Proof) error
+	// PutChunkCoefficients writes a slice of frames to the chunk store and returns the FragmentInfo needed to read them back.
+	PutChunkCoefficients(
+		ctx context.Context,
+		blobKey v2.BlobKey,
+		frames []*rs.Frame) (*encoding.FragmentInfo, error)
+}
+
+var _ ChunkWriter = (*chunkWriter)(nil)
+
+type chunkWriter struct {
+	logger       logging.Logger
+	s3Client     s3.Client
+	bucketName   string
+	fragmentSize int
+}
+
+// NewChunkWriter creates a new ChunkWriter.
+func NewChunkWriter(
+	logger logging.Logger,
+	s3Client s3.Client,
+	bucketName string,
+	fragmentSize int) ChunkWriter {
+
+	return &chunkWriter{
+		logger:       logger,
+		s3Client:     s3Client,
+		bucketName:   bucketName,
+		fragmentSize: fragmentSize,
+	}
+}
+
+func (c *chunkWriter) PutChunkProofs(ctx context.Context, blobKey v2.BlobKey, proofs []*encoding.Proof) error {
+	s3Key := blobKey.Hex()
+
+	bytes := make([]byte, 0, bn254.SizeOfG1AffineCompressed*len(proofs))
+	for _, proof := range proofs {
+		proofBytes := proof.Bytes()
+		bytes = append(bytes, proofBytes[:]...)
+	}
+
+	err := c.s3Client.UploadObject(ctx, c.bucketName, s3Key, bytes)
+
+	if err != nil {
+		c.logger.Errorf("Failed to upload chunks to S3: %v", err)
+		return fmt.Errorf("failed to upload chunks to S3: %w", err)
+	}
+
+	return nil
+}
+
+func (c *chunkWriter) PutChunkCoefficients(
+	ctx context.Context,
+	blobKey v2.BlobKey,
+	frames []*rs.Frame) (*encoding.FragmentInfo, error) {
+
+	s3Key := blobKey.Hex()
+
+	bytes, err := rs.GnarkEncodeFrames(frames)
+	if err != nil {
+		c.logger.Errorf("Failed to encode frames: %v", err)
+		return nil, fmt.Errorf("failed to encode frames: %w", err)
+	}
+
+	err = c.s3Client.FragmentedUploadObject(ctx, c.bucketName, s3Key, bytes, c.fragmentSize)
+	if err != nil {
+		c.logger.Errorf("Failed to upload chunks to S3: %v", err)
+		return nil, fmt.Errorf("failed to upload chunks to S3: %w", err)
+	}
+
+	return &encoding.FragmentInfo{
+		TotalChunkSizeBytes: uint32(len(bytes)),
+		FragmentSizeBytes:   uint32(c.fragmentSize),
+	}, nil
+}
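Usage sketch (illustrative, not part of the diff): the round trip below wires the new ChunkWriter and ChunkReader together using the in-memory mock S3 client exercised by the tests above. The function name, bucket name, and fragment size are hypothetical; the nil metadataStore and empty shard list mirror the tests.

package chunkstore_example

import (
	"context"

	"github.com/Layr-Labs/eigenda/common"
	"github.com/Layr-Labs/eigenda/common/mock"
	tu "github.com/Layr-Labs/eigenda/common/testutils"
	v2 "github.com/Layr-Labs/eigenda/core/v2"
	"github.com/Layr-Labs/eigenda/encoding/rs"
	"github.com/Layr-Labs/eigenda/relay/chunkstore"
)

// roundTripCoefficients writes frames to the chunk store and reads them back.
// The frames are assumed to come from rs.Encoder.EncodeBytes, as in the tests above.
func roundTripCoefficients(frames []*rs.Frame) ([]*rs.Frame, error) {
	logger, err := common.NewLogger(common.DefaultLoggerConfig())
	if err != nil {
		return nil, err
	}

	// In-memory stand-in for S3; no bucket setup needed.
	client := mock.NewS3Client()

	// fragmentSize bounds how large each stored fragment may be.
	writer := chunkstore.NewChunkWriter(logger, client, "example-bucket", 512*1024)
	reader := chunkstore.NewChunkReader(logger, nil, client, "example-bucket", nil)

	key := v2.BlobKey(tu.RandomBytes(32))

	// The returned FragmentInfo must be retained: the reader needs it to
	// reassemble the fragmented object.
	fragmentInfo, err := writer.PutChunkCoefficients(context.Background(), key, frames)
	if err != nil {
		return nil, err
	}

	return reader.GetChunkCoefficients(context.Background(), key, fragmentInfo)
}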