Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
benzekrimaha committed Nov 7, 2024
1 parent 6694ca8 commit 43aac71
Show file tree
Hide file tree
Showing 21 changed files with 41 additions and 760 deletions.
136 changes: 18 additions & 118 deletions extensions/mongoProcessor/MongoQueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,11 @@ const BackbeatConsumer = require('../../lib/BackbeatConsumer');
const QueueEntry = require('../../lib/models/QueueEntry');
const DeleteOpQueueEntry = require('../../lib/models/DeleteOpQueueEntry');
const ObjectQueueEntry = require('../../lib/models/ObjectQueueEntry');
const MetricsProducer = require('../../lib/MetricsProducer');
const { metricsExtension, metricsTypeCompleted, metricsTypePendingOnly } =
require('../ingestion/constants');

const getContentType = require('./utils/contentTypeHelper');
const BucketMemState = require('./utils/BucketMemState');
const MongoProcessorMetrics = require('./MongoProcessorMetrics');

// batch metrics by location and send to kafka metrics topic every 5 seconds
const METRIC_REPORT_INTERVAL_MS = process.env.CI === 'true' ? 1000 : 5000;

// TODO - ADD PREFIX BASED ON SOURCE
// april 6, 2018

Expand Down Expand Up @@ -76,19 +71,6 @@ class MongoQueueProcessor {
this._mongoClient = new MongoClient(this.mongoClientConfig);
this._bucketMemState = new BucketMemState(Config);

// in-mem batch of metrics, we only track total entry count by location
// this._accruedMetrics = { zenko-location: 10 }
this._accruedMetrics = {};

setInterval(() => {
this._sendMetrics();
}, METRIC_REPORT_INTERVAL_MS);
}

_setupMetricsClients(cb) {
// Metrics Producer
this._mProducer = new MetricsProducer(this.kafkaConfig, this._mConfig);
this._mProducer.setupProducer(cb);
}

/**
Expand All @@ -97,32 +79,15 @@ class MongoQueueProcessor {
* @return {undefined}
*/
start() {
this.logger.info('starting mongo queue processor');
async.series([
next => this._setupMetricsClients(err => {
if (err) {
this.logger.error('error setting up metrics client', {
method: 'MongoQueueProcessor.start',
error: err,
});
}
return next(err);
}),
next => this._mongoClient.setup(err => {
if (err) {
this.logger.error('could not connect to MongoDB', {
method: 'MongoQueueProcessor.start',
error: err.message,
});
}
return next(err);
}),
], error => {
if (error) {
this._mongoClient.setup(err => {
if (err) {
this.logger.error('could not connect to MongoDB', {
method: 'MongoQueueProcessor.start',
error: err.message,
});
this.logger.fatal('error starting mongo queue processor');
process.exit(1);
}

this._bootstrapList = Config.getBootstrapList();
Config.on('bootstrap-list-update', () => {
this._bootstrapList = Config.getBootstrapList();
Expand Down Expand Up @@ -164,32 +129,17 @@ class MongoQueueProcessor {
* @return {undefined}
*/
stop(done) {
async.parallel([
next => {
if (this._consumer) {
this.logger.debug('closing kafka consumer', {
method: 'MongoQueueProcessor.stop',
});
return this._consumer.close(next);
}
this.logger.debug('no kafka consumer to close', {
method: 'MongoQueueProcessor.stop',
});
return next();
},
next => {
if (this._mProducer) {
this.logger.debug('closing metrics producer', {
method: 'MongoQueueProcessor.stop',
});
return this._mProducer.close(next);
}
this.logger.debug('no metrics producer to close', {
method: 'MongoQueueProcessor.stop',
});
return next();
},
], done);
if (this._consumer) {
this.logger.debug('closing kafka consumer', {
method: 'MongoQueueProcessor.stop',
});
this._consumer.close(done);
} else {
this.logger.debug('no kafka consumer to close', {
method: 'MongoQueueProcessor.stop',
});
done();
}
}

_getZenkoObjectMetadata(log, entry, bucketInfo, done) {
Expand Down Expand Up @@ -408,7 +358,6 @@ class MongoQueueProcessor {
return this._mongoClient.deleteObject(bucket, key, options, log,
err => {
if (err) {
this._normalizePendingMetric(location);
log.end().error('error deleting object metadata ' +
'from mongo', {
bucket,
Expand All @@ -418,7 +367,6 @@ class MongoQueueProcessor {
});
return done(err);
}
this._produceMetricCompletionEntry(location);
log.end().info('object metadata deleted from mongo', {
entry: sourceEntry.getLogInfo(),
location,
Expand All @@ -443,7 +391,6 @@ class MongoQueueProcessor {
this._getZenkoObjectMetadata(log, sourceEntry, bucketInfo,
(err, zenkoObjMd) => {
if (err) {
this._normalizePendingMetric(location);
log.end().error('error processing object queue entry', {
method: 'MongoQueueProcessor._processObjectQueueEntry',
entry: sourceEntry.getLogInfo(),
Expand All @@ -454,7 +401,6 @@ class MongoQueueProcessor {

const content = getContentType(sourceEntry, zenkoObjMd);
if (content.length === 0) {
this._normalizePendingMetric(location);
log.end().debug('skipping duplicate entry', {
method: 'MongoQueueProcessor._processObjectQueueEntry',
entry: sourceEntry.getLogInfo(),
Expand Down Expand Up @@ -498,7 +444,6 @@ class MongoQueueProcessor {
return this._mongoClient.putObject(bucket, key, objVal, params,
this.logger, err => {
if (err) {
this._normalizePendingMetric(location);
log.end().error('error putting object metadata ' +
'to mongo', {
bucket,
Expand All @@ -509,7 +454,6 @@ class MongoQueueProcessor {
});
return done(err);
}
this._produceMetricCompletionEntry(location);
log.end().info('object metadata put to mongo', {
entry: sourceEntry.getLogInfo(),
location,
Expand All @@ -519,49 +463,6 @@ class MongoQueueProcessor {
});
}

/**
* Send accrued metrics by location to kafka
* @return {undefined}
*/
_sendMetrics() {
Object.keys(this._accruedMetrics).forEach(loc => {
const count = this._accruedMetrics[loc];

// only report metrics if something has been recorded for location
if (count > 0) {
this._accruedMetrics[loc] = 0;
const metric = { [loc]: { ops: count } };
this._mProducer.publishMetrics(metric, metricsTypeCompleted,
metricsExtension, () => {});
}
});
}

/**
* Accrue metrics in-mem every METRIC_REPORT_INTERVAL_MS
* @param {string} location - zenko storage location name
* @return {undefined}
*/
_produceMetricCompletionEntry(location) {
if (this._accruedMetrics[location]) {
this._accruedMetrics[location] += 1;
} else {
this._accruedMetrics[location] = 1;
}
}

/**
* For cases where we experience an error or skip an entry, we need to
* normalize pending metric. This means we will see pending metrics stuck
* above 0 and will need to bring those metrics down
* @param {string} location - location constraint name
* @return {undefined}
*/
_normalizePendingMetric(location) {
const metric = { [location]: { ops: 1 } };
this._mProducer.publishMetrics(metric, metricsTypePendingOnly,
metricsExtension, () => {});
}

/**
* Get bucket info in memoize state if exists, otherwise fetch from Mongo
Expand Down Expand Up @@ -639,7 +540,6 @@ class MongoQueueProcessor {
entryType: sourceEntry.constructor.name,
method: 'MongoQueueProcessor.processKafkaEntry',
});
this._normalizePendingMetric(location);
return process.nextTick(done);
});
}
Expand Down
26 changes: 0 additions & 26 deletions extensions/replication/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ const EchoBucket = require('../tasks/EchoBucket');
const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry');
const BucketQueueEntry = require('../../../lib/models/BucketQueueEntry');
const ActionQueueEntry = require('../../../lib/models/ActionQueueEntry');
const MetricsProducer = require('../../../lib/MetricsProducer');
const libConstants = require('../../../lib/constants');
const { wrapCounterInc, wrapHistogramObserve } = require('../../../lib/util/metrics');
const { http: HttpAgent, https: HttpsAgent } = require('httpagent');
Expand Down Expand Up @@ -222,7 +221,6 @@ class QueueProcessor extends EventEmitter {
this.replicationStatusProducer = null;
this._consumer = null;
this._dataMoverConsumer = null;
this._mProducer = null;
this.site = site;
this.mConfig = mConfig;
this.serviceName = this.isReplayTopic ?
Expand Down Expand Up @@ -695,7 +693,6 @@ class QueueProcessor extends EventEmitter {
vaultclientCache: this.vaultclientCache,
accountCredsCache: this.accountCredsCache,
replicationStatusProducer: this.replicationStatusProducer,
mProducer: this._mProducer,
logger: this.logger,
site: this.site,
consumer: this._consumer,
Expand All @@ -722,16 +719,7 @@ class QueueProcessor extends EventEmitter {
* @return {undefined}
*/
start(options) {
this._mProducer = new MetricsProducer(this.kafkaConfig, this.mConfig);
return async.parallel([
done => this._mProducer.setupProducer(err => {
if (err) {
this.logger.info('error setting up metrics producer',
{ error: err.message });
process.exit(1);
}
return done();
}),
done => this._setupProducer(err => {
if (err) {
this.logger.info('error setting up kafka producer',
Expand Down Expand Up @@ -842,20 +830,6 @@ class QueueProcessor extends EventEmitter {
});
return next();
},
next => {
if (this._mProducer) {
this.logger.debug('closing metrics producer', {
method: 'QueueProcessor.stop',
site: this.site,
});
return this._mProducer.close(next);
}
this.logger.debug('no metrics producer to close', {
method: 'QueueProcessor.stop',
site: this.site,
});
return next();
},
], done);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ const QueueEntry = require('../../../lib/models/QueueEntry');
const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry');
const FailedCRRProducer = require('../failedCRR/FailedCRRProducer');
const ReplayProducer = require('../replay/ReplayProducer');
const MetricsProducer = require('../../../lib/MetricsProducer');
const { http: HttpAgent, https: HttpsAgent } = require('httpagent');

// StatsClient constant default for site metrics
Expand Down Expand Up @@ -216,7 +215,6 @@ class ReplicationStatusProcessor {
this.gcConfig = gcConfig;
this._consumer = null;
this._gcProducer = null;
this._mProducer = null;

this.logger =
new Logger('Backbeat:Replication:ReplicationStatusProcessor');
Expand Down Expand Up @@ -336,7 +334,6 @@ class ReplicationStatusProcessor {
sourceHTTPAgent: this.sourceHTTPAgent,
vaultclientCache: this.vaultclientCache,
gcProducer: this._gcProducer,
mProducer: this._mProducer,
statsClient: this._statsClient,
failedCRRProducer: this._failedCRRProducer,
replayProducers: this._ReplayProducers,
Expand Down Expand Up @@ -379,11 +376,6 @@ class ReplicationStatusProcessor {
done();
}
},
done => {
this._mProducer = new MetricsProducer(this.kafkaConfig,
this.mConfig);
this._mProducer.setupProducer(done);
},
done => {
let consumerReady = false;
this._consumer = new BackbeatConsumer({
Expand Down Expand Up @@ -434,18 +426,6 @@ class ReplicationStatusProcessor {
method: 'ReplicationStatusProcessor.stop',
});
return next();
},
next => {
if (this._mProducer) {
this.logger.debug('closing metrics producer', {
method: 'ReplicationStatusProcessor.stop',
});
return this._mProducer.close(next);
}
this.logger.debug('no metrics producer to close', {
method: 'ReplicationStatusProcessor.stop',
});
return next();
}
], done);
}
Expand Down
Loading

0 comments on commit 43aac71

Please sign in to comment.