Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(cmds): do not return errors embedded in result type #10527

Merged
merged 10 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 25 additions & 39 deletions client/rpc/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,59 +62,45 @@ type pinLsObject struct {
Type string
}

func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan iface.Pin, error) {
func (api *PinAPI) Ls(ctx context.Context, pins chan<- iface.Pin, opts ...caopts.PinLsOption) error {
defer close(pins)

options, err := caopts.PinLsOptions(opts...)
if err != nil {
return nil, err
return err
}

res, err := api.core().Request("pin/ls").
Option("type", options.Type).
Option("stream", true).
Send(ctx)
if err != nil {
return nil, err
return err
}

pins := make(chan iface.Pin)
go func(ch chan<- iface.Pin) {
defer res.Output.Close()
defer close(ch)

dec := json.NewDecoder(res.Output)
var out pinLsObject
for {
switch err := dec.Decode(&out); err {
case nil:
case io.EOF:
return
default:
select {
case ch <- pin{err: err}:
return
case <-ctx.Done():
return
}
defer res.Output.Close()

dec := json.NewDecoder(res.Output)
var out pinLsObject
for {
err := dec.Decode(&out)
if err != nil {
if err != io.EOF {
return err
}
return nil
}

c, err := cid.Parse(out.Cid)
if err != nil {
select {
case ch <- pin{err: err}:
return
case <-ctx.Done():
return
}
}
c, err := cid.Parse(out.Cid)
if err != nil {
return err
}

select {
case ch <- pin{typ: out.Type, name: out.Name, path: path.FromCid(c)}:
case <-ctx.Done():
return
}
select {
case pins <- pin{typ: out.Type, name: out.Name, path: path.FromCid(c)}:
case <-ctx.Done():
return ctx.Err()
}
}(pins)
return pins, nil
}
}

// IsPinned returns whether or not the given cid is pinned
Expand Down
116 changes: 48 additions & 68 deletions client/rpc/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,12 @@ type lsOutput struct {
Objects []lsObject
}

func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.UnixfsLsOption) (<-chan iface.DirEntry, error) {
func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, out chan<- iface.DirEntry, opts ...caopts.UnixfsLsOption) error {
defer close(out)

options, err := caopts.UnixfsLsOptions(opts...)
if err != nil {
return nil, err
return err
}

resp, err := api.core().Request("ls", p.String()).
Expand All @@ -156,86 +158,64 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.Unixfs
Option("stream", true).
Send(ctx)
if err != nil {
return nil, err
return err
}
if resp.Error != nil {
return nil, resp.Error
return err
}
defer resp.Close()

dec := json.NewDecoder(resp.Output)
out := make(chan iface.DirEntry)

go func() {
defer resp.Close()
defer close(out)

for {
var link lsOutput
if err := dec.Decode(&link); err != nil {
if err == io.EOF {
return
}
select {
case out <- iface.DirEntry{Err: err}:
case <-ctx.Done():
}
return
}

if len(link.Objects) != 1 {
select {
case out <- iface.DirEntry{Err: errors.New("unexpected Objects len")}:
case <-ctx.Done():
}
return
for {
var link lsOutput
if err = dec.Decode(&link); err != nil {
if err != io.EOF {
return err
}
return nil
}

if len(link.Objects[0].Links) != 1 {
select {
case out <- iface.DirEntry{Err: errors.New("unexpected Links len")}:
case <-ctx.Done():
}
return
}
if len(link.Objects) != 1 {
return errors.New("unexpected Objects len")
}

l0 := link.Objects[0].Links[0]
if len(link.Objects[0].Links) != 1 {
return errors.New("unexpected Links len")
}

c, err := cid.Decode(l0.Hash)
if err != nil {
select {
case out <- iface.DirEntry{Err: err}:
case <-ctx.Done():
}
return
}
l0 := link.Objects[0].Links[0]

var ftype iface.FileType
switch l0.Type {
case unixfs.TRaw, unixfs.TFile:
ftype = iface.TFile
case unixfs.THAMTShard, unixfs.TDirectory, unixfs.TMetadata:
ftype = iface.TDirectory
case unixfs.TSymlink:
ftype = iface.TSymlink
}
c, err := cid.Decode(l0.Hash)
if err != nil {
return err
}

select {
case out <- iface.DirEntry{
Name: l0.Name,
Cid: c,
Size: l0.Size,
Type: ftype,
Target: l0.Target,

Mode: l0.Mode,
ModTime: l0.ModTime,
}:
case <-ctx.Done():
}
var ftype iface.FileType
switch l0.Type {
case unixfs.TRaw, unixfs.TFile:
ftype = iface.TFile
case unixfs.THAMTShard, unixfs.TDirectory, unixfs.TMetadata:
ftype = iface.TDirectory
case unixfs.TSymlink:
ftype = iface.TSymlink
}
}()

return out, nil
select {
case out <- iface.DirEntry{
Name: l0.Name,
Cid: c,
Size: l0.Size,
Type: ftype,
Target: l0.Target,

Mode: l0.Mode,
ModTime: l0.ModTime,
}:
case <-ctx.Done():
return ctx.Err()
}
}
}

func (api *UnixfsAPI) core() *HttpApi {
Expand Down
2 changes: 1 addition & 1 deletion cmd/ipfs/kubo/pinmfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func pinMFS(ctx context.Context, node pinMFSNode, cid cid.Cid, svcName string, s

// check if MFS pin exists (across all possible states) and inspect its CID
pinStatuses := []pinclient.Status{pinclient.StatusQueued, pinclient.StatusPinning, pinclient.StatusPinned, pinclient.StatusFailed}
lsPinCh, lsErrCh := c.Ls(ctx, pinclient.PinOpts.FilterName(pinName), pinclient.PinOpts.FilterStatus(pinStatuses...))
lsPinCh, lsErrCh := c.GoLs(ctx, pinclient.PinOpts.FilterName(pinName), pinclient.PinOpts.FilterStatus(pinStatuses...))
existingRequestID := "" // is there any pre-existing MFS pin with pinName (for any CID)?
pinning := false // is CID for current MFS already being pinned?
pinTime := time.Now().UTC()
Expand Down
23 changes: 14 additions & 9 deletions core/commands/ls.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package commands

import (
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -133,23 +134,24 @@ The JSON output contains type information.
}
}

lsCtx, cancel := context.WithCancel(req.Context)
defer cancel()

for i, fpath := range paths {
pth, err := cmdutils.PathOrCidPath(fpath)
if err != nil {
return err
}

results, err := api.Unixfs().Ls(req.Context, pth,
options.Unixfs.ResolveChildren(resolveSize || resolveType))
if err != nil {
return err
}
results := make(chan iface.DirEntry)
lsErr := make(chan error, 1)
go func() {
lsErr <- api.Unixfs().Ls(lsCtx, pth, results,
options.Unixfs.ResolveChildren(resolveSize || resolveType))
}()

processLink, dirDone = processDir()
for link := range results {
if link.Err != nil {
return link.Err
}
var ftype unixfs_pb.Data_DataType
switch link.Type {
case iface.TFile:
Expand All @@ -170,10 +172,13 @@ The JSON output contains type information.
Mode: link.Mode,
ModTime: link.ModTime,
}
if err := processLink(paths[i], lsLink); err != nil {
if err = processLink(paths[i], lsLink); err != nil {
return err
}
}
if err = <-lsErr; err != nil {
return err
}
dirDone(i)
}
return done()
Expand Down
18 changes: 9 additions & 9 deletions core/commands/pin/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,15 +557,16 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api
panic("unhandled pin type")
}

pins, err := api.Pin().Ls(req.Context, opt, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name))
if err != nil {
return err
}
pins := make(chan coreiface.Pin)
lsErr := make(chan error, 1)
lsCtx, cancel := context.WithCancel(req.Context)
defer cancel()

go func() {
lsErr <- api.Pin().Ls(lsCtx, pins, opt, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name))
}()

for p := range pins {
if err := p.Err(); err != nil {
return err
}
err = emit(PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: p.Type(),
Expand All @@ -577,8 +578,7 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api
return err
}
}

return nil
return <-lsErr
}

const (
Expand Down
Loading
Loading