Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrently route requests #5

Merged
merged 4 commits into from
Mar 24, 2015
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,18 @@ Response with content only.
}
```

### Concurrency
By default, `multifetch` will process each request sequentially, waiting until a request has been processed and fully piped out before it processes the next request.

If the response to each of your requests is small but takes a long time to fetch (e.g. heavy database queries), `multifetch` supports concurrently processing requests.

Passing `concurrency: N` as an option allows you to control the number of concurrent requests being processed at any one time:

```javascript
app.get('/api/multifetch', multifetch({concurrency: 5}));
```
In the above case, up to 5 requests are routed through express concurrently, and each response is placed in a queue to be streamed out to the client sequentially.

License
-------

Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"test": "mocha"
},
"dependencies": {
"async": "~0.9.0",
"pump": "~1.0.0",
"extend": "~2.0.0"
},
Expand Down
103 changes: 67 additions & 36 deletions source/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ var url = require('url');

var pump = require('pump');
var extend = require('extend');
var async = require('async');

var JsonStream = require('./json');
var NullifyStream = require('./nullify');
Expand Down Expand Up @@ -68,20 +69,26 @@ var fetchBare = function(request, response) {
return nullify;
};

var create = function(options, callback) {
if(!callback && typeof options === 'function') {
callback = options;
// Finish the multifetch response: record whether any sub-request
// failed (the `_error` flag) and close the outgoing JSON stream.
var endStream = function(stream, hadError) {
	stream.writeObject('_error', hadError);
	stream.end();
};

var create = function(options, prefetch) {
if(!prefetch && typeof options === 'function') {
prefetch = options;
options = {};
}

options = options || {};

options = options || {};
var ignore = options.ignore || [];
var headers = options.headers !== undefined ? options.headers : true;
var concurrency = options.concurrency || 1; // Defaults to sequential fetching

var fetch = headers ? fetchWithHeaders : fetchBare;

callback = callback || noopCallback;
prefetch = prefetch || noopCallback;

return function(request, response, next) {
var app = request.app;
Expand All @@ -95,41 +102,65 @@ var create = function(options, callback) {

pump(json, response);

(function loop() {
var key = keys.pop();

if(!key) {
json.writeObject('_error', error);
return json.end();
}

var messages = createMessages(request, query[key]);
// Exit early if there is nothing to fetch.
if(keys.length === 0) {
return endStream(json, error);
}

var write = function(prevent) {
if(prevent) {
return loop();
// The resource queue processes resource streams sequentially.
var resourceQueue = async.queue(function worker(task, callback) {
pump(task.resource, json.createObjectStream(task.key), function(err) {
if(err) {
json.destroy();
return callback(err);
}
if(!(/2\d\d/).test(task.response.statusCode)) {
error = true;
}
callback();
});
}, 1);

var resource = fetch(messages.request, messages.response);

pump(resource, json.createObjectStream(key), function(err) {
if(err) {
return json.destroy();
}
if(!(/2\d\d/).test(messages.response.statusCode)) {
error = true;
}

loop();
});

app(messages.request, messages.response, function(err) {
json.destroy();
// Asynchronously fetch the resource for a key and push the resulting
// stream into the resource queue.
var fetchResource = function(key, callback) {
var messages = createMessages(request, query[key]);
process.nextTick(function() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is nextTick needed here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, you're right, it's not needed! I think it's an artifact from some previous playing I was doing. Removed.

prefetch(request, messages.request, function(prevent) {
if (prevent) return callback();

var resource = fetch(messages.request, messages.response);
var task = {
resource: resource,
request: messages.request,
response: messages.response,
key: key
};

app(messages.request, messages.response, function() {
resourceQueue.kill();
json.destroy();
});

// Callback is called once the stream for this resource has
// been fully piped out to the client.
resourceQueue.push(task, callback);
});
};

callback(request, messages.request, write);
}());
});
};

// Fire off all requests and push the resulting streams into a queue to
// be processed
async.eachLimit(keys, concurrency, fetchResource, function(err) {
if(resourceQueue.idle()) {
endStream(json, error);
} else {
// Called once all streams have been fully pumped out to the client.
resourceQueue.drain = function() {
endStream(json, error);
};
}
});
};
};

Expand Down
81 changes: 81 additions & 0 deletions test/integration/multifetch.options.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -238,4 +238,85 @@ describe('multifetch.options', function() {
});
});
});


describe('concurrent fetching', function() {

before(function(done) {
server = helper.server();

server.get('/api/multifetch', multifetch({ concurrency: 5 }));
server = server.listen(helper.port, done);
});

after(function(done) {
server.close(done);
});

describe('fetch multiple resources', function() {
before(function(done) {
request.get({
url: helper.url('/api/multifetch'),
qs: {
api: '/api',
user_1: '/api/users/user_1',
user_2: '/api/users/user_2',
user_3: '/api/users/user_3',
readme: '/README.md'
},
json: true
}, function(err, _, result) {
body = result;
done(err);
});
});

it('should be successful response', function() {
chai.expect(body).to.have.property('_error', false);
});

it('should fetch all resources', function() {
chai.expect(body)
.to.have.property('api')
.to.have.property('statusCode', 200);

chai.expect(body)
.to.have.property('user_1')
.to.have.property('statusCode', 200);

chai.expect(body)
.to.have.property('user_2')
.to.have.property('statusCode', 200);

chai.expect(body)
.to.have.property('user_3')
.to.have.property('statusCode', 200);

chai.expect(body)
.to.have.property('readme')
.to.have.property('statusCode', 200);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you split each expect into a separate it. E.g. it('should fetch user_1 resource').

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, sure.

});
});

describe('hang up on bad request', function() {
	var err;

	before(function(done) {
		// An unmatched sub-route (`/api/not_found`) presumably makes multifetch
		// destroy the JSON stream mid-response, so the client-side request
		// fails — `request`'s callback receives that failure as its first
		// argument. TODO confirm against source/index.js destroy path.
		request.get({
			url: helper.url('/api/multifetch'),
			qs: {
				bad: '/api/not_found',
			},
			json: true
		}, function(requestErr) {
			err = requestErr;
			done();
		});
	});

	it('should emit an error', function() {
		// BUG FIX: `to.be.defined` is not a chai assertion — the unknown
		// property access is a no-op, so this test previously always passed.
		// `to.exist` actually asserts err is neither null nor undefined.
		chai.expect(err).to.exist;
	});
});
});
});