From c613937ebeacc5a3b1e54a7aaac8f969849ff0a7 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Sun, 18 Feb 2024 10:59:24 +0800 Subject: [PATCH] refactor: improve concurrency safety and testing - Enable race condition detection in Go tests by adding `-race` flag - Refactor `Queue` to use a local variable for `workerCount` with proper locking - Refactor `Ring` to use a local variable for `count` with proper locking and defer unlocking - Replace direct access to `busyWorkers` metric with `BusyWorkers()` method in tests Signed-off-by: Bo-Yi Wu --- .github/workflows/go.yml | 2 +- queue.go | 7 ++++++- ring.go | 9 ++++++--- ring_test.go | 2 +- 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 985ebdf..ffb724b 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -66,7 +66,7 @@ jobs: ${{ runner.os }}-go- - name: Run Tests run: | - go test -v -covermode=atomic -coverprofile=coverage.out + go test -race -v -covermode=atomic -coverprofile=coverage.out - name: Run Benchmark run: | diff --git a/queue.go b/queue.go index 4068f3c..3d2c024 100644 --- a/queue.go +++ b/queue.go @@ -57,7 +57,10 @@ func NewQueue(opts ...Option) (*Queue, error) { // Start to enable all worker func (q *Queue) Start() { - if q.workerCount == 0 { + q.Lock() + count := q.workerCount + q.Unlock() + if count == 0 { return } q.routineGroup.Run(func() { @@ -262,7 +265,9 @@ func (q *Queue) handle(m *job.Message) error { // UpdateWorkerCount to update worker number dynamically. func (q *Queue) UpdateWorkerCount(num int) { + q.Lock() q.workerCount = num + q.Unlock() q.schedule() } diff --git a/ring.go b/ring.go index dd70c28..5b40255 100644 --- a/ring.go +++ b/ring.go @@ -37,7 +37,10 @@ func (s *Ring) Shutdown() error { } s.stopOnce.Do(func() { - if s.count > 0 { + s.Lock() + count := s.count + s.Unlock() + if count > 0 { <-s.exit } }) @@ -75,10 +78,11 @@ func (s *Ring) Request() (core.QueuedMessage, error) { return nil, ErrQueueHasBeenClosed } + s.Lock() + defer s.Unlock() if s.count == 0 { return nil, ErrNoTaskInQueue } - s.Lock() data := s.taskQueue[s.head] s.taskQueue[s.head] = nil s.head = (s.head + 1) % len(s.taskQueue) @@ -87,7 +91,6 @@ func (s *Ring) Request() (core.QueuedMessage, error) { if n := len(s.taskQueue) / 2; n > 2 && s.count <= n { s.resize(n) } - s.Unlock() return data, nil } diff --git a/ring_test.go b/ring_test.go index 031119b..b3f90a7 100644 --- a/ring_test.go +++ b/ring_test.go @@ -142,7 +142,7 @@ func TestCancelJobAfterShutdown(t *testing.T) { assert.NoError(t, q.Queue(m, job.AllowOption{Timeout: job.Time(100 * time.Millisecond)})) q.Start() time.Sleep(10 * time.Millisecond) - assert.Equal(t, 2, int(q.metric.busyWorkers)) + assert.Equal(t, 2, q.BusyWorkers()) q.Release() }