Skip to content

Commit

Permalink
Release commit created with Cranko.
Browse files Browse the repository at this point in the history
+++ cranko-release-info-v1
[[projects]]
qnames = ["wwt_kernel_data_relay", "pypa"]
version = "0.2.0"
age = 0

+++
  • Loading branch information
cranko committed Oct 25, 2021
2 parents f1fb712 + 9c877c9 commit bd5e5e8
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 59 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# wwt_kernel_data_relay 0.2.0 (2021-10-25)

- Require and use message sequencing numbers in kernel replies (#2, @pkgw).
These weren't necessary in my initial testing, but in the BinderHub I have
issues with out-of-order and duplicated messages that make it look like we'll
have to use these.


# wwt_kernel_data_relay 0.1.1 (2021-10-23)

- Add the drop-in configuration files needed to automatically activate the
Expand Down
6 changes: 6 additions & 0 deletions docs/specification.rst
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ The JSON content of *every* reply message should contain the following fields
{
'status': $status:str,
'seq': $seq:int,
'more': $more:bool
}
Expand All @@ -146,6 +147,11 @@ present, will be relayed to the requestor as part of an HTTP 500 error.

.. _the jupyter_client documentation: https://jupyter-client.readthedocs.io/en/stable/messaging.html#request-reply

The ``$seq`` field gives a sequence number for each reply message, starting at
zero and increasing by one with each reply. This is needed because some
implementations of the Jupyter messaging protocol don't guarantee in-order
message delivery.

The ``$more`` field indicates whether more reply messages will be sent. If your
implementation doesn't "know" when the last reply will be sent until all data
have been transferred, send a final message with no associated byte buffers.
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def get_long_desc():

setup_args = dict(
name="wwt_kernel_data_relay", # cranko project-name
version="0.1.1", # cranko project-version
version="0.2.0", # cranko project-version
description="Jupyter server extension to allow kernels to make data web-accessible",
long_description=get_long_desc(),
long_description_content_type="text/markdown",
Expand Down
164 changes: 106 additions & 58 deletions wwt_kernel_data_relay/serverextension.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,32 @@
from notebook.base.handlers import IPythonHandler
from traitlets.config.configurable import LoggingConfigurable

__all__ = ['load_jupyter_server_extension']
__all__ = ["load_jupyter_server_extension"]


class SequencedBuffer(object):
    """Reorder kernel reply messages by their ``seq`` sequence number.

    Replies can arrive out of order or duplicated; this buffer hands them
    back strictly in sequence, one at a time.
    """

    def __init__(self):
        # Sequence number of the next reply to hand out.
        self.next_seq = 0
        # Replies that arrived "early", keyed by sequence number.
        self.by_seq = {}

    def try_get_next(self):
        """Return the next in-order reply, or None if it hasn't arrived yet."""
        item = self.by_seq.pop(self.next_seq, None)
        if item is not None:
            self.next_seq += 1
        return item

    def accumulate(self, reply, log_object):
        """Stash *reply* for later in-order retrieval.

        Replies without a ``seq`` field in their content are dropped with a
        warning; replies whose sequence number has already been consumed
        (stale duplicates) are silently ignored.
        """
        seq = reply["content"].get("seq")
        if seq is None:
            log_object.log_warning("dropping message %r", reply)
            return

        # Only keep messages we haven't already handed out.
        if seq >= self.next_seq:
            self.by_seq[seq] = reply


class Registry(LoggingConfigurable):
Expand All @@ -29,40 +54,44 @@ def __init__(self):
self._mid_to_buffer = {}

def log_debug(self, fmt, *args):
self.log.debug('wwt_kernel_data_relay | ' + fmt, *args)
self.log.debug("wwt_kernel_data_relay | " + fmt, *args)

def log_info(self, fmt, *args):
self.log.info('wwt_kernel_data_relay | ' + fmt, *args)
self.log.info("wwt_kernel_data_relay | " + fmt, *args)

def log_warning(self, fmt, *args):
self.log.warning('wwt_kernel_data_relay | ' + fmt, *args)
self.log.warning("wwt_kernel_data_relay | " + fmt, *args)

def watch_new_kernel(self, kernel_id, km):
session = Session(
config = km.session.config,
key = km.session.key,
config=km.session.config,
key=km.session.key,
)

kc = km.client(session=session)
self._kid_to_client[kernel_id] = kc

stream = km.connect_iopub()
self.log_debug('watching kernel %s in session %s', kernel_id, session.session)
self.log_debug("watching kernel %s in session %s", kernel_id, session.session)

def watch_iopubs(msg_list):
idents, fed_msg_list = session.feed_identities(msg_list)
msg = session.deserialize(fed_msg_list)
msg_type = msg['header']['msg_type']
msg_type = msg["header"]["msg_type"]

if msg_type == 'wwtkdr_claim_key':
key = msg['content'].get('key')
if msg_type == "wwtkdr_claim_key":
key = msg["content"].get("key")

if not key:
self.log_warning('missing/empty key specified in claim by kernel %s', kernel_id)
elif key.startswith('_'):
self.log_warning('kernel %s attempted to claim reserved key %s', kernel_id, key)
self.log_warning(
"missing/empty key specified in claim by kernel %s", kernel_id
)
elif key.startswith("_"):
self.log_warning(
"kernel %s attempted to claim reserved key %s", kernel_id, key
)
else:
self.log_debug('key %s claimed by kernel %s', key, kernel_id)
self.log_debug("key %s claimed by kernel %s", key, kernel_id)
self._key_to_kid[key] = kernel_id

stream.on_recv(watch_iopubs)
Expand Down Expand Up @@ -102,7 +131,9 @@ async def get_next_reply(self, msg_id, kc):
# gotten a message and buffered it while we were waiting.
buffer = self._mid_to_buffer.get(msg_id)
if buffer:
return buffer.pop(0)
reply = buffer.try_get_next()
if reply is not None:
return reply

# Nothing in our private buffer. Ask the kernel client.
#
Expand All @@ -129,13 +160,17 @@ async def get_next_reply(self, msg_id, kc):
# intended for us. So the only tractable way to ensure that
# everything stays ordered is to always use the buffer.

reply_mid = reply['parent_header'].get('msg_id')
reply_mid = reply["parent_header"].get("msg_id")

if reply_mid is None:
self.log_warning('dropping message on floor because it has no parent_id: %s', reply)
self.log_warning(
"dropping message on floor because it has no parent_id: %s", reply
)
else:
buffer = self._mid_to_buffer.setdefault(reply_mid, [])
buffer.append(reply)
buffer = self._mid_to_buffer.get(reply_mid)
if buffer is None:
self._mid_to_buffer[reply_mid] = buffer = SequencedBuffer()
buffer.accumulate(reply, self)

# we never break out of the loop.

Expand Down Expand Up @@ -170,39 +205,43 @@ async def get(self, key, entry):
if kernel_id is None:
self.clear()
self.set_status(404)
self.finish(f'unrecognized WWTKDR key {key!r}')
self.finish(f"unrecognized WWTKDR key {key!r}")
return

self.registry.log_debug(
'GET key=%s kernel_id=%s entry=%s authenticated=%s',
key, kernel_id, entry, authenticated,
"GET key=%s kernel_id=%s entry=%s authenticated=%s",
key,
kernel_id,
entry,
authenticated,
)

kc = self.registry.get_client(self.kernel_manager, kernel_id)
if kc is None:
self.clear()
self.set_status(404)
self.registry.log_warning(f'could not get kernel client for WWTKDR key {key!r}')
self.finish(f'could not get kernel client for WWTKDR key {key!r}')
self.registry.log_warning(
f"could not get kernel client for WWTKDR key {key!r}"
)
self.finish(f"could not get kernel client for WWTKDR key {key!r}")
return

content = dict(
method = 'GET',
url = url,
authenticated = authenticated,
key = key,
entry = entry,

method="GET",
url=url,
authenticated=authenticated,
key=key,
entry=entry,
# Special marker for "expedited" processing in pywwt Jupyter
# clients, needed to make it possible for those clients to process
# such requests while evaluating async Python code. Without this,
# the user can't use an async command to ask the frontend to load a
# WTML file describing tiled data, because the kernel won't be able
# to process the KDR data request.
data = {'content': {'_pywwtExpedite': True}},
data={"content": {"_pywwtExpedite": True}},
)
msg = kc.session.msg('wwtkdr_resource_request', content)
msg_id = msg['header']['msg_id']
msg = kc.session.msg("wwtkdr_resource_request", content)
msg_id = msg["header"]["msg_id"]
kc.shell_channel.send(msg)

self.clear()
Expand All @@ -220,36 +259,42 @@ async def get(self, key, entry):
self.clear()
self.set_status(500)
self.registry.log_warning(
'incomplete or missing response from kernel | key=%s entry=%s kernel_id=%s msg_id=%s',
key, entry, kernel_id, msg_id
"incomplete or missing response from kernel | key=%s entry=%s kernel_id=%s msg_id=%s",
key,
entry,
kernel_id,
msg_id,
)
self.finish('incomplete or missing response from kernel')
self.finish("incomplete or missing response from kernel")
return

content = reply['content']
status = content.get('status', 'unspecified')
content = reply["content"]
status = content.get("status", "unspecified")

if status != 'ok':
if status != "ok":
self.registry.finish_reply_buffering(msg_id)
self.clear()
self.set_status(500)
msg = content.get('evalue', 'unspecified kernel error')
msg = content.get("evalue", "unspecified kernel error")
self.registry.log_warning(
'kernel internal error | %s | key=%s entry=%s kernel_id=%s',
msg, key, entry, kernel_id,
"kernel internal error | %s | key=%s entry=%s kernel_id=%s",
msg,
key,
entry,
kernel_id,
)
self.finish(msg)
return

if first:
self.set_status(content['http_status'])
for name, value in content['http_headers']:
self.set_status(content["http_status"])
for name, value in content["http_headers"]:
self.set_header(name, value)
first = False

keep_going = content['more'] and len(reply['buffers'])
keep_going = content["more"] and len(reply["buffers"])

for buf in reply['buffers']:
for buf in reply["buffers"]:
self.write(bytes(buf))

self.registry.finish_reply_buffering(msg_id)
Expand All @@ -259,12 +304,10 @@ async def get(self, key, entry):
class ProbeRequestHandler(IPythonHandler):
    """Handle GET requests probing whether the relay extension is alive."""

    @web.authenticated
    def get(self):
        # Reply with a trivial JSON document so that clients can verify
        # that the server extension is installed and responding.
        info = {"status": "ok"}

        self.set_status(200)
        self.set_header("Content-Type", "application/json")
        self.write(json.dumps(info))
        self.finish()

Expand All @@ -278,9 +321,11 @@ def load_jupyter_server_extension(nb_server_app):
"""

web_app = nb_server_app.web_app
host_pattern = '.*$'
probe_route_pattern = url_path_join(web_app.settings['base_url'], '/wwtkdr/_probe')
data_route_pattern = url_path_join(web_app.settings['base_url'], '/wwtkdr/([^/]*)/(.*)')
host_pattern = ".*$"
probe_route_pattern = url_path_join(web_app.settings["base_url"], "/wwtkdr/_probe")
data_route_pattern = url_path_join(
web_app.settings["base_url"], "/wwtkdr/([^/]*)/(.*)"
)

# The registry of kernels and URL "keys". Also slightly overloaded to
# contain our logger.
Expand All @@ -289,16 +334,19 @@ def load_jupyter_server_extension(nb_server_app):

# Register handlers.

web_app.add_handlers(host_pattern, [
(probe_route_pattern, ProbeRequestHandler),
(data_route_pattern, DataRequestHandler, {'registry': registry}),
])
web_app.add_handlers(
host_pattern,
[
(probe_route_pattern, ProbeRequestHandler),
(data_route_pattern, DataRequestHandler, {"registry": registry}),
],
)

# In order for the registry to be notified of when a kernel has requested a
# URL prefix, we need to shim ourselves into the kernel startup framework
# and register a message listener.

registry.log_info('shimming into notebook startup')
registry.log_info("shimming into notebook startup")
app_km = nb_server_app.kernel_manager
orig_start_watching_activity = app_km.start_watching_activity

Expand Down

0 comments on commit bd5e5e8

Please sign in to comment.