diff --git a/lib/hijack/redis_oplog.js b/lib/hijack/redis_oplog.js index e7220ccf..205ea60d 100644 --- a/lib/hijack/redis_oplog.js +++ b/lib/hijack/redis_oplog.js @@ -17,13 +17,26 @@ function protectAgainstRaceConditions (collection) { ); } -function rewriteReloadRequeryFuncs () { +let WrappedSymbol = Symbol('MontiCompareWithWrapped'); +let InfoSymbol = Symbol('MontiCompareWithInfo'); +export function rewriteReloadRequeryFuncs (observableCollection) { + if (observableCollection._ownerInfo) { + observableCollection.store[InfoSymbol] = observableCollection._ownerInfo; + } + + if (observableCollection.store.constructor[WrappedSymbol]) { + return; + } + + observableCollection.store.constructor[WrappedSymbol] = true; + // handle reload/requery cases - const originalCompareWith = this.store.constructor.prototype.compareWith; - this.store.constructor.prototype.compareWith = (other, callbacks) => { - Kadira.models.pubsub.trackPolledDocuments(this._ownerInfo, other.size()); + const originalCompareWith = observableCollection.store.constructor.prototype.compareWith; - return originalCompareWith.call(this.store, other, callbacks); + observableCollection.store.constructor.prototype.compareWith = function (other) { + Kadira.models.pubsub.trackPolledDocuments(this[InfoSymbol], other.size()); + + return originalCompareWith.apply(this, arguments); }; } @@ -55,7 +68,6 @@ export function wrapRedisOplogObserveDriver (driver) { // it is set to true in the initial add and synthetic mutations observableCollectionProto.add = function (doc, safe) { // handle reload/requery cases - rewriteReloadRequeryFuncs.call(this); let coll = this.cursorDescription.collectionName; let query = this.cursorDescription.selector; let opts = this.cursorDescription.options; @@ -96,9 +108,6 @@ export function wrapRedisOplogObserveDriver (driver) { }; observableCollectionProto.change = function (doc, modifiedFields) { - // handle reload/requery cases - rewriteReloadRequeryFuncs.call(this); - if (this._ownerInfo) { Kadira.models.pubsub.trackLiveUpdates(this._ownerInfo, '_changePublished', 1); } @@ -106,9 +115,6 @@ export function wrapRedisOplogObserveDriver (driver) { }; observableCollectionProto.remove = function (docId) { - // handle reload/requery cases - rewriteReloadRequeryFuncs.call(this); - if (this._ownerInfo) { Kadira.models.pubsub.trackLiveUpdates(this._ownerInfo, '_removePublished', 1); } diff --git a/lib/hijack/wrap_observers.js b/lib/hijack/wrap_observers.js index aa003576..8d755990 100644 --- a/lib/hijack/wrap_observers.js +++ b/lib/hijack/wrap_observers.js @@ -1,4 +1,5 @@ import { _ } from 'meteor/underscore'; +import { rewriteReloadRequeryFuncs } from './redis_oplog'; export function wrapOplogObserveDriver (proto) { // Track the polled documents. This is reflected to the RAM size and @@ -183,6 +184,10 @@ export function wrapForCountingObservers () { ownerStorer._ownerInfo = ownerInfo; + if (observerDriver.observableCollection) { + rewriteReloadRequeryFuncs(observerDriver.observableCollection); + } + Kadira.EventBus.emit('pubsub', 'observerCreated', ownerInfo); Kadira.models.pubsub.trackCreatedObserver(ownerInfo); diff --git a/tests/hijack/redis_oplog.js b/tests/hijack/redis_oplog.js index bab45e75..a306f9eb 100644 --- a/tests/hijack/redis_oplog.js +++ b/tests/hijack/redis_oplog.js @@ -271,3 +271,36 @@ addTestWithRoundedTime('Database - Redis Oplog - Changed', function (test) { Meteor._sleepForMs(100); }); + +addTestWithRoundedTime('Database - Redis Oplog - Remove with limit', function (test) { + const pub = RegisterPublication(() => TestData.find({}, { limit: 100 })); + TestData.remove({}); + const client = GetMeteorClient(); + + const sub = SubscribeAndWait(client, pub); + + TestData.insert({ name: 'test1' }); + TestData.insert({ name: 'test2' }); + TestData.insert({ name: 'test3' }); + + Meteor._sleepForMs(100); + TestData.remove({ name: 'test2' }); + + Meteor._sleepForMs(100); + + let metrics = FindMetricsForPub(pub); + + test.equal(metrics.totalObservers, 1, 'observers'); + test.equal(metrics.liveRemovedDocuments, 1, 'removed'); + + TestData.remove({}); + + Meteor._sleepForMs(100); + + metrics = FindMetricsForPub(pub); + + test.equal(metrics.totalObservers, 1, 'observers'); + test.equal(metrics.liveRemovedDocuments, 3, 'removed'); + + sub.stop(); +});