Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use iobackends to send status messages. #145

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions backends.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
var mbc = require('mbc-common'),
search_options = mbc.config.Search,
collections = mbc.config.Common.Collections,
backboneio = require('backbone.io'),
middleware = new mbc.iobackends().get_middleware()
;

var test_conf = {
db: {
dbName: 'mediatestdb',
dbHost: 'localhost',
dbPort: 27017
}
};
var test_db = mbc.db(test_conf.db);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please keep test code inside tests... You can pass db to the constructor of this function and mosto server also receives the db object and can pass it alongside when calling this...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On 13 March 2014 12:19, Juan Martin Runge [email protected] wrote:

In backends.js:

@@ -0,0 +1,85 @@
+var mbc = require('mbc-common'),

  • search_options = mbc.config.Search,
  • collections = mbc.config.Common.Collections,
  • backboneio = require('backbone.io'),
  • middleware = new mbc.iobackends().get_middleware()
    +;

+var test_conf = {

  • db: {
  •    dbName: 'mediatestdb',
    
  •    dbHost: 'localhost',
    
  •    dbPort: 27017
    
  • }
    +};
    +var test_db = mbc.db(test_conf.db);

Please keep test code inside tests... You can pass db to the constructor of this function and mosto server also receives the db object and can pass it alongside when calling this...

Alright. Some tests do require() of mosto.js and create some things
on the db. We can either move the connection info for the test db
somewhere else here (inside mosto) or add it as another config option
and passing that to the call to mbc.db()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take a look at the comment I made in test/001-patch-backbone.js and let me know what do you think. On mosto.js we could do

if (!opts.db)
    db = mbc.db();
else
    db = opts.db;

Or something like this...


module.exports = function (db) {
var backends = {
status: {
use: [middleware.uuid],
redis: true,
mongo: {
db: db,
collection: collections.Status,
opts: { search: search_options.Status },
}},
frame: {
store: backboneio.middleware.memoryStore(db, 'progress', {}),
redis: true,
},
mostomessages: {
redis: true,
use: [middleware.tmpId],
mongo: {
db: db,
collection: collections.Mostomessages,
opts: { search: search_options.Mostomessages },
}},
// stuff we don't care if not saved across runs. Perhaps warns like sync and lpay started?
volatilemostomessages: {
redis: true,
use: [middleware.tmpId],
store: backboneio.middleware.memoryStore(db, 'volatilemessages', {}),
},
// keep this, the tests create some Media.Models.
media: {
mongo: {
db: test_db,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not hardcode db

collection: collections.Medias,
opts: { search: search_options.Medias },
}},
list: {
use: [middleware.uuid, middleware.tmpId],
mongo: {
db: test_db,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idem previous

collection: collections.Lists,
opts: { search: search_options.Lists },
}},
piece: {
use: [middleware.uuid],
mongo: {
db: test_db,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again

collection: collections.Pieces,
opts: { search: search_options.Pieces },
}},
sched: {
use: [middleware.uuid, middleware.publishJSON],
mongo: {
db: test_db,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And again

collection: collections.Scheds,
opts: { search: search_options.Scheds },
}},
transform: {
use: [middleware.uuid],
mongo: {
db: test_db,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Last of this!

collection: collections.Transforms,
opts: { search: search_options.Transforms },
}},
};

return backends;
};


15 changes: 14 additions & 1 deletion drivers/mvcp/melted-node-driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,29 @@ var melted_node = require('melted-node')
, logger = mbc.logger().addLogger('MELTED-NODE-DRIVER')
, melted_log = mbc.logger().addLogger('MELTED-NODE')
, uuid = require('node-uuid')
, events = require('events')
, util = require('util')
;

function melted(host, port, timeout) {
this.uuid = uuid.v4();
logger.debug(this.uuid + " - Creating server instance [" + host + ":" + port + "]");
this.mlt = new melted_node(host, port, melted_log, timeout);
var self = this;
['response-timeout', 'command-error', 'command-response', 'start-connection', 'connected', 'disconnected', 'connection-error', 'reconnect', 'disconnect'].forEach(function(event) {
self.mlt.on(event, function() {
arguments = Array.prototype.slice.call(arguments, 0);
arguments.splice(0,0, event);
self.emit.apply(self, arguments);
});
});
this.commandQueue = Q.resolve();
logger.debug(this.uuid + " - Server instance created [" + this.mlt.host + ":" + this.mlt.port + "]");
events.EventEmitter.call(this);
}

util.inherits(melted, events.EventEmitter)

melted.prototype._sendCommand = function(command) {
logger.debug(this.uuid + " - Sending command: " + command);
return this.mlt.sendCommand(command);
Expand Down Expand Up @@ -115,7 +128,7 @@ melted.prototype.getServerStatus = function() {
}
var err = new Error(self.uuid + " - Error getting server status in response object: " + response)
throw (err);
}).fail(function() {
}).fail(function(error) {
var err = new Error(self.uuid + " - Error getting server status: " + error);
logger.error(err.message);
throw err;
Expand Down
145 changes: 96 additions & 49 deletions drivers/status/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@
* creating constants in and handling db collections in mbc-common
***************************************************************************/
var events = require('events');
var uuid = require('node-uuid');
var Q = require('q');
var util = require('util');
var _ = require('underscore');
var mbc = require('mbc-common');
var logger = mbc.logger().addLogger('PUBSUB-DRIVER');
var App = require("mbc-common/models/App");

//XXX: keep this as it is here or make a deep copy later.
// _.clone() is shallow.
var defaults = { // copied from caspa/models.App.Status
_id: 2,
piece: {
Expand All @@ -27,21 +32,34 @@ var defaults = { // copied from caspa/models.App.Status
on_air: false,
};

function MostoMessage(value, description, message) {
this.value = value;
this.description = description;
this.message = message;
}
var Status = new App.Status();
Status.bindBackend();
Status.save();

var ProgressStatus = new App.ProgressStatus();
ProgressStatus.bindBackend();
ProgressStatus.save();

var MostoMessagesCollection = new App.MostoMessagesCollection();
MostoMessagesCollection.bindBackend();


function CaspaDriver() {
events.EventEmitter.call(this);
var self = this;
this.status = _.clone(defaults);
this.db = mbc.db();
this.publisher = mbc.pubsub();
}
util.inherits(CaspaDriver, events.EventEmitter);

CaspaDriver.prototype.CODES = {
BLANK: 201,
SYNC: 202,
PLAY: 203,
MELTED_CONN: 501,
FILE_NOT_FOUND: 502,
};

CaspaDriver.prototype.setupAll = function() {
var self = this;
var setups = [
Expand All @@ -58,26 +76,21 @@ CaspaDriver.prototype.setupAll = function() {

CaspaDriver.prototype.setupStatus = function(callback) {
var self = this;
var col = this.db.collection('status');
col.findOne({_id: 2}, function(err, res) {
if( err )
// err.. do something?
return;
if( !res ) {
// the status doesn't exist, create it
col.save(self.status, function(err, itm) {
callback();
});
} else {
// res existed, just signal as ready
callback();
}
});
//XXX: poner un status local aca ?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please translate comment

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by "local status"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On 13 March 2014 12:30, Juan Martin Runge [email protected] wrote:

What do you mean by "local status"?

I got to remove that. For a while I thought of keeping a private
instance of App.ProgressStatus and App.MostoMessagesCollection for
each driver instead of the global in use now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

};

CaspaDriver.prototype.setupMessages = function(callback) {
// I think we should assume at init there's no sticky errors?
this.db.collection('mostomessages').remove(callback);
// So we either mark them as fixed or just clean up the db?
MostoMessagesCollection.fetch({ success: function() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cant a sticky error be "Could not connect to melted"? In that case, if mosto was restarted after it emitted this error, and on restart it cant connect again... should we fix previous error and generate new one or reuse it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On 13 March 2014 12:33, Juan Martin Runge [email protected] wrote:

Cant a sticky error be "Could not connect to melted"? In that case, if mosto was restarted after it emitted this error, and on restart it cant connect again... should we fix previous error and generate new one or reuse it?

The original driver reused the message instead of creating a new one.
What I want to do is just reopen the error if it still persists, else
mark it as resolved.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reopening would mean that it was marked as resolved before, doesnt it? In that case, it would be a new error. If it was not resolved, and persists, we should do nothing. If it was resolved, mark it as resolved. Perhaps we should check unresolved errors on startup and see if they went fixed or not...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On 13 March 2014 12:55, Juan Martin Runge [email protected] wrote:

In drivers/status/pubsub.js:

};

CaspaDriver.prototype.setupMessages = function(callback) {
// I think we should assume at init there's no sticky errors?

  • this.db.collection('mostomessages').remove(callback);
  • // So we either mark them as fixed or just clean up the db?
  • MostoMessagesCollection.fetch({ success: function() {

Reopening would mean that it was marked as resolved before, doesnt it? In that case, it would be a new error. If it was not resolved, and persists, we should do nothing. If it was resolved, mark it as resolved. Perhaps we should check unresolved errors on startup and see if they went fixed or not...

I got your comment mixed up with another code block.
The original code just dropped everything, if the error persists it
would be triggered again.
Now we don't do that but on a second thought it is probably better to
have the old behaviour again.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should stick to the initial error until its resolved instead of dropping and creating a new one...

_.each( MostoMessagesCollection.where({status: "failing"}), function(message) {
message.set('status', 'fixed');
message.set('end', moment().valueOf());
message.save();
});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use resolve function here, no?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On 13 March 2014 12:31, Juan Martin Runge [email protected] wrote:

You could use resolve function here, no?

yes but I added it this morning. Haven't updated yet the code in Mosto

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok!

callback();
}
});
};

CaspaDriver.prototype.setStatus = function(meltedStatus) {
Expand All @@ -102,7 +115,8 @@ CaspaDriver.prototype.setStatus = function(meltedStatus) {
if (clip===undefined) return { name: '', _id: '' };
return {
name: clip.name,
_id: clip._id
_id: clip._id,
length: clip.totalFrames,
}
}

Expand Down Expand Up @@ -142,7 +156,7 @@ CaspaDriver.prototype.setStatus = function(meltedStatus) {
logger.debug("Status builded. doing last calculations");

if (status.piece.current)
status.piece.current.progress = (meltedStatus.position / status.piece.current.length) * 100 + "%";
status.piece.current.progress = (meltedStatus.position / status.piece.current.length) * 100;

var prevStatus = _.clone(this.status);

Expand All @@ -155,43 +169,76 @@ CaspaDriver.prototype.setStatus = function(meltedStatus) {
status.show[val]._id == prevStatus.show[val]._id );
}) ) {
logger.debug("No changes, try to send statusclip");
return this.setProgressStatus({
progress: meltedStatus.position,
length: meltedStatus.clip.current.length,
ProgressStatus.set({
currentFrame: meltedStatus.position,
totalFrames: meltedStatus.clip.current.length,
});
return ProgressStatus.save();
}

logger.debug("Finally publish status");
this.publish("mostoStatus", status);
Status.set(status);
return Status.save();
};

CaspaDriver.prototype.setProgressStatus = function(statusPiece) {
if (statusPiece)
this.publish("mostoStatus.progress", {
currentFrame: statusPiece.progress,
totalFrames: statusPiece.length,
});
}

CaspaDriver.prototype.publish = function(channel, status) {
this.publisher.publishJSON(channel, status);
logger.error('XXXX CALLED METHOD: CaspaDriver.publish()');
};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please find where this method is used and fix the calls. Then remove this method.


CaspaDriver.prototype.publishMessage = function(code, description, message, sticky) {
message = new MostoMessage(code, description, message);
var method = 'emit';
if( sticky ) {
// I create an id with the timestamp to be able to cancel the error afterwards
message.stickId = (new moment()).valueOf();
method = 'create';
//XXX: need to check all the calls so they match the new function signature.
//XXX: OLD WAS: CaspaDriver.prototype.publishMessage = function(code, description, message, sticky) {
/*
Publishes a message through redis. If the message code is considered an
ongoing error (such as mosto connectivity errors), it's saved to the database
*/
//XXX: FIXME: stuff like sync lost or play started sholud probably be stored
// on a memoryStore and not mongo.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whats the lifecycle of memoryStore? Will it be accessible if Caspa/Mosto goes down? I think is interesting to know when playback started or if it was out of sync if you come to the ui and caspa and/or mosto went down (and restarted) when you were away.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On 13 March 2014 12:38, Juan Martin Runge [email protected] wrote:

Whats the lifecycle of memoryStore? Will it be accessible if Caspa/Mosto goes down? I think is interesting to know when playback started or if it was out of sync if you come to the ui and caspa and/or mosto went down (and restarted) when you were away.

Currently everything is persisted to Mongo, but the things stored on a
memoryStore vanish when the server goes down. So unless necessary I'll
keep them there.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, please remove comment then.

CaspaDriver.prototype.publishMessage = function(code, message, description, reference) {
var status = {};
(code !== undefined) && (status.code = code);
description && (status.description = description);
message && (status.message = message);
reference && (status.reference = reference);

var existing = MostoMessagesCollection.findWhere(status);
if(existing) {
// don't publish the same message twice
return existing;
}
this.publisher.publishJSON(["mostoMessage", method].join('.'),
{ model: message });
return message;


status._id = uuid.v4();
status._tmpid = true;
status = MostoMessagesCollection.create(status);
status.save();
return status;
};

CaspaDriver.prototype.dropMessage = function(message) {
this.publisher.publish("mostoMessage.delete", { model: message });
/*
updates the model in the database setting status='fixed' and returns a
promise that resolves once the object is updated in the database, and the
signal is published through redis
*/
CaspaDriver.prototype.dropMessage = function(code, reference) {
var self = this;
var message = MostoMessagesCollection.findWhere({ code: code, reference: reference });
if(!message)
return Q.resolve(false);

message.set('status', 'fixed');
message.set('end', moment().valueOf());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use "resolve" function?


var defer = Q.defer();
message.save({
success: function(model, response, options) {
defer.resolve(true);
},
error: function(model, response, options) {
defer.reject(response);
}
});

return defer;
};

exports = module.exports = function() {
Expand Down
6 changes: 6 additions & 0 deletions heartbeats.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ heartbeats.prototype.startMvcpServer = function(callback) {
var result = self.server.initServer();
result.then(function() {
logger.info("MVCP server started");
self.server.on('reconnect', function(had_error) {
self.emit('melted-disconnected');
});
self.server.on('connected', function() {
self.emit('melted-connected');
});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will only listen to this events?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On 13 March 2014 12:43, Juan Martin Runge [email protected] wrote:

In heartbeats.js:

@@ -55,6 +55,12 @@ heartbeats.prototype.startMvcpServer = function(callback) {
var result = self.server.initServer();
result.then(function() {
logger.info("MVCP server started");

  •    self.server.on('reconnect', function(had_error) {
    
  •        self.emit('melted-disconnected');
    
  •    });
    
  •    self.server.on('connected', function() {
    
  •        self.emit('melted-connected');
    
  •    });
    

We will only listen to this events?

They are the ones that make most sense to show in the ui as the rest are
already logged.

melted-node emits 'response-timeout', 'command-error', 'command-response',
'start-connection', 'connected', 'disconnected', 'connection-error',
'reconnect', 'disconnect'.
Despite its name 'reconnect' means really that the connection with melted
was closed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1- We should fix thant in melted-node ASAP! reconnect should mean that is already reconnected, connected should be used only when connected first time and disconnected every time it gets disconnected no matter why.
2- I think response-timeout, command-error, connected, disconnected, connection-error, reconnect and disconnect are interesting events to listen for... User will not be looking at the log, will be looking at the UI

if (callback !== undefined) {
callback();
}
Expand Down
7 changes: 7 additions & 0 deletions models/Mosto.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ Mosto.MeltedCollection = Backbone.Collection.extend({
},

initMvcpServer: function() {
var self = this;
this.driver.on('reconnect', function(had_error) {
self.trigger('melted-disconnected');
});
this.driver.on('connected', function() {
self.trigger('melted-connected');
});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idem previous...

return this.driver.initServer();
},

Expand Down
Loading