Skip to content

Commit

Permalink
try
Browse files Browse the repository at this point in the history
  • Loading branch information
benzekrimaha committed Nov 6, 2024
1 parent 6694ca8 commit 1898cec
Show file tree
Hide file tree
Showing 18 changed files with 18 additions and 345 deletions.
64 changes: 1 addition & 63 deletions extensions/mongoProcessor/MongoQueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,11 @@ const BackbeatConsumer = require('../../lib/BackbeatConsumer');
const QueueEntry = require('../../lib/models/QueueEntry');
const DeleteOpQueueEntry = require('../../lib/models/DeleteOpQueueEntry');
const ObjectQueueEntry = require('../../lib/models/ObjectQueueEntry');
const MetricsProducer = require('../../lib/MetricsProducer');
const { metricsExtension, metricsTypeCompleted, metricsTypePendingOnly } =
require('../ingestion/constants');

const getContentType = require('./utils/contentTypeHelper');
const BucketMemState = require('./utils/BucketMemState');
const MongoProcessorMetrics = require('./MongoProcessorMetrics');

// batch metrics by location and send to kafka metrics topic every 5 seconds
const METRIC_REPORT_INTERVAL_MS = process.env.CI === 'true' ? 1000 : 5000;

// TODO - ADD PREFIX BASED ON SOURCE
// april 6, 2018

Expand Down Expand Up @@ -79,16 +74,6 @@ class MongoQueueProcessor {
// in-mem batch of metrics, we only track total entry count by location
// this._accruedMetrics = { zenko-location: 10 }
this._accruedMetrics = {};

setInterval(() => {
this._sendMetrics();
}, METRIC_REPORT_INTERVAL_MS);
}

_setupMetricsClients(cb) {
// Metrics Producer
this._mProducer = new MetricsProducer(this.kafkaConfig, this._mConfig);
this._mProducer.setupProducer(cb);
}

/**
Expand Down Expand Up @@ -177,18 +162,6 @@ class MongoQueueProcessor {
});
return next();
},
next => {
if (this._mProducer) {
this.logger.debug('closing metrics producer', {
method: 'MongoQueueProcessor.stop',
});
return this._mProducer.close(next);
}
this.logger.debug('no metrics producer to close', {
method: 'MongoQueueProcessor.stop',
});
return next();
},
], done);
}

Expand Down Expand Up @@ -408,7 +381,6 @@ class MongoQueueProcessor {
return this._mongoClient.deleteObject(bucket, key, options, log,
err => {
if (err) {
this._normalizePendingMetric(location);
log.end().error('error deleting object metadata ' +
'from mongo', {
bucket,
Expand Down Expand Up @@ -443,7 +415,6 @@ class MongoQueueProcessor {
this._getZenkoObjectMetadata(log, sourceEntry, bucketInfo,
(err, zenkoObjMd) => {
if (err) {
this._normalizePendingMetric(location);
log.end().error('error processing object queue entry', {
method: 'MongoQueueProcessor._processObjectQueueEntry',
entry: sourceEntry.getLogInfo(),
Expand All @@ -454,7 +425,6 @@ class MongoQueueProcessor {

const content = getContentType(sourceEntry, zenkoObjMd);
if (content.length === 0) {
this._normalizePendingMetric(location);
log.end().debug('skipping duplicate entry', {
method: 'MongoQueueProcessor._processObjectQueueEntry',
entry: sourceEntry.getLogInfo(),
Expand Down Expand Up @@ -498,7 +468,6 @@ class MongoQueueProcessor {
return this._mongoClient.putObject(bucket, key, objVal, params,
this.logger, err => {
if (err) {
this._normalizePendingMetric(location);
log.end().error('error putting object metadata ' +
'to mongo', {
bucket,
Expand All @@ -519,23 +488,6 @@ class MongoQueueProcessor {
});
}

/**
* Send accrued metrics by location to kafka
* @return {undefined}
*/
_sendMetrics() {
Object.keys(this._accruedMetrics).forEach(loc => {
const count = this._accruedMetrics[loc];

// only report metrics if something has been recorded for location
if (count > 0) {
this._accruedMetrics[loc] = 0;
const metric = { [loc]: { ops: count } };
this._mProducer.publishMetrics(metric, metricsTypeCompleted,
metricsExtension, () => {});
}
});
}

/**
* Accrue metrics in-mem every METRIC_REPORT_INTERVAL_MS
Expand All @@ -550,19 +502,6 @@ class MongoQueueProcessor {
}
}

/**
* For cases where we experience an error or skip an entry, we need to
* normalize pending metric. This means we will see pending metrics stuck
* above 0 and will need to bring those metrics down
* @param {string} location - location constraint name
* @return {undefined}
*/
_normalizePendingMetric(location) {
const metric = { [location]: { ops: 1 } };
this._mProducer.publishMetrics(metric, metricsTypePendingOnly,
metricsExtension, () => {});
}

/**
* Get bucket info in memoize state if exists, otherwise fetch from Mongo
* @param {ObjectQueueEntry} sourceEntry - object metadata entry
Expand Down Expand Up @@ -639,7 +578,6 @@ class MongoQueueProcessor {
entryType: sourceEntry.constructor.name,
method: 'MongoQueueProcessor.processKafkaEntry',
});
this._normalizePendingMetric(location);
return process.nextTick(done);
});
}
Expand Down
26 changes: 0 additions & 26 deletions extensions/replication/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ const EchoBucket = require('../tasks/EchoBucket');
const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry');
const BucketQueueEntry = require('../../../lib/models/BucketQueueEntry');
const ActionQueueEntry = require('../../../lib/models/ActionQueueEntry');
const MetricsProducer = require('../../../lib/MetricsProducer');
const libConstants = require('../../../lib/constants');
const { wrapCounterInc, wrapHistogramObserve } = require('../../../lib/util/metrics');
const { http: HttpAgent, https: HttpsAgent } = require('httpagent');
Expand Down Expand Up @@ -222,7 +221,6 @@ class QueueProcessor extends EventEmitter {
this.replicationStatusProducer = null;
this._consumer = null;
this._dataMoverConsumer = null;
this._mProducer = null;
this.site = site;
this.mConfig = mConfig;
this.serviceName = this.isReplayTopic ?
Expand Down Expand Up @@ -695,7 +693,6 @@ class QueueProcessor extends EventEmitter {
vaultclientCache: this.vaultclientCache,
accountCredsCache: this.accountCredsCache,
replicationStatusProducer: this.replicationStatusProducer,
mProducer: this._mProducer,
logger: this.logger,
site: this.site,
consumer: this._consumer,
Expand All @@ -722,16 +719,7 @@ class QueueProcessor extends EventEmitter {
* @return {undefined}
*/
start(options) {
this._mProducer = new MetricsProducer(this.kafkaConfig, this.mConfig);
return async.parallel([
done => this._mProducer.setupProducer(err => {
if (err) {
this.logger.info('error setting up metrics producer',
{ error: err.message });
process.exit(1);
}
return done();
}),
done => this._setupProducer(err => {
if (err) {
this.logger.info('error setting up kafka producer',
Expand Down Expand Up @@ -842,20 +830,6 @@ class QueueProcessor extends EventEmitter {
});
return next();
},
next => {
if (this._mProducer) {
this.logger.debug('closing metrics producer', {
method: 'QueueProcessor.stop',
site: this.site,
});
return this._mProducer.close(next);
}
this.logger.debug('no metrics producer to close', {
method: 'QueueProcessor.stop',
site: this.site,
});
return next();
},
], done);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ const QueueEntry = require('../../../lib/models/QueueEntry');
const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry');
const FailedCRRProducer = require('../failedCRR/FailedCRRProducer');
const ReplayProducer = require('../replay/ReplayProducer');
const MetricsProducer = require('../../../lib/MetricsProducer');
const { http: HttpAgent, https: HttpsAgent } = require('httpagent');

// StatsClient constant default for site metrics
Expand Down Expand Up @@ -216,7 +215,6 @@ class ReplicationStatusProcessor {
this.gcConfig = gcConfig;
this._consumer = null;
this._gcProducer = null;
this._mProducer = null;

this.logger =
new Logger('Backbeat:Replication:ReplicationStatusProcessor');
Expand Down Expand Up @@ -336,7 +334,6 @@ class ReplicationStatusProcessor {
sourceHTTPAgent: this.sourceHTTPAgent,
vaultclientCache: this.vaultclientCache,
gcProducer: this._gcProducer,
mProducer: this._mProducer,
statsClient: this._statsClient,
failedCRRProducer: this._failedCRRProducer,
replayProducers: this._ReplayProducers,
Expand Down Expand Up @@ -379,11 +376,6 @@ class ReplicationStatusProcessor {
done();
}
},
done => {
this._mProducer = new MetricsProducer(this.kafkaConfig,
this.mConfig);
this._mProducer.setupProducer(done);
},
done => {
let consumerReady = false;
this._consumer = new BackbeatConsumer({
Expand Down Expand Up @@ -434,18 +426,6 @@ class ReplicationStatusProcessor {
method: 'ReplicationStatusProcessor.stop',
});
return next();
},
next => {
if (this._mProducer) {
this.logger.debug('closing metrics producer', {
method: 'ReplicationStatusProcessor.stop',
});
return this._mProducer.close(next);
}
this.logger.debug('no metrics producer to close', {
method: 'ReplicationStatusProcessor.stop',
});
return next();
}
], done);
}
Expand Down
27 changes: 0 additions & 27 deletions extensions/replication/tasks/CopyLocationTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,12 @@ const BackbeatMetadataProxy = require('../../../lib/BackbeatMetadataProxy');
const BackbeatClient = require('../../../lib/clients/BackbeatClient');
const BackbeatTask = require('../../../lib/tasks/BackbeatTask');
const { LifecycleMetrics } = require('../../lifecycle/LifecycleMetrics');
const ReplicationMetric = require('../ReplicationMetric');
const ReplicationMetrics = require('../ReplicationMetrics');
const { attachReqUids, TIMEOUT_MS } = require('../../../lib/clients/utils');
const { getAccountCredentials } =
require('../../../lib/credentials/AccountCredentials');
const RoleCredentials =
require('../../../lib/credentials/RoleCredentials');
const { metricsExtension, metricsTypeQueued, metricsTypeCompleted } =
require('../constants');

const MPU_GCP_MAX_PARTS = 1024;

Expand All @@ -41,10 +38,6 @@ class CopyLocationTask extends BackbeatTask {
this.retryParams = retryParams;
}
}
this._replicationMetric = new ReplicationMetric()
.withProducer(this.mProducer.getProducer())
.withSite(this.site)
.withExtension(metricsExtension);
}

_validateActionCredentials(actionEntry) {
Expand Down Expand Up @@ -195,11 +188,6 @@ class CopyLocationTask extends BackbeatTask {

_getAndPutObject(actionEntry, objMD, log, cb) {
const objectLogger = this.logger.newRequestLogger(log.getUids());
this._replicationMetric
.withEntry(actionEntry)
.withMetricType(metricsTypeQueued)
.withObjectSize(objMD.getContentLength())
.publish();
this.retry({
actionDesc: 'stream object data',
logFields: { entry: actionEntry.getLogInfo() },
Expand Down Expand Up @@ -335,11 +323,6 @@ class CopyLocationTask extends BackbeatTask {
actionEntry.setSuccess({
location: data.location,
});
this._replicationMetric
.withEntry(actionEntry)
.withMetricType(metricsTypeCompleted)
.withObjectSize(size)
.publish();
return cb(null, data);
});
}
Expand Down Expand Up @@ -591,11 +574,6 @@ class CopyLocationTask extends BackbeatTask {
}
return doneOnce(err);
}
this._replicationMetric
.withEntry(actionEntry)
.withMetricType(metricsTypeCompleted)
.withObjectSize(size)
.publish();
return doneOnce(null, data);
});
}
Expand Down Expand Up @@ -786,11 +764,6 @@ class CopyLocationTask extends BackbeatTask {
if (err) {
return cb(err);
}
this._replicationMetric
.withEntry(actionEntry)
.withMetricType(metricsTypeQueued)
.withObjectSize(objMD.getContentLength())
.publish();
return this._completeRangedMPU(actionEntry, objMD,
uploadId, log, cb);
});
Expand Down
10 changes: 0 additions & 10 deletions extensions/replication/tasks/MultipleBackendTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry');

const ReplicateObject = require('./ReplicateObject');
const { attachReqUids } = require('../../../lib/clients/utils');
const getExtMetrics = require('../utils/getExtMetrics');
const { metricsExtension, metricsTypeQueued } = require('../constants');

const MPU_GCP_MAX_PARTS = 1024;

Expand Down Expand Up @@ -702,21 +700,13 @@ class MultipleBackendTask extends ReplicateObject {
if (err) {
return doneOnce(err);
}
const extMetrics = getExtMetrics(this.site,
sourceEntry.getContentLength(), sourceEntry);
this.mProducer.publishMetrics(extMetrics, metricsTypeQueued,
metricsExtension, () => {});
return this._completeRangedMPU(sourceEntry,
uploadId, log, doneOnce);
});
}

_getAndPutObject(sourceEntry, log, cb) {
const partLogger = this.logger.newRequestLogger(log.getUids());
const extMetrics = getExtMetrics(this.site,
sourceEntry.getContentLength(), sourceEntry);
this.mProducer.publishMetrics(extMetrics, metricsTypeQueued,
metricsExtension, () => {});

if (BACKBEAT_INJECT_REPLICATION_ERROR_COPYOBJ) {
if (Math.random() < BACKBEAT_INJECT_REPLICATION_ERROR_RATE) {
Expand Down
10 changes: 1 addition & 9 deletions extensions/replication/tasks/ReplicateObject.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ const BackbeatMetadataProxy = require('../../../lib/BackbeatMetadataProxy');

const mapLimitWaitPendingIfError = require('../../../lib/util/mapLimitWaitPendingIfError');
const { attachReqUids, TIMEOUT_MS } = require('../../../lib/clients/utils');
const getExtMetrics = require('../utils/getExtMetrics');
const BackbeatTask = require('../../../lib/tasks/BackbeatTask');
const { getAccountCredentials } = require('../../../lib/credentials/AccountCredentials');
const RoleCredentials = require('../../../lib/credentials/RoleCredentials');
const { metricsExtension, metricsTypeQueued, metricsTypeCompleted, replicationStages } = require('../constants');
const { replicationStages } = require('../constants');

const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry');

Expand Down Expand Up @@ -434,9 +433,6 @@ class ReplicateObject extends BackbeatTask {
location: this.site,
replicationContent: 'data',
});
const extMetrics = getExtMetrics(this.site, size, sourceEntry);
this.mProducer.publishMetrics(extMetrics,
metricsTypeCompleted, metricsExtension, () => {});
}

_publishMetadataWriteMetrics(buffer, writeStartTime) {
Expand Down Expand Up @@ -758,10 +754,6 @@ class ReplicateObject extends BackbeatTask {
// Get data from source bucket and put it on the target bucket
next => {
if (!mdOnly) {
const extMetrics = getExtMetrics(this.site,
sourceEntry.getContentLength(), sourceEntry);
this.mProducer.publishMetrics(extMetrics,
metricsTypeQueued, metricsExtension, () => {});
return this._getAndPutData(sourceEntry, destEntry, log,
next);
}
Expand Down
Loading

0 comments on commit 1898cec

Please sign in to comment.