Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kafka messages consumer to the Framework #2613

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/proto-sync.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ jobs:
run: |
git clone https://github.com/AliceO2Group/Control.git aliecs
cp aliecs/core/protos/o2control.proto Control/protobuf
cp aliecs/common/protos/events.proto Control/protobuf/protos
cp aliecs/common/protos/common.proto Control/protobuf/protos/protos
cp aliecs/common/protos/events.proto Framework/Backend/protos
cp aliecs/common/protos/common.proto Framework/Backend/protos/protos
cp aliecs/apricot/protos/apricot.proto Control/protobuf/o2apricot.proto
rm -rf aliecs
- name: Check if there are any differences and create PR
Expand Down
8 changes: 6 additions & 2 deletions Control/lib/control-core/GrpcProxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/
*/

// Doc: https://grpc.io/docs/languages/node/
const protoLoader = require('@grpc/proto-loader');
const grpcLibrary = require('@grpc/grpc-js');
const {getWebUiProtoIncludeDir} = require('@aliceo2/web-ui');
const path = require('path');
const {grpcErrorToNativeError} = require('./../errors/grpcErrorToNativeError.js');
const {Status} = require(path.join(__dirname, './../../protobuf/status_pb.js'));
Expand All @@ -34,7 +35,10 @@ class GrpcProxy {
*/
constructor(config, path) {
if (this._isConfigurationValid(config, path)) {
const packageDefinition = protoLoader.loadSync(path, {longs: String, keepCase: false, arrays: true});
const packageDefinition = protoLoader.loadSync(
path,
{longs: String, keepCase: false, arrays: true, includeDirs: [getWebUiProtoIncludeDir()]},
);
const octlProto = grpcLibrary.loadPackageDefinition(packageDefinition);
const protoService = octlProto[this._package][this._label];
const address = `${config.hostname}:${config.port}`;
Expand Down
3 changes: 2 additions & 1 deletion Control/test/config/apricot-grpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const path = require('path');
// Doc: https://grpc.io/grpc/node/grpc.html
const protoLoader = require('@grpc/proto-loader');
const grpcLibrary = require('@grpc/grpc-js');
const {getWebUiProtoIncludeDir} = require('@aliceo2/web-ui');

const PROTO_PATH = path.join(__dirname, './../../protobuf/o2apricot.proto');

Expand All @@ -28,7 +29,7 @@ const apricotGRPCServer = (config) => {
let calls = {};

const server = new grpcLibrary.Server();
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {keepCase: false}); // change to camel case
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {keepCase: false, includeDirs: [getWebUiProtoIncludeDir()]}); // change to camel case
const octlProto = grpcLibrary.loadPackageDefinition(packageDefinition);
const credentials = grpcLibrary.ServerCredentials.createInsecure();
const address = `${config.apricot.hostname}:${config.apricot.port}`;
Expand Down
27 changes: 15 additions & 12 deletions Control/test/config/core-grpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/
*/
/* eslint-disable require-jsdoc */

const path = require('path');

// Doc: https://grpc.io/grpc/node/grpc.html
const protoLoader = require('@grpc/proto-loader');
const grpcLibrary = require('@grpc/grpc-js');
const {getWebUiProtoIncludeDir} = require('@aliceo2/web-ui');

const PROTO_PATH = path.join(__dirname, './../../protobuf/o2control.proto');

Expand All @@ -29,7 +30,10 @@ const coreGRPCServer = (config) => {
let calls = {};

const server = new grpcLibrary.Server();
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {keepCase: false});// change to camel case
const packageDefinition = protoLoader.loadSync(
PROTO_PATH,
{keepCase: false, includeDirs: [getWebUiProtoIncludeDir()]},
);// change to camel case
const octlProto = grpcLibrary.loadPackageDefinition(packageDefinition);
const credentials = grpcLibrary.ServerCredentials.createInsecure();
const address = `${config.grpc.hostname}:${config.grpc.port}`;
Expand All @@ -43,7 +47,7 @@ const coreGRPCServer = (config) => {
calls['getEnvironments'] = true;
const responseData = {
frameworkId: '74917838-27cb-414d-bfcd-7e74f85d4926-0000',
environments: [envTest.environment]
environments: [envTest.environment],
};
callback(null, responseData);
},
Expand Down Expand Up @@ -130,37 +134,36 @@ const envTest = {
mid_enabled: 'false',
mid_something: 'test',
dd_enabled: 'true',
run_type: 'run'
run_type: 'run',
},
vars: {
odc_enabled: 'true',
mid_enabled: 'false',
mid_something: 'test',
dd_enabled: 'true',
run_type: 'run'
run_type: 'run',
},
defaults: {
dcs_topology: 'test',
dd_enabled: 'true',
run_type: 'run'
run_type: 'run',
},
integratedServices: {
}
integratedServices: {},
},
workflow: {},
workflowTemplates: {
workflowTemplates: [
{
repo: 'git.cern.ch/some-user/some-repo/',
template: 'prettyreadout-1', revision: 'master', description: 'something'
template: 'prettyreadout-1', revision: 'master', description: 'something',
},
]
],
},
listRepos: {
repos: [
{name: 'git.cern.ch/some-user/some-repo/', default: true, defaultRevision: 'dev', revisions: ['master', 'dev']},
{name: 'git.com/alice-user/alice-repo/'}]
}
{name: 'git.com/alice-user/alice-repo/'}],
},
};

module.exports = {coreGRPCServer};
3 changes: 3 additions & 0 deletions Framework/Backend/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
updateAndSendExpressResponseFromNativeError,
} = require('./errors/updateAndSendExpressResponseFromNativeError.js');
const { Logger } = require('./log/Logger');
const {getWebUiProtoIncludeDir} = require('./protobuf/getWebUiProtoIncludeDir');

Check failure on line 40 in Framework/Backend/index.js

View workflow job for this annotation

GitHub Actions / Tests & coverage on ubuntu-latest

A space is required after '{'

Check failure on line 40 in Framework/Backend/index.js

View workflow job for this annotation

GitHub Actions / Tests & coverage on ubuntu-latest

A space is required before '}'

Check failure on line 40 in Framework/Backend/index.js

View workflow job for this annotation

GitHub Actions / Tests on macos-latest

A space is required after '{'

Check failure on line 40 in Framework/Backend/index.js

View workflow job for this annotation

GitHub Actions / Tests on macos-latest

A space is required before '}'

Check failure on line 40 in Framework/Backend/index.js

View workflow job for this annotation

GitHub Actions / Test on windows-latest

A space is required after '{'

Check failure on line 40 in Framework/Backend/index.js

View workflow job for this annotation

GitHub Actions / Test on windows-latest

A space is required before '}'

exports.ConsulService = ConsulService;

Expand Down Expand Up @@ -82,3 +83,5 @@
exports.grpcErrorToNativeError = grpcErrorToNativeError;

exports.updateAndSendExpressResponseFromNativeError = updateAndSendExpressResponseFromNativeError;

exports.getWebUiProtoIncludeDir = getWebUiProtoIncludeDir;
52 changes: 52 additions & 0 deletions Framework/Backend/kafka/AliEcsEventMessagesConsumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* @license
* Copyright CERN and copyright holders of ALICE O2. This software is
* distributed under the terms of the GNU General Public License v3 (GPL
* Version 3), copied verbatim in the file "COPYING".
*
* See http://alice-o2.web.cern.ch/license for full licensing information.
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/
const protobuf = require('protobufjs');
const path = require('node:path');
const { KafkaMessagesConsumer } = require('./KafkaMessagesConsumer.js');

const protoDir = path.resolve(__dirname, '../protos');
const root = protobuf.loadSync(path.resolve(protoDir, 'events.proto'));
const EventMessage = root.lookupType('events.Event');

/**
* @callback MessageReceivedCallback
* @param {EventMessage} message received message
* @return {Promise<void>}
*/

/**
* Consumer that consume ECS event messages and pass them to previously-registered listeners
*/
class AliEcsEventMessagesConsumer extends KafkaMessagesConsumer {
// eslint-disable-next-line valid-jsdoc

Check failure on line 31 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests & coverage on ubuntu-latest

Expected indentation of 2 spaces but found 4

Check failure on line 31 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests & coverage on ubuntu-latest

Definition for rule 'valid-jsdoc' was not found

Check failure on line 31 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests on macos-latest

Expected indentation of 2 spaces but found 4

Check failure on line 31 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests on macos-latest

Definition for rule 'valid-jsdoc' was not found

Check failure on line 31 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Test on windows-latest

Expected indentation of 2 spaces but found 4

Check failure on line 31 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Test on windows-latest

Definition for rule 'valid-jsdoc' was not found
/**

Check failure on line 32 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests & coverage on ubuntu-latest

Expected indentation of 2 spaces but found 4

Check failure on line 32 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests on macos-latest

Expected indentation of 2 spaces but found 4

Check failure on line 32 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Test on windows-latest

Expected indentation of 2 spaces but found 4
* Constructor
*
* @param {import('kafkajs').Kafka} kafkaClient configured kafka client
* @param {string} groupId the group id to use for the kafka consumer
* @param {string[]} topics the list of topics to consume
*/
constructor(kafkaClient, groupId, topics) {

Check failure on line 39 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests & coverage on ubuntu-latest

Expected indentation of 2 spaces but found 4

Check failure on line 39 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests on macos-latest

Expected indentation of 2 spaces but found 4

Check failure on line 39 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Test on windows-latest

Expected indentation of 2 spaces but found 4
super(kafkaClient, groupId, topics, EventMessage);

Check failure on line 40 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests & coverage on ubuntu-latest

Expected indentation of 4 spaces but found 8

Check failure on line 40 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests on macos-latest

Expected indentation of 4 spaces but found 8

Check failure on line 40 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Test on windows-latest

Expected indentation of 4 spaces but found 8
}

Check failure on line 41 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests & coverage on ubuntu-latest

Expected indentation of 2 spaces but found 4

Check failure on line 41 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests on macos-latest

Expected indentation of 2 spaces but found 4

Check failure on line 41 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Test on windows-latest

Expected indentation of 2 spaces but found 4

// eslint-disable-next-line valid-jsdoc

Check failure on line 43 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests & coverage on ubuntu-latest

Expected indentation of 2 spaces but found 4

Check failure on line 43 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests & coverage on ubuntu-latest

Definition for rule 'valid-jsdoc' was not found

Check failure on line 43 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests on macos-latest

Expected indentation of 2 spaces but found 4

Check failure on line 43 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests on macos-latest

Definition for rule 'valid-jsdoc' was not found

Check failure on line 43 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Test on windows-latest

Expected indentation of 2 spaces but found 4

Check failure on line 43 in Framework/Backend/kafka/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Test on windows-latest

Definition for rule 'valid-jsdoc' was not found
/**
* @inheritDoc
*/
getLoggerLabel() {
return 'ALI-ECS-EVENT-CONSUMER';
}
}

exports.AliEcsEventMessagesConsumer = AliEcsEventMessagesConsumer;
99 changes: 99 additions & 0 deletions Framework/Backend/kafka/KafkaMessagesConsumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
const { LogManager } = require('@aliceo2/web-ui');

/**
* Generic Kafka Message consumer extracting objects according to a protobuf definition
* @template T extends import('protobufjs').Type
*/
class KafkaMessagesConsumer {
// eslint-disable-next-line valid-jsdoc
/**
* Constructor
*
* @param {import('kafkajs').Kafka} kafkaClient configured kafka client
* @param {string} groupId the group id to use for the kafka consumer
* @param {string[]} topics the list of topics to consume
* @param {import('protobufjs').Type} protoType the type definition of the handled message
*/
constructor(kafkaClient, groupId, topics, protoType) {
this.consumer = kafkaClient.consumer({ groupId });
this._topics = topics;
this._protoType = protoType;

/**
* @type {MessageReceivedCallback[]}
* @private
*/
this._listeners = [];

this._logger = LogManager.getLogger(this.getLoggerLabel());
}

/**
* Register a listener to listen on event message being received
*
* Listeners are called all at once, not waiting for completion before calling the next ones, only errors are caught and logged
*
* @param {MessageReceivedCallback} listener the listener to register
* @return {void}
*/
onMessageReceived(listener) {
this._listeners.push(listener);
}

/**
* Start the kafka consumer
*
* @return {Promise<void>} Resolves once the consumer started to consume messages
*/
async start() {
this._logger.infoMessage(`Started to listen on kafka topic ${this._topics}`);
await this.consumer.connect();
await this.consumer.subscribe({ topics: this._topics });
await this.consumer.run({
eachMessage: async ({ message, topic }) => {
const error = this._protoType.verify(message.value);
if (error) {
this._logger.errorMessage(`Received an invalid message on "${topic}" ${error}`);
return;
}
this._logger.debugMessage(`Received message on ${topic}`);

try {
await this._handleEvent(this._protoType.toObject(
this._protoType.decode(message.value),
{ enums: String },
));
} catch (error) {
this._logger.errorMessage(`Failed to convert message to object on topic ${topic}: ${error}`);
}
},
});
}

/**
* Call every registered listeners by passing the given message to it
*
* @param {T} message the message to pass to listeners
* @return {void}
*/
async _handleEvent(message) {
for (const listener of this._listeners) {
try {
await listener(message);
} catch (error) {
this._logger.errorMessage(`An error occurred when handling event: ${error.message}\n${error.stack}`);
}
}
}

/**
* Return the label to be used by the logger
*
* @return {string} the logger label
*/
getLoggerLabel() {
return 'EVENT-CONSUMER';
}
}

exports.KafkaMessagesConsumer = KafkaMessagesConsumer;
1 change: 1 addition & 0 deletions Framework/Backend/protobuf/getWebUiProtoIncludeDir.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
exports.getWebUiProtoIncludeDir = () => __dirname;
Loading