Skip to content

Commit

Permalink
Allow tokenCh of batch controller to be nil
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Dec 11, 2024
1 parent 5d62787 commit aff73b6
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 14 deletions.
29 changes: 17 additions & 12 deletions client/pkg/batch/batch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (bc *Controller[T]) FetchPendingRequests(ctx context.Context, requestCh <-c
if errRet != nil {
// Something went wrong when collecting a batch of requests. Release the token and cancel collected requests
// if any.
if tokenAcquired {
if tokenAcquired && tokenCh != nil {
tokenCh <- struct{}{}
}
bc.FinishCollectedRequests(bc.finisher, errRet)
Expand All @@ -80,6 +80,9 @@ func (bc *Controller[T]) FetchPendingRequests(ctx context.Context, requestCh <-c
// If the batch size reaches the maxBatchSize limit but the token haven't arrived yet, don't receive more
// requests, and return when token is ready.
if bc.collectedRequestCount >= bc.maxBatchSize && !tokenAcquired {
if tokenCh == nil {
return nil
}

Check warning on line 85 in client/pkg/batch/batch_controller.go

View check run for this annotation

Codecov / codecov/patch

client/pkg/batch/batch_controller.go#L83-L85

Added lines #L83 - L85 were not covered by tests
select {
case <-ctx.Done():
return ctx.Err()
Expand All @@ -88,17 +91,19 @@ func (bc *Controller[T]) FetchPendingRequests(ctx context.Context, requestCh <-c
}
}

select {
case <-ctx.Done():
return ctx.Err()
case req := <-requestCh:
// Start to batch when the first request arrives.
bc.pushRequest(req)
// A request arrives but the token is not ready yet. Continue waiting, and also allowing collecting the next
// request if it arrives.
continue
case <-tokenCh:
tokenAcquired = true
if tokenCh != nil {
select {
case <-ctx.Done():
return ctx.Err()
case req := <-requestCh:
// Start to batch when the first request arrives.
bc.pushRequest(req)
// A request arrives but the token is not ready yet. Continue waiting, and also allowing collecting the next
// request if it arrives.
continue
case <-tokenCh:
tokenAcquired = true
}
}

// The token is ready. If the first request didn't arrive, wait for it.
Expand Down
61 changes: 59 additions & 2 deletions client/pkg/batch/batch_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"github.com/stretchr/testify/require"
)

const testMaxBatchSize = 20

func TestAdjustBestBatchSize(t *testing.T) {
re := require.New(t)
bc := NewController[int](20, nil, nil)
bc := NewController[int](testMaxBatchSize, nil, nil)
re.Equal(defaultBestBatchSize, bc.bestBatchSize)
bc.AdjustBestBatchSize()
re.Equal(defaultBestBatchSize-1, bc.bestBatchSize)
Expand Down Expand Up @@ -52,7 +54,7 @@ type testRequest struct {

func TestFinishCollectedRequests(t *testing.T) {
re := require.New(t)
bc := NewController[*testRequest](20, nil, nil)
bc := NewController[*testRequest](testMaxBatchSize, nil, nil)
// Finish with zero request count.
re.Zero(bc.collectedRequestCount)
bc.FinishCollectedRequests(nil, nil)
Expand Down Expand Up @@ -81,3 +83,58 @@ func TestFinishCollectedRequests(t *testing.T) {
re.Equal(context.Canceled, requests[i].err)
}
}

// TestFetchPendingRequests covers both the nil-token and token-gated paths of
// Controller.FetchPendingRequests: with a nil `tokenCh` fetching proceeds
// without a token, while a non-nil `tokenCh` requires a token per fetch. In
// both modes at most `maxBatchSize` requests are collected per call and any
// surplus request stays pending in the channel.
func TestFetchPendingRequests(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	re := require.New(t)
	bc := NewController[int](testMaxBatchSize, nil, nil)
	requestCh := make(chan int, testMaxBatchSize+1)
	// enqueue pushes `count` requests (0..count-1) into the request channel.
	enqueue := func(count int) {
		for i := 0; i < count; i++ {
			requestCh <- i
		}
	}

	// Nil `tokenCh` with a single pending request: it is collected with no token.
	requestCh <- 1
	re.NoError(bc.FetchPendingRequests(ctx, requestCh, nil, 0))
	re.Empty(requestCh)
	re.Equal(1, bc.collectedRequestCount)
	// Nil `tokenCh` with exactly `maxBatchSize` pending requests.
	enqueue(testMaxBatchSize)
	re.NoError(bc.FetchPendingRequests(ctx, requestCh, nil, 0))
	re.Empty(requestCh)
	re.Equal(testMaxBatchSize, bc.collectedRequestCount)
	// Nil `tokenCh` with one request over the limit: the surplus stays pending.
	enqueue(testMaxBatchSize + 1)
	re.NoError(bc.FetchPendingRequests(ctx, requestCh, nil, 0))
	re.Len(requestCh, 1)
	re.Equal(testMaxBatchSize, bc.collectedRequestCount)
	// Drain the leftover request.
	<-requestCh

	// Non-nil `tokenCh` with a single pending request and one token available.
	tokenCh := make(chan struct{}, 1)
	requestCh <- 1
	tokenCh <- struct{}{}
	re.NoError(bc.FetchPendingRequests(ctx, requestCh, tokenCh, 0))
	re.Empty(requestCh)
	re.Equal(1, bc.collectedRequestCount)
	// Non-nil `tokenCh` with exactly `maxBatchSize` pending requests.
	enqueue(testMaxBatchSize)
	tokenCh <- struct{}{}
	re.NoError(bc.FetchPendingRequests(ctx, requestCh, tokenCh, 0))
	re.Empty(requestCh)
	re.Equal(testMaxBatchSize, bc.collectedRequestCount)
	// Non-nil `tokenCh` with one request over the limit: the surplus stays pending.
	enqueue(testMaxBatchSize + 1)
	tokenCh <- struct{}{}
	re.NoError(bc.FetchPendingRequests(ctx, requestCh, tokenCh, 0))
	re.Len(requestCh, 1)
	re.Equal(testMaxBatchSize, bc.collectedRequestCount)
	// Drain the leftover request.
	<-requestCh
}

0 comments on commit aff73b6

Please sign in to comment.