diff --git a/README.md b/README.md index d8a8d16..7dda9f6 100644 --- a/README.md +++ b/README.md @@ -123,6 +123,18 @@ Response with content only. } ``` +### Concurrency +By default, `multifetch` will process each request sequentially, waiting until a request has been processed and fully piped out before it processes the next request. + +If the response to each of your requests is small but takes a long time to fetch (e.g. heavy database queries), `multifetch` supports concurrently processing requests. + +Passing `concurrency: N` as an option allows you to control the number of concurrent requests being processed at any one time: + +```javascript +app.get('/api/multifetch', multifetch({concurrency: 5})); +``` +In the above case, 5 requests would be routed through express concurrently, and the response of each is placed in a queue to be streamed out to the client sequentially. + License ------- diff --git a/package.json b/package.json index fe09c92..e7ac25a 100644 --- a/package.json +++ b/package.json @@ -8,6 +8,7 @@ "test": "mocha" }, "dependencies": { + "async": "~0.9.0", "pump": "~1.0.0", "extend": "~2.0.0" }, diff --git a/source/index.js b/source/index.js index 81af56b..959ea99 100644 --- a/source/index.js +++ b/source/index.js @@ -2,6 +2,7 @@ var url = require('url'); var pump = require('pump'); var extend = require('extend'); +var async = require('async'); var JsonStream = require('./json'); var NullifyStream = require('./nullify'); @@ -68,20 +69,26 @@ var fetchBare = function(request, response) { return nullify; }; -var create = function(options, callback) { - if(!callback && typeof options === 'function') { - callback = options; +var endStream = function (jsonStream, error) { + jsonStream.writeObject('_error', error); + jsonStream.end(); +}; + +var create = function(options, prefetch) { + if(!prefetch && typeof options === 'function') { + prefetch = options; options = {}; } - options = options || {}; + options = options || {}; var ignore = options.ignore || []; var headers = options.headers !== undefined ? options.headers : true; + var concurrency = options.concurrency || 1; // Defaults to sequential fetching var fetch = headers ? fetchWithHeaders : fetchBare; - callback = callback || noopCallback; + prefetch = prefetch || noopCallback; return function(request, response, next) { var app = request.app; @@ -95,41 +102,63 @@ var create = function(options, callback) { pump(json, response); - (function loop() { - var key = keys.pop(); + // Exit early if there is nothing to fetch. + if(keys.length === 0) { + return endStream(json, error); + } - if(!key) { - json.writeObject('_error', error); - return json.end(); - } + // The resource queue processes resource streams sequentially. + var resourceQueue = async.queue(function worker(task, callback) { + pump(task.resource, json.createObjectStream(task.key), function(err) { + if(err) { + json.destroy(); + return callback(err); + } + if(!(/2\d\d/).test(task.response.statusCode)) { + error = true; + } + callback(); + }); + }, 1); + // Asynchronously fetch the resource for a key and push the resulting + // stream into the resource queue. + var fetchResource = function(key, callback) { var messages = createMessages(request, query[key]); - - var write = function(prevent) { - if(prevent) { - return loop(); - } + prefetch(request, messages.request, function(prevent) { + if (prevent) return callback(); var resource = fetch(messages.request, messages.response); - - pump(resource, json.createObjectStream(key), function(err) { - if(err) { - return json.destroy(); - } - if(!(/2\d\d/).test(messages.response.statusCode)) { - error = true; - } - - loop(); - }); - - app(messages.request, messages.response, function(err) { + var task = { + resource: resource, + request: messages.request, + response: messages.response, + key: key + }; + + app(messages.request, messages.response, function() { + resourceQueue.kill(); json.destroy(); }); - }; - callback(request, messages.request, write); - }()); + // Callback is called once the stream for this resource has + // been fully piped out to the client. + resourceQueue.push(task, callback); + }); + }; + + // Fire off all requests and push the resulting streams into a queue to + // be processed + async.eachLimit(keys, concurrency, fetchResource, function(err) { + if(resourceQueue.idle()) { + endStream(json, error); + } else { + // Called once all streams have been fully pumped out to the client. + resourceQueue.drain = function() { + endStream(json, error); + }; + } + }); }; }; diff --git a/test/integration/multifetch.options.spec.js b/test/integration/multifetch.options.spec.js index 7d8f2e1..cc33c3b 100644 --- a/test/integration/multifetch.options.spec.js +++ b/test/integration/multifetch.options.spec.js @@ -238,4 +238,93 @@ describe('multifetch.options', function() { }); }); }); + + + describe('concurrent fetching', function() { + + before(function(done) { + server = helper.server(); + + server.get('/api/multifetch', multifetch({ concurrency: 5 })); + server = server.listen(helper.port, done); + }); + + after(function(done) { + server.close(done); + }); + + describe('fetch multiple resources', function() { + before(function(done) { + request.get({ + url: helper.url('/api/multifetch'), + qs: { + api: '/api', + user_1: '/api/users/user_1', + user_2: '/api/users/user_2', + user_3: '/api/users/user_3', + readme: '/README.md' + }, + json: true + }, function(err, _, result) { + body = result; + done(err); + }); + }); + + it('should be successful response', function() { + chai.expect(body).to.have.property('_error', false); + }); + + it('should fetch api resource', function() { + chai.expect(body) + .to.have.property('api') + .to.have.property('statusCode', 200); + }); + + it('should fetch user_1 resource', function() { + chai.expect(body) + .to.have.property('user_1') + .to.have.property('statusCode', 200); + }); + + it('should fetch user_2 resource', function() { + chai.expect(body) + .to.have.property('user_2') + .to.have.property('statusCode', 200); + }); + + it('should fetch user_3 resource', function() { + chai.expect(body) + .to.have.property('user_3') + .to.have.property('statusCode', 200); + }); + + it('should fetch user_4 resource', function() { + chai.expect(body) + .to.have.property('readme') + .to.have.property('statusCode', 200); + }); + }); + + describe('hang up on bad request', function() { + var err; + + before(function(done) { + request.get({ + url: helper.url('/api/multifetch'), + qs: { + bad: '/api/not_found', + }, + json: true + }, function(result) { + err = result; + done(); + }); + }); + + it('should emit an error', function() { + chai.expect(err).to.be.defined; + }); + }); + }); });