Skip to content

Commit

Permalink
Simple batch insert
Browse files Browse the repository at this point in the history
  • Loading branch information
vigneshshettyin committed Nov 9, 2024
1 parent 5fc0125 commit 57b6503
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 28 deletions.
89 changes: 63 additions & 26 deletions kafka-clickhouse/src/services/rabbitmq/producer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import rabbitmqConnection from './connection.js';
import UserLocationService from '../location/location.js';
import { ProduceMessage, UserLocation } from './types.js';
import { ProduceMessage, UserLocation, SourceObjects } from './types.js';


const exchangeName = 'exchange'; // The exchange name
Expand All @@ -15,42 +15,79 @@ const pub = rabbitmqConnection.createPublisher({
exchanges: [{ exchange: exchangeName, type: 'fanout', durable: true }],
});

// Publish a message for testing
// pub.send({ exchange: exchangeName, routingKey: routingKey }, {
// code: '123',
// browser: 'Chrome',
// os: 'Windows',
// device: 'Desktop',
// country: 'US',
// region: 'CA',
// city: 'Los Angeles',
// })
// const test_data = [
// {
// code: '123',
// browser: 'Chrome',
// os: 'Windows',
// device: 'Desktop',
// country: 'US',
// region: 'CA',
// city: 'Los Angeles',
// },
// {
// code: '123',
// browser: 'Chrome',
// os: 'Windows',
// device: 'Desktop',
// country: 'US',
// region: 'CA',
// city: 'Los Angeles',
// },
// {
// code: '123',
// browser: 'Chrome',
// os: 'Windows',
// device: 'Desktop',
// country: 'US',
// region: 'CA',
// city: 'Los Angeles',
// },
// {
// code: '123',
// browser: 'Chrome',
// os: 'Windows',
// device: 'Desktop',
// country: 'US',
// region: 'CA',
// city: 'Los Angeles',
// }
// ]

// // Publish a message for testing
// pub.send({ exchange: exchangeName, routingKey: routingKey }, test_data)
// .then(() => console.log('Message published'))
// .catch((error) => console.error('Error publishing message:', error));

// TODO: Implement the Producer class
// TODO: Make it batch processing

class Producer {
async produceLogic(ip: string, browser: string, os: string, device: string, code: string): Promise<void> {
const location: UserLocation = await UserLocationService.getUserLocation(ip);
const message: ProduceMessage = {
code,
browser,
os,
device,
country: location.country,
region: location.region,
city: location.city,
};
console.log('Producing message:', message);
async produceLogic(produceObjects : SourceObjects[]): Promise<void> {
const messages: ProduceMessage[] = [];
for (const obj of produceObjects) {
const { ip, browser, os, device, code } = obj;
const location: UserLocation = await UserLocationService.getUserLocation(ip);
const message: ProduceMessage = {
code,
browser,
os,
device,
country: location.country,
region: location.region,
city: location.city,
};
messages.push(message);
}
pub.send(
{exchange: exchangeName, routingKey: routingKey}, // metadata
message
{exchange: exchangeName, routingKey: routingKey},
messages
).catch((error) => {
console.error('Error producing message:', error);
messages.length = 0;
}).finally(() => {
console.log('Message produced');
console.log(`[Info] : Produced ${messages.length} messages at ${new Date().toISOString()}`);
messages.length = 0;
});
}
}
Expand Down
7 changes: 7 additions & 0 deletions kafka-clickhouse/src/services/rabbitmq/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,10 @@ export interface ProduceMessage {
region: string | null;
city: string | null;
}
export interface SourceObjects {
ip: string;
browser: string;
os: string;
device: string;
code: string;
}
13 changes: 11 additions & 2 deletions kafka-clickhouse/src/services/redis/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,25 @@
import { parentPort } from 'worker_threads';
import RedisQueue from './connection.js';
import KafkaProducer from '../rabbitmq/producer.js';
import { SourceObjects } from '../rabbitmq/types.js';

let running = true;
const batch = 10;

async function processQueue(): Promise<void> {
const batchMessages : SourceObjects[] = [];
while (running) {
try {
const message = await RedisQueue.dequeue(10);
if (message) {
const { ip, browser, os, device, code } = message;
await KafkaProducer.produceLogic(ip, browser, os, device, code);
batchMessages.push(message);
if (batchMessages.length < batch) {
continue;
}
else {
await KafkaProducer.produceLogic(batchMessages);
batchMessages.length = 0;
}
}
} catch (error) {
console.error('Error processing queue:', error);
Expand Down

0 comments on commit 57b6503

Please sign in to comment.