-
Notifications
You must be signed in to change notification settings - Fork 2
/
amqp-worker.js
135 lines (110 loc) · 3.45 KB
/
amqp-worker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
const amqp = require('amqplib')
const rabbitMQHost = process.env.RABBIT_MQ_HOST || 'localhost'
const rabbitMQPort = process.env.RABBIT_MQ_PORT || 5672
const rabbitMQUser = process.env.RABBIT_MQ_USER || 'showrunner'
const rabbitMQPass = process.env.RABBIT_MQ_PASS || 'showrunner'
const rabbitMQScheme = process.env.RABBIT_MQ_SCHEME || 'amqp'
class QueueWorker {
constructor (host, port, opts) {
opts = opts || {}
this.connection = void 0
this.channel = void 0
this.host = host || rabbitMQHost
this.port = port || rabbitMQPort
this.hostURL = `${rabbitMQScheme}://${rabbitMQUser}:${rabbitMQPass}@${this.host}:${this.port}/`
this.listening = false
this.assertOpts = opts.assertOpts || {}
this.consumeOpts = opts.consumeOpts || {}
this.sendOpts = opts.sendOpts || {}
}
async initialize () {
if (!this.connection) {
try {
this.connection = await amqp.connect(this.hostURL)
} catch (err) {
return this._handleError(err)
}
}
this.connection.on('error', (err) => {
this._handleError(err)
})
return this.connection
}
async getChannel () {
if (!this.connection) {
await this.initialize()
}
if (!this.channel) {
try {
this.channel = await this.connection.createChannel()
} catch (err) {
this._handleError(err)
}
}
return this.channel
}
async disconnect () {
if (typeof this.beforeDisconnect === 'function') {
await this.beforeDisconnect()
}
await this.channel.close()
await this.connection.close()
this.channel = void 0
this.connection = void 0
}
serializeMessage (msg) {
return msg
}
async messageHandler (msg) {
throw new Error('You must implement this.messageHandler')
}
async sendMessage (msg, opts) {
opts = opts || {}
if (!this.channel && !opts.channel) {
await this.getChannel()
}
const channel = opts.channel || this.channel
const queue = opts.queue || this.queue
const sendOpts = Object.assign({}, opts)
delete sendOpts.queue
delete sendOpts.channel
return new Promise((resolve, reject) => {
if (typeof this.queue !== 'string') {
const err = new Error('You must specify a queue with this.queue')
return reject(err)
}
const data = this.serializeMessage(msg)
if (!Buffer.isBuffer(data)) {
const err = new Error('msg must be a buffer')
return reject(err)
}
const options = Object.assign({}, this.sendOpts, sendOpts)
channel.sendToQueue(queue, data, options)
resolve()
})
}
async listen (assertOpts, consumeOpts) {
if (this.listening) {
throw new Error(`A listener for ${this.queue} has already been attached`)
}
if (typeof this.queue !== 'string' || this.queue.length < 1) {
throw new Error('You must specify a queue with this.queue')
}
assertOpts = Object.assign({}, this.assertOpts, assertOpts)
consumeOpts = Object.assign({}, this.consumeOpts, consumeOpts)
await this.getChannel()
await this.channel.assertQueue(this.queue, assertOpts)
const {consumerTag} = await this.channel.consume(this.queue, (msg) => this.messageHandler(msg), consumeOpts)
this.consumerTag = consumerTag
this.listening = true
return consumerTag
}
_handleError (err) {
if (typeof this.handleError === 'function') {
this.handleError(err)
} else {
throw err
}
}
}
module.exports = QueueWorker