Skip to content

Commit

Permalink
Merge pull request #103 from monti-apm/feature/redis-oplog
Browse files Browse the repository at this point in the history
Redis Oplog
  • Loading branch information
renanccastro authored Apr 30, 2024
2 parents 1013f31 + e12f3dc commit 0354827
Show file tree
Hide file tree
Showing 17 changed files with 887 additions and 63 deletions.
46 changes: 46 additions & 0 deletions .github/workflows/test-redis-oplog.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
name: Test Redis Oplog
on:
push:
branches:
- master
pull_request:
jobs:
test:
runs-on: ubuntu-latest
timeout-minutes: 10
strategy:
fail-fast: false
matrix:
meteorRelease:
- '--release 1.11'
- '--release 1.12.1'
- '--release 2.1.1'
- '--release 2.2'
- '--release 2.3.2'
- '--release 2.4.1'
- '--release 2.5.6'
- '--release 2.6.1'
- '--release 2.7.3'
- '--release 2.8.2'
- '--release 2.9.0'
# Left empty to use latest version
-
env:
REDIS_OPLOG_SETTINGS: '{"debug":true}'
steps:
- uses: supercharge/[email protected]
- uses: actions/checkout@v2
- uses: actions/setup-node@v1
with:
node-version: '12.x'
- name: Install Dependencies
run: |
curl https://install.meteor.com | /bin/sh
npm i -g @zodern/mtest
- name: Run Tests
run: |
# Fix using old versions of Meteor
export NODE_TLS_REJECT_UNAUTHORIZED=0
mtest --package ./ --once ${{ matrix.meteorRelease }}
2 changes: 2 additions & 0 deletions lib/hijack/async.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ export function wrapFibers () {

Fibers.yield = function () {
let kadiraInfo = Kadira._getInfo();

if (kadiraInfo) {
let eventId = Kadira.tracer.event(kadiraInfo.trace, 'async');

if (eventId) {
// The event unique to this fiber
// Using a symbol since Meteor doesn't copy symbols to new fibers created
Expand Down
18 changes: 14 additions & 4 deletions lib/hijack/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Meteor } from 'meteor/meteor';
import { Random } from 'meteor/random';
import { Mongo, MongoInternals } from 'meteor/mongo';
import { _ } from 'meteor/underscore';
import {haveAsyncCallback, OptimizedApply} from '../utils';
import { haveAsyncCallback, OptimizedApply } from '../utils';


// This hijack is important to make sure, collections created before
Expand Down Expand Up @@ -46,12 +46,13 @@ function getSyncronousCursor () {
}

export function hijackDBOps () {
let mongoConnectionProto = MeteorX.MongoConnection.prototype;
let mongoConnectionProto = MongoInternals.Connection.prototype;

// findOne is handled by find - so no need to track it
// upsert is handles by update
// 2.4 replaced _ensureIndex with createIndex
[
'find', 'update', 'remove', 'insert', '_ensureIndex', '_dropIndex', 'createIndex'
'update', 'remove', 'insert', '_ensureIndex', '_dropIndex', 'createIndex'
].forEach(function (func) {
let originalFunc = mongoConnectionProto[func];

Expand Down Expand Up @@ -141,7 +142,16 @@ export function hijackDBOps () {
});

if (cursorDescription.options) {
let cursorOptions = _.pick(cursorDescription.options, ['fields', 'projection', 'sort', 'limit']);
const opts = cursorDescription.options;

const projection = opts.fields || opts.projection;

let cursorOptions = _.pick(cursorDescription.options, ['sort', 'limit']);

if (projection) {
cursorOptions.projection = projection;
}

for (let field in cursorOptions) {

Check warning on line 155 in lib/hijack/db.js

View workflow job for this annotation

GitHub Actions / build

The body of a for-in should be wrapped in an if statement to filter unwanted properties from the prototype
let value = cursorOptions[field];
if (typeof value === 'object') {
Expand Down
9 changes: 7 additions & 2 deletions lib/hijack/instrument.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ import {
wrapForCountingObservers,
wrapMultiplexer,
wrapOplogObserveDriver,
wrapPollingObserveDriver
wrapPollingObserveDriver,
} from './wrap_observers';
import { wrapStringifyDDP } from './wrap_ddp_stringify';
import { setLabels } from './set_labels';
import { hijackDBOps } from './db';
import { wrapRedisOplogObserveDriver } from './redis_oplog';

let instrumented = false;
Kadira._startInstrumenting = function (callback) {
Expand All @@ -40,7 +41,11 @@ Kadira._startInstrumenting = function (callback) {
wrapSubscription(MeteorX.Subscription.prototype);

if (MeteorX.MongoOplogDriver) {
wrapOplogObserveDriver(MeteorX.MongoOplogDriver.prototype);
if (MeteorX.MongoOplogDriver.name === 'RedisOplogObserveDriver') {
wrapRedisOplogObserveDriver(MeteorX.MongoOplogDriver);
} else {
wrapOplogObserveDriver(MeteorX.MongoOplogDriver.prototype);
}
}

if (MeteorX.MongoPollingDriver) {
Expand Down
152 changes: 152 additions & 0 deletions lib/hijack/redis_oplog.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import { Random } from 'meteor/random';

export function getDummyCollectionName () {
const collection = new Meteor.Collection(`__dummy_coll_${Random.id()}`);
const handler = collection.find({}).observeChanges({ added: Function.prototype });
handler.stop();
return collection._name;
}
function protectAgainstRaceConditions (collection) {
if (!collection._redisOplog) {
return true;
}

return (
collection._redisOplog &&
collection._redisOplog.protectAgainstRaceConditions
);
}

function rewriteReloadRequeryFuncs () {
// handle reload/requery cases
const originalCompareWith = this.store.constructor.prototype.compareWith;
this.store.constructor.prototype.compareWith = (other, callbacks) => {
Kadira.models.pubsub.trackPolledDocuments(this._ownerInfo, other.size());

return originalCompareWith.call(this.store, other, callbacks);
};
}

export function wrapRedisOplogObserveDriver (driver) {
const collectionName = getDummyCollectionName();

const dummyDriver = new driver({
cursorDescription: {
collectionName,
selector: {},
options: {}
},
multiplexer: { ready () {} },
matcher: { combineIntoProjection: () => ({}) },
});

const observableCollectionProto = dummyDriver.observableCollection.constructor.prototype;
const redisSubscriberProto = dummyDriver.redisSubscriber.constructor.prototype;

let originalAdd = observableCollectionProto.add;
let originalChange = observableCollectionProto.change;
let originalRemove = observableCollectionProto.remove;

// Track the polled documents. This is reflect to the RAM size and
// for the CPU usage directly


// According to the comments in redis-oplog, the "safe" param means that the document is "cleaned".
// it is set to true in the initial add and synthetic mutations
observableCollectionProto.add = function (doc, safe) {
// handle reload/requery cases
rewriteReloadRequeryFuncs.call(this);
let coll = this.cursorDescription.collectionName;
let query = this.cursorDescription.selector;
let opts = this.cursorDescription.options;
let docSize = Kadira.docSzCache.getSize(coll, query, opts, [doc]);
if (this._ownerInfo) {
Kadira.models.pubsub.trackLiveUpdates(this._ownerInfo, '_addPublished', 1);
Kadira.models.pubsub.trackDocSize(this._ownerInfo.name, 'liveFetches', docSize);
} else {
// If there is no ownerInfo, that means this is the initial adds
// Also means it is not coming from a subscription
if (!this._liveUpdatesCounts) {
this._liveUpdatesCounts = {
_initialAdds: 0
};
}

this._liveUpdatesCounts._initialAdds++;

if (this._polledDocuments) {
this._polledDocuments += 1;
} else {
this._polledDocuments = 1;
}

if (this._docSize) {
this._docSize.polledFetches += docSize;
} else {
this._docSize = {
polledFetches: docSize,
initialFetches: 0
};
}

this._docSize.initialFetches += docSize;
}

return originalAdd.call(this, doc, safe);
};

observableCollectionProto.change = function (doc, modifiedFields) {
// handle reload/requery cases
rewriteReloadRequeryFuncs.call(this);

if (this._ownerInfo) {
Kadira.models.pubsub.trackLiveUpdates(this._ownerInfo, '_changePublished', 1);
}
originalChange.call(this, doc, modifiedFields);
};

observableCollectionProto.remove = function (docId) {
// handle reload/requery cases
rewriteReloadRequeryFuncs.call(this);

if (this._ownerInfo) {
Kadira.models.pubsub.trackLiveUpdates(this._ownerInfo, '_removePublished', 1);
}
originalRemove.call(this, docId);
};

// Redis and oplog don't have the same op constants
const redisEventToOplogMap = {
i: 'i',
u: 'u',
r: 'd',
};

const originalRedisProcess = redisSubscriberProto.process;

redisSubscriberProto.process = function (op, doc, fields) {
Kadira.models.pubsub.trackDocumentChanges(this.observableCollection._ownerInfo, {
op: redisEventToOplogMap[op]
});
const collection = this.observableCollection.collection;
// redis-oplog always fetches the document again, except when:
// - operator === removal
// - there is an explicit _redisOplog config passed with collection.configureRedisOplog and that includes protectAgainstRaceConditions = false
// in this case there is a fetch counted in the collection level, not in the publication level as it happens while doing the insert
if (op !== 'r' && protectAgainstRaceConditions(collection)) {
Kadira.models.pubsub.trackPolledDocuments(this.observableCollection._ownerInfo, 1);
}
originalRedisProcess.call(this, op, doc, fields);
};

// @todo check this
let originalStop = driver.prototype.stop;
driver.prototype.stop = function () {
if (this.observableCollection._ownerInfo && this.observableCollection._ownerInfo.type === 'sub') {
Kadira.EventBus.emit('pubsub', 'observerDeleted', this.observableCollection._ownerInfo);
Kadira.models.pubsub.trackDeletedObserver(this.observableCollection._ownerInfo);
}

return originalStop.call(this);
};
}
25 changes: 16 additions & 9 deletions lib/hijack/wrap_observers.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ export function wrapForCountingObservers () {
// to count observers
let mongoConnectionProto = MeteorX.MongoConnection.prototype;
let originalObserveChanges = mongoConnectionProto._observeChanges;

mongoConnectionProto._observeChanges = function (cursorDescription, ordered, callbacks) {
let ret = originalObserveChanges.call(this, cursorDescription, ordered, callbacks);
// get the Kadira Info via the Meteor.EnvironmentalVariable
Expand All @@ -176,30 +177,36 @@ export function wrapForCountingObservers () {
startTime: new Date().getTime()
};

console.log('ownerInfo', ownerInfo);

let observerDriver = ret._multiplexer._observeDriver;
observerDriver._ownerInfo = ownerInfo;
// We store counts for redis-oplog in the observableCollection instead
let ownerStorer = observerDriver.observableCollection || observerDriver;

ownerStorer._ownerInfo = ownerInfo;

Kadira.EventBus.emit('pubsub', 'observerCreated', ownerInfo);
Kadira.models.pubsub.trackCreatedObserver(ownerInfo);

// We need to send initially polled documents if there are
if (observerDriver._polledDocuments) {
Kadira.models.pubsub.trackPolledDocuments(ownerInfo, observerDriver._polledDocuments);
observerDriver._polledDocuments = 0;
if (ownerStorer._polledDocuments) {
Kadira.models.pubsub.trackPolledDocuments(ownerInfo, ownerStorer._polledDocuments);
ownerStorer._polledDocuments = 0;
}

// We need to send initially polled documents if there are
if (observerDriver._polledDocSize) {
Kadira.models.pubsub.trackDocSize(ownerInfo.name, 'polledFetches', observerDriver._polledDocSize);
observerDriver._polledDocSize = 0;
if (ownerStorer._polledDocSize) {
Kadira.models.pubsub.trackDocSize(ownerInfo.name, 'polledFetches', ownerStorer._polledDocSize);
ownerStorer._polledDocSize = 0;
}

// Process _liveUpdatesCounts
_.each(observerDriver._liveUpdatesCounts, function (count, key) {
_.each(ownerStorer._liveUpdatesCounts, function (count, key) {
Kadira.models.pubsub.trackLiveUpdates(ownerInfo, key, count);
});

// Process docSize
_.each(observerDriver._docSize, function (count, key) {
_.each(ownerStorer._docSize, function (count, key) {
Kadira.models.pubsub.trackDocSize(ownerInfo.name, key, count);
});
}
Expand Down
Loading

0 comments on commit 0354827

Please sign in to comment.