Skip to content

Commit

Permalink
Refactor NotificationConfigManager to use MongoDB change streams for …
Browse files Browse the repository at this point in the history
…cache invalidation and improve configuration retrieval with caching mechanism
  • Loading branch information
KillianG committed Nov 21, 2024
1 parent 0353721 commit 188b9e6
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 72 deletions.
236 changes: 173 additions & 63 deletions extensions/notification/NotificationConfigManager.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,80 @@
const MongoConfigManager = require('./configManager/MongoConfigManager');
const ZookeeperConfigManager = require('./configManager/ZookeeperConfigManager');
const joi = require('joi');
const semver = require('semver');

const { ZenkoMetrics } = require('arsenal').metrics;
const LRUCache = require('arsenal').algorithms
.cache.LRUCache;
const MongoClient = require('mongodb').MongoClient;
const ChangeStream = require('../../lib/wrappers/ChangeStream');
const constants = require('./constants');
const { constructConnectionString, getMongoVersion } = require('../utils/MongoUtils');

// Validation schema for the NotificationConfigManager constructor params:
// both `mongoConfig` and `logger` must be provided, and both must be objects.
const paramsJoi = joi.object({
mongoConfig: joi.object().required(),
logger: joi.object().required(),
}).required();

// Taken from the MAX_CACHED_BUCKET_NOTIFICATION_CONFIGS environment variable;
// falls back to 1000 when the variable is unset, not numeric, or 0 (any
// falsy result of Number()).
// NOTE(review): presumably the capacity of the config cache; its use is not
// visible in this excerpt, confirm where the cache is instantiated.
const MAX_CACHED_ENTRIES = Number(process.env.MAX_CACHED_BUCKET_NOTIFICATION_CONFIGS)
|| 1000;

// should equal true if config manager's cache was hit during a get operation
const CONFIG_MANAGER_CACHE_HIT = 'cache_hit';
// Type of operation performed on the cache
const CONFIG_MANAGER_OPERATION_TYPE = 'op';

// Counter of cache operations, labelled by operation type.
// Incremented by onConfigManagerCacheUpdate().
const cacheUpdates = ZenkoMetrics.createCounter({
name: 's3_notification_config_manager_cache_updates_total',
help: 'Total number of cache updates',
labelNames: [
CONFIG_MANAGER_OPERATION_TYPE,
],
});

// Histogram of the time (in seconds) spent getting a bucket notification
// config, labelled by whether the cache was hit. Fed by
// onConfigManagerConfigGet(), which getConfig() calls for cache hits as well
// as for reads that go to the metastore.
const configGetLag = ZenkoMetrics.createHistogram({
name: 's3_notification_config_manager_config_get_seconds',
help: 'Time it takes in seconds to get a bucket notification config from MongoDB',
labelNames: [
CONFIG_MANAGER_CACHE_HIT,
],
buckets: [0.001, 0.01, 1, 10, 100, 1000],
});

// Gauge tracking the number of cached buckets: onConfigManagerCacheUpdate()
// increments it on 'add' operations and decrements it on 'delete' operations.
const cachedBuckets = ZenkoMetrics.createGauge({
name: 's3_notification_config_manager_cached_buckets_count',
help: 'Total number of cached buckets in the notification config manager',
});

/**
 * Records a cache operation in the metrics: the operation counter is
 * always incremented, and the cached buckets gauge follows 'add' and
 * 'delete' operations.
 * @param {string} op - operation performed on the cache
 * @returns {undefined}
 */
function onConfigManagerCacheUpdate(op) {
    const labels = {};
    labels[CONFIG_MANAGER_OPERATION_TYPE] = op;
    cacheUpdates.inc(labels);
    // only 'add' and 'delete' change the number of cached buckets
    switch (op) {
    case 'add':
        cachedBuckets.inc({});
        break;
    case 'delete':
        cachedBuckets.dec({});
        break;
    default:
        break;
    }
}

/**
 * Records how long a config lookup took in the lookup duration histogram.
 * @param {boolean} cacheHit - whether the lookup was served from the cache
 * @param {number} delay - duration of the lookup, in seconds
 * @returns {undefined}
 */
function onConfigManagerConfigGet(cacheHit, delay) {
    const labels = {};
    labels[CONFIG_MANAGER_CACHE_HIT] = cacheHit;
    configGetLag.observe(labels, delay);
}

/**
* @class NotificationConfigManager
*
* @classdesc Manages bucket notification configurations
* @classdesc Manages bucket notification configurations, the configurations
* are directly retrieved from the metastore, and are locally cached. Cache
* is invalidated using MongoDB change streams.
*/
class NotificationConfigManager {

/**
* @constructor
* @param {Object} params - constructor params
* @param {Object} params.mongoConfig - mongoDB config
* @param {Logger} params.logger - logger object
*/
constructor(params) {
joi.attempt(params, paramsJoi);
this._logger = params.logger;
Expand All @@ -16,27 +83,6 @@ class NotificationConfigManager {
this._mongoClient = null;
this._metastore = null;
this._metastoreChangeStream = null;

const {
mongoConfig, bucketMetastore, maxCachedConfigs, zkClient, zkConfig, zkPath, zkConcurrency, logger,
} = params;
if (mongoConfig) {
this._configManagerBackend = new MongoConfigManager({
mongoConfig,
bucketMetastore,
maxCachedConfigs,
logger,
});
} else {
this._usesZookeeperBackend = true;
this._configManagerBackend = new ZookeeperConfigManager({
zkClient,
zkConfig,
zkPath,
zkConcurrency,
logger,
});
}
}

/**
Expand All @@ -50,11 +96,10 @@ class NotificationConfigManager {
MongoClient.connect(mongoUrl, {
replicaSet: this._mongoConfig.replicaSet,
useNewUrlParser: true,
}).then((client) => {
}).then(client => {
this._logger.debug('Connected to MongoDB', {
method: 'NotificationConfigManager._setupMongoClient',
});

try {
this._mongoClient = client.db(this._mongoConfig.database, {
ignoreUndefined: true,
Expand All @@ -76,7 +121,7 @@ class NotificationConfigManager {
} catch (error) {
return cb(error);
}
}).catch((err) => {
}).catch(err => {
this._logger.error('Could not connect to MongoDB', {
method: 'NotificationConfigManager._setupMongoClient',
error: err.message,
Expand Down Expand Up @@ -117,55 +162,120 @@ class NotificationConfigManager {
});
break;
}
this._logger.debug('Change stream event processed', {
method: 'NotificationConfigManager._handleChangeStreamChange',
});
}

/**
* Get bucket notification configuration
*
* @param {String} bucket - bucket
* @param {function} [cb] - callback
* @return {Object|undefined} - configuration if available or undefined
* Initializes a change stream on the metastore collection
* Only document delete and update/replace operations are
* taken into consideration to invalidate cache.
* Newly created buckets (insert operations) are not cached
* as queue populator instances read from different kafka
* partitions and so don't need the configs for all buckets
* @returns {undefined}
*/
getConfig(bucket, cb) {
const val = this._configManagerBackend.getConfig(bucket);
if (!cb) {
return val;
}
if (val instanceof Promise) {
return val.then(res => cb(null, res)).catch(err => cb(err));
}
return cb(null, val);
_setMetastoreChangeStream() {
/**
* To avoid processing irrelevant events
* we filter by the operation types and
* only project the fields needed
*/
const changeStreamPipeline = [
{
$match: {
$or: [
{ operationType: 'delete' },
{ operationType: 'replace' },
{ operationType: 'update' },
]
}
},
{
$project: {
'_id': 1,
'operationType': 1,
'documentKey._id': 1,
'fullDocument._id': 1,
'fullDocument.value.notificationConfiguration': 1
},
},
];
this._metastoreChangeStream = new ChangeStream({
logger: this._logger,
collection: this._metastore,
pipeline: changeStreamPipeline,
handler: this._handleChangeStreamChangeEvent.bind(this),
throwOnError: false,
useStartAfter: semver.gte(this._mongoVersion, '4.2.0'),
});
// start watching metastore
this._metastoreChangeStream.start();
}

/**
* Add/update bucket notification configuration.
*
* @param {String} bucket - bucket
* @param {Object} config - bucket notification configuration
* @return {boolean} - true if set
* Sets up the NotificationConfigManager by
* connecting to mongo and initializing the
* change stream
* @param {Function} cb callback
* @returns {undefined}
*/
setConfig(bucket, config) {
return this._configManagerBackend.setConfig(bucket, config);
setup(cb) {
this._setupMongoClient(err => {
if (err) {
this._logger.error('An error occured while setting up mongo client', {
method: 'NotificationConfigManager.setup',
});
return cb(err);
}
try {
this._setMetastoreChangeStream();
} catch (error) {
this._logger.error('An error occured while establishing the change stream', {
method: 'NotificationConfigManager._setMetastoreChangeStream',
});
return cb(error);
}
return cb();
});
}

/**
* Remove bucket notification configuration
* Get bucket notification configuration
*
* @param {String} bucket - bucket
* @return {undefined}
*/
removeConfig(bucket) {
return this._configManagerBackend.removeConfig(bucket);
}

/**
* Setup bucket notification configuration manager
*
* @param {function} [cb] - callback
* @return {undefined}
* @return {Object|undefined} - configuration if available or undefined
*/
setup(cb) {
return this._configManagerBackend.setup(cb);
async getConfig(bucket) {
const startTime = Date.now();
// return cached config for bucket if it exists
const cachedConfig = this._cachedConfigs.get(bucket);
if (cachedConfig) {
const delay = (Date.now() - startTime) / 1000;
onConfigManagerConfigGet(true, delay);
return cachedConfig;
}
try {
// retreiving bucket metadata from the metastore
const bucketMetadata = await this._metastore.findOne({ _id: bucket });
const bucketNotificationConfiguration = (bucketMetadata && bucketMetadata.value &&
bucketMetadata.value.notificationConfiguration) || undefined;
// caching the bucket configuration
this._cachedConfigs.add(bucket, bucketNotificationConfiguration);
const delay = (Date.now() - startTime) / 1000;
onConfigManagerConfigGet(false, delay);
onConfigManagerCacheUpdate('add');
return bucketNotificationConfiguration;
} catch (err) {
this._logger.error('An error occured when getting notification ' +
'configuration of bucket', {
method: 'NotificationConfigManager.getConfig',
bucket,
error: err.message,
});
return undefined;
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions extensions/utils/LocationStatusStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class LocationStatusStream {
replicaSet: this._mongoConfig.replicaSet,
useNewUrlParser: true,
useUnifiedTopology: true,
}).then((client) => {
}).then(client => {
// connect to metadata DB
this._mongoClient = client.db(this._mongoConfig.database, {
ignoreUndefined: true,
Expand All @@ -89,7 +89,7 @@ class LocationStatusStream {
return cb();
});
return undefined;
}).catch((err) => {
}).catch(err => {
this._log.error('Could not connect to MongoDB', {
method: 'ServiceStatusManager._setupMongoClient',
error: err.message,
Expand Down
6 changes: 2 additions & 4 deletions lib/api/BackbeatAPI.js
Original file line number Diff line number Diff line change
Expand Up @@ -1342,7 +1342,7 @@ class BackbeatAPI {
replicaSet: mongoConfig.replicaSet,
useNewUrlParser: true,
useUnifiedTopology: true,
}).then((client) => {
}).then(client => {
// connect to metadata DB
this._mongoClient = client.db(mongoConfig.database, {
ignoreUndefined: true,
Expand All @@ -1351,14 +1351,12 @@ class BackbeatAPI {
method: 'BackbeatAPI._setupMongoClient',
});
return cb();
}).catch((err) => {
if (err) {
}).catch(err => {
this._logger.error('Could not connect to MongoDB', {
method: 'BackbeatAPI._setupMongoClient',
error: err.message,
});
return cb(err);
}
});
}
}
Expand Down
3 changes: 2 additions & 1 deletion lib/util/LocationStatusManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,15 @@ class LocationStatusManager {
return cb();
}).catch(err => {
// in case the collection already exists, we ignore the error
if (err && err.codeName !== 'NamespaceExists') {
if (err.codeName !== 'NamespaceExists') {
this._logger.error('Could not create mongo collection', {
method: 'LocationStatusManager._initCollection',
collection: locationStatusCollection,
error: err.message,
});
return cb(err);
}
return cb(err);
});
}

Expand Down
4 changes: 2 additions & 2 deletions tests/functional/ingestion/IngestionReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ describe('ingestion reader tests with mock', function fD() {
const topic = testConfig.extensions.ingestion.topic;
async.waterfall([
next => {
MongoClient.connect(mongoUrl, {}).then((client) => {
MongoClient.connect(mongoUrl, {}).then(client => {
this.client = client;
this.db = this.client.db('metadata', {
ignoreUndefined: true,
Expand Down Expand Up @@ -150,7 +150,7 @@ describe('ingestion reader tests with mock', function fD() {
consumer.subscribe([testConfig.extensions.ingestion.topic]);
setTimeout(next, 2000);
},
next => this.db.createCollection('PENSIEVE').catch((err) => {
next => this.db.createCollection('PENSIEVE').catch(err => {
assert.ifError(err);
return next();
}).then(next),
Expand Down

0 comments on commit 188b9e6

Please sign in to comment.