From 9baf147dc7e5637be488ce99156378f903b41266 Mon Sep 17 00:00:00 2001 From: Francois Ferrand Date: Thu, 7 Sep 2023 21:05:55 +0200 Subject: [PATCH 1/6] Allow configuring kafka producer's max request size Issue: BB-436 --- conf/config.json | 3 ++- extensions/gc/GarbageCollectorProducer.js | 1 + extensions/lifecycle/LifecycleQueuePopulator.js | 1 + .../bucketProcessor/LifecycleBucketProcessor.js | 1 + .../lifecycle/conductor/LifecycleConductor.js | 1 + extensions/oplogPopulator/OplogPopulator.js | 1 + .../oplogPopulator/modules/ConnectorsManager.js | 3 +++ .../replication/failedCRR/FailedCRRProducer.js | 1 + .../queueProcessor/QueueProcessor.js | 1 + extensions/replication/replay/ReplayProducer.js | 1 + .../ReplicationStatusProcessor.js | 1 + lib/BackbeatProducer.js | 10 +++++----- lib/Config.js | 2 +- lib/MetricsProducer.js | 1 + lib/api/BackbeatAPI.js | 1 + lib/config.joi.js | 7 ++++++- lib/queuePopulator/IngestionPopulator.js | 1 + lib/queuePopulator/LogReader.js | 1 + tests/unit/backbeatProducer.js | 17 +++++++++++++++++ tests/unit/oplogPopulator/ConnectorsManager.js | 2 ++ 20 files changed, 49 insertions(+), 8 deletions(-) diff --git a/conf/config.json b/conf/config.json index 7bb70ad02..7f84cf8cb 100644 --- a/conf/config.json +++ b/conf/config.json @@ -8,7 +8,8 @@ "backlogMetrics": { "zkPath": "/backbeat/run/kafka-backlog-metrics", "intervalS": 60 - } + }, + "maxRequestSize": 5000020 }, "s3": { "host": "127.0.0.1", diff --git a/extensions/gc/GarbageCollectorProducer.js b/extensions/gc/GarbageCollectorProducer.js index 7f43ba4b0..3a3e5b7d2 100644 --- a/extensions/gc/GarbageCollectorProducer.js +++ b/extensions/gc/GarbageCollectorProducer.js @@ -25,6 +25,7 @@ class GarbageCollectorProducer { setupProducer(cb) { const producer = new BackbeatProducer({ kafka: { hosts: this._kafkaConfig.hosts }, + maxRequestSize: this._kafkaConfig.maxRequestSize, topic: this._topic, }); producer.once('error', () => {}); diff --git a/extensions/lifecycle/LifecycleQueuePopulator.js b/extensions/lifecycle/LifecycleQueuePopulator.js index 09ede7d0d..452edcc58 100644 --- a/extensions/lifecycle/LifecycleQueuePopulator.js +++ b/extensions/lifecycle/LifecycleQueuePopulator.js @@ -54,6 +54,7 @@ class LifecycleQueuePopulator extends QueuePopulatorExtension { } const producer = new BackbeatProducer({ kafka: { hosts: this.kafkaConfig.hosts }, + maxRequestSize: this.kafkaConfig.maxRequestSize, topic, }); producer.once('error', done); diff --git a/extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js b/extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js index 3766ff42b..542c5a973 100644 --- a/extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js +++ b/extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js @@ -363,6 +363,7 @@ class LifecycleBucketProcessor { _setupProducer(cb) { const producer = new BackbeatProducer({ kafka: { hosts: this._kafkaConfig.hosts }, + maxRequestSize: this._kafkaConfig.maxRequestSize, topic: this._lcConfig.objectTasksTopic, }); producer.once('error', err => { diff --git a/extensions/lifecycle/conductor/LifecycleConductor.js b/extensions/lifecycle/conductor/LifecycleConductor.js index 3f263a2d3..7447bdd9a 100644 --- a/extensions/lifecycle/conductor/LifecycleConductor.js +++ b/extensions/lifecycle/conductor/LifecycleConductor.js @@ -699,6 +699,7 @@ class LifecycleConductor { _setupProducer(cb) { const producer = new BackbeatProducer({ kafka: { hosts: this.kafkaConfig.hosts }, + maxRequestSize: this.kafkaConfig.maxRequestSize, topic: this.lcConfig.bucketTasksTopic, }); producer.once('error', cb); diff --git a/extensions/oplogPopulator/OplogPopulator.js b/extensions/oplogPopulator/OplogPopulator.js index a67211eb5..5afe82529 100644 --- a/extensions/oplogPopulator/OplogPopulator.js +++ b/extensions/oplogPopulator/OplogPopulator.js @@ -259,6 +259,7 @@ class OplogPopulator { heartbeatIntervalMs: this._config.heartbeatIntervalMs, kafkaConnectHost: this._config.kafkaConnectHost, kafkaConnectPort: this._config.kafkaConnectPort, + kafkaMaxRequestSize: this._config.kafka.maxRequestSize, metricsHandler: this._metricsHandler, logger: this._logger, }); diff --git a/extensions/oplogPopulator/modules/ConnectorsManager.js b/extensions/oplogPopulator/modules/ConnectorsManager.js index 4262085ea..d3a1810ab 100644 --- a/extensions/oplogPopulator/modules/ConnectorsManager.js +++ b/extensions/oplogPopulator/modules/ConnectorsManager.js @@ -20,6 +20,7 @@ const paramsJoi = joi.object({ heartbeatIntervalMs: joi.number().required(), kafkaConnectHost: joi.string().required(), kafkaConnectPort: joi.number().required(), + kafkaMaxRequestSize: joi.number().required(), metricsHandler: joi.object() .instance(OplogPopulatorMetrics).required(), logger: joi.object().required(), @@ -61,6 +62,7 @@ class ConnectorsManager { kafkaConnectPort: this._kafkaConnectPort, logger: this._logger, }); + this._kafkaMaxRequestSize = params.kafkaMaxRequestSize; this._metricsHandler = params.metricsHandler; this._database = params.database; this._mongoUrl = params.mongoUrl; @@ -87,6 +89,7 @@ class ConnectorsManager { // hearbeat prevents having an outdated resume token in the connectors // by constantly updating the offset to the last object in the oplog 'heartbeat.interval.ms': this._heartbeatIntervalMs, + 'producer.override.max.request.size': this._kafkaMaxRequestSize, }; return { ...constants.defaultConnectorConfig, diff --git a/extensions/replication/failedCRR/FailedCRRProducer.js b/extensions/replication/failedCRR/FailedCRRProducer.js index 71b322f9c..333915f12 100644 --- a/extensions/replication/failedCRR/FailedCRRProducer.js +++ b/extensions/replication/failedCRR/FailedCRRProducer.js @@ -27,6 +27,7 @@ class FailedCRRProducer { setupProducer(cb) { this._producer = new BackbeatProducer({ kafka: { hosts: this._kafkaConfig.hosts }, + maxRequestSize: this._kafkaConfig.maxRequestSize, topic: this._topic, }); this._producer.once('error', () => {}); diff --git a/extensions/replication/queueProcessor/QueueProcessor.js b/extensions/replication/queueProcessor/QueueProcessor.js index 4ea82af35..04dcc2b7c 100644 --- a/extensions/replication/queueProcessor/QueueProcessor.js +++ b/extensions/replication/queueProcessor/QueueProcessor.js @@ -371,6 +371,7 @@ class QueueProcessor extends EventEmitter { _setupProducer(done) { const producer = new BackbeatProducer({ kafka: { hosts: this.kafkaConfig.hosts }, + maxRequestSize: this.kafkaConfig.maxRequestSize, topic: this.repConfig.replicationStatusTopic, }); producer.once('error', done); diff --git a/extensions/replication/replay/ReplayProducer.js b/extensions/replication/replay/ReplayProducer.js index a9248b256..8e0a6fc09 100644 --- a/extensions/replication/replay/ReplayProducer.js +++ b/extensions/replication/replay/ReplayProducer.js @@ -27,6 +27,7 @@ class ReplayProducer { setupProducer(cb) { this._producer = new BackbeatProducer({ kafka: { hosts: this._kafkaConfig.hosts }, + maxRequestSize: this._kafkaConfig.maxRequestSize, topic: this._topic, }); this._producer.once('error', () => {}); diff --git a/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js b/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js index bb6324164..2be56b650 100644 --- a/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js +++ b/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js @@ -386,6 +386,7 @@ class ReplicationStatusProcessor { this.bucketNotificationConfig.topic; const producer = new BackbeatProducer({ kafka: { hosts: this.kafkaConfig.hosts }, + maxRequestSize: this.kafkaConfig.maxRequestSize, topic: internalTopic, }); producer.once('error', done); diff --git a/lib/BackbeatProducer.js b/lib/BackbeatProducer.js index c8c79bc52..d77e5dfd7 100644 --- a/lib/BackbeatProducer.js +++ b/lib/BackbeatProducer.js @@ -10,7 +10,7 @@ const KafkaBacklogMetrics = require('./KafkaBacklogMetrics'); const Constants = require('./constants'); const DEFAULT_POLL_INTERVAL = 2000; -const PRODUCER_MESSAGE_MAX_BYTES = 5000020; +const { KAFKA_PRODUCER_MESSAGE_MAX_BYTES } = require('./config.joi'); const FLUSH_TIMEOUT = 5000; @@ -45,7 +45,7 @@ class BackbeatProducer extends EventEmitter { }).required(), topic: joi.string(), pollIntervalMs: joi.number().default(DEFAULT_POLL_INTERVAL), - messageMaxBytes: joi.number().default(PRODUCER_MESSAGE_MAX_BYTES), + maxRequestSize: joi.number().default(KAFKA_PRODUCER_MESSAGE_MAX_BYTES), } ); } @@ -65,7 +65,7 @@ class BackbeatProducer extends EventEmitter { get producerConfig() { return { 'metadata.broker.list': this._kafkaHosts, - 'message.max.bytes': this._messageMaxBytes, + 'message.max.bytes': this._maxRequestSize, 'dr_cb': true, 'compression.type': Constants.compressionType, }; @@ -103,12 +103,12 @@ class BackbeatProducer extends EventEmitter { kafka, topic, pollIntervalMs, - messageMaxBytes, + maxRequestSize, } = joiResult; this._kafkaHosts = kafka.hosts; this._topic = topic && withTopicPrefix(topic); this._pollIntervalMs = pollIntervalMs; - this._messageMaxBytes = messageMaxBytes; + this._maxRequestSize = maxRequestSize; } connect() { diff --git a/lib/Config.js b/lib/Config.js index 24ec0d40f..5bcbea1a7 100644 --- a/lib/Config.js +++ b/lib/Config.js @@ -8,7 +8,7 @@ const path = require('path'); const crypto = require('crypto'); const extensions = require('../extensions'); -const backbeatConfigJoi = require('./config.joi'); +const { backbeatConfigJoi } = require('./config.joi'); const locationTypeMatch = { 'location-mem-v1': 'mem', diff --git a/lib/MetricsProducer.js b/lib/MetricsProducer.js index 40baa829f..0493234b1 100644 --- a/lib/MetricsProducer.js +++ b/lib/MetricsProducer.js @@ -23,6 +23,7 @@ class MetricsProducer { setupProducer(done) { const producer = new BackbeatProducer({ kafka: { hosts: this._kafkaConfig.hosts }, + maxRequestSize: this._kafkaConfig.maxRequestSize, topic: this._topic, }); producer.once('error', done); diff --git a/lib/api/BackbeatAPI.js b/lib/api/BackbeatAPI.js index 29ef4b4ca..e23d1ba69 100644 --- a/lib/api/BackbeatAPI.js +++ b/lib/api/BackbeatAPI.js @@ -1300,6 +1300,7 @@ class BackbeatAPI { _setProducer(topic, cb) { const producer = new BackbeatProducer({ kafka: { hosts: this._kafkaConfig.hosts }, + maxRequestSize: this._kafkaConfig.maxRequestSize, topic, }); diff --git a/lib/config.joi.js b/lib/config.joi.js index 8a5e8260f..b20bc83e7 100644 --- a/lib/config.joi.js +++ b/lib/config.joi.js @@ -11,6 +11,7 @@ const { authJoi, } = require('./config/configItems.joi'); +const KAFKA_PRODUCER_MESSAGE_MAX_BYTES = 5000020; const logSourcesJoi = joi.string().valid('bucketd', 'mongo', 'ingestion', 'dmd', 'kafka'); @@ -26,6 +27,7 @@ const joiSchema = joi.object({ zkPath: joi.string().default('/backbeat/run/kafka-backlog-metrics'), intervalS: joi.number().default(60), }, + maxRequestSize: joi.number().default(KAFKA_PRODUCER_MESSAGE_MAX_BYTES), }, transport: transportJoi, s3: hostPortJoi.required(), @@ -92,4 +94,7 @@ const joiSchema = joi.object({ internalCertFilePaths: certFilePathsJoi, }); -module.exports = joiSchema; +module.exports = { + backbeatConfigJoi: joiSchema, + KAFKA_PRODUCER_MESSAGE_MAX_BYTES, +}; diff --git a/lib/queuePopulator/IngestionPopulator.js b/lib/queuePopulator/IngestionPopulator.js index b6b9d4c01..c077ce0ef 100644 --- a/lib/queuePopulator/IngestionPopulator.js +++ b/lib/queuePopulator/IngestionPopulator.js @@ -170,6 +170,7 @@ class IngestionPopulator { const topic = this.ingestionConfig.topic; const producer = new BackbeatProducer({ kafka: { hosts: this.kafkaConfig.hosts }, + maxRequestSize: this.kafkaConfig.maxRequestSize, topic, pollIntervalMs: POLL_INTERVAL_MS, }); diff --git a/lib/queuePopulator/LogReader.js b/lib/queuePopulator/LogReader.js index e57c1f545..cc0b24d37 100644 --- a/lib/queuePopulator/LogReader.js +++ b/lib/queuePopulator/LogReader.js @@ -562,6 +562,7 @@ class LogReader { } const producer = new BackbeatProducer({ kafka: { hosts: this.kafkaConfig.hosts }, + maxRequestSize: this.kafkaConfig.maxRequestSize, topic, }); producer.once('error', done); diff --git a/tests/unit/backbeatProducer.js b/tests/unit/backbeatProducer.js index 47d85d55d..2b6582142 100644 --- a/tests/unit/backbeatProducer.js +++ b/tests/unit/backbeatProducer.js @@ -22,6 +22,23 @@ describe('backbeatProducer', () => { assert.strictEqual(backbeatProducer._topic, 'testing.my-test-topic'); }); + it('should use default value if maxRequestSize not provided', () => { + const backbeatProducer = new BackbeatProducer({ + kafka, + topic: 'my-test-topic', + }); + assert.strictEqual(backbeatProducer._maxRequestSize, 5000020); + }); + + it('should use the explicitely provided maxRequestSize', () => { + const backbeatProducer = new BackbeatProducer({ + kafka, + topic: 'my-test-topic', + maxRequestSize: 1234567, + }); + assert.strictEqual(backbeatProducer._maxRequestSize, 1234567); + }); + it('should return an error if send() called on producer with no default ' + 'topic', done => { const backbeatProducer = new BackbeatProducer({ diff --git a/tests/unit/oplogPopulator/ConnectorsManager.js b/tests/unit/oplogPopulator/ConnectorsManager.js index 986fedeb7..e585b2558 100644 --- a/tests/unit/oplogPopulator/ConnectorsManager.js +++ b/tests/unit/oplogPopulator/ConnectorsManager.js @@ -56,6 +56,7 @@ const connectorConfig = { }], }), 'heartbeat.interval.ms': 10000, + 'producer.override.max.request.size': 5000020, }; describe('ConnectorsManager', () => { @@ -91,6 +92,7 @@ describe('ConnectorsManager', () => { heartbeatIntervalMs: 10000, kafkaConnectHost: '127.0.0.1', kafkaConnectPort: 8083, + kafkaMaxRequestSize: 5000020, metricsHandler: new OplogPopulatorMetrics(logger), logger, }); From 4ed2435109d721cf7038a2d8d94cdc22b3beace8 Mon Sep 17 00:00:00 2001 From: Francois Ferrand Date: Fri, 8 Sep 2023 10:48:08 +0200 Subject: [PATCH 2/6] GHA: Bump build-push-action v4 Issue: BB-436 --- .github/workflows/tests.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index f1bcc6658..8631ef2d4 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -53,7 +53,7 @@ jobs: cache-to: type=gha,mode=max - name: Build and push MongoDB - uses: docker/build-push-action@v2 + uses: docker/build-push-action@v4 with: push: true context: .github/dockerfiles/mongodb From 4e4e4894d04385ee60e5cd068665fd3674ff2754 Mon Sep 17 00:00:00 2001 From: Francois Ferrand Date: Fri, 8 Sep 2023 10:58:02 +0200 Subject: [PATCH 3/6] GHA: Bump checkout v4 Issue: BB-436 --- .github/workflows/alerts.yaml | 2 +- .github/workflows/docker-build.yaml | 2 +- .github/workflows/release.yaml | 2 +- .github/workflows/tests.yaml | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/alerts.yaml b/.github/workflows/alerts.yaml index 41a401653..7a75e4894 100644 --- a/.github/workflows/alerts.yaml +++ b/.github/workflows/alerts.yaml @@ -12,7 +12,7 @@ jobs: steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Render and test lifecycle uses: scality/action-prom-render-test@1.0.2 diff --git a/.github/workflows/docker-build.yaml b/.github/workflows/docker-build.yaml index aaa4caaf9..5e5bbf9a3 100644 --- a/.github/workflows/docker-build.yaml +++ b/.github/workflows/docker-build.yaml @@ -16,7 +16,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Set up Docker Buildk uses: docker/setup-buildx-action@v2 diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 5d4602eab..25d177877 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -18,7 +18,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 # TODO: remove the following step once Oras CLI 0.13.0 bug https://github.com/oras-project/oras/issues/447 is fixed. - name: Downgrade Oras to 0.12.0 diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 8631ef2d4..fba8f26dd 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -19,7 +19,7 @@ jobs: packages: write steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Set up Docker Buildk uses: docker/setup-buildx-action@v2 @@ -92,7 +92,7 @@ jobs: - 27019:27019 steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Install build dependencies run: | sudo apt-get update From e34e825e56167a8f1895e2c7559d439bf9f89113 Mon Sep 17 00:00:00 2001 From: Francois Ferrand Date: Fri, 8 Sep 2023 12:08:40 +0200 Subject: [PATCH 4/6] Properly validate oplogPopulator config Issue: BB-436 --- extensions/oplogPopulator/OplogPopulator.js | 11 ++++++++--- .../OplogPopulatorConfigValidator.js | 5 ++++- .../oplogPopulator/OplogPopulatorTask.js | 5 ++++- extensions/oplogPopulator/index.js | 2 +- tests/unit/oplogPopulator/oplogPopulator.js | 18 +++++++++++++++++- 5 files changed, 34 insertions(+), 7 deletions(-) diff --git a/extensions/oplogPopulator/OplogPopulator.js b/extensions/oplogPopulator/OplogPopulator.js index 5afe82529..8e8f6c63b 100644 --- a/extensions/oplogPopulator/OplogPopulator.js +++ b/extensions/oplogPopulator/OplogPopulator.js @@ -9,10 +9,14 @@ const Allocator = require('./modules/Allocator'); const ConnectorsManager = require('./modules/ConnectorsManager'); const { ZenkoMetrics } = require('arsenal').metrics; const OplogPopulatorMetrics = require('./OplogPopulatorMetrics'); +const { OplogPopulatorConfigJoiSchema } = require('./OplogPopulatorConfigValidator'); +const { mongoJoi } = require('../../lib/config/configItems.joi'); const paramsJoi = joi.object({ - config: joi.object().required(), - mongoConfig: joi.object().required(), + config: OplogPopulatorConfigJoiSchema.keys({ + maxRequestSize: joi.number().required(), + }).required(), + mongoConfig: mongoJoi.required(), activeExtensions: joi.array().required(), logger: joi.object().required(), enableMetrics: joi.boolean().default(true), @@ -30,6 +34,7 @@ class OplogPopulator { * @constructor * @param {Object} params - constructor params * @param {Object} params.config - oplog populator config + * @param {Object} params.config.maxRequestSize - kafka producer's max request size * @param {Object} params.mongoConfig - mongo connection config * @param {Object} params.mongoConfig.authCredentials - mongo auth credentials * @param {Object} params.mongoConfig.replicaSetHosts - mongo replication hosts @@ -259,7 +264,7 @@ class OplogPopulator { heartbeatIntervalMs: this._config.heartbeatIntervalMs, kafkaConnectHost: this._config.kafkaConnectHost, kafkaConnectPort: this._config.kafkaConnectPort, - kafkaMaxRequestSize: this._config.kafka.maxRequestSize, + kafkaMaxRequestSize: this._maxRequestSize, metricsHandler: this._metricsHandler, logger: this._logger, }); diff --git a/extensions/oplogPopulator/OplogPopulatorConfigValidator.js b/extensions/oplogPopulator/OplogPopulatorConfigValidator.js index abd562cec..2f9685618 100644 --- a/extensions/oplogPopulator/OplogPopulatorConfigValidator.js +++ b/extensions/oplogPopulator/OplogPopulatorConfigValidator.js @@ -17,4 +17,7 @@ function configValidator(backbeatConfig, extConfig) { return validatedConfig; } -module.exports = configValidator; +module.exports = { + OplogPopulatorConfigJoiSchema: joiSchema, + OplogPopulatorConfigValidator: configValidator +}; diff --git a/extensions/oplogPopulator/OplogPopulatorTask.js b/extensions/oplogPopulator/OplogPopulatorTask.js index 90c33ff89..6d3e513ca 100644 --- a/extensions/oplogPopulator/OplogPopulatorTask.js +++ b/extensions/oplogPopulator/OplogPopulatorTask.js @@ -28,7 +28,10 @@ const activeExtensions = [ ].filter(ext => config.extensions[ext]); const oplogPopulator = new OplogPopulator({ - config: oplogPopulatorConfig, + config: { + ...oplogPopulatorConfig, + maxRequestSize: config.kafka.maxRequestSize, + }, mongoConfig, activeExtensions, logger, diff --git a/extensions/oplogPopulator/index.js b/extensions/oplogPopulator/index.js index 79b2e0a15..a7d95fda8 100644 --- a/extensions/oplogPopulator/index.js +++ b/extensions/oplogPopulator/index.js @@ -1,4 +1,4 @@ -const OplogPopulatorConfigValidator = +const { OplogPopulatorConfigValidator } = require('./OplogPopulatorConfigValidator'); module.exports = { diff --git a/tests/unit/oplogPopulator/oplogPopulator.js b/tests/unit/oplogPopulator/oplogPopulator.js index ff8364fbc..a1d6422a7 100644 --- a/tests/unit/oplogPopulator/oplogPopulator.js +++ b/tests/unit/oplogPopulator/oplogPopulator.js @@ -15,7 +15,10 @@ const ChangeStream = const oplogPopulatorConfig = { topic: 'oplog', kafkaConnectHost: '127.0.0.1', - kafkaConnectPort: 8083 + kafkaConnectPort: 8083, + numberOfConnectors: 1, + probeServer: { port: 8552 }, + maxRequestSize: 5000020, }; const mongoConfig = { @@ -90,6 +93,19 @@ describe('OplogPopulator', () => { activeExtensions, logger, })); + assert.throws(() => new OplogPopulator({ + config: { + topic: 'oplog', + kafkaConnectHost: '127.0.0.1', + kafkaConnectPort: 8083, + numberOfConnectors: 1, + probeServer: { port: 8552 }, + }, + mongoConfig, + activeExtensions, + enableMetrics: false, + logger, + })); const op = new OplogPopulator({ config: oplogPopulatorConfig, mongoConfig, From 6482189d05e397ee6a913765aef46efe1581f0c5 Mon Sep 17 00:00:00 2001 From: Francois Ferrand Date: Fri, 8 Sep 2023 12:10:10 +0200 Subject: [PATCH 5/6] Remove conf/Config.js Seems to be a remnant from Backbeat 7.x, config has been moved to `lib/` in 8.x and should thus be removed from there. Issue: BB-436 --- conf/Config.js | 107 --------------------------------------------- conf/config.joi.js | 61 -------------------------- 2 files changed, 168 deletions(-) delete mode 100644 conf/Config.js delete mode 100644 conf/config.joi.js diff --git a/conf/Config.js b/conf/Config.js deleted file mode 100644 index 2eed82130..000000000 --- a/conf/Config.js +++ /dev/null @@ -1,107 +0,0 @@ -'use strict'; // eslint-disable-line - -const fs = require('fs'); -const path = require('path'); -const joi = require('joi'); - -const extensions = require('../extensions'); -const backbeatConfigJoi = require('./config.joi'); - -class Config { - constructor() { - /* - * By default, the config file is "config.json" at the root. - * It can be overridden using the BACKBEAT_CONFIG_FILE environment var. - */ - this._basePath = __dirname; - if (process.env.BACKBEAT_CONFIG_FILE !== undefined) { - this._configPath = process.env.BACKBEAT_CONFIG_FILE; - } else { - this._configPath = path.join(this._basePath, 'config.json'); - } - - let config; - try { - const data = fs.readFileSync(this._configPath, - { encoding: 'utf-8' }); - config = JSON.parse(data); - } catch (err) { - throw new Error(`could not parse config file: ${err.message}`); - } - - const parsedConfig = joi.attempt(config, backbeatConfigJoi, - 'invalid backbeat config'); - - if (parsedConfig.extensions) { - Object.keys(parsedConfig.extensions).forEach(extName => { - const index = extensions[extName]; - if (!index) { - throw new Error(`configured extension ${extName}: ` + - 'not found in extensions directory'); - } - if (index.configValidator) { - const extConfig = parsedConfig.extensions[extName]; - const validatedConfig = - index.configValidator(this, extConfig); - parsedConfig.extensions[extName] = validatedConfig; - } - }); - } - // whitelist IP, CIDR for health checks - const defaultHealthChecks = ['127.0.0.1/8', '::1']; - const healthChecks = parsedConfig.server.healthChecks; - healthChecks.allowFrom = - healthChecks.allowFrom.concat(defaultHealthChecks); - - // default to standalone configuration if sentinel not setup - if (!parsedConfig.redis || !parsedConfig.redis.sentinels) { - this.redis = Object.assign({}, parsedConfig.redis, - { host: '127.0.0.1', port: 6379 }); - } - - // additional certs checks - if (parsedConfig.certFilePaths) { - parsedConfig.https = this._parseCertFilePaths( - parsedConfig.certFilePaths); - } - if (parsedConfig.internalCertFilePaths) { - parsedConfig.internalHttps = this._parseCertFilePaths( - parsedConfig.internalCertFilePaths); - } - // config is validated, safe to assign directly to the config object - Object.assign(this, parsedConfig); - } - - _parseCertFilePaths(certFilePaths) { - const { key, cert, ca } = certFilePaths; - - const makePath = value => - (value.startsWith('/') ? - value : `${this._basePath}/${value}`); - const https = {}; - if (key && cert) { - const keypath = makePath(key); - const certpath = makePath(cert); - fs.accessSync(keypath, fs.F_OK | fs.R_OK); - fs.accessSync(certpath, fs.F_OK | fs.R_OK); - https.cert = fs.readFileSync(certpath, 'ascii'); - https.key = fs.readFileSync(keypath, 'ascii'); - } - if (ca) { - const capath = makePath(ca); - fs.accessSync(capath, fs.F_OK | fs.R_OK); - https.ca = fs.readFileSync(capath, 'ascii'); - } - return https; - } - - getBasePath() { - return this._basePath; - } - - getConfigPath() { - return this._configPath; - } -} - -module.exports = new Config(); diff --git a/conf/config.joi.js b/conf/config.joi.js deleted file mode 100644 index 90cc41b7c..000000000 --- a/conf/config.joi.js +++ /dev/null @@ -1,61 +0,0 @@ -'use strict'; // eslint-disable-line - -const joi = require('joi'); -const { - hostPortJoi, - transportJoi, - logJoi, - certFilePathsJoi, -} = require('../lib/config/configItems.joi'); - -const joiSchema = { - zookeeper: { - connectionString: joi.string().required(), - autoCreateNamespace: joi.boolean().default(false), - }, - kafka: { - hosts: joi.string().required(), - site: joi.string(), - }, - queuePopulator: { - cronRule: joi.string().required(), - batchMaxRead: joi.number().default(10000), - zookeeperPath: joi.string().required(), - logSource: joi.alternatives().try('bucketd', 'dmd').required(), - bucketd: hostPortJoi - .keys({ transport: transportJoi }) - .when('logSource', { is: 'bucketd', then: joi.required() }), - dmd: hostPortJoi.keys({ - logName: joi.string().default('s3-recordlog'), - }).when('logSource', { is: 'dmd', then: joi.required() }), - probeServer: joi.object({ - bindAddress: joi.string().default('localhost'), - port: joi.number().required(), - }), - }, - log: logJoi, - extensions: joi.object(), - metrics: { - topic: joi.string().required(), - }, - server: { - healthChecks: joi.object({ - allowFrom: joi.array().items(joi.string()).default([]), - }).required(), - host: joi.string().required(), - port: joi.number().default(8900), - }, - redis: { - name: joi.string().default('backbeat'), - password: joi.string().default('').allow(''), - sentinels: joi.array().items(joi.object({ - host: joi.string().required(), - port: joi.number().required(), - })), - sentinelPassword: joi.string().default('').allow(''), - }, - certFilePaths: certFilePathsJoi, - internalCertFilePaths: certFilePathsJoi, -}; - -module.exports = joiSchema; From 4ed2a4fa25f4b92f0ec3cd7b40e0fb678158d260 Mon Sep 17 00:00:00 2001 From: Francois Ferrand Date: Fri, 8 Sep 2023 18:18:10 +0200 Subject: [PATCH 6/6] Remove kafkaMaxRequestSize from oplog populator Will be set globally in kafka-connect (via ZKOP), and allows custom values to be (manually) set on each connector if ever needed: as any "extra" config will be preserved by oplog populator. Issue: BB-436 --- extensions/oplogPopulator/OplogPopulator.js | 6 +----- extensions/oplogPopulator/OplogPopulatorTask.js | 5 +---- .../oplogPopulator/modules/ConnectorsManager.js | 3 --- tests/unit/oplogPopulator/ConnectorsManager.js | 2 -- tests/unit/oplogPopulator/oplogPopulator.js | 14 -------------- 5 files changed, 2 insertions(+), 28 deletions(-) diff --git a/extensions/oplogPopulator/OplogPopulator.js b/extensions/oplogPopulator/OplogPopulator.js index 8e8f6c63b..22694f0aa 100644 --- a/extensions/oplogPopulator/OplogPopulator.js +++ b/extensions/oplogPopulator/OplogPopulator.js @@ -13,9 +13,7 @@ const { OplogPopulatorConfigJoiSchema } = require('./OplogPopulatorConfigValidat const { mongoJoi } = require('../../lib/config/configItems.joi'); const paramsJoi = joi.object({ - config: OplogPopulatorConfigJoiSchema.keys({ - maxRequestSize: joi.number().required(), - }).required(), + config: OplogPopulatorConfigJoiSchema.required(), mongoConfig: mongoJoi.required(), activeExtensions: joi.array().required(), logger: joi.object().required(), @@ -34,7 +32,6 @@ class OplogPopulator { * @constructor * @param {Object} params - constructor params * @param {Object} params.config - oplog populator config - * @param {Object} params.config.maxRequestSize - kafka producer's max request size * @param {Object} params.mongoConfig - mongo connection config * @param {Object} params.mongoConfig.authCredentials - mongo auth credentials * @param {Object} params.mongoConfig.replicaSetHosts - mongo replication hosts @@ -264,7 +261,6 @@ class OplogPopulator { heartbeatIntervalMs: this._config.heartbeatIntervalMs, kafkaConnectHost: this._config.kafkaConnectHost, kafkaConnectPort: this._config.kafkaConnectPort, - kafkaMaxRequestSize: this._maxRequestSize, metricsHandler: this._metricsHandler, logger: this._logger, }); diff --git a/extensions/oplogPopulator/OplogPopulatorTask.js b/extensions/oplogPopulator/OplogPopulatorTask.js index 6d3e513ca..90c33ff89 100644 --- a/extensions/oplogPopulator/OplogPopulatorTask.js +++ b/extensions/oplogPopulator/OplogPopulatorTask.js @@ -28,10 +28,7 @@ const activeExtensions = [ ].filter(ext => config.extensions[ext]); const oplogPopulator = new OplogPopulator({ - config: { - ...oplogPopulatorConfig, - maxRequestSize: config.kafka.maxRequestSize, - }, + config: oplogPopulatorConfig, mongoConfig, activeExtensions, logger, diff --git a/extensions/oplogPopulator/modules/ConnectorsManager.js b/extensions/oplogPopulator/modules/ConnectorsManager.js index d3a1810ab..4262085ea 100644 --- a/extensions/oplogPopulator/modules/ConnectorsManager.js +++ b/extensions/oplogPopulator/modules/ConnectorsManager.js @@ -20,7 +20,6 @@ const paramsJoi = joi.object({ heartbeatIntervalMs: joi.number().required(), kafkaConnectHost: joi.string().required(), kafkaConnectPort: joi.number().required(), - kafkaMaxRequestSize: joi.number().required(), metricsHandler: joi.object() .instance(OplogPopulatorMetrics).required(), logger: joi.object().required(), @@ -62,7 +61,6 @@ class ConnectorsManager { kafkaConnectPort: this._kafkaConnectPort, logger: this._logger, }); - this._kafkaMaxRequestSize = params.kafkaMaxRequestSize; this._metricsHandler = params.metricsHandler; this._database = params.database; this._mongoUrl = params.mongoUrl; @@ -89,7 +87,6 @@ class ConnectorsManager { // hearbeat prevents having an outdated resume token in the connectors // by constantly updating the offset to the last object in the oplog 'heartbeat.interval.ms': this._heartbeatIntervalMs, - 'producer.override.max.request.size': this._kafkaMaxRequestSize, }; return { ...constants.defaultConnectorConfig, diff --git a/tests/unit/oplogPopulator/ConnectorsManager.js b/tests/unit/oplogPopulator/ConnectorsManager.js index e585b2558..986fedeb7 100644 --- a/tests/unit/oplogPopulator/ConnectorsManager.js +++ b/tests/unit/oplogPopulator/ConnectorsManager.js @@ -56,7 +56,6 @@ const connectorConfig = { }], }), 'heartbeat.interval.ms': 10000, - 'producer.override.max.request.size': 5000020, }; describe('ConnectorsManager', () => { @@ -92,7 +91,6 @@ describe('ConnectorsManager', () => { heartbeatIntervalMs: 10000, kafkaConnectHost: '127.0.0.1', kafkaConnectPort: 8083, - kafkaMaxRequestSize: 5000020, metricsHandler: new OplogPopulatorMetrics(logger), logger, }); diff --git a/tests/unit/oplogPopulator/oplogPopulator.js b/tests/unit/oplogPopulator/oplogPopulator.js index a1d6422a7..3a387938f 100644 --- a/tests/unit/oplogPopulator/oplogPopulator.js +++ b/tests/unit/oplogPopulator/oplogPopulator.js @@ -18,7 +18,6 @@ const oplogPopulatorConfig = { kafkaConnectPort: 8083, numberOfConnectors: 1, probeServer: { port: 8552 }, - maxRequestSize: 5000020, }; const mongoConfig = { @@ -93,19 +92,6 @@ describe('OplogPopulator', () => { activeExtensions, logger, })); - assert.throws(() => new OplogPopulator({ - config: { - topic: 'oplog', - kafkaConnectHost: '127.0.0.1', - kafkaConnectPort: 8083, - numberOfConnectors: 1, - probeServer: { port: 8552 }, - }, - mongoConfig, - activeExtensions, - enableMetrics: false, - logger, - })); const op = new OplogPopulator({ config: oplogPopulatorConfig, mongoConfig,