Skip to content

Commit

Permalink
WIP: iterators (broken!)
Browse files Browse the repository at this point in the history
  • Loading branch information
josePereiro committed Dec 18, 2024
1 parent 3dcfdcf commit 0bee24e
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 29 deletions.
40 changes: 23 additions & 17 deletions src/4.BloberiaBase/iterator.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,18 @@ end

## --.--. - .-. .- .--.-.- .- .---- ... . .-.-.-.-
# iterate batch dirs
_rettrue(x...) = true
function _eachbatchdir_gen(B::Bloberia, sortfun::Function)
root0 = bloberiapath(B)
paths = sortfun(readdir(root0; join = true))
paths = isdir(root0) ? paths : String[]
return (path for path in paths if _is_bbpath_heuristic(path))
end


function _eachbatchdir_ch(B::Bloberia, ch_size, sortfun)
return Channel{String}(ch_size) do _ch
root0 = bloberiapath(B)
isdir(root0) || return
paths = sortfun(readdir(root0; join = true))
for path in paths
_is_bbpath_heuristic(path) || continue
for path in _eachbatchdir_gen(B, sortfun)
put!(_ch, path)
end
end
Expand Down Expand Up @@ -50,7 +55,8 @@ function _eachbatch_th(B::Bloberia, bbid_prefix = nothing;
@assert n_tasks > 0

return Channel{BlobBatch}(ch_size) do _ch
dir_ch = _eachbatchdir_ch(B, n_tasks, sortfun)
# here I need the ch for locking
dir_ch = _eachbatchdir_ch(B, ch_size, sortfun)
@sync for _ in 1:n_tasks
@spawn for path in dir_ch
bb = _bb_from_path(B, path, bbid_prefix, preload)
Expand All @@ -70,7 +76,7 @@ function _eachbatch_ser(B::Bloberia, bbid_prefix = nothing;

# channel
return Channel{BlobBatch}(ch_size) do _ch
file_ch = _eachbatchdir_ch(B, 1, sortfun)
file_ch = _eachbatchdir_gen(B, sortfun)
for path in file_ch
bb = _bb_from_path(B, path, bbid_prefix, preload)
isnothing(bb) && continue
Expand Down Expand Up @@ -168,24 +174,24 @@ end

## --.--. - .-. .- .--.-.- .- .---- ... . .-.-.-.-
# Iterator
function _B_iterate_next(ch, ch_next)
isnothing(ch_next) && return nothing
item, ch_state = ch_next
B_state = (ch, ch_state)
function _B_iterate_next(iter, iter_next)
isnothing(iter_next) && return nothing
item, ch_state = iter_next
B_state = (iter, ch_state)
return (item, B_state)
end

import Base.iterate
function Base.iterate(B::Bloberia)
ch = eachbatch(B)
ch_next = iterate(ch)
return _B_iterate_next(ch, ch_next)
iter = _eachbatchdir_gen(B, identity)
iter_next = iterate(iter)
return _B_iterate_next(iter, iter_next)
end

function Base.iterate(::Bloberia, B_state)
isnothing(B_state) && return nothing
ch, ch_state = B_state
ch_next = iterate(ch, ch_state)
return _B_iterate_next(ch, ch_next)
iter, iter_state = B_state
iter_next = iterate(iter, iter_state)
return _B_iterate_next(iter, iter_next)
end

29 changes: 17 additions & 12 deletions src/5.BlobBatchBase/iterator.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

## --.--. - .-. .- .--.-.- .- .---- ... . .-.-.-.-
# This is running sllow, see vblobcount(B)
function eachblob(bb::BlobBatch)
return Channel{Blob}(0) do _ch
function eachblob_ch(bb::BlobBatch; ch_size = 0)
return Channel{Blob}(ch_size) do _ch
buuids = getbuuids(bb)
for uuid in buuids
# I do not need to check if blob exist
Expand All @@ -14,27 +14,32 @@ function eachblob(bb::BlobBatch)
end
end

function eachblob(bb::BlobBatch)
buuids = getbuuids(bb)
return (Blob(bb, uuid) for uuid in buuids)
end

# --.--. - .-. .- .--.-.- .- .---- ... . .-.-.-.-
# Iterator
function _bb_iterate_next(ch::Channel, ch_next)
isnothing(ch_next) && return nothing
item, ch_state = ch_next
bb_state = (ch, ch_state)
function _bb_iterate_next(iter, iter_next)
isnothing(iter_next) && return nothing
item, iter_state = iter_next
bb_state = (iter, iter_state)
return (item, bb_state)
end

import Base.iterate
function Base.iterate(bb::BlobBatch)
ch = eachblob(bb)
ch_next = iterate(ch)
return _bb_iterate_next(ch, ch_next)
iter = eachblob(bb)
iter_next = iterate(iter)
return _bb_iterate_next(iter, iter_next)
end

function Base.iterate(::BlobBatch, bb_state)
isnothing(bb_state) && return nothing
ch, ch_state = bb_state
ch_next = iterate(ch, ch_state)
return _bb_iterate_next(ch, ch_next)
iter, iter_state = bb_state
iter_next = iterate(iter, iter_state)
return _bb_iterate_next(iter, iter_next)
end

import Base.length
Expand Down

0 comments on commit 0bee24e

Please sign in to comment.