Skip to content

Commit

Permalink
Merge pull request #5 from WillFrew/concurrent-fetch
Browse files Browse the repository at this point in the history
Concurrently route requests
  • Loading branch information
kapetan committed Mar 24, 2015
2 parents c4e64ca + 2b66986 commit 8b989a8
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 32 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,18 @@ Response with content only.
}
```

### Concurrency
By default, `multifetch` will process each request sequentially, waiting until a request has been processed and fully piped out before it processes the next request.

If the response to each of your requests is small but takes a long time to fetch (e.g. heavy database queries), `multifetch` supports concurrently processing requests.

Passing `concurrency: N` as an option allows you to control the number of concurrent requests being processed at any one time:

```javascript
app.get('/api/multifetch', multifetch({concurrency: 5}));
```
In the above case, 5 requests would be routed through express concurrently, and the response of each is placed in a queue to be streamed out to the client sequentially.

License
-------

Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"test": "mocha"
},
"dependencies": {
"async": "~0.9.0",
"pump": "~1.0.0",
"extend": "~2.0.0"
},
Expand Down
93 changes: 61 additions & 32 deletions source/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ var url = require('url');

var pump = require('pump');
var extend = require('extend');
var async = require('async');

var JsonStream = require('./json');
var NullifyStream = require('./nullify');
Expand Down Expand Up @@ -68,20 +69,26 @@ var fetchBare = function(request, response) {
return nullify;
};

var create = function(options, callback) {
if(!callback && typeof options === 'function') {
callback = options;
var endStream = function (jsonStream, error) {
jsonStream.writeObject('_error', error);
jsonStream.end();
};

var create = function(options, prefetch) {
if(!prefetch && typeof options === 'function') {
prefetch = options;
options = {};
}

options = options || {};

options = options || {};
var ignore = options.ignore || [];
var headers = options.headers !== undefined ? options.headers : true;
var concurrency = options.concurrency || 1; // Defaults to sequential fetching

var fetch = headers ? fetchWithHeaders : fetchBare;

callback = callback || noopCallback;
prefetch = prefetch || noopCallback;

return function(request, response, next) {
var app = request.app;
Expand All @@ -95,41 +102,63 @@ var create = function(options, callback) {

pump(json, response);

(function loop() {
var key = keys.pop();
// Exit early if there is nothing to fetch.
if(keys.length === 0) {
return endStream(json, error);
}

if(!key) {
json.writeObject('_error', error);
return json.end();
}
// The resource queue processes resource streams sequentially.
var resourceQueue = async.queue(function worker(task, callback) {
pump(task.resource, json.createObjectStream(task.key), function(err) {
if(err) {
json.destroy();
return callback(err);
}
if(!(/2\d\d/).test(task.response.statusCode)) {
error = true;
}
callback();
});
}, 1);

// Asynchronously fetch the resource for a key and push the resulting
// stream into the resource queue.
var fetchResource = function(key, callback) {
var messages = createMessages(request, query[key]);

var write = function(prevent) {
if(prevent) {
return loop();
}
prefetch(request, messages.request, function(prevent) {
if (prevent) return callback();

var resource = fetch(messages.request, messages.response);

pump(resource, json.createObjectStream(key), function(err) {
if(err) {
return json.destroy();
}
if(!(/2\d\d/).test(messages.response.statusCode)) {
error = true;
}

loop();
});

app(messages.request, messages.response, function(err) {
var task = {
resource: resource,
request: messages.request,
response: messages.response,
key: key
};

app(messages.request, messages.response, function() {
resourceQueue.kill();
json.destroy();
});
};

callback(request, messages.request, write);
}());
// Callback is called once the stream for this resource has
// been fully piped out to the client.
resourceQueue.push(task, callback);
});
};

// Fire off all requests and push the resulting streams into a queue to
// be processed
async.eachLimit(keys, concurrency, fetchResource, function(err) {
if(resourceQueue.idle()) {
endStream(json, error);
} else {
// Called once all streams have been fully pumped out to the client.
resourceQueue.drain = function() {
endStream(json, error);
};
}
});
};
};

Expand Down
89 changes: 89 additions & 0 deletions test/integration/multifetch.options.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -238,4 +238,93 @@ describe('multifetch.options', function() {
});
});
});


describe('concurrent fetching', function() {

before(function(done) {
server = helper.server();

server.get('/api/multifetch', multifetch({ concurrency: 5 }));
server = server.listen(helper.port, done);
});

after(function(done) {
server.close(done);
});

describe('fetch multiple resources', function() {
before(function(done) {
request.get({
url: helper.url('/api/multifetch'),
qs: {
api: '/api',
user_1: '/api/users/user_1',
user_2: '/api/users/user_2',
user_3: '/api/users/user_3',
readme: '/README.md'
},
json: true
}, function(err, _, result) {
body = result;
done(err);
});
});

it('should be successful response', function() {
chai.expect(body).to.have.property('_error', false);
});

it('should fetch api resource', function() {
chai.expect(body)
.to.have.property('api')
.to.have.property('statusCode', 200);
});

it('should fetch user_1 resource', function() {
chai.expect(body)
.to.have.property('user_1')
.to.have.property('statusCode', 200);
});

it('should fetch user_2 resource', function() {
chai.expect(body)
.to.have.property('user_2')
.to.have.property('statusCode', 200);
});

it('should fetch user_3 resource', function() {
chai.expect(body)
.to.have.property('user_3')
.to.have.property('statusCode', 200);
});

it('should fetch user_4 resource', function() {
chai.expect(body)
.to.have.property('readme')
.to.have.property('statusCode', 200);
});
});

describe('hang up on bad request', function() {
var err;

before(function(done) {
request.get({
url: helper.url('/api/multifetch'),
qs: {
bad: '/api/not_found',
},
json: true
}, function(result) {
err = result;
done();
});
});

it('should emit an error', function() {
chai.expect(err).to.be.defined;
});
});
});
});

0 comments on commit 8b989a8

Please sign in to comment.