Skip to content

Commit

Permalink
Improve stream syncronization
Browse files Browse the repository at this point in the history
  • Loading branch information
yitzchak committed Oct 12, 2024
1 parent a475729 commit cb2b14d
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 64 deletions.
4 changes: 3 additions & 1 deletion src/cl-jupyter/kernel.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -741,4 +741,6 @@
(defmethod jupyter:evaluate-code ((k kernel) code &optional source-path breakpoints)
(if (jupyter:kernel-debugger-started jupyter:*kernel*)
(debugging-errors (repl code source-path breakpoints))
(jupyter:handling-errors (repl code source-path breakpoints))))
#+(or)(jupyter:handling-errors (repl code source-path breakpoints))
(with-simple-restart (abort "Exit debugger, returning to top level.")
(repl code source-path breakpoints))))
127 changes: 74 additions & 53 deletions src/iopub.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,21 @@
|#

(defclass iopub-channel (channel)
()
((name :initarg :name
:initform "stdout"
:accessor iopub-channel-name)
(value :initarg :value
:initform (make-array *iopub-stream-size*
:fill-pointer 0
:adjustable t
:element-type 'character)
:reader iopub-channel-value)
(prompt-prefix :initarg :prompt-prefix
:reader iopub-channel-prompt-prefix)
(prompt-suffix :initarg :prompt-suffix
:reader iopub-channel-prompt-suffix)
(column :accessor iopub-channel-column
:initform 0))
(:documentation "IOPUB channel class."))

#|
Expand Down Expand Up @@ -93,69 +107,76 @@
"name" stream-name
"text" data))))

(defun iopub-write-char (iopub name char)
(with-accessors ((current-name iopub-channel-name)
(value iopub-channel-value)
(prompt-prefix iopub-channel-prompt-prefix)
(prompt-suffix iopub-channel-prompt-suffix))
iopub
(unless (string= current-name name)
(when (plusp (length value))
(send-stream iopub current-name value)
(setf (fill-pointer value) 0))
(setf current-name name))
(cond ((graphic-char-p char)
(incf column))
((or (char= #\newline)
(char= #\return))
(setf column 0))
((char= #\tab)
(incf column 8)))
(vector-push-extend char value)
;; After the character has been added look for a prompt terminator at the
;; end.
(when (alexandria:ends-with-subseq prompt-suffix value)
(let ((start (search prompt-prefix value)))
;; If there is a prompt start also then print the prompt and remove it
;; from the buffer.
(when start
;; If there is data before the prompt then send it now.
(unless (zerop start)
(send-stream iopub name (subseq value 0 start)))
(write-string (subseq value
(+ start (length prompt-prefix))
(- (length value) (length prompt-suffix)))
*query-io*)
(finish-output *query-io*)
(adjust-array value (array-total-size value)
:fill-pointer 0))))))

(defun iopub-finish-output (iopub &optional name)
(with-accessors ((current-name iopub-channel-name)
(value iopub-channel-value))
iopub
(when (and (or (null name)
(string= current-name name))
(plusp (length value)))
(send-stream iopub current-name value)
(setf (fill-pointer value) 0))))

(defvar *iopub-stream-size* 1024)

(defclass iopub-stream (ngray:fundamental-character-output-stream)
((channel :initarg :channel
:reader iopub-stream-channel)
(name :initarg :name
:reader iopub-stream-name)
(value :initarg :value
:initform (make-array *iopub-stream-size*
:fill-pointer 0
:adjustable t
:element-type 'character)
:reader iopub-stream-value)
(prompt-prefix :initarg :prompt-prefix
:reader iopub-stream-prompt-prefix)
(prompt-suffix :initarg :prompt-suffix
:reader iopub-stream-prompt-suffix)
(column
:accessor iopub-stream-column
:initform 0)))
:reader iopub-stream-name)))

(defun make-iopub-stream (iopub name prompt-prefix prompt-suffix)
(defun make-iopub-stream (iopub name)
(make-instance 'iopub-stream :channel iopub
:name name
:prompt-prefix prompt-prefix
:prompt-suffix prompt-suffix))
:name name))

(defmethod ngray:stream-write-char ((stream iopub-stream) char)
(unless (equal char #\Sub) ; Ignore subsititute characters
(with-slots (channel name value prompt-prefix prompt-suffix column) stream
(cond
((graphic-char-p char)
(incf column))
((or (char= #\newline)
(char= #\return))
(setf column 0))
((char= #\tab)
(incf column 8)))
(vector-push-extend char value)
;; After the character has been added look for a prompt terminator at the
;; end.
(if (alexandria:ends-with-subseq prompt-suffix value)
(let ((start (search prompt-prefix value)))
;; If there is a prompt start also then print the prompt and remove it
;; from the buffer.
(when start
;; If there is data before the prompt then send it now.
(unless (zerop start)
(send-stream channel name (subseq value 0 start)))
(write-string (subseq value
(+ start (length prompt-prefix))
(- (length value) (length prompt-suffix)))
*query-io*)
(finish-output *query-io*)
(adjust-array value (array-total-size value)
:fill-pointer 0)))))))

(iopub-write-char (iopub-stream-channel stream)
(iopub-stream-name stream)
char))
char)

(defmethod ngray:stream-finish-output ((stream iopub-stream))
(with-slots (channel name value prompt-prefix) stream
(unless (or (zerop (length value))
(search prompt-prefix value))
(send-stream channel name value)
(adjust-array value (array-total-size value)
:fill-pointer 0))))
(iopub-finish-output (iopub-stream-channel stream)
(iopub-stream-name stream))
nil)

(defmethod ngray:stream-line-column ((stream iopub-stream))
(iopub-channel-column (iopub-stream-channel stream)))
10 changes: 6 additions & 4 deletions src/kernel.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,9 @@
:socket (pzmq:socket ctx :pub)
:transport transport
:ip ip
:port iopub-port)
:port iopub-port
:prompt-prefix prompt-prefix
:prompt-suffix prompt-suffix)
shell (make-instance 'shell-channel
:sink sink
:session session
Expand Down Expand Up @@ -813,9 +815,9 @@
(make-pathname :directory '(:relative "common-lisp-jupyter")
:name language-name
:type "history")))
error-output (make-iopub-stream iopub "stderr" prompt-prefix prompt-suffix)
standard-output (make-iopub-stream iopub "stdout" prompt-prefix prompt-suffix)
standard-input (make-stdin-stream stdin))
error-output (make-iopub-stream iopub "stderr")
standard-output (make-iopub-stream iopub "stdout")
standard-input (make-stdin-stream stdin iopub))
(start mac)
(start hb)
(start iopub)
Expand Down
14 changes: 8 additions & 6 deletions src/stdin.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ most cases of *query-io* usage. Makes overloading y-or-no-p unnecessary.

(defclass stdin-stream (ngray:fundamental-character-output-stream
ngray:fundamental-character-input-stream)
((channel
:initarg :channel
:reader stdin-stream-channel)
((channel :initarg :channel
:reader stdin-stream-channel)
(iopub :reader stdin-stream-iopub
:initarg :iopub)
(output
:initarg :output
:initform (make-array *stdin-stream-size*
Expand Down Expand Up @@ -67,14 +68,15 @@ most cases of *query-io* usage. Makes overloading y-or-no-p unnecessary.
(declare (ignore stream))
t)

(defun make-stdin-stream (stdin)
(make-instance 'stdin-stream :channel stdin))
(defun make-stdin-stream (stdin iopub)
(make-instance 'stdin-stream :channel stdin :iopub iopub))

(defmethod ngray:stream-write-char ((stream stdin-stream) char)
(vector-push-extend char (stdin-stream-output stream)))

(defun prompt-and-read (stream need-input)
(with-slots (channel output input lock) stream
(with-slots (channel iopub output input lock) stream
(iopub-finish-output iopub)
(when (bordeaux-threads:acquire-lock lock nil)
(unwind-protect
(let ((trimmed-output (copy-seq (string-trim '(#\Bel) output))))
Expand Down

0 comments on commit cb2b14d

Please sign in to comment.