From f0bed4e2e33bb714d285fadad549c955d78814f2 Mon Sep 17 00:00:00 2001 From: asabya Date: Thu, 9 Nov 2023 09:21:20 +0530 Subject: [PATCH] fix: #555, remove feedtracker --- cmd/dfs/cmd/config.go | 17 +- cmd/dfs/cmd/root.go | 1 - cmd/dfs/cmd/server.go | 6 - go.mod | 1 - go.sum | 7 - pkg/api/handler.go | 2 - pkg/dfs/api.go | 31 +- pkg/dfs/user_api.go | 2 +- pkg/feed/api.go | 62 +-- pkg/feed/tracker/tracker.go | 877 ++++++++++++++++--------------- pkg/feed/tracker/tracker_test.go | 391 +++++++------- pkg/test/login_test.go | 14 +- pkg/user/login.go | 9 +- 13 files changed, 667 insertions(+), 753 deletions(-) diff --git a/cmd/dfs/cmd/config.go b/cmd/dfs/cmd/config.go index fa503567..a6ff98fd 100644 --- a/cmd/dfs/cmd/config.go +++ b/cmd/dfs/cmd/config.go @@ -7,15 +7,14 @@ import ( var ( optionCORSAllowedOrigins = "cors-allowed-origins" - //optionFeedTracker = "dfs.feed-tracker" - optionDFSHttpPort = "dfs.ports.http-port" - optionDFSPprofPort = "dfs.ports.pprof-port" - optionVerbosity = "verbosity" - optionBeeApi = "bee.bee-api-endpoint" - optionBeePostageBatchId = "bee.postage-batch-id" - optionCookieDomain = "cookie-domain" - optionNetwork = "ens-network" - optionRPC = "rpc" + optionDFSHttpPort = "dfs.ports.http-port" + optionDFSPprofPort = "dfs.ports.pprof-port" + optionVerbosity = "verbosity" + optionBeeApi = "bee.bee-api-endpoint" + optionBeePostageBatchId = "bee.postage-batch-id" + optionCookieDomain = "cookie-domain" + optionNetwork = "ens-network" + optionRPC = "rpc" defaultCORSAllowedOrigins = []string{} defaultDFSHttpPort = ":9090" diff --git a/cmd/dfs/cmd/root.go b/cmd/dfs/cmd/root.go index be3d0971..e7bfe5d3 100644 --- a/cmd/dfs/cmd/root.go +++ b/cmd/dfs/cmd/root.go @@ -156,7 +156,6 @@ func initConfig() { func writeConfig() { c := viper.New() - //c.Set(optionFeedTracker, false) c.Set(optionCORSAllowedOrigins, defaultCORSAllowedOrigins) c.Set(optionDFSHttpPort, defaultDFSHttpPort) c.Set(optionDFSPprofPort, defaultDFSPprofPort) diff --git a/cmd/dfs/cmd/server.go b/cmd/dfs/cmd/server.go index 5d55db16..e7e0afd7 100644 --- a/cmd/dfs/cmd/server.go +++ b/cmd/dfs/cmd/server.go @@ -45,7 +45,6 @@ import ( var ( pprof bool - feedTracker bool swag bool httpPort string pprofPort string @@ -78,9 +77,6 @@ can consume it.`, if err := config.BindPFlag(optionDFSHttpPort, cmd.Flags().Lookup("httpPort")); err != nil { return err } - //if err := config.BindPFlag(optionFeedTracker, cmd.Flags().Lookup("feed-tracker")); err != nil { - // return err - //} if err := config.BindPFlag(optionDFSPprofPort, cmd.Flags().Lookup("pprofPort")); err != nil { return err } @@ -194,7 +190,6 @@ can consume it.`, EnsConfig: ensConfig, SubscriptionConfig: subscriptionConfig, Logger: logger, - FeedTracker: feedTracker, } hdlr, err := api.New(ctx, opts) @@ -232,7 +227,6 @@ can consume it.`, func init() { serverCmd.Flags().BoolVar(&pprof, "pprof", false, "should run pprof") serverCmd.Flags().BoolVar(&swag, "swag", false, "should run swagger-ui") - //serverCmd.Flags().BoolVar(&feedTracker, "feed-tracker", false, "should run feed tracker") serverCmd.Flags().String("httpPort", defaultDFSHttpPort, "http port") serverCmd.Flags().String("pprofPort", defaultDFSPprofPort, "pprof port") serverCmd.Flags().String("cookieDomain", defaultCookieDomain, "the domain to use in the cookie") diff --git a/go.mod b/go.mod index 83ebd26d..9a935235 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,6 @@ require ( github.com/stretchr/testify v1.8.4 github.com/swaggo/http-swagger v1.3.4 github.com/swaggo/swag v1.16.2 - github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 github.com/tinygrasshopper/bettercsv v0.0.1 github.com/tyler-smith/go-bip39 v1.1.0 github.com/wealdtech/go-ens/v3 v3.5.5 diff --git a/go.sum b/go.sum index 8e443a55..56cb47e7 100644 --- a/go.sum +++ b/go.sum @@ -256,7 +256,6 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb h1:PBC98N2aIaM3XXiurYmW7fx4GZkL8feAMVq7nEjURHk= github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -454,19 +453,16 @@ github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= -github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= -github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA= github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= -github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.0.3-0.20180606204148-bd9c31933947/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= @@ -580,7 +576,6 @@ github.com/swaggo/swag v1.16.2 h1:28Pp+8DkQoV+HLzLx8RGJZXNGKbFqnuvSbAAtoxiY04= github.com/swaggo/swag v1.16.2/go.mod h1:6YzXnDcpr0767iOejs318CwYkCQqyGer6BizOg03f+E= github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954/go.mod h1:u2MKkTVTVJWe5D1rCvame8WqhBd88EuIwODJZ1VHCPM= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= -github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/tinygrasshopper/bettercsv v0.0.1 h1:N96aWjbUBN2q+KotgSI9FMR+1Y4IIBMVMPiL8qASK0k= github.com/tinygrasshopper/bettercsv v0.0.1/go.mod h1:0pXjg6Vm8+zAkvosNH2S0dx8gc7H1hDIV0pMzmq1vRI= github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= @@ -885,7 +880,6 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/gonum v0.6.0/go.mod h1:9mxDZsDKxgMAuccQkewq682L+0eCu4dCN2yonUJTCLU= @@ -999,7 +993,6 @@ gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXL gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce h1:+JknDZhAj8YMt7GC73Ei8pv4MzjDUNPHgQWJdtMAaDU= gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:5AcXVHNjg+BDxry382+8OKon8SEWiKktQR07RKPsv1c= gopkg.in/olebedev/go-duktape.v3 v3.0.0-20200619000410-60c24ae608a6/go.mod h1:uAJfkITjFhyEEuUfm7bsmCZRbW5WRq8s9EY8HZ6hCns= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/urfave/cli.v1 v1.20.0/go.mod h1:vuBzUtMdQeixQj8LVd+/98pzhxNGQoyuPBlsXHOQNO0= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/pkg/api/handler.go b/pkg/api/handler.go index 765971de..4361124d 100644 --- a/pkg/api/handler.go +++ b/pkg/api/handler.go @@ -42,7 +42,6 @@ type Options struct { EnsConfig *contracts.ENSConfig SubscriptionConfig *contracts.SubscriptionConfig Logger logging.Logger - FeedTracker bool } // New returns a new handler @@ -53,7 +52,6 @@ func New(ctx context.Context, opts *Options) (*Handler, error) { EnsConfig: opts.EnsConfig, SubscriptionConfig: opts.SubscriptionConfig, Logger: opts.Logger, - FeedTracker: opts.FeedTracker, } api, err := dfs.NewDfsAPI(ctx, dfsOpts) if err != nil { diff --git a/pkg/dfs/api.go b/pkg/dfs/api.go index d37e3595..bd9cc212 100644 --- a/pkg/dfs/api.go +++ b/pkg/dfs/api.go @@ -39,14 +39,13 @@ const ( // API is the go api for fairOS type API struct { - context context.Context - cancel context.CancelFunc - client blockstore.Client - users *user.Users - logger logging.Logger - tm *taskmanager.TaskManager - sm subscriptionManager.SubscriptionManager - shouldInitFeedTracker bool + context context.Context + cancel context.CancelFunc + client blockstore.Client + users *user.Users + logger logging.Logger + tm *taskmanager.TaskManager + sm subscriptionManager.SubscriptionManager io.Closer } @@ -56,7 +55,6 @@ type Options struct { EnsConfig *contracts.ENSConfig SubscriptionConfig *contracts.SubscriptionConfig Logger logging.Logger - FeedTracker bool } // NewDfsAPI is the main entry point for the df controller. @@ -91,14 +89,13 @@ func NewDfsAPI(ctx context.Context, opts *Options) (*API, error) { tmLogger := logging.New(io.Discard, 0) ctx2, cancel := context.WithCancel(ctx) return &API{ - context: ctx2, - cancel: cancel, - shouldInitFeedTracker: opts.FeedTracker, - client: c, - users: users, - logger: logger, - tm: taskmanager.New(10, defaultMaxWorkers, time.Second*15, tmLogger), - sm: sm, + context: ctx2, + cancel: cancel, + client: c, + users: users, + logger: logger, + tm: taskmanager.New(10, defaultMaxWorkers, time.Second*15, tmLogger), + sm: sm, }, nil } diff --git a/pkg/dfs/user_api.go b/pkg/dfs/user_api.go index b4721e96..02139d52 100644 --- a/pkg/dfs/user_api.go +++ b/pkg/dfs/user_api.go @@ -27,7 +27,7 @@ func (a *API) CreateUserV2(userName, passPhrase, mnemonic, sessionId string) (*u // LoginUserV2 is a controller function which calls the users login function. func (a *API) LoginUserV2(userName, passPhrase, sessionId string) (*user.LoginResponse, error) { - return a.users.LoginUserV2(userName, passPhrase, a.client, a.tm, a.sm, sessionId, a.shouldInitFeedTracker) + return a.users.LoginUserV2(userName, passPhrase, a.client, a.tm, a.sm, sessionId) } // LoadLiteUser is a controller function which loads user from mnemonic and doesn't store any user info on chain diff --git a/pkg/feed/api.go b/pkg/feed/api.go index 2e1482c1..287e3be3 100644 --- a/pkg/feed/api.go +++ b/pkg/feed/api.go @@ -31,7 +31,6 @@ import ( "github.com/fairdatasociety/fairOS-dfs/pkg/feed/lookup" "github.com/fairdatasociety/fairOS-dfs/pkg/logging" "github.com/fairdatasociety/fairOS-dfs/pkg/utils" - "github.com/syndtr/goleveldb/leveldb" "golang.org/x/crypto/sha3" ) @@ -58,7 +57,6 @@ type API struct { handler *Handler accountInfo *account.Info logger logging.Logger - db *leveldb.DB } // request is a custom type that involves in the fairOS feed creation @@ -82,16 +80,6 @@ func New(accountInfo *account.Info, client blockstore.Client, logger logging.Log } } -// SetUpdateTracker sets the update tracker for the feed -func (a *API) SetUpdateTracker(db *leveldb.DB) { - a.db = db -} - -// GetUpdateTracker gets the update tracker for the feed -func (a *API) GetUpdateTracker() *leveldb.DB { - return a.db -} - // CreateFeed creates a feed by constructing a single owner chunk. This chunk // can only be accessed if the pod address is known. Also, no one else can spoof this // chunk since this is signed by the pod. @@ -172,13 +160,6 @@ func (a *API) CreateFeed(user utils.Address, topic, data, encryptionPassword []b if err != nil { // skipcq: TCV-001 return nil, err } - // update the feed update tracker - if a.db != nil { - err = a.PutFeedUpdateEpoch(append(topic, user[:20]...), req.Epoch) - if err != nil { // skipcq: TCV-001 - return nil, err - } - } return addr, nil } @@ -249,16 +230,7 @@ func (a *API) GetFeedData(topic []byte, user utils.Address, encryptionPassword [ // create the query from values q := &Query{Feed: *f} q.TimeLimit = 0 - if a.db != nil && !isFeedUpdater { - epoch, err := a.GetFeedUpdateEpoch(append(topic, user[:20]...)) - if err != nil { - q.Hint = lookup.NoClue - } else { - q.Hint = epoch - } - } else { - q.Hint = lookup.NoClue - } + q.Hint = lookup.NoClue if q.Hint == lookup.NoClue { _, err := a.handler.Lookup(ctx, q) if err != nil { @@ -399,12 +371,6 @@ retry: } return nil, err } - if a.db != nil && !isFeedUpdater { - err = a.PutFeedUpdateEpoch(append(topic, user[:20]...), req.Epoch) - if err != nil { // skipcq: TCV-001 - return nil, err - } - } return address, nil } @@ -453,32 +419,6 @@ func (a *API) IsReadOnlyFeed() bool { return a.accountInfo.GetPrivateKey() == nil } -// PutFeedUpdateEpoch -func (a *API) PutFeedUpdateEpoch(topic []byte, epoch lookup.Epoch) error { - data, err := epoch.MarshalBinary() - if err != nil { - return err - } - return a.db.Put(topic, data, nil) -} - -// GetFeedUpdateEpoch -func (a *API) GetFeedUpdateEpoch(topic []byte) (lookup.Epoch, error) { - epoch := lookup.Epoch{} - data, err := a.db.Get(topic, nil) - if err != nil { - return epoch, err - } - err = epoch.UnmarshalBinary(data) - if err != nil { - return epoch, err - } - return epoch, nil -} - func (a *API) Close() error { - if a.db != nil { - return a.db.Close() - } return nil } diff --git a/pkg/feed/tracker/tracker.go b/pkg/feed/tracker/tracker.go index 51a1508b..e17c4d18 100644 --- a/pkg/feed/tracker/tracker.go +++ b/pkg/feed/tracker/tracker.go @@ -1,440 +1,441 @@ package tracker -import ( - "bytes" - "encoding/json" - "errors" - "fmt" - "os" - "strings" - "sync" - - "github.com/fairdatasociety/fairOS-dfs/pkg/blockstore" - "github.com/fairdatasociety/fairOS-dfs/pkg/feed" - "github.com/fairdatasociety/fairOS-dfs/pkg/logging" - "github.com/fairdatasociety/fairOS-dfs/pkg/utils" - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/storage" -) - -const ( - listTopic = "leveldb/storage/files-list2" -) - -var ( - errFileOpen = errors.New("leveldb/storage: file still open") -) - -func InitFeedsTracker(address utils.Address, username, password string, fd *feed.API, client blockstore.Client, logger logging.Logger) (*leveldb.DB, error) { - db, err := leveldb.Open(NewMemStorage(fd, client, address, username, password, logger), nil) - if err != nil { - return nil, err - } - fd.SetUpdateTracker(db) - logger.Debugf("feed tracker initialised for %s\n", username) - return db, nil -} - -type memStorageLock struct { - ms *memStorage -} - -func (lock *memStorageLock) Unlock() { - ms := lock.ms - ms.mu.Lock() - defer ms.mu.Unlock() - if ms.slock == lock { - ms.slock = nil - } -} - -// memStorage is a memory-backed storage. -type memStorage struct { - mu sync.Mutex - slock *memStorageLock - files map[string]*memFile - list map[string]storage.FileDesc - meta storage.FileDesc - fd *feed.API - client blockstore.Client - address utils.Address - username string - password string - logging logging.Logger -} - -// NewMemStorage returns a new memory-backed storage implementation. -func NewMemStorage(fd *feed.API, client blockstore.Client, address utils.Address, username string, password string, logger logging.Logger) storage.Storage { - list := make(map[string]storage.FileDesc) - topic := getTopic([]string{listTopic, username, password}) - _, dt, err := fd.GetFeedData(topic, address, []byte(password), true) - if err == nil { - _ = json.Unmarshal(dt, &list) - } - return &memStorage{ - files: make(map[string]*memFile), - list: list, - fd: fd, - client: client, - address: address, - username: username, - password: password, - logging: logger, - } -} - -func (ms *memStorage) Lock() (storage.Locker, error) { - ms.mu.Lock() - defer ms.mu.Unlock() - if ms.slock != nil { - return nil, storage.ErrLocked - } - ms.slock = &memStorageLock{ms: ms} - return ms.slock, nil -} - -func (ms *memStorage) Log(str string) { - ms.logging.Debug(str) -} - -func (ms *memStorage) List(ft storage.FileType) ([]storage.FileDesc, error) { - ms.mu.Lock() - var fds []storage.FileDesc - for _, fd := range ms.list { - if fd.Type&ft != 0 { - fds = append(fds, fd) - } - } - ms.mu.Unlock() - return fds, nil -} - -func (ms *memStorage) Open(fd storage.FileDesc) (storage.Reader, error) { - if !storage.FileDescOk(fd) { - return nil, storage.ErrInvalidFile - } - - ms.mu.Lock() - defer ms.mu.Unlock() - if m, exist := ms.files[fd.String()]; exist { - if m.open { - return nil, errFileOpen - } - m.open = true - return &memReader{Reader: bytes.NewReader(m.Bytes()), ms: ms, m: m}, nil - } - m := &memFile{} - m.fd = ms.fd - m.name = fd.String() - m.username = ms.username - m.password = ms.password - m.address = ms.address - m.client = ms.client - topic := getTopic([]string{fd.String(), ms.username, ms.password}) - _, ref, err := ms.fd.GetFeedData(topic, ms.address, []byte(ms.password), true) - if err != nil && err.Error() != "feed does not exist or was not updated yet" { - return nil, os.ErrNotExist - } - - data, _, err := ms.client.DownloadBlob(ref) - if err != nil { - return nil, os.ErrNotExist - } - m.Buffer = bytes.NewBuffer(data) - ms.files[fd.String()] = m - m.open = true - return &memReader{Reader: bytes.NewReader(m.Bytes()), ms: ms, m: m}, nil -} - -func (ms *memStorage) Create(fd storage.FileDesc) (storage.Writer, error) { - if !storage.FileDescOk(fd) { - return nil, storage.ErrInvalidFile - } - - ms.mu.Lock() - defer ms.mu.Unlock() - m, exist := ms.files[fd.String()] - if exist { - if m.open { - return nil, errFileOpen - } - m.Reset() - } else { - m = &memFile{} - m.fd = ms.fd - m.name = fd.String() - m.username = ms.username - m.password = ms.password - m.address = ms.address - m.client = ms.client - - m.Buffer = bytes.NewBuffer([]byte{}) - - ref, err := ms.client.UploadBlob(m.Buffer.Bytes(), 0, false) - if err != nil { - return nil, err - } - - topic := getTopic([]string{fd.String(), ms.username, ms.password}) - _, err = ms.fd.UpdateFeed(ms.address, topic, ref, []byte(ms.password), true) - if err != nil { - return nil, err - } - ms.files[fd.String()] = m - } - m.open = true - ms.list[fd.String()] = fd - dt, err := json.Marshal(ms.list) - if err == nil { - topic := getTopic([]string{listTopic, ms.username, ms.password}) - _, err = ms.fd.UpdateFeed(ms.address, topic, dt, []byte(ms.password), true) - if err != nil { - ms.logging.Error("error updating list", "error", err) - } - } - - return &memWriter{memFile: m, ms: ms, name: fd.String()}, nil -} - -func (ms *memStorage) Remove(fd storage.FileDesc) error { - if !storage.FileDescOk(fd) { - return storage.ErrInvalidFile - } - - ms.mu.Lock() - defer ms.mu.Unlock() - if _, exist := ms.files[fd.String()]; exist { - delete(ms.files, fd.String()) - topic := getTopic([]string{fd.String(), ms.username, ms.password}) - - _, err := ms.fd.UpdateFeed(ms.address, topic, []byte(utils.DeletedFeedMagicWord), []byte(ms.password), true) - if err != nil { - return err - } - - delete(ms.list, fd.String()) - dt, err := json.Marshal(ms.list) - if err != nil { - return err - } - lTopic := getTopic([]string{listTopic, ms.username, ms.password}) - _, err = ms.fd.UpdateFeed(ms.address, lTopic, dt, []byte(ms.password), true) - if err != nil { - return err - } - return nil - } - return os.ErrNotExist -} - -func (ms *memStorage) Rename(oldfd, newfd storage.FileDesc) error { - if !storage.FileDescOk(oldfd) || !storage.FileDescOk(newfd) { - return storage.ErrInvalidFile - } - if oldfd == newfd { - return nil - } - - ms.mu.Lock() - defer ms.mu.Unlock() - oldm, exist := ms.files[oldfd.String()] - if !exist { - return os.ErrNotExist - } - newm, exist := ms.files[newfd.String()] - if (exist && newm.open) || oldm.open { - return errFileOpen - } - delete(ms.files, oldfd.String()) - delete(ms.list, oldfd.String()) - - ms.files[newfd.String()] = oldm - ms.list[newfd.String()] = newfd - - topic := getTopic([]string{oldfd.String(), ms.username, ms.password}) - _, ref, err := ms.fd.GetFeedData(topic, ms.address, []byte(ms.password), true) - if err != nil { - return err - } - - topic = getTopic([]string{newfd.String(), ms.username, ms.password}) - _, err = ms.fd.UpdateFeed(ms.address, topic, ref, []byte(ms.password), true) - if err != nil { - return err - } - - dt, err := json.Marshal(ms.list) - if err != nil { - return err - } - - lTopic := getTopic([]string{listTopic, ms.username, ms.password}) - _, err = ms.fd.UpdateFeed(ms.address, lTopic, dt, []byte(ms.password), true) - if err != nil { - return err - } - - return nil -} - -func (ms *memStorage) Close() error { - return nil -} - -func (ms *memStorage) setMeta(fd storage.FileDesc) error { - content := fd.String() - // Check and backup old CURRENT file. - currentPath := "CURRENT" - - topic := getTopic([]string{currentPath, ms.username, ms.password}) - _, dt, err := ms.fd.GetFeedData(topic, ms.address, []byte(ms.password), true) - if err != nil && err.Error() != "feed does not exist or was not updated yet" { - return err - } - if string(dt) == content { - // Content not changed, do nothing. - return nil - } - - _, err = ms.fd.UpdateFeed(ms.address, topic, []byte(content), []byte(ms.password), true) - if err != nil { // skipcq: TCV-001 - return err - } - ms.meta = fd - return nil -} - -func (ms *memStorage) SetMeta(fd storage.FileDesc) error { - if !storage.FileDescOk(fd) { - return storage.ErrInvalidFile - } - - ms.mu.Lock() - defer ms.mu.Unlock() - return ms.setMeta(fd) -} - -func (ms *memStorage) GetMeta() (storage.FileDesc, error) { - ms.mu.Lock() - defer ms.mu.Unlock() - - meta := storage.FileDesc{} - if ms.meta.Zero() { - // Try - // - CURRENT - currentPath := "CURRENT" - topic := getTopic([]string{currentPath, ms.username, ms.password}) - _, dt, err := ms.fd.GetFeedData(topic, ms.address, []byte(ms.password), true) - if err != nil { - return meta, os.ErrNotExist - } - if !fsParseNamePtr(string(dt), &meta) { - return meta, os.ErrNotExist - } - ms.meta = meta - return meta, nil - } - - return ms.meta, nil -} - -type memFile struct { - name string - *bytes.Buffer - open bool - fd *feed.API - username string - password string - address utils.Address - client blockstore.Client -} - -type memReader struct { - *bytes.Reader - ms *memStorage - m *memFile - closed bool -} - -func (mr *memReader) Close() error { - mr.ms.mu.Lock() - defer mr.ms.mu.Unlock() - if mr.closed { - return storage.ErrClosed - } - mr.m.open = false - return nil -} - -type memWriter struct { - name string - *memFile - ms *memStorage - closed bool -} - -func (mw *memWriter) Write(p []byte) (n int, err error) { - n, err = mw.memFile.Write(p) - if err != nil { - return - } - - ref, err := mw.client.UploadBlob(mw.Bytes(), 0, false) - if err != nil { - return - } - - topic := getTopic([]string{mw.name, mw.username, mw.password}) - _, err = mw.fd.UpdateFeed(mw.address, topic, ref, []byte(mw.password), true) - return -} - -func (mw *memWriter) Sync() error { - return nil -} - -func (mw *memWriter) Close() error { - mw.ms.mu.Lock() - defer mw.ms.mu.Unlock() - if mw.closed { - return storage.ErrClosed - } - mw.memFile.open = false - return nil -} - -func fsParseName(name string) (fd storage.FileDesc, ok bool) { - var tail string - _, err := fmt.Sscanf(name, "%d.%s", &fd.Num, &tail) - if err == nil { - switch tail { - case "log": - fd.Type = storage.TypeJournal - case "ldb", "sst": - fd.Type = storage.TypeTable - case "tmp": - fd.Type = storage.TypeTemp - default: - return - } - return fd, true - } - n, _ := fmt.Sscanf(name, "MANIFEST-%d%s", &fd.Num, &tail) - if n == 1 { - fd.Type = storage.TypeManifest - return fd, true - } - return -} - -func fsParseNamePtr(name string, fd *storage.FileDesc) bool { - _fd, ok := fsParseName(name) - if fd != nil { - *fd = _fd - } - return ok -} - -func getTopic(segments []string) []byte { - return utils.HashString(strings.Join(segments, "/")) -} +// +//import ( +// "bytes" +// "encoding/json" +// "errors" +// "fmt" +// "os" +// "strings" +// "sync" +// +// "github.com/fairdatasociety/fairOS-dfs/pkg/blockstore" +// "github.com/fairdatasociety/fairOS-dfs/pkg/feed" +// "github.com/fairdatasociety/fairOS-dfs/pkg/logging" +// "github.com/fairdatasociety/fairOS-dfs/pkg/utils" +// "github.com/syndtr/goleveldb/leveldb" +// "github.com/syndtr/goleveldb/leveldb/storage" +//) +// +//const ( +// listTopic = "leveldb/storage/files-list2" +//) +// +//var ( +// errFileOpen = errors.New("leveldb/storage: file still open") +//) +// +//func InitFeedsTracker(address utils.Address, username, password string, fd *feed.API, client blockstore.Client, logger logging.Logger) (*leveldb.DB, error) { +// db, err := leveldb.Open(NewMemStorage(fd, client, address, username, password, logger), nil) +// if err != nil { +// return nil, err +// } +// fd.SetUpdateTracker(db) +// logger.Debugf("feed tracker initialised for %s\n", username) +// return db, nil +//} +// +//type memStorageLock struct { +// ms *memStorage +//} +// +//func (lock *memStorageLock) Unlock() { +// ms := lock.ms +// ms.mu.Lock() +// defer ms.mu.Unlock() +// if ms.slock == lock { +// ms.slock = nil +// } +//} +// +//// memStorage is a memory-backed storage. +//type memStorage struct { +// mu sync.Mutex +// slock *memStorageLock +// files map[string]*memFile +// list map[string]storage.FileDesc +// meta storage.FileDesc +// fd *feed.API +// client blockstore.Client +// address utils.Address +// username string +// password string +// logging logging.Logger +//} +// +//// NewMemStorage returns a new memory-backed storage implementation. +//func NewMemStorage(fd *feed.API, client blockstore.Client, address utils.Address, username string, password string, logger logging.Logger) storage.Storage { +// list := make(map[string]storage.FileDesc) +// topic := getTopic([]string{listTopic, username, password}) +// _, dt, err := fd.GetFeedData(topic, address, []byte(password), true) +// if err == nil { +// _ = json.Unmarshal(dt, &list) +// } +// return &memStorage{ +// files: make(map[string]*memFile), +// list: list, +// fd: fd, +// client: client, +// address: address, +// username: username, +// password: password, +// logging: logger, +// } +//} +// +//func (ms *memStorage) Lock() (storage.Locker, error) { +// ms.mu.Lock() +// defer ms.mu.Unlock() +// if ms.slock != nil { +// return nil, storage.ErrLocked +// } +// ms.slock = &memStorageLock{ms: ms} +// return ms.slock, nil +//} +// +//func (ms *memStorage) Log(str string) { +// ms.logging.Debug(str) +//} +// +//func (ms *memStorage) List(ft storage.FileType) ([]storage.FileDesc, error) { +// ms.mu.Lock() +// var fds []storage.FileDesc +// for _, fd := range ms.list { +// if fd.Type&ft != 0 { +// fds = append(fds, fd) +// } +// } +// ms.mu.Unlock() +// return fds, nil +//} +// +//func (ms *memStorage) Open(fd storage.FileDesc) (storage.Reader, error) { +// if !storage.FileDescOk(fd) { +// return nil, storage.ErrInvalidFile +// } +// +// ms.mu.Lock() +// defer ms.mu.Unlock() +// if m, exist := ms.files[fd.String()]; exist { +// if m.open { +// return nil, errFileOpen +// } +// m.open = true +// return &memReader{Reader: bytes.NewReader(m.Bytes()), ms: ms, m: m}, nil +// } +// m := &memFile{} +// m.fd = ms.fd +// m.name = fd.String() +// m.username = ms.username +// m.password = ms.password +// m.address = ms.address +// m.client = ms.client +// topic := getTopic([]string{fd.String(), ms.username, ms.password}) +// _, ref, err := ms.fd.GetFeedData(topic, ms.address, []byte(ms.password), true) +// if err != nil && err.Error() != "feed does not exist or was not updated yet" { +// return nil, os.ErrNotExist +// } +// +// data, _, err := ms.client.DownloadBlob(ref) +// if err != nil { +// return nil, os.ErrNotExist +// } +// m.Buffer = bytes.NewBuffer(data) +// ms.files[fd.String()] = m +// m.open = true +// return &memReader{Reader: bytes.NewReader(m.Bytes()), ms: ms, m: m}, nil +//} +// +//func (ms *memStorage) Create(fd storage.FileDesc) (storage.Writer, error) { +// if !storage.FileDescOk(fd) { +// return nil, storage.ErrInvalidFile +// } +// +// ms.mu.Lock() +// defer ms.mu.Unlock() +// m, exist := ms.files[fd.String()] +// if exist { +// if m.open { +// return nil, errFileOpen +// } +// m.Reset() +// } else { +// m = &memFile{} +// m.fd = ms.fd +// m.name = fd.String() +// m.username = ms.username +// m.password = ms.password +// m.address = ms.address +// m.client = ms.client +// +// m.Buffer = bytes.NewBuffer([]byte{}) +// +// ref, err := ms.client.UploadBlob(m.Buffer.Bytes(), 0, false) +// if err != nil { +// return nil, err +// } +// +// topic := getTopic([]string{fd.String(), ms.username, ms.password}) +// _, err = ms.fd.UpdateFeed(ms.address, topic, ref, []byte(ms.password), true) +// if err != nil { +// return nil, err +// } +// ms.files[fd.String()] = m +// } +// m.open = true +// ms.list[fd.String()] = fd +// dt, err := json.Marshal(ms.list) +// if err == nil { +// topic := getTopic([]string{listTopic, ms.username, ms.password}) +// _, err = ms.fd.UpdateFeed(ms.address, topic, dt, []byte(ms.password), true) +// if err != nil { +// ms.logging.Error("error updating list", "error", err) +// } +// } +// +// return &memWriter{memFile: m, ms: ms, name: fd.String()}, nil +//} +// +//func (ms *memStorage) Remove(fd storage.FileDesc) error { +// if !storage.FileDescOk(fd) { +// return storage.ErrInvalidFile +// } +// +// ms.mu.Lock() +// defer ms.mu.Unlock() +// if _, exist := ms.files[fd.String()]; exist { +// delete(ms.files, fd.String()) +// topic := getTopic([]string{fd.String(), ms.username, ms.password}) +// +// _, err := ms.fd.UpdateFeed(ms.address, topic, []byte(utils.DeletedFeedMagicWord), []byte(ms.password), true) +// if err != nil { +// return err +// } +// +// delete(ms.list, fd.String()) +// dt, err := json.Marshal(ms.list) +// if err != nil { +// return err +// } +// lTopic := getTopic([]string{listTopic, ms.username, ms.password}) +// _, err = ms.fd.UpdateFeed(ms.address, lTopic, dt, []byte(ms.password), true) +// if err != nil { +// return err +// } +// return nil +// } +// return os.ErrNotExist +//} +// +//func (ms *memStorage) Rename(oldfd, newfd storage.FileDesc) error { +// if !storage.FileDescOk(oldfd) || !storage.FileDescOk(newfd) { +// return storage.ErrInvalidFile +// } +// if oldfd == newfd { +// return nil +// } +// +// ms.mu.Lock() +// defer ms.mu.Unlock() +// oldm, exist := ms.files[oldfd.String()] +// if !exist { +// return os.ErrNotExist +// } +// newm, exist := ms.files[newfd.String()] +// if (exist && newm.open) || oldm.open { +// return errFileOpen +// } +// delete(ms.files, oldfd.String()) +// delete(ms.list, oldfd.String()) +// +// ms.files[newfd.String()] = oldm +// ms.list[newfd.String()] = newfd +// +// topic := getTopic([]string{oldfd.String(), ms.username, ms.password}) +// _, ref, err := ms.fd.GetFeedData(topic, ms.address, []byte(ms.password), true) +// if err != nil { +// return err +// } +// +// topic = getTopic([]string{newfd.String(), ms.username, ms.password}) +// _, err = ms.fd.UpdateFeed(ms.address, topic, ref, []byte(ms.password), true) +// if err != nil { +// return err +// } +// +// dt, err := json.Marshal(ms.list) +// if err != nil { +// return err +// } +// +// lTopic := getTopic([]string{listTopic, ms.username, ms.password}) +// _, err = ms.fd.UpdateFeed(ms.address, lTopic, dt, []byte(ms.password), true) +// if err != nil { +// return err +// } +// +// return nil +//} +// +//func (ms *memStorage) Close() error { +// return nil +//} +// +//func (ms *memStorage) setMeta(fd storage.FileDesc) error { +// content := fd.String() +// // Check and backup old CURRENT file. +// currentPath := "CURRENT" +// +// topic := getTopic([]string{currentPath, ms.username, ms.password}) +// _, dt, err := ms.fd.GetFeedData(topic, ms.address, []byte(ms.password), true) +// if err != nil && err.Error() != "feed does not exist or was not updated yet" { +// return err +// } +// if string(dt) == content { +// // Content not changed, do nothing. +// return nil +// } +// +// _, err = ms.fd.UpdateFeed(ms.address, topic, []byte(content), []byte(ms.password), true) +// if err != nil { // skipcq: TCV-001 +// return err +// } +// ms.meta = fd +// return nil +//} +// +//func (ms *memStorage) SetMeta(fd storage.FileDesc) error { +// if !storage.FileDescOk(fd) { +// return storage.ErrInvalidFile +// } +// +// ms.mu.Lock() +// defer ms.mu.Unlock() +// return ms.setMeta(fd) +//} +// +//func (ms *memStorage) GetMeta() (storage.FileDesc, error) { +// ms.mu.Lock() +// defer ms.mu.Unlock() +// +// meta := storage.FileDesc{} +// if ms.meta.Zero() { +// // Try +// // - CURRENT +// currentPath := "CURRENT" +// topic := getTopic([]string{currentPath, ms.username, ms.password}) +// _, dt, err := ms.fd.GetFeedData(topic, ms.address, []byte(ms.password), true) +// if err != nil { +// return meta, os.ErrNotExist +// } +// if !fsParseNamePtr(string(dt), &meta) { +// return meta, os.ErrNotExist +// } +// ms.meta = meta +// return meta, nil +// } +// +// return ms.meta, nil +//} +// +//type memFile struct { +// name string +// *bytes.Buffer +// open bool +// fd *feed.API +// username string +// password string +// address utils.Address +// client blockstore.Client +//} +// +//type memReader struct { +// *bytes.Reader +// ms *memStorage +// m *memFile +// closed bool +//} +// +//func (mr *memReader) Close() error { +// mr.ms.mu.Lock() +// defer mr.ms.mu.Unlock() +// if mr.closed { +// return storage.ErrClosed +// } +// mr.m.open = false +// return nil +//} +// +//type memWriter struct { +// name string +// *memFile +// ms *memStorage +// closed bool +//} +// +//func (mw *memWriter) Write(p []byte) (n int, err error) { +// n, err = mw.memFile.Write(p) +// if err != nil { +// return +// } +// +// ref, err := mw.client.UploadBlob(mw.Bytes(), 0, false) +// if err != nil { +// return +// } +// +// topic := getTopic([]string{mw.name, mw.username, mw.password}) +// _, err = mw.fd.UpdateFeed(mw.address, topic, ref, []byte(mw.password), true) +// return +//} +// +//func (mw *memWriter) Sync() error { +// return nil +//} +// +//func (mw *memWriter) Close() error { +// mw.ms.mu.Lock() +// defer mw.ms.mu.Unlock() +// if mw.closed { +// return storage.ErrClosed +// } +// mw.memFile.open = false +// return nil +//} +// +//func fsParseName(name string) (fd storage.FileDesc, ok bool) { +// var tail string +// _, err := fmt.Sscanf(name, "%d.%s", &fd.Num, &tail) +// if err == nil { +// switch tail { +// case "log": +// fd.Type = storage.TypeJournal +// case "ldb", "sst": +// fd.Type = storage.TypeTable +// case "tmp": +// fd.Type = storage.TypeTemp +// default: +// return +// } +// return fd, true +// } +// n, _ := fmt.Sscanf(name, "MANIFEST-%d%s", &fd.Num, &tail) +// if n == 1 { +// fd.Type = storage.TypeManifest +// return fd, true +// } +// return +//} +// +//func fsParseNamePtr(name string, fd *storage.FileDesc) bool { +// _fd, ok := fsParseName(name) +// if fd != nil { +// *fd = _fd +// } +// return ok +//} +// +//func getTopic(segments []string) []byte { +// return utils.HashString(strings.Join(segments, "/")) +//} diff --git a/pkg/feed/tracker/tracker_test.go b/pkg/feed/tracker/tracker_test.go index b2d8b09e..68acfec3 100644 --- a/pkg/feed/tracker/tracker_test.go +++ b/pkg/feed/tracker/tracker_test.go @@ -1,197 +1,198 @@ package tracker -import ( - "fmt" - "io" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/fairdatasociety/fairOS-dfs/pkg/account" - "github.com/fairdatasociety/fairOS-dfs/pkg/blockstore/bee/mock" - "github.com/fairdatasociety/fairOS-dfs/pkg/feed" - "github.com/fairdatasociety/fairOS-dfs/pkg/feed/lookup" - "github.com/fairdatasociety/fairOS-dfs/pkg/logging" - "github.com/fairdatasociety/fairOS-dfs/pkg/utils" - "github.com/syndtr/goleveldb/leveldb" -) - -func TestTimeKeeper(t *testing.T) { - logger := logging.New(io.Discard, 0) - - acc1 := account.New(logger) - _, _, err := acc1.CreateUserAccount("") - if err != nil { - t.Fatal(err) - } - user1 := acc1.GetAddress(account.UserAccountIndex) - accountInfo1 := acc1.GetUserAccountInfo() - client := mock.NewMockBeeClient() - - t.Run("level-get-from-same-feed-pointer", func(t *testing.T) { - fd1 := feed.New(accountInfo1, client, logger) - db, err := leveldb.Open(NewMemStorage(fd1, client, user1, "username", "password", logger), nil) - if err != nil { - t.Fatal(err) - } - fd1.SetUpdateTracker(db) - - topicOne := utils.HashString("topicOne") - _, err = fd1.GetFeedUpdateEpoch(topicOne) - if err == nil { - t.Fatal("feed should not exist") - } - - now := time.Now().Unix() - err = fd1.PutFeedUpdateEpoch(topicOne, lookup.Epoch{ - Time: uint64(now), - Level: 31, - }) - if err != nil { - t.Fatal(err) - } - - epoch, err := fd1.GetFeedUpdateEpoch(topicOne) - if err != nil { - t.Fatal(err) - } - - require.Equal(t, uint64(now), epoch.Time) - - err = db.Close() - if err != nil { - t.Fatal(err) - } - }) - - t.Run("level-get-from-different-feed-pointer", func(t *testing.T) { - fd1 := feed.New(accountInfo1, client, logger) - db, err := leveldb.Open(NewMemStorage(fd1, client, user1, "username", "password", logger), nil) - if err != nil { - t.Fatal(err) - } - fd1.SetUpdateTracker(db) - - topic := utils.HashString("topicTwo") - _, err = fd1.GetFeedUpdateEpoch(topic) - if err == nil { - t.Fatal("feed should not exist") - } - - now := time.Now().Unix() - err = fd1.PutFeedUpdateEpoch(topic, lookup.Epoch{ - Time: uint64(now), - Level: 31, - }) - if err != nil { - t.Fatal(err) - } - err = db.Close() - if err != nil { - t.Fatal(err) - } - - fd2 := feed.New(accountInfo1, client, logger) - db2, err := leveldb.Open(NewMemStorage(fd2, client, user1, "username", "password", logger), nil) - if err != nil { - t.Fatal(err) - } - fd2.SetUpdateTracker(db2) - epoch, err := fd2.GetFeedUpdateEpoch(topic) - if err != nil { - t.Fatal(err) - } - - require.Equal(t, uint64(now), epoch.Time) - - err = db2.Close() - if err != nil { - t.Fatal(err) - } - }) - - t.Run("level-get-from-multiple-different-feed-pointer", func(t *testing.T) { - fd1 := feed.New(accountInfo1, client, logger) - db, err := leveldb.Open(NewMemStorage(fd1, client, user1, "username", "password", logger), nil) - if err != nil { - t.Fatal(err) - } - fd1.SetUpdateTracker(db) - - now := time.Now().Unix() - for i := 0; i < 10000; i++ { - topic := utils.HashString(fmt.Sprintf("topic-%d", i)) - err = fd1.PutFeedUpdateEpoch(topic, lookup.Epoch{ - Time: uint64(now + int64(i)), - Level: 31, - }) - if err != nil { - t.Fatal(err) - } - } - - err = db.Close() - if err != nil { - t.Fatal(err) - } - - fd2 := feed.New(accountInfo1, client, logger) - db2, err := leveldb.Open(NewMemStorage(fd2, client, user1, "username", "password", logger), nil) - if err != nil { - t.Fatal(err) - } - fd2.SetUpdateTracker(db2) - for i := 0; i < 10000; i++ { - topic := utils.HashString(fmt.Sprintf("topic-%d", i)) - epoch, err := fd2.GetFeedUpdateEpoch(topic) - if err != nil { - t.Fatal(err) - } - require.Equal(t, uint64(now+int64(i)), epoch.Time) - - } - err = db2.Close() - if err != nil { - t.Fatal(err) - } - fd3 := feed.New(accountInfo1, client, logger) - db3, err := leveldb.Open(NewMemStorage(fd3, client, user1, "username", "password", logger), nil) - if err != nil { - t.Fatal(err) - } - fd3.SetUpdateTracker(db3) - for i := 0; i < 10000; i++ { - topic := utils.HashString(fmt.Sprintf("topic-%d", i)) - epoch, err := fd3.GetFeedUpdateEpoch(topic) - if err != nil { - t.Fatal(err) - } - require.Equal(t, uint64(now+int64(i)), epoch.Time) - - } - err = db3.Close() - if err != nil { - t.Fatal(err) - } - fd4 := feed.New(accountInfo1, client, logger) - db4, err := leveldb.Open(NewMemStorage(fd4, client, user1, "username", "password", logger), nil) - if err != nil { - t.Fatal(err) - } - fd4.SetUpdateTracker(db4) - for i := 0; i < 10000; i++ { - topic := utils.HashString(fmt.Sprintf("topic-%d", i)) - epoch, err := fd4.GetFeedUpdateEpoch(topic) - if err != nil { - t.Fatal(err) - } - require.Equal(t, uint64(now+int64(i)), epoch.Time) - - } - err = db4.Close() - if err != nil { - t.Fatal(err) - } - }) -} +// +//import ( +// "fmt" +// "io" +// "testing" +// "time" +// +// "github.com/stretchr/testify/require" +// +// "github.com/fairdatasociety/fairOS-dfs/pkg/account" +// "github.com/fairdatasociety/fairOS-dfs/pkg/blockstore/bee/mock" +// "github.com/fairdatasociety/fairOS-dfs/pkg/feed" +// "github.com/fairdatasociety/fairOS-dfs/pkg/feed/lookup" +// "github.com/fairdatasociety/fairOS-dfs/pkg/logging" +// "github.com/fairdatasociety/fairOS-dfs/pkg/utils" +// "github.com/syndtr/goleveldb/leveldb" +//) +// +//func TestTimeKeeper(t *testing.T) { +// logger := logging.New(io.Discard, 0) +// +// acc1 := account.New(logger) +// _, _, err := acc1.CreateUserAccount("") +// if err != nil { +// t.Fatal(err) +// } +// user1 := acc1.GetAddress(account.UserAccountIndex) +// accountInfo1 := acc1.GetUserAccountInfo() +// client := mock.NewMockBeeClient() +// +// t.Run("level-get-from-same-feed-pointer", func(t *testing.T) { +// fd1 := feed.New(accountInfo1, client, logger) +// db, err := leveldb.Open(NewMemStorage(fd1, client, user1, "username", "password", logger), nil) +// if err != nil { +// t.Fatal(err) +// } +// fd1.SetUpdateTracker(db) +// +// topicOne := utils.HashString("topicOne") +// _, err = fd1.GetFeedUpdateEpoch(topicOne) +// if err == nil { +// t.Fatal("feed should not exist") +// } +// +// now := time.Now().Unix() +// err = fd1.PutFeedUpdateEpoch(topicOne, lookup.Epoch{ +// Time: uint64(now), +// Level: 31, +// }) +// if err != nil { +// t.Fatal(err) +// } +// +// epoch, err := fd1.GetFeedUpdateEpoch(topicOne) +// if err != nil { +// t.Fatal(err) +// } +// +// require.Equal(t, uint64(now), epoch.Time) +// +// err = db.Close() +// if err != nil { +// t.Fatal(err) +// } +// }) +// +// t.Run("level-get-from-different-feed-pointer", func(t *testing.T) { +// fd1 := feed.New(accountInfo1, client, logger) +// db, err := leveldb.Open(NewMemStorage(fd1, client, user1, "username", "password", logger), nil) +// if err != nil { +// t.Fatal(err) +// } +// fd1.SetUpdateTracker(db) +// +// topic := utils.HashString("topicTwo") +// _, err = fd1.GetFeedUpdateEpoch(topic) +// if err == nil { +// t.Fatal("feed should not exist") +// } +// +// now := time.Now().Unix() +// err = fd1.PutFeedUpdateEpoch(topic, lookup.Epoch{ +// Time: uint64(now), +// Level: 31, +// }) +// if err != nil { +// t.Fatal(err) +// } +// err = db.Close() +// if err != nil { +// t.Fatal(err) +// } +// +// fd2 := feed.New(accountInfo1, client, logger) +// db2, err := leveldb.Open(NewMemStorage(fd2, client, user1, "username", "password", logger), nil) +// if err != nil { +// t.Fatal(err) +// } +// fd2.SetUpdateTracker(db2) +// epoch, err := fd2.GetFeedUpdateEpoch(topic) +// if err != nil { +// t.Fatal(err) +// } +// +// require.Equal(t, uint64(now), epoch.Time) +// +// err = db2.Close() +// if err != nil { +// t.Fatal(err) +// } +// }) +// +// t.Run("level-get-from-multiple-different-feed-pointer", func(t *testing.T) { +// fd1 := feed.New(accountInfo1, client, logger) +// db, err := leveldb.Open(NewMemStorage(fd1, client, user1, "username", "password", logger), nil) +// if err != nil { +// t.Fatal(err) +// } +// fd1.SetUpdateTracker(db) +// +// now := time.Now().Unix() +// for i := 0; i < 10000; i++ { +// topic := utils.HashString(fmt.Sprintf("topic-%d", i)) +// err = fd1.PutFeedUpdateEpoch(topic, lookup.Epoch{ +// Time: uint64(now + int64(i)), +// Level: 31, +// }) +// if err != nil { +// t.Fatal(err) +// } +// } +// +// err = db.Close() +// if err != nil { +// t.Fatal(err) +// } +// +// fd2 := feed.New(accountInfo1, client, logger) +// db2, err := leveldb.Open(NewMemStorage(fd2, client, user1, "username", "password", logger), nil) +// if err != nil { +// t.Fatal(err) +// } +// fd2.SetUpdateTracker(db2) +// for i := 0; i < 10000; i++ { +// topic := utils.HashString(fmt.Sprintf("topic-%d", i)) +// epoch, err := fd2.GetFeedUpdateEpoch(topic) +// if err != nil { +// t.Fatal(err) +// } +// require.Equal(t, uint64(now+int64(i)), epoch.Time) +// +// } +// err = db2.Close() +// if err != nil { +// t.Fatal(err) +// } +// fd3 := feed.New(accountInfo1, client, logger) +// db3, err := leveldb.Open(NewMemStorage(fd3, client, user1, "username", "password", logger), nil) +// if err != nil { +// t.Fatal(err) +// } +// fd3.SetUpdateTracker(db3) +// for i := 0; i < 10000; i++ { +// topic := utils.HashString(fmt.Sprintf("topic-%d", i)) +// epoch, err := fd3.GetFeedUpdateEpoch(topic) +// if err != nil { +// t.Fatal(err) +// } +// require.Equal(t, uint64(now+int64(i)), epoch.Time) +// +// } +// err = db3.Close() +// if err != nil { +// t.Fatal(err) +// } +// fd4 := feed.New(accountInfo1, client, logger) +// db4, err := leveldb.Open(NewMemStorage(fd4, client, user1, "username", "password", logger), nil) +// if err != nil { +// t.Fatal(err) +// } +// fd4.SetUpdateTracker(db4) +// for i := 0; i < 10000; i++ { +// topic := utils.HashString(fmt.Sprintf("topic-%d", i)) +// epoch, err := fd4.GetFeedUpdateEpoch(topic) +// if err != nil { +// t.Fatal(err) +// } +// require.Equal(t, uint64(now+int64(i)), epoch.Time) +// +// } +// err = db4.Close() +// if err != nil { +// t.Fatal(err) +// } +// }) +//} diff --git a/pkg/test/login_test.go b/pkg/test/login_test.go index 8e226239..4d1cd8e9 100644 --- a/pkg/test/login_test.go +++ b/pkg/test/login_test.go @@ -59,18 +59,18 @@ func TestLogin(t *testing.T) { t.Fatal(err) } - _, err = userObject.LoginUserV2("not_an_username", "password1", mockClient, tm, sm, "", false) + _, err = userObject.LoginUserV2("not_an_username", "password1", mockClient, tm, sm, "") if !errors.Is(err, user.ErrUserNameNotFound) { t.Fatal(err) } - _, err = userObject.LoginUserV2("7e4567e7cb003804992eef11fd5c757275a4c", "wrong_password", mockClient, tm, sm, "", false) + _, err = userObject.LoginUserV2("7e4567e7cb003804992eef11fd5c757275a4c", "wrong_password", mockClient, tm, sm, "") if !errors.Is(err, user.ErrInvalidPassword) { t.Fatal(err) } // addUserAndSessionToMap user again - sr1, err := userObject.LoginUserV2("7e4567e7cb003804992eef11fd5c757275a4c", "password1twelve", mockClient, tm, sm, "", false) + sr1, err := userObject.LoginUserV2("7e4567e7cb003804992eef11fd5c757275a4c", "password1twelve", mockClient, tm, sm, "") if err != nil { t.Fatal(err) } @@ -139,12 +139,12 @@ func TestLogin(t *testing.T) { t.Fatal(err) } - lr1, err := userObject.LoginUserV2(user1, pass, mockClient, tm, sm, "", false) + lr1, err := userObject.LoginUserV2(user1, pass, mockClient, tm, sm, "") if err != nil { t.Fatal(err) } login1 := lr1.UserInfo - lr2, err := userObject.LoginUserV2(user1, pass+pass, mockClient, tm, sm, "", false) + lr2, err := userObject.LoginUserV2(user1, pass+pass, mockClient, tm, sm, "") if err != nil { t.Fatal(err) } @@ -212,12 +212,12 @@ func TestLogin(t *testing.T) { } ui2 := sr2.UserInfo - lr1, err := userObject.LoginUserV2(user1, pass, mockClient, tm, sm, "", false) + lr1, err := userObject.LoginUserV2(user1, pass, mockClient, tm, sm, "") if err != nil { t.Fatal(err) } login1 := lr1.UserInfo - lr2, err := userObject.LoginUserV2(user1, pass+pass, mockClient, tm, sm, "", false) + lr2, err := userObject.LoginUserV2(user1, pass+pass, mockClient, tm, sm, "") if err != nil { t.Fatal(err) } diff --git a/pkg/user/login.go b/pkg/user/login.go index 51ee63aa..e88d1216 100644 --- a/pkg/user/login.go +++ b/pkg/user/login.go @@ -46,7 +46,7 @@ type LoginResponse struct { // LoginUserV2 checks if the user is present and logs in the user. It also creates the required information // to execute user function and stores it in memory. -func (u *Users) LoginUserV2(userName, passPhrase string, client blockstore.Client, tm taskmanager.TaskManagerGO, sm subscriptionManager.SubscriptionManager, sessionId string, initFeedTracker bool) (*LoginResponse, error) { +func (u *Users) LoginUserV2(userName, passPhrase string, client blockstore.Client, tm taskmanager.TaskManagerGO, sm subscriptionManager.SubscriptionManager, sessionId string) (*LoginResponse, error) { // check if sessionId is still active if u.IsUserLoggedIn(sessionId) { // skipcq: TCV-001 return nil, ErrUserAlreadyLoggedIn @@ -92,13 +92,6 @@ func (u *Users) LoginUserV2(userName, passPhrase string, client blockstore.Clien return nil, err } - //if initFeedTracker { - // _, err = tracker.InitFeedsTracker(utils.Address(address), userName, passPhrase, fd, client, u.logger) - // if err != nil { - // u.logger.Errorf("error initializing feeds tracker: %v", err) - // } - //} - // Instantiate pod, dir & file objects file := f.NewFile(userName, client, fd, accountInfo.GetAddress(), tm, u.logger) pod := p.NewPod(u.client, fd, acc, tm, sm, u.logger)