-
Notifications
You must be signed in to change notification settings - Fork 0
/
filters.lisp
executable file
·166 lines (142 loc) · 6.01 KB
/
filters.lisp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
(in-package :pipeline.filters)
(defgeneric spawn (command &key pipeline input output error wait first last)
(:documentation
"Create the program or thread that executes the given COMMAND.
INPUT, OUTPUT and ERROR are streams that are given to the COMMAND and
should be closed once the task terminates.
WAIT indicates whether the current thread should wait for COMMAND to
terminate. In a pipe, only the last command is given :WAIT T by
default.
The returned value is passed to CLEAN once all commands in a pipe
finish."))
(defgeneric clean/tag (tag filter))
(defgeneric clean (filter)
(:method ((filter cons))
(destructuring-bind (tag value) filter
(clean/tag tag value))))
(defvar *environment*
'("LANG_ALL=C" "USE_COLORS=0" (:inherit :ssh_auth_sock)))
(defun call-with-augmented-environment (environment function)
(flet ((preprocess (item)
(typecase item
(string item)
((cons atom atom) (format nil "~a=~a" (car item) (cdr item)))
(t item))))
(let ((*environment* (append (delete nil (mapcar #'preprocess environment))
*environment*)))
(funcall function))))
(defmacro with-augmented-environment ((&rest values) &body body)
`(call-with-augmented-environment
(list ,@(mapcar (lambda (v)
(typecase v
(keyword `(list :inherit ,v))
((cons atom (and atom (not null)))
(destructuring-bind (key . value) v
`(cons ',key ,value)))
(t v)))
values))
(lambda () ,@body)))
(defclass program ()
((name :initarg :name)
(environment :initform *environment*)
(arguments :initarg :arguments)
(search :initarg :search :initform t)
(error :initarg :error :initform nil :accessor error-of)))
(defun program* (name args)
(make-instance 'program :name name :arguments args))
(defun program (name &rest args)
(program* name (mapcar #'princ-to-string args)))
(defun error-to-output (program)
(prog1 program
(setf (error-of program) :output)))
(defmacro warn-on-errors ((&key (type 'error)) &body body)
`(handler-case (progn ,@body)
(,type (e) (warn "caught error ~s" e))))
(defun make-hook/on-death-close-streams (in out err)
(lambda (process)
(unless (process-alive-p process)
(ensure-stream-closed/no-error (process-input process))
(ensure-stream-closed/no-error in)
(ensure-stream-closed/no-error (process-output process))
(ensure-stream-closed/no-error out)
(ensure-stream-closed/no-error (process-error process))
(ensure-stream-closed/no-error err))))
(defun compute-environment (list)
(labels ((inherit (key)
(alexandria:when-let (value (osicat-posix:getenv (string key)))
(format nil "~a=~a" key value)))
(stringify (item)
(etypecase item
(string (list item))
((cons (eql :inherit) list)
(delete nil (mapcar #'inherit (rest item)))))))
(mapcan #'stringify list)))
(defmethod spawn ((program program) &key pipeline input output error wait first last)
(with-slots (name arguments search (program-error error) environment) program
(let* ((input (if first
(pipeline::register-resource
(make-concatenated-stream input))
input))
(output (if last
(pipeline::register-resource
(make-broadcast-stream output))
output))
(error (pipeline::%pipe-arg-error
(or program-error
(pipeline::register-resource
(make-broadcast-stream error)))
output)))
`(:process
,(apply #'
run-program
name
arguments
:search search
:wait wait
:input input
:output output
:directory *default-pathname-defaults*
:error error
:status-hook (unless wait
(make-hook/on-death-close-streams
input output error))
(when environment
(list :environment (compute-environment
environment))))))))
(defmethod clean/tag ((tag (eql :process)) process)
(when (process-alive-p process)
(process-wait process)))
(declaim (inline channel-to))
(defun channel-to (channel function)
(check-type channel trivial-channels:channel)
(pipeline:redirecting-result-to channel function))
(defmethod spawn ((function function) &key pipeline input output error wait first last)
(let ((input% (if first (pipeline::register-resource
(make-concatenated-stream input))
input))
(output% (if last
(pipeline::register-resource
(make-broadcast-stream output))
output))
(error% (pipeline::register-resource
(make-broadcast-stream error))))
(flet ((wrapped (&aux (warn-stream *error-output*)
(channel (pipeline:channel-of pipeline)))
(let ((wrapped (if channel (channel-to channel function) function))
(*standard-input* input%)
(*standard-output* output%)
(*error-output* error%))
(unwind-protect (handler-case (funcall wrapped)
(error (e)
(let ((*error-output* warn-stream))
(warn "caught error ~s" e))))
(ensure-stream-closed/no-error input%)
(ensure-stream-closed/no-error output%)
(ensure-stream-closed/no-error error%)))))
(if wait
`(:funcall ,(wrapped))
`(:thread ,(make-thread #'wrapped))))))
(defmethod clean/tag ((tag (eql :funcall)) call-result)
call-result)
(defmethod clean/tag ((tag (eql :thread)) thread)
(join-thread thread))