Encoding Request with pagination #157

Merged
merged 30 commits on Jan 12, 2024

Changes from 26 commits
Commits (30)
5dc0c31
Add pagination method to dynamodB
Jan 5, 2024
9b8dada
Fix lint error
Jan 5, 2024
604e29d
return lastEvaluatedKey
Jan 5, 2024
47de551
Update Table to use RequestedAt instead of CreatedAt key
Jan 5, 2024
5198c14
Encoding Request With Pagination
Jan 5, 2024
1946d6f
Update to use Extended BlobStore
Jan 5, 2024
595f74d
Fix PR comment
Jan 5, 2024
74856fc
make blobstofetch a configuration
Jan 5, 2024
f688e0e
Fix PR Comment
Jan 5, 2024
80f8152
Merge branch 'blob_metadata_with_pagination' into encoding_request_wi…
Jan 5, 2024
1f2a6ad
Add Test for Invalid RequestEncoding when no max blob fetch is set
Jan 5, 2024
e804628
Add Test for GetBlobMetadataByStatusWithPagination
Jan 8, 2024
77b36fd
Fix Test
Jan 8, 2024
ca05b43
Add more test
Jan 8, 2024
55b4229
Merge branch 'blob_metadata_with_pagination' into encoding_request_wi…
Jan 8, 2024
de4953c
fix merge conflict
Jan 8, 2024
4365263
Merge branch 'master' into encoding_request_with_pagination
Jan 8, 2024
2931566
remove commented code
Jan 8, 2024
ba22d7b
Set Default Value for BlobsToFetch from store
Jan 8, 2024
ba13aa2
Add MetadataHash and BlobHash to ExclusiveStartKEy
Jan 9, 2024
ee26668
return nil when their are no results
Jan 9, 2024
65c0078
Fetch all items when limit is set to 0
Jan 9, 2024
346bcde
Update comment
Jan 9, 2024
87780da
changes
Jan 9, 2024
034bb63
Handle case when no limit is provided
Jan 9, 2024
350ecdc
remove test
Jan 9, 2024
d40716f
Fix PR Comments
Jan 10, 2024
53f41e5
Fix UT failure
Jan 10, 2024
69398a3
fix pr comment
Jan 11, 2024
e130739
remove required flag
Jan 11, 2024
28 changes: 22 additions & 6 deletions common/aws/dynamodb/client.go
@@ -169,12 +169,24 @@ func (c *Client) QueryIndex(ctx context.Context, tableName string, indexName str
// QueryIndexWithPagination returns all items in the index that match the given key
// Results are limited to the given limit and the pagination token is returned
func (c *Client) QueryIndexWithPagination(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpresseionValues, limit int32, exclusiveStartKey map[string]types.AttributeValue) (QueryResult, error) {
-	queryInput := &dynamodb.QueryInput{
-		TableName:                 aws.String(tableName),
-		IndexName:                 aws.String(indexName),
-		KeyConditionExpression:    aws.String(keyCondition),
-		ExpressionAttributeValues: expAttributeValues,
-		Limit:                     &limit,
+	var queryInput *dynamodb.QueryInput
+
+	// Fetch all items if limit is 0
+	if limit != 0 {
+		queryInput = &dynamodb.QueryInput{
+			TableName:                 aws.String(tableName),
+			IndexName:                 aws.String(indexName),
+			KeyConditionExpression:    aws.String(keyCondition),
+			ExpressionAttributeValues: expAttributeValues,
+			Limit:                     &limit,
+		}
+	} else {
+		queryInput = &dynamodb.QueryInput{
+			TableName:                 aws.String(tableName),
+			IndexName:                 aws.String(indexName),
+			KeyConditionExpression:    aws.String(keyCondition),
+			ExpressionAttributeValues: expAttributeValues,
+		}
+	}
	}

// If a pagination token was provided, set it as the ExclusiveStartKey
@@ -187,6 +199,10 @@ func (c *Client) QueryIndexWithPagination(ctx context.Context, tableName string,
return QueryResult{}, err
}

if len(response.Items) == 0 {
return QueryResult{Items: nil, LastEvaluatedKey: nil}, nil
}

// Return the items and the pagination token
return QueryResult{
Items: response.Items,
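To make the new API concrete, here is a minimal sketch (not part of this PR) of how a caller pages through a status index until it is exhausted. It assumes it lives in the same package as client.go, so Client, Item, QueryResult and ExpresseionValues resolve, and that QueryResult.Items is a []Item as the tests suggest; the table and index names are placeholders.

package dynamodb

import (
	"context"

	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// fetchAllWithStatus collects every item with BlobStatus 0, ten per request.
func fetchAllWithStatus(ctx context.Context, client *Client) ([]Item, error) {
	var all []Item
	var startKey map[string]types.AttributeValue // nil on the first call

	for {
		result, err := client.QueryIndexWithPagination(ctx, "BlobMetadata", "StatusIndex",
			"BlobStatus = :status",
			ExpresseionValues{":status": &types.AttributeValueMemberN{Value: "0"}},
			10, startKey)
		if err != nil {
			return nil, err
		}
		all = append(all, result.Items...)
		// A nil LastEvaluatedKey, including the new empty-result case above,
		// means there is nothing left to fetch.
		if result.LastEvaluatedKey == nil {
			return all, nil
		}
		startKey = result.LastEvaluatedKey
	}
}

Passing a limit of 0 instead skips paging altogether and fetches everything in one query, the path TestQueryIndexPaginationItemNoLimit below exercises.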
62 changes: 61 additions & 1 deletion common/aws/dynamodb/client_test.go
@@ -337,7 +337,67 @@ func TestQueryIndexPaginationSingleItem(t *testing.T) {
Value: "0",
}}, 1, lastEvaluatedKey)
assert.NoError(t, err)
-	assert.Len(t, queryResult.Items, 0)
+	assert.Nil(t, queryResult.Items)
assert.Nil(t, queryResult.LastEvaluatedKey)
}

func TestQueryIndexPaginationItemNoLimit(t *testing.T) {
tableName := "ProcessingWithNoPaginationLimit"
createTable(t, tableName)
indexName := "StatusIndex"

ctx := context.Background()
numItems := 30
for i := 0; i < numItems; i += 1 {
requestedAt := time.Now().Add(-time.Duration(i) * time.Second).Unix()

// Create new item
item := commondynamodb.Item{
"MetadataKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("key%d", i)},
"BlobKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("blob%d", i)},
"BlobSize": &types.AttributeValueMemberN{Value: "123"},
"BlobStatus": &types.AttributeValueMemberN{Value: "0"},
"RequestedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(requestedAt, 10)},
}
err := dynamoClient.PutItem(ctx, tableName, item)
assert.NoError(t, err)
}

queryResult, err := dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: "0",
}}, 0, nil)
assert.NoError(t, err)
assert.Len(t, queryResult.Items, 30)
assert.Equal(t, "key29", queryResult.Items[0]["MetadataKey"].(*types.AttributeValueMemberS).Value)
assert.Nil(t, queryResult.LastEvaluatedKey)

// Save Last Evaluated Key
lastEvaluatedKey := queryResult.LastEvaluatedKey

// lastEvaluatedKey is nil after the full fetch above, so this query starts from the beginning with a limit of 2
queryResult, err = dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: "0",
}}, 2, lastEvaluatedKey)
assert.NoError(t, err)
assert.Len(t, queryResult.Items, 2)
assert.Equal(t, "key29", queryResult.Items[0]["MetadataKey"].(*types.AttributeValueMemberS).Value)
assert.NotNil(t, queryResult.LastEvaluatedKey)
}

func TestQueryIndexPaginationNoStoredItems(t *testing.T) {
tableName := "ProcessingWithPaginationNoItem"
createTable(t, tableName)
indexName := "StatusIndex"

ctx := context.Background()
queryResult, err := dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: "0",
}}, 1, nil)
assert.NoError(t, err)
assert.Nil(t, queryResult.Items)
assert.Nil(t, queryResult.LastEvaluatedKey)
}

12 changes: 7 additions & 5 deletions disperser/batcher/batcher.go
@@ -54,7 +54,8 @@ type Config struct {
BatchSizeMBLimit uint
MaxNumRetriesPerBlob uint

-	TargetNumChunks uint
+	TargetNumChunks          uint
+	MaxBlobsToFetchFromStore int
}

type Batcher struct {
@@ -97,10 +98,11 @@ func NewBatcher(
uint64(config.BatchSizeMBLimit)*1024*1024, // convert to bytes
)
streamerConfig := StreamerConfig{
-		SRSOrder:               config.SRSOrder,
-		EncodingRequestTimeout: config.PullInterval,
-		EncodingQueueLimit:     config.EncodingRequestQueueSize,
-		TargetNumChunks:        config.TargetNumChunks,
+		SRSOrder:                 config.SRSOrder,
+		EncodingRequestTimeout:   config.PullInterval,
+		EncodingQueueLimit:       config.EncodingRequestQueueSize,
+		TargetNumChunks:          config.TargetNumChunks,
+		MaxBlobsToFetchFromStore: config.MaxBlobsToFetchFromStore,
}
encodingWorkerPool := workerpool.New(config.NumConnections)
encodingStreamer, err := NewEncodingStreamer(streamerConfig, queue, chainState, encoderClient, assignmentCoordinator, batchTrigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, logger)
13 changes: 12 additions & 1 deletion disperser/batcher/encoding_streamer.go
@@ -41,6 +41,9 @@ type StreamerConfig struct {

// TargetNumChunks is the target number of chunks per encoded blob
TargetNumChunks uint

// Maximum number of Blobs to fetch from store
MaxBlobsToFetchFromStore int
}

type EncodingStreamer struct {
@@ -62,6 +65,9 @@ type EncodingStreamer struct {

metrics *EncodingStreamerMetrics
logger common.Logger

// Used to keep track of the last evaluated key for fetching metadatas
exclusiveStartKey *disperser.ExclusiveBlobStoreStartKey
}

type batch struct {
@@ -107,6 +113,7 @@ func NewEncodingStreamer(
encodingCtxCancelFuncs: make([]context.CancelFunc, 0),
metrics: metrics,
logger: logger,
exclusiveStartKey: nil,
}, nil
}

@@ -175,7 +182,11 @@ func (e *EncodingStreamer) dedupRequests(metadatas []*disperser.BlobMetadata, re
func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan EncodingResultOrStatus) error {
stageTimer := time.Now()
// pull new blobs and send to encoder
-	metadatas, err := e.blobStore.GetBlobMetadataByStatus(ctx, disperser.Processing)
+	e.mu.Lock()
+	metadatas, newExclusiveStartKey, err := e.blobStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, int32(e.StreamerConfig.MaxBlobsToFetchFromStore), e.exclusiveStartKey)
+	e.exclusiveStartKey = newExclusiveStartKey
+	e.mu.Unlock()

if err != nil {
return fmt.Errorf("error getting blob metadatas: %w", err)
}
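The net effect of the change above is that each RequestEncoding call resumes the Processing scan where the previous call stopped, under the streamer's mutex. A condensed, hypothetical sketch of that pattern in isolation (assuming the disperser.BlobStore interface exposes the new paginated method, as the call above implies; imports: context, sync):

// pagedFetcher is a stand-in for the pagination state the streamer now carries.
type pagedFetcher struct {
	mu       sync.Mutex
	startKey *disperser.ExclusiveBlobStoreStartKey // resume point; nil restarts from the top
	limit    int32
}

// next returns the next window of Processing blobs and remembers where it stopped.
func (p *pagedFetcher) next(ctx context.Context, store disperser.BlobStore) ([]*disperser.BlobMetadata, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	metadatas, newKey, err := store.GetBlobMetadataByStatusWithPagination(
		ctx, disperser.Processing, p.limit, p.startKey)
	if err != nil {
		return nil, err
	}
	// The store hands back nil once the status index is exhausted, so the
	// following call wraps around and rescans from the beginning.
	p.startKey = newKey
	return metadatas, nil
}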
21 changes: 12 additions & 9 deletions disperser/batcher/encoding_streamer_test.go
@@ -21,9 +21,10 @@ import (

var (
streamerConfig = batcher.StreamerConfig{
-		SRSOrder:               300000,
-		EncodingRequestTimeout: 5 * time.Second,
-		EncodingQueueLimit:     100,
+		SRSOrder:                 300000,
+		EncodingRequestTimeout:   5 * time.Second,
+		EncodingQueueLimit:       100,
+		MaxBlobsToFetchFromStore: 10,
}
)

Expand Down Expand Up @@ -294,9 +295,10 @@ func TestEncodingFailure(t *testing.T) {
sizeNotifier := batcher.NewEncodedSizeNotifier(make(chan struct{}, 1), 1e12)
workerpool := workerpool.New(5)
streamerConfig := batcher.StreamerConfig{
-		SRSOrder:               300000,
-		EncodingRequestTimeout: 5 * time.Second,
-		EncodingQueueLimit:     100,
+		SRSOrder:                 300000,
+		EncodingRequestTimeout:   5 * time.Second,
+		EncodingQueueLimit:       100,
+		MaxBlobsToFetchFromStore: 10,
}
metrics := batcher.NewMetrics("9100", logger)
encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, blobStore, cst, encoderClient, asgn, sizeNotifier, workerpool, metrics.EncodingStreamerMetrics, logger)
@@ -467,9 +469,10 @@ func TestIncorrectParameters(t *testing.T) {
ctx := context.Background()

streamerConfig := batcher.StreamerConfig{
-		SRSOrder:               3000,
-		EncodingRequestTimeout: 5 * time.Second,
-		EncodingQueueLimit:     100,
+		SRSOrder:                 3000,
+		EncodingRequestTimeout:   5 * time.Second,
+		EncodingQueueLimit:       100,
+		MaxBlobsToFetchFromStore: 10,
}

encodingStreamer, c := createEncodingStreamer(t, 0, 1e12, streamerConfig)
2 changes: 2 additions & 0 deletions disperser/cmd/batcher/config.go
@@ -32,6 +32,7 @@ type Config struct {
}

func NewConfig(ctx *cli.Context) Config {

config := Config{
BlobstoreConfig: blobstore.Config{
BucketName: ctx.GlobalString(flags.S3BucketNameFlag.Name),
@@ -51,6 +52,7 @@ func NewConfig(ctx *cli.Context) Config {
SRSOrder: ctx.GlobalInt(flags.SRSOrderFlag.Name),
MaxNumRetriesPerBlob: ctx.GlobalUint(flags.MaxNumRetriesPerBlobFlag.Name),
TargetNumChunks: ctx.GlobalUint(flags.TargetNumChunksFlag.Name),
MaxBlobsToFetchFromStore: ctx.GlobalInt(flags.MaxBlobsToFetchFromStoreFlag.Name),
},
TimeoutConfig: batcher.TimeoutConfig{
EncodingTimeout: ctx.GlobalDuration(flags.EncodingTimeoutFlag.Name),
8 changes: 8 additions & 0 deletions disperser/cmd/batcher/flags/flags.go
@@ -164,6 +164,14 @@
EnvVar: common.PrefixEnvVar(envVarPrefix, "TARGET_NUM_CHUNKS"),
Value: 0,
}

MaxBlobsToFetchFromStoreFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "max-blobs-to-fetch-from-store"),
Usage: "Limit used to specify how many blobs to fetch from store at time when used with dynamodb pagination",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_BLOBS_TO_FETCH_FROM_STORE"),
Value: 1,
}
)

var requiredFlags = []cli.Flag{
Expand Down
72 changes: 69 additions & 3 deletions disperser/common/blobstore/blob_metadata_store.go
@@ -98,15 +98,32 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status

// GetBlobMetadataByStatusWithPagination returns all the metadata with the given status up to the specified limit.
// Along with the items, it also returns a pagination token that can be used to fetch the next set of items.
-func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey map[string]types.AttributeValue) ([]*disperser.BlobMetadata, map[string]types.AttributeValue, error) {
+func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.ExclusiveBlobStoreStartKey) ([]*disperser.BlobMetadata, *disperser.ExclusiveBlobStoreStartKey, error) {

var attributeMap map[string]types.AttributeValue = nil
var err error

// Convert the exclusive start key to a map of AttributeValue
if exclusiveStartKey != nil {
attributeMap, err = convertExclusiveBlobStoreStartKeyToAttributeValueMap(exclusiveStartKey)
if err != nil {
return nil, nil, err
}
}

queryResult, err := s.dynamoDBClient.QueryIndexWithPagination(ctx, s.tableName, statusIndexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: strconv.Itoa(int(status)),
-	}}, limit, exclusiveStartKey)
+	}}, limit, attributeMap)
if err != nil {
return nil, nil, err
}

// When there are no more results to fetch, the LastEvaluatedKey is nil
if queryResult.Items == nil && queryResult.LastEvaluatedKey == nil {
return nil, nil, nil
}

metadata := make([]*disperser.BlobMetadata, len(queryResult.Items))
for i, item := range queryResult.Items {
metadata[i], err = UnmarshalBlobMetadata(item)
@@ -119,7 +136,13 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Co
if lastEvaluatedKey == nil {
return metadata, nil, nil
}
-	return metadata, lastEvaluatedKey, nil
+
+	// Convert the last evaluated key to a disperser.ExclusiveBlobStoreStartKey
+	exclusiveStartKey, err = converTypeAttributeValuetToExclusiveBlobStoreStartKey(lastEvaluatedKey)
+	if err != nil {
+		return nil, nil, err
+	}
+	return metadata, exclusiveStartKey, nil
}

func (s *BlobMetadataStore) GetAllBlobMetadataByBatch(ctx context.Context, batchHeaderHash [32]byte) ([]*disperser.BlobMetadata, error) {
@@ -377,3 +400,46 @@ func UnmarshalBlobMetadata(item commondynamodb.Item) (*disperser.BlobMetadata, e

return &metadata, nil
}

func converTypeAttributeValuetToExclusiveBlobStoreStartKey(exclusiveStartKeyMap map[string]types.AttributeValue) (*disperser.ExclusiveBlobStoreStartKey, error) {
key := disperser.ExclusiveBlobStoreStartKey{}

if bs, ok := exclusiveStartKeyMap["BlobStatus"].(*types.AttributeValueMemberN); ok {
blobStatus, err := strconv.ParseInt(bs.Value, 10, 32)
if err != nil {
return nil, fmt.Errorf("error parsing BlobStatus: %v", err)
}
key.BlobStatus = int32(blobStatus)
}

if ra, ok := exclusiveStartKeyMap["RequestedAt"].(*types.AttributeValueMemberN); ok {
requestedAt, err := strconv.ParseInt(ra.Value, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing RequestedAt: %v", err)
}
key.RequestedAt = requestedAt
}

if bh, ok := exclusiveStartKeyMap["BlobHash"].(*types.AttributeValueMemberS); ok {
key.BlobHash = bh.Value
}

if mh, ok := exclusiveStartKeyMap["MetadataHash"].(*types.AttributeValueMemberS); ok {
key.MetadataHash = mh.Value
}

return &key, nil
}

func convertExclusiveBlobStoreStartKeyToAttributeValueMap(s *disperser.ExclusiveBlobStoreStartKey) (map[string]types.AttributeValue, error) {
if s == nil {
// Return an empty map or nil, depending on your application logic
return nil, nil
}

av, err := attributevalue.MarshalMap(s)
if err != nil {
return nil, err
}
return av, nil
}
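Because marshaling goes through attributevalue.MarshalMap while unmarshaling is hand-rolled, a round-trip test is a cheap guard against the two drifting apart. A sketch, assuming ExclusiveBlobStoreStartKey carries no dynamodbav tags (so attribute names match the Go field names) and that the test sits in the same package as these unexported helpers:

func TestExclusiveStartKeyRoundTrip(t *testing.T) {
	key := &disperser.ExclusiveBlobStoreStartKey{
		BlobStatus:   int32(disperser.Processing),
		RequestedAt:  time.Now().Unix(),
		BlobHash:     "blobHash",
		MetadataHash: "metadataHash",
	}
	avMap, err := convertExclusiveBlobStoreStartKeyToAttributeValueMap(key)
	assert.NoError(t, err)
	roundTripped, err := converTypeAttributeValuetToExclusiveBlobStoreStartKey(avMap)
	assert.NoError(t, err)
	assert.Equal(t, key, roundTripped)
}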
10 changes: 10 additions & 0 deletions disperser/common/blobstore/blob_metadata_store_test.go
@@ -175,6 +175,16 @@ func TestBlobMetadataStoreOperationsWithPagination(t *testing.T) {
})
}

func TestBlobMetadataStoreOperationsWithPaginationNoStoredBlob(t *testing.T) {
ctx := context.Background()
// Query BlobMetadataStore for a blob that does not exist
// This should return nil for both the blob and lastEvaluatedKey
processing, lastEvaluatedKey, err := blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 1, nil)
assert.NoError(t, err)
assert.Nil(t, processing)
assert.Nil(t, lastEvaluatedKey)
}

func deleteItems(t *testing.T, keys []commondynamodb.Key) {
_, err := dynamoClient.DeleteItems(context.Background(), metadataTableName, keys)
assert.NoError(t, err)
4 changes: 4 additions & 0 deletions disperser/common/blobstore/shared_storage.go
@@ -203,6 +203,10 @@ func (s *SharedBlobStore) GetBlobMetadataByStatus(ctx context.Context, blobStatu
return s.blobMetadataStore.GetBlobMetadataByStatus(ctx, blobStatus)
}

func (s *SharedBlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, blobStatus disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.ExclusiveBlobStoreStartKey) ([]*disperser.BlobMetadata, *disperser.ExclusiveBlobStoreStartKey, error) {
return s.blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, blobStatus, limit, exclusiveStartKey)
}

func (s *SharedBlobStore) GetMetadataInBatch(ctx context.Context, batchHeaderHash [32]byte, blobIndex uint32) (*disperser.BlobMetadata, error) {
return s.blobMetadataStore.GetBlobMetadataInBatch(ctx, batchHeaderHash, blobIndex)
}