Skip to content

Commit

Permalink
Shared file edit (#209)
Browse files Browse the repository at this point in the history
* WIP: Initial structure using cache buckets

* Remove test files

* Fix local file pulling and updating shared file

* rRemove test files

* Update function name

* Add db id to file watch cache

* Fix dependency bug

* Remove test files
  • Loading branch information
dmerrill6 authored Oct 20, 2020
1 parent 1835341 commit 833529c
Show file tree
Hide file tree
Showing 16 changed files with 415 additions and 129 deletions.
1 change: 1 addition & 0 deletions core/space/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type AddWatchFile struct {
BucketPath string `json:"bucket_path"`
BucketKey string `json:"bucket_key"`
BucketSlug string `json:"bucket_slug"`
IsRemote bool `json:"isRemote"`
}

type Identity struct {
Expand Down
2 changes: 1 addition & 1 deletion core/space/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Space struct {

type Syncer interface {
AddFileWatch(addFileInfo domain.AddWatchFile) error
GetOpenFilePath(bucketSlug string, bucketPath string) (string, bool)
GetOpenFilePath(bucketSlug string, bucketPath string, dbID string) (string, bool)
}

type AddFileWatchFunc = func(addFileInfo domain.AddWatchFile) error
Expand Down
10 changes: 6 additions & 4 deletions core/space/services/services_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,18 +286,19 @@ func (s *Space) OpenFile(ctx context.Context, path, bucketName, dbID string) (do
return domain.OpenFileInfo{}, err
}

isRemote := dbID != ""
var filePath string
var b textile.Bucket
// check if file exists in sync
if dbID != "" {
if isRemote {
b, err = s.getBucketForRemoteFile(ctx, bucketName, dbID, path)
} else {
b, err = s.getBucketWithFallback(ctx, bucketName)
}
if err != nil {
return domain.OpenFileInfo{}, err
}
if filePath, exists := s.sync.GetOpenFilePath(b.Slug(), path); exists {
if filePath, exists := s.sync.GetOpenFilePath(b.Slug(), path, dbID); exists {
// sanity check in case file was deleted or moved
if PathExists(filePath) {
// return file handle
Expand All @@ -308,7 +309,7 @@ func (s *Space) OpenFile(ctx context.Context, path, bucketName, dbID string) (do
}

// else, open new file on FS
filePath, err = s.openFileOnFs(ctx, path, b)
filePath, err = s.openFileOnFs(ctx, path, b, isRemote)
if err != nil {
return domain.OpenFileInfo{}, err
}
Expand All @@ -319,7 +320,7 @@ func (s *Space) OpenFile(ctx context.Context, path, bucketName, dbID string) (do
}, nil
}

func (s *Space) openFileOnFs(ctx context.Context, path string, b textile.Bucket) (string, error) {
func (s *Space) openFileOnFs(ctx context.Context, path string, b textile.Bucket, isRemote bool) (string, error) {
// write file copy to temp folder
tmpFile, err := s.createTempFileForPath(ctx, path, false)
if err != nil {
Expand Down Expand Up @@ -349,6 +350,7 @@ func (s *Space) openFileOnFs(ctx context.Context, path string, b textile.Bucket)
BucketPath: path,
BucketKey: b.Key(),
BucketSlug: b.Slug(),
IsRemote: isRemote,
}

err = s.sync.AddFileWatch(addWatchFile)
Expand Down
2 changes: 1 addition & 1 deletion core/space/space_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func TestService_OpenFile(t *testing.T) {
getDir().dir,
)

mockSync.On("GetOpenFilePath", testKey, testPath).Return(
mockSync.On("GetOpenFilePath", testKey, testPath, mock.Anything).Return(
"",
false,
)
Expand Down
28 changes: 24 additions & 4 deletions core/sync/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

ipfspath "github.com/ipfs/interface-go-ipfs-core/path"

"github.com/FleekHQ/space-daemon/core/textile"
"github.com/FleekHQ/space-daemon/log"
)

Expand All @@ -22,13 +23,16 @@ func (h *watcherHandler) OnCreate(ctx context.Context, path string, fileInfo os.
var newRoot ipfspath.Path
var err error

bucketSlug, bucketPath, exists := h.bs.getOpenFileBucketSlugAndPath(path)
watchInfo, exists := h.bs.getOpenFileBucketSlugAndPath(path)
if !exists {
msg := fmt.Sprintf("error: could not find path %s", path)
log.Error(msg, fmt.Errorf(msg))
return
}

bucketSlug := watchInfo.BucketSlug
bucketPath := watchInfo.BucketPath

b, err := h.bs.textileClient.GetBucket(ctx, bucketSlug, nil)
if err != nil {
msg := fmt.Sprintf("error: could not find bucket with slug %s", bucketSlug)
Expand Down Expand Up @@ -93,12 +97,14 @@ func (h *watcherHandler) OnRemove(ctx context.Context, path string, fileInfo os.
log.Info("FS Handler: OnRemove", fmt.Sprintf("path:%s", path), fmt.Sprintf("fileName:%s", fileInfo.Name()))
// TODO: Also synchronizer lock check here

bucketSlug, bucketPath, exists := h.bs.getOpenFileBucketSlugAndPath(path)
watchInfo, exists := h.bs.getOpenFileBucketSlugAndPath(path)
if !exists {
msg := fmt.Sprintf("error: could not find path %s", path)
log.Error(msg, fmt.Errorf(msg))
return
}
bucketSlug := watchInfo.BucketSlug
bucketPath := watchInfo.BucketPath

b, err := h.bs.textileClient.GetBucket(ctx, bucketSlug, nil)
if err != nil {
Expand All @@ -125,14 +131,28 @@ func (h *watcherHandler) OnRemove(ctx context.Context, path string, fileInfo os.
func (h *watcherHandler) OnWrite(ctx context.Context, path string, fileInfo os.FileInfo) {
log.Info("FS Handler: OnWrite", fmt.Sprintf("path:%s", path), fmt.Sprintf("fileName:%s", fileInfo.Name()))

bucketSlug, bucketPath, exists := h.bs.getOpenFileBucketSlugAndPath(path)
watchInfo, exists := h.bs.getOpenFileBucketSlugAndPath(path)
if !exists {
msg := fmt.Sprintf("error: could not find path %s", path)
log.Error(msg, fmt.Errorf(msg))
return
}

b, err := h.bs.textileClient.GetBucket(ctx, bucketSlug, nil)
var b textile.Bucket
var err error
bucketSlug := watchInfo.BucketSlug
bucketPath := watchInfo.BucketPath

if watchInfo.IsRemote {
b, err = h.bs.textileClient.GetBucket(ctx, bucketSlug, &textile.GetBucketForRemoteFileInput{
Bucket: bucketSlug,
DbID: watchInfo.DbId,
Path: watchInfo.BucketPath,
})
} else {
b, err = h.bs.textileClient.GetBucket(ctx, bucketSlug, nil)
}

if err != nil {
msg := fmt.Sprintf("error: could not find bucket with slug %s", bucketSlug)
log.Error(msg, fmt.Errorf(msg))
Expand Down
22 changes: 11 additions & 11 deletions core/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type BucketSynchronizer interface {
Shutdown() error
RegisterNotifier(notifier GrpcNotifier)
AddFileWatch(addFileInfo domain.AddWatchFile) error
GetOpenFilePath(bucketSlug string, bucketPath string) (string, bool)
GetOpenFilePath(bucketSlug string, bucketPath string, dbID string) (string, bool)
}

type TextileNotifier interface {
Expand Down Expand Up @@ -218,10 +218,10 @@ func (bs *bucketSynchronizer) AddFileWatch(addFileInfo domain.AddWatchFile) erro
return nil
}

func (bs *bucketSynchronizer) GetOpenFilePath(bucketSlug string, bucketPath string) (string, bool) {
func (bs *bucketSynchronizer) GetOpenFilePath(bucketSlug, bucketPath, dbID string) (string, bool) {
var fi domain.AddWatchFile
var err error
reversKey := getOpenFileReverseKey(bucketSlug, bucketPath)
reversKey := getOpenFileReverseKey(bucketSlug, bucketPath, dbID)

if fi, err = bs.getOpenFileInfo(reversKey); err != nil {
return "", false
Expand All @@ -238,22 +238,22 @@ func getOpenFileKey(localPath string) string {
return OpenFilesKeyPrefix + localPath
}

func getOpenFileReverseKey(bucketSlug string, bucketPath string) string {
return ReverseOpenFilesKeyPrefix + bucketSlug + ":" + bucketPath
func getOpenFileReverseKey(bucketSlug, bucketPath, dbID string) string {
return ReverseOpenFilesKeyPrefix + bucketSlug + ":" + bucketPath + ":" + dbID
}

func (bs *bucketSynchronizer) getOpenFileBucketSlugAndPath(localPath string) (string, string, bool) {
func (bs *bucketSynchronizer) getOpenFileBucketSlugAndPath(localPath string) (domain.AddWatchFile, bool) {
var fi domain.AddWatchFile
var err error
if fi, err = bs.getOpenFileInfo(getOpenFileKey(localPath)); err != nil {
return "", "", false
return domain.AddWatchFile{}, false
}

if fi.BucketSlug == "" {
return "", "", false
return domain.AddWatchFile{}, false
}

return fi.BucketSlug, fi.BucketPath, true
return fi, true
}

// Helper function to set open file info in the store
Expand All @@ -265,7 +265,7 @@ func (bs *bucketSynchronizer) addFileInfoToStore(addFileInfo domain.AddWatchFile
if err := bs.store.SetString(getOpenFileKey(addFileInfo.LocalPath), string(out)); err != nil {
return err
}
reverseKey := getOpenFileReverseKey(addFileInfo.BucketKey, addFileInfo.BucketPath)
reverseKey := getOpenFileReverseKey(addFileInfo.BucketSlug, addFileInfo.BucketPath, addFileInfo.DbId)
if err := bs.store.SetString(reverseKey, string(out)); err != nil {
return err
}
Expand All @@ -277,7 +277,7 @@ func (bs *bucketSynchronizer) removeFileInfo(addFileInfo domain.AddWatchFile) er
if err := bs.store.Remove([]byte(getOpenFileKey(addFileInfo.LocalPath))); err != nil {
return err
}
reverseKey := getOpenFileReverseKey(addFileInfo.BucketKey, addFileInfo.BucketPath)
reverseKey := getOpenFileReverseKey(addFileInfo.BucketSlug, addFileInfo.BucketPath, addFileInfo.DbId)
if err := bs.store.Remove([]byte(reverseKey)); err != nil {
return err
}
Expand Down
27 changes: 6 additions & 21 deletions core/textile/bucket_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,7 @@ func (tc *textileClient) getBucket(ctx context.Context, slug string, remoteFile
b := bucket.New(
root,
getContextFn,
NewSecureBucketsClient(
bucketsClient,
slug,
),
tc.getSecureBucketsClient(bucketsClient),
)

// Attach a notifier if the bucket is local
Expand All @@ -80,18 +77,15 @@ func (tc *textileClient) getBucket(ctx context.Context, slug string, remoteFile
}

func (tc *textileClient) getBucketForMirror(ctx context.Context, slug string) (Bucket, error) {
root, getContextFn, newSlug, err := tc.getBucketRootForMirror(ctx, slug)
root, getContextFn, _, err := tc.getBucketRootForMirror(ctx, slug)
if err != nil {
return nil, err
}

b := bucket.New(
root,
getContextFn,
NewSecureBucketsClient(
tc.hb,
newSlug,
),
tc.getSecureBucketsClient(tc.hb),
)

return b, nil
Expand Down Expand Up @@ -202,10 +196,7 @@ func (tc *textileClient) getBucketRootFromReceivedFile(ctx context.Context, file
return nil, nil, err
}

sbs := NewSecureBucketsClient(
tc.hb,
receivedFile.Bucket,
)
sbs := tc.getSecureBucketsClient(tc.hb)

b, err := sbs.ListPath(remoteCtx, receivedFile.BucketKey, receivedFile.Path)

Expand Down Expand Up @@ -235,10 +226,7 @@ func (tc *textileClient) getBucketRootForMirror(ctx context.Context, slug string
return nil, nil, "", err
}

sbs := NewSecureBucketsClient(
tc.hb,
bucket.RemoteBucketSlug,
)
sbs := tc.getSecureBucketsClient(tc.hb)

b, err := sbs.ListPath(remoteCtx, bucket.RemoteBucketKey, "")

Expand Down Expand Up @@ -316,10 +304,7 @@ func (tc *textileClient) createBucket(ctx context.Context, bucketSlug string) (B
newB := bucket.New(
b.Root,
tc.getOrCreateBucketContext,
NewSecureBucketsClient(
tc.bucketsClient,
bucketSlug,
),
tc.getSecureBucketsClient(tc.bucketsClient),
)

return newB, nil
Expand Down
25 changes: 25 additions & 0 deletions core/textile/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"time"

"github.com/FleekHQ/space-daemon/config"
httpapi "github.com/ipfs/go-ipfs-http-client"
iface "github.com/ipfs/interface-go-ipfs-core"

"github.com/FleekHQ/space-daemon/core/keychain"
db "github.com/FleekHQ/space-daemon/core/store"
Expand All @@ -21,6 +23,7 @@ import (
"github.com/FleekHQ/space-daemon/core/textile/utils"
"github.com/FleekHQ/space-daemon/core/util/address"
"github.com/FleekHQ/space-daemon/log"
ma "github.com/multiformats/go-multiaddr"
threadsClient "github.com/textileio/go-threads/api/client"
nc "github.com/textileio/go-threads/net/api/client"
bucketsClient "github.com/textileio/textile/v2/api/buckets/client"
Expand Down Expand Up @@ -61,6 +64,7 @@ type textileClient struct {
failedHealthchecks int
sync synchronizer.Synchronizer
notifier bucket.Notifier
ipfsClient iface.CoreAPI
}

// Creates a new Textile Client
Expand Down Expand Up @@ -188,6 +192,22 @@ func (tc *textileClient) start(ctx context.Context, cfg config.Config) error {
netc = n
}

ipfsNodeAddr := cfg.GetString(config.Ipfsnodeaddr, "/ip4/127.0.0.1/tcp/5001")
if ipfsNodeAddr == "" {
ipfsNodeAddr = "/ip4/127.0.0.1/tcp/5001"
}

multiAddr, err := ma.NewMultiaddr(ipfsNodeAddr)
if err != nil {
cmd.Fatal(err)
}

if ic, err := httpapi.NewApi(multiAddr); err != nil {
cmd.Fatal(err)
} else {
tc.ipfsClient = ic
}

tc.bucketsClient = buckets
tc.threads = threads
tc.netc = netc
Expand Down Expand Up @@ -502,6 +522,11 @@ func (tc *textileClient) GetModel() model.Model {
return model.New(tc.store, tc.kc, tc.threads, tc.hubAuth)
}

func (tc *textileClient) getSecureBucketsClient(baseClient *bucketsClient.Client) *SecureBucketClient {
isRemote := baseClient == tc.hb
return NewSecureBucketsClient(baseClient, tc.kc, tc.store, tc.threads, tc.ipfsClient, isRemote)
}

func (tc *textileClient) requiresHubConnection() error {
if err := tc.requiresRunning(); err != nil {
return err
Expand Down
Loading

0 comments on commit 833529c

Please sign in to comment.