diff --git a/backends.js b/backends.js new file mode 100644 index 0000000..fb2d827 --- /dev/null +++ b/backends.js @@ -0,0 +1,85 @@ +var mbc = require('mbc-common'), + search_options = mbc.config.Search, + collections = mbc.config.Common.Collections, + backboneio = require('backbone.io'), + middleware = new mbc.iobackends().get_middleware() +; + +var test_conf = { + db: { + dbName: 'mediatestdb', + dbHost: 'localhost', + dbPort: 27017 + } +}; +var test_db = mbc.db(test_conf.db); + +module.exports = function (db) { + var backends = { + status: { + use: [middleware.uuid], + redis: true, + mongo: { + db: db, + collection: collections.Status, + opts: { search: search_options.Status }, + }}, + frame: { + store: backboneio.middleware.memoryStore(db, 'progress', {}), + redis: true, + }, + mostomessages: { + redis: true, + use: [middleware.tmpId], + mongo: { + db: db, + collection: collections.Mostomessages, + opts: { search: search_options.Mostomessages }, + }}, + // stuff we don't care if not saved across runs. Perhaps warns like sync and lpay started? + volatilemostomessages: { + redis: true, + use: [middleware.tmpId], + store: backboneio.middleware.memoryStore(db, 'volatilemessages', {}), + }, + // keep this, the tests create some Media.Models. + media: { + mongo: { + db: test_db, + collection: collections.Medias, + opts: { search: search_options.Medias }, + }}, + list: { + use: [middleware.uuid, middleware.tmpId], + mongo: { + db: test_db, + collection: collections.Lists, + opts: { search: search_options.Lists }, + }}, + piece: { + use: [middleware.uuid], + mongo: { + db: test_db, + collection: collections.Pieces, + opts: { search: search_options.Pieces }, + }}, + sched: { + use: [middleware.uuid, middleware.publishJSON], + mongo: { + db: test_db, + collection: collections.Scheds, + opts: { search: search_options.Scheds }, + }}, + transform: { + use: [middleware.uuid], + mongo: { + db: test_db, + collection: collections.Transforms, + opts: { search: search_options.Transforms }, + }}, + }; + + return backends; +}; + + diff --git a/drivers/mvcp/melted-node-driver.js b/drivers/mvcp/melted-node-driver.js index 1de951b..30fd3f9 100644 --- a/drivers/mvcp/melted-node-driver.js +++ b/drivers/mvcp/melted-node-driver.js @@ -10,16 +10,29 @@ var melted_node = require('melted-node') , logger = mbc.logger().addLogger('MELTED-NODE-DRIVER') , melted_log = mbc.logger().addLogger('MELTED-NODE') , uuid = require('node-uuid') +, events = require('events') +, util = require('util') ; function melted(host, port, timeout) { this.uuid = uuid.v4(); logger.debug(this.uuid + " - Creating server instance [" + host + ":" + port + "]"); this.mlt = new melted_node(host, port, melted_log, timeout); + var self = this; + ['response-timeout', 'command-error', 'command-response', 'start-connection', 'connected', 'disconnected', 'connection-error', 'reconnect', 'disconnect'].forEach(function(event) { + self.mlt.on(event, function() { + arguments = Array.prototype.slice.call(arguments, 0); + arguments.splice(0,0, event); + self.emit.apply(self, arguments); + }); + }); this.commandQueue = Q.resolve(); logger.debug(this.uuid + " - Server instance created [" + this.mlt.host + ":" + this.mlt.port + "]"); + events.EventEmitter.call(this); } +util.inherits(melted, events.EventEmitter) + melted.prototype._sendCommand = function(command) { logger.debug(this.uuid + " - Sending command: " + command); return this.mlt.sendCommand(command); @@ -115,7 +128,7 @@ melted.prototype.getServerStatus = function() { } var err = new Error(self.uuid + " - Error getting server status in response object: " + response) throw (err); - }).fail(function() { + }).fail(function(error) { var err = new Error(self.uuid + " - Error getting server status: " + error); logger.error(err.message); throw err; diff --git a/drivers/status/pubsub.js b/drivers/status/pubsub.js index 17b9735..0c72f9c 100644 --- a/drivers/status/pubsub.js +++ b/drivers/status/pubsub.js @@ -6,11 +6,16 @@ * creating constants in and handling db collections in mbc-common ***************************************************************************/ var events = require('events'); +var uuid = require('node-uuid'); +var Q = require('q'); var util = require('util'); var _ = require('underscore'); var mbc = require('mbc-common'); var logger = mbc.logger().addLogger('PUBSUB-DRIVER'); +var App = require("mbc-common/models/App"); +//XXX: keep this as it is here or make a deep copy later. +// _.clone() is shallow. var defaults = { // copied from caspa/models.App.Status _id: 2, piece: { @@ -27,21 +32,34 @@ var defaults = { // copied from caspa/models.App.Status on_air: false, }; -function MostoMessage(value, description, message) { - this.value = value; - this.description = description; - this.message = message; -} +var Status = new App.Status(); +Status.bindBackend(); +Status.save(); + +var ProgressStatus = new App.ProgressStatus(); +ProgressStatus.bindBackend(); +ProgressStatus.save(); + +var MostoMessagesCollection = new App.MostoMessagesCollection(); +MostoMessagesCollection.bindBackend(); + function CaspaDriver() { events.EventEmitter.call(this); var self = this; this.status = _.clone(defaults); this.db = mbc.db(); - this.publisher = mbc.pubsub(); } util.inherits(CaspaDriver, events.EventEmitter); +CaspaDriver.prototype.CODES = { + BLANK: 201, + SYNC: 202, + PLAY: 203, + MELTED_CONN: 501, + FILE_NOT_FOUND: 502, +}; + CaspaDriver.prototype.setupAll = function() { var self = this; var setups = [ @@ -58,26 +76,21 @@ CaspaDriver.prototype.setupAll = function() { CaspaDriver.prototype.setupStatus = function(callback) { var self = this; - var col = this.db.collection('status'); - col.findOne({_id: 2}, function(err, res) { - if( err ) - // err.. do something? - return; - if( !res ) { - // the status doesn't exist, create it - col.save(self.status, function(err, itm) { - callback(); - }); - } else { - // res existed, just signal as ready - callback(); - } - }); +//XXX: poner un status local aca ? }; CaspaDriver.prototype.setupMessages = function(callback) { // I think we should assume at init there's no sticky errors? - this.db.collection('mostomessages').remove(callback); + // So we either mark them as fixed or just clean up the db? + MostoMessagesCollection.fetch({ success: function() { + _.each( MostoMessagesCollection.where({status: "failing"}), function(message) { + message.set('status', 'fixed'); + message.set('end', moment().valueOf()); + message.save(); + }); + callback(); + } + }); }; CaspaDriver.prototype.setStatus = function(meltedStatus) { @@ -102,7 +115,8 @@ CaspaDriver.prototype.setStatus = function(meltedStatus) { if (clip===undefined) return { name: '', _id: '' }; return { name: clip.name, - _id: clip._id + _id: clip._id, + length: clip.totalFrames, } } @@ -142,7 +156,7 @@ CaspaDriver.prototype.setStatus = function(meltedStatus) { logger.debug("Status builded. doing last calculations"); if (status.piece.current) - status.piece.current.progress = (meltedStatus.position / status.piece.current.length) * 100 + "%"; + status.piece.current.progress = (meltedStatus.position / status.piece.current.length) * 100; var prevStatus = _.clone(this.status); @@ -155,43 +169,76 @@ CaspaDriver.prototype.setStatus = function(meltedStatus) { status.show[val]._id == prevStatus.show[val]._id ); }) ) { logger.debug("No changes, try to send statusclip"); - return this.setProgressStatus({ - progress: meltedStatus.position, - length: meltedStatus.clip.current.length, + ProgressStatus.set({ + currentFrame: meltedStatus.position, + totalFrames: meltedStatus.clip.current.length, }); + return ProgressStatus.save(); } logger.debug("Finally publish status"); - this.publish("mostoStatus", status); + Status.set(status); + return Status.save(); }; -CaspaDriver.prototype.setProgressStatus = function(statusPiece) { - if (statusPiece) - this.publish("mostoStatus.progress", { - currentFrame: statusPiece.progress, - totalFrames: statusPiece.length, - }); -} - CaspaDriver.prototype.publish = function(channel, status) { - this.publisher.publishJSON(channel, status); + logger.error('XXXX CALLED METHOD: CaspaDriver.publish()'); }; -CaspaDriver.prototype.publishMessage = function(code, description, message, sticky) { - message = new MostoMessage(code, description, message); - var method = 'emit'; - if( sticky ) { - // I create an id with the timestamp to be able to cancel the error afterwards - message.stickId = (new moment()).valueOf(); - method = 'create'; +//XXX: need to check all the calls so they match the new function signature. +//XXX: OLD WAS: CaspaDriver.prototype.publishMessage = function(code, description, message, sticky) { +/* + Publishes a message through redis. If the message code is considered an + ongoing error (such as mosto connectivity errors), it's saved to the database +*/ +//XXX: FIXME: stuff like sync lost or play started sholud probably be stored +// on a memoryStore and not mongo. +CaspaDriver.prototype.publishMessage = function(code, message, description, reference) { + var status = {}; + (code !== undefined) && (status.code = code); + description && (status.description = description); + message && (status.message = message); + reference && (status.reference = reference); + + var existing = MostoMessagesCollection.findWhere(status); + if(existing) { + // don't publish the same message twice + return existing; } - this.publisher.publishJSON(["mostoMessage", method].join('.'), - { model: message }); - return message; + + + status._id = uuid.v4(); + status._tmpid = true; + status = MostoMessagesCollection.create(status); + status.save(); + return status; }; -CaspaDriver.prototype.dropMessage = function(message) { - this.publisher.publish("mostoMessage.delete", { model: message }); +/* + updates the model in the database setting status='fixed' and returns a + promise that resolves once the object is updated in the database, and the + signal is published through redis +*/ +CaspaDriver.prototype.dropMessage = function(code, reference) { + var self = this; + var message = MostoMessagesCollection.findWhere({ code: code, reference: reference }); + if(!message) + return Q.resolve(false); + + message.set('status', 'fixed'); + message.set('end', moment().valueOf()); + + var defer = Q.defer(); + message.save({ + success: function(model, response, options) { + defer.resolve(true); + }, + error: function(model, response, options) { + defer.reject(response); + } + }); + + return defer; }; exports = module.exports = function() { diff --git a/heartbeats.js b/heartbeats.js index 4f51051..83e522c 100644 --- a/heartbeats.js +++ b/heartbeats.js @@ -55,6 +55,12 @@ heartbeats.prototype.startMvcpServer = function(callback) { var result = self.server.initServer(); result.then(function() { logger.info("MVCP server started"); + self.server.on('reconnect', function(had_error) { + self.emit('melted-disconnected'); + }); + self.server.on('connected', function() { + self.emit('melted-connected'); + }); if (callback !== undefined) { callback(); } diff --git a/models/Mosto.js b/models/Mosto.js index 71c73bc..33db032 100644 --- a/models/Mosto.js +++ b/models/Mosto.js @@ -130,6 +130,13 @@ Mosto.MeltedCollection = Backbone.Collection.extend({ }, initMvcpServer: function() { + var self = this; + this.driver.on('reconnect', function(had_error) { + self.trigger('melted-disconnected'); + }); + this.driver.on('connected', function() { + self.trigger('melted-connected'); + }); return this.driver.initServer(); }, diff --git a/mosto.js b/mosto.js index 81b74e9..240248c 100644 --- a/mosto.js +++ b/mosto.js @@ -1,3 +1,14 @@ +/* XXX: I need the patched models for status driver. + * Not putting this there so everything is on one place. + */ +var mbc = require('mbc-common') +, db = mbc.db() +, backends_conf = require('./backends')(db) +, iobackends = new mbc.iobackends(db, backends_conf) +; +iobackends.patchBackbone(); + +// var fs = require('fs') , util = require('util') , events = require('events') @@ -7,13 +18,13 @@ var fs = require('fs') , playlists_driver = require('./drivers/playlists/playlists-driver') , status_driver = require('./drivers/status/pubsub') , utils = require('./utils') -, mbc = require('mbc-common') , config = mbc.config.Mosto.General , _ = require('underscore') , heartbeats = require('./heartbeats') , models = require('./models/Mosto') , logger = mbc.logger().addLogger('CORE') ; + //TODO: Chequear window, se esta construyendo de formas distintas //INCLUSO EN EL DRIVER MISMO SE USA DE FORMAS DISTINTAS!!! function mosto(customConfig) { @@ -89,6 +100,27 @@ mosto.prototype.initDriver = function() { self.playlists.removePlaylist(id); }); + this.pl_driver.on('file-not-found', function(media) { + var error = self.status_driver.publishMessage(self.status_driver.CODES.FILE_NOT_FOUND, JSON.stringify(media), undefined, media.file); + }); + + this.playlists.on('remove:playlists', function(playlist) { + var broken = playlist.get('medias').filter(function(m) { return m.get("broken") }); + broken.forEach(function(model) { + if(model.get('broken')) { + // a broken file was removed, drop messages regarding it + self.status_driver.dropMessage(self.status_driver.CODES.FILE_NOT_FOUND, model.get('broken')); + } + }); + }); + + this.playlists.on('melted-disconnected:melted_medias', function() { + self.status_driver.publishMessage(self.status_driver.CODES.MELTED_CONN, "Connection with melted lost", undefined, "playlists"); + }); + this.playlists.on('melted-connected:melted_medias', function() { + self.status_driver.dropMessage(self.status_driver.CODES.MELTED_CONN, 'playlists'); + }); + self.pl_driver.start(); }; @@ -173,7 +205,7 @@ mosto.prototype.initHeartbeats = function() { }); self.heartbeats.on("noClips", function() { - var window = this.getTimeWindow(); + var window = self.getTimeWindow(); self.fetchPlaylists(window); }); @@ -181,6 +213,20 @@ mosto.prototype.initHeartbeats = function() { self.emit('playing'); }); + self.on('playing', function() { + self.status_driver.publishMessage(self.status_driver.CODES.PLAY); + }); + self.heartbeats.on('outOfSync', function() { + self.status_driver.publishMessage(self.status_driver.CODES.SYNC); + }); + + self.heartbeats.on('melted-disconnected', function() { + self.status_driver.publishMessage(self.status_driver.CODES.MELTED_CONN, "Connection with melted lost", undefined, "heartbeats"); + }); + self.heartbeats.on('melted-connected', function() { + self.status_driver.dropMessage(self.status_driver.CODES.MELTED_CONN, 'heartbeats'); + }); + self.heartbeats.init(); }; diff --git a/package.json b/package.json index 7173795..f8bdb92 100644 --- a/package.json +++ b/package.json @@ -30,12 +30,13 @@ "mbc-common" : "git://github.com/inaes-tic/mbc-common.git", "node-mlt" : "git://github.com/inaes-tic/node-mlt.git#0.0.1", "semaphore" : "1.0.x", - "backbone" : "1.x.x", + "backbone" : "1.x.x", + "backbone.io" : "git://github.com/inaes-tic/backbone.io", "winston" : "0.7.x", "istanbul" : "0.1.43" }, "devDependencies": { - "mocha" : "1.7.x", + "mocha" : "1.17.x", "should" : "1.2.x", "seed-random" : "1.0.1", "xml2js" : "0.2.7", diff --git a/test/001-patch-backbone.js b/test/001-patch-backbone.js new file mode 100644 index 0000000..7a1f66d --- /dev/null +++ b/test/001-patch-backbone.js @@ -0,0 +1,10 @@ +before(function(done) { + var mbc = require('mbc-common'); + var db = mbc.db(); + var backends_conf = require('../backends.js')(db); + var iobackends = new mbc.iobackends(db, backends_conf); + + iobackends.patchBackbone(); + + done(); +}); diff --git a/test/media_helpers.js b/test/media_helpers.js index 5f06e4d..53b65ac 100644 --- a/test/media_helpers.js +++ b/test/media_helpers.js @@ -3,8 +3,7 @@ var fs = require('fs'), crypto = require('crypto'), moment = require('moment'), seed = require('seed-random'), - _ = require('underscore'), - CMedia = require('mbc-common/models/Media'); + _ = require('underscore'); function parseXMLs(path) { if (path === undefined) { @@ -70,6 +69,11 @@ exports.getMedia = function(path) { * Scans given path (or default) getting media files and returns mbc-common.models.Media objects array */ exports.getMBCMedia = function(path) { + /* XXX: Keep it here as otherwise it can be loaded before Backbone is + * patched leading to all sorts of fun + */ + var CMedia = require('mbc-common/models/Media'); + if (path === undefined) { path = "test/videos/"; // TODO FIXME XXX: ugly hardcoded -> should be in config? } diff --git a/test/models-test.js b/test/models-test.js index 1e693dd..72b6bc8 100644 --- a/test/models-test.js +++ b/test/models-test.js @@ -1,5 +1,4 @@ -var Mosto = require('../models/Mosto') -, should = require('should') +var should = require('should') , mvcp = require('../drivers/mvcp/mvcp-driver') , melted = require('../api/Melted') , helpers = require('./media_helpers') @@ -8,7 +7,13 @@ var Mosto = require('../models/Mosto') ; describe.skip('models.Mosto', function() { + /* XXX: Keep it here as otherwise it can be loaded before Backbone is + * patched leading to all sorts of fun + */ + var Mosto = require('../models/Mosto') + var self = this; + self.playlists = Mosto.Playlists; self.medias = helpers.getMedia(); this.timeout(15000); diff --git a/test/playlist-mongo-driver.js b/test/playlist-mongo-driver.js index 35c29c4..d1097ec 100644 --- a/test/playlist-mongo-driver.js +++ b/test/playlist-mongo-driver.js @@ -5,10 +5,13 @@ var mbc = require('mbc-common'); var _ = require('underscore'); var melted = require('../api/Melted'); var helpers = require('./media_helpers'); -var Media = require('mbc-common/models/Media'); var uuid = require('node-uuid'); describe('PlaylistMongoDriver', function(){ + /* XXX: Keep it here as otherwise it can be loaded before Backbone is + * patched leading to all sorts of fun + */ + var Media = require('mbc-common/models/Media'); var self = this; before(function(done) {