Skip to content

Commit

Permalink
Merge pull request #214 from vigneshshettyin/feature-rabbitmq
Browse files Browse the repository at this point in the history
Migrating from Kafka to RabbitMQ
  • Loading branch information
vigneshshettyin authored Oct 20, 2024
2 parents 22d2f7d + d23cc9d commit 3a8a97f
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 158 deletions.
18 changes: 9 additions & 9 deletions kafka-clickhouse/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion kafka-clickhouse/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"scripts": {
"dev": "nodemon --env-file=.env build/src/main.js",
"kafka:worker": "nodemon --env-file=.env build/src/services/redis/worker.js",
"kafka:worker:test": "nodemon --env-file=.env build/src/services/redis/subscriber.js",
"start": "node build/src/main.js",
"clean": "rimraf coverage build tmp",
"prebuild": "npm run lint",
Expand All @@ -41,8 +42,8 @@
"@clickhouse/client": "^1.1.0",
"express": "^4.19.2",
"ioredis": "^5.4.1",
"kafkajs": "^2.2.4",
"node-fetch": "^3.3.2",
"rabbitmq-client": "^5.0.0",
"tslib": "~2.6"
},
"volta": {
Expand Down
4 changes: 3 additions & 1 deletion kafka-clickhouse/src/controller/healthcheck.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,6 @@ class HealthCheckController {
}
}

export default new HealthCheckController();
const instance = new HealthCheckController();

export default instance;
63 changes: 63 additions & 0 deletions kafka-clickhouse/src/fixtures/rabbitmq_clickhouse.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
-- Create a new database named 'eurl_data'
CREATE DATABASE IF NOT EXISTS eurl_data;

-- Switch to the 'eurl_data' database to ensure all subsequent operations are performed within this database
USE eurl_data;

-- ===========================
-- TABLE: 'click_analytics_rq'
-- ===========================
-- Create a table named 'click_analytics_rq' to consume data from a RabbitMQ queue
-- The table schema includes columns for code, browser, os, device, country, region, and city
-- LowCardinality is used for columns with a limited number of unique values to optimize storage
-- The table uses the RabbitMQ engine, with settings specified for the RabbitMQ broker, routing key, exchange name, and message format

CREATE TABLE IF NOT EXISTS eurl_data.click_analytics_rq (
`code` String, -- A unique code representing the click analytics data
`browser` LowCardinality(String), -- The browser type (e.g., Chrome, Firefox) with LowCardinality to optimize storage
`os` LowCardinality(String), -- The operating system (e.g., Windows, Linux), optimized with LowCardinality
`device` LowCardinality(String), -- The type of device (e.g., Desktop, Mobile), optimized with LowCardinality
`country` LowCardinality(String), -- The country of the user, optimized with LowCardinality
`region` String, -- The region or state where the user is located
`city` String -- The city where the user is located
) ENGINE = RabbitMQ -- Use RabbitMQ as the table engine for real-time data ingestion
SETTINGS
rabbitmq_host_port = '172.17.0.5:5672', -- RabbitMQ broker address
rabbitmq_routing_key_list = 'eurl_click_analytics', -- RabbitMQ routing key for message filtering
rabbitmq_exchange_name = 'exchange', -- The RabbitMQ exchange name where messages are published
rabbitmq_format = 'JSONEachRow'; -- Format for incoming messages from RabbitMQ (JSON format, one message per row)

-- =======================
-- TABLE: 'click_analytics'
-- =======================
-- Create a table named 'click_analytics' to store processed click analytics data
-- The table schema includes columns for code, browser, os, device, country, region, city, and a timestamp for the event
-- The MergeTree engine is used for high-performance OLAP queries
-- Data is partitioned by month and ordered by multiple columns for efficient querying

CREATE TABLE IF NOT EXISTS eurl_data.click_analytics
(
`code` String, -- A unique code representing the click analytics data
`browser` LowCardinality(String), -- Browser type (Chrome, Firefox), optimized with LowCardinality
`os` LowCardinality(String), -- Operating system (Windows, macOS), optimized with LowCardinality
`device` LowCardinality(String), -- Type of device (Desktop, Mobile), optimized with LowCardinality
`country` LowCardinality(String), -- The country where the user is located, optimized with LowCardinality
`region` String, -- The region or state of the user
`city` String, -- The city of the user
`timestamp` Date DEFAULT toDate(now()) -- A timestamp for the event, defaulting to the current date
)
ENGINE = MergeTree -- Use MergeTree for efficient OLAP queries
PARTITION BY toYYYYMM(timestamp) -- Partition data by month (YYYYMM) based on the timestamp
ORDER BY (code, timestamp, browser, os, device, country) -- Order data by multiple columns to speed up queries
SETTINGS index_granularity = 8192; -- Set the granularity of the primary key index for efficient data access

-- =======================================
-- MATERIALIZED VIEW: 'click_analytics_mv'
-- =======================================
-- Create a materialized view to automatically transfer data from the RabbitMQ table to the analytics table
-- The materialized view watches the 'click_analytics_rq' table and inserts the data into 'click_analytics' as it arrives

CREATE MATERIALIZED VIEW IF NOT EXISTS eurl_data.click_analytics_mv
TO eurl_data.click_analytics -- Target table where processed data will be stored
AS
SELECT * FROM eurl_data.click_analytics_rq; -- Select all data from the RabbitMQ table for insertion into the analytics table
93 changes: 0 additions & 93 deletions kafka-clickhouse/src/services/kafka/connection.ts

This file was deleted.

52 changes: 0 additions & 52 deletions kafka-clickhouse/src/services/kafka/producer.ts

This file was deleted.

4 changes: 3 additions & 1 deletion kafka-clickhouse/src/services/location/location.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ class UserLocationService {
}
}

export default new UserLocationService();
const instance = new UserLocationService();

export default instance;
14 changes: 14 additions & 0 deletions kafka-clickhouse/src/services/rabbitmq/connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import {Connection} from 'rabbitmq-client'


const rabbitmqConnection = new Connection(process.env.RABBITMQ_URI);

rabbitmqConnection.on('error', (error) => {
console.error('RabbitMQ connection error:', error);
});

rabbitmqConnection.on('connection', () => {
console.log('Connection successfully (re)established')
})

export default rabbitmqConnection;
60 changes: 60 additions & 0 deletions kafka-clickhouse/src/services/rabbitmq/producer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import rabbitmqConnection from './connection.js';
import UserLocationService from '../location/location.js';
import { ProduceMessage, UserLocation } from './types.js';


const exchangeName = 'exchange'; // The exchange name
const routingKey = 'eurl_click_analytics'; // The routing key

const pub = rabbitmqConnection.createPublisher({
// Enable publish confirmations, similar to consumer acknowledgements
confirm: true,
// Enable retries
maxAttempts: 2,
// Ensure the existence of an exchange before we use it
exchanges: [{ exchange: exchangeName, type: 'fanout', durable: true }],
});

// Publish a message for testing
// pub.send({ exchange: exchangeName, routingKey: routingKey }, {
// code: '123',
// browser: 'Chrome',
// os: 'Windows',
// device: 'Desktop',
// country: 'US',
// region: 'CA',
// city: 'Los Angeles',
// })
// .then(() => console.log('Message published'))
// .catch((error) => console.error('Error publishing message:', error));

// TODO: Implement the Producer class
// TODO: Make it batch processing

class Producer {
async produceLogic(ip: string, browser: string, os: string, device: string, code: string): Promise<void> {
const location: UserLocation = await UserLocationService.getUserLocation(ip);
const message: ProduceMessage = {
code,
browser,
os,
device,
country: location.country,
region: location.region,
city: location.city,
};
console.log('Producing message:', message);
pub.send(
{exchange: exchangeName, routingKey: routingKey}, // metadata
message
).catch((error) => {
console.error('Error producing message:', error);
}).finally(() => {
console.log('Message produced');
});
}
}

const instance = new Producer();

export default instance;
2 changes: 1 addition & 1 deletion kafka-clickhouse/src/services/redis/subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// worker.js
import { parentPort } from 'worker_threads';
import RedisQueue from './connection.js';
import KafkaProducer from '../kafka/producer.js';
import KafkaProducer from '../rabbitmq/producer.js';

let running = true;

Expand Down

0 comments on commit 3a8a97f

Please sign in to comment.