Skip to content

Commit

Permalink
client/batch: allow tokenCh of batch controller to be nil (#8903)
Browse files Browse the repository at this point in the history
ref #8690

Allow `tokenCh` of batch controller be nil.

Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato authored Dec 17, 2024
1 parent 5d62787 commit e8bef5f
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 15 deletions.
37 changes: 24 additions & 13 deletions client/pkg/batch/batch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,18 @@ func NewController[T any](maxBatchSize int, finisher FinisherFunc[T], bestBatchO
// It returns nil error if everything goes well, otherwise a non-nil error which means we should stop the service.
// It's guaranteed that if this function failed after collecting some requests, then these requests will be cancelled
// when the function returns, so the caller don't need to clear them manually.
// `tokenCh` is an optional parameter:
// - If it's nil, the batching process will not wait for the token to arrive to continue.
// - If it's not nil, the batching process will wait for a token to arrive before continuing.
// The token will be given back if any error occurs, otherwise it's the caller's responsibility
// to decide when to recycle the signal.
func (bc *Controller[T]) FetchPendingRequests(ctx context.Context, requestCh <-chan T, tokenCh chan struct{}, maxBatchWaitInterval time.Duration) (errRet error) {
var tokenAcquired bool
defer func() {
if errRet != nil {
// Something went wrong when collecting a batch of requests. Release the token and cancel collected requests
// if any.
if tokenAcquired {
if tokenAcquired && tokenCh != nil {
tokenCh <- struct{}{}
}
bc.FinishCollectedRequests(bc.finisher, errRet)
Expand All @@ -80,6 +85,9 @@ func (bc *Controller[T]) FetchPendingRequests(ctx context.Context, requestCh <-c
// If the batch size reaches the maxBatchSize limit but the token haven't arrived yet, don't receive more
// requests, and return when token is ready.
if bc.collectedRequestCount >= bc.maxBatchSize && !tokenAcquired {
if tokenCh == nil {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
Expand All @@ -88,20 +96,23 @@ func (bc *Controller[T]) FetchPendingRequests(ctx context.Context, requestCh <-c
}
}

select {
case <-ctx.Done():
return ctx.Err()
case req := <-requestCh:
// Start to batch when the first request arrives.
bc.pushRequest(req)
// A request arrives but the token is not ready yet. Continue waiting, and also allowing collecting the next
// request if it arrives.
continue
case <-tokenCh:
tokenAcquired = true
if tokenCh != nil {
select {
case <-ctx.Done():
return ctx.Err()
case req := <-requestCh:
// Start to batch when the first request arrives.
bc.pushRequest(req)
// A request arrives but the token is not ready yet. Continue waiting, and also allowing collecting the next
// request if it arrives.
continue
case <-tokenCh:
tokenAcquired = true
}
}

// The token is ready. If the first request didn't arrive, wait for it.
// After the token is ready or it's working without token,
// wait for the first request to arrive.
if bc.collectedRequestCount == 0 {
select {
case <-ctx.Done():
Expand Down
61 changes: 59 additions & 2 deletions client/pkg/batch/batch_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"github.com/stretchr/testify/require"
)

const testMaxBatchSize = 20

func TestAdjustBestBatchSize(t *testing.T) {
re := require.New(t)
bc := NewController[int](20, nil, nil)
bc := NewController[int](testMaxBatchSize, nil, nil)
re.Equal(defaultBestBatchSize, bc.bestBatchSize)
bc.AdjustBestBatchSize()
re.Equal(defaultBestBatchSize-1, bc.bestBatchSize)
Expand Down Expand Up @@ -52,7 +54,7 @@ type testRequest struct {

func TestFinishCollectedRequests(t *testing.T) {
re := require.New(t)
bc := NewController[*testRequest](20, nil, nil)
bc := NewController[*testRequest](testMaxBatchSize, nil, nil)
// Finish with zero request count.
re.Zero(bc.collectedRequestCount)
bc.FinishCollectedRequests(nil, nil)
Expand Down Expand Up @@ -81,3 +83,58 @@ func TestFinishCollectedRequests(t *testing.T) {
re.Equal(context.Canceled, requests[i].err)
}
}

func TestFetchPendingRequests(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

re := require.New(t)
bc := NewController[int](testMaxBatchSize, nil, nil)
requestCh := make(chan int, testMaxBatchSize+1)
// Fetch a nil `tokenCh`.
requestCh <- 1
re.NoError(bc.FetchPendingRequests(ctx, requestCh, nil, 0))
re.Empty(requestCh)
re.Equal(1, bc.collectedRequestCount)
// Fetch a nil `tokenCh` with max batch size.
for i := range testMaxBatchSize {
requestCh <- i
}
re.NoError(bc.FetchPendingRequests(ctx, requestCh, nil, 0))
re.Empty(requestCh)
re.Equal(testMaxBatchSize, bc.collectedRequestCount)
// Fetch a nil `tokenCh` with max batch size + 1.
for i := range testMaxBatchSize + 1 {
requestCh <- i
}
re.NoError(bc.FetchPendingRequests(ctx, requestCh, nil, 0))
re.Len(requestCh, 1)
re.Equal(testMaxBatchSize, bc.collectedRequestCount)
// Drain the requestCh.
<-requestCh
// Fetch a non-nil `tokenCh`.
tokenCh := make(chan struct{}, 1)
requestCh <- 1
tokenCh <- struct{}{}
re.NoError(bc.FetchPendingRequests(ctx, requestCh, tokenCh, 0))
re.Empty(requestCh)
re.Equal(1, bc.collectedRequestCount)
// Fetch a non-nil `tokenCh` with max batch size.
for i := range testMaxBatchSize {
requestCh <- i
}
tokenCh <- struct{}{}
re.NoError(bc.FetchPendingRequests(ctx, requestCh, tokenCh, 0))
re.Empty(requestCh)
re.Equal(testMaxBatchSize, bc.collectedRequestCount)
// Fetch a non-nil `tokenCh` with max batch size + 1.
for i := range testMaxBatchSize + 1 {
requestCh <- i
}
tokenCh <- struct{}{}
re.NoError(bc.FetchPendingRequests(ctx, requestCh, tokenCh, 0))
re.Len(requestCh, 1)
re.Equal(testMaxBatchSize, bc.collectedRequestCount)
// Drain the requestCh.
<-requestCh
}

0 comments on commit e8bef5f

Please sign in to comment.