Skip to content

Commit

Permalink
code refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
vigneshshettyin committed Jun 3, 2024
1 parent 123dc68 commit 2280f12
Showing 1 changed file with 10 additions and 18 deletions.
28 changes: 10 additions & 18 deletions kafka/redis/sub.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,61 +4,53 @@ const ProducerController = require("../controller/producer");

const redis = new Redis(process.env.REDIS_URI);

// Queue implementation
class RedisQueue {
constructor(redisClient, queueName) {
this.redisClient = redisClient;
this.queueName = queueName;
}

// Add a message to the queue
async enqueue(message) {
await this.redisClient.lpush(this.queueName, JSON.stringify(message));
}

// Get a message from the queue with blocking pop
async dequeue(timeout = 0) {
const result = await this.redisClient.brpop(this.queueName, timeout);
if (result) {
const [, message] = result;
return JSON.parse(message);
}
return null; // Return null if timeout occurred and no message was available
return null;
}
}

let running = true;

async function processQueue() {
const queue = new RedisQueue(redis, 'user_anlytics');
const queue = new RedisQueue(redis, "user_anlytics");

while (running) {
try {
const message = await queue.dequeue(10);
if (message) {
console.log('Dequeued message:', message);
const { ip, browser, os, device, code } = JSON.parse(message);
const { ip, browser, os, device, code } = message;
ProducerController.produce_logic(ip, browser, os, device, code);
}
} catch (error) {
console.error('Error processing queue:', error);
console.error("Error processing queue:", error);
}
}

// Close the connection when done
redis.quit();
}

// Handle termination signals
process.on('SIGINT', () => {
console.log('Received SIGINT. Shutting down...');
process.on("SIGINT", () => {
console.log("Received SIGINT. Shutting down...");
running = false;
});

process.on('SIGTERM', () => {
console.log('Received SIGTERM. Shutting down...');
process.on("SIGTERM", () => {
console.log("Received SIGTERM. Shutting down...");
running = false;
});

// Start processing the queue
processQueue();
processQueue();

0 comments on commit 2280f12

Please sign in to comment.