Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrently route requests #5

Merged
merged 4 commits into from
Mar 24, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,18 @@ Response with content only.
}
```

### Concurrency
By default, `multifetch` will process each request sequentially, waiting until a request has been processed and fully piped out before it processes the next request.

If the response to each of your requests is small but takes a long time to fetch (e.g. heavy database queries), `multifetch` supports concurrently processing requests.

Passing `concurrency: N` as an option allows you to control the number of concurrent requests being processed at any one time:

```javascript
app.get('/api/multifetch', multifetch({concurrency: 5}));
```
In the above case, 5 requests would be routed through express concurrently, and the response of each is placed in a queue to be streamed out to the client sequentially.

License
-------

Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"test": "mocha"
},
"dependencies": {
"async": "~0.9.0",
"pump": "~1.0.0",
"extend": "~2.0.0"
},
Expand Down
93 changes: 61 additions & 32 deletions source/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ var url = require('url');

var pump = require('pump');
var extend = require('extend');
var async = require('async');

var JsonStream = require('./json');
var NullifyStream = require('./nullify');
Expand Down Expand Up @@ -68,20 +69,26 @@ var fetchBare = function(request, response) {
return nullify;
};

var create = function(options, callback) {
if(!callback && typeof options === 'function') {
callback = options;
var endStream = function (jsonStream, error) {
jsonStream.writeObject('_error', error);
jsonStream.end();
};

var create = function(options, prefetch) {
if(!prefetch && typeof options === 'function') {
prefetch = options;
options = {};
}

options = options || {};

options = options || {};
var ignore = options.ignore || [];
var headers = options.headers !== undefined ? options.headers : true;
var concurrency = options.concurrency || 1; // Defaults to sequential fetching

var fetch = headers ? fetchWithHeaders : fetchBare;

callback = callback || noopCallback;
prefetch = prefetch || noopCallback;

return function(request, response, next) {
var app = request.app;
Expand All @@ -95,41 +102,63 @@ var create = function(options, callback) {

pump(json, response);

(function loop() {
var key = keys.pop();
// Exit early if there is nothing to fetch.
if(keys.length === 0) {
return endStream(json, error);
}

if(!key) {
json.writeObject('_error', error);
return json.end();
}
// The resource queue processes resource streams sequentially.
var resourceQueue = async.queue(function worker(task, callback) {
pump(task.resource, json.createObjectStream(task.key), function(err) {
if(err) {
json.destroy();
return callback(err);
}
if(!(/2\d\d/).test(task.response.statusCode)) {
error = true;
}
callback();
});
}, 1);

// Asynchronously fetch the resource for a key and push the resulting
// stream into the resource queue.
var fetchResource = function(key, callback) {
var messages = createMessages(request, query[key]);

var write = function(prevent) {
if(prevent) {
return loop();
}
prefetch(request, messages.request, function(prevent) {
if (prevent) return callback();

var resource = fetch(messages.request, messages.response);

pump(resource, json.createObjectStream(key), function(err) {
if(err) {
return json.destroy();
}
if(!(/2\d\d/).test(messages.response.statusCode)) {
error = true;
}

loop();
});

app(messages.request, messages.response, function(err) {
var task = {
resource: resource,
request: messages.request,
response: messages.response,
key: key
};

app(messages.request, messages.response, function() {
resourceQueue.kill();
json.destroy();
});
};

callback(request, messages.request, write);
}());
// Callback is called once the stream for this resource has
// been fully piped out to the client.
resourceQueue.push(task, callback);
});
};

// Fire off all requests and push the resulting streams into a queue to
// be processed
async.eachLimit(keys, concurrency, fetchResource, function(err) {
if(resourceQueue.idle()) {
endStream(json, error);
} else {
// Called once all streams have been fully pumped out to the client.
resourceQueue.drain = function() {
endStream(json, error);
};
}
});
};
};

Expand Down
89 changes: 89 additions & 0 deletions test/integration/multifetch.options.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -238,4 +238,93 @@ describe('multifetch.options', function() {
});
});
});


describe('concurrent fetching', function() {

before(function(done) {
server = helper.server();

server.get('/api/multifetch', multifetch({ concurrency: 5 }));
server = server.listen(helper.port, done);
});

after(function(done) {
server.close(done);
});

describe('fetch multiple resources', function() {
before(function(done) {
request.get({
url: helper.url('/api/multifetch'),
qs: {
api: '/api',
user_1: '/api/users/user_1',
user_2: '/api/users/user_2',
user_3: '/api/users/user_3',
readme: '/README.md'
},
json: true
}, function(err, _, result) {
body = result;
done(err);
});
});

it('should be successful response', function() {
chai.expect(body).to.have.property('_error', false);
});

it('should fetch api resource', function() {
chai.expect(body)
.to.have.property('api')
.to.have.property('statusCode', 200);
});

it('should fetch user_1 resource', function() {
chai.expect(body)
.to.have.property('user_1')
.to.have.property('statusCode', 200);
});

it('should fetch user_2 resource', function() {
chai.expect(body)
.to.have.property('user_2')
.to.have.property('statusCode', 200);
});

it('should fetch user_3 resource', function() {
chai.expect(body)
.to.have.property('user_3')
.to.have.property('statusCode', 200);
});

it('should fetch user_4 resource', function() {
chai.expect(body)
.to.have.property('readme')
.to.have.property('statusCode', 200);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you split each expect into a separate it. E.g. it('should fetch user_1 resource').

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, sure.

});
});

describe('hang up on bad request', function() {
var err;

before(function(done) {
request.get({
url: helper.url('/api/multifetch'),
qs: {
bad: '/api/not_found',
},
json: true
}, function(result) {
err = result;
done();
});
});

it('should emit an error', function() {
chai.expect(err).to.be.defined;
});
});
});
});