Skip to content

Commit

Permalink
Start local buckets before init txl client, point txl client to local…
Browse files Browse the repository at this point in the history
… buckets, replace context creation for list and create buckets.
  • Loading branch information
jsonsivar committed Jun 22, 2020
1 parent fc47ef0 commit 82a8860
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 65 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@
space.json
bin

debug/
debug/

.build-envs
32 changes: 9 additions & 23 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ func Start(ctx context.Context, cfg config.Config, env env.SpaceEnv) {
return
}

// setup local buckets
buckd := textile.NewBuckd()
g.Go(func() error {
err := buckd.Start()
return err
})
<-buckd.WaitForReady()

// setup textile client
bootstrapReady := make(chan bool)
textileClient := textile.NewClient(store)
g.Go(func() error {
Expand All @@ -75,24 +84,6 @@ func Start(ctx context.Context, cfg config.Config, env env.SpaceEnv) {
<-textileClient.WaitForReady()
<-bootstrapReady

// setup local threads

// threadsd := textile.NewThreadsd()
// g.Go(func() error {
// err := threadsd.Start()
// return err
// })
// <-threadsd.WaitForReady()

// setup local buckd

buckd := textile.NewBuckd()
g.Go(func() error {
err := buckd.Start()
return err
})
<-buckd.WaitForReady()

// watcher is started inside bucket sync
sync := sync.New(watcher, textileClient, store, nil)

Expand Down Expand Up @@ -181,11 +172,6 @@ func Start(ctx context.Context, cfg config.Config, env env.SpaceEnv) {
store.Close()
}

// if threadsd != nil {
// log.Println("shutdown Threadsd node")
// threadsd.Stop()
// }

if buckd != nil {
log.Println("shutdown Buckd node")
buckd.Stop()
Expand Down
32 changes: 24 additions & 8 deletions core/textile/buckd.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,20 @@ import (
"context"
"fmt"
"os"
"os/user"

"github.com/FleekHQ/space-poc/log"
"github.com/textileio/textile/cmd"
"github.com/textileio/textile/core"
)

var IpfsAddr string
var MongoUsr string
var MongoPw string
var MongoHost string

type TextileBuckd struct {
textile *core.Textile
isRunning bool
ready chan bool
}
Expand All @@ -22,23 +29,25 @@ func NewBuckd() Buckd {
}

func (tb *TextileBuckd) Start() error {
// TODO: get value from build time instead
IpfsAddr = os.Getenv("IPFS_ADDR")
MongoUsr = os.Getenv("MONGO_USR")
MongoPw = os.Getenv("MONGO_PW")
MongoHost = os.Getenv("MONGO_HOST")

addrAPI := cmd.AddrFromStr("/ip4/127.0.0.1/tcp/3006")
addrAPIProxy := cmd.AddrFromStr("/ip4/127.0.0.1/tcp/3007")
addrThreadsHost := cmd.AddrFromStr("/ip4/0.0.0.0/tcp/4006")
// TODO: replace with local blockstore
// TODO: get value from build time
addrIpfsAPI := cmd.AddrFromStr("/ip4/34.223.251.246/tcp/5001")
addrIpfsAPI := cmd.AddrFromStr(IpfsAddr)

addrGatewayHost := cmd.AddrFromStr("/ip4/127.0.0.1/tcp/8006")
addrGatewayURL := "http://127.0.0.1:8006"

// PLACEHOLDER: filecoin settings

// TODO: replace with embedded store
// TODO: get value from build time
pw := os.Getenv("MONGOPW")
addrMongoURI := "mongodb+srv://root:" + pw + "@textile-bucksd-dev-eg4f5.mongodb.net"
// <dbname>?retryWrites=true&w=majority"
addrMongoURI := "mongodb+srv://" + MongoUsr + ":" + MongoPw + "@" + MongoHost

// TODO: setup logging
// if logFile != "" {
Expand All @@ -51,10 +60,15 @@ func (tb *TextileBuckd) Start() error {
// replicate). it will give back a couple just dont use
// local one

usr, err := user.Current()
if err != nil {
log.Fatal(err)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
textile, err := core.NewTextile(ctx, core.Config{
RepoPath: "~/.buckd/repo",
RepoPath: usr.HomeDir + "/.buckd/repo",
AddrAPI: addrAPI,
AddrAPIProxy: addrAPIProxy,
AddrThreadsHost: addrThreadsHost,
Expand All @@ -73,11 +87,12 @@ func (tb *TextileBuckd) Start() error {
if err != nil {
log.Fatal(err)
}
defer textile.Close()

textile.Bootstrap()

fmt.Println("Welcome to Buckets!")
fmt.Println("Your peer ID is " + textile.HostID().String())
tb.textile = textile
tb.isRunning = true
tb.ready <- true
return nil
Expand All @@ -89,6 +104,7 @@ func (tb *TextileBuckd) WaitForReady() chan bool {

func (tb *TextileBuckd) Stop() error {
tb.isRunning = false
tb.textile.Close()
close(tb.ready)
// TODO: what else
return nil
Expand Down
7 changes: 4 additions & 3 deletions core/textile/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package textile

import (
"context"
"io"
"sync"

"github.com/ipfs/interface-go-ipfs-core/path"
"github.com/textileio/go-threads/core/thread"
bucketsClient "github.com/textileio/textile/api/buckets/client"
bucketsproto "github.com/textileio/textile/api/buckets/pb"
"io"
"sync"
)

type BucketsClient interface {
Expand Down Expand Up @@ -55,5 +56,5 @@ func (b *bucket) GetData() BucketData {
}

func (b *bucket) GetContext(ctx context.Context) (context.Context, *thread.ID, error) {
return b.client.GetBucketContext(ctx, b.Slug())
return b.client.GetLocalBucketContext(ctx, b.Slug())
}
33 changes: 11 additions & 22 deletions core/textile/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"context"
"encoding/hex"
"errors"
"github.com/FleekHQ/space-poc/config"
"github.com/libp2p/go-libp2p-core/crypto"
"os"
"sync"
"time"

"github.com/FleekHQ/space-poc/config"
"github.com/libp2p/go-libp2p-core/crypto"

"github.com/FleekHQ/space-poc/core/keychain"
db "github.com/FleekHQ/space-poc/core/store"
"github.com/FleekHQ/space-poc/log"
Expand All @@ -28,8 +29,8 @@ type textileClient struct {
isRunning bool
Ready chan bool

bucketsLock sync.RWMutex
buckets map[string]*bucket
bucketsLock sync.RWMutex
buckets map[string]*bucket
}

func (tc *textileClient) WaitForReady() chan bool {
Expand Down Expand Up @@ -162,29 +163,18 @@ func (tc *textileClient) start(cfg config.Config) error {
var threads *threadsClient.Client
var buckets *bucketsClient.Client

finalHubTarget := hubTarget
finalThreadsTarget := threadsTarget

hubTargetFromCfg := cfg.GetString(config.TextileHubTarget, "")
threadsTargetFromCfg := cfg.GetString(config.TextileThreadsTarget, "")
// by default it goes to local threads now
host := "127.0.0.1:3006"

if hubTargetFromCfg != "" {
finalHubTarget = hubTargetFromCfg
}

if threadsTargetFromCfg != "" {
finalThreadsTarget = threadsTargetFromCfg
}

log.Debug("Creating buckets client in " + finalHubTarget)
if b, err := bucketsClient.NewClient(finalHubTarget, opts...); err != nil {
log.Debug("Creating buckets client in " + host)
if b, err := bucketsClient.NewClient(host, opts...); err != nil {
cmd.Fatal(err)
} else {
buckets = b
}

log.Debug("Creating threads client in " + finalThreadsTarget)
if t, err := threadsClient.NewClient(finalThreadsTarget, opts...); err != nil {
log.Debug("Creating threads client in " + host)
if t, err := threadsClient.NewClient(host, opts...); err != nil {
cmd.Fatal(err)
} else {
threads = t
Expand Down Expand Up @@ -244,4 +234,3 @@ func (tc *textileClient) StartAndBootstrap(ctx context.Context, cfg config.Confi
log.Debug("Textile Client initialized successfully")
return nil
}

38 changes: 30 additions & 8 deletions core/textile/client_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"

"github.com/FleekHQ/space-poc/core/keychain"
"github.com/FleekHQ/space-poc/log"
"github.com/libp2p/go-libp2p-core/crypto"
Expand All @@ -20,7 +21,7 @@ func NotFound(slug string) error {
bucket concurrent methods are helpers that try to keep
track of a set of the buckets we have, the goal is for all buckets to be singletons
at the client level so we are always working with unique locks per bucket
*/
*/

// NOTE: Be careful to not call this method without releasing locks first
func (tc *textileClient) getBucket(slug string) Bucket {
Expand Down Expand Up @@ -65,9 +66,8 @@ func (tc *textileClient) setBuckets(buckets []Bucket) []Bucket {
return results
}


func (tc *textileClient) GetBucket(ctx context.Context, slug string) (Bucket, error) {
if b := tc.getBucket(slug); b != nil {
if b := tc.getBucket(slug); b != nil {
return b, nil
}

Expand All @@ -89,11 +89,33 @@ func (tc *textileClient) GetBucket(ctx context.Context, slug string) (Bucket, er
return nil, NotFound(slug)
}


func (tc *textileClient) GetDefaultBucket(ctx context.Context) (Bucket, error) {
return tc.GetBucket(ctx, defaultPersonalBucketSlug)
}

func (tc *textileClient) GetLocalBucketContext(ctx context.Context, bucketSlug string) (context.Context, *thread.ID, error) {
var publicKey crypto.PubKey
var err error
kc := keychain.New(tc.store)
if _, publicKey, err = kc.GetStoredKeyPairInLibP2PFormat(); err != nil {
return nil, nil, err
}

var pubKeyInBytes []byte
if pubKeyInBytes, err = publicKey.Bytes(); err != nil {
return nil, nil, err
}
ctx = common.NewThreadNameContext(ctx, getThreadName(pubKeyInBytes, bucketSlug))
var dbID *thread.ID
log.Debug("Fetching thread id from local store")
if dbID, err = tc.findOrCreateThreadID(ctx, tc.threads, bucketSlug); err != nil {
return nil, nil, err
}

ctx = common.NewThreadIDContext(ctx, *dbID)
return ctx, dbID, nil
}

// Returns a context that works for accessing a bucket
func (tc *textileClient) GetBucketContext(ctx context.Context, bucketSlug string) (context.Context, *thread.ID, error) {
if err := tc.requiresRunning(); err != nil {
Expand Down Expand Up @@ -129,7 +151,7 @@ func (tc *textileClient) GetBucketContext(ctx context.Context, bucketSlug string
}

func (tc *textileClient) ListBuckets(ctx context.Context) ([]Bucket, error) {
threadsCtx, _, err := tc.GetBucketContext(ctx, defaultPersonalBucketSlug)
threadsCtx, _, err := tc.GetLocalBucketContext(ctx, defaultPersonalBucketSlug)

if err != nil {
log.Error("error in ListBuckets while fetching bucket context", err)
Expand All @@ -155,11 +177,11 @@ func (tc *textileClient) CreateBucket(ctx context.Context, bucketSlug string) (B
log.Debug("Creating a new bucket with slug " + bucketSlug)
var err error

if b := tc.getBucket(bucketSlug); b!= nil {
if b := tc.getBucket(bucketSlug); b != nil {
return b, nil
}

if ctx, _, err = tc.GetBucketContext(ctx, bucketSlug); err != nil {
if ctx, _, err = tc.GetLocalBucketContext(ctx, bucketSlug); err != nil {
return nil, err
}

Expand Down Expand Up @@ -190,6 +212,6 @@ func (tc *textileClient) CreateBucket(ctx context.Context, bucketSlug string) (B
}

func (tc *textileClient) getNewBucket(b *bucketsproto.Root) Bucket {
newB:= newBucket(b, tc, tc.bucketsClient)
newB := newBucket(b, tc, tc.bucketsClient)
return tc.setBucket(newB.Slug(), newB)
}
Loading

0 comments on commit 82a8860

Please sign in to comment.