Skip to content

Commit

Permalink
allign rangedir functions
Browse files Browse the repository at this point in the history
  • Loading branch information
puellanivis committed Nov 15, 2024
1 parent 0d9fb71 commit 83abba9
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 55 deletions.
67 changes: 43 additions & 24 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1103,26 +1103,34 @@ func (d *Dir) Name() string {
}

// rangedir returns an iterator over the directory entries of the directory.
// It will only ever yield either a *sshfx.NameEntry or an error, never both.
// No error will be yielded until all available name entries have been yielded.
// Only one error will be yielded per invocation.
//
// We do not expose an iterator, because none has been standardized yet.
// and we do not want to accidentally implement an inconsistent API.
// However, for internal usage, we can definitely make use of this to simplify the common parts of ReadDir and Readdir.
// and we do not want to accidentally implement an API inconsistent with future standards.
// However, for internal usage, we can separate the paginated ReadDir request code from the conversion to Go entries.
//
// Callers must guarantee synchronization by either holding the directory lock, or holding an exclusive reference.
func (d *Dir) rangedir(ctx context.Context) iter.Seq2[*sshfx.NameEntry, error] {
func (d *Dir) rangedir(ctx context.Context, grow func(int)) iter.Seq2[*sshfx.NameEntry, error] {
return func(yield func(v *sshfx.NameEntry, err error) bool) {
// Pull from saved entries first.
for i, ent := range d.entries {
if !yield(ent, nil) {
// Early break, delete the entries we have yielded.
d.entries = slices.Delete(d.entries, 0, i+1)
return
for {
grow(len(d.entries))

// Pull from saved entries first.
for i, ent := range d.entries {
if !yield(ent, nil) {
// This is a break condition.
// We need to remove all entries that have been consumed,
// and that includes the one that we are currently on.
d.entries = slices.Delete(d.entries, 0, i+1)
return
}
}
}

// We got through all the remaining entries, delete all the entries.
d.entries = slices.Delete(d.entries, 0, len(d.entries))
// We got through all the remaining entries, delete all the entries.
d.entries = slices.Delete(d.entries, 0, len(d.entries))

for {
_, closed, err := d.handle.get()
if err != nil {
yield(nil, err)
Expand All @@ -1131,19 +1139,12 @@ func (d *Dir) rangedir(ctx context.Context) iter.Seq2[*sshfx.NameEntry, error] {

entries, err := d.cl.getNames(ctx, closed, &d.req)
if err != nil {
// There are no remaining entries to save here,
// SFTP can only return either an error or a result, never both.
// No need to loop, SFTP can only return either an error or a result, never both.
yield(nil, err)
return
}

for i, entry := range entries {
if !yield(entry, nil) {
// Early break, save the remaining entries we got for maybe later.
d.entries = append(d.entries, entries[i+1:]...)
return
}
}
d.entries = entries
}
}
}
Expand Down Expand Up @@ -1175,7 +1176,16 @@ func (d *Dir) ReaddirContext(ctx context.Context, n int) ([]fs.FileInfo, error)

var ret []fs.FileInfo

for ent, err := range d.rangedir(ctx) {
grow := func(more int) {
if n > 0 {
// the lesser of what's coming, and how much remains.
more = min(more, n-len(ret))
}

ret = slices.Grow(ret, more)
}

for ent, err := range d.rangedir(ctx, grow) {
if err != nil {
if errors.Is(err, io.EOF) && n <= 0 {
return ret, nil
Expand Down Expand Up @@ -1221,7 +1231,16 @@ func (d *Dir) ReadDirContext(ctx context.Context, n int) ([]fs.DirEntry, error)

var ret []fs.DirEntry

for ent, err := range d.rangedir(ctx) {
grow := func(more int) {
if n > 0 {
// the lesser of what's coming, and how much remains.
more = min(more, n-len(ret))
}

ret = slices.Grow(ret, more)
}

for ent, err := range d.rangedir(ctx, grow) {
if err != nil {
if errors.Is(err, io.EOF) && n <= 0 {
return ret, nil
Expand Down
74 changes: 43 additions & 31 deletions localfs/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package localfs
import (
"cmp"
"io/fs"
"iter"
"os"
"slices"
"sync"
Expand All @@ -21,7 +22,8 @@ type File struct {
idLookup sftp.NameLookup

mu sync.Mutex
dirErr error
lastErr error
lastEnt *sshfx.NameEntry
entries []fs.FileInfo
}

Expand All @@ -43,45 +45,42 @@ func (f *File) Stat() (*sshfx.Attributes, error) {

// rangedir returns an iterator over the directory entries of the directory.
// It will only ever yield either a [fs.FileInfo] or an error, never both.
// No error will be yielded until all available FileInfos have been yielded,
// and thereafter the same error will be yielded indefinitely,
// however only one error will be yielded per invocation.
// If yield returns false, then the directory entry is considered unconsumed,
// and will be the first yield at the next call to rangedir.
// No error will be yielded until all available FileInfos have been yielded.
// Only one error will be yielded per invocation.
//
// We do not expose an iterator, because none has been standardized yet,
// and we do not want to accidentally implement an API inconsistent with future standards.
// However, for internal usage, we can separate the paginated Readdir code from the conversion to SFTP entries.
//
// Callers must guarantee synchronization by either holding the file lock, or holding an exclusive reference.
func (f *File) rangedir(yield func(fs.FileInfo, error) bool) {
for {
for i, entry := range f.entries {
if !yield(entry, nil) {
// This is break condition.
// As per our semantics, this means this entry has not been consumed.
// So we remove only the entries ahead of this one.
f.entries = slices.Delete(f.entries, 0, i)
return
func (f *File) rangedir(grow func(int)) iter.Seq2[fs.FileInfo, error] {
return func(yield func(fs.FileInfo, error) bool) {
for {
grow(len(f.entries))

for i, entry := range f.entries {
if !yield(entry, nil) {
// This is a break condition.
// We need to remove all entries that have been consumed,
// and that includes the one we are currently on.
f.entries = slices.Delete(f.entries, 0, i+1)
return
}
}
}

// We have consumed all of the saved entries, so we remove everything.
f.entries = slices.Delete(f.entries, 0, len(f.entries))
// We have consumed all of the saved entries, so we remove everything.
f.entries = slices.Delete(f.entries, 0, len(f.entries))

if f.dirErr != nil {
// No need to try acquiring more entries,
// we’re already in the error state.
yield(nil, f.dirErr)
return
}
if f.lastErr != nil {
yield(nil, f.lastErr)
f.lastErr = nil
return
}

ents, err := f.Readdir(128)
if err != nil {
f.dirErr = err
// We cannot guarantee we only get entries, or an error, never both.
// So we need to just save these, and loop.
f.entries, f.lastErr = f.Readdir(128)
}

f.entries = ents
}
}

Expand All @@ -91,8 +90,18 @@ func (f *File) ReadDir(maxDataLen uint32) (entries []*sshfx.NameEntry, err error
f.mu.Lock()
defer f.mu.Unlock()

if f.lastEnt != nil {
// Last ReadDir left an entry for us to include in this call.
entries = append(entries, f.lastEnt)
f.lastEnt = nil
}

grow := func(more int) {
entries = slices.Grow(entries, more)
}

var size int
for fi, err := range f.rangedir {
for fi, err := range f.rangedir(grow) {
if err != nil {
if len(entries) != 0 {
return entries, nil
Expand All @@ -112,7 +121,10 @@ func (f *File) ReadDir(maxDataLen uint32) (entries []*sshfx.NameEntry, err error
size += entry.Len()

if size > int(maxDataLen) {
// rangedir will take care of starting the next range with this entry.
// This would exceed the packet data length,
// so save this one for the next call,
// and return.
f.lastEnt = entry
break
}

Expand Down

0 comments on commit 83abba9

Please sign in to comment.