Skip to content

Commit

Permalink
Remove kafkaMaxRequestSize from oplog populator
Browse files Browse the repository at this point in the history
Will be set globally in kafka-connect (via ZKOP), and allows custom
values to be (manually) set on each connector if ever needed: as any
"extra" config will be preserved by oplog populator.

Issue: BB-436
  • Loading branch information
francoisferrand committed Sep 11, 2023
1 parent 6482189 commit f0a23f5
Show file tree
Hide file tree
Showing 4 changed files with 1 addition and 24 deletions.
6 changes: 1 addition & 5 deletions extensions/oplogPopulator/OplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ const { OplogPopulatorConfigJoiSchema } = require('./OplogPopulatorConfigValidat
const { mongoJoi } = require('../../lib/config/configItems.joi');

const paramsJoi = joi.object({
config: OplogPopulatorConfigJoiSchema.keys({
maxRequestSize: joi.number().required(),
}).required(),
config: OplogPopulatorConfigJoiSchema.required(),
mongoConfig: mongoJoi.required(),
activeExtensions: joi.array().required(),
logger: joi.object().required(),
Expand All @@ -34,7 +32,6 @@ class OplogPopulator {
* @constructor
* @param {Object} params - constructor params
* @param {Object} params.config - oplog populator config
* @param {Object} params.config.maxRequestSize - kafka producer's max request size
* @param {Object} params.mongoConfig - mongo connection config
* @param {Object} params.mongoConfig.authCredentials - mongo auth credentials
* @param {Object} params.mongoConfig.replicaSetHosts - mongo replication hosts
Expand Down Expand Up @@ -264,7 +261,6 @@ class OplogPopulator {
heartbeatIntervalMs: this._config.heartbeatIntervalMs,
kafkaConnectHost: this._config.kafkaConnectHost,
kafkaConnectPort: this._config.kafkaConnectPort,
kafkaMaxRequestSize: this._maxRequestSize,
metricsHandler: this._metricsHandler,
logger: this._logger,
});
Expand Down
3 changes: 0 additions & 3 deletions extensions/oplogPopulator/modules/ConnectorsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ const paramsJoi = joi.object({
heartbeatIntervalMs: joi.number().required(),
kafkaConnectHost: joi.string().required(),
kafkaConnectPort: joi.number().required(),
kafkaMaxRequestSize: joi.number().required(),
metricsHandler: joi.object()
.instance(OplogPopulatorMetrics).required(),
logger: joi.object().required(),
Expand Down Expand Up @@ -62,7 +61,6 @@ class ConnectorsManager {
kafkaConnectPort: this._kafkaConnectPort,
logger: this._logger,
});
this._kafkaMaxRequestSize = params.kafkaMaxRequestSize;
this._metricsHandler = params.metricsHandler;
this._database = params.database;
this._mongoUrl = params.mongoUrl;
Expand All @@ -89,7 +87,6 @@ class ConnectorsManager {
// hearbeat prevents having an outdated resume token in the connectors
// by constantly updating the offset to the last object in the oplog
'heartbeat.interval.ms': this._heartbeatIntervalMs,
'producer.override.max.request.size': this._kafkaMaxRequestSize,
};
return {
...constants.defaultConnectorConfig,
Expand Down
2 changes: 0 additions & 2 deletions tests/unit/oplogPopulator/ConnectorsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ const connectorConfig = {
}],
}),
'heartbeat.interval.ms': 10000,
'producer.override.max.request.size': 5000020,
};

describe('ConnectorsManager', () => {
Expand Down Expand Up @@ -92,7 +91,6 @@ describe('ConnectorsManager', () => {
heartbeatIntervalMs: 10000,
kafkaConnectHost: '127.0.0.1',
kafkaConnectPort: 8083,
kafkaMaxRequestSize: 5000020,
metricsHandler: new OplogPopulatorMetrics(logger),
logger,
});
Expand Down
14 changes: 0 additions & 14 deletions tests/unit/oplogPopulator/oplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ const oplogPopulatorConfig = {
kafkaConnectPort: 8083,
numberOfConnectors: 1,
probeServer: { port: 8552 },
maxRequestSize: 5000020,
};

const mongoConfig = {
Expand Down Expand Up @@ -93,19 +92,6 @@ describe('OplogPopulator', () => {
activeExtensions,
logger,
}));
assert.throws(() => new OplogPopulator({
config: {
topic: 'oplog',
kafkaConnectHost: '127.0.0.1',
kafkaConnectPort: 8083,
numberOfConnectors: 1,
probeServer: { port: 8552 },
},
mongoConfig,
activeExtensions,
enableMetrics: false,
logger,
}));
const op = new OplogPopulator({
config: oplogPopulatorConfig,
mongoConfig,
Expand Down

0 comments on commit f0a23f5

Please sign in to comment.