diff --git a/lib/MetricsConsumer.js b/lib/MetricsConsumer.js deleted file mode 100644 index 9fdc4efda..000000000 --- a/lib/MetricsConsumer.js +++ /dev/null @@ -1,234 +0,0 @@ -'use strict'; // eslint-disable-line strict - -const Logger = require('werelogs').Logger; -const { RedisClient, StatsModel } = require('arsenal').metrics; -const errors = require('arsenal').errors; - -const BackbeatConsumer = require('./BackbeatConsumer'); -const { - redisKeys: crrRedisKeys, - metricsExtension: crrExtension, -} = require('../extensions/replication/constants'); -const { - redisKeys: ingestionRedisKeys, - metricsExtension: ingestionExtension, -} = require('../extensions/ingestion/constants'); - -// StatsClient constant defaults for site metrics -const INTERVAL = 300; // 5 minutes; -const EXPIRY = 86400; // 24 hours - -// BackbeatConsumer constant defaults -const CONSUMER_FETCH_MAX_BYTES = 5000020; -const CONCURRENCY = 10; - -class MetricsConsumer { - /** - * @constructor - * @param {object} rConfig - redis ha configuration - * @param {string} rConfig.host - redis ha host - * @param {number} rConfig.port - redis ha port - * @param {object} mConfig - metrics configurations - * @param {string} mConfig.topic - metrics topic name - * @param {object} kafkaConfig - kafka configurations - * @param {string} kafkaConfig.hosts - kafka hosts - * as "host:port[/chroot]" - * @param {string} id - identifier used for filtering metrics entries - */ - constructor(rConfig, mConfig, kafkaConfig, id) { - this.mConfig = mConfig; - this.kafkaConfig = kafkaConfig; - this._id = id; - - this._consumer = null; - - this.logger = new Logger('Backbeat:MetricsConsumer'); - const redisClient = new RedisClient(rConfig, this.logger); - this._statsClient = new StatsModel(redisClient, INTERVAL, EXPIRY); - } - - /** - * List of valid "type" field values for metric kafka entries - * @param {string} type - type to check - * @return {boolean} true if type is a valid metric type - */ - static isValidMetricType(type) { - const validTypes = ['completed', 'failed', 'queued', 'pendingOnly']; - return validTypes.includes(type); - } - - start() { - let consumerReady = false; - const consumer = new BackbeatConsumer({ - kafka: { - hosts: this.kafkaConfig.hosts, - site: this.kafkaConfig.site, - }, - topic: this.mConfig.topic, - groupId: `${this.mConfig.groupIdPrefix}-${this._id}`, - concurrency: CONCURRENCY, - queueProcessor: this.processKafkaEntry.bind(this), - fetchMaxBytes: CONSUMER_FETCH_MAX_BYTES, - }); - consumer.on('error', () => { - if (!consumerReady) { - this.logger.fatal('error starting metrics consumer'); - process.exit(1); - } - }); - consumer.on('ready', () => { - consumerReady = true; - consumer.subscribe(); - this._consumer = consumer; - this.logger.info('metrics processor is ready to consume entries'); - }); - } - - _getRedisKeys(extension) { - switch (extension) { - case crrExtension: return crrRedisKeys; - case ingestionExtension: return ingestionRedisKeys; - default: - throw errors.InternalError.customizeDescription( - `${extension} is not a valid extension`); - } - } - - _reportPending(site, redisKeys, ops, bytes) { - if (ops > 0) { - this._sendRequest('incrementKey', site, redisKeys, 'opsPending', - ops); - } - if (ops < 0) { - this._sendRequest('decrementKey', site, redisKeys, 'opsPending', - Math.abs(ops)); - } - if (bytes > 0) { - this._sendRequest('incrementKey', site, redisKeys, 'bytesPending', - bytes); - } - if (bytes < 0) { - this._sendRequest('decrementKey', site, redisKeys, 'bytesPending', - Math.abs(bytes)); - } - } - - _sendSiteLevelRequests(data) { - const { type, site, ops, bytes, extension } = data; - let redisKeys; - try { - redisKeys = this._getRedisKeys(extension); - } catch (err) { - return this.logger.error('error consuming metric entry', { - method: 'MetricsConsumer._sendSiteLevelRequests', - site, - type, - }); - } - if (type === 'completed') { - // Pending metrics - this._reportPending(site, redisKeys, -ops, -bytes); - // Other metrics - this._sendRequest('reportNewRequest', site, redisKeys, 'opsDone', - ops); - this._sendRequest('reportNewRequest', site, redisKeys, 'bytesDone', - bytes); - } else if (type === 'failed') { - // Pending metrics - this._reportPending(site, redisKeys, -ops, -bytes); - // Other metrics - this._sendRequest('reportNewRequest', site, redisKeys, 'opsFail', - ops); - this._sendRequest('reportNewRequest', site, redisKeys, 'bytesFail', - bytes); - } else if (type === 'queued') { - // Pending metrics - this._reportPending(site, redisKeys, ops, bytes); - // Other metrics - this._sendRequest('reportNewRequest', site, redisKeys, 'ops', ops); - this._sendRequest('reportNewRequest', site, redisKeys, 'bytes', - bytes); - } else if (type === 'pendingOnly') { - this._reportPending(site, redisKeys, ops, bytes); - } - return undefined; - } - - _sendObjectLevelRequests(data) { - const { type, site, bytes, extension, - bucketName, objectKey, versionId } = data; - const redisKeys = this._getRedisKeys(extension); - if (type === 'completed') { - const key = `${site}:${bucketName}:${objectKey}:` + - `${versionId}:${redisKeys.objectBytesDone}`; - this._sendObjectRequest(key, bytes); - } else if (type === 'queued') { - const key = `${site}:${bucketName}:${objectKey}:` + - `${versionId}:${redisKeys.objectBytes}`; - this._sendObjectRequest(key, bytes); - } - return undefined; - } - - processKafkaEntry(kafkaEntry, done) { - const log = this.logger.newRequestLogger(); - let data; - try { - data = JSON.parse(kafkaEntry.value); - } catch (err) { - log.error('error processing metrics entry', { - method: 'MetricsConsumer.processKafkaEntry', - error: err, - }); - log.end(); - return done(); - } - /* - data = { - timestamp: 1509416671977, - ops: 5, - bytes: 195, - extension: 'crr', - type: 'queued' - } - */ - // filter metric entries by service, i.e. 'crr', 'ingestion' - if (this._id !== data.extension) { - return done(); - } - const isValidType = MetricsConsumer.isValidMetricType(data.type); - if (!isValidType) { - log.error('unknown type field encountered in metrics consumer', { - method: 'MetricsConsumer.processKafkaEntry', - dataType: data.type, - data, - }); - log.end(); - return done(); - } - if (data.bucketName && data.objectKey && data.versionId) { - this._sendObjectLevelRequests(data); - } else { - this._sendSiteLevelRequests(data); - } - log.end(); - return done(); - } - - _sendRequest(action, site, redisKeys, keyType, value) { - if (redisKeys[keyType]) { - this._statsClient[action](`${site}:${redisKeys[keyType]}`, - value || 0); - } - } - - _sendObjectRequest(key, value) { - this._statsClient.reportNewRequest(key, value); - } - - close(cb) { - this._consumer.close(cb); - } -} - -module.exports = MetricsConsumer; diff --git a/lib/MetricsProducer.js b/lib/MetricsProducer.js deleted file mode 100644 index 0493234b1..000000000 --- a/lib/MetricsProducer.js +++ /dev/null @@ -1,77 +0,0 @@ -'use strict'; // eslint-disable-line strict - -const async = require('async'); -const { Logger } = require('werelogs'); - -const BackbeatProducer = require('./BackbeatProducer'); -const MetricsModel = require('./models/MetricsModel'); - -class MetricsProducer { - /** - * @constructor - * @param {Object} kafkaConfig - kafka connection config - * @param {Object} mConfig - metrics configurations - */ - constructor(kafkaConfig, mConfig) { - this._kafkaConfig = kafkaConfig; - this._topic = mConfig.topic; - - this._producer = null; - this._log = new Logger('MetricsProducer'); - } - - setupProducer(done) { - const producer = new BackbeatProducer({ - kafka: { hosts: this._kafkaConfig.hosts }, - maxRequestSize: this._kafkaConfig.maxRequestSize, - topic: this._topic, - }); - producer.once('error', done); - producer.once('ready', () => { - producer.removeAllListeners('error'); - producer.on('error', err => { - this._log.error('error from backbeat producer', - { error: err }); - }); - this._producer = producer; - done(); - }); - } - - getProducer() { - return this._producer; - } - - /** - * @param {Object} extMetrics - an object where keys are all sites for a - * given extension and values are the metrics for the site - * (i.e. { my-site: { ops: 1, bytes: 124 }, awsbackend: { ... } } ) - * @param {String} type - type of metric (queueud or processed) - * @param {String} ext - extension (i.e. 'crr') - * @param {function} cb - callback - * @return {undefined} - */ - publishMetrics(extMetrics, type, ext, cb) { - async.each(Object.keys(extMetrics), (siteName, done) => { - const { ops, bytes, bucketName, objectKey, versionId } = - extMetrics[siteName]; - const message = new MetricsModel(ops, bytes, ext, type, - siteName, bucketName, objectKey, versionId).serialize(); - this._producer.send([{ message }], err => { - if (err) { - // Using trace here because errors are already logged in - // BackbeatProducer. This is to log to see source of caller - this._log.trace(`error publishing ${type} metrics for` + - `extension metrics ${ext}`, { error: err }); - } - done(); - }); - }, cb); - } - - close(cb) { - this._producer.close(cb); - } -} - -module.exports = MetricsProducer; diff --git a/lib/queuePopulator/IngestionPopulator.js b/lib/queuePopulator/IngestionPopulator.js index c077ce0ef..48fcace94 100644 --- a/lib/queuePopulator/IngestionPopulator.js +++ b/lib/queuePopulator/IngestionPopulator.js @@ -9,8 +9,6 @@ const Logger = require('werelogs').Logger; const config = require('../Config'); const IngestionReader = require('./IngestionReader'); const BackbeatProducer = require('../BackbeatProducer'); -const MetricsConsumer = require('../MetricsConsumer'); -const MetricsProducer = require('../MetricsProducer'); const { metricsExtension } = require('../../extensions/ingestion/constants'); const IngestionPopulatorMetrics = require('./IngestionPopulatorMetrics'); const { @@ -79,11 +77,6 @@ class IngestionPopulator { // shared producer across readers this._producer = null; - // metrics clients - this._mProducer = null; - this._mConsumer = null; - this._redis = null; - // all ingestion readers (including paused ones) // i.e.: { zenko-bucket-name: IngestionReader() } this._ingestionSources = {}; @@ -152,17 +145,6 @@ class IngestionPopulator { }); } - _setupMetricsClients(cb) { - // Metrics Consumer - this._mConsumer = new MetricsConsumer(this.rConfig, this.mConfig, - this.kafkaConfig, metricsExtension); - this._mConsumer.start(); - - // Metrics Producer - this._mProducer = new MetricsProducer(this.kafkaConfig, this.mConfig); - this._mProducer.setupProducer(cb); - } - _setupProducer(cb) { if (this._producer) { return process.nextTick(cb); @@ -580,7 +562,6 @@ class IngestionPopulator { logger: this.log, extensions: [this._extension], producer: this._producer, - metricsProducer: this._mProducer, qpConfig: this.qpConfig, s3Config: this.s3Config, }); @@ -662,30 +643,6 @@ class IngestionPopulator { */ close(done) { async.series([ - next => { - if (this._mProducer) { - this.log.debug('closing metrics producer', { - method: 'IngestionPopulator.close', - }); - return this._mProducer.close(next); - } - this.log.debug('no metrics producer to close', { - method: 'IngestionPopulator.close', - }); - return next(); - }, - next => { - if (this._mConsumer) { - this.log.debug('closing metrics consumer', { - method: 'IngestionPopulator.close', - }); - return this._mConsumer.close(next); - } - this.log.debug('no metrics consumer to close', { - method: 'IngestionPopulator.close', - }); - return next(); - }, next => { if (this._producer) { this.log.debug('closing producer', { diff --git a/lib/queuePopulator/QueuePopulator.js b/lib/queuePopulator/QueuePopulator.js index 7fdc07350..9d7e5ef3b 100644 --- a/lib/queuePopulator/QueuePopulator.js +++ b/lib/queuePopulator/QueuePopulator.js @@ -6,8 +6,6 @@ const { State: ZKState } = require('node-zookeeper-client'); const ProvisionDispatcher = require('../provisioning/ProvisionDispatcher'); const RaftLogReader = require('./RaftLogReader'); const BucketFileLogReader = require('./BucketFileLogReader'); -const MetricsProducer = require('../MetricsProducer'); -const MetricsConsumer = require('../MetricsConsumer'); const FailedCRRConsumer = require('../../extensions/replication/failedCRR/FailedCRRConsumer'); const MongoLogReader = require('./MongoLogReader'); @@ -171,9 +169,6 @@ class QueuePopulator { // list of updated log readers, if any this.logReadersUpdate = null; - // metrics clients - this._mProducer = null; - this._mConsumer = null; // bucket notification configuration manager this.bnConfigManager = null; this._loadedExtensions = []; @@ -189,15 +184,6 @@ class QueuePopulator { */ open(cb) { async.series([ - next => this._setupMetricsClients(err => { - if (err) { - this.log.error('error setting up metrics client', { - method: 'QueuePopulator.open', - error: err, - }); - } - return next(err); - }), next => this._setupFailedCRRClients(next), next => this._setupNotificationConfigManager(next), next => this._setupZookeeper(err => { @@ -235,17 +221,6 @@ class QueuePopulator { }); } - _setupMetricsClients(cb) { - // Metrics Consumer - this._mConsumer = new MetricsConsumer(this.rConfig, - this.mConfig, this.kafkaConfig, metricsExtension); - this._mConsumer.start(); - - // Metrics Producer - this._mProducer = new MetricsProducer(this.kafkaConfig, this.mConfig); - this._mProducer.setupProducer(cb); - } - /** * Set up and start the consumer for retrying failed CRR operations. * @param {Function} cb - The callback function @@ -273,30 +248,6 @@ class QueuePopulator { this._circuitBreaker.stop(); return next(); }, - next => { - if (this._mProducer) { - this.log.debug('closing metrics producer', { - method: 'QueuePopulator.close', - }); - return this._mProducer.close(next); - } - this.log.debug('no metrics producer to close', { - method: 'QueuePopulator.close', - }); - return next(); - }, - next => { - if (this._mConsumer) { - this.log.debug('closing metrics consumer', { - method: 'QueuePopulator.close', - }); - return this._mConsumer.close(next); - } - this.log.debug('no metrics consumer to close', { - method: 'QueuePopulator.close', - }); - return next(); - } ], cb); }