Skip to content

Commit

Permalink
Refactor pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
utkinis committed Oct 13, 2023
1 parent 5e4199b commit 04cfe22
Showing 1 changed file with 15 additions and 11 deletions.
26 changes: 15 additions & 11 deletions src/Utils/pipelines.jl
Original file line number Diff line number Diff line change
@@ -1,38 +1,42 @@
mutable struct Pipeline
in::Channel
src::Channel
out::Base.Event
task::Task

function Pipeline(; pre=nothing, post=nothing)
in = Channel()
src = Channel()
out = Base.Event(true)
task = Threads.@spawn begin
isnothing(pre) || pre()
isnothing(pre) || Base.invokelatest(pre)
try
for work in in
work()
for work in src
Base.invokelatest(work)
notify(out)
end
finally
isnothing(post) || post()
isnothing(post) || Base.invokelatest(post)
end
end
errormonitor(task)
return new(in, out, task)
return new(src, out, task)
end
end

Base.close(p::Pipeline) = close(p.in)
function Base.close(p::Pipeline)
close(p.src)
wait(p.task)
return
end

Base.isopen(p::Pipeline) = isopen(p.in)
Base.isopen(p::Pipeline) = isopen(p.src)

function Base.put!(work::F, p::Pipeline) where {F}
put!(p.in, work)
put!(p.src, work)
return
end

function Base.wait(p::Pipeline)
if isopen(p.in)
if isopen(p.src)
wait(p.out)
else
error("Pipeline is not running")
Expand Down

0 comments on commit 04cfe22

Please sign in to comment.