Commit 26c9146

try

benzekrimaha committed Nov 6, 2024
1 parent 6694ca8
Showing 13 changed files with 17 additions and 223 deletions.
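In short: every file below drops the same legacy wiring, namely the Kafka-backed MetricsProducer/MetricsConsumer pipeline and its publishMetrics call sites, across the mongo processor, the replication task, and the queue-populator log readers. The newer metricsHandler-based reporting kept in the constructors is untouched.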
64 changes: 1 addition & 63 deletions extensions/mongoProcessor/MongoQueueProcessor.js
@@ -15,16 +15,11 @@ const BackbeatConsumer = require('../../lib/BackbeatConsumer');
const QueueEntry = require('../../lib/models/QueueEntry');
const DeleteOpQueueEntry = require('../../lib/models/DeleteOpQueueEntry');
const ObjectQueueEntry = require('../../lib/models/ObjectQueueEntry');
const MetricsProducer = require('../../lib/MetricsProducer');
const { metricsExtension, metricsTypeCompleted, metricsTypePendingOnly } =
require('../ingestion/constants');

const getContentType = require('./utils/contentTypeHelper');
const BucketMemState = require('./utils/BucketMemState');
const MongoProcessorMetrics = require('./MongoProcessorMetrics');

// batch metrics by location and send to kafka metrics topic every 5 seconds
const METRIC_REPORT_INTERVAL_MS = process.env.CI === 'true' ? 1000 : 5000;

// TODO - ADD PREFIX BASED ON SOURCE
// april 6, 2018

@@ -79,16 +74,6 @@ class MongoQueueProcessor {
// in-mem batch of metrics, we only track total entry count by location
// this._accruedMetrics = { zenko-location: 10 }
this._accruedMetrics = {};

setInterval(() => {
this._sendMetrics();
}, METRIC_REPORT_INTERVAL_MS);
}

_setupMetricsClients(cb) {
// Metrics Producer
this._mProducer = new MetricsProducer(this.kafkaConfig, this._mConfig);
this._mProducer.setupProducer(cb);
}

/**
@@ -177,18 +162,6 @@ class MongoQueueProcessor {
});
return next();
},
next => {
if (this._mProducer) {
this.logger.debug('closing metrics producer', {
method: 'MongoQueueProcessor.stop',
});
return this._mProducer.close(next);
}
this.logger.debug('no metrics producer to close', {
method: 'MongoQueueProcessor.stop',
});
return next();
},
], done);
}

@@ -408,7 +381,6 @@
return this._mongoClient.deleteObject(bucket, key, options, log,
err => {
if (err) {
this._normalizePendingMetric(location);
log.end().error('error deleting object metadata ' +
'from mongo', {
bucket,
@@ -443,7 +415,6 @@
this._getZenkoObjectMetadata(log, sourceEntry, bucketInfo,
(err, zenkoObjMd) => {
if (err) {
this._normalizePendingMetric(location);
log.end().error('error processing object queue entry', {
method: 'MongoQueueProcessor._processObjectQueueEntry',
entry: sourceEntry.getLogInfo(),
@@ -454,7 +425,6 @@

const content = getContentType(sourceEntry, zenkoObjMd);
if (content.length === 0) {
this._normalizePendingMetric(location);
log.end().debug('skipping duplicate entry', {
method: 'MongoQueueProcessor._processObjectQueueEntry',
entry: sourceEntry.getLogInfo(),
@@ -498,7 +468,6 @@
return this._mongoClient.putObject(bucket, key, objVal, params,
this.logger, err => {
if (err) {
this._normalizePendingMetric(location);
log.end().error('error putting object metadata ' +
'to mongo', {
bucket,
@@ -519,23 +488,6 @@
});
}

/**
* Send accrued metrics by location to kafka
* @return {undefined}
*/
_sendMetrics() {
Object.keys(this._accruedMetrics).forEach(loc => {
const count = this._accruedMetrics[loc];

// only report metrics if something has been recorded for location
if (count > 0) {
this._accruedMetrics[loc] = 0;
const metric = { [loc]: { ops: count } };
this._mProducer.publishMetrics(metric, metricsTypeCompleted,
metricsExtension, () => {});
}
});
}

/**
* Accrue metrics in-mem every METRIC_REPORT_INTERVAL_MS
@@ -550,19 +502,6 @@
}
}

/**
* For cases where we experience an error or skip an entry, we need to
* normalize pending metric. This means we will see pending metrics stuck
* above 0 and will need to bring those metrics down
* @param {string} location - location constraint name
* @return {undefined}
*/
_normalizePendingMetric(location) {
const metric = { [location]: { ops: 1 } };
this._mProducer.publishMetrics(metric, metricsTypePendingOnly,
metricsExtension, () => {});
}

/**
* Get bucket info in memoize state if exists, otherwise fetch from Mongo
* @param {ObjectQueueEntry} sourceEntry - object metadata entry
@@ -639,7 +578,6 @@
entryType: sourceEntry.constructor.name,
method: 'MongoQueueProcessor.processKafkaEntry',
});
this._normalizePendingMetric(location);
return process.nextTick(done);
});
}
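For context on what MongoQueueProcessor loses here: the deleted lines implemented a timer-driven accrue-and-flush loop for per-location counters, plus a "normalize pending" counterweight published on error and skip paths. Below is a minimal sketch of that pattern reconstructed from the deleted lines. The requires and the publishMetrics(metric, type, extension, cb) signature are taken from the diff; the class shape is an illustrative assumption, not the repository's actual code.

```js
const MetricsProducer = require('../../lib/MetricsProducer');
const { metricsExtension, metricsTypeCompleted, metricsTypePendingOnly } =
    require('../ingestion/constants');

// Sketch: batch per-location op counts in memory and flush them to the
// Kafka metrics topic on a fixed interval (5 seconds in the deleted code).
class LocationMetrics {
    constructor(kafkaConfig, mConfig, intervalMs, cb) {
        this._mProducer = new MetricsProducer(kafkaConfig, mConfig);
        this._mProducer.setupProducer(cb);
        this._accrued = {}; // e.g. { 'zenko-location': 10 }
        setInterval(() => this._flush(), intervalMs);
    }

    // Count one completed entry for a location (in-memory only).
    accrue(location) {
        this._accrued[location] = (this._accrued[location] || 0) + 1;
    }

    // Publish and reset every non-zero per-location counter.
    _flush() {
        Object.keys(this._accrued).forEach(loc => {
            const count = this._accrued[loc];
            if (count > 0) {
                this._accrued[loc] = 0;
                this._mProducer.publishMetrics({ [loc]: { ops: count } },
                    metricsTypeCompleted, metricsExtension, () => {});
            }
        });
    }

    // On error/skip paths, publish a pending-only metric so pending
    // gauges incremented at queue time do not stay stuck above zero.
    normalizePending(location) {
        this._mProducer.publishMetrics({ [location]: { ops: 1 } },
            metricsTypePendingOnly, metricsExtension, () => {});
    }
}
```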
10 changes: 0 additions & 10 deletions extensions/replication/tasks/MultipleBackendTask.js
@@ -8,8 +8,6 @@ const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry');

const ReplicateObject = require('./ReplicateObject');
const { attachReqUids } = require('../../../lib/clients/utils');
const getExtMetrics = require('../utils/getExtMetrics');
const { metricsExtension, metricsTypeQueued } = require('../constants');

const MPU_GCP_MAX_PARTS = 1024;

@@ -702,21 +700,13 @@
if (err) {
return doneOnce(err);
}
const extMetrics = getExtMetrics(this.site,
sourceEntry.getContentLength(), sourceEntry);
this.mProducer.publishMetrics(extMetrics, metricsTypeQueued,
metricsExtension, () => {});
return this._completeRangedMPU(sourceEntry,
uploadId, log, doneOnce);
});
}

_getAndPutObject(sourceEntry, log, cb) {
const partLogger = this.logger.newRequestLogger(log.getUids());
const extMetrics = getExtMetrics(this.site,
sourceEntry.getContentLength(), sourceEntry);
this.mProducer.publishMetrics(extMetrics, metricsTypeQueued,
metricsExtension, () => {});

if (BACKBEAT_INJECT_REPLICATION_ERROR_COPYOBJ) {
if (Math.random() < BACKBEAT_INJECT_REPLICATION_ERROR_RATE) {
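The two deleted call sites in MultipleBackendTask shared one step: report an entry as "queued" for the target site just before replicating it, once for ranged MPUs and once for plain object copies. A sketch of that step using the same requires as the deleted lines (getExtMetrics's return shape is not shown in the diff, so treat the metric contents as assumed):

```js
const getExtMetrics = require('../utils/getExtMetrics');
const { metricsExtension, metricsTypeQueued } = require('../constants');

// Removed step: publish queue-time ops/bytes for the entry's site
// before starting the copy. Fire-and-forget, as in the deleted code.
function reportQueued(task, sourceEntry) {
    const extMetrics = getExtMetrics(task.site,
        sourceEntry.getContentLength(), sourceEntry);
    task.mProducer.publishMetrics(extMetrics, metricsTypeQueued,
        metricsExtension, () => {});
}
```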
4 changes: 2 additions & 2 deletions lib/queuePopulator/BucketFileLogReader.js
@@ -6,10 +6,10 @@ const LogReader = require('./LogReader');
class BucketFileLogReader extends LogReader {
constructor(params) {
const { zkClient, kafkaConfig, dmdConfig, logger,
extensions, metricsProducer, metricsHandler } = params;
extensions, metricsHandler } = params;
super({ zkClient, kafkaConfig, logConsumer: null,
logId: `bucketFile_${dmdConfig.logName}`, logger, extensions,
metricsProducer, metricsHandler });
metricsHandler });

this._dmdConfig = dmdConfig;
this._log = logger;
44 changes: 0 additions & 44 deletions lib/queuePopulator/IngestionPopulator.js
@@ -9,9 +9,6 @@ const Logger = require('werelogs').Logger;
const config = require('../Config');
const IngestionReader = require('./IngestionReader');
const BackbeatProducer = require('../BackbeatProducer');
const MetricsConsumer = require('../MetricsConsumer');
const MetricsProducer = require('../MetricsProducer');
const { metricsExtension } = require('../../extensions/ingestion/constants');
const IngestionPopulatorMetrics = require('./IngestionPopulatorMetrics');
const {
startCircuitBreakerMetricsExport,
@@ -79,11 +76,6 @@ class IngestionPopulator {
// shared producer across readers
this._producer = null;

// metrics clients
this._mProducer = null;
this._mConsumer = null;
this._redis = null;

// all ingestion readers (including paused ones)
// i.e.: { zenko-bucket-name: IngestionReader() }
this._ingestionSources = {};
@@ -152,17 +144,6 @@
});
}

_setupMetricsClients(cb) {
// Metrics Consumer
this._mConsumer = new MetricsConsumer(this.rConfig, this.mConfig,
this.kafkaConfig, metricsExtension);
this._mConsumer.start();

// Metrics Producer
this._mProducer = new MetricsProducer(this.kafkaConfig, this.mConfig);
this._mProducer.setupProducer(cb);
}

_setupProducer(cb) {
if (this._producer) {
return process.nextTick(cb);
@@ -580,7 +561,6 @@
logger: this.log,
extensions: [this._extension],
producer: this._producer,
metricsProducer: this._mProducer,
qpConfig: this.qpConfig,
s3Config: this.s3Config,
});
@@ -662,30 +642,6 @@
*/
close(done) {
async.series([
next => {
if (this._mProducer) {
this.log.debug('closing metrics producer', {
method: 'IngestionPopulator.close',
});
return this._mProducer.close(next);
}
this.log.debug('no metrics producer to close', {
method: 'IngestionPopulator.close',
});
return next();
},
next => {
if (this._mConsumer) {
this.log.debug('closing metrics consumer', {
method: 'IngestionPopulator.close',
});
return this._mConsumer.close(next);
}
this.log.debug('no metrics consumer to close', {
method: 'IngestionPopulator.close',
});
return next();
},
next => {
if (this._producer) {
this.log.debug('closing producer', {
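IngestionPopulator loses both ends of the metrics-client lifecycle: _setupMetricsClients and the two guarded teardown steps in close(). A condensed sketch of that lifecycle, using the same requires and call signatures as the deleted lines (the standalone helper functions themselves are illustrative):

```js
const async = require('async');
const MetricsConsumer = require('../MetricsConsumer');
const MetricsProducer = require('../MetricsProducer');
const { metricsExtension } = require('../../extensions/ingestion/constants');

// Start a consumer (reads the metrics topic) and a producer (writes to
// it); cb fires once the producer is ready.
function setupMetricsClients(state, rConfig, mConfig, kafkaConfig, cb) {
    state.mConsumer = new MetricsConsumer(rConfig, mConfig, kafkaConfig,
        metricsExtension);
    state.mConsumer.start();
    state.mProducer = new MetricsProducer(kafkaConfig, mConfig);
    state.mProducer.setupProducer(cb);
}

// Close each client only if it was created, so teardown stays safe
// when start-up failed part-way through.
function closeMetricsClients(state, done) {
    async.series([
        next => (state.mProducer ? state.mProducer.close(next) : next()),
        next => (state.mConsumer ? state.mConsumer.close(next) : next()),
    ], done);
}
```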
21 changes: 2 additions & 19 deletions lib/queuePopulator/IngestionReader.js
@@ -6,10 +6,6 @@ const VID_SEP = require('arsenal').versioning.VersioningConstants
const IngestionProducer = require('./IngestionProducer');
const IngestionPopulatorMetrics = require('./IngestionPopulatorMetrics');
const LogReader = require('./LogReader');
const {
metricsExtension,
metricsTypeQueued
} = require('../../extensions/ingestion/constants');
const { transformKey } = require('../util/entry');

function _isVersionedLogKey(key) {
@@ -19,9 +15,9 @@
class IngestionReader extends LogReader {
constructor(params) {
const { zkClient, ingestionConfig, kafkaConfig, bucketdConfig, qpConfig,
logger, extensions, producer, metricsProducer, s3Config } = params;
logger, extensions, producer, s3Config } = params;
super({ zkClient, kafkaConfig, logConsumer: {}, logId: '', logger,
extensions, metricsProducer, zkMetricsHandler: IngestionPopulatorMetrics });
extensions, zkMetricsHandler: IngestionPopulatorMetrics });
this._ingestionConfig = ingestionConfig;
this.qpConfig = qpConfig;
this.s3Config = s3Config;
@@ -423,7 +419,6 @@ class IngestionReader extends LogReader {
if (err) {
return done(err);
}
this._publishMetrics();
return done();
});
}
@@ -446,18 +441,6 @@
], done);
}

_publishMetrics() {
// Ingestion extensions is a single IngestionQueuePopulatorExt
const extension = this._extensions[0];
const location = this.getLocationConstraint();
const metric = extension.getAndResetMetrics(this._targetZenkoBucket);
if (metric && metric.ops > 0) {
const value = { [location]: metric };
this._mProducer.publishMetrics(value, metricsTypeQueued,
metricsExtension, () => {});
}
}


/**
* Bucket configs have user editable fields: credentials, endpoint
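IngestionReader's removed _publishMetrics was a per-batch flush: after a batch's entries are processed, drain the single ingestion extension's counters for the target bucket and publish them as "queued" ops under the reader's location. A sketch with the method names from the deleted lines (the { ops, ... } metric shape is assumed from the metric.ops > 0 check):

```js
const { metricsExtension, metricsTypeQueued } =
    require('../../extensions/ingestion/constants');

// Removed per-batch flush: read-and-reset the extension's counters and
// publish them keyed by location. No-op when nothing was queued.
function publishBatchMetrics(reader) {
    // Ingestion extensions hold a single IngestionQueuePopulatorExt.
    const extension = reader._extensions[0];
    const location = reader.getLocationConstraint();
    const metric = extension.getAndResetMetrics(reader._targetZenkoBucket);
    if (metric && metric.ops > 0) {
        reader._mProducer.publishMetrics({ [location]: metric },
            metricsTypeQueued, metricsExtension, () => {});
    }
}
```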
6 changes: 2 additions & 4 deletions lib/queuePopulator/KafkaLogReader.js
@@ -12,14 +12,12 @@ class KafkaLogReader extends LogReader {
* @param {Object} params.qpKafkaConfig - queue populator kafka configuration
* @param {QueuePopulatorExtension[]} params.extensions - array of
* queue populator extension modules
* @param {MetricsProducer} params.metricsProducer - instance of metrics
* producer
* @param {MetricsHandler} params.metricsHandler - instance of metrics
* handler
*/
constructor(params) {
const { zkClient, kafkaConfig, zkConfig, qpKafkaConfig,
logger, extensions, metricsProducer, metricsHandler } = params;
logger, extensions, metricsHandler } = params;
// conf contains global kafka and queuePoplator kafka configs
const conf = {
hosts: kafkaConfig.hosts,
@@ -31,7 +29,7 @@
const logConsumer = new LogConsumer(conf, logger);
super({ zkClient, kafkaConfig, zkConfig, logConsumer,
logId: `kafka_${qpKafkaConfig.logName}`, logger, extensions,
metricsProducer, metricsHandler });
metricsHandler });
this._kafkaConfig = conf;
}

19 changes: 0 additions & 19 deletions lib/queuePopulator/LogReader.js
@@ -4,11 +4,6 @@ const jsutil = require('arsenal').jsutil;

const config = require('../Config');
const BackbeatProducer = require('../BackbeatProducer');
const ReplicationQueuePopulator =
require('../../extensions/replication/ReplicationQueuePopulator');

const { metricsExtension, metricsTypeQueued } =
require('../../extensions/replication/constants');
const { transformKey } = require('../util/entry');

class LogReader {
@@ -28,8 +23,6 @@
* @param {Logger} params.logger - logger object
* @param {QueuePopulatorExtension[]} params.extensions - array of
* queue populator extension modules
* @param {MetricsProducer} params.metricsProducer - instance of metrics
* producer
* @param {MetricsHandler} params.metricsHandler - instance of metrics
* handler
* @param {ZkMetricsHandler} params.zkMetricsHandler - instance of zookeeper
@@ -45,7 +38,6 @@
this.log = params.logger;
this._producers = {};
this._extensions = params.extensions;
this._mProducer = params.metricsProducer;

// internal variable to carry over a tailable cursor across batches
this._openLog = null;
@@ -675,17 +667,6 @@
if (err) {
return done(err);
}
// Find the CRR Class extension
const crrExtension = this._extensions.find(ext => (
ext instanceof ReplicationQueuePopulator
));
if (crrExtension) {
const extMetrics = crrExtension.getAndResetMetrics();
if (Object.keys(extMetrics).length > 0) {
this._mProducer.publishMetrics(extMetrics,
metricsTypeQueued, metricsExtension, () => { });
}
}
return done();
});
}
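LogReader's deleted block did the equivalent for replication: once a batch completes, locate the CRR extension among the populator's extensions and flush whatever per-site metrics it accumulated. A sketch of that lookup-and-flush; the instanceof test, requires, and method names all come from the deleted lines, while the metric map's contents are assumed:

```js
const ReplicationQueuePopulator =
    require('../../extensions/replication/ReplicationQueuePopulator');
const { metricsExtension, metricsTypeQueued } =
    require('../../extensions/replication/constants');

// Removed post-batch flush: only the CRR extension accrues metrics
// this way, so find it, drain it, and publish if anything accrued.
function flushCrrMetrics(extensions, mProducer) {
    const crrExtension = extensions.find(ext =>
        ext instanceof ReplicationQueuePopulator);
    if (crrExtension) {
        const extMetrics = crrExtension.getAndResetMetrics();
        if (Object.keys(extMetrics).length > 0) {
            mProducer.publishMetrics(extMetrics, metricsTypeQueued,
                metricsExtension, () => {});
        }
    }
}
```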
4 changes: 2 additions & 2 deletions lib/queuePopulator/MongoLogReader.js
@@ -5,14 +5,14 @@ const LogReader = require('./LogReader');
class MongoLogReader extends LogReader {
constructor(params) {
const { zkClient, kafkaConfig, zkConfig, mongoConfig,
logger, extensions, metricsProducer, metricsHandler } = params;
logger, extensions, metricsHandler } = params;
logger.info('initializing mongo log reader',
{ method: 'MongoLogReader.constructor',
mongoConfig });
const logConsumer = new LogConsumer(mongoConfig, logger);
super({ zkClient, kafkaConfig, zkConfig, logConsumer,
logId: `mongo_${mongoConfig.logName}`, logger, extensions,
metricsProducer, metricsHandler });
metricsHandler });
this._mongoConfig = mongoConfig;
}

(8 of the 13 changed files are shown above; the remaining 5 were not loaded.)
