From cdef9c14b6140b52efa31a36adf64d380842099b Mon Sep 17 00:00:00 2001 From: Will Frew Date: Tue, 10 Mar 2015 15:03:07 +0000 Subject: [PATCH 1/4] Adds a concurrency option for parallelising internal requests. --- package.json | 1 + source/index.js | 103 +++++++++++++------- test/integration/multifetch.options.spec.js | 81 +++++++++++++++ 3 files changed, 149 insertions(+), 36 deletions(-) diff --git a/package.json b/package.json index fe09c92..e7ac25a 100644 --- a/package.json +++ b/package.json @@ -8,6 +8,7 @@ "test": "mocha" }, "dependencies": { + "async": "~0.9.0", "pump": "~1.0.0", "extend": "~2.0.0" }, diff --git a/source/index.js b/source/index.js index 81af56b..f6f204f 100644 --- a/source/index.js +++ b/source/index.js @@ -2,6 +2,7 @@ var url = require('url'); var pump = require('pump'); var extend = require('extend'); +var async = require('async'); var JsonStream = require('./json'); var NullifyStream = require('./nullify'); @@ -68,20 +69,26 @@ var fetchBare = function(request, response) { return nullify; }; -var create = function(options, callback) { - if(!callback && typeof options === 'function') { - callback = options; +var endStream = function (jsonStream, error) { + jsonStream.writeObject('_error', error); + jsonStream.end(); +}; + +var create = function(options, prefetch) { + if(!prefetch && typeof options === 'function') { + prefetch = options; options = {}; } - options = options || {}; + options = options || {}; var ignore = options.ignore || []; var headers = options.headers !== undefined ? options.headers : true; + var concurrency = options.concurrency || 1; // Defaults to sequential fetching var fetch = headers ? 
fetchWithHeaders : fetchBare; - callback = callback || noopCallback; + prefetch = prefetch || noopCallback; return function(request, response, next) { var app = request.app; @@ -95,41 +102,65 @@ var create = function(options, callback) { pump(json, response); - (function loop() { - var key = keys.pop(); - - if(!key) { - json.writeObject('_error', error); - return json.end(); - } - - var messages = createMessages(request, query[key]); + // Exit early if there is nothing to fetch. + if(keys.length === 0) { + return endStream(json, error); + } - var write = function(prevent) { - if(prevent) { - return loop(); + // The resource queue processes resource streams sequentially. + var resourceQueue = async.queue(function worker(task, callback) { + pump(task.resource, json.createObjectStream(task.key), function(err) { + if(err) { + json.destroy(); + return callback(err); } + if(!(/2\d\d/).test(task.response.statusCode)) { + error = true; + } + callback(); + }); + }, 1); - var resource = fetch(messages.request, messages.response); - - pump(resource, json.createObjectStream(key), function(err) { - if(err) { - return json.destroy(); - } - if(!(/2\d\d/).test(messages.response.statusCode)) { - error = true; - } - - loop(); - }); - - app(messages.request, messages.response, function(err) { - json.destroy(); + // Asynchronously fetch the resource for a key and push the resulting + // stream into the resource queue. 
+ var fetchResource = function(key, callback) { + var messages = createMessages(request, query[key]); + process.nextTick(function() { + prefetch(request, messages.request, function(prevent) { + if (prevent) return callback(); + + var resource = fetch(messages.request, messages.response); + var task = { + resource: resource, + request: messages.request, + response: messages.response, + key: key + }; + + app(messages.request, messages.response, function() { + resourceQueue.kill(); + json.destroy(); + }); + + // Callback is called once the stream for this resource has + // been fully piped out to the client. + resourceQueue.push(task, callback); }); - }; - - callback(request, messages.request, write); - }()); + }); + }; + + // Fire off all requests and push the resulting streams into a queue to + // be processed + async.eachLimit(keys, concurrency, fetchResource, function(err) { + if(resourceQueue.idle()) { + endStream(json, error); + } else { + // Called once all streams have been fully pumped out to the client. 
+ resourceQueue.drain = function() { + endStream(json, error); + }; + } + }); }; }; diff --git a/test/integration/multifetch.options.spec.js b/test/integration/multifetch.options.spec.js index 7d8f2e1..a8a7c12 100644 --- a/test/integration/multifetch.options.spec.js +++ b/test/integration/multifetch.options.spec.js @@ -238,4 +238,85 @@ describe('multifetch.options', function() { }); }); }); + + + describe('concurrent fetching', function() { + + before(function(done) { + server = helper.server(); + + server.get('/api/multifetch', multifetch({ concurrency: 5 })); + server = server.listen(helper.port, done); + }); + + after(function(done) { + server.close(done); + }); + + describe('fetch multiple resources', function() { + before(function(done) { + request.get({ + url: helper.url('/api/multifetch'), + qs: { + api: '/api', + user_1: '/api/users/user_1', + user_2: '/api/users/user_2', + user_3: '/api/users/user_3', + readme: '/README.md' + }, + json: true + }, function(err, _, result) { + body = result; + done(err); + }); + }); + + it('should be successful response', function() { + chai.expect(body).to.have.property('_error', false); + }); + + it('should fetch all resources', function() { + chai.expect(body) + .to.have.property('api') + .to.have.property('statusCode', 200); + + chai.expect(body) + .to.have.property('user_1') + .to.have.property('statusCode', 200); + + chai.expect(body) + .to.have.property('user_2') + .to.have.property('statusCode', 200); + + chai.expect(body) + .to.have.property('user_3') + .to.have.property('statusCode', 200); + + chai.expect(body) + .to.have.property('readme') + .to.have.property('statusCode', 200); + }); + }); + + describe('hang up on bad request', function() { + var err; + + before(function(done) { + request.get({ + url: helper.url('/api/multifetch'), + qs: { + bad: '/api/not_found', + }, + json: true + }, function(result) { + err = result; + done(); + }); + }); + + it('should emit an error', function() { + 
chai.expect(err).to.be.defined; }); }); }); }); From 995c12f337e38acdeab259276aa14749252c9ee3 Mon Sep 17 00:00:00 2001 From: Will Frew Date: Tue, 10 Mar 2015 15:18:38 +0000 Subject: [PATCH 2/4] Document concurrency option --- README.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/README.md b/README.md index d8a8d16..7dda9f6 100644 --- a/README.md +++ b/README.md @@ -123,6 +123,18 @@ Response with content only. } ``` +### Concurrency +By default, `multifetch` will process each request sequentially, waiting until a request has been processed and fully piped out before it processes the next request. + +If the response to each of your requests is small but takes a long time to fetch (e.g. heavy database queries), `multifetch` supports concurrently processing requests. + +Passing `concurrency: N` as an option allows you to control the number of concurrent requests being processed at any one time: + +```javascript +app.get('/api/multifetch', multifetch({concurrency: 5})); +``` +In the above case, 5 requests would be routed through express concurrently, and the response of each is placed in a queue to be streamed out to the client sequentially. + License ------- From 6772da29f8b93db98be34195bb678bbd06b24816 Mon Sep 17 00:00:00 2001 From: Will Frew Date: Mon, 23 Mar 2015 12:30:31 +0000 Subject: [PATCH 3/4] Removes unnecessary process.nextTick --- source/index.js | 38 ++++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/source/index.js b/source/index.js index f6f204f..959ea99 100644 --- a/source/index.js +++ b/source/index.js @@ -125,27 +125,25 @@ var create = function(options, prefetch) { // stream into the resource queue. 
var fetchResource = function(key, callback) { var messages = createMessages(request, query[key]); - process.nextTick(function() { - prefetch(request, messages.request, function(prevent) { - if (prevent) return callback(); - - var resource = fetch(messages.request, messages.response); - var task = { - resource: resource, - request: messages.request, - response: messages.response, - key: key - }; - - app(messages.request, messages.response, function() { - resourceQueue.kill(); - json.destroy(); - }); - - // Callback is called once the stream for this resource has - // been fully piped out to the client. - resourceQueue.push(task, callback); + prefetch(request, messages.request, function(prevent) { + if (prevent) return callback(); + + var resource = fetch(messages.request, messages.response); + var task = { + resource: resource, + request: messages.request, + response: messages.response, + key: key + }; + + app(messages.request, messages.response, function() { + resourceQueue.kill(); + json.destroy(); }); + + // Callback is called once the stream for this resource has + // been fully piped out to the client. 
+ resourceQueue.push(task, callback); }); }; From 2b66986a587b9cc970489a6e9490971aa9008f27 Mon Sep 17 00:00:00 2001 From: Will Frew Date: Mon, 23 Mar 2015 12:31:10 +0000 Subject: [PATCH 4/4] Split concurrency test into smaller tests cases --- test/integration/multifetch.options.spec.js | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/test/integration/multifetch.options.spec.js b/test/integration/multifetch.options.spec.js index a8a7c12..cc33c3b 100644 --- a/test/integration/multifetch.options.spec.js +++ b/test/integration/multifetch.options.spec.js @@ -275,23 +275,31 @@ describe('multifetch.options', function() { chai.expect(body).to.have.property('_error', false); }); - it('should fetch all resources', function() { + it('should fetch api resource', function() { chai.expect(body) .to.have.property('api') .to.have.property('statusCode', 200); + }); + it('should fetch user_1 resource', function() { chai.expect(body) .to.have.property('user_1') .to.have.property('statusCode', 200); + }); + it('should fetch user_2 resource', function() { chai.expect(body) .to.have.property('user_2') .to.have.property('statusCode', 200); + }); + it('should fetch user_3 resource', function() { chai.expect(body) .to.have.property('user_3') .to.have.property('statusCode', 200); + }); + it('should fetch user_4 resource', function() { chai.expect(body) .to.have.property('readme') .to.have.property('statusCode', 200);