Skip to content
This repository has been archived by the owner on Sep 20, 2022. It is now read-only.

Commit

Permalink
Fixed Queen
Browse files Browse the repository at this point in the history
Bug!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
  • Loading branch information
Tehsmash committed Nov 14, 2013
1 parent c610907 commit 57ce97b
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 78 deletions.
2 changes: 1 addition & 1 deletion Queen/config/config.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
var config = {};

config.hiveIP = '127.0.0.1';
config.hiveIP = '192.168.1.106';
config.serverPort = 3000;

module.exports = config;
2 changes: 1 addition & 1 deletion Queen/controllers/agents.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ subscribeReady();
// Web Sockets
io.sockets.on('connection', function (socket) {
// Get the current number of connect agents
msg = rabbit.constructMessage('ALLAGENTS', 'control');
var msg = rabbit.constructMessage('ALLAGENTS', 'control');
new rabbit.rpc('control', msg, function (data) {
console.log("ADADASDASD", data);
socket.emit('init', data);
Expand Down
58 changes: 0 additions & 58 deletions Queen/npm-debug.log

This file was deleted.

48 changes: 31 additions & 17 deletions Queen/rabbit/rabbit_pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,41 @@ var sys = require('sys');
var TIMEOUT = 2000; //ms time to wait
var CONTENT_TYPE = 'application/json';

exports.PubSub = function (exchangeName, topic, callback) {
exports.PubSub = function (exchangeName, routingKey, callback) {
var connection = amqp.createConnection({host: config.hiveIP}),
self = this;

self.exchangeName = exchangeName;
self.topic = topic;
self.callback = callback;
connection.on('ready', onReady(connection, exchangeName, routingKey, callback));
};

connection.on('ready', function () {
function onReady(connection, name, routingKey, callback) {
return function() {
console.log("connected to " + connection.serverProperties.product);
// There is no need to declare type, 'topic' is the default:
var exchange = connection.exchange(self.exchangeName);
console.log("Setting Up Exchange");
var exchange = connection.exchange(name);

console.log("Setting Up Queue");
connection.queue('', {exclusive: true}, onQueueCreateOk(exchange, routingKey, callback));
}
}

// Consumer:
connection.queue('', {exclusive: true}, function (queue) {
//queue.bind(exchange, self.topic);
queue.subscribe(function (message) {
// Get original message string:
console.log(JSON.parse(message));
self.callback(JSON.parse(message));
});
});
});
};
function onQueueCreateOk(exchange, routingKey, callback) {
return function(queue) {
console.log("Binding queue to routingKey " + routingKey);
queue.bind(exchange, routingKey);
console.log("Subscribing to queue");
queue.subscribe(onMessage(callback));
console.log("Listening...");
}
}

function onMessage(callback) {
return function(message, headers, deliveryInfo) {
console.log("Message Received! Parsing...");
var msg = message['data'].toString('utf-8');
console.log(msg);
console.log(JSON.parse(msg));
callback(JSON.parse(msg));
}
}
5 changes: 4 additions & 1 deletion Queen/rabbit/rabbit_rpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ exports.RPCQuery = function (queueName, data, callback) {
if (self.correlationId === m.correlationId) {
clearTimeout(timeout);
connection.end();
callback(JSON.parse(message.data));
var msg = message['data'].toString('utf-8');
console.log('Parsing Response...');
console.log(msg);
callback(JSON.parse(msg));
}
console.log(message);
});
Expand Down

0 comments on commit 57ce97b

Please sign in to comment.