Skip to content

Commit

Permalink
refactor: improve concurrency safety and testing
Browse files Browse the repository at this point in the history
- Enable race condition detection in Go tests by adding `-race` flag
- Refactor `Queue` to use a local variable for `workerCount` with proper locking
- Refactor `Ring` to use a local variable for `count` with proper locking and defer unlocking
- Replace direct access to `busyWorkers` metric with `BusyWorkers()` method in tests

Signed-off-by: Bo-Yi Wu <[email protected]>
  • Loading branch information
appleboy committed Feb 18, 2024
1 parent 841b61d commit c613937
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ jobs:
${{ runner.os }}-go-
- name: Run Tests
run: |
go test -v -covermode=atomic -coverprofile=coverage.out
go test -race -v -covermode=atomic -coverprofile=coverage.out
- name: Run Benchmark
run: |
Expand Down
7 changes: 6 additions & 1 deletion queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ func NewQueue(opts ...Option) (*Queue, error) {

// Start to enable all worker
func (q *Queue) Start() {
if q.workerCount == 0 {
q.Lock()
count := q.workerCount
q.Unlock()
if count == 0 {
return
}
q.routineGroup.Run(func() {
Expand Down Expand Up @@ -262,7 +265,9 @@ func (q *Queue) handle(m *job.Message) error {

// UpdateWorkerCount to update worker number dynamically.
func (q *Queue) UpdateWorkerCount(num int) {
q.Lock()
q.workerCount = num
q.Unlock()
q.schedule()
}

Expand Down
9 changes: 6 additions & 3 deletions ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ func (s *Ring) Shutdown() error {
}

s.stopOnce.Do(func() {
if s.count > 0 {
s.Lock()
count := s.count
s.Unlock()
if count > 0 {
<-s.exit
}
})
Expand Down Expand Up @@ -75,10 +78,11 @@ func (s *Ring) Request() (core.QueuedMessage, error) {
return nil, ErrQueueHasBeenClosed
}

s.Lock()
defer s.Unlock()
if s.count == 0 {
return nil, ErrNoTaskInQueue
}
s.Lock()
data := s.taskQueue[s.head]
s.taskQueue[s.head] = nil
s.head = (s.head + 1) % len(s.taskQueue)
Expand All @@ -87,7 +91,6 @@ func (s *Ring) Request() (core.QueuedMessage, error) {
if n := len(s.taskQueue) / 2; n > 2 && s.count <= n {
s.resize(n)
}
s.Unlock()

return data, nil
}
Expand Down
2 changes: 1 addition & 1 deletion ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestCancelJobAfterShutdown(t *testing.T) {
assert.NoError(t, q.Queue(m, job.AllowOption{Timeout: job.Time(100 * time.Millisecond)}))
q.Start()
time.Sleep(10 * time.Millisecond)
assert.Equal(t, 2, int(q.metric.busyWorkers))
assert.Equal(t, 2, q.BusyWorkers())
q.Release()
}

Expand Down

0 comments on commit c613937

Please sign in to comment.