Skip to content

Commit

Permalink
Fix lint issues
Browse files Browse the repository at this point in the history
  • Loading branch information
graduta committed Oct 9, 2024
1 parent 141ef59 commit 92b1e68
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 117 deletions.
2 changes: 1 addition & 1 deletion Control/lib/adapters/DcsIntegratedEventAdapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,4 @@ class DcsIntegratedEventAdapter {
}
}

exports.DcsIntegratedEventAdapter = DcsIntegratedEventAdapter;
exports.DcsIntegratedEventAdapter = DcsIntegratedEventAdapter;
132 changes: 66 additions & 66 deletions Control/lib/control-core/AliEcsEventMessagesConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,79 +31,79 @@ const EventMessage = root.lookupType('events.Event');
* - https://github.com/AliceO2Group/Bookkeeping/blob/main/lib/server/kafka/AliEcsEventMessagesConsumer.js
*/
class AliEcsEventMessagesConsumer {
/**
* Constructor
*
* @param {import('kafkajs').Kafka} kafkaClient configured kafka client
* @param {string} groupId the group id to use for the kafka consumer
* @param {string[]} topics the list of topics to consume
*/
constructor(kafkaClient, groupId, topics) {
this.consumer = kafkaClient.consumer({ groupId });
this._topics = topics;

/**
* @type {MessageReceivedCallback[]}
* @private
*/
this._listeners = [];

this._logger = LogManager.getLogger('cog/ecs-event-consumer');
}
/**
* Constructor
*
* @param {import('kafkajs').Kafka} kafkaClient configured kafka client
* @param {string} groupId the group id to use for the kafka consumer
* @param {string[]} topics the list of topics to consume
*/
constructor(kafkaClient, groupId, topics) {
this.consumer = kafkaClient.consumer({ groupId });
this._topics = topics;

/**
* Register a listener to listen on event message being received
*
* Listeners are called all at once, not waiting for completion before calling the next ones, only errors are caught and logged
*
* @param {MessageReceivedCallback} listener the listener to register
* @return {void}
* @type {MessageReceivedCallback[]}
* @private
*/
onMessageReceived(listener) {
this._listeners.push(listener);
}
this._listeners = [];

/**
* Start the kafka consumer
*
* @return {Promise<void>} Resolves once the consumer started to consume messages
*/
async start() {
this._logger.infoMessage(`Started to listen on kafka topic ${this._topics}`);
await this.consumer.connect();
await this.consumer.subscribe({ topics: this._topics });
await this.consumer.run({
eachMessage: async ({ message, topic }) => {
const error = EventMessage.verify(message.value);
if (error) {
this._logger.errorMessage(`Received an invalid message on "${topic}" ${error}`);
return;
}
await this._handleEvent(
EventMessage.toObject(
EventMessage.decode(message.value),
{ enums: String },
)
);
},
});
}
this._logger = LogManager.getLogger('cog/ecs-event-consumer');
}

/**
* Call every registered listeners by passing the given message to it
*
* @param {EventMessage} message the message to pass to listeners
* @return {void}
*/
async _handleEvent(message) {
for (const listener of this._listeners) {
try {
await listener(message);
} catch (error) {
this._logger.errorMessage(`An error occurred when handling event: ${error.message}\n${error.stack}`);
}
/**
* Register a listener to listen on event message being received
*
* Listeners are called all at once, not waiting for completion before calling the next ones, only errors are caught and logged
*
* @param {MessageReceivedCallback} listener the listener to register
* @return {void}
*/
onMessageReceived(listener) {
this._listeners.push(listener);
}

/**
* Start the kafka consumer
*
* @return {Promise<void>} Resolves once the consumer started to consume messages
*/
async start() {
this._logger.infoMessage(`Started to listen on kafka topic ${this._topics}`);
await this.consumer.connect();
await this.consumer.subscribe({ topics: this._topics });
await this.consumer.run({
eachMessage: async ({ message, topic }) => {
const error = EventMessage.verify(message.value);
if (error) {
this._logger.errorMessage(`Received an invalid message on "${topic}" ${error}`);

Check failure on line 79 in Control/lib/control-core/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests on macos-latest

Expected indentation of 10 spaces but found 12

Check failure on line 79 in Control/lib/control-core/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests & coverage on ubuntu-latest

Expected indentation of 10 spaces but found 12
return;

Check failure on line 80 in Control/lib/control-core/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests on macos-latest

Expected indentation of 10 spaces but found 12

Check failure on line 80 in Control/lib/control-core/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests & coverage on ubuntu-latest

Expected indentation of 10 spaces but found 12
}
await this._handleEvent(
EventMessage.toObject(

Check failure on line 83 in Control/lib/control-core/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests on macos-latest

Expected indentation of 10 spaces but found 12

Check failure on line 83 in Control/lib/control-core/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests & coverage on ubuntu-latest

Expected indentation of 10 spaces but found 12
EventMessage.decode(message.value),

Check failure on line 84 in Control/lib/control-core/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests on macos-latest

Expected indentation of 12 spaces but found 16

Check failure on line 84 in Control/lib/control-core/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests & coverage on ubuntu-latest

Expected indentation of 12 spaces but found 16
{ enums: String },

Check failure on line 85 in Control/lib/control-core/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests on macos-latest

Expected indentation of 12 spaces but found 16

Check failure on line 85 in Control/lib/control-core/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests & coverage on ubuntu-latest

Expected indentation of 12 spaces but found 16
)

Check failure on line 86 in Control/lib/control-core/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests on macos-latest

Expected indentation of 10 spaces but found 12

Check failure on line 86 in Control/lib/control-core/AliEcsEventMessagesConsumer.js

View workflow job for this annotation

GitHub Actions / Tests & coverage on ubuntu-latest

Expected indentation of 10 spaces but found 12
);
},
});
}

/**
* Call every registered listeners by passing the given message to it
*
* @param {EventMessage} message the message to pass to listeners
* @return {void}
*/
async _handleEvent(message) {
for (const listener of this._listeners) {
try {
await listener(message);
} catch (error) {
this._logger.errorMessage(`An error occurred when handling event: ${error.message}\n${error.stack}`);
}
}
}
}

exports.AliEcsEventMessagesConsumer = AliEcsEventMessagesConsumer;
99 changes: 50 additions & 49 deletions Control/lib/control-core/AliEcsSynchronizer.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,57 +24,58 @@ const SOR_EVENT_NAME = 'readout-dataflow.dcs.sor';
* Utility synchronizing AliECS data into control-gui, listening to kafka
*/
class AliEcsSynchronizer {
/**
* Constructor
*
* @param {import('kafkajs').Kafka} kafkaClient - configured kafka client
* @param {CacheService} cacheService - instance of CacheService
*/
constructor(kafkaClient, cacheService) {
this._cacheService = cacheService;
this._logger = LogManager.getLogger('cog/ali-ecs-synchronizer');
/**
* Constructor
*
* @param {import('kafkajs').Kafka} kafkaClient - configured kafka client
* @param {CacheService} cacheService - instance of CacheService
*/
constructor(kafkaClient, cacheService) {
this._cacheService = cacheService;
this._logger = LogManager.getLogger('cog/ali-ecs-synchronizer');

this._ecsIntegratedServiceConsumer = new AliEcsEventMessagesConsumer(kafkaClient, INTEGRATED_SERVICES_CONSUMER_GROUP, INTEGRATED_SERVICES_TOPICS);
this._ecsIntegratedServiceConsumer.onMessageReceived(async (eventMessage) => {
const { timestamp, integratedServiceEvent } = eventMessage;
try {
if (integratedServiceEvent.name === SOR_EVENT_NAME) {
const dcsSorEvent = DcsIntegratedEventAdapter.buildDcsIntegratedEvent(integratedServiceEvent, timestamp);
console.log(dcsSorEvent)
if (!dcsSorEvent) return;
const { environmentId } = dcsSorEvent;
let cachedDcsSteps = this._cacheService.getByKey(CacheKeys.DCS.SOR);
if (!cachedDcsSteps) {
cachedDcsSteps = {};
}
if (!cachedDcsSteps?.[environmentId]) {
cachedDcsSteps[environmentId] = {
displayCache: true,
dcsOperations: [dcsSorEvent]
};
} else {
cachedDcsSteps[environmentId].dcsOperations.push(dcsSorEvent);
}
cachedDcsSteps[environmentId].dcsOperations.sort((a, b) => a.timestamp - b.timestamp);
this._cacheService.updateByKeyAndBroadcast(CacheKeys.DCS.SOR, cachedDcsSteps, {command: CacheKeys.DCS.SOR});
}
} catch (error) {
this._logger.errorMessage(`Error when parsing event message: ${error.message}\n${error.trace}`);
}
});
}
this._ecsIntegratedServiceConsumer = new AliEcsEventMessagesConsumer(kafkaClient, INTEGRATED_SERVICES_CONSUMER_GROUP, INTEGRATED_SERVICES_TOPICS);

Check failure on line 37 in Control/lib/control-core/AliEcsSynchronizer.js

View workflow job for this annotation

GitHub Actions / Tests on macos-latest

This line has a length of 150. Maximum allowed is 120

Check failure on line 37 in Control/lib/control-core/AliEcsSynchronizer.js

View workflow job for this annotation

GitHub Actions / Tests & coverage on ubuntu-latest

This line has a length of 150. Maximum allowed is 120
this._ecsIntegratedServiceConsumer.onMessageReceived(async (eventMessage) => {
const { timestamp, integratedServiceEvent } = eventMessage;
try {
if (integratedServiceEvent.name === SOR_EVENT_NAME) {
const dcsSorEvent = DcsIntegratedEventAdapter.buildDcsIntegratedEvent(integratedServiceEvent, timestamp);
if (!dcsSorEvent) {
return;
}
const { environmentId } = dcsSorEvent;
let cachedDcsSteps = this._cacheService.getByKey(CacheKeys.DCS.SOR);
if (!cachedDcsSteps) {
cachedDcsSteps = {};
}
if (!cachedDcsSteps?.[environmentId]) {
cachedDcsSteps[environmentId] = {
displayCache: true,
dcsOperations: [dcsSorEvent]
};
} else {
cachedDcsSteps[environmentId].dcsOperations.push(dcsSorEvent);
}
cachedDcsSteps[environmentId].dcsOperations.sort((a, b) => a.timestamp - b.timestamp);
this._cacheService.updateByKeyAndBroadcast(CacheKeys.DCS.SOR, cachedDcsSteps, {command: CacheKeys.DCS.SOR});
}
} catch (error) {
this._logger.errorMessage(`Error when parsing event message: ${error.message}\n${error.trace}`);
}
});
}

/**
* Start the synchronization process
*
* @return {void}
*/
start() {
this._logger.infoMessage('Starting to consume AliECS messages for integrated services');
this._ecsIntegratedServiceConsumer
.start()
.catch((error) => this._logger.errorMessage(`Error when starting ECS integrated services consumer: ${error.message}\n${error.trace}`));
}
/**
* Start the synchronization process
*
* @return {void}
*/
start() {
this._logger.infoMessage('Starting to consume AliECS messages for integrated services');
this._ecsIntegratedServiceConsumer
.start()
.catch((error) => this._logger.errorMessage(`Error when starting ECS integrated services consumer: ${error.message}\n${error.trace}`));

Check failure on line 77 in Control/lib/control-core/AliEcsSynchronizer.js

View workflow job for this annotation

GitHub Actions / Tests on macos-latest

This line has a length of 141. Maximum allowed is 120

Check failure on line 77 in Control/lib/control-core/AliEcsSynchronizer.js

View workflow job for this annotation

GitHub Actions / Tests & coverage on ubuntu-latest

This line has a length of 141. Maximum allowed is 120
}
}

exports.AliEcsSynchronizer = AliEcsSynchronizer;
2 changes: 1 addition & 1 deletion Control/public/app.css
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,4 @@

.grid-container { display: grid; gap: 1 rem; grid-template-columns: repeat(auto-fill, minmax(150px, 1fr)); }
.bg-yellow { background-color: #e6e200; }
.italic { font-style: italic; }
.italic { font-style: italic; }

0 comments on commit 92b1e68

Please sign in to comment.