From 188b9e6026cb0d37ac3b951e94501b1654f4176a Mon Sep 17 00:00:00 2001 From: KillianG Date: Thu, 21 Nov 2024 16:11:49 +0100 Subject: [PATCH] Refactor NotificationConfigManager to use MongoDB change streams for cache invalidation and improve configuration retrieval with caching mechanism --- .../notification/NotificationConfigManager.js | 236 +++++++++++++----- extensions/utils/LocationStatusStream.js | 4 +- lib/api/BackbeatAPI.js | 6 +- lib/util/LocationStatusManager.js | 3 +- tests/functional/ingestion/IngestionReader.js | 4 +- 5 files changed, 181 insertions(+), 72 deletions(-) diff --git a/extensions/notification/NotificationConfigManager.js b/extensions/notification/NotificationConfigManager.js index 482b97a68..b26aec3ed 100644 --- a/extensions/notification/NotificationConfigManager.js +++ b/extensions/notification/NotificationConfigManager.js @@ -1,13 +1,80 @@ -const MongoConfigManager = require('./configManager/MongoConfigManager'); -const ZookeeperConfigManager = require('./configManager/ZookeeperConfigManager'); +const joi = require('joi'); +const semver = require('semver'); + +const { ZenkoMetrics } = require('arsenal').metrics; +const LRUCache = require('arsenal').algorithms + .cache.LRUCache; +const MongoClient = require('mongodb').MongoClient; +const ChangeStream = require('../../lib/wrappers/ChangeStream'); +const constants = require('./constants'); +const { constructConnectionString, getMongoVersion } = require('../utils/MongoUtils'); + +const paramsJoi = joi.object({ + mongoConfig: joi.object().required(), + logger: joi.object().required(), +}).required(); + +const MAX_CACHED_ENTRIES = Number(process.env.MAX_CACHED_BUCKET_NOTIFICATION_CONFIGS) + || 1000; + +// should equal true if config manager's cache was hit during a get operation +const CONFIG_MANAGER_CACHE_HIT = 'cache_hit'; +// Type of operation performed on the cache +const CONFIG_MANAGER_OPERATION_TYPE = 'op'; + +const cacheUpdates = ZenkoMetrics.createCounter({ + name: 's3_notification_config_manager_cache_updates_total', + help: 'Total number of cache updates', + labelNames: [ + CONFIG_MANAGER_OPERATION_TYPE, + ], +}); + +const configGetLag = ZenkoMetrics.createHistogram({ + name: 's3_notification_config_manager_config_get_seconds', + help: 'Time it takes in seconds to get a bucket notification config from MongoDB', + labelNames: [ + CONFIG_MANAGER_CACHE_HIT, + ], + buckets: [0.001, 0.01, 1, 10, 100, 1000], +}); + +const cachedBuckets = ZenkoMetrics.createGauge({ + name: 's3_notification_config_manager_cached_buckets_count', + help: 'Total number of cached buckets in the notification config manager', +}); + +function onConfigManagerCacheUpdate(op) { + cacheUpdates.inc({ + [CONFIG_MANAGER_OPERATION_TYPE]: op, + }); + if (op === 'add') { + cachedBuckets.inc({}); + } else if (op === 'delete') { + cachedBuckets.dec({}); + } +} + +function onConfigManagerConfigGet(cacheHit, delay) { + configGetLag.observe({ + [CONFIG_MANAGER_CACHE_HIT]: cacheHit, + }, delay); +} /** * @class NotificationConfigManager * - * @classdesc Manages bucket notification configurations + * @classdesc Manages bucket notification configurations, the configurations + * are directly retrieved from the metastore, and are locally cached. Cache + * is invalidated using MongoDB change streams. */ class NotificationConfigManager { - + /** + * @constructor + * @param {Object} params - constructor params + * @param {Object} params.mongoConfig - mongoDB config + * @param {Logger} params.logger - logger object + */ constructor(params) { joi.attempt(params, paramsJoi); this._logger = params.logger; @@ -16,27 +83,6 @@ class NotificationConfigManager { this._mongoClient = null; this._metastore = null; this._metastoreChangeStream = null; - - const { - mongoConfig, bucketMetastore, maxCachedConfigs, zkClient, zkConfig, zkPath, zkConcurrency, logger, - } = params; - if (mongoConfig) { - this._configManagerBackend = new MongoConfigManager({ - mongoConfig, - bucketMetastore, - maxCachedConfigs, - logger, - }); - } else { - this._usesZookeeperBackend = true; - this._configManagerBackend = new ZookeeperConfigManager({ - zkClient, - zkConfig, - zkPath, - zkConcurrency, - logger, - }); - } } /** @@ -50,11 +96,10 @@ class NotificationConfigManager { MongoClient.connect(mongoUrl, { replicaSet: this._mongoConfig.replicaSet, useNewUrlParser: true, - }).then((client) => { + }).then(client => { this._logger.debug('Connected to MongoDB', { method: 'NotificationConfigManager._setupMongoClient', }); - try { this._mongoClient = client.db(this._mongoConfig.database, { ignoreUndefined: true, @@ -76,7 +121,7 @@ class NotificationConfigManager { } catch (error) { return cb(error); } - }).catch((err) => { + }).catch(err => { this._logger.error('Could not connect to MongoDB', { method: 'NotificationConfigManager._setupMongoClient', error: err.message, @@ -117,55 +162,120 @@ class NotificationConfigManager { }); break; } + this._logger.debug('Change stream event processed', { + method: 'NotificationConfigManager._handleChangeStreamChange', + }); } /** - * Get bucket notification configuration - * - * @param {String} bucket - bucket - * @param {function} [cb] - callback - * @return {Object|undefined} - configuration if available or undefined + * Initializes a change stream on the metastore collection + * Only document delete and update/replace operations are + * taken into consideration to invalidate cache. + * Newly created buckets (insert operations) are not cached + * as queue populator instances read from different kafka + * partitions and so don't need the configs for all buckets + * @returns {undefined} */ - getConfig(bucket, cb) { - const val = this._configManagerBackend.getConfig(bucket); - if (!cb) { - return val; - } - if (val instanceof Promise) { - return val.then(res => cb(null, res)).catch(err => cb(err)); - } - return cb(null, val); + _setMetastoreChangeStream() { + /** + * To avoid processing irrelevant events + * we filter by the operation types and + * only project the fields needed + */ + const changeStreamPipeline = [ + { + $match: { + $or: [ + { operationType: 'delete' }, + { operationType: 'replace' }, + { operationType: 'update' }, + ] + } + }, + { + $project: { + '_id': 1, + 'operationType': 1, + 'documentKey._id': 1, + 'fullDocument._id': 1, + 'fullDocument.value.notificationConfiguration': 1 + }, + }, + ]; + this._metastoreChangeStream = new ChangeStream({ + logger: this._logger, + collection: this._metastore, + pipeline: changeStreamPipeline, + handler: this._handleChangeStreamChangeEvent.bind(this), + throwOnError: false, + useStartAfter: semver.gte(this._mongoVersion, '4.2.0'), + }); + // start watching metastore + this._metastoreChangeStream.start(); } /** - * Add/update bucket notification configuration. - * - * @param {String} bucket - bucket - * @param {Object} config - bucket notification configuration - * @return {boolean} - true if set + * Sets up the NotificationConfigManager by + * connecting to mongo and initializing the + * change stream + * @param {Function} cb callback + * @returns {undefined} */ - setConfig(bucket, config) { - return this._configManagerBackend.setConfig(bucket, config); + setup(cb) { + this._setupMongoClient(err => { + if (err) { + this._logger.error('An error occured while setting up mongo client', { + method: 'NotificationConfigManager.setup', + }); + return cb(err); + } + try { + this._setMetastoreChangeStream(); + } catch (error) { + this._logger.error('An error occured while establishing the change stream', { + method: 'NotificationConfigManager._setMetastoreChangeStream', + }); + return cb(error); + } + return cb(); + }); } /** - * Remove bucket notification configuration + * Get bucket notification configuration * * @param {String} bucket - bucket - * @return {undefined} - */ - removeConfig(bucket) { - return this._configManagerBackend.removeConfig(bucket); - } - - /** - * Setup bucket notification configuration manager - * - * @param {function} [cb] - callback - * @return {undefined} + * @return {Object|undefined} - configuration if available or undefined */ - setup(cb) { - return this._configManagerBackend.setup(cb); + async getConfig(bucket) { + const startTime = Date.now(); + // return cached config for bucket if it exists + const cachedConfig = this._cachedConfigs.get(bucket); + if (cachedConfig) { + const delay = (Date.now() - startTime) / 1000; + onConfigManagerConfigGet(true, delay); + return cachedConfig; + } + try { + // retreiving bucket metadata from the metastore + const bucketMetadata = await this._metastore.findOne({ _id: bucket }); + const bucketNotificationConfiguration = (bucketMetadata && bucketMetadata.value && + bucketMetadata.value.notificationConfiguration) || undefined; + // caching the bucket configuration + this._cachedConfigs.add(bucket, bucketNotificationConfiguration); + const delay = (Date.now() - startTime) / 1000; + onConfigManagerConfigGet(false, delay); + onConfigManagerCacheUpdate('add'); + return bucketNotificationConfiguration; + } catch (err) { + this._logger.error('An error occured when getting notification ' + + 'configuration of bucket', { + method: 'NotificationConfigManager.getConfig', + bucket, + error: err.message, + }); + return undefined; + } } } diff --git a/extensions/utils/LocationStatusStream.js b/extensions/utils/LocationStatusStream.js index 44eaac3a3..6240e09b0 100644 --- a/extensions/utils/LocationStatusStream.js +++ b/extensions/utils/LocationStatusStream.js @@ -67,7 +67,7 @@ class LocationStatusStream { replicaSet: this._mongoConfig.replicaSet, useNewUrlParser: true, useUnifiedTopology: true, - }).then((client) => { + }).then(client => { // connect to metadata DB this._mongoClient = client.db(this._mongoConfig.database, { ignoreUndefined: true, @@ -89,7 +89,7 @@ class LocationStatusStream { return cb(); }); return undefined; - }).catch((err) => { + }).catch(err => { this._log.error('Could not connect to MongoDB', { method: 'ServiceStatusManager._setupMongoClient', error: err.message, diff --git a/lib/api/BackbeatAPI.js b/lib/api/BackbeatAPI.js index 7eac1b006..874f40a4d 100644 --- a/lib/api/BackbeatAPI.js +++ b/lib/api/BackbeatAPI.js @@ -1342,7 +1342,7 @@ class BackbeatAPI { replicaSet: mongoConfig.replicaSet, useNewUrlParser: true, useUnifiedTopology: true, - }).then((client) => { + }).then(client => { // connect to metadata DB this._mongoClient = client.db(mongoConfig.database, { ignoreUndefined: true, @@ -1351,14 +1351,12 @@ class BackbeatAPI { method: 'BackbeatAPI._setupMongoClient', }); return cb(); - }).catch((err) => { - if (err) { + }).catch(err => { this._logger.error('Could not connect to MongoDB', { method: 'BackbeatAPI._setupMongoClient', error: err.message, }); return cb(err); - } }); } } diff --git a/lib/util/LocationStatusManager.js b/lib/util/LocationStatusManager.js index aa35a0893..2e0c995cd 100644 --- a/lib/util/LocationStatusManager.js +++ b/lib/util/LocationStatusManager.js @@ -73,7 +73,7 @@ class LocationStatusManager { return cb(); }).catch(err => { // in case the collection already exists, we ignore the error - if (err && err.codeName !== 'NamespaceExists') { + if (err.codeName !== 'NamespaceExists') { this._logger.error('Could not create mongo collection', { method: 'LocationStatusManager._initCollection', collection: locationStatusCollection, @@ -81,6 +81,7 @@ class LocationStatusManager { }); return cb(err); } + return cb(err); }); } diff --git a/tests/functional/ingestion/IngestionReader.js b/tests/functional/ingestion/IngestionReader.js index 32376ecfc..b93ec3d87 100644 --- a/tests/functional/ingestion/IngestionReader.js +++ b/tests/functional/ingestion/IngestionReader.js @@ -112,7 +112,7 @@ describe('ingestion reader tests with mock', function fD() { const topic = testConfig.extensions.ingestion.topic; async.waterfall([ next => { - MongoClient.connect(mongoUrl, {}).then((client) => { + MongoClient.connect(mongoUrl, {}).then(client => { this.client = client; this.db = this.client.db('metadata', { ignoreUndefined: true, @@ -150,7 +150,7 @@ describe('ingestion reader tests with mock', function fD() { consumer.subscribe([testConfig.extensions.ingestion.topic]); setTimeout(next, 2000); }, - next => this.db.createCollection('PENSIEVE').catch((err) => { + next => this.db.createCollection('PENSIEVE').catch(err => { assert.ifError(err); return next(); }).then(next),