Skip to content

Commit

Permalink
Merge pull request #462 from travis-ci/meat-consumer-priority
Browse files Browse the repository at this point in the history
Add support for setting RabbitMQ consumer priority
  • Loading branch information
meatballhat authored Jun 21, 2018
2 parents 55dd824 + dbc10a9 commit 5501227
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 28 deletions.
16 changes: 13 additions & 3 deletions amqp_job_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ import (

// AMQPJobQueue is a JobQueue that uses AMQP
type AMQPJobQueue struct {
conn *amqp.Connection
queue string
conn *amqp.Connection
queue string
priority int

stateUpdatePool *tunny.Pool

Expand Down Expand Up @@ -112,7 +113,16 @@ func (q *AMQPJobQueue) Jobs(ctx gocontext.Context) (outChan <-chan Job, err erro
return
}

deliveries, err := jobsChannel.Consume(q.queue, "build-job-consumer", false, false, false, false, nil)
deliveries, err := jobsChannel.Consume(
q.queue, // queue
"build-job-consumer", // consumer

false, // autoAck
false, // exclusive
false, // noLocal
false, // noWait
amqp.Table{"x-priority": int64(q.priority)}) // args

if err != nil {
return
}
Expand Down
5 changes: 5 additions & 0 deletions cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,11 @@ func (i *CLI) buildAMQPJobQueueAndCanceller() (*AMQPJobQueue, *AMQPCanceller, er
i.logger.WithField("canceller", fmt.Sprintf("%#v", canceller)).Debug("built")

jobQueue, err := NewAMQPJobQueue(amqpConn, i.Config.QueueName, i.Config.StateUpdatePoolSize)

// Set the consumer priority directly instead of altering the signature of
// NewAMQPJobQueue :sigh_cat:
jobQueue.priority = i.Config.AmqpConsumerPriority

if err != nil {
return nil, nil, err
}
Expand Down
55 changes: 30 additions & 25 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ var (
Value: 10 * time.Second,
Usage: "The heartbeat timeout value defines after what time the peer TCP connection should be considered unreachable",
}),
NewConfigDef("AmqpConsumerPriority", &cli.IntFlag{
Value: 0,
Usage: "The consumer priority to set when consuming jobs",
}),
NewConfigDef("AmqpURI", &cli.StringFlag{
Value: defaultAmqpURI,
Usage: `The URI to the AMQP server to connect to (only valid for "amqp" queue type)`,
Expand Down Expand Up @@ -326,31 +330,32 @@ func NewConfigDef(fieldName string, flag cli.Flag) *ConfigDef {

// Config contains all the configuration needed to run the worker.
type Config struct {
ProviderName string `config:"provider-name"`
QueueType string `config:"queue-type"`
AmqpURI string `config:"amqp-uri"`
AmqpInsecure bool `config:"amqp-insecure"`
AmqpTlsCert string `config:"amqp-tls-cert"`
AmqpTlsCertPath string `config:"amqp-tls-cert-path"`
AmqpHeartbeat time.Duration `config:"amqp-heartbeat"`
BaseDir string `config:"base-dir"`
PoolSize int `config:"pool-size"`
BuildAPIURI string `config:"build-api-uri"`
QueueName string `config:"queue-name"`
LibratoEmail string `config:"librato-email"`
LibratoToken string `config:"librato-token"`
LibratoSource string `config:"librato-source"`
LogsAmqpURI string `config:"logs-amqp-uri"`
LogsAmqpTlsCert string `config:"logs-amqp-tls-cert"`
LogsAmqpTlsCertPath string `config:"logs-amqp-tls-cert-path"`
SentryDSN string `config:"sentry-dsn"`
Hostname string `config:"hostname"`
DefaultLanguage string `config:"default-language"`
DefaultDist string `config:"default-dist"`
DefaultGroup string `config:"default-group"`
DefaultOS string `config:"default-os"`
JobBoardURL string `config:"job-board-url"`
TravisSite string `config:"travis-site"`
ProviderName string `config:"provider-name"`
QueueType string `config:"queue-type"`
AmqpURI string `config:"amqp-uri"`
AmqpInsecure bool `config:"amqp-insecure"`
AmqpTlsCert string `config:"amqp-tls-cert"`
AmqpTlsCertPath string `config:"amqp-tls-cert-path"`
AmqpHeartbeat time.Duration `config:"amqp-heartbeat"`
AmqpConsumerPriority int `config:"amqp-consumer-priority"`
BaseDir string `config:"base-dir"`
PoolSize int `config:"pool-size"`
BuildAPIURI string `config:"build-api-uri"`
QueueName string `config:"queue-name"`
LibratoEmail string `config:"librato-email"`
LibratoToken string `config:"librato-token"`
LibratoSource string `config:"librato-source"`
LogsAmqpURI string `config:"logs-amqp-uri"`
LogsAmqpTlsCert string `config:"logs-amqp-tls-cert"`
LogsAmqpTlsCertPath string `config:"logs-amqp-tls-cert-path"`
SentryDSN string `config:"sentry-dsn"`
Hostname string `config:"hostname"`
DefaultLanguage string `config:"default-language"`
DefaultDist string `config:"default-dist"`
DefaultGroup string `config:"default-group"`
DefaultOS string `config:"default-os"`
JobBoardURL string `config:"job-board-url"`
TravisSite string `config:"travis-site"`

StateUpdatePoolSize int `config:"state-update-pool-size"`
LogPoolSize int `config:"log-pool-size"`
Expand Down

0 comments on commit 5501227

Please sign in to comment.