-
Notifications
You must be signed in to change notification settings - Fork 0
/
backgroundJob.js
100 lines (90 loc) · 3.2 KB
/
backgroundJob.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
const Consumer = require("./consumer");
// Shared Consumer instance for this module: assigned in setupConsumer and read
// by processMessage (via its `channel`) to forward results and ack/nack deliveries.
let consumerRabbitInstance;
/**
 * Picks the simulated processing time for one job.
 *
 * @returns {number} Whole number of milliseconds in the inclusive range [500, 1000].
 */
const simulatedJobDelayMs = () => {
  const minDelayMs = 500;
  const maxDelayMs = 1000;
  return Math.floor(Math.random() * (maxDelayMs - minDelayMs + 1)) + minDelayMs;
};

/**
 * Consume callback that simulates processing a single job delivery.
 *
 * The delivery body is parsed as JSON. After a random 500ms-1s delay the
 * payload is forwarded to the queue named in `message.properties.replyTo`,
 * carrying over the message id, reply-to and the `retries` / `jobType` headers.
 * Payloads with an even `jobId` are acked; every other payload gets an `error`
 * field and is nacked with requeue disabled to simulate a failed job.
 *
 * Deliveries that cannot be processed at all (body is not valid JSON, or does
 * not decode to an object) are nacked immediately without requeue so that they
 * cannot crash the consumer or be redelivered forever.
 *
 * @param {object|null} message - Delivery handed to the consume callback;
 *   expected to expose `content` (Buffer) and `properties`. A null/undefined
 *   value is ignored (amqplib signals a broker-side consumer cancel this way).
 * @returns {void}
 */
const processMessage = (message) => {
  if (message === null || message === undefined) {
    console.log("ERROR: received an empty delivery, consumer was probably cancelled");
    return;
  }

  let json;
  try {
    json = JSON.parse(message.content.toString());
  } catch (err) {
    console.log("ERROR: discarding message with malformed JSON payload: ", err);
    consumerRabbitInstance.channel.nack(message, false, false);
    return;
  }
  if (json === null || typeof json !== "object") {
    console.log("ERROR: discarding message whose payload is not a JSON object");
    consumerRabbitInstance.channel.nack(message, false, false);
    return;
  }

  // Headers may be absent when the publisher did not set any.
  const headers = message.properties.headers || {};
  const forwardData = {
    messageId: message.properties.messageId,
    replyTo: message.properties.replyTo,
    headers: {
      retries: headers.retries,
      jobType: headers.jobType,
    },
  };

  const succeeded = json.jobId % 2 === 0;
  if (!succeeded) {
    // Simulated failure: the error field is inspected by whoever reads the reply.
    json.error = "LOL TESTING";
  }

  setTimeout(() => {
    const { channel } = consumerRabbitInstance;
    channel.sendToQueue(
      message.properties.replyTo,
      Buffer.from(JSON.stringify(json)),
      forwardData
    );
    if (succeeded) {
      channel.ack(message);
    } else {
      // nack(message, allUpTo=false, requeue=false): the broker does not put the
      // message back on this queue. NOTE(review): the original author observed
      // endless retries because nothing tracks a retry count in this setup;
      // presumably the retry happens downstream of the reply - confirm.
      channel.nack(message, false, false);
    }
  }, simulatedJobDelayMs());
};
/**
 * Creates the shared Consumer, starts consuming the job queues and, when this
 * script runs as a child process with an IPC channel, tells the parent that
 * the consumer is ready.
 *
 * Steps, in order:
 *  1. instantiate Consumer and store it in the module-level
 *     `consumerRabbitInstance`;
 *  2. call `initialize` with the job queue names;
 *  3. set the channel prefetch to 10;
 *  4. register logging handlers for the instance's "error" and "failed" events;
 *  5. consume every job queue with `processMessage` using manual acks;
 *  6. send "INITIALIZED CONSUMER" to the parent process, if there is one.
 *
 * @returns {Promise<void>} Resolves once all consumers are registered; rejects
 *   if initialization or any consume call fails.
 */
const setupConsumer = async () => {
  consumerRabbitInstance = new Consumer();
  const jobTypes = ["some_queue", "another_queue"];
  await consumerRabbitInstance.initialize(jobTypes);

  const { channel } = consumerRabbitInstance;

  // Allow up to 10 unacknowledged deliveries at a time (acts as the
  // concurrency limit for this worker).
  await channel.prefetch(10);

  // Queues and exchange bindings are not declared here; presumably
  // Consumer#initialize takes care of that - TODO confirm.

  consumerRabbitInstance.on("error", (err) => {
    console.log("ERROR: ", err);
  });
  consumerRabbitInstance.on("failed", (err) => {
    console.log("ERROR(on failed): ", err);
  });

  // Register the consumers before announcing readiness so that the parent is
  // never told "initialized" while a consume call is still pending or failing.
  for (const queueName of jobTypes) {
    await channel.consume(queueName, processMessage, { noAck: false });
  }

  // process.send only exists when the script was spawned with an IPC channel
  // (e.g. child_process.fork); skip the notification when run standalone.
  if (typeof process.send === "function") {
    process.send("INITIALIZED CONSUMER");
  }
};
// Start the worker. A startup failure is logged and the process exits with a
// non-zero code instead of leaving the rejected promise unhandled.
setupConsumer().catch((err) => {
  console.log("ERROR(setupConsumer): ", err);
  process.exit(1);
});