diff --git a/extensions/mongoProcessor/MongoQueueProcessor.js b/extensions/mongoProcessor/MongoQueueProcessor.js index 6811079f4..9cc46e916 100644 --- a/extensions/mongoProcessor/MongoQueueProcessor.js +++ b/extensions/mongoProcessor/MongoQueueProcessor.js @@ -15,16 +15,11 @@ const BackbeatConsumer = require('../../lib/BackbeatConsumer'); const QueueEntry = require('../../lib/models/QueueEntry'); const DeleteOpQueueEntry = require('../../lib/models/DeleteOpQueueEntry'); const ObjectQueueEntry = require('../../lib/models/ObjectQueueEntry'); -const MetricsProducer = require('../../lib/MetricsProducer'); -const { metricsExtension, metricsTypeCompleted, metricsTypePendingOnly } = - require('../ingestion/constants'); + const getContentType = require('./utils/contentTypeHelper'); const BucketMemState = require('./utils/BucketMemState'); const MongoProcessorMetrics = require('./MongoProcessorMetrics'); -// batch metrics by location and send to kafka metrics topic every 5 seconds -const METRIC_REPORT_INTERVAL_MS = process.env.CI === 'true' ? 1000 : 5000; - // TODO - ADD PREFIX BASED ON SOURCE // april 6, 2018 @@ -79,16 +74,6 @@ class MongoQueueProcessor { // in-mem batch of metrics, we only track total entry count by location // this._accruedMetrics = { zenko-location: 10 } this._accruedMetrics = {}; - - setInterval(() => { - this._sendMetrics(); - }, METRIC_REPORT_INTERVAL_MS); - } - - _setupMetricsClients(cb) { - // Metrics Producer - this._mProducer = new MetricsProducer(this.kafkaConfig, this._mConfig); - this._mProducer.setupProducer(cb); } /** @@ -177,18 +162,6 @@ class MongoQueueProcessor { }); return next(); }, - next => { - if (this._mProducer) { - this.logger.debug('closing metrics producer', { - method: 'MongoQueueProcessor.stop', - }); - return this._mProducer.close(next); - } - this.logger.debug('no metrics producer to close', { - method: 'MongoQueueProcessor.stop', - }); - return next(); - }, ], done); } @@ -408,7 +381,6 @@ class MongoQueueProcessor { return this._mongoClient.deleteObject(bucket, key, options, log, err => { if (err) { - this._normalizePendingMetric(location); log.end().error('error deleting object metadata ' + 'from mongo', { bucket, @@ -443,7 +415,6 @@ class MongoQueueProcessor { this._getZenkoObjectMetadata(log, sourceEntry, bucketInfo, (err, zenkoObjMd) => { if (err) { - this._normalizePendingMetric(location); log.end().error('error processing object queue entry', { method: 'MongoQueueProcessor._processObjectQueueEntry', entry: sourceEntry.getLogInfo(), @@ -454,7 +425,6 @@ class MongoQueueProcessor { const content = getContentType(sourceEntry, zenkoObjMd); if (content.length === 0) { - this._normalizePendingMetric(location); log.end().debug('skipping duplicate entry', { method: 'MongoQueueProcessor._processObjectQueueEntry', entry: sourceEntry.getLogInfo(), @@ -498,7 +468,6 @@ class MongoQueueProcessor { return this._mongoClient.putObject(bucket, key, objVal, params, this.logger, err => { if (err) { - this._normalizePendingMetric(location); log.end().error('error putting object metadata ' + 'to mongo', { bucket, @@ -519,23 +488,6 @@ class MongoQueueProcessor { }); } - /** - * Send accrued metrics by location to kafka - * @return {undefined} - */ - _sendMetrics() { - Object.keys(this._accruedMetrics).forEach(loc => { - const count = this._accruedMetrics[loc]; - - // only report metrics if something has been recorded for location - if (count > 0) { - this._accruedMetrics[loc] = 0; - const metric = { [loc]: { ops: count } }; - this._mProducer.publishMetrics(metric, metricsTypeCompleted, - metricsExtension, () => {}); - } - }); - } /** * Accrue metrics in-mem every METRIC_REPORT_INTERVAL_MS @@ -550,19 +502,6 @@ class MongoQueueProcessor { } } - /** - * For cases where we experience an error or skip an entry, we need to - * normalize pending metric. This means we will see pending metrics stuck - * above 0 and will need to bring those metrics down - * @param {string} location - location constraint name - * @return {undefined} - */ - _normalizePendingMetric(location) { - const metric = { [location]: { ops: 1 } }; - this._mProducer.publishMetrics(metric, metricsTypePendingOnly, - metricsExtension, () => {}); - } - /** * Get bucket info in memoize state if exists, otherwise fetch from Mongo * @param {ObjectQueueEntry} sourceEntry - object metadata entry @@ -639,7 +578,6 @@ class MongoQueueProcessor { entryType: sourceEntry.constructor.name, method: 'MongoQueueProcessor.processKafkaEntry', }); - this._normalizePendingMetric(location); return process.nextTick(done); }); } diff --git a/extensions/replication/queueProcessor/QueueProcessor.js b/extensions/replication/queueProcessor/QueueProcessor.js index 7d59cb282..c7317ee92 100644 --- a/extensions/replication/queueProcessor/QueueProcessor.js +++ b/extensions/replication/queueProcessor/QueueProcessor.js @@ -28,7 +28,6 @@ const EchoBucket = require('../tasks/EchoBucket'); const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry'); const BucketQueueEntry = require('../../../lib/models/BucketQueueEntry'); const ActionQueueEntry = require('../../../lib/models/ActionQueueEntry'); -const MetricsProducer = require('../../../lib/MetricsProducer'); const libConstants = require('../../../lib/constants'); const { wrapCounterInc, wrapHistogramObserve } = require('../../../lib/util/metrics'); const { http: HttpAgent, https: HttpsAgent } = require('httpagent'); @@ -222,7 +221,6 @@ class QueueProcessor extends EventEmitter { this.replicationStatusProducer = null; this._consumer = null; this._dataMoverConsumer = null; - this._mProducer = null; this.site = site; this.mConfig = mConfig; this.serviceName = this.isReplayTopic ? @@ -695,7 +693,6 @@ class QueueProcessor extends EventEmitter { vaultclientCache: this.vaultclientCache, accountCredsCache: this.accountCredsCache, replicationStatusProducer: this.replicationStatusProducer, - mProducer: this._mProducer, logger: this.logger, site: this.site, consumer: this._consumer, @@ -722,16 +719,7 @@ class QueueProcessor extends EventEmitter { * @return {undefined} */ start(options) { - this._mProducer = new MetricsProducer(this.kafkaConfig, this.mConfig); return async.parallel([ - done => this._mProducer.setupProducer(err => { - if (err) { - this.logger.info('error setting up metrics producer', - { error: err.message }); - process.exit(1); - } - return done(); - }), done => this._setupProducer(err => { if (err) { this.logger.info('error setting up kafka producer', @@ -842,20 +830,6 @@ class QueueProcessor extends EventEmitter { }); return next(); }, - next => { - if (this._mProducer) { - this.logger.debug('closing metrics producer', { - method: 'QueueProcessor.stop', - site: this.site, - }); - return this._mProducer.close(next); - } - this.logger.debug('no metrics producer to close', { - method: 'QueueProcessor.stop', - site: this.site, - }); - return next(); - }, ], done); } diff --git a/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js b/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js index 4c366473c..06f22fe06 100644 --- a/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js +++ b/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js @@ -16,7 +16,6 @@ const QueueEntry = require('../../../lib/models/QueueEntry'); const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry'); const FailedCRRProducer = require('../failedCRR/FailedCRRProducer'); const ReplayProducer = require('../replay/ReplayProducer'); -const MetricsProducer = require('../../../lib/MetricsProducer'); const { http: HttpAgent, https: HttpsAgent } = require('httpagent'); // StatsClient constant default for site metrics @@ -216,7 +215,6 @@ class ReplicationStatusProcessor { this.gcConfig = gcConfig; this._consumer = null; this._gcProducer = null; - this._mProducer = null; this.logger = new Logger('Backbeat:Replication:ReplicationStatusProcessor'); @@ -336,7 +334,6 @@ class ReplicationStatusProcessor { sourceHTTPAgent: this.sourceHTTPAgent, vaultclientCache: this.vaultclientCache, gcProducer: this._gcProducer, - mProducer: this._mProducer, statsClient: this._statsClient, failedCRRProducer: this._failedCRRProducer, replayProducers: this._ReplayProducers, @@ -379,11 +376,6 @@ class ReplicationStatusProcessor { done(); } }, - done => { - this._mProducer = new MetricsProducer(this.kafkaConfig, - this.mConfig); - this._mProducer.setupProducer(done); - }, done => { let consumerReady = false; this._consumer = new BackbeatConsumer({ @@ -434,18 +426,6 @@ class ReplicationStatusProcessor { method: 'ReplicationStatusProcessor.stop', }); return next(); - }, - next => { - if (this._mProducer) { - this.logger.debug('closing metrics producer', { - method: 'ReplicationStatusProcessor.stop', - }); - return this._mProducer.close(next); - } - this.logger.debug('no metrics producer to close', { - method: 'ReplicationStatusProcessor.stop', - }); - return next(); } ], done); } diff --git a/extensions/replication/tasks/CopyLocationTask.js b/extensions/replication/tasks/CopyLocationTask.js index 9d197f5f5..d17056c55 100644 --- a/extensions/replication/tasks/CopyLocationTask.js +++ b/extensions/replication/tasks/CopyLocationTask.js @@ -8,15 +8,12 @@ const BackbeatMetadataProxy = require('../../../lib/BackbeatMetadataProxy'); const BackbeatClient = require('../../../lib/clients/BackbeatClient'); const BackbeatTask = require('../../../lib/tasks/BackbeatTask'); const { LifecycleMetrics } = require('../../lifecycle/LifecycleMetrics'); -const ReplicationMetric = require('../ReplicationMetric'); const ReplicationMetrics = require('../ReplicationMetrics'); const { attachReqUids, TIMEOUT_MS } = require('../../../lib/clients/utils'); const { getAccountCredentials } = require('../../../lib/credentials/AccountCredentials'); const RoleCredentials = require('../../../lib/credentials/RoleCredentials'); -const { metricsExtension, metricsTypeQueued, metricsTypeCompleted } = - require('../constants'); const MPU_GCP_MAX_PARTS = 1024; @@ -41,10 +38,6 @@ class CopyLocationTask extends BackbeatTask { this.retryParams = retryParams; } } - this._replicationMetric = new ReplicationMetric() - .withProducer(this.mProducer.getProducer()) - .withSite(this.site) - .withExtension(metricsExtension); } _validateActionCredentials(actionEntry) { @@ -195,11 +188,6 @@ class CopyLocationTask extends BackbeatTask { _getAndPutObject(actionEntry, objMD, log, cb) { const objectLogger = this.logger.newRequestLogger(log.getUids()); - this._replicationMetric - .withEntry(actionEntry) - .withMetricType(metricsTypeQueued) - .withObjectSize(objMD.getContentLength()) - .publish(); this.retry({ actionDesc: 'stream object data', logFields: { entry: actionEntry.getLogInfo() }, @@ -335,11 +323,6 @@ class CopyLocationTask extends BackbeatTask { actionEntry.setSuccess({ location: data.location, }); - this._replicationMetric - .withEntry(actionEntry) - .withMetricType(metricsTypeCompleted) - .withObjectSize(size) - .publish(); return cb(null, data); }); } @@ -591,11 +574,6 @@ class CopyLocationTask extends BackbeatTask { } return doneOnce(err); } - this._replicationMetric - .withEntry(actionEntry) - .withMetricType(metricsTypeCompleted) - .withObjectSize(size) - .publish(); return doneOnce(null, data); }); } @@ -786,11 +764,6 @@ class CopyLocationTask extends BackbeatTask { if (err) { return cb(err); } - this._replicationMetric - .withEntry(actionEntry) - .withMetricType(metricsTypeQueued) - .withObjectSize(objMD.getContentLength()) - .publish(); return this._completeRangedMPU(actionEntry, objMD, uploadId, log, cb); }); diff --git a/extensions/replication/tasks/MultipleBackendTask.js b/extensions/replication/tasks/MultipleBackendTask.js index 2d98aa2e2..0f45df2c3 100644 --- a/extensions/replication/tasks/MultipleBackendTask.js +++ b/extensions/replication/tasks/MultipleBackendTask.js @@ -8,8 +8,6 @@ const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry'); const ReplicateObject = require('./ReplicateObject'); const { attachReqUids } = require('../../../lib/clients/utils'); -const getExtMetrics = require('../utils/getExtMetrics'); -const { metricsExtension, metricsTypeQueued } = require('../constants'); const MPU_GCP_MAX_PARTS = 1024; @@ -702,10 +700,6 @@ class MultipleBackendTask extends ReplicateObject { if (err) { return doneOnce(err); } - const extMetrics = getExtMetrics(this.site, - sourceEntry.getContentLength(), sourceEntry); - this.mProducer.publishMetrics(extMetrics, metricsTypeQueued, - metricsExtension, () => {}); return this._completeRangedMPU(sourceEntry, uploadId, log, doneOnce); }); @@ -713,10 +707,6 @@ class MultipleBackendTask extends ReplicateObject { _getAndPutObject(sourceEntry, log, cb) { const partLogger = this.logger.newRequestLogger(log.getUids()); - const extMetrics = getExtMetrics(this.site, - sourceEntry.getContentLength(), sourceEntry); - this.mProducer.publishMetrics(extMetrics, metricsTypeQueued, - metricsExtension, () => {}); if (BACKBEAT_INJECT_REPLICATION_ERROR_COPYOBJ) { if (Math.random() < BACKBEAT_INJECT_REPLICATION_ERROR_RATE) { diff --git a/extensions/replication/tasks/ReplicateObject.js b/extensions/replication/tasks/ReplicateObject.js index d0b926e1c..bb3edb7f9 100644 --- a/extensions/replication/tasks/ReplicateObject.js +++ b/extensions/replication/tasks/ReplicateObject.js @@ -10,11 +10,10 @@ const BackbeatMetadataProxy = require('../../../lib/BackbeatMetadataProxy'); const mapLimitWaitPendingIfError = require('../../../lib/util/mapLimitWaitPendingIfError'); const { attachReqUids, TIMEOUT_MS } = require('../../../lib/clients/utils'); -const getExtMetrics = require('../utils/getExtMetrics'); const BackbeatTask = require('../../../lib/tasks/BackbeatTask'); const { getAccountCredentials } = require('../../../lib/credentials/AccountCredentials'); const RoleCredentials = require('../../../lib/credentials/RoleCredentials'); -const { metricsExtension, metricsTypeQueued, metricsTypeCompleted, replicationStages } = require('../constants'); +const { replicationStages } = require('../constants'); const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry'); @@ -434,9 +433,6 @@ class ReplicateObject extends BackbeatTask { location: this.site, replicationContent: 'data', }); - const extMetrics = getExtMetrics(this.site, size, sourceEntry); - this.mProducer.publishMetrics(extMetrics, - metricsTypeCompleted, metricsExtension, () => {}); } _publishMetadataWriteMetrics(buffer, writeStartTime) { @@ -758,10 +754,6 @@ class ReplicateObject extends BackbeatTask { // Get data from source bucket and put it on the target bucket next => { if (!mdOnly) { - const extMetrics = getExtMetrics(this.site, - sourceEntry.getContentLength(), sourceEntry); - this.mProducer.publishMetrics(extMetrics, - metricsTypeQueued, metricsExtension, () => {}); return this._getAndPutData(sourceEntry, destEntry, log, next); } diff --git a/extensions/replication/tasks/UpdateReplicationStatus.js b/extensions/replication/tasks/UpdateReplicationStatus.js index cc326226c..517735d33 100644 --- a/extensions/replication/tasks/UpdateReplicationStatus.js +++ b/extensions/replication/tasks/UpdateReplicationStatus.js @@ -8,11 +8,6 @@ const ActionQueueEntry = require('../../../lib/models/ActionQueueEntry'); const BackbeatTask = require('../../../lib/tasks/BackbeatTask'); const BackbeatMetadataProxy = require('../../../lib/BackbeatMetadataProxy'); -const { - metricsExtension, - metricsTypeCompleted, - metricsTypeFailed, -} = require('../constants'); const { getSortedSetMember, getSortedSetKey, @@ -106,39 +101,6 @@ class UpdateReplicationStatus extends BackbeatTask { this.failedCRRProducer.publishFailedCRREntry(JSON.stringify(message)); } - /** - * Report CRR metrics - * @param {ObjectQueueEntry} sourceEntry - The original entry - * @param {ObjectQueueEntry} updatedSourceEntry - updated object entry - * @return {undefined} - */ - _reportMetrics(sourceEntry, updatedSourceEntry) { - const content = updatedSourceEntry.getReplicationContent(); - const contentLength = updatedSourceEntry.getContentLength(); - const bytes = content.includes('DATA') ? contentLength : 0; - const data = {}; - const site = sourceEntry.getSite(); - data[site] = { ops: 1, bytes }; - const status = sourceEntry.getReplicationSiteStatus(site); - // Report to MetricsProducer with completed/failed metrics. - if (status === 'COMPLETED' || status === 'FAILED') { - const entryType = status === 'COMPLETED' ? - metricsTypeCompleted : metricsTypeFailed; - - this.mProducer.publishMetrics(data, entryType, metricsExtension, - err => { - if (err) { - this.logger.trace('error occurred in publishing metrics', { - error: err, - method: 'UpdateReplicationStatus._reportMetrics', - }); - } - }); - // TODO: update ZenkoMetrics - } - return undefined; - } - /** * Get the appropriate source metadata for a non-versioned bucket. If the * object metadata has changed since we performed CRR, then we want to @@ -444,8 +406,6 @@ class UpdateReplicationStatus extends BackbeatTask { if (err) { return done(err); } - - this._reportMetrics(sourceEntry, updatedSourceEntry); return this._handleGarbageCollection( updatedSourceEntry, log, done); }); diff --git a/lib/queuePopulator/BucketFileLogReader.js b/lib/queuePopulator/BucketFileLogReader.js index dcf5b7dad..34e1b4132 100644 --- a/lib/queuePopulator/BucketFileLogReader.js +++ b/lib/queuePopulator/BucketFileLogReader.js @@ -6,10 +6,10 @@ const LogReader = require('./LogReader'); class BucketFileLogReader extends LogReader { constructor(params) { const { zkClient, kafkaConfig, dmdConfig, logger, - extensions, metricsProducer, metricsHandler } = params; + extensions, metricsHandler } = params; super({ zkClient, kafkaConfig, logConsumer: null, logId: `bucketFile_${dmdConfig.logName}`, logger, extensions, - metricsProducer, metricsHandler }); + metricsHandler }); this._dmdConfig = dmdConfig; this._log = logger; diff --git a/lib/queuePopulator/IngestionPopulator.js b/lib/queuePopulator/IngestionPopulator.js index c077ce0ef..99cedc76a 100644 --- a/lib/queuePopulator/IngestionPopulator.js +++ b/lib/queuePopulator/IngestionPopulator.js @@ -9,9 +9,6 @@ const Logger = require('werelogs').Logger; const config = require('../Config'); const IngestionReader = require('./IngestionReader'); const BackbeatProducer = require('../BackbeatProducer'); -const MetricsConsumer = require('../MetricsConsumer'); -const MetricsProducer = require('../MetricsProducer'); -const { metricsExtension } = require('../../extensions/ingestion/constants'); const IngestionPopulatorMetrics = require('./IngestionPopulatorMetrics'); const { startCircuitBreakerMetricsExport, @@ -79,11 +76,6 @@ class IngestionPopulator { // shared producer across readers this._producer = null; - // metrics clients - this._mProducer = null; - this._mConsumer = null; - this._redis = null; - // all ingestion readers (including paused ones) // i.e.: { zenko-bucket-name: IngestionReader() } this._ingestionSources = {}; @@ -152,17 +144,6 @@ class IngestionPopulator { }); } - _setupMetricsClients(cb) { - // Metrics Consumer - this._mConsumer = new MetricsConsumer(this.rConfig, this.mConfig, - this.kafkaConfig, metricsExtension); - this._mConsumer.start(); - - // Metrics Producer - this._mProducer = new MetricsProducer(this.kafkaConfig, this.mConfig); - this._mProducer.setupProducer(cb); - } - _setupProducer(cb) { if (this._producer) { return process.nextTick(cb); @@ -580,7 +561,6 @@ class IngestionPopulator { logger: this.log, extensions: [this._extension], producer: this._producer, - metricsProducer: this._mProducer, qpConfig: this.qpConfig, s3Config: this.s3Config, }); @@ -662,30 +642,6 @@ class IngestionPopulator { */ close(done) { async.series([ - next => { - if (this._mProducer) { - this.log.debug('closing metrics producer', { - method: 'IngestionPopulator.close', - }); - return this._mProducer.close(next); - } - this.log.debug('no metrics producer to close', { - method: 'IngestionPopulator.close', - }); - return next(); - }, - next => { - if (this._mConsumer) { - this.log.debug('closing metrics consumer', { - method: 'IngestionPopulator.close', - }); - return this._mConsumer.close(next); - } - this.log.debug('no metrics consumer to close', { - method: 'IngestionPopulator.close', - }); - return next(); - }, next => { if (this._producer) { this.log.debug('closing producer', { diff --git a/lib/queuePopulator/IngestionReader.js b/lib/queuePopulator/IngestionReader.js index 14f8360c3..8a40e65c5 100644 --- a/lib/queuePopulator/IngestionReader.js +++ b/lib/queuePopulator/IngestionReader.js @@ -6,10 +6,6 @@ const VID_SEP = require('arsenal').versioning.VersioningConstants const IngestionProducer = require('./IngestionProducer'); const IngestionPopulatorMetrics = require('./IngestionPopulatorMetrics'); const LogReader = require('./LogReader'); -const { - metricsExtension, - metricsTypeQueued -} = require('../../extensions/ingestion/constants'); const { transformKey } = require('../util/entry'); function _isVersionedLogKey(key) { @@ -19,9 +15,9 @@ function _isVersionedLogKey(key) { class IngestionReader extends LogReader { constructor(params) { const { zkClient, ingestionConfig, kafkaConfig, bucketdConfig, qpConfig, - logger, extensions, producer, metricsProducer, s3Config } = params; + logger, extensions, producer, s3Config } = params; super({ zkClient, kafkaConfig, logConsumer: {}, logId: '', logger, - extensions, metricsProducer, zkMetricsHandler: IngestionPopulatorMetrics }); + extensions, zkMetricsHandler: IngestionPopulatorMetrics }); this._ingestionConfig = ingestionConfig; this.qpConfig = qpConfig; this.s3Config = s3Config; @@ -423,7 +419,6 @@ class IngestionReader extends LogReader { if (err) { return done(err); } - this._publishMetrics(); return done(); }); } @@ -446,18 +441,6 @@ class IngestionReader extends LogReader { ], done); } - _publishMetrics() { - // Ingestion extensions is a single IngestionQueuePopulatorExt - const extension = this._extensions[0]; - const location = this.getLocationConstraint(); - const metric = extension.getAndResetMetrics(this._targetZenkoBucket); - if (metric && metric.ops > 0) { - const value = { [location]: metric }; - this._mProducer.publishMetrics(value, metricsTypeQueued, - metricsExtension, () => {}); - } - } - /** * Bucket configs have user editable fields: credentials, endpoint diff --git a/lib/queuePopulator/KafkaLogReader.js b/lib/queuePopulator/KafkaLogReader.js index a5be6cae4..18a3631ce 100644 --- a/lib/queuePopulator/KafkaLogReader.js +++ b/lib/queuePopulator/KafkaLogReader.js @@ -12,14 +12,12 @@ class KafkaLogReader extends LogReader { * @param {Object} params.qpKafkaConfig - queue populator kafka configuration * @param {QueuePopulatorExtension[]} params.extensions - array of * queue populator extension modules - * @param {MetricsProducer} params.metricsProducer - instance of metrics - * producer * @param {MetricsHandler} params.metricsHandler - instance of metrics * handler */ constructor(params) { const { zkClient, kafkaConfig, zkConfig, qpKafkaConfig, - logger, extensions, metricsProducer, metricsHandler } = params; + logger, extensions, metricsHandler } = params; // conf contains global kafka and queuePoplator kafka configs const conf = { hosts: kafkaConfig.hosts, @@ -31,7 +29,7 @@ class KafkaLogReader extends LogReader { const logConsumer = new LogConsumer(conf, logger); super({ zkClient, kafkaConfig, zkConfig, logConsumer, logId: `kafka_${qpKafkaConfig.logName}`, logger, extensions, - metricsProducer, metricsHandler }); + metricsHandler }); this._kafkaConfig = conf; } diff --git a/lib/queuePopulator/LogReader.js b/lib/queuePopulator/LogReader.js index b941582f8..4ab422929 100644 --- a/lib/queuePopulator/LogReader.js +++ b/lib/queuePopulator/LogReader.js @@ -4,11 +4,6 @@ const jsutil = require('arsenal').jsutil; const config = require('../Config'); const BackbeatProducer = require('../BackbeatProducer'); -const ReplicationQueuePopulator = - require('../../extensions/replication/ReplicationQueuePopulator'); - -const { metricsExtension, metricsTypeQueued } = - require('../../extensions/replication/constants'); const { transformKey } = require('../util/entry'); class LogReader { @@ -28,8 +23,6 @@ class LogReader { * @param {Logger} params.logger - logger object * @param {QueuePopulatorExtension[]} params.extensions - array of * queue populator extension modules - * @param {MetricsProducer} params.metricsProducer - instance of metrics - * producer * @param {MetricsHandler} params.metricsHandler - instance of metrics * handler * @param {ZkMetricsHandler} params.zkMetricsHandler - instance of zookeeper @@ -45,7 +38,6 @@ class LogReader { this.log = params.logger; this._producers = {}; this._extensions = params.extensions; - this._mProducer = params.metricsProducer; // internal variable to carry over a tailable cursor across batches this._openLog = null; @@ -675,17 +667,6 @@ class LogReader { if (err) { return done(err); } - // Find the CRR Class extension - const crrExtension = this._extensions.find(ext => ( - ext instanceof ReplicationQueuePopulator - )); - if (crrExtension) { - const extMetrics = crrExtension.getAndResetMetrics(); - if (Object.keys(extMetrics).length > 0) { - this._mProducer.publishMetrics(extMetrics, - metricsTypeQueued, metricsExtension, () => { }); - } - } return done(); }); } diff --git a/lib/queuePopulator/MongoLogReader.js b/lib/queuePopulator/MongoLogReader.js index ac9a93568..95081ed25 100644 --- a/lib/queuePopulator/MongoLogReader.js +++ b/lib/queuePopulator/MongoLogReader.js @@ -5,14 +5,14 @@ const LogReader = require('./LogReader'); class MongoLogReader extends LogReader { constructor(params) { const { zkClient, kafkaConfig, zkConfig, mongoConfig, - logger, extensions, metricsProducer, metricsHandler } = params; + logger, extensions, metricsHandler } = params; logger.info('initializing mongo log reader', { method: 'MongoLogReader.constructor', mongoConfig }); const logConsumer = new LogConsumer(mongoConfig, logger); super({ zkClient, kafkaConfig, zkConfig, logConsumer, logId: `mongo_${mongoConfig.logName}`, logger, extensions, - metricsProducer, metricsHandler }); + metricsHandler }); this._mongoConfig = mongoConfig; } diff --git a/lib/queuePopulator/QueuePopulator.js b/lib/queuePopulator/QueuePopulator.js index 7fdc07350..ac962864b 100644 --- a/lib/queuePopulator/QueuePopulator.js +++ b/lib/queuePopulator/QueuePopulator.js @@ -6,13 +6,10 @@ const { State: ZKState } = require('node-zookeeper-client'); const ProvisionDispatcher = require('../provisioning/ProvisionDispatcher'); const RaftLogReader = require('./RaftLogReader'); const BucketFileLogReader = require('./BucketFileLogReader'); -const MetricsProducer = require('../MetricsProducer'); -const MetricsConsumer = require('../MetricsConsumer'); const FailedCRRConsumer = require('../../extensions/replication/failedCRR/FailedCRRConsumer'); const MongoLogReader = require('./MongoLogReader'); const KafkaLogReader = require('./KafkaLogReader'); -const { metricsExtension } = require('../../extensions/replication/constants'); const NotificationConfigManager = require('../../extensions/notification/NotificationConfigManager'); const { ZenkoMetrics } = require('arsenal').metrics; @@ -171,9 +168,6 @@ class QueuePopulator { // list of updated log readers, if any this.logReadersUpdate = null; - // metrics clients - this._mProducer = null; - this._mConsumer = null; // bucket notification configuration manager this.bnConfigManager = null; this._loadedExtensions = []; @@ -189,15 +183,6 @@ class QueuePopulator { */ open(cb) { async.series([ - next => this._setupMetricsClients(err => { - if (err) { - this.log.error('error setting up metrics client', { - method: 'QueuePopulator.open', - error: err, - }); - } - return next(err); - }), next => this._setupFailedCRRClients(next), next => this._setupNotificationConfigManager(next), next => this._setupZookeeper(err => { @@ -235,17 +220,6 @@ class QueuePopulator { }); } - _setupMetricsClients(cb) { - // Metrics Consumer - this._mConsumer = new MetricsConsumer(this.rConfig, - this.mConfig, this.kafkaConfig, metricsExtension); - this._mConsumer.start(); - - // Metrics Producer - this._mProducer = new MetricsProducer(this.kafkaConfig, this.mConfig); - this._mProducer.setupProducer(cb); - } - /** * Set up and start the consumer for retrying failed CRR operations. * @param {Function} cb - The callback function @@ -273,30 +247,6 @@ class QueuePopulator { this._circuitBreaker.stop(); return next(); }, - next => { - if (this._mProducer) { - this.log.debug('closing metrics producer', { - method: 'QueuePopulator.close', - }); - return this._mProducer.close(next); - } - this.log.debug('no metrics producer to close', { - method: 'QueuePopulator.close', - }); - return next(); - }, - next => { - if (this._mConsumer) { - this.log.debug('closing metrics consumer', { - method: 'QueuePopulator.close', - }); - return this._mConsumer.close(next); - } - this.log.debug('no metrics consumer to close', { - method: 'QueuePopulator.close', - }); - return next(); - } ], cb); } @@ -317,7 +267,6 @@ class QueuePopulator { mongoConfig: this.qpConfig.mongo, logger: this.log, extensions: this._extensions, - metricsProducer: this._mProducer, metricsHandler, }), ]; @@ -334,7 +283,6 @@ class QueuePopulator { ), logger: this.log, extensions: this._extensions, - metricsProducer: this._mProducer, metricsHandler, }), ]; @@ -347,7 +295,6 @@ class QueuePopulator { dmdConfig: this.qpConfig.dmd, logger: this.log, extensions: this._extensions, - metricsProducer: this._mProducer, metricsHandler, }), ]; @@ -385,7 +332,6 @@ class QueuePopulator { raftId: token, logger: this.log, extensions: this._extensions, - metricsProducer: this._mProducer, metricsHandler, })); return undefined; diff --git a/lib/queuePopulator/RaftLogReader.js b/lib/queuePopulator/RaftLogReader.js index 24061e037..b96ec0105 100644 --- a/lib/queuePopulator/RaftLogReader.js +++ b/lib/queuePopulator/RaftLogReader.js @@ -7,7 +7,7 @@ const LogReader = require('./LogReader'); class RaftLogReader extends LogReader { constructor(params) { const { zkClient, kafkaConfig, bucketdConfig, httpsConfig, - raftId, logger, extensions, metricsProducer, metricsHandler } = params; + raftId, logger, extensions, metricsHandler } = params; const { host, port } = bucketdConfig; logger.info('initializing raft log reader', { method: 'RaftLogReader.constructor', @@ -25,7 +25,7 @@ class RaftLogReader extends LogReader { raftSession: raftId, logger }); super({ zkClient, kafkaConfig, logConsumer, logId: `raft_${raftId}`, - logger, extensions, metricsProducer, metricsHandler }); + logger, extensions, metricsHandler }); this.raftId = raftId; } diff --git a/package.json b/package.json index f00756bba..23a7960fe 100644 --- a/package.json +++ b/package.json @@ -17,7 +17,7 @@ "oplog_populator": "node extensions/oplogPopulator/OplogPopulatorTask.js", "mongo_queue_processor": "node extensions/mongoProcessor/mongoProcessorTask.js", "garbage_collector": "node extensions/gc/service.js", - "test": "mocha --recursive tests/unit --timeout 30000", + "test": "mocha --recursive tests/unit --timeout 60000", "cover:test": "nyc --clean --silent yarn run test && nyc report --report-dir ./coverage/test --reporter=lcov", "ft_test": "mocha --recursive $(find tests/functional -name '*.js') --timeout 30000", "ft_test:notification": "mocha --recursive $(find tests/functional/notification -name '*.js') --timeout 30000", diff --git a/tests/functional/ingestion/IngestionReader.js b/tests/functional/ingestion/IngestionReader.js index 892b1b0e6..67624d610 100644 --- a/tests/functional/ingestion/IngestionReader.js +++ b/tests/functional/ingestion/IngestionReader.js @@ -238,7 +238,6 @@ describe('ingestion reader tests with mock', function fD() { qpConfig: testConfig.queuePopulator, logger: dummyLogger, extensions: [ingestionQP], - metricsProducer: { publishMetrics: () => { } }, s3Config: testConfig.s3, producer, }); @@ -404,7 +403,6 @@ describe('ingestion reader tests with mock', function fD() { qpConfig: testConfig.queuePopulator, logger: dummyLogger, extensions: [ingestionQP], - metricsProducer: { publishMetrics: () => { } }, s3Config: testConfig.s3, producer, }); diff --git a/tests/functional/replication/queueProcessor.js b/tests/functional/replication/queueProcessor.js index 8156d874e..1f24bf242 100644 --- a/tests/functional/replication/queueProcessor.js +++ b/tests/functional/replication/queueProcessor.js @@ -962,7 +962,7 @@ describe('queue processor functional tests with mocking', () => { }), ], done); }); - + // eslint-disable-next-line mocha/no-exclusive-tests it('should retry with full replication if metadata-only returns ' + 'ObjNotFound', done => { s3mock.setParam('nbParts', 2); @@ -977,10 +977,14 @@ describe('queue processor functional tests with mocking', () => { }, done => queueProcessorSF.processReplicationEntry( s3mock.getParam('kafkaEntry'), err => { + /* eslint-disable no-console */ + console.log('err', err); assert.ifError(err); + console.log(s3mock.hasPutTargetData, s3mock.hasPutTargetMd); assert.strictEqual(s3mock.hasPutTargetData, true); assert(s3mock.hasPutTargetMd); done(); + /* eslint-disable no-console */ }), ], done); });