From 77ba557156cf73a3d52698de2413044a6b591c8f Mon Sep 17 00:00:00 2001 From: Jason Grout Date: Thu, 8 Dec 2016 16:05:28 -0800 Subject: [PATCH] Backport #1958: Add an output callback override stack Fix a subtle async bug in processing comm messages. Basically, message processing did not wait for any promises a comm message handler may return to resolve, because comms were not set up to resolve return values of message handlers. This allows us to override output callbacks to redirect output messages, and is used to implement the Output widget, for example. It does not redirect status messages or messages on other channels. Write the get_output_callback_id function that we use. Return after error condition. Some tests for output callback overrides. --- notebook/static/services/kernels/comm.js | 13 +- notebook/static/services/kernels/kernel.js | 71 +++++++++- notebook/tests/notebook/output.js | 157 +++++++++++++++++++-- 3 files changed, 215 insertions(+), 26 deletions(-) diff --git a/notebook/static/services/kernels/comm.js b/notebook/static/services/kernels/comm.js index 85065ddcb3..f751866081 100644 --- a/notebook/static/services/kernels/comm.js +++ b/notebook/static/services/kernels/comm.js @@ -129,12 +129,9 @@ define([ } this.comms[content.comm_id] = this.comms[content.comm_id].then(function(comm) { - try { - comm.handle_msg(msg); - } catch (e) { - console.log("Exception handling comm msg: ", e, e.stack, msg); - } - return comm; + return (Promise.resolve(comm.handle_msg(msg)) + .catch(utils.reject('Exception handling comm message')) + .then(function() {return comm;})); }); return this.comms[content.comm_id]; }; @@ -194,7 +191,7 @@ define([ var callback = this['_' + key + '_callback']; if (callback) { try { - callback(msg); + return callback(msg); } catch (e) { console.log("Exception in Comm callback", e, e.stack, msg); } @@ -202,7 +199,7 @@ define([ }; Comm.prototype.handle_msg = function (msg) { - this._callback('msg', msg); + return this._callback('msg', msg); }; Comm.prototype.handle_close = function (msg) { diff --git a/notebook/static/services/kernels/kernel.js b/notebook/static/services/kernels/kernel.js index 20b07ca4b9..10035a60db 100644 --- a/notebook/static/services/kernels/kernel.js +++ b/notebook/static/services/kernels/kernel.js @@ -42,6 +42,7 @@ define([ this.username = "username"; this.session_id = utils.uuid(); this._msg_callbacks = {}; + this._msg_callbacks_overrides = {}; this._msg_queue = Promise.resolve(); this.info_reply = {}; // kernel_info_reply stored here after starting @@ -301,6 +302,8 @@ define([ this.events.trigger('kernel_restarting.Kernel', {kernel: this}); this.stop_channels(); + this._msg_callbacks = {}; + this._msg_callbacks_overrides = {}; var that = this; var on_success = function (data, status, xhr) { that.events.trigger('kernel_created.Kernel', {kernel: that}); @@ -847,6 +850,35 @@ define([ } }; + /** + * Get output callbacks for a specific message. + * + * @function get_output_callbacks_for_msg + * + * Since output callbacks can be overridden, we first check the override stack. + */ + Kernel.prototype.get_output_callbacks_for_msg = function (msg_id) { + return this.get_callbacks_for_msg(this.get_output_callback_id(msg_id)); + }; + + + /** + * Get the output callback id for a message + * + * Since output callbacks can be redirected, this may not be the same as + * the msg_id. + * + * @function get_output_callback_id + */ + Kernel.prototype.get_output_callback_id = function (msg_id) { + var callback_id = msg_id; + var overrides = this._msg_callbacks_overrides[msg_id]; + if (overrides && overrides.length > 0) { + callback_id = overrides[overrides.length-1]; + } + return callback_id + } + /** * Clear callbacks for a specific message. * @@ -891,10 +923,16 @@ define([ * * } * + * If the third parameter is truthy, the callback is set as the last + * callback registered. + * * @function set_callbacks_for_msg */ - Kernel.prototype.set_callbacks_for_msg = function (msg_id, callbacks) { - this.last_msg_id = msg_id; + Kernel.prototype.set_callbacks_for_msg = function (msg_id, callbacks, save) { + var remember = save || true; + if (remember) { + this.last_msg_id = msg_id; + } if (callbacks) { // shallow-copy mapping, because we will modify it at the top level var cbcopy = this._msg_callbacks[msg_id] = this.last_msg_callbacks = {}; @@ -903,11 +941,31 @@ define([ cbcopy.input = callbacks.input; cbcopy.shell_done = (!callbacks.shell); cbcopy.iopub_done = (!callbacks.iopub); - } else { + } else if (remember) { this.last_msg_callbacks = {}; } }; - + + /** + * Override output callbacks for a particular msg_id + */ + Kernel.prototype.output_callback_overrides_push = function(msg_id, callback_id) { + var output_callbacks = this._msg_callbacks_overrides[msg_id]; + if (output_callbacks === void 0) { + this._msg_callbacks_overrides[msg_id] = output_callbacks = []; + } + output_callbacks.push(callback_id); + } + + Kernel.prototype.output_callback_overrides_pop = function(msg_id) { + var callback_ids = this._msg_callbacks_overrides[msg_id]; + if (!callback_ids) { + console.error("Popping callback overrides, but none registered", msg_id); + return; + } + return callback_ids.pop(); + } + Kernel.prototype._handle_ws_message = function (e) { var that = this; this._msg_queue = this._msg_queue.then(function() { @@ -1032,7 +1090,7 @@ define([ * @function _handle_clear_output */ Kernel.prototype._handle_clear_output = function (msg) { - var callbacks = this.get_callbacks_for_msg(msg.parent_header.msg_id); + var callbacks = this.get_output_callbacks_for_msg(msg.parent_header.msg_id); if (!callbacks || !callbacks.iopub) { return; } @@ -1048,7 +1106,8 @@ define([ * @function _handle_output_message */ Kernel.prototype._handle_output_message = function (msg) { - var callbacks = this.get_callbacks_for_msg(msg.parent_header.msg_id); + var msg_id = msg.parent_header.msg_id; + var callbacks = this.get_output_callbacks_for_msg(msg_id); if (!callbacks || !callbacks.iopub) { // The message came from another client. Let the UI decide what to // do with it. diff --git a/notebook/tests/notebook/output.js b/notebook/tests/notebook/output.js index db6f276d71..cfc6958388 100644 --- a/notebook/tests/notebook/output.js +++ b/notebook/tests/notebook/output.js @@ -4,6 +4,17 @@ casper.notebook_test(function () { + this.compare_outputs = function(results, expected) { + for (var i = 0; i < results.length; i++) { + var r = results[i]; + var ex = expected[i]; + this.test.assertEquals(r.output_type, ex.output_type, "output " + i + " = " + r.output_type); + if (r.output_type === 'stream') { + this.test.assertEquals(r.name, ex.name, "stream " + i + " = " + r.name); + this.test.assertEquals(r.text, ex.text, "content " + i); + } + } + } this.test_coalesced_output = function (msg, code, expected) { this.then(function () { this.echo("Test coalesced output: " + msg); @@ -24,31 +35,23 @@ casper.notebook_test(function () { return cell.output_area.outputs; }); this.test.assertEquals(results.length, expected.length, "correct number of outputs"); - for (var i = 0; i < results.length; i++) { - var r = results[i]; - var ex = expected[i]; - this.test.assertEquals(r.output_type, ex.output_type, "output " + i); - if (r.output_type === 'stream') { - this.test.assertEquals(r.name, ex.name, "stream " + i); - this.test.assertEquals(r.text, ex.text, "content " + i); - } - } + this.compare_outputs(results, expected); }); }; - + this.thenEvaluate(function () { IPython.notebook.insert_cell_at_index("code", 0); var cell = IPython.notebook.get_cell(0); cell.set_text([ "from __future__ import print_function", "import sys", - "from IPython.display import display" + "from IPython.display import display, clear_output" ].join("\n") ); cell.execute(); }); - + this.test_coalesced_output("stdout", [ "print(1)", "sys.stdout.flush()", @@ -123,4 +126,134 @@ casper.notebook_test(function () { }, }] ); + + this.then(function () { + this.echo("Test output callback overrides"); + }); + + this.thenEvaluate(function () { + IPython.notebook.insert_cell_at_index("code", 0); + var cell = IPython.notebook.get_cell(0); + cell.set_text(["print(1)", + "sys.stdout.flush()", + "print(2)", + "sys.stdout.flush()", + "print(3, file=sys.stderr)", + "sys.stdout.flush()", + "display(2)", + "clear_output()", + "sys.stdout.flush()", + "print('remove handler')", + "sys.stdout.flush()", + "print('back to cell')", + "sys.stdout.flush()", + ].join('\n')); + cell.execute(); + var kernel = IPython.notebook.kernel; + var msg_id = cell.last_msg_id; + var callback_id = 'mycallbackid' + cell.iopub_messages = []; + var add_msg = function(msg) { + if (msg.content.text==="remove handler\n") { + kernel.output_callback_overrides_pop(msg_id); + } + msg.content.output_type = msg.msg_type; + cell.iopub_messages.push(msg.content); + }; + kernel.set_callbacks_for_msg(callback_id, { + iopub: { + output: add_msg, + clear_output: add_msg, + } + }, false); + kernel.output_callback_overrides_push(msg_id, callback_id); + }); + + this.wait_for_idle(); + + this.then(function () { + var expected_callback = [{ + output_type: "stream", + name: "stdout", + text: "1\n" + }, { + output_type: "stream", + name: "stdout", + text: "2\n" + }, { + output_type: "stream", + name: "stderr", + text: "3\n" + },{ + output_type: "display_data", + },{ + output_type: "clear_output", + },{ + output_type: "stream", + name: "stdout", + text: "remove handler\n" + },] + var expected_cell = [{ + output_type: "stream", + name: "stdout", + text: "back to cell\n" + }] + var returned = this.evaluate(function () { + var cell = IPython.notebook.get_cell(0); + return [cell.output_area.outputs, cell.iopub_messages]; + }); + var cell_results = returned[0]; + var callback_results = returned[1]; + this.test.assertEquals(cell_results.length, expected_cell.length, "correct number of cell outputs"); + this.test.assertEquals(callback_results.length, expected_callback.length, "correct number of callback outputs"); + this.compare_outputs(cell_results, expected_cell); + this.compare_outputs(callback_results, expected_callback); + }); + + this.then(function () { + this.echo("Test output callback overrides get execute_results messages too"); + }); + + this.thenEvaluate(function () { + IPython.notebook.insert_cell_at_index("code", 0); + var cell = IPython.notebook.get_cell(0); + cell.set_text("'end'"); + cell.execute(); + var kernel = IPython.notebook.kernel; + var msg_id = cell.last_msg_id; + var callback_id = 'mycallbackid2' + cell.iopub_messages = []; + var add_msg = function(msg) { + msg.content.output_type = msg.msg_type; + cell.iopub_messages.push(msg.content); + }; + kernel.set_callbacks_for_msg(callback_id, { + iopub: { + output: add_msg, + clear_output: add_msg, + } + }, false); + kernel.output_callback_overrides_push(msg_id, callback_id); + }); + + this.wait_for_idle(); + + this.then(function () { + var expected_callback = [{ + output_type: "execute_result", + data: { + "text/plain" : "'end'" + } + }]; + var expected_cell = []; + var returned = this.evaluate(function () { + var cell = IPython.notebook.get_cell(0); + return [cell.output_area.outputs, cell.iopub_messages]; + }); + var cell_results = returned[0]; + var callback_results = returned[1]; + this.test.assertEquals(cell_results.length, expected_cell.length, "correct number of cell outputs"); + this.test.assertEquals(callback_results.length, expected_callback.length, "correct number of callback outputs"); + this.compare_outputs(callback_results, expected_callback); + }); });