Skip to content

Commit

Permalink
Listen for textile client updates (#220)
Browse files Browse the repository at this point in the history
* WIP: Textile listener

* WIP: textile client listener connecting but not receiving

* Use replicator for metathread

* clean up

* Merge with develop and connect with restorer

* Fix segfault on metathread creation

* Prevent segfault

* Allow enqueing repeating tasks when the old one already completed

* Fix tests
  • Loading branch information
dmerrill6 authored Oct 26, 2020
1 parent a98a1de commit 25bdf67
Show file tree
Hide file tree
Showing 17 changed files with 355 additions and 112 deletions.
13 changes: 7 additions & 6 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,17 +179,18 @@ func (a *App) Start(ctx context.Context) error {

textileClient.AttachMailboxNotifier(srv)
textileClient.AttachSynchronizerNotifier(srv)
err = a.RunAsync("BucketSync", bucketSync, func() error {
bucketSync.RegisterNotifier(srv)
return bucketSync.Start(ctx)

// start the gRPC server
err = a.RunAsync("gRPCServer", srv, func() error {
return srv.Start(ctx)
})
if err != nil {
return err
}

// start the gRPC server
err = a.RunAsync("gRPCServer", srv, func() error {
return srv.Start(ctx)
err = a.RunAsync("BucketSync", bucketSync, func() error {
bucketSync.RegisterNotifier(srv)
return bucketSync.Start(ctx)
})
if err != nil {
return err
Expand Down
9 changes: 8 additions & 1 deletion core/keychain/keychain.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"crypto/ed25519"
"crypto/sha512"
"encoding/hex"
"golang.org/x/crypto/pbkdf2"
"os"
"path"
"strings"

"golang.org/x/crypto/pbkdf2"

"errors"

"github.com/99designs/keyring"
Expand Down Expand Up @@ -372,6 +373,12 @@ func (kc *keychain) retrieveKeyPair() (privKey []byte, mnemonic string, err erro
}

func (kc *keychain) GetManagedThreadKey(threadKeyName string) (thread.Key, error) {
// Check if there's a key stored before continuing
_, err := kc.GetStoredPublicKey()
if err != nil {
return thread.Key{}, err
}

size := 32

priv, _, err := kc.GetStoredKeyPairInLibP2PFormat()
Expand Down
49 changes: 6 additions & 43 deletions core/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,9 @@ type bucketSynchronizer struct {
textileClient textile.Client
fh *watcherHandler
th *textileHandler
// textileThreadListeners []textile.ThreadListener
notifier GrpcNotifier
store store.Store
ready chan bool
notifier GrpcNotifier
store store.Store
ready chan bool
}

// Creates a new bucketSynchronizer instancelistenerEventHandler
Expand All @@ -74,28 +73,20 @@ func New(
store store.Store,
notifier GrpcNotifier,
) *bucketSynchronizer {
// textileThreadListeners := make([]textile.ThreadListener, 0)

return &bucketSynchronizer{
folderWatcher: folderWatcher,
textileClient: textileClient,
fh: nil,
th: nil,
// textileThreadListeners: textileThreadListeners,
notifier: notifier,
store: store,
ready: make(chan bool),
notifier: notifier,
store: store,
ready: make(chan bool),
}
}

// Starts the folder watcher and the textile watcher.
func (bs *bucketSynchronizer) Start(ctx context.Context) error {
// Disabling this temporarily due to errors
//buckets, err := bs.textileClient.ListBuckets(ctx)
// if err != nil {
// return err
// }

if bs.notifier == nil {
log.Printf("using default notifier to start bucket sync")
bs.notifier = &defaultNotifier{}
Expand All @@ -106,38 +97,15 @@ func (bs *bucketSynchronizer) Start(ctx context.Context) error {
bs: bs,
}

bs.th = &textileHandler{
notifier: bs.notifier,
bs: bs,
}

// handlers := make([]textile.EventHandler, 0)
// handlers = append(handlers, bs.th)

// Disabling this temporarily due to errors
//for range buckets {
// bs.textileThreadListeners = append(bs.textileThreadListeners, textile.NewListener(bs.textileClient, bucket.Slug(), handlers))
//}

bs.folderWatcher.RegisterHandler(bs.fh)

// TODO: bs.textileThreadListener.RegisterHandler(bs.th)
// (Needs implementation of bs.th)

g, newCtx := errgroup.WithContext(ctx)

g.Go(func() error {
log.Debug("Starting watcher in bucketsync")
return bs.folderWatcher.Watch(newCtx)
})

// for _, listener := range bs.textileThreadListeners {
// g.Go(func() error {
// log.Debug("Starting textile thread listener in bucketsync")
// return listener.Listen(newCtx)
// })
// }

// add open files to watcher
keys, err := bs.store.KeysWithPrefix(OpenFilesKeyPrefix)
if err != nil {
Expand All @@ -157,9 +125,7 @@ func (bs *bucketSynchronizer) Start(ctx context.Context) error {
}
}

//go func() {
bs.ready <- true
//}()

err = g.Wait()

Expand All @@ -179,9 +145,6 @@ func (bs *bucketSynchronizer) Shutdown() error {
log.Debug("shutting down folder watcher in bucketsync")
bs.folderWatcher.Close()
log.Debug("shutting down textile thread listener in bucketsync")
// for _, listener := range bs.textileThreadListeners {
// listener.Close()
// }

close(bs.ready)
return nil
Expand Down
23 changes: 21 additions & 2 deletions core/textile/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type textileClient struct {
sync synchronizer.Synchronizer
notifier bucket.Notifier
ipfsClient iface.CoreAPI
dbListeners map[string]Listener
}

// Creates a new Textile Client
Expand Down Expand Up @@ -93,6 +94,7 @@ func NewClient(store db.Store, kc keychain.Keychain, hubAuth hub.HubAuth, uc Use
failedHealthchecks: 0,
sync: nil,
notifier: nil,
dbListeners: make(map[string]Listener),
}
}

Expand Down Expand Up @@ -144,7 +146,18 @@ func (tc *textileClient) initializeSync(ctx context.Context) {
}

tc.sync = synchronizer.New(
tc.store, tc.GetModel(), tc.kc, tc.hubAuth, tc.hb, tc.ht, tc.netc, tc.cfg, getMirrorBucketFn, getLocalBucketFn, tc.getBucketContext,
tc.store,
tc.GetModel(),
tc.kc,
tc.hubAuth,
tc.hb,
tc.ht,
tc.netc,
tc.cfg,
getMirrorBucketFn,
getLocalBucketFn,
tc.getBucketContext,
tc.addListener,
)

tc.notifier = notifier.New(tc.sync)
Expand Down Expand Up @@ -419,6 +432,8 @@ func (tc *textileClient) Shutdown() error {
close(tc.keypairDeleted)
close(tc.shuttingDown)

tc.closeListeners()

if err := tc.bucketsClient.Close(); err != nil {
return err
}
Expand Down Expand Up @@ -463,6 +478,10 @@ func (tc *textileClient) healthcheck(ctx context.Context) {

tc.checkHubConnection(ctx)

if len(tc.dbListeners) == 0 {
tc.initializeListeners(ctx)
}

switch {
case tc.isInitialized == false:
log.Debug("Textile Client healthcheck... Not initialized yet.")
Expand Down Expand Up @@ -521,7 +540,7 @@ func (tc *textileClient) RemoveKeys(ctx context.Context) error {
}

func (tc *textileClient) GetModel() model.Model {
return model.New(tc.store, tc.kc, tc.threads, tc.hubAuth)
return model.New(tc.store, tc.kc, tc.threads, tc.ht, tc.hubAuth, tc.cfg, tc.netc)
}

func (tc *textileClient) getSecureBucketsClient(baseClient *bucketsClient.Client) *SecureBucketClient {
Expand Down
26 changes: 26 additions & 0 deletions core/textile/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package textile

import (
"github.com/FleekHQ/space-daemon/core/textile/bucket"
"github.com/FleekHQ/space-daemon/core/textile/sync"
"github.com/FleekHQ/space-daemon/log"
tc "github.com/textileio/go-threads/api/client"
)
Expand All @@ -27,3 +28,28 @@ func (h *defaultListenerHandler) OnRemove(bucketData *bucket.BucketData, listenE
func (h *defaultListenerHandler) OnSave(bucketData *bucket.BucketData, listenEvent *tc.ListenEvent) {
log.Info("Default Listener Handler: OnSave")
}

type restorerListenerHandler struct {
synchronizer sync.Synchronizer
}

func newRestorerListenerHandler(synchronizer sync.Synchronizer) *restorerListenerHandler {
return &restorerListenerHandler{
synchronizer: synchronizer,
}
}

func (h *restorerListenerHandler) OnCreate(bucketData *bucket.BucketData, listenEvent *tc.ListenEvent) {
log.Debug("Restorer Listener Handler: OnCreate")
h.synchronizer.NotifyBucketRestore(bucketData.Name)
}

func (h *restorerListenerHandler) OnRemove(bucketData *bucket.BucketData, listenEvent *tc.ListenEvent) {
log.Debug("Restorer Listener Handler: OnRemove")
h.synchronizer.NotifyBucketRestore(bucketData.Name)
}

func (h *restorerListenerHandler) OnSave(bucketData *bucket.BucketData, listenEvent *tc.ListenEvent) {
log.Debug("Restorer Listener Handler: OnSave")
h.synchronizer.NotifyBucketRestore(bucketData.Name)
}
138 changes: 138 additions & 0 deletions core/textile/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package textile

import (
"context"
"errors"

"github.com/FleekHQ/space-daemon/core/textile/utils"
"github.com/textileio/go-threads/api/client"
threadsClient "github.com/textileio/go-threads/api/client"
)

func (tc *textileClient) Listen(ctx context.Context, dbID, threadName string) (<-chan threadsClient.ListenEvent, error) {
db, err := utils.ParseDbIDFromString(dbID)
if err != nil {
return nil, err
}

newCtx, err := utils.GetThreadContext(ctx, "", *db, true, tc.kc, tc.hubAuth, tc.ht)
if err != nil {
return nil, err
}

return tc.ht.Listen(newCtx, *db, nil)
}

func (tc *textileClient) addListener(ctx context.Context, bucketSlug string) error {
if err := tc.requiresHubConnection(); err != nil {
return err
}
handler := newRestorerListenerHandler(tc.sync)
handlers := []EventHandler{handler}
listener := NewListener(tc, bucketSlug, handlers)
tc.dbListeners[bucketSlug] = listener

go func() {
err := listener.Listen(ctx)
if err != nil {
// Remove element from map as it's not listening anymore
delete(tc.dbListeners, bucketSlug)
}
}()

return nil
}

func (tc *textileClient) initializeListeners(ctx context.Context) error {
if err := tc.requiresHubConnection(); err != nil {
return err
}

tc.closeListeners()

buckets, err := tc.listBuckets(ctx)
if err != nil {
return err
}

for _, bucket := range buckets {
tc.addListener(ctx, bucket.Slug())
}

return nil
}

func (tc *textileClient) closeListeners() {
for key, listener := range tc.dbListeners {
listener.Close()
delete(tc.dbListeners, key)
}

}

type listener struct {
client Client
bucketSlug string
handlers []EventHandler
shutdown chan (bool)
}

func NewListener(client Client, bucketSlug string, handlers []EventHandler) *listener {
return &listener{
client: client,
bucketSlug: bucketSlug,
handlers: handlers,
shutdown: make(chan (bool)),
}
}

func (l *listener) Listen(ctx context.Context) error {
bucketSchema, err := l.client.GetModel().FindBucket(ctx, l.bucketSlug)
if bucketSchema == nil || bucketSchema.RemoteDbID == "" {
return errors.New("Bucket does not have a linked mirror bucket")
}

bucket, err := l.client.GetBucket(ctx, l.bucketSlug, nil)
if err != nil {
return err
}
bucketData := bucket.GetData()

if err != nil {
return err
}

eventChan, err := l.client.Listen(ctx, bucketSchema.RemoteDbID, bucketSchema.RemoteBucketSlug)
if err != nil {
return err
}

Loop:
for {
select {
case ev := <-eventChan:
if ev.Err != nil {
return ev.Err
}

for _, handler := range l.handlers {
switch ev.Action.Type {
case client.ActionCreate:
handler.OnCreate(&bucketData, &ev)
case client.ActionSave:
handler.OnSave(&bucketData, &ev)
case client.ActionDelete:
handler.OnRemove(&bucketData, &ev)
}
}
case <-l.shutdown:
break Loop
}
}

return nil
}

func (l *listener) Close() {
l.shutdown <- true
}
Loading

0 comments on commit 25bdf67

Please sign in to comment.