Skip to content

Commit

Permalink
Workers defined for Redis Subs
Browse files Browse the repository at this point in the history
  • Loading branch information
vigneshshettyin committed Jun 9, 2024
1 parent 051c8dc commit 4d22c84
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 1 deletion.
2 changes: 1 addition & 1 deletion kafka-clickhouse/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
},
"scripts": {
"dev": "nodemon --env-file=.env build/src/main.js",
"kafka:worker": "nodemon --env-file=.env build/src/services/redis/subscriber.js",
"kafka:worker": "nodemon --env-file=.env build/src/services/redis/worker.js",
"start": "node build/src/main.js",
"clean": "rimraf coverage build tmp",
"prebuild": "npm run lint",
Expand Down
5 changes: 5 additions & 0 deletions kafka-clickhouse/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import express, { Application } from 'express';

import { secretKeyValidator } from './middleware.js';
import router from './router.js';
import log from './services/redis/worker.js'

const app: Application = express();

Expand All @@ -15,8 +16,12 @@ app.get('/', async (_req, res) => {
});
});



app.use('/api', router);

console.log(log());

const PORT = process.env.PORT;
app.listen(PORT, () => {
console.log(`Server is listening on port ${PORT}`);
Expand Down
1 change: 1 addition & 0 deletions kafka-clickhouse/src/services/redis/connection.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Redis } from 'ioredis';
import {UserDeviceInfo} from './types.js'


// Create a Redis client instance
const redis = new Redis(process.env.REDIS_URI);

Expand Down
10 changes: 10 additions & 0 deletions kafka-clickhouse/src/services/redis/subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// worker.js
import { parentPort } from 'worker_threads';
import RedisQueue from './connection.js';
import KafkaProducer from '../kafka/producer.js';

Expand Down Expand Up @@ -33,3 +35,11 @@ console.log('Queue processor started. Waiting for messages...');
processQueue().catch((error) => {
console.error('Error starting queue processor:', error);
});

if (parentPort) {
parentPort.on('message', (message) => {
if (message === 'stop') {
running = false;
}
});
}
50 changes: 50 additions & 0 deletions kafka-clickhouse/src/services/redis/worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { Worker } from 'worker_threads';

const numWorkers = 2;
const workers = [];

import path from 'path';

const __dirname = path.resolve();

function getDynamicPath(): string {
const loc = path.resolve(__dirname, 'build/src/services/redis/subscriber.js');
return loc;
}

for (let i = 0; i < numWorkers; i++) {
const worker = new Worker(getDynamicPath());
workers.push(worker);

worker.on('exit', (code) => {
if (code !== 0) {
console.error(`Worker stopped with exit code ${code}`);
}
});

worker.on('error', (error) => {
console.error('Worker error:', error);
});
}

process.on('SIGINT', () => {
console.log('Received SIGINT. Shutting down...');
stopWorkers();
});

process.on('SIGTERM', () => {
console.log('Received SIGTERM. Shutting down...');
stopWorkers();
});

function stopWorkers(): void {
for (const worker of workers) {
worker.postMessage('stop');
}
}

function log(): string {
return 'Worker threads started';
}

export default log;

0 comments on commit 4d22c84

Please sign in to comment.