Skip to content

Commit

Permalink
add AsLargeBytes support to unixfs files (#24)
Browse files Browse the repository at this point in the history
* add AsLargeBytes support to unixfs files


This commit was moved from ipfs/go-unixfsnode@94986a7
  • Loading branch information
willscott authored Mar 3, 2022
1 parent 79823b0 commit 1790dea
Show file tree
Hide file tree
Showing 5 changed files with 342 additions and 47 deletions.
3 changes: 1 addition & 2 deletions unixfs/node/data/builder/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package builder
import (
"bytes"
"context"
"io"
"testing"

"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -70,7 +69,7 @@ func TestUnixFSFileRoundtrip(t *testing.T) {
t.Fatal(err)
}
// read back out the file.
out, err := io.ReadAll(ufn)
out, err := ufn.AsBytes()
if err != nil {
t.Fatal(err)
}
Expand Down
173 changes: 173 additions & 0 deletions unixfs/node/file/deferred.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package file

import (
"context"
"io"

dagpb "github.com/ipld/go-codec-dagpb"
"github.com/ipld/go-ipld-prime"
)

func newDeferredFileNode(ctx context.Context, lsys *ipld.LinkSystem, root ipld.Link) LargeBytesNode {
dfn := deferredFileNode{
LargeBytesNode: nil,
root: root,
lsys: lsys,
ctx: ctx,
}
dfn.LargeBytesNode = &deferred{&dfn}
return &dfn
}

type deferredFileNode struct {
LargeBytesNode

root ipld.Link
lsys *ipld.LinkSystem
ctx context.Context
}

func (d *deferredFileNode) resolve() error {
if d.lsys == nil {
return nil
}
target, err := d.lsys.Load(ipld.LinkContext{Ctx: d.ctx}, d.root, protoFor(d.root))
if err != nil {
return err
}

asFSNode, err := NewUnixFSFile(d.ctx, target, d.lsys)
if err != nil {
return err
}
d.LargeBytesNode = asFSNode
d.root = nil
d.lsys = nil
d.ctx = nil
return nil
}

type deferred struct {
*deferredFileNode
}

type deferredReader struct {
io.ReadSeeker
*deferredFileNode
}

func (d *deferred) AsLargeBytes() (io.ReadSeeker, error) {
return &deferredReader{nil, d.deferredFileNode}, nil
}

func (d *deferredReader) Read(p []byte) (int, error) {
if d.ReadSeeker == nil {
if err := d.deferredFileNode.resolve(); err != nil {
return 0, err
}
rs, err := d.deferredFileNode.AsLargeBytes()
if err != nil {
return 0, err
}
d.ReadSeeker = rs
}
return d.ReadSeeker.Read(p)
}

func (d *deferredReader) Seek(offset int64, whence int) (int64, error) {
if d.ReadSeeker == nil {
if err := d.deferredFileNode.resolve(); err != nil {
return 0, err
}
rs, err := d.deferredFileNode.AsLargeBytes()
if err != nil {
return 0, err
}
d.ReadSeeker = rs
}
return d.ReadSeeker.Seek(offset, whence)
}

func (d *deferred) Kind() ipld.Kind {
return ipld.Kind_Bytes
}

func (d *deferred) AsBytes() ([]byte, error) {
if err := d.deferredFileNode.resolve(); err != nil {
return []byte{}, err
}

return d.deferredFileNode.AsBytes()
}

func (d *deferred) AsBool() (bool, error) {
return false, ipld.ErrWrongKind{TypeName: "bool", MethodName: "AsBool", AppropriateKind: ipld.KindSet_JustBytes}
}

func (d *deferred) AsInt() (int64, error) {
return 0, ipld.ErrWrongKind{TypeName: "int", MethodName: "AsInt", AppropriateKind: ipld.KindSet_JustBytes}
}

func (d *deferred) AsFloat() (float64, error) {
return 0, ipld.ErrWrongKind{TypeName: "float", MethodName: "AsFloat", AppropriateKind: ipld.KindSet_JustBytes}
}

func (d *deferred) AsString() (string, error) {
return "", ipld.ErrWrongKind{TypeName: "string", MethodName: "AsString", AppropriateKind: ipld.KindSet_JustBytes}
}

func (d *deferred) AsLink() (ipld.Link, error) {
return nil, ipld.ErrWrongKind{TypeName: "link", MethodName: "AsLink", AppropriateKind: ipld.KindSet_JustBytes}
}

func (d *deferred) AsNode() (ipld.Node, error) {
return nil, nil
}

func (d *deferred) Size() int {
return 0
}

func (d *deferred) IsAbsent() bool {
return false
}

func (d *deferred) IsNull() bool {
if err := d.deferredFileNode.resolve(); err != nil {
return true
}
return d.deferredFileNode.IsNull()
}

func (d *deferred) Length() int64 {
return 0
}

func (d *deferred) ListIterator() ipld.ListIterator {
return nil
}

func (d *deferred) MapIterator() ipld.MapIterator {
return nil
}

func (d *deferred) LookupByIndex(idx int64) (ipld.Node, error) {
return nil, ipld.ErrWrongKind{}
}

func (d *deferred) LookupByString(key string) (ipld.Node, error) {
return nil, ipld.ErrWrongKind{}
}

func (d *deferred) LookupByNode(key ipld.Node) (ipld.Node, error) {
return nil, ipld.ErrWrongKind{}
}

func (d *deferred) LookupBySegment(seg ipld.PathSegment) (ipld.Node, error) {
return nil, ipld.ErrWrongKind{}
}

// shardded files / nodes look like dagpb nodes.
func (d *deferred) Prototype() ipld.NodePrototype {
return dagpb.Type.PBNode
}
43 changes: 35 additions & 8 deletions unixfs/node/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (
// root of a unixfs File.
// It provides a `bytes` view over the file, along with access to io.Reader streaming access
// to file data.
func NewUnixFSFile(ctx context.Context, substrate ipld.Node, lsys *ipld.LinkSystem) (StreamableByteNode, error) {
func NewUnixFSFile(ctx context.Context, substrate ipld.Node, lsys *ipld.LinkSystem) (LargeBytesNode, error) {
if substrate.Kind() == ipld.Kind_Bytes {
// A raw / single-node file.
return &singleNodeFile{substrate, 0}, nil
return &singleNodeFile{substrate}, nil
}
// see if it's got children.
links, err := substrate.LookupByString("Links")
Expand All @@ -30,22 +30,29 @@ func NewUnixFSFile(ctx context.Context, substrate ipld.Node, lsys *ipld.LinkSyst
ctx: ctx,
lsys: lsys,
substrate: substrate,
done: false,
rdr: nil}, nil
}, nil
}

// A StreamableByteNode is an ipld.Node that can be streamed over. It is guaranteed to have a Bytes type.
type StreamableByteNode interface {
// A LargeBytesNode is an ipld.Node that can be streamed over. It is guaranteed to have a Bytes type.
type LargeBytesNode interface {
ipld.Node
io.Reader
AsLargeBytes() (io.ReadSeeker, error)
}

type singleNodeFile struct {
ipld.Node
}

func (f *singleNodeFile) AsLargeBytes() (io.ReadSeeker, error) {
return &singleNodeReader{f, 0}, nil
}

type singleNodeReader struct {
ipld.Node
offset int
}

func (f *singleNodeFile) Read(p []byte) (int, error) {
func (f *singleNodeReader) Read(p []byte) (int, error) {
buf, err := f.Node.AsBytes()
if err != nil {
return 0, err
Expand All @@ -57,3 +64,23 @@ func (f *singleNodeFile) Read(p []byte) (int, error) {
f.offset += n
return n, nil
}

func (f *singleNodeReader) Seek(offset int64, whence int) (int64, error) {
buf, err := f.Node.AsBytes()
if err != nil {
return 0, err
}

switch whence {
case io.SeekStart:
f.offset = int(offset)
case io.SeekCurrent:
f.offset += int(offset)
case io.SeekEnd:
f.offset = len(buf) + int(offset)
}
if f.offset < 0 {
return 0, io.EOF
}
return int64(f.offset), nil
}
Loading

0 comments on commit 1790dea

Please sign in to comment.