diff --git a/core/space/domain/domain.go b/core/space/domain/domain.go index 31b644ab..752eadaa 100644 --- a/core/space/domain/domain.go +++ b/core/space/domain/domain.go @@ -64,6 +64,7 @@ type AddWatchFile struct { BucketPath string `json:"bucket_path"` BucketKey string `json:"bucket_key"` BucketSlug string `json:"bucket_slug"` + IsRemote bool `json:"isRemote"` } type Identity struct { diff --git a/core/space/services/services.go b/core/space/services/services.go index 37a5bd08..a5981ca4 100644 --- a/core/space/services/services.go +++ b/core/space/services/services.go @@ -30,7 +30,7 @@ type Space struct { type Syncer interface { AddFileWatch(addFileInfo domain.AddWatchFile) error - GetOpenFilePath(bucketSlug string, bucketPath string) (string, bool) + GetOpenFilePath(bucketSlug string, bucketPath string, dbID string) (string, bool) } type AddFileWatchFunc = func(addFileInfo domain.AddWatchFile) error diff --git a/core/space/services/services_fs.go b/core/space/services/services_fs.go index 63d02cc7..9418f93f 100644 --- a/core/space/services/services_fs.go +++ b/core/space/services/services_fs.go @@ -286,10 +286,11 @@ func (s *Space) OpenFile(ctx context.Context, path, bucketName, dbID string) (do return domain.OpenFileInfo{}, err } + isRemote := dbID != "" var filePath string var b textile.Bucket // check if file exists in sync - if dbID != "" { + if isRemote { b, err = s.getBucketForRemoteFile(ctx, bucketName, dbID, path) } else { b, err = s.getBucketWithFallback(ctx, bucketName) @@ -297,7 +298,7 @@ func (s *Space) OpenFile(ctx context.Context, path, bucketName, dbID string) (do if err != nil { return domain.OpenFileInfo{}, err } - if filePath, exists := s.sync.GetOpenFilePath(b.Slug(), path); exists { + if filePath, exists := s.sync.GetOpenFilePath(b.Slug(), path, dbID); exists { // sanity check in case file was deleted or moved if PathExists(filePath) { // return file handle @@ -308,7 +309,7 @@ func (s *Space) OpenFile(ctx context.Context, path, bucketName, dbID string) (do } // else, open new file on FS - filePath, err = s.openFileOnFs(ctx, path, b) + filePath, err = s.openFileOnFs(ctx, path, b, isRemote) if err != nil { return domain.OpenFileInfo{}, err } @@ -319,7 +320,7 @@ func (s *Space) OpenFile(ctx context.Context, path, bucketName, dbID string) (do }, nil } -func (s *Space) openFileOnFs(ctx context.Context, path string, b textile.Bucket) (string, error) { +func (s *Space) openFileOnFs(ctx context.Context, path string, b textile.Bucket, isRemote bool) (string, error) { // write file copy to temp folder tmpFile, err := s.createTempFileForPath(ctx, path, false) if err != nil { @@ -349,6 +350,7 @@ func (s *Space) openFileOnFs(ctx context.Context, path string, b textile.Bucket) BucketPath: path, BucketKey: b.Key(), BucketSlug: b.Slug(), + IsRemote: isRemote, } err = s.sync.AddFileWatch(addWatchFile) diff --git a/core/space/space_test.go b/core/space/space_test.go index b2b9524b..1dd7fea3 100644 --- a/core/space/space_test.go +++ b/core/space/space_test.go @@ -323,7 +323,7 @@ func TestService_OpenFile(t *testing.T) { getDir().dir, ) - mockSync.On("GetOpenFilePath", testKey, testPath).Return( + mockSync.On("GetOpenFilePath", testKey, testPath, mock.Anything).Return( "", false, ) diff --git a/core/sync/fs.go b/core/sync/fs.go index 68e58ce7..8c7046f4 100644 --- a/core/sync/fs.go +++ b/core/sync/fs.go @@ -7,6 +7,7 @@ import ( ipfspath "github.com/ipfs/interface-go-ipfs-core/path" + "github.com/FleekHQ/space-daemon/core/textile" "github.com/FleekHQ/space-daemon/log" ) @@ -22,13 +23,16 @@ func (h *watcherHandler) OnCreate(ctx context.Context, path string, fileInfo os. var newRoot ipfspath.Path var err error - bucketSlug, bucketPath, exists := h.bs.getOpenFileBucketSlugAndPath(path) + watchInfo, exists := h.bs.getOpenFileBucketSlugAndPath(path) if !exists { msg := fmt.Sprintf("error: could not find path %s", path) log.Error(msg, fmt.Errorf(msg)) return } + bucketSlug := watchInfo.BucketSlug + bucketPath := watchInfo.BucketPath + b, err := h.bs.textileClient.GetBucket(ctx, bucketSlug, nil) if err != nil { msg := fmt.Sprintf("error: could not find bucket with slug %s", bucketSlug) @@ -93,12 +97,14 @@ func (h *watcherHandler) OnRemove(ctx context.Context, path string, fileInfo os. log.Info("FS Handler: OnRemove", fmt.Sprintf("path:%s", path), fmt.Sprintf("fileName:%s", fileInfo.Name())) // TODO: Also synchronizer lock check here - bucketSlug, bucketPath, exists := h.bs.getOpenFileBucketSlugAndPath(path) + watchInfo, exists := h.bs.getOpenFileBucketSlugAndPath(path) if !exists { msg := fmt.Sprintf("error: could not find path %s", path) log.Error(msg, fmt.Errorf(msg)) return } + bucketSlug := watchInfo.BucketSlug + bucketPath := watchInfo.BucketPath b, err := h.bs.textileClient.GetBucket(ctx, bucketSlug, nil) if err != nil { @@ -125,14 +131,28 @@ func (h *watcherHandler) OnRemove(ctx context.Context, path string, fileInfo os. func (h *watcherHandler) OnWrite(ctx context.Context, path string, fileInfo os.FileInfo) { log.Info("FS Handler: OnWrite", fmt.Sprintf("path:%s", path), fmt.Sprintf("fileName:%s", fileInfo.Name())) - bucketSlug, bucketPath, exists := h.bs.getOpenFileBucketSlugAndPath(path) + watchInfo, exists := h.bs.getOpenFileBucketSlugAndPath(path) if !exists { msg := fmt.Sprintf("error: could not find path %s", path) log.Error(msg, fmt.Errorf(msg)) return } - b, err := h.bs.textileClient.GetBucket(ctx, bucketSlug, nil) + var b textile.Bucket + var err error + bucketSlug := watchInfo.BucketSlug + bucketPath := watchInfo.BucketPath + + if watchInfo.IsRemote { + b, err = h.bs.textileClient.GetBucket(ctx, bucketSlug, &textile.GetBucketForRemoteFileInput{ + Bucket: bucketSlug, + DbID: watchInfo.DbId, + Path: watchInfo.BucketPath, + }) + } else { + b, err = h.bs.textileClient.GetBucket(ctx, bucketSlug, nil) + } + if err != nil { msg := fmt.Sprintf("error: could not find bucket with slug %s", bucketSlug) log.Error(msg, fmt.Errorf(msg)) diff --git a/core/sync/sync.go b/core/sync/sync.go index 9d8939c3..058a6373 100644 --- a/core/sync/sync.go +++ b/core/sync/sync.go @@ -37,7 +37,7 @@ type BucketSynchronizer interface { Shutdown() error RegisterNotifier(notifier GrpcNotifier) AddFileWatch(addFileInfo domain.AddWatchFile) error - GetOpenFilePath(bucketSlug string, bucketPath string) (string, bool) + GetOpenFilePath(bucketSlug string, bucketPath string, dbID string) (string, bool) } type TextileNotifier interface { @@ -218,10 +218,10 @@ func (bs *bucketSynchronizer) AddFileWatch(addFileInfo domain.AddWatchFile) erro return nil } -func (bs *bucketSynchronizer) GetOpenFilePath(bucketSlug string, bucketPath string) (string, bool) { +func (bs *bucketSynchronizer) GetOpenFilePath(bucketSlug, bucketPath, dbID string) (string, bool) { var fi domain.AddWatchFile var err error - reversKey := getOpenFileReverseKey(bucketSlug, bucketPath) + reversKey := getOpenFileReverseKey(bucketSlug, bucketPath, dbID) if fi, err = bs.getOpenFileInfo(reversKey); err != nil { return "", false @@ -238,22 +238,22 @@ func getOpenFileKey(localPath string) string { return OpenFilesKeyPrefix + localPath } -func getOpenFileReverseKey(bucketSlug string, bucketPath string) string { - return ReverseOpenFilesKeyPrefix + bucketSlug + ":" + bucketPath +func getOpenFileReverseKey(bucketSlug, bucketPath, dbID string) string { + return ReverseOpenFilesKeyPrefix + bucketSlug + ":" + bucketPath + ":" + dbID } -func (bs *bucketSynchronizer) getOpenFileBucketSlugAndPath(localPath string) (string, string, bool) { +func (bs *bucketSynchronizer) getOpenFileBucketSlugAndPath(localPath string) (domain.AddWatchFile, bool) { var fi domain.AddWatchFile var err error if fi, err = bs.getOpenFileInfo(getOpenFileKey(localPath)); err != nil { - return "", "", false + return domain.AddWatchFile{}, false } if fi.BucketSlug == "" { - return "", "", false + return domain.AddWatchFile{}, false } - return fi.BucketSlug, fi.BucketPath, true + return fi, true } // Helper function to set open file info in the store @@ -265,7 +265,7 @@ func (bs *bucketSynchronizer) addFileInfoToStore(addFileInfo domain.AddWatchFile if err := bs.store.SetString(getOpenFileKey(addFileInfo.LocalPath), string(out)); err != nil { return err } - reverseKey := getOpenFileReverseKey(addFileInfo.BucketKey, addFileInfo.BucketPath) + reverseKey := getOpenFileReverseKey(addFileInfo.BucketSlug, addFileInfo.BucketPath, addFileInfo.DbId) if err := bs.store.SetString(reverseKey, string(out)); err != nil { return err } @@ -277,7 +277,7 @@ func (bs *bucketSynchronizer) removeFileInfo(addFileInfo domain.AddWatchFile) er if err := bs.store.Remove([]byte(getOpenFileKey(addFileInfo.LocalPath))); err != nil { return err } - reverseKey := getOpenFileReverseKey(addFileInfo.BucketKey, addFileInfo.BucketPath) + reverseKey := getOpenFileReverseKey(addFileInfo.BucketSlug, addFileInfo.BucketPath, addFileInfo.DbId) if err := bs.store.Remove([]byte(reverseKey)); err != nil { return err } diff --git a/core/textile/bucket_factory.go b/core/textile/bucket_factory.go index ff04a81d..7d8ff5e4 100644 --- a/core/textile/bucket_factory.go +++ b/core/textile/bucket_factory.go @@ -64,10 +64,7 @@ func (tc *textileClient) getBucket(ctx context.Context, slug string, remoteFile b := bucket.New( root, getContextFn, - NewSecureBucketsClient( - bucketsClient, - slug, - ), + tc.getSecureBucketsClient(bucketsClient), ) // Attach a notifier if the bucket is local @@ -80,7 +77,7 @@ func (tc *textileClient) getBucket(ctx context.Context, slug string, remoteFile } func (tc *textileClient) getBucketForMirror(ctx context.Context, slug string) (Bucket, error) { - root, getContextFn, newSlug, err := tc.getBucketRootForMirror(ctx, slug) + root, getContextFn, _, err := tc.getBucketRootForMirror(ctx, slug) if err != nil { return nil, err } @@ -88,10 +85,7 @@ func (tc *textileClient) getBucketForMirror(ctx context.Context, slug string) (B b := bucket.New( root, getContextFn, - NewSecureBucketsClient( - tc.hb, - newSlug, - ), + tc.getSecureBucketsClient(tc.hb), ) return b, nil @@ -202,10 +196,7 @@ func (tc *textileClient) getBucketRootFromReceivedFile(ctx context.Context, file return nil, nil, err } - sbs := NewSecureBucketsClient( - tc.hb, - receivedFile.Bucket, - ) + sbs := tc.getSecureBucketsClient(tc.hb) b, err := sbs.ListPath(remoteCtx, receivedFile.BucketKey, receivedFile.Path) @@ -235,10 +226,7 @@ func (tc *textileClient) getBucketRootForMirror(ctx context.Context, slug string return nil, nil, "", err } - sbs := NewSecureBucketsClient( - tc.hb, - bucket.RemoteBucketSlug, - ) + sbs := tc.getSecureBucketsClient(tc.hb) b, err := sbs.ListPath(remoteCtx, bucket.RemoteBucketKey, "") @@ -316,10 +304,7 @@ func (tc *textileClient) createBucket(ctx context.Context, bucketSlug string) (B newB := bucket.New( b.Root, tc.getOrCreateBucketContext, - NewSecureBucketsClient( - tc.bucketsClient, - bucketSlug, - ), + tc.getSecureBucketsClient(tc.bucketsClient), ) return newB, nil diff --git a/core/textile/client.go b/core/textile/client.go index 135e2bdf..bd8c9610 100644 --- a/core/textile/client.go +++ b/core/textile/client.go @@ -10,6 +10,8 @@ import ( "time" "github.com/FleekHQ/space-daemon/config" + httpapi "github.com/ipfs/go-ipfs-http-client" + iface "github.com/ipfs/interface-go-ipfs-core" "github.com/FleekHQ/space-daemon/core/keychain" db "github.com/FleekHQ/space-daemon/core/store" @@ -21,6 +23,7 @@ import ( "github.com/FleekHQ/space-daemon/core/textile/utils" "github.com/FleekHQ/space-daemon/core/util/address" "github.com/FleekHQ/space-daemon/log" + ma "github.com/multiformats/go-multiaddr" threadsClient "github.com/textileio/go-threads/api/client" nc "github.com/textileio/go-threads/net/api/client" bucketsClient "github.com/textileio/textile/v2/api/buckets/client" @@ -61,6 +64,7 @@ type textileClient struct { failedHealthchecks int sync synchronizer.Synchronizer notifier bucket.Notifier + ipfsClient iface.CoreAPI } // Creates a new Textile Client @@ -188,6 +192,22 @@ func (tc *textileClient) start(ctx context.Context, cfg config.Config) error { netc = n } + ipfsNodeAddr := cfg.GetString(config.Ipfsnodeaddr, "/ip4/127.0.0.1/tcp/5001") + if ipfsNodeAddr == "" { + ipfsNodeAddr = "/ip4/127.0.0.1/tcp/5001" + } + + multiAddr, err := ma.NewMultiaddr(ipfsNodeAddr) + if err != nil { + cmd.Fatal(err) + } + + if ic, err := httpapi.NewApi(multiAddr); err != nil { + cmd.Fatal(err) + } else { + tc.ipfsClient = ic + } + tc.bucketsClient = buckets tc.threads = threads tc.netc = netc @@ -502,6 +522,11 @@ func (tc *textileClient) GetModel() model.Model { return model.New(tc.store, tc.kc, tc.threads, tc.hubAuth) } +func (tc *textileClient) getSecureBucketsClient(baseClient *bucketsClient.Client) *SecureBucketClient { + isRemote := baseClient == tc.hb + return NewSecureBucketsClient(baseClient, tc.kc, tc.store, tc.threads, tc.ipfsClient, isRemote) +} + func (tc *textileClient) requiresHubConnection() error { if err := tc.requiresRunning(); err != nil { return err diff --git a/core/textile/model/model.go b/core/textile/model/model.go index 1c7c87a0..42336cb8 100644 --- a/core/textile/model/model.go +++ b/core/textile/model/model.go @@ -2,21 +2,17 @@ package model import ( "context" - "errors" "github.com/FleekHQ/space-daemon/core/keychain" "github.com/FleekHQ/space-daemon/core/space/domain" "github.com/FleekHQ/space-daemon/core/store" "github.com/FleekHQ/space-daemon/core/textile/hub" "github.com/FleekHQ/space-daemon/core/textile/utils" - "github.com/FleekHQ/space-daemon/log" threadsClient "github.com/textileio/go-threads/api/client" "github.com/textileio/go-threads/core/thread" - "github.com/textileio/go-threads/db" ) const metaThreadName = "metathreadV1" -const threadIDStoreKey = "thread_id" type model struct { st store.Store @@ -59,71 +55,8 @@ func New(st store.Store, kc keychain.Keychain, threads *threadsClient.Client, hu } } -// Returns the store key for a thread ID. It uses the keychain to obtain the public key, since the store key depends on it. -func getMetathreadStoreKey(kc keychain.Keychain) ([]byte, error) { - pub, err := kc.GetStoredPublicKey() - if err != nil { - return nil, err - } - - pubInBytes, err := pub.Raw() - if err != nil { - return nil, err - } - - result := []byte(threadIDStoreKey + "_" + metaThreadName) - result = append(result, pubInBytes...) - - return result, nil -} - func (m *model) findOrCreateMetaThreadID(ctx context.Context) (*thread.ID, error) { - storeKey, err := getMetathreadStoreKey(m.kc) - if err != nil { - return nil, err - } - - if val, _ := m.st.Get(storeKey); val != nil { - // Cast the stored dbID from bytes to thread.ID - if dbID, err := thread.Cast(val); err != nil { - return nil, err - } else { - return &dbID, nil - } - } - - // thread id does not exist yet - - // We need to create an ID that's derived deterministically from the user private key - // The reason for this is that the user needs to be able to restore the exact ID when moving across devices. - // The only consideration is that we must try to avoid dbID collisions with other users. - dbID, err := utils.NewDeterministicThreadID(m.kc, utils.MetathreadThreadVariant) - if err != nil { - return nil, err - } - - dbIDInBytes := dbID.Bytes() - - log.Debug("Model.findOrCreateMetaThreadID: Created meta thread in db " + dbID.String()) - - managedKey, err := m.kc.GetManagedThreadKey(metaThreadName) - if err != nil { - log.Error("error getting managed thread key", err) - return nil, err - } - - err = m.threads.NewDB(ctx, dbID, db.WithNewManagedThreadKey(managedKey)) - st := err.Error() - if err != nil && st != "rpc error: code = Unknown desc = db already exists" { - return nil, err - } - - if err := m.st.Set(storeKey, dbIDInBytes); err != nil { - newErr := errors.New("error while storing thread id: check your local space db accessibility") - return nil, newErr - } - - return &dbID, nil + return utils.FindOrCreateDeterministicThreadID(ctx, utils.MetathreadThreadVariant, metaThreadName, m.kc, m.st, m.threads) } func (m *model) getMetaThreadContext(ctx context.Context) (context.Context, *thread.ID, error) { diff --git a/core/textile/secure_bucket_client.go b/core/textile/secure_bucket_client.go index f9c0cde7..030bc2f2 100644 --- a/core/textile/secure_bucket_client.go +++ b/core/textile/secure_bucket_client.go @@ -5,9 +5,13 @@ import ( "errors" "fmt" "io" + "io/ioutil" + "os" "regexp" "strings" + "github.com/FleekHQ/space-daemon/core/keychain" + "github.com/FleekHQ/space-daemon/core/store" "github.com/FleekHQ/space-daemon/core/textile/common" "github.com/FleekHQ/space-daemon/core/textile/utils" @@ -15,7 +19,13 @@ import ( "github.com/FleekHQ/space-daemon/core/textile/bucket/crypto" + "github.com/ipfs/go-cid" + ipfsfiles "github.com/ipfs/go-ipfs-files" + iface "github.com/ipfs/interface-go-ipfs-core" + "github.com/ipfs/interface-go-ipfs-core/options" "github.com/ipfs/interface-go-ipfs-core/path" + threadsClient "github.com/textileio/go-threads/api/client" + "github.com/textileio/go-threads/core/thread" bc "github.com/textileio/textile/v2/api/buckets/client" bucketsClient "github.com/textileio/textile/v2/api/buckets/client" bucketspb "github.com/textileio/textile/v2/api/buckets/pb" @@ -29,16 +39,28 @@ var textileRelPathRegex = regexp.MustCompile(`/ip[f|n]s/[^/]*(?P/.*)`) // and also decrypts response from the underlying textile client type SecureBucketClient struct { client *bucketsClient.Client - bucketSlug string + kc keychain.Keychain + st store.Store + threads *threadsClient.Client + ipfsClient iface.CoreAPI + isRemote bool } func NewSecureBucketsClient( client *bucketsClient.Client, - bucketSlug string, + kc keychain.Keychain, + st store.Store, + threads *threadsClient.Client, + ipfsClient iface.CoreAPI, + isRemote bool, ) *SecureBucketClient { return &SecureBucketClient{ client: client, - bucketSlug: bucketSlug, + kc: kc, + st: st, + threads: threads, + ipfsClient: ipfsClient, + isRemote: isRemote, } } @@ -107,7 +129,7 @@ func (s *SecureBucketClient) PullPath(ctx context.Context, key, path string, wri go func() { defer pipeWriter.Close() - if err := s.client.PullPath(ctx, key, encryptedPath, pipeWriter, opts...); err != nil { + if err := s.racePullFile(ctx, key, encryptedPath, pipeWriter, opts...); err != nil { errs <- err } }() @@ -125,7 +147,8 @@ func (s *SecureBucketClient) PullPath(ctx context.Context, key, path string, wri return } }() - return <-errs + err = <-errs + return err } func (s *SecureBucketClient) overwriteDecryptedItem(ctx context.Context, item *bucketspb.PathItem) error { @@ -245,3 +268,219 @@ func (s *SecureBucketClient) decryptPathData( func cleanBucketPath(path string) string { return strings.TrimPrefix(path, "/") } + +type pathPullingFn func(context.Context, string, string, io.Writer, ...bc.Option) (bool, error) + +type pullSuccessResponse struct { + file *os.File + shouldCache bool +} + +func (s *SecureBucketClient) racePullFile(ctx context.Context, key, encPath string, w io.Writer, opts ...bc.Option) error { + pullers := []pathPullingFn{s.pullFileFromLocal, s.pullFileFromClient, s.pullFileFromDHT} + + pullSuccess := make(chan *pullSuccessResponse) + errc := make(chan error) + defer close(pullSuccess) + + ctxWithCancel, cancelPulls := context.WithCancel(ctx) + pendingFns := len(pullers) + erroredFns := 0 + + for _, fn := range pullers { + f, err := ioutil.TempFile("", "*-"+encPath) + if err != nil { + cancelPulls() + return err + } + defer f.Close() + defer os.Remove(f.Name()) + + go func(fn pathPullingFn, f *os.File) { + shouldCache, err := fn(ctxWithCancel, key, encPath, f, opts...) + if err != nil { + errc <- err + return + } + + chanRes := &pullSuccessResponse{ + file: f, + shouldCache: shouldCache, + } + + pullSuccess <- chanRes + errc <- nil + }(fn, f) + } + + var pullErr error + + // Wait for either all pullers to fail or for one to succeed + go func() { + for { + select { + case err := <-errc: + pendingFns-- + + if err != nil { + erroredFns++ + pullErr = err + } + if pendingFns <= 0 && erroredFns >= len(pullers) { + // All functions failed. Stop waiting + pullSuccess <- nil + } + + if pendingFns <= 0 { + close(errc) + return + } + } + } + }() + + pullResponse := <-pullSuccess + cancelPulls() + + // Return error if all pull functions failed + if erroredFns >= len(pullers) || pullResponse == nil { + return pullErr + } + + finalFile := pullResponse.file + shouldCache := pullResponse.shouldCache + + // Copy pulled file to upstream writer + resErrc := make(chan error) + defer close(resErrc) + go func() { + from, err := os.Open(finalFile.Name()) + if err != nil { + resErrc <- err + return + } + defer from.Close() + + _, err = io.Copy(w, from) + resErrc <- err + }() + + // Copy pulled file to local cache + cacheErrc := make(chan error) + defer close(cacheErrc) + go func() { + var err error + if !shouldCache { + cacheErrc <- nil + return + } + from, err := os.Open(finalFile.Name()) + if err != nil { + cacheErrc <- err + return + } + defer from.Close() + + p, err := s.ipfsClient.Unixfs().Add( + ctx, + ipfsfiles.NewReaderFile(from), + options.Unixfs.Pin(false), // Turn to true when we enable DHT discovery + options.Unixfs.Progress(false), + options.Unixfs.CidVersion(1), + ) + if err != nil { + cacheErrc <- err + return + } + + cidBinary := p.Cid().Bytes() + err = s.st.Set(getFileCacheKey(encPath), cidBinary) + + cacheErrc <- err + }() + + if err := <-resErrc; err != nil { + return err + } + + if err := <-cacheErrc; err != nil { + return err + } + + return nil +} + +const fileCachePrefix = "file_cache" + +func getFileCacheKey(encPath string) []byte { + return []byte(fileCachePrefix + ":" + encPath) +} + +func (s *SecureBucketClient) pullFileFromClient(ctx context.Context, key, encPath string, w io.Writer, opts ...bc.Option) (shouldCache bool, err error) { + shouldCache = true + if s.isRemote == false { + // File already in local bucket + shouldCache = false + } + + if err = s.client.PullPath(ctx, key, encPath, w, opts...); err != nil { + return false, err + } + return shouldCache, nil +} + +var errNoLocalClient = errors.New("No cache client available") + +func (s *SecureBucketClient) pullFileFromLocal(ctx context.Context, key, encPath string, w io.Writer, opts ...bc.Option) (shouldCache bool, err error) { + shouldCache = false + + cidBinary, err := s.st.Get(getFileCacheKey(encPath)) + if cidBinary == nil || err != nil { + return false, errors.New("CID not stored in local cache") + } + + _, c, err := cid.CidFromBytes(cidBinary) + if err != nil { + return false, err + } + + node, err := s.ipfsClient.Unixfs().Get(ctx, path.New(c.String())) + if err != nil { + return false, err + } + defer node.Close() + + file := ipfsfiles.ToFile(node) + if file == nil { + return false, errors.New("File is a directory") + } + + if _, err := io.Copy(w, file); err != nil { + return false, err + } + + return shouldCache, nil +} + +func (s *SecureBucketClient) pullFileFromDHT(ctx context.Context, key, encPath string, w io.Writer, opts ...bc.Option) (shouldCache bool, err error) { + shouldCache = true + + // return shouldCache, nil + return false, errors.New("Not implemented") +} + +const cacheBucketThreadName = "cache_bucket" + +func (s *SecureBucketClient) getCacheBucketCtx(ctx context.Context) (context.Context, *thread.ID, error) { + dbID, err := utils.FindOrCreateDeterministicThreadID(ctx, utils.CacheBucketVariant, cacheBucketThreadName, s.kc, s.st, s.threads) + if err != nil { + return nil, nil, err + } + + cacheCtx, err := utils.GetThreadContext(ctx, cacheBucketThreadName, *dbID, false, s.kc, nil, s.threads) + if err != nil { + return nil, nil, err + } + + return cacheCtx, dbID, nil +} diff --git a/core/textile/sharing.go b/core/textile/sharing.go index fab3386d..bff14a2b 100644 --- a/core/textile/sharing.go +++ b/core/textile/sharing.go @@ -42,7 +42,7 @@ func (tc *textileClient) ShareFilesViaPublicKey(ctx context.Context, paths []dom roles[tpk.String()] = buckets.Admin } - sbc := NewSecureBucketsClient(tc.hb, pth.Bucket) + sbc := tc.getSecureBucketsClient(tc.hb) err := sbc.PushPathAccessRoles(ctx, pth.BucketKey, pth.Path, roles) if err != nil { @@ -148,7 +148,7 @@ func (tc *textileClient) GetReceivedFiles(ctx context.Context, accepted bool, se return nil, "", err } - sbc := NewSecureBucketsClient(tc.hb, file.Bucket) + sbc := tc.getSecureBucketsClient(tc.hb) f, err := sbc.ListPath(ctx, file.BucketKey, file.Path) if err != nil { @@ -242,7 +242,7 @@ func (tc *textileClient) GetPathAccessRoles(ctx context.Context, b Bucket, path return nil, err } - sbc := NewSecureBucketsClient(tc.hb, bucketSlug) + sbc := tc.getSecureBucketsClient(tc.hb) rs, err := sbc.PullPathAccessRoles(hubCtx, bucketKey, path) if err != nil { @@ -281,7 +281,7 @@ func (tc *textileClient) GetPathAccessRoles(ctx context.Context, b Bucket, path // return true if file was shared // XXX: export this func? func (tc *textileClient) isSharedFile(ctx context.Context, bucket Bucket, path string) bool { - sbc := NewSecureBucketsClient(tc.hb, bucket.Slug()) + sbc := tc.getSecureBucketsClient(tc.hb) roles, err := sbc.PullPathAccessRoles(ctx, bucket.Key(), path) if err != nil { diff --git a/core/textile/sync/pinning.go b/core/textile/sync/pinning.go index 48879a6d..0049f276 100644 --- a/core/textile/sync/pinning.go +++ b/core/textile/sync/pinning.go @@ -31,6 +31,7 @@ func (s *synchronizer) uploadFileToRemote(ctx context.Context, bucket, path stri if err := localBucket.GetFile(ctx, path, pipeWriter); err != nil { errc <- err + return } errc <- nil diff --git a/core/textile/sync/task-executors.go b/core/textile/sync/task-executors.go index 6c253364..98850cf3 100644 --- a/core/textile/sync/task-executors.go +++ b/core/textile/sync/task-executors.go @@ -80,14 +80,17 @@ func (s *synchronizer) processPinFile(ctx context.Context, task *Task) error { bucket := task.Args[0] path := task.Args[1] - err := s.uploadFileToRemote(ctx, bucket, path) + if err := s.uploadFileToRemote(ctx, bucket, path); err != nil { + return err + } + s.setMirrorFileBackup(ctx, path, bucket, false) if s.eventNotifier != nil { s.eventNotifier.SendFileEvent(events.NewFileEvent(path, bucket, events.FileBackupReady, nil)) } - return err + return nil } func (s *synchronizer) processUnpinFile(ctx context.Context, task *Task) error { diff --git a/core/textile/textile.go b/core/textile/textile.go index cf00dade..ae1d658d 100644 --- a/core/textile/textile.go +++ b/core/textile/textile.go @@ -24,6 +24,7 @@ const ( hubTarget = "127.0.0.1:3006" threadsTarget = "127.0.0.1:3006" defaultPersonalBucketSlug = "personal" + defaultCacheBucketSlug = "personal_cache" defaultPersonalMirrorBucketSlug = "personal_mirror" defaultPublicShareBucketSlug = "personal_public" ) diff --git a/core/textile/utils/utils.go b/core/textile/utils/utils.go index 5b297540..a762bde8 100644 --- a/core/textile/utils/utils.go +++ b/core/textile/utils/utils.go @@ -7,13 +7,16 @@ import ( "encoding/base32" "encoding/binary" "encoding/hex" + "errors" "path/filepath" "github.com/FleekHQ/space-daemon/core/keychain" + "github.com/FleekHQ/space-daemon/core/store" "github.com/FleekHQ/space-daemon/core/textile/hub" crypto "github.com/libp2p/go-libp2p-crypto" tc "github.com/textileio/go-threads/api/client" "github.com/textileio/go-threads/core/thread" + "github.com/textileio/go-threads/db" "github.com/textileio/textile/v2/api/common" "golang.org/x/crypto/pbkdf2" ) @@ -40,6 +43,7 @@ type DeterministicThreadVariant string const ( MetathreadThreadVariant DeterministicThreadVariant = "metathread" + CacheBucketVariant DeterministicThreadVariant = "cache_bucket" ) func NewDeterministicThreadID(kc keychain.Keychain, threadVariant DeterministicThreadVariant) (thread.ID, error) { @@ -137,3 +141,75 @@ func IsMetaFileName(pathOrName string) bool { return false } + +const threadIDStoreKey = "thread_id" + +// Returns the store key for a thread ID. It uses the keychain to obtain the public key, since the store key depends on it. +func getDeterministicthreadStoreKey(kc keychain.Keychain, variant DeterministicThreadVariant) ([]byte, error) { + pub, err := kc.GetStoredPublicKey() + if err != nil { + return nil, err + } + + pubInBytes, err := pub.Raw() + if err != nil { + return nil, err + } + + result := []byte(threadIDStoreKey + "_" + string(variant)) + result = append(result, pubInBytes...) + + return result, nil +} + +// Finds or creates a thread ID that's based on the user private key and the specified variant +// Using the same private key, variant and thread name will always end up generating the same key +func FindOrCreateDeterministicThreadID(ctx context.Context, variant DeterministicThreadVariant, threadName string, kc keychain.Keychain, st store.Store, threads *tc.Client) (*thread.ID, error) { + storeKey, err := getDeterministicthreadStoreKey(kc, variant) + if err != nil { + return nil, err + } + + if val, _ := st.Get(storeKey); val != nil { + // Cast the stored dbID from bytes to thread.ID + if dbID, err := thread.Cast(val); err != nil { + return nil, err + } else { + return &dbID, nil + } + } + + // thread id does not exist yet + + // We need to create an ID that's derived deterministically from the user private key + // The reason for this is that the user needs to be able to restore the exact ID when moving across devices. + // The only consideration is that we must try to avoid dbID collisions with other users. + dbID, err := NewDeterministicThreadID(kc, variant) + if err != nil { + return nil, err + } + + dbIDInBytes := dbID.Bytes() + + managedKey, err := kc.GetManagedThreadKey(threadName) + if err != nil { + return nil, err + } + + threadCtx, err := GetThreadContext(ctx, threadName, dbID, false, kc, nil, threads) + if err != nil { + return nil, err + } + + err = threads.NewDB(threadCtx, dbID, db.WithNewManagedThreadKey(managedKey)) + if err != nil && err.Error() != "rpc error: code = Unknown desc = db already exists" { + return nil, err + } + + if err := st.Set(storeKey, dbIDInBytes); err != nil { + newErr := errors.New("error while storing thread id: check your local space db accessibility") + return nil, newErr + } + + return &dbID, nil +} diff --git a/mocks/mock_syncer.go b/mocks/mock_syncer.go index 3e8fa65c..086c32cd 100644 --- a/mocks/mock_syncer.go +++ b/mocks/mock_syncer.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.0.0-alpha.2. DO NOT EDIT. +// Code generated by mockery v2.0.0. DO NOT EDIT. package mocks @@ -26,20 +26,20 @@ func (_m *Syncer) AddFileWatch(addFileInfo domain.AddWatchFile) error { return r0 } -// GetOpenFilePath provides a mock function with given fields: bucketSlug, bucketPath -func (_m *Syncer) GetOpenFilePath(bucketSlug string, bucketPath string) (string, bool) { - ret := _m.Called(bucketSlug, bucketPath) +// GetOpenFilePath provides a mock function with given fields: bucketSlug, bucketPath, dbID +func (_m *Syncer) GetOpenFilePath(bucketSlug string, bucketPath string, dbID string) (string, bool) { + ret := _m.Called(bucketSlug, bucketPath, dbID) var r0 string - if rf, ok := ret.Get(0).(func(string, string) string); ok { - r0 = rf(bucketSlug, bucketPath) + if rf, ok := ret.Get(0).(func(string, string, string) string); ok { + r0 = rf(bucketSlug, bucketPath, dbID) } else { r0 = ret.Get(0).(string) } var r1 bool - if rf, ok := ret.Get(1).(func(string, string) bool); ok { - r1 = rf(bucketSlug, bucketPath) + if rf, ok := ret.Get(1).(func(string, string, string) bool); ok { + r1 = rf(bucketSlug, bucketPath, dbID) } else { r1 = ret.Get(1).(bool) }