Skip to content

Commit

Permalink
Merge branch 'hibiken:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
linhbkhn95 authored Dec 18, 2023
2 parents 8bed28b + fdbf54e commit e946388
Show file tree
Hide file tree
Showing 16 changed files with 233 additions and 257 deletions.
32 changes: 16 additions & 16 deletions .github/workflows/benchstat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,20 @@ jobs:
runs-on: ubuntu-latest
services:
redis:
image: redis
image: redis:7
ports:
- 6379:6379
steps:
- name: Checkout
uses: actions/checkout@v2
uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v2
uses: actions/setup-go@v4
with:
go-version: 1.16.x
go-version: 1.21.x
- name: Benchmark
run: go test -run=^$ -bench=. -count=5 -timeout=60m ./... | tee -a new.txt
- name: Upload Benchmark
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: bench-incoming
path: new.txt
Expand All @@ -33,22 +33,22 @@ jobs:
runs-on: ubuntu-latest
services:
redis:
image: redis
image: redis:7
ports:
- 6379:6379
steps:
- name: Checkout
uses: actions/checkout@v2
uses: actions/checkout@v4
with:
ref: master
- name: Set up Go
uses: actions/setup-go@v2
uses: actions/setup-go@v4
with:
go-version: 1.15.x
go-version: 1.21.x
- name: Benchmark
run: go test -run=^$ -bench=. -count=5 -timeout=60m ./... | tee -a old.txt
- name: Upload Benchmark
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: bench-current
path: old.txt
Expand All @@ -58,25 +58,25 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v2
uses: actions/setup-go@v4
with:
go-version: 1.15.x
go-version: 1.21.x
- name: Install benchstat
run: go get -u golang.org/x/perf/cmd/benchstat
- name: Download Incoming
uses: actions/download-artifact@v2
uses: actions/download-artifact@v3
with:
name: bench-incoming
- name: Download Current
uses: actions/download-artifact@v2
uses: actions/download-artifact@v3
with:
name: bench-current
- name: Benchstat Results
run: benchstat old.txt new.txt | tee -a benchstat.txt
- name: Upload benchstat results
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: benchstat
path: benchstat.txt
18 changes: 10 additions & 8 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,21 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
go-version: [1.14.x, 1.15.x, 1.16.x, 1.17.x, 1.18.x]
go-version: [1.20.x, 1.21.x]
runs-on: ${{ matrix.os }}
services:
redis:
image: redis
image: redis:7
ports:
- 6379:6379
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v2
uses: actions/setup-go@v4
with:
go-version: ${{ matrix.go-version }}
cache: false

- name: Build core module
run: go build -v ./...
Expand All @@ -44,20 +45,21 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
go-version: [1.18.x]
go-version: [1.20.x, 1.21.x]
runs-on: ${{ matrix.os }}
services:
redis:
image: redis
image: redis:7
ports:
- 6379:6379
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v2
uses: actions/setup-go@v4
with:
go-version: ${{ matrix.go-version }}
cache: false

- name: Build tools module
run: cd tools && go build -v ./... && cd ..
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ If you are using this package in production, **please consider sponsoring the pr

## Quickstart

Make sure you have Go installed ([download](https://golang.org/dl/)). Version `1.14` or higher is required.
Make sure you have Go installed ([download](https://golang.org/dl/)). Latest two Go versions are supported (See https://go.dev/dl).

Initialize your project by creating a folder and then running `go mod init github.com/your/repo` ([learn more](https://blog.golang.org/using-go-modules)) inside the folder. Then install Asynq library with the [`go get`](https://golang.org/cmd/go/#hdr-Add_dependencies_to_current_module_and_install_them) command:

Expand Down
10 changes: 7 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
module github.com/hibiken/asynq

go 1.14
go 1.20

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.3
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
Expand All @@ -14,5 +13,10 @@ require (
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e
golang.org/x/time v0.3.0
google.golang.org/protobuf v1.31.0
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
)
12 changes: 0 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w=
github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY=
github.com/frankban/quicktest v1.14.4/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
Expand All @@ -22,20 +18,16 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.0.3 h1:+7mmR26M0IvyLxGZUHxu4GiBkJkVDid0Un+j4ScYu4k=
github.com/redis/go-redis/v9 v9.0.3/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/spf13/cast v1.5.1 h1:R+kOtfhWQE6TVQzY+4D7wJLBgkdVasCEFxSUBYBYIlA=
github.com/spf13/cast v1.5.1/go.mod h1:b9PdjNptOpzXr7Rq1q9gJML/2cdGQAo69NKzQ10KN48=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand All @@ -48,7 +40,6 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
Expand Down Expand Up @@ -76,15 +67,12 @@ golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
5 changes: 4 additions & 1 deletion internal/rdb/rdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1284,7 +1284,10 @@ local res = {}
local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
for _, id in ipairs(ids) do
local key = ARGV[2] .. id
table.insert(res, redis.call("HGET", key, "msg"))
local v = redis.call("HGET", key, "msg")
if v then
table.insert(res, v)
end
end
return res
`)
Expand Down
77 changes: 40 additions & 37 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ type processor struct {
// orderedQueues is set only in strict-priority mode.
orderedQueues []string

retryDelayFunc RetryDelayFunc
isFailureFunc func(error) bool
taskCheckInterval time.Duration
retryDelayFunc RetryDelayFunc
isFailureFunc func(error) bool

errHandler ErrorHandler
shutdownTimeout time.Duration
Expand Down Expand Up @@ -72,20 +73,21 @@ type processor struct {
}

type processorParams struct {
logger *log.Logger
broker base.Broker
baseCtxFn func() context.Context
retryDelayFunc RetryDelayFunc
isFailureFunc func(error) bool
syncCh chan<- *syncRequest
cancelations *base.Cancelations
concurrency int
queues map[string]int
strictPriority bool
errHandler ErrorHandler
shutdownTimeout time.Duration
starting chan<- *workerInfo
finished chan<- *base.TaskMessage
logger *log.Logger
broker base.Broker
baseCtxFn func() context.Context
retryDelayFunc RetryDelayFunc
taskCheckInterval time.Duration
isFailureFunc func(error) bool
syncCh chan<- *syncRequest
cancelations *base.Cancelations
concurrency int
queues map[string]int
strictPriority bool
errHandler ErrorHandler
shutdownTimeout time.Duration
starting chan<- *workerInfo
finished chan<- *base.TaskMessage
}

// newProcessor constructs a new processor.
Expand All @@ -96,26 +98,27 @@ func newProcessor(params processorParams) *processor {
orderedQueues = sortByPriority(queues)
}
return &processor{
logger: params.logger,
broker: params.broker,
baseCtxFn: params.baseCtxFn,
clock: timeutil.NewRealClock(),
queueConfig: queues,
orderedQueues: orderedQueues,
retryDelayFunc: params.retryDelayFunc,
isFailureFunc: params.isFailureFunc,
syncRequestCh: params.syncCh,
cancelations: params.cancelations,
errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1),
sema: make(chan struct{}, params.concurrency),
done: make(chan struct{}),
quit: make(chan struct{}),
abort: make(chan struct{}),
errHandler: params.errHandler,
handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }),
shutdownTimeout: params.shutdownTimeout,
starting: params.starting,
finished: params.finished,
logger: params.logger,
broker: params.broker,
baseCtxFn: params.baseCtxFn,
clock: timeutil.NewRealClock(),
queueConfig: queues,
orderedQueues: orderedQueues,
taskCheckInterval: params.taskCheckInterval,
retryDelayFunc: params.retryDelayFunc,
isFailureFunc: params.isFailureFunc,
syncRequestCh: params.syncCh,
cancelations: params.cancelations,
errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1),
sema: make(chan struct{}, params.concurrency),
done: make(chan struct{}),
quit: make(chan struct{}),
abort: make(chan struct{}),
errHandler: params.errHandler,
handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }),
shutdownTimeout: params.shutdownTimeout,
starting: params.starting,
finished: params.finished,
}
}

Expand Down Expand Up @@ -178,7 +181,7 @@ func (p *processor) exec() {
// Sleep to avoid slamming redis and let scheduler move tasks into queues.
// Note: We are not using blocking pop operation and polling queues instead.
// This adds significant load to redis.
time.Sleep(time.Second)
time.Sleep(p.taskCheckInterval)
<-p.sema // release token
return
case err != nil:
Expand Down
Loading

0 comments on commit e946388

Please sign in to comment.