diff --git a/extensions/oplogPopulator/OplogPopulator.js b/extensions/oplogPopulator/OplogPopulator.js index 5afe825297..6451c02308 100644 --- a/extensions/oplogPopulator/OplogPopulator.js +++ b/extensions/oplogPopulator/OplogPopulator.js @@ -9,10 +9,14 @@ const Allocator = require('./modules/Allocator'); const ConnectorsManager = require('./modules/ConnectorsManager'); const { ZenkoMetrics } = require('arsenal').metrics; const OplogPopulatorMetrics = require('./OplogPopulatorMetrics'); +const { OplogPopulatorConfigJoiSchema } = require('./OplogPopulatorConfigValidator') +const { mongoJoi } = require('../../lib/config/configItems.joi') const paramsJoi = joi.object({ - config: joi.object().required(), - mongoConfig: joi.object().required(), + config: OplogPopulatorConfigJoiSchema.keys({ + maxRequestSize: joi.number().required(), + }).required(), + mongoConfig: mongoJoi.required(), activeExtensions: joi.array().required(), logger: joi.object().required(), enableMetrics: joi.boolean().default(true), @@ -30,6 +34,7 @@ class OplogPopulator { * @constructor * @param {Object} params - constructor params * @param {Object} params.config - oplog populator config + * @param {Object} params.config.maxRequestSize - kafka producer's max request size * @param {Object} params.mongoConfig - mongo connection config * @param {Object} params.mongoConfig.authCredentials - mongo auth credentials * @param {Object} params.mongoConfig.replicaSetHosts - mongo replication hosts @@ -259,7 +264,7 @@ class OplogPopulator { heartbeatIntervalMs: this._config.heartbeatIntervalMs, kafkaConnectHost: this._config.kafkaConnectHost, kafkaConnectPort: this._config.kafkaConnectPort, - kafkaMaxRequestSize: this._config.kafka.maxRequestSize, + kafkaMaxRequestSize: this._maxRequestSize, metricsHandler: this._metricsHandler, logger: this._logger, }); diff --git a/extensions/oplogPopulator/OplogPopulatorConfigValidator.js b/extensions/oplogPopulator/OplogPopulatorConfigValidator.js index abd562ceca..2f96856188 100644 --- a/extensions/oplogPopulator/OplogPopulatorConfigValidator.js +++ b/extensions/oplogPopulator/OplogPopulatorConfigValidator.js @@ -17,4 +17,7 @@ function configValidator(backbeatConfig, extConfig) { return validatedConfig; } -module.exports = configValidator; +module.exports = { + OplogPopulatorConfigJoiSchema: joiSchema, + OplogPopulatorConfigValidator: configValidator +}; diff --git a/extensions/oplogPopulator/OplogPopulatorTask.js b/extensions/oplogPopulator/OplogPopulatorTask.js index 90c33ff896..6d3e513ca2 100644 --- a/extensions/oplogPopulator/OplogPopulatorTask.js +++ b/extensions/oplogPopulator/OplogPopulatorTask.js @@ -28,7 +28,10 @@ const activeExtensions = [ ].filter(ext => config.extensions[ext]); const oplogPopulator = new OplogPopulator({ - config: oplogPopulatorConfig, + config: { + ...oplogPopulatorConfig, + maxRequestSize: config.kafka.maxRequestSize, + }, mongoConfig, activeExtensions, logger, diff --git a/extensions/oplogPopulator/index.js b/extensions/oplogPopulator/index.js index 79b2e0a15d..a7d95fda81 100644 --- a/extensions/oplogPopulator/index.js +++ b/extensions/oplogPopulator/index.js @@ -1,4 +1,4 @@ -const OplogPopulatorConfigValidator = +const { OplogPopulatorConfigValidator } = require('./OplogPopulatorConfigValidator'); module.exports = { diff --git a/tests/unit/oplogPopulator/oplogPopulator.js b/tests/unit/oplogPopulator/oplogPopulator.js index ff8364fbc5..a1d6422a7f 100644 --- a/tests/unit/oplogPopulator/oplogPopulator.js +++ b/tests/unit/oplogPopulator/oplogPopulator.js @@ -15,7 +15,10 @@ const ChangeStream = const oplogPopulatorConfig = { topic: 'oplog', kafkaConnectHost: '127.0.0.1', - kafkaConnectPort: 8083 + kafkaConnectPort: 8083, + numberOfConnectors: 1, + probeServer: { port: 8552 }, + maxRequestSize: 5000020, }; const mongoConfig = { @@ -90,6 +93,19 @@ describe('OplogPopulator', () => { activeExtensions, logger, })); + assert.throws(() => new OplogPopulator({ + config: { + topic: 'oplog', + kafkaConnectHost: '127.0.0.1', + kafkaConnectPort: 8083, + numberOfConnectors: 1, + probeServer: { port: 8552 }, + }, + mongoConfig, + activeExtensions, + enableMetrics: false, + logger, + })); const op = new OplogPopulator({ config: oplogPopulatorConfig, mongoConfig,