From ccca528cdfc24a2741594303a78c7f46b3e86768 Mon Sep 17 00:00:00 2001 From: martinboulais <31805063+martinboulais@users.noreply.github.com> Date: Wed, 9 Oct 2024 16:01:49 +0200 Subject: [PATCH] Add kafka messages consumer to the Framework Add kafka messages consumer --- .github/workflows/proto-sync.yml | 4 +- Control/lib/control-core/GrpcProxy.js | 8 +- Control/test/config/apricot-grpc.js | 3 +- Control/test/config/core-grpc.js | 27 ++--- Framework/Backend/index.js | 3 + .../kafka/AliEcsEventMessagesConsumer.js | 52 ++++++++++ .../Backend/kafka/KafkaMessagesConsumer.js | 99 +++++++++++++++++++ .../protobuf/getWebUiProtoIncludeDir.js | 1 + .../Backend/protobuf}/protos/common.proto | 0 .../Backend}/protobuf/protos/events.proto | 0 10 files changed, 180 insertions(+), 17 deletions(-) create mode 100644 Framework/Backend/kafka/AliEcsEventMessagesConsumer.js create mode 100644 Framework/Backend/kafka/KafkaMessagesConsumer.js create mode 100644 Framework/Backend/protobuf/getWebUiProtoIncludeDir.js rename {Control/protobuf/protos => Framework/Backend/protobuf}/protos/common.proto (100%) rename {Control => Framework/Backend}/protobuf/protos/events.proto (100%) diff --git a/.github/workflows/proto-sync.yml b/.github/workflows/proto-sync.yml index 6fc47c8a1..d289973bc 100644 --- a/.github/workflows/proto-sync.yml +++ b/.github/workflows/proto-sync.yml @@ -19,8 +19,8 @@ jobs: run: | git clone https://github.com/AliceO2Group/Control.git aliecs cp aliecs/core/protos/o2control.proto Control/protobuf - cp aliecs/common/protos/events.proto Control/protobuf/protos - cp aliecs/common/protos/common.proto Control/protobuf/protos/protos + cp aliecs/common/protos/events.proto Framework/Backend/protos + cp aliecs/common/protos/common.proto Framework/Backend/protos/protos cp aliecs/apricot/protos/apricot.proto Control/protobuf/o2apricot.proto rm -rf aliecs - name: Check if there are any differences and create PR diff --git a/Control/lib/control-core/GrpcProxy.js b/Control/lib/control-core/GrpcProxy.js index 2c34c7d38..f80a0b37f 100644 --- a/Control/lib/control-core/GrpcProxy.js +++ b/Control/lib/control-core/GrpcProxy.js @@ -10,11 +10,12 @@ * In applying this license CERN does not waive the privileges and immunities * granted to it by virtue of its status as an Intergovernmental Organization * or submit itself to any jurisdiction. -*/ + */ // Doc: https://grpc.io/docs/languages/node/ const protoLoader = require('@grpc/proto-loader'); const grpcLibrary = require('@grpc/grpc-js'); +const {getWebUiProtoIncludeDir} = require('@aliceo2/web-ui'); const path = require('path'); const {grpcErrorToNativeError} = require('./../errors/grpcErrorToNativeError.js'); const {Status} = require(path.join(__dirname, './../../protobuf/status_pb.js')); @@ -34,7 +35,10 @@ class GrpcProxy { */ constructor(config, path) { if (this._isConfigurationValid(config, path)) { - const packageDefinition = protoLoader.loadSync(path, {longs: String, keepCase: false, arrays: true}); + const packageDefinition = protoLoader.loadSync( + path, + {longs: String, keepCase: false, arrays: true, includeDirs: [getWebUiProtoIncludeDir()]}, + ); const octlProto = grpcLibrary.loadPackageDefinition(packageDefinition); const protoService = octlProto[this._package][this._label]; const address = `${config.hostname}:${config.port}`; diff --git a/Control/test/config/apricot-grpc.js b/Control/test/config/apricot-grpc.js index 31660ccce..b892c8938 100644 --- a/Control/test/config/apricot-grpc.js +++ b/Control/test/config/apricot-grpc.js @@ -18,6 +18,7 @@ const path = require('path'); // Doc: https://grpc.io/grpc/node/grpc.html const protoLoader = require('@grpc/proto-loader'); const grpcLibrary = require('@grpc/grpc-js'); +const {getWebUiProtoIncludeDir} = require('@aliceo2/web-ui'); const PROTO_PATH = path.join(__dirname, './../../protobuf/o2apricot.proto'); @@ -28,7 +29,7 @@ const apricotGRPCServer = (config) => { let calls = {}; const server = new grpcLibrary.Server(); - const packageDefinition = protoLoader.loadSync(PROTO_PATH, {keepCase: false}); // change to camel case + const packageDefinition = protoLoader.loadSync(PROTO_PATH, {keepCase: false, includeDirs: [getWebUiProtoIncludeDir()]}); // change to camel case const octlProto = grpcLibrary.loadPackageDefinition(packageDefinition); const credentials = grpcLibrary.ServerCredentials.createInsecure(); const address = `${config.apricot.hostname}:${config.apricot.port}`; diff --git a/Control/test/config/core-grpc.js b/Control/test/config/core-grpc.js index 9648e54cd..27960b588 100644 --- a/Control/test/config/core-grpc.js +++ b/Control/test/config/core-grpc.js @@ -10,7 +10,7 @@ * In applying this license CERN does not waive the privileges and immunities * granted to it by virtue of its status as an Intergovernmental Organization * or submit itself to any jurisdiction. -*/ + */ /* eslint-disable require-jsdoc */ const path = require('path'); @@ -18,6 +18,7 @@ const path = require('path'); // Doc: https://grpc.io/grpc/node/grpc.html const protoLoader = require('@grpc/proto-loader'); const grpcLibrary = require('@grpc/grpc-js'); +const {getWebUiProtoIncludeDir} = require('@aliceo2/web-ui'); const PROTO_PATH = path.join(__dirname, './../../protobuf/o2control.proto'); @@ -29,7 +30,10 @@ const coreGRPCServer = (config) => { let calls = {}; const server = new grpcLibrary.Server(); - const packageDefinition = protoLoader.loadSync(PROTO_PATH, {keepCase: false});// change to camel case + const packageDefinition = protoLoader.loadSync( + PROTO_PATH, + {keepCase: false, includeDirs: [getWebUiProtoIncludeDir()]}, + );// change to camel case const octlProto = grpcLibrary.loadPackageDefinition(packageDefinition); const credentials = grpcLibrary.ServerCredentials.createInsecure(); const address = `${config.grpc.hostname}:${config.grpc.port}`; @@ -43,7 +47,7 @@ const coreGRPCServer = (config) => { calls['getEnvironments'] = true; const responseData = { frameworkId: '74917838-27cb-414d-bfcd-7e74f85d4926-0000', - environments: [envTest.environment] + environments: [envTest.environment], }; callback(null, responseData); }, @@ -130,37 +134,36 @@ const envTest = { mid_enabled: 'false', mid_something: 'test', dd_enabled: 'true', - run_type: 'run' + run_type: 'run', }, vars: { odc_enabled: 'true', mid_enabled: 'false', mid_something: 'test', dd_enabled: 'true', - run_type: 'run' + run_type: 'run', }, defaults: { dcs_topology: 'test', dd_enabled: 'true', - run_type: 'run' + run_type: 'run', }, - integratedServices: { - } + integratedServices: {}, }, workflow: {}, workflowTemplates: { workflowTemplates: [ { repo: 'git.cern.ch/some-user/some-repo/', - template: 'prettyreadout-1', revision: 'master', description: 'something' + template: 'prettyreadout-1', revision: 'master', description: 'something', }, - ] + ], }, listRepos: { repos: [ {name: 'git.cern.ch/some-user/some-repo/', default: true, defaultRevision: 'dev', revisions: ['master', 'dev']}, - {name: 'git.com/alice-user/alice-repo/'}] - } + {name: 'git.com/alice-user/alice-repo/'}], + }, }; module.exports = {coreGRPCServer}; diff --git a/Framework/Backend/index.js b/Framework/Backend/index.js index c72631575..86efd897d 100644 --- a/Framework/Backend/index.js +++ b/Framework/Backend/index.js @@ -37,6 +37,7 @@ const { updateAndSendExpressResponseFromNativeError, } = require('./errors/updateAndSendExpressResponseFromNativeError.js'); const { Logger } = require('./log/Logger'); +const {getWebUiProtoIncludeDir} = require('./protobuf/getWebUiProtoIncludeDir'); exports.ConsulService = ConsulService; @@ -82,3 +83,5 @@ exports.UnauthorizedAccessError = UnauthorizedAccessError; exports.grpcErrorToNativeError = grpcErrorToNativeError; exports.updateAndSendExpressResponseFromNativeError = updateAndSendExpressResponseFromNativeError; + +exports.getWebUiProtoIncludeDir = getWebUiProtoIncludeDir; diff --git a/Framework/Backend/kafka/AliEcsEventMessagesConsumer.js b/Framework/Backend/kafka/AliEcsEventMessagesConsumer.js new file mode 100644 index 000000000..5c2a45da5 --- /dev/null +++ b/Framework/Backend/kafka/AliEcsEventMessagesConsumer.js @@ -0,0 +1,52 @@ +/** + * @license + * Copyright CERN and copyright holders of ALICE O2. This software is + * distributed under the terms of the GNU General Public License v3 (GPL + * Version 3), copied verbatim in the file "COPYING". + * + * See http://alice-o2.web.cern.ch/license for full licensing information. + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ +const protobuf = require('protobufjs'); +const path = require('node:path'); +const { KafkaMessagesConsumer } = require('./KafkaMessagesConsumer.js'); + +const protoDir = path.resolve(__dirname, '../protos'); +const root = protobuf.loadSync(path.resolve(protoDir, 'events.proto')); +const EventMessage = root.lookupType('events.Event'); + +/** + * @callback MessageReceivedCallback + * @param {EventMessage} message received message + * @return {Promise} + */ + +/** + * Consumer that consume ECS event messages and pass them to previously-registered listeners + */ +class AliEcsEventMessagesConsumer extends KafkaMessagesConsumer { + // eslint-disable-next-line valid-jsdoc + /** + * Constructor + * + * @param {import('kafkajs').Kafka} kafkaClient configured kafka client + * @param {string} groupId the group id to use for the kafka consumer + * @param {string[]} topics the list of topics to consume + */ + constructor(kafkaClient, groupId, topics) { + super(kafkaClient, groupId, topics, EventMessage); + } + + // eslint-disable-next-line valid-jsdoc + /** + * @inheritDoc + */ + getLoggerLabel() { + return 'ALI-ECS-EVENT-CONSUMER'; + } +} + +exports.AliEcsEventMessagesConsumer = AliEcsEventMessagesConsumer; diff --git a/Framework/Backend/kafka/KafkaMessagesConsumer.js b/Framework/Backend/kafka/KafkaMessagesConsumer.js new file mode 100644 index 000000000..f8a311dab --- /dev/null +++ b/Framework/Backend/kafka/KafkaMessagesConsumer.js @@ -0,0 +1,99 @@ +const { LogManager } = require('@aliceo2/web-ui'); + +/** + * Generic Kafka Message consumer extracting objects according to a protobuf definition + * @template T extends import('protobufjs').Type + */ +class KafkaMessagesConsumer { + // eslint-disable-next-line valid-jsdoc + /** + * Constructor + * + * @param {import('kafkajs').Kafka} kafkaClient configured kafka client + * @param {string} groupId the group id to use for the kafka consumer + * @param {string[]} topics the list of topics to consume + * @param {import('protobufjs').Type} protoType the type definition of the handled message + */ + constructor(kafkaClient, groupId, topics, protoType) { + this.consumer = kafkaClient.consumer({ groupId }); + this._topics = topics; + this._protoType = protoType; + + /** + * @type {MessageReceivedCallback[]} + * @private + */ + this._listeners = []; + + this._logger = LogManager.getLogger(this.getLoggerLabel()); + } + + /** + * Register a listener to listen on event message being received + * + * Listeners are called all at once, not waiting for completion before calling the next ones, only errors are caught and logged + * + * @param {MessageReceivedCallback} listener the listener to register + * @return {void} + */ + onMessageReceived(listener) { + this._listeners.push(listener); + } + + /** + * Start the kafka consumer + * + * @return {Promise} Resolves once the consumer started to consume messages + */ + async start() { + this._logger.infoMessage(`Started to listen on kafka topic ${this._topics}`); + await this.consumer.connect(); + await this.consumer.subscribe({ topics: this._topics }); + await this.consumer.run({ + eachMessage: async ({ message, topic }) => { + const error = this._protoType.verify(message.value); + if (error) { + this._logger.errorMessage(`Received an invalid message on "${topic}" ${error}`); + return; + } + this._logger.debugMessage(`Received message on ${topic}`); + + try { + await this._handleEvent(this._protoType.toObject( + this._protoType.decode(message.value), + { enums: String }, + )); + } catch (error) { + this._logger.errorMessage(`Failed to convert message to object on topic ${topic}: ${error}`); + } + }, + }); + } + + /** + * Call every registered listeners by passing the given message to it + * + * @param {T} message the message to pass to listeners + * @return {void} + */ + async _handleEvent(message) { + for (const listener of this._listeners) { + try { + await listener(message); + } catch (error) { + this._logger.errorMessage(`An error occurred when handling event: ${error.message}\n${error.stack}`); + } + } + } + + /** + * Return the label to be used by the logger + * + * @return {string} the logger label + */ + getLoggerLabel() { + return 'EVENT-CONSUMER'; + } +} + +exports.KafkaMessagesConsumer = KafkaMessagesConsumer; diff --git a/Framework/Backend/protobuf/getWebUiProtoIncludeDir.js b/Framework/Backend/protobuf/getWebUiProtoIncludeDir.js new file mode 100644 index 000000000..7794b0b42 --- /dev/null +++ b/Framework/Backend/protobuf/getWebUiProtoIncludeDir.js @@ -0,0 +1 @@ +exports.getWebUiProtoIncludeDir = () => __dirname; diff --git a/Control/protobuf/protos/protos/common.proto b/Framework/Backend/protobuf/protos/common.proto similarity index 100% rename from Control/protobuf/protos/protos/common.proto rename to Framework/Backend/protobuf/protos/common.proto diff --git a/Control/protobuf/protos/events.proto b/Framework/Backend/protobuf/protos/events.proto similarity index 100% rename from Control/protobuf/protos/events.proto rename to Framework/Backend/protobuf/protos/events.proto