From 25bdf678a078f24e767feea2c187eea57a2f95f7 Mon Sep 17 00:00:00 2001 From: Daniel Merrill Date: Mon, 26 Oct 2020 18:02:17 -0300 Subject: [PATCH] Listen for textile client updates (#220) * WIP: Textile listener * WIP: textile client listener connecting but not receiving * Use replicator for metathread * clean up * Merge with develop and connect with restorer * Fix segfault on metathread creation * Prevent segfault * Allow enqueing repeating tasks when the old one already completed * Fix tests --- app/app.go | 13 +-- core/keychain/keychain.go | 9 +- core/sync/sync.go | 49 ++-------- core/textile/client.go | 23 ++++- core/textile/event_handler.go | 26 +++++ core/textile/listener.go | 138 +++++++++++++++++++++++++++ core/textile/model/model.go | 43 ++++++++- core/textile/secure_bucket_client.go | 15 --- core/textile/sync/mirror.go | 12 ++- core/textile/sync/queue.go | 8 +- core/textile/sync/sync_test.go | 6 +- core/textile/sync/synchronizer.go | 78 ++++++++------- core/textile/sync/task-executors.go | 9 +- core/textile/textile.go | 6 ++ core/textile/utils/utils.go | 6 +- go.mod | 1 + mocks/Client.go | 25 ++++- 17 files changed, 355 insertions(+), 112 deletions(-) create mode 100644 core/textile/listener.go diff --git a/app/app.go b/app/app.go index 45fdda59..bb225968 100644 --- a/app/app.go +++ b/app/app.go @@ -179,17 +179,18 @@ func (a *App) Start(ctx context.Context) error { textileClient.AttachMailboxNotifier(srv) textileClient.AttachSynchronizerNotifier(srv) - err = a.RunAsync("BucketSync", bucketSync, func() error { - bucketSync.RegisterNotifier(srv) - return bucketSync.Start(ctx) + + // start the gRPC server + err = a.RunAsync("gRPCServer", srv, func() error { + return srv.Start(ctx) }) if err != nil { return err } - // start the gRPC server - err = a.RunAsync("gRPCServer", srv, func() error { - return srv.Start(ctx) + err = a.RunAsync("BucketSync", bucketSync, func() error { + bucketSync.RegisterNotifier(srv) + return bucketSync.Start(ctx) }) if err != nil { return err diff --git a/core/keychain/keychain.go b/core/keychain/keychain.go index 79a1c161..3cd9583d 100644 --- a/core/keychain/keychain.go +++ b/core/keychain/keychain.go @@ -4,11 +4,12 @@ import ( "crypto/ed25519" "crypto/sha512" "encoding/hex" - "golang.org/x/crypto/pbkdf2" "os" "path" "strings" + "golang.org/x/crypto/pbkdf2" + "errors" "github.com/99designs/keyring" @@ -372,6 +373,12 @@ func (kc *keychain) retrieveKeyPair() (privKey []byte, mnemonic string, err erro } func (kc *keychain) GetManagedThreadKey(threadKeyName string) (thread.Key, error) { + // Check if there's a key stored before continuing + _, err := kc.GetStoredPublicKey() + if err != nil { + return thread.Key{}, err + } + size := 32 priv, _, err := kc.GetStoredKeyPairInLibP2PFormat() diff --git a/core/sync/sync.go b/core/sync/sync.go index 058a6373..ab3109ec 100644 --- a/core/sync/sync.go +++ b/core/sync/sync.go @@ -61,10 +61,9 @@ type bucketSynchronizer struct { textileClient textile.Client fh *watcherHandler th *textileHandler - // textileThreadListeners []textile.ThreadListener - notifier GrpcNotifier - store store.Store - ready chan bool + notifier GrpcNotifier + store store.Store + ready chan bool } // Creates a new bucketSynchronizer instancelistenerEventHandler @@ -74,28 +73,20 @@ func New( store store.Store, notifier GrpcNotifier, ) *bucketSynchronizer { - // textileThreadListeners := make([]textile.ThreadListener, 0) return &bucketSynchronizer{ folderWatcher: folderWatcher, textileClient: textileClient, fh: nil, th: nil, - // textileThreadListeners: textileThreadListeners, - notifier: notifier, - store: store, - ready: make(chan bool), + notifier: notifier, + store: store, + ready: make(chan bool), } } // Starts the folder watcher and the textile watcher. func (bs *bucketSynchronizer) Start(ctx context.Context) error { - // Disabling this temporarily due to errors - //buckets, err := bs.textileClient.ListBuckets(ctx) - // if err != nil { - // return err - // } - if bs.notifier == nil { log.Printf("using default notifier to start bucket sync") bs.notifier = &defaultNotifier{} @@ -106,24 +97,8 @@ func (bs *bucketSynchronizer) Start(ctx context.Context) error { bs: bs, } - bs.th = &textileHandler{ - notifier: bs.notifier, - bs: bs, - } - - // handlers := make([]textile.EventHandler, 0) - // handlers = append(handlers, bs.th) - - // Disabling this temporarily due to errors - //for range buckets { - // bs.textileThreadListeners = append(bs.textileThreadListeners, textile.NewListener(bs.textileClient, bucket.Slug(), handlers)) - //} - bs.folderWatcher.RegisterHandler(bs.fh) - // TODO: bs.textileThreadListener.RegisterHandler(bs.th) - // (Needs implementation of bs.th) - g, newCtx := errgroup.WithContext(ctx) g.Go(func() error { @@ -131,13 +106,6 @@ func (bs *bucketSynchronizer) Start(ctx context.Context) error { return bs.folderWatcher.Watch(newCtx) }) - // for _, listener := range bs.textileThreadListeners { - // g.Go(func() error { - // log.Debug("Starting textile thread listener in bucketsync") - // return listener.Listen(newCtx) - // }) - // } - // add open files to watcher keys, err := bs.store.KeysWithPrefix(OpenFilesKeyPrefix) if err != nil { @@ -157,9 +125,7 @@ func (bs *bucketSynchronizer) Start(ctx context.Context) error { } } - //go func() { bs.ready <- true - //}() err = g.Wait() @@ -179,9 +145,6 @@ func (bs *bucketSynchronizer) Shutdown() error { log.Debug("shutting down folder watcher in bucketsync") bs.folderWatcher.Close() log.Debug("shutting down textile thread listener in bucketsync") - // for _, listener := range bs.textileThreadListeners { - // listener.Close() - // } close(bs.ready) return nil diff --git a/core/textile/client.go b/core/textile/client.go index 6df9b2c2..25eea769 100644 --- a/core/textile/client.go +++ b/core/textile/client.go @@ -65,6 +65,7 @@ type textileClient struct { sync synchronizer.Synchronizer notifier bucket.Notifier ipfsClient iface.CoreAPI + dbListeners map[string]Listener } // Creates a new Textile Client @@ -93,6 +94,7 @@ func NewClient(store db.Store, kc keychain.Keychain, hubAuth hub.HubAuth, uc Use failedHealthchecks: 0, sync: nil, notifier: nil, + dbListeners: make(map[string]Listener), } } @@ -144,7 +146,18 @@ func (tc *textileClient) initializeSync(ctx context.Context) { } tc.sync = synchronizer.New( - tc.store, tc.GetModel(), tc.kc, tc.hubAuth, tc.hb, tc.ht, tc.netc, tc.cfg, getMirrorBucketFn, getLocalBucketFn, tc.getBucketContext, + tc.store, + tc.GetModel(), + tc.kc, + tc.hubAuth, + tc.hb, + tc.ht, + tc.netc, + tc.cfg, + getMirrorBucketFn, + getLocalBucketFn, + tc.getBucketContext, + tc.addListener, ) tc.notifier = notifier.New(tc.sync) @@ -419,6 +432,8 @@ func (tc *textileClient) Shutdown() error { close(tc.keypairDeleted) close(tc.shuttingDown) + tc.closeListeners() + if err := tc.bucketsClient.Close(); err != nil { return err } @@ -463,6 +478,10 @@ func (tc *textileClient) healthcheck(ctx context.Context) { tc.checkHubConnection(ctx) + if len(tc.dbListeners) == 0 { + tc.initializeListeners(ctx) + } + switch { case tc.isInitialized == false: log.Debug("Textile Client healthcheck... Not initialized yet.") @@ -521,7 +540,7 @@ func (tc *textileClient) RemoveKeys(ctx context.Context) error { } func (tc *textileClient) GetModel() model.Model { - return model.New(tc.store, tc.kc, tc.threads, tc.hubAuth) + return model.New(tc.store, tc.kc, tc.threads, tc.ht, tc.hubAuth, tc.cfg, tc.netc) } func (tc *textileClient) getSecureBucketsClient(baseClient *bucketsClient.Client) *SecureBucketClient { diff --git a/core/textile/event_handler.go b/core/textile/event_handler.go index f8c280b1..961fe511 100644 --- a/core/textile/event_handler.go +++ b/core/textile/event_handler.go @@ -2,6 +2,7 @@ package textile import ( "github.com/FleekHQ/space-daemon/core/textile/bucket" + "github.com/FleekHQ/space-daemon/core/textile/sync" "github.com/FleekHQ/space-daemon/log" tc "github.com/textileio/go-threads/api/client" ) @@ -27,3 +28,28 @@ func (h *defaultListenerHandler) OnRemove(bucketData *bucket.BucketData, listenE func (h *defaultListenerHandler) OnSave(bucketData *bucket.BucketData, listenEvent *tc.ListenEvent) { log.Info("Default Listener Handler: OnSave") } + +type restorerListenerHandler struct { + synchronizer sync.Synchronizer +} + +func newRestorerListenerHandler(synchronizer sync.Synchronizer) *restorerListenerHandler { + return &restorerListenerHandler{ + synchronizer: synchronizer, + } +} + +func (h *restorerListenerHandler) OnCreate(bucketData *bucket.BucketData, listenEvent *tc.ListenEvent) { + log.Debug("Restorer Listener Handler: OnCreate") + h.synchronizer.NotifyBucketRestore(bucketData.Name) +} + +func (h *restorerListenerHandler) OnRemove(bucketData *bucket.BucketData, listenEvent *tc.ListenEvent) { + log.Debug("Restorer Listener Handler: OnRemove") + h.synchronizer.NotifyBucketRestore(bucketData.Name) +} + +func (h *restorerListenerHandler) OnSave(bucketData *bucket.BucketData, listenEvent *tc.ListenEvent) { + log.Debug("Restorer Listener Handler: OnSave") + h.synchronizer.NotifyBucketRestore(bucketData.Name) +} diff --git a/core/textile/listener.go b/core/textile/listener.go new file mode 100644 index 00000000..05a47674 --- /dev/null +++ b/core/textile/listener.go @@ -0,0 +1,138 @@ +package textile + +import ( + "context" + "errors" + + "github.com/FleekHQ/space-daemon/core/textile/utils" + "github.com/textileio/go-threads/api/client" + threadsClient "github.com/textileio/go-threads/api/client" +) + +func (tc *textileClient) Listen(ctx context.Context, dbID, threadName string) (<-chan threadsClient.ListenEvent, error) { + db, err := utils.ParseDbIDFromString(dbID) + if err != nil { + return nil, err + } + + newCtx, err := utils.GetThreadContext(ctx, "", *db, true, tc.kc, tc.hubAuth, tc.ht) + if err != nil { + return nil, err + } + + return tc.ht.Listen(newCtx, *db, nil) +} + +func (tc *textileClient) addListener(ctx context.Context, bucketSlug string) error { + if err := tc.requiresHubConnection(); err != nil { + return err + } + handler := newRestorerListenerHandler(tc.sync) + handlers := []EventHandler{handler} + listener := NewListener(tc, bucketSlug, handlers) + tc.dbListeners[bucketSlug] = listener + + go func() { + err := listener.Listen(ctx) + if err != nil { + // Remove element from map as it's not listening anymore + delete(tc.dbListeners, bucketSlug) + } + }() + + return nil +} + +func (tc *textileClient) initializeListeners(ctx context.Context) error { + if err := tc.requiresHubConnection(); err != nil { + return err + } + + tc.closeListeners() + + buckets, err := tc.listBuckets(ctx) + if err != nil { + return err + } + + for _, bucket := range buckets { + tc.addListener(ctx, bucket.Slug()) + } + + return nil +} + +func (tc *textileClient) closeListeners() { + for key, listener := range tc.dbListeners { + listener.Close() + delete(tc.dbListeners, key) + } + +} + +type listener struct { + client Client + bucketSlug string + handlers []EventHandler + shutdown chan (bool) +} + +func NewListener(client Client, bucketSlug string, handlers []EventHandler) *listener { + return &listener{ + client: client, + bucketSlug: bucketSlug, + handlers: handlers, + shutdown: make(chan (bool)), + } +} + +func (l *listener) Listen(ctx context.Context) error { + bucketSchema, err := l.client.GetModel().FindBucket(ctx, l.bucketSlug) + if bucketSchema == nil || bucketSchema.RemoteDbID == "" { + return errors.New("Bucket does not have a linked mirror bucket") + } + + bucket, err := l.client.GetBucket(ctx, l.bucketSlug, nil) + if err != nil { + return err + } + bucketData := bucket.GetData() + + if err != nil { + return err + } + + eventChan, err := l.client.Listen(ctx, bucketSchema.RemoteDbID, bucketSchema.RemoteBucketSlug) + if err != nil { + return err + } + +Loop: + for { + select { + case ev := <-eventChan: + if ev.Err != nil { + return ev.Err + } + + for _, handler := range l.handlers { + switch ev.Action.Type { + case client.ActionCreate: + handler.OnCreate(&bucketData, &ev) + case client.ActionSave: + handler.OnSave(&bucketData, &ev) + case client.ActionDelete: + handler.OnRemove(&bucketData, &ev) + } + } + case <-l.shutdown: + break Loop + } + } + + return nil +} + +func (l *listener) Close() { + l.shutdown <- true +} diff --git a/core/textile/model/model.go b/core/textile/model/model.go index 7bb63bee..bbdb01ef 100644 --- a/core/textile/model/model.go +++ b/core/textile/model/model.go @@ -3,13 +3,17 @@ package model import ( "context" + "github.com/FleekHQ/space-daemon/config" "github.com/FleekHQ/space-daemon/core/keychain" "github.com/FleekHQ/space-daemon/core/space/domain" "github.com/FleekHQ/space-daemon/core/store" "github.com/FleekHQ/space-daemon/core/textile/hub" "github.com/FleekHQ/space-daemon/core/textile/utils" + "github.com/FleekHQ/space-daemon/log" threadsClient "github.com/textileio/go-threads/api/client" "github.com/textileio/go-threads/core/thread" + nc "github.com/textileio/go-threads/net/api/client" + "github.com/textileio/textile/v2/cmd" ) const metaThreadName = "metathreadV1" @@ -19,6 +23,9 @@ type model struct { kc keychain.Keychain threads *threadsClient.Client hubAuth hub.HubAuth + cfg config.Config + netc *nc.Client + ht *threadsClient.Client } type Model interface { @@ -56,17 +63,49 @@ type Model interface { FindReceivedFilesByIds(ctx context.Context, ids []string) ([]*ReceivedFileSchema, error) } -func New(st store.Store, kc keychain.Keychain, threads *threadsClient.Client, hubAuth hub.HubAuth) *model { +func New(st store.Store, kc keychain.Keychain, threads *threadsClient.Client, ht *threadsClient.Client, hubAuth hub.HubAuth, cfg config.Config, netc *nc.Client) *model { return &model{ st: st, kc: kc, threads: threads, hubAuth: hubAuth, + cfg: cfg, + netc: netc, + ht: ht, } } func (m *model) findOrCreateMetaThreadID(ctx context.Context) (*thread.ID, error) { - return utils.FindOrCreateDeterministicThreadID(ctx, utils.MetathreadThreadVariant, metaThreadName, m.kc, m.st, m.threads) + hubmaStr := m.cfg.GetString(config.TextileHubMa, "") + hubma := cmd.AddrFromStr(hubmaStr) + key, err := m.kc.GetManagedThreadKey(metaThreadName) + if err != nil { + return nil, err + } + + threadID, err := utils.NewDeterministicThreadID(m.kc, utils.MetathreadThreadVariant) + if err != nil { + return nil, err + } + hubmaWithThreadID := hubmaStr + "/thread/" + threadID.String() + + // If we are here, then there's no replicated metathread yet + if _, err := utils.FindOrCreateDeterministicThreadID(ctx, utils.MetathreadThreadVariant, metaThreadName, m.kc, m.st, m.threads); err != nil { + return nil, err + } + + // Try to join remote db if it was already replicated + err = m.threads.NewDBFromAddr(ctx, cmd.AddrFromStr(hubmaWithThreadID), key) + if err == nil || err.Error() == "rpc error: code = Unknown desc = db already exists" { + return &threadID, nil + } + + if _, err := m.netc.AddReplicator(ctx, threadID, hubma); err != nil { + log.Error("error while replicating metathread", err) + // Not returning error in case the user is offline (it should still work using local threads) + } + + return &threadID, nil } func (m *model) getMetaThreadContext(ctx context.Context) (context.Context, *thread.ID, error) { diff --git a/core/textile/secure_bucket_client.go b/core/textile/secure_bucket_client.go index 030bc2f2..e0c0bf2f 100644 --- a/core/textile/secure_bucket_client.go +++ b/core/textile/secure_bucket_client.go @@ -25,7 +25,6 @@ import ( "github.com/ipfs/interface-go-ipfs-core/options" "github.com/ipfs/interface-go-ipfs-core/path" threadsClient "github.com/textileio/go-threads/api/client" - "github.com/textileio/go-threads/core/thread" bc "github.com/textileio/textile/v2/api/buckets/client" bucketsClient "github.com/textileio/textile/v2/api/buckets/client" bucketspb "github.com/textileio/textile/v2/api/buckets/pb" @@ -470,17 +469,3 @@ func (s *SecureBucketClient) pullFileFromDHT(ctx context.Context, key, encPath s } const cacheBucketThreadName = "cache_bucket" - -func (s *SecureBucketClient) getCacheBucketCtx(ctx context.Context) (context.Context, *thread.ID, error) { - dbID, err := utils.FindOrCreateDeterministicThreadID(ctx, utils.CacheBucketVariant, cacheBucketThreadName, s.kc, s.st, s.threads) - if err != nil { - return nil, nil, err - } - - cacheCtx, err := utils.GetThreadContext(ctx, cacheBucketThreadName, *dbID, false, s.kc, nil, s.threads) - if err != nil { - return nil, nil, err - } - - return cacheCtx, dbID, nil -} diff --git a/core/textile/sync/mirror.go b/core/textile/sync/mirror.go index 291c3147..6c13834c 100644 --- a/core/textile/sync/mirror.go +++ b/core/textile/sync/mirror.go @@ -136,7 +136,10 @@ func (s *synchronizer) createMirrorThread(ctx context.Context, slug string) (*th return nil, err } - dbID := thread.NewIDV1(thread.Raw, 32) + dbID, err := utils.NewDeterministicThreadID(s.kc, utils.MirrorBucketVariantGen(slug)) + if err != nil { + return nil, err + } managedKey, err := s.kc.GetManagedThreadKey(mirrorThreadKeyName + "_" + slug) if err != nil { @@ -144,6 +147,13 @@ func (s *synchronizer) createMirrorThread(ctx context.Context, slug string) (*th return nil, err } + // If dbID is not found, GetDBInfo returns "thread not found" error + _, err = s.hubThreads.GetDBInfo(ctx, dbID) + if err == nil { + log.Debug("createMirrorThread: Db already exists") + return &dbID, nil + } + log.Debug("createMirrorThread: Creating Thread DB for bucket at db " + dbID.String()) if err := s.hubThreads.NewDB(ctx, dbID, db.WithNewManagedThreadKey(managedKey)); err != nil { return nil, err diff --git a/core/textile/sync/queue.go b/core/textile/sync/queue.go index 5d8c1b96..62eef525 100644 --- a/core/textile/sync/queue.go +++ b/core/textile/sync/queue.go @@ -98,7 +98,13 @@ func (s *synchronizer) restoreQueue() error { } func (s *synchronizer) isTaskEnqueued(task *Task) bool { - if s.queueHashMap[task.ID] != nil { + existingTask := s.queueHashMap[task.ID] + if existingTask == nil { + return false + } + + isPending := existingTask.State == taskQueued || existingTask.State == taskPending + if isPending { return true } diff --git a/core/textile/sync/sync_test.go b/core/textile/sync/sync_test.go index 34db5b44..f4859bf3 100644 --- a/core/textile/sync/sync_test.go +++ b/core/textile/sync/sync_test.go @@ -47,11 +47,15 @@ func initSync(t *testing.T) sync.Synchronizer { return mockClient.GetBucket(ctx, slug, mockRemoteFile) } + addListenerFn := func(ctx context.Context, slug string) error { + return nil + } + getBucketCtxFn := func(ctx context.Context, sDbID string, bucketSlug string, ishub bool, enckey []byte) (context.Context, *thread.ID, error) { return ctx, nil, nil } - s := sync.New(mockStore, mockModel, mockKeychain, mockHubAuth, nil, nil, nil, mockCfg, getMirrorBucketFn, getLocalBucketFn, getBucketCtxFn) + s := sync.New(mockStore, mockModel, mockKeychain, mockHubAuth, nil, nil, nil, mockCfg, getMirrorBucketFn, getLocalBucketFn, getBucketCtxFn, addListenerFn) return s } diff --git a/core/textile/sync/synchronizer.go b/core/textile/sync/synchronizer.go index 4b46b16e..8d4aa9b2 100644 --- a/core/textile/sync/synchronizer.go +++ b/core/textile/sync/synchronizer.go @@ -25,29 +25,31 @@ import ( type GetMirrorBucketFn func(ctx context.Context, slug string) (bucket.BucketInterface, error) type GetBucketFn func(ctx context.Context, slug string) (bucket.BucketInterface, error) type GetBucketCtxFn func(ctx context.Context, sDbID string, bucketSlug string, ishub bool, enckey []byte) (context.Context, *thread.ID, error) +type AddBucketListenerFn func(ctx context.Context, bucketSlug string) error const maxParallelTasks = 16 type synchronizer struct { - taskQueue *list.List - filePinningQueue *list.List - queueHashMap map[string]*Task - st store.Store - model model.Model - syncNeeded chan (bool) - shuttingDownMap map[*list.List]chan (bool) - queueMutexMap map[*list.List]*sync.Mutex - getMirrorBucket GetMirrorBucketFn - getBucket GetBucketFn - getBucketCtx GetBucketCtxFn - kc keychain.Keychain - hubAuth hub.HubAuth - hubBuckets *bucketsClient.Client - hubThreads *threadsClient.Client - cfg config.Config - netc *nc.Client - queueWg *sync.WaitGroup - eventNotifier EventNotifier + taskQueue *list.List + filePinningQueue *list.List + queueHashMap map[string]*Task + st store.Store + model model.Model + syncNeeded chan (bool) + shuttingDownMap map[*list.List]chan (bool) + queueMutexMap map[*list.List]*sync.Mutex + getMirrorBucket GetMirrorBucketFn + getBucket GetBucketFn + getBucketCtx GetBucketCtxFn + addBucketListener AddBucketListenerFn + kc keychain.Keychain + hubAuth hub.HubAuth + hubBuckets *bucketsClient.Client + hubThreads *threadsClient.Client + cfg config.Config + netc *nc.Client + queueWg *sync.WaitGroup + eventNotifier EventNotifier } // Creates a new Synchronizer @@ -63,6 +65,7 @@ func New( getMirrorBucket GetMirrorBucketFn, getBucket GetBucketFn, getBucketCtx GetBucketCtxFn, + addBucketListenerFn AddBucketListenerFn, ) *synchronizer { taskQueue := list.New() filePinningQueue := list.New() @@ -78,24 +81,25 @@ func New( queueWg := &sync.WaitGroup{} return &synchronizer{ - taskQueue: taskQueue, - filePinningQueue: filePinningQueue, - queueHashMap: make(map[string]*Task), - st: st, - model: model, - syncNeeded: make(chan bool), - shuttingDownMap: shuttingDownMap, - queueMutexMap: queueMutexMap, - getMirrorBucket: getMirrorBucket, - getBucket: getBucket, - getBucketCtx: getBucketCtx, - kc: kc, - hubAuth: hubAuth, - hubBuckets: hb, - hubThreads: ht, - cfg: cfg, - netc: netc, - queueWg: queueWg, + taskQueue: taskQueue, + filePinningQueue: filePinningQueue, + queueHashMap: make(map[string]*Task), + st: st, + model: model, + syncNeeded: make(chan bool), + shuttingDownMap: shuttingDownMap, + queueMutexMap: queueMutexMap, + getMirrorBucket: getMirrorBucket, + getBucket: getBucket, + getBucketCtx: getBucketCtx, + addBucketListener: addBucketListenerFn, + kc: kc, + hubAuth: hubAuth, + hubBuckets: hb, + hubThreads: ht, + cfg: cfg, + netc: netc, + queueWg: queueWg, } } diff --git a/core/textile/sync/task-executors.go b/core/textile/sync/task-executors.go index 2f6d1529..c232614e 100644 --- a/core/textile/sync/task-executors.go +++ b/core/textile/sync/task-executors.go @@ -120,9 +120,16 @@ func (s *synchronizer) processCreateBucket(ctx context.Context, task *Task) erro mirror, err := s.createMirrorBucket(ctx, bucket, enckey) if mirror != nil { _, err = s.model.CreateMirrorBucket(ctx, bucket, mirror) + if err != nil { + return err + } } - return err + if err := s.addBucketListener(ctx, bucket); err != nil { + return err + } + + return nil } func (s *synchronizer) processBucketBackupOn(ctx context.Context, task *Task) error { diff --git a/core/textile/textile.go b/core/textile/textile.go index 099b4e61..432c79d7 100644 --- a/core/textile/textile.go +++ b/core/textile/textile.go @@ -72,9 +72,15 @@ type Client interface { GetPublicShareBucket(ctx context.Context) (Bucket, error) DownloadPublicGatewayItem(ctx context.Context, cid cid.Cid) (io.ReadCloser, error) GetFailedHealthchecks() int + Listen(ctx context.Context, dbID, threadName string) (<-chan threadsClient.ListenEvent, error) } type Buckd interface { Stop() error Start(ctx context.Context) error } + +type Listener interface { + Listen(context.Context) error + Close() +} diff --git a/core/textile/utils/utils.go b/core/textile/utils/utils.go index a762bde8..49f30c0f 100644 --- a/core/textile/utils/utils.go +++ b/core/textile/utils/utils.go @@ -43,9 +43,13 @@ type DeterministicThreadVariant string const ( MetathreadThreadVariant DeterministicThreadVariant = "metathread" - CacheBucketVariant DeterministicThreadVariant = "cache_bucket" + MirrorBucketVariant DeterministicThreadVariant = "mirror_bucket" ) +func MirrorBucketVariantGen(mirrorBucketSlug string) DeterministicThreadVariant { + return DeterministicThreadVariant(string(MirrorBucketVariant) + ":" + mirrorBucketSlug) +} + func NewDeterministicThreadID(kc keychain.Keychain, threadVariant DeterministicThreadVariant) (thread.ID, error) { size := 32 variant := thread.Raw diff --git a/go.mod b/go.mod index d26be7b6..9c92e7da 100644 --- a/go.mod +++ b/go.mod @@ -48,6 +48,7 @@ require ( github.com/stretchr/testify v1.6.1 github.com/textileio/dcrypto v0.0.1 github.com/textileio/go-threads v1.0.0 + github.com/textileio/textile v1.0.14 github.com/textileio/textile/v2 v2.1.0 github.com/tyler-smith/go-bip39 v1.0.2 go.etcd.io/etcd v3.3.22+incompatible diff --git a/mocks/Client.go b/mocks/Client.go index 53152020..0516f2cb 100644 --- a/mocks/Client.go +++ b/mocks/Client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.0.3. DO NOT EDIT. +// Code generated by mockery v2.0.0. DO NOT EDIT. package mocks @@ -461,6 +461,29 @@ func (_m *Client) ListBuckets(ctx context.Context) ([]textile.Bucket, error) { return r0, r1 } +// Listen provides a mock function with given fields: ctx, dbID, threadName +func (_m *Client) Listen(ctx context.Context, dbID string, threadName string) (<-chan client.ListenEvent, error) { + ret := _m.Called(ctx, dbID, threadName) + + var r0 <-chan client.ListenEvent + if rf, ok := ret.Get(0).(func(context.Context, string, string) <-chan client.ListenEvent); ok { + r0 = rf(ctx, dbID, threadName) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan client.ListenEvent) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { + r1 = rf(ctx, dbID, threadName) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // RejectSharedFilesInvitation provides a mock function with given fields: ctx, invitation func (_m *Client) RejectSharedFilesInvitation(ctx context.Context, invitation domain.Invitation) (domain.Invitation, error) { ret := _m.Called(ctx, invitation)