diff --git a/Makefile b/Makefile index 4bfbd69b..649d9db9 100644 --- a/Makefile +++ b/Makefile @@ -2,8 +2,8 @@ date := $(shell date "+%Y-%m-%d-%H-%M") api_version := $(shell git describe --tags --match 'api-v*'|sed 's/api-v\([0-9.]*\)/\1/') watchman_version := $(shell git describe --tags --match 'watchman-v*'|sed 's/api-v\([0-9.]*\)/\1/') git_hash := $(shell git rev-parse --short HEAD) -forklift_version = latest -upload_version = latest +forklift_version = dev +uploads_version = dev3 .PHONY: test test: @@ -75,7 +75,7 @@ forklift: uploads: GOARCH=amd64 GOOS=linux go build \ -o dist/linux_amd64/uploads \ - -ldflags "-s -w -X github.com/OdyseeTeam/odysee-api/version.version=$(forklift_version) \ + -ldflags "-s -w -X github.com/OdyseeTeam/odysee-api/version.version=$(uploads_version) \ -X github.com/OdyseeTeam/odysee-api/version.commit=$(git_hash) \ -X github.com/OdyseeTeam/odysee-api/apps/version.buildDate=$(date)" \ ./apps/uploads/cmd/ @@ -95,7 +95,7 @@ watchman_example: goa example github.com/OdyseeTeam/odysee-api/apps/watchman/design -o apps/watchman uploads_image: - docker buildx build -t odyseeteam/uploads:dev --platform linux/amd64 -f ./build/uploads/Dockerfile . + docker buildx build -t odyseeteam/uploads:$(uploads_version) --platform linux/amd64 -f ./build/uploads/Dockerfile . forklift_image: - docker buildx build -t odyseeteam/forklift:dev --platform linux/amd64 -f ./build/forklift/Dockerfile + docker buildx build -t odyseeteam/forklift:$(forklift_version) --platform linux/amd64 -f ./build/forklift/Dockerfile . diff --git a/api/routes.go b/api/routes.go index c7ef8eab..fdad2695 100644 --- a/api/routes.go +++ b/api/routes.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/OdyseeTeam/odysee-api/app/asynquery" "github.com/OdyseeTeam/odysee-api/app/auth" "github.com/OdyseeTeam/odysee-api/app/geopublish" gpmetrics "github.com/OdyseeTeam/odysee-api/app/geopublish/metrics" @@ -21,6 +22,7 @@ import ( "github.com/OdyseeTeam/odysee-api/internal/middleware" "github.com/OdyseeTeam/odysee-api/internal/monitor" "github.com/OdyseeTeam/odysee-api/internal/status" + "github.com/OdyseeTeam/odysee-api/pkg/keybox" "github.com/OdyseeTeam/odysee-api/pkg/logging/zapadapter" "github.com/OdyseeTeam/odysee-api/pkg/redislocker" "github.com/OdyseeTeam/player-server/pkg/paid" @@ -158,6 +160,29 @@ func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router, opts *RoutesOptio } } + keybox, err := keybox.KeyfobFromString(config.GetPaidTokenPrivKey()) + if err != nil { + panic(err) + } + + busRedisOpts, err := config.GetRedisBusOpts() + if err != nil { + panic(err) + } + launcher := asynquery.NewLauncher( + asynquery.WithBusRedisOpts(busRedisOpts), + asynquery.WithLogger(zapadapter.NewKV(nil)), + asynquery.WithPrivateKey(keybox.PrivateKey()), + asynquery.WithDB(nil), + ) + + err = launcher.InstallRoutes(v1Router) + if err != nil { + panic(err) + } + + go launcher.Start() + onceMetrics.Do(func() { gpmetrics.RegisterMetrics() redislocker.RegisterMetrics() diff --git a/app/asynquery/asynquery.go b/app/asynquery/asynquery.go index bc88d46f..163add3b 100644 --- a/app/asynquery/asynquery.go +++ b/app/asynquery/asynquery.go @@ -23,10 +23,12 @@ import ( "github.com/ybbus/jsonrpc" ) +const FilePathParam = "file_path" + var ( sdkNetError = errors.New("network level sdk error") sdkClientError = errors.New("client level sdk error") - reFilePathURL = regexp.MustCompile(`^https?://(.+)/(\w+)/\w+/([a-zA-Z0-9]+)$`) + reFilePathURL = regexp.MustCompile(`^https?://([^/]+)/.+/([a-zA-Z0-9]{32,})$`) ) type CallManager struct { @@ -42,7 +44,6 @@ type Caller struct { type FileLocation struct { Server string - Version string UploadID string } @@ -52,9 +53,10 @@ type Result struct { Response *jsonrpc.RPCResponse } -func NewCallManager(redisOpts asynq.RedisConnOpt, logger logging.KVLogger) (*CallManager, error) { +func NewCallManager(redisOpts asynq.RedisConnOpt, db boil.Executor, logger logging.KVLogger) (*CallManager, error) { m := CallManager{ logger: logger, + db: db, } b, err := bus.New(redisOpts, bus.WithConcurrency(10)) if err != nil { @@ -70,8 +72,7 @@ func (m *CallManager) NewCaller(userID int) *Caller { } // Start launches asynchronous query handlers and blocks until stopped. -func (m *CallManager) Start(db boil.Executor) error { - m.db = db +func (m *CallManager) Start() error { registerServerMetrics() m.bus.AddHandler(tasks.TaskAsynqueryMerge, m.HandleMerge) return m.bus.StartHandlers() @@ -81,7 +82,7 @@ func (m *CallManager) Shutdown() { m.bus.Shutdown() } -// Call accepts JSON-RPC request for later asynchronous processing. This may be called from a different process. +// Call accepts JSON-RPC request for later asynchronous processing. func (m *CallManager) Call(userID int, req *jsonrpc.RPCRequest) (*models.Asynquery, error) { p := req.Params.(map[string]interface{}) fp, err := parseFilePath(p["file_path"].(string)) @@ -94,6 +95,7 @@ func (m *CallManager) Call(userID int, req *jsonrpc.RPCRequest) (*models.Asynque m.logger.Warn("error adding query record", "err", err, "user_id", userID) return nil, err } + m.logger.Info("query record added", "id", aq.ID, "user_id", userID, "upload_id", fp.UploadID) return aq, nil } @@ -214,11 +216,12 @@ func (m *CallManager) createQueryRecord(userID int, request *jsonrpc.RPCRequest, return &q, q.Insert(m.db, boil.Infer()) } -func (m *CallManager) getQueryRecord(ctx context.Context, uploadID string, userID int32) (*models.Asynquery, error) { +func (m *CallManager) getQueryRecord(ctx context.Context, queryID string, userID int32) (*models.Asynquery, error) { l := logging.GetFromContext(ctx) mods := []qm.QueryMod{ - models.AsynqueryWhere.UploadID.EQ(uploadID), + // models.AsynqueryWhere.UploadID.EQ(uploadID), + models.AsynqueryWhere.ID.EQ(queryID), } if userID > 0 { mods = append(mods, models.AsynqueryWhere.UserID.EQ(int(userID))) @@ -278,13 +281,13 @@ func (m *CallManager) RetryDelay(n int, err error, t *asynq.Task) time.Duration func parseFilePath(filePath string) (*FileLocation, error) { matches := reFilePathURL.FindStringSubmatch(filePath) if len(matches) < 3 { - return nil, fmt.Errorf("invalid file path: %s", filePath) + return nil, fmt.Errorf("invalid file location: %s", filePath) } fl := &FileLocation{ - Server: matches[0], - Version: matches[1], + Server: matches[1], UploadID: matches[2], } + fmt.Println(fl) return fl, nil } diff --git a/app/asynquery/asynquery_test.go b/app/asynquery/asynquery_test.go index b60a44ff..ef4630a1 100644 --- a/app/asynquery/asynquery_test.go +++ b/app/asynquery/asynquery_test.go @@ -1,25 +1,19 @@ package asynquery import ( - "context" "testing" - "time" - "github.com/OdyseeTeam/odysee-api/app/query" "github.com/OdyseeTeam/odysee-api/apps/lbrytv/config" "github.com/OdyseeTeam/odysee-api/internal/e2etest" "github.com/OdyseeTeam/odysee-api/pkg/logging/zapadapter" "github.com/stretchr/testify/suite" - "github.com/ybbus/jsonrpc" ) -type cleanupFunc func() error - type asynquerySuite struct { suite.Suite - m *CallManager + manager *CallManager userHelper *e2etest.UserTestHelper } @@ -28,50 +22,45 @@ func TestAsynquerySuite(t *testing.T) { } func (s *asynquerySuite) TestSuccessCallback() { - results := make(chan AsyncQueryResult) - s.m.SetResultChannel(query.MethodWalletBalance, results) - - c := s.m.NewCaller(s.userHelper.UserID()) - r, err := c.Call(context.Background(), jsonrpc.NewRequest(query.MethodWalletBalance)) - s.Require().NoError(err) - s.Nil(r) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - select { - case r := <-results: - s.NotEmpty(r.Response.Result.(map[string]any)["available"]) - s.Equal(query.MethodWalletBalance, r.Query.Request.Method) - s.Equal(s.userHelper.UserID(), r.Query.UserID) - s.NotEmpty(r.Query.ID) - case <-ctx.Done(): - s.T().Log("waiting too long") - s.T().FailNow() - } + // c := s.manager.NewCaller(s.userHelper.UserID()) + // r, err := c.A(context.Background(), jsonrpc.NewRequest(query.MethodWalletBalance)) + // s.Require().NoError(err) + // s.Nil(r) + + // ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + // defer cancel() + // select { + // case r := <-results: + // s.NotEmpty(r.Response.Result.(map[string]any)["available"]) + // s.Equal(query.MethodWalletBalance, r.Query.Request.Method) + // s.Equal(s.userHelper.UserID(), r.Query.UserID) + // s.NotEmpty(r.Query.ID) + // case <-ctx.Done(): + // s.T().Log("waiting too long") + // s.T().FailNow() + // } } func (s *asynquerySuite) TestErrorCallback() { - results := make(chan AsyncQueryResult) - s.m.SetResultChannel(query.MethodPublish, results) - - c := s.m.NewCaller(s.userHelper.UserID()) - r, err := c.Call(context.Background(), jsonrpc.NewRequest(query.MethodPublish)) - s.Require().NoError(err) - s.Nil(r) - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - select { - case r := <-results: - s.Nil(r.Response.Result) - s.NotNil(r.Response.Error) - s.Equal(query.MethodPublish, r.Query.Request.Method) - s.Equal(s.userHelper.UserID(), r.Query.UserID) - s.NotEmpty(r.Query.ID) - case <-ctx.Done(): - s.T().Log("waiting too long") - s.T().FailNow() - } + // c := s.manager.NewCaller(s.userHelper.UserID()) + // r, err := c.Call(context.Background(), jsonrpc.NewRequest(query.MethodPublish)) + // s.Require().NoError(err) + // s.Nil(r) + + // ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + // defer cancel() + // select { + // case r := <-results: + // s.Nil(r.Response.Result) + // s.NotNil(r.Response.Error) + // s.Equal(query.MethodPublish, r.Query.Request.Method) + // s.Equal(s.userHelper.UserID(), r.Query.UserID) + // s.NotEmpty(r.Query.ID) + // case <-ctx.Done(): + // s.T().Log("waiting too long") + // s.T().FailNow() + // } } func (s *asynquerySuite) SetupSuite() { @@ -80,14 +69,13 @@ func (s *asynquerySuite) SetupSuite() { ro, err := config.GetAsynqRedisOpts() s.Require().NoError(err) - m, err := NewCallManager(ro, zapadapter.NewKV(nil)) + m, err := NewCallManager(ro, s.userHelper.DB, zapadapter.NewKV(nil)) s.Require().NoError(err) - s.m = m - go m.Start(s.userHelper.DB) + s.manager = m + go m.Start() } func (s *asynquerySuite) TearDownSuite() { config.RestoreOverridden() - s.userHelper.Cleanup() - s.m.Shutdown() + s.manager.Shutdown() } diff --git a/app/asynquery/http_handlers.go b/app/asynquery/http_handlers.go index 207bd80f..76fbe00b 100644 --- a/app/asynquery/http_handlers.go +++ b/app/asynquery/http_handlers.go @@ -16,13 +16,14 @@ import ( "github.com/OdyseeTeam/odysee-api/pkg/keybox" "github.com/OdyseeTeam/odysee-api/pkg/logging" "github.com/OdyseeTeam/odysee-api/pkg/rpcerrors" + "github.com/mitchellh/mapstructure" "github.com/ybbus/jsonrpc" "github.com/gorilla/mux" ) const ( - UploadServiceURL = "https://uploads.na-backend.odysee.com/v1/uploads/" + UploadServiceURL = "https://uploads-v4.na-backend.odysee.com/v1/uploads/" StatusProceed = "proceed" StatusAuthError = "auth_error" ) @@ -33,7 +34,7 @@ type QueryHandler struct { keyfob *keybox.Keyfob } -type UploadPayload struct { +type UploadTokenResponse struct { Token string `json:"token"` Location string `json:"location"` } @@ -75,12 +76,13 @@ func (h QueryHandler) RetrieveUploadToken(w http.ResponseWriter, r *http.Request } resp, err := json.Marshal(Response{ Status: StatusProceed, - Payload: UploadPayload{Token: token, Location: UploadServiceURL}, + Payload: UploadTokenResponse{Token: token, Location: UploadServiceURL}, }) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } + w.Header().Add("location", UploadServiceURL) w.Write(resp) } @@ -130,8 +132,11 @@ func (h QueryHandler) Get(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNotFound) return } + + log := h.logger.With("query_id", queryID) user, err := auth.FromRequest(r) if err != nil { + log.Info("unauthorized request") w.WriteHeader(http.StatusUnauthorized) return } @@ -139,10 +144,12 @@ func (h QueryHandler) Get(w http.ResponseWriter, r *http.Request) { aq, err := h.callManager.getQueryRecord(context.TODO(), queryID, int32(user.ID)) if err != nil { if errors.Is(err, sql.ErrNoRows) { + log.Info("query not found") w.WriteHeader(http.StatusNotFound) - } else { - rpcerrors.Write(w, err) + return } + log.Info("query retrieval error", "err", err) + rpcerrors.Write(w, err) return } @@ -157,8 +164,32 @@ func (h QueryHandler) Get(w http.ResponseWriter, r *http.Request) { rpcerrors.Write(w, errors.Err(aq.Error)) } default: - w.WriteHeader(http.StatusAccepted) + w.WriteHeader(http.StatusOK) + } +} + +func (r *Response) UnmarshalJSON(data []byte) error { + type responseAlias Response // Alias to avoid recursion + aux := &responseAlias{ + Payload: json.RawMessage{}, } + if err := json.Unmarshal(data, &aux); err != nil { + return err + } + + *r = Response(*aux) + switch r.Status { + case "proceed": + var payload UploadTokenResponse + if err := mapstructure.Decode(r.Payload, &payload); err != nil { + return fmt.Errorf("error decoding payload: %w", err) + } + r.Payload = payload + default: + return errors.Err("unknown status") + } + + return nil } func isMethodAllowed(method string) bool { diff --git a/app/asynquery/http_handlers_test.go b/app/asynquery/http_handlers_test.go new file mode 100644 index 00000000..9b586247 --- /dev/null +++ b/app/asynquery/http_handlers_test.go @@ -0,0 +1,205 @@ +package asynquery + +import ( + "bytes" + "database/sql" + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "net/url" + "testing" + "time" + + "github.com/OdyseeTeam/odysee-api/app/auth" + "github.com/OdyseeTeam/odysee-api/app/query" + "github.com/OdyseeTeam/odysee-api/app/wallet" + "github.com/OdyseeTeam/odysee-api/internal/e2etest" + "github.com/OdyseeTeam/odysee-api/internal/test" + "github.com/OdyseeTeam/odysee-api/internal/testdeps" + "github.com/OdyseeTeam/odysee-api/models" + "github.com/OdyseeTeam/odysee-api/pkg/keybox" + "github.com/OdyseeTeam/odysee-api/pkg/logging/zapadapter" + "github.com/Pallinder/go-randomdata" + "github.com/gorilla/mux" + "github.com/volatiletech/sqlboiler/queries/qm" + "github.com/ybbus/jsonrpc" + + "github.com/stretchr/testify/suite" +) + +type asynqueryHandlerSuite struct { + suite.Suite + + userHelper *e2etest.UserTestHelper + router *mux.Router + launcher *Launcher +} + +func TestAsynqueryHandlerSuite(t *testing.T) { + suite.Run(t, new(asynqueryHandlerSuite)) +} + +func (s *asynqueryHandlerSuite) TestRetrieveUploadToken() { + ts := httptest.NewServer(s.router) + + resp := (&test.HTTPTest{ + Method: http.MethodPost, + URL: ts.URL + "/api/v1/asynqueries/auth/upload-token", + ReqHeader: map[string]string{ + wallet.AuthorizationHeader: s.userHelper.TokenHeader, + }, + Code: http.StatusOK, + }).Run(s.router, s.T()) + + s.Equal(UploadServiceURL, resp.Header().Get("Location")) + rr := &Response{} + s.Require().NoError(json.Unmarshal(resp.Body.Bytes(), rr)) + s.Empty(rr.Error) + s.Require().Equal(StatusProceed, rr.Status) + s.NotEmpty(rr.Payload.(UploadTokenResponse).Token) + s.Equal(UploadServiceURL, rr.Payload.(UploadTokenResponse).Location) +} + +func (s *asynqueryHandlerSuite) TestCreate() { + ts := httptest.NewServer(s.router) + uploadID := randomdata.Alphanumeric(64) + + streamCreateReq, err := json.Marshal(jsonrpc.NewRequest(query.MethodStreamCreate, map[string]interface{}{ + "name": "publish2test-dummymd", + "title": "Publish v2 test for dummy.md", + "description": "", + "locations": []string{}, + "bid": "0.01000000", + "languages": []string{"en"}, + "tags": []string{"c:disable-comments"}, + "thumbnail_url": "https://thumbs.odycdn.com/92399dc6df41af6f7c61def97335dfa5.webp", + "release_time": 1661882701, + "blocking": true, + "preview": false, + "license": "None", + "channel_id": "febc557fcfbe5c1813eb621f7d38a80bc4355085", + "allow_duplicate_name": true, + FilePathParam: "https://uploads-v4.api.na-backend.odysee.com/v1/uploads/" + uploadID, + })) + s.Require().NoError(err) + + resp := (&test.HTTPTest{ + Method: http.MethodPost, + URL: ts.URL + "/api/v1/asynqueries/", + ReqHeader: map[string]string{ + wallet.AuthorizationHeader: s.userHelper.TokenHeader, + }, + ReqBody: bytes.NewReader(streamCreateReq), + Code: http.StatusCreated, + }).Run(s.router, s.T()) + loc, err := url.Parse(resp.Header().Get("Location")) + s.Require().NoError(err) + s.Regexp(`./[\w\d]{32}`, loc.Path) + + var query *models.Asynquery + e2etest.Wait(s.T(), "upload settling into the database", 5*time.Second, 1000*time.Millisecond, func() error { + mods := []qm.QueryMod{ + models.AsynqueryWhere.UploadID.EQ(uploadID), + models.AsynqueryWhere.UserID.EQ(s.userHelper.UserID()), + } + query, err = models.Asynqueries(mods...).One(s.launcher.db) + if errors.Is(err, sql.ErrNoRows) { + return e2etest.ErrWaitContinue + } else if err != nil { + return err + } + return nil + }) + s.Equal(models.AsynqueryStatusReceived, query.Status) + s.Equal(uploadID, query.UploadID) + + (&test.HTTPTest{ + Method: http.MethodGet, + URL: ts.URL + "/api/v1/asynqueries/" + query.ID, + Code: http.StatusUnauthorized, + }).Run(s.router, s.T()) + + (&test.HTTPTest{ + Method: http.MethodGet, + URL: ts.URL + "/api/v1/asynqueries/" + query.ID, + ReqHeader: map[string]string{ + wallet.AuthorizationHeader: s.userHelper.TokenHeader, + }, + Code: http.StatusOK, + }).Run(s.router, s.T()) + + // var rr *StreamCreateResponse + // s.Require().NoError(json.Unmarshal(resp.Body.Bytes(), rr)) + +} + +func (s *asynqueryHandlerSuite) SetupSuite() { + s.userHelper = &e2etest.UserTestHelper{} + s.Require().NoError(s.userHelper.Setup(s.T())) + s.router = mux.NewRouter() + + kf, err := keybox.GenerateKeyfob() + s.Require().NoError(err) + + redisHelper := testdeps.NewRedisTestHelper(s.T()) + s.launcher = NewLauncher( + WithBusRedisOpts(redisHelper.AsynqOpts), + WithLogger(zapadapter.NewKV(nil)), + WithPrivateKey(kf.PrivateKey()), + WithDB(s.userHelper.DB), + ) + s.router.Use(auth.Middleware(s.userHelper.Auther)) + + err = s.launcher.InstallRoutes(s.router) + s.Require().NoError(err) + + s.T().Cleanup(s.launcher.Shutdown) +} + +type StreamCreateResponse struct { + Height int `json:"height"` + Hex string `json:"hex"` + Inputs []struct { + Address string `json:"address"` + Amount string `json:"amount"` + Confirmations int `json:"confirmations"` + Height int `json:"height"` + Nout int `json:"nout"` + Timestamp int `json:"timestamp"` + Txid string `json:"txid"` + Type string `json:"type"` + } `json:"inputs"` + Outputs []struct { + Address string `json:"address"` + Amount string `json:"amount"` + ClaimID string `json:"claim_id,omitempty"` + ClaimOp string `json:"claim_op,omitempty"` + Confirmations int `json:"confirmations"` + Height int `json:"height"` + Meta struct { + } `json:"meta,omitempty"` + Name string `json:"name,omitempty"` + NormalizedName string `json:"normalized_name,omitempty"` + Nout int `json:"nout"` + PermanentURL string `json:"permanent_url,omitempty"` + Timestamp interface{} `json:"timestamp"` + Txid string `json:"txid"` + Type string `json:"type"` + Value struct { + Source struct { + Hash string `json:"hash"` + MediaType string `json:"media_type"` + Name string `json:"name"` + SdHash string `json:"sd_hash"` + Size string `json:"size"` + } `json:"source"` + StreamType string `json:"stream_type"` + } `json:"value,omitempty"` + ValueType string `json:"value_type,omitempty"` + } `json:"outputs"` + TotalFee string `json:"total_fee"` + TotalInput string `json:"total_input"` + TotalOutput string `json:"total_output"` + Txid string `json:"txid"` +} diff --git a/app/asynquery/routes.go b/app/asynquery/routes.go index ead009d8..1e4f21e1 100644 --- a/app/asynquery/routes.go +++ b/app/asynquery/routes.go @@ -6,15 +6,19 @@ import ( "github.com/OdyseeTeam/odysee-api/pkg/keybox" "github.com/OdyseeTeam/odysee-api/pkg/logging" + "github.com/gorilla/mux" "github.com/hibiken/asynq" + "github.com/volatiletech/sqlboiler/boil" ) type Launcher struct { - privateKey crypto.PrivateKey - busRedisURL string - logger logging.KVLogger - readyCancel context.CancelFunc + busRedisOpts asynq.RedisConnOpt + db boil.Executor + logger logging.KVLogger + manager *CallManager + privateKey crypto.PrivateKey + readyCancel context.CancelFunc } type LauncherOption func(*Launcher) @@ -25,9 +29,14 @@ func WithPrivateKey(privateKey crypto.PrivateKey) LauncherOption { } } -func WithBusRedisURL(busRedisURL string) LauncherOption { +func WithDB(db boil.Executor) LauncherOption { + return func(l *Launcher) { + l.db = db + } +} +func WithBusRedisOpts(redisOpts asynq.RedisConnOpt) LauncherOption { return func(l *Launcher) { - l.busRedisURL = busRedisURL + l.busRedisOpts = redisOpts } } @@ -58,18 +67,27 @@ func (l *Launcher) InstallRoutes(r *mux.Router) error { if err != nil { return err } - redisOpts, err := asynq.ParseRedisURI(l.busRedisURL) + manager, err := NewCallManager(l.busRedisOpts, l.db, l.logger) if err != nil { return err } - cm, err := NewCallManager(redisOpts, l.logger) - if err != nil { - return err - } - handler := NewHandler(cm, l.logger, keyfob) + l.manager = manager + handler := NewHandler(manager, l.logger, keyfob) r.HandleFunc("/api/v1/asynqueries/auth/pubkey", keybox.PublicKeyHandler(keyfob)).Methods("GET") r.HandleFunc("/api/v1/asynqueries/auth/upload-token", handler.RetrieveUploadToken).Methods("POST") - r.HandleFunc("/api/v1/asynqueries", handler.Create).Methods("POST") r.HandleFunc("/api/v1/asynqueries/{id}", handler.Get).Methods("GET") + r.HandleFunc("/api/v1/asynqueries/", handler.Create).Methods("POST") return nil } + +func (l *Launcher) Start() error { + err := l.manager.Start() + if err != nil { + return err + } + return nil +} + +func (l *Launcher) Shutdown() { + l.manager.Shutdown() +} diff --git a/app/geopublish/routes.go b/app/geopublish/routes.go index cfe97e6f..8b0d7411 100644 --- a/app/geopublish/routes.go +++ b/app/geopublish/routes.go @@ -17,11 +17,11 @@ import ( ) func InstallRoutes(router *mux.Router, userGetter UserGetter, uploadPath, urlPrefix string, logger logging.KVLogger) (*Handler, error) { - redisOpts, err := config.GetRedisBusOpts() + redisOpts, err := config.GetRedisLockerOpts() if err != nil { return nil, fmt.Errorf("cannot get redis config: %w", err) } - asynqRedisOpts, err := config.GetAsynqRedisOpts() + asynqRedisOpts, err := config.GetRedisBusOpts() if err != nil { return nil, fmt.Errorf("cannot get redis config: %w", err) } diff --git a/apps/forklift/cmd/main.go b/apps/forklift/cmd/main.go index 8779293f..86cbed40 100644 --- a/apps/forklift/cmd/main.go +++ b/apps/forklift/cmd/main.go @@ -33,7 +33,7 @@ func main() { } logger := zapadapter.NewKV(nil) - cfg, err := configng.Read(".", "upload", "yaml") + cfg, err := configng.Read("./config", "upload", "yaml") if err != nil { panic(err) } diff --git a/apps/forklift/forklift.yml b/apps/forklift/config/forklift.yml similarity index 96% rename from apps/forklift/forklift.yml rename to apps/forklift/config/forklift.yml index 6b3a4768..33045105 100644 --- a/apps/forklift/forklift.yml +++ b/apps/forklift/config/forklift.yml @@ -1,5 +1,6 @@ Database: DSN: postgres://:odyseeteam@localhost + DBName: uploads AutoMigrations: true RedisBus: redis://:odyredis@localhost:6379/1 diff --git a/apps/lbrytv/config/config.go b/apps/lbrytv/config/config.go index 60b6a263..32d56829 100644 --- a/apps/lbrytv/config/config.go +++ b/apps/lbrytv/config/config.go @@ -71,21 +71,8 @@ func GetRedisLockerOpts() (*redis.Options, error) { } // GetRedisBusOpts returns Redis connection options in the official redis client format. -func GetRedisBusOpts() (*redis.Options, error) { - opts, err := redis.ParseURL(Config.Viper.GetString("RedisBus")) - if err != nil { - return nil, err - } - return opts, nil -} - -// GetAsynqRedisOpts rreturns Redis connection options ready for asynq package. -func GetAsynqRedisOpts() (asynq.RedisConnOpt, error) { - redisOpts, err := asynq.ParseRedisURI(Config.Viper.GetString("redis")) - if err != nil { - return nil, err - } - return redisOpts, nil +func GetRedisBusOpts() (asynq.RedisConnOpt, error) { + return asynq.ParseRedisURI("RedisBus") } // GetDatabase returns postgresql database server connection config. @@ -126,6 +113,11 @@ func GetPaidTokenPrivKey() string { return Config.Viper.GetString("PaidTokenPrivKey") } +// GetPaidTokenPrivKey returns absolute path to the private RSA key for generating paid tokens. +func GetUploadTokenPrivateKey() string { + return Config.Viper.GetString("UploadTokenPrivateKey") +} + // GetStreamsV5 returns config map for v5 streams endpoint. func GetStreamsV5() map[string]string { return Config.Viper.GetStringMapString("StreamsV5") diff --git a/apps/uploads/cmd/main.go b/apps/uploads/cmd/main.go index 891b37f6..944440c0 100644 --- a/apps/uploads/cmd/main.go +++ b/apps/uploads/cmd/main.go @@ -54,7 +54,7 @@ func main() { } func serve(logger logging.KVLogger) { - cfg, err := configng.Read(".", "uploads", "yaml") + cfg, err := configng.Read("./config", "uploads", "yaml") if err != nil { logger.Fatal("config reading failed", "err", err) } @@ -76,7 +76,7 @@ func serve(logger logging.KVLogger) { logger.Fatal("redislocker launch failed", "err", err) } - k, err := keybox.NewPublicKeyFromURL(cfg.V.GetString("PublicKeyURL")) + k, err := keybox.PublicKeyFromURL(cfg.V.GetString("PublicKeyURL")) if err != nil { logger.Fatal("public key loading failed", "url", cfg.V.GetString("PublicKeyURL"), "err", err) } diff --git a/apps/uploads/uploads.yml b/apps/uploads/config/uploads.yml similarity index 95% rename from apps/uploads/uploads.yml rename to apps/uploads/config/uploads.yml index 78fd6e47..a9fa7e73 100644 --- a/apps/uploads/uploads.yml +++ b/apps/uploads/config/uploads.yml @@ -1,5 +1,6 @@ Database: DSN: postgres://:odyseeteam@localhost + DBName: uploads AutoMigrations: true RedisLocker: redis://:odyredis@localhost:6379/1 diff --git a/build/forklift/Dockerfile b/build/forklift/Dockerfile index e69de29b..b4f207f4 100644 --- a/build/forklift/Dockerfile +++ b/build/forklift/Dockerfile @@ -0,0 +1,12 @@ +# syntax=docker/dockerfile:1 +FROM odyseeteam/transcoder-ffmpeg:5.1.1 AS ffmpeg +FROM alpine:3.17 +EXPOSE 8080 + +COPY --from=ffmpeg /build/ffprobe /usr/local/bin/ + +WORKDIR /app +COPY ./dist/linux_amd64/forklift /app +COPY ./apps/forklift/config/forklift.yml /app + +CMD ["/app/uploads", "serve"] diff --git a/build/uploads/Dockerfile b/build/uploads/Dockerfile index b2549061..678c3793 100644 --- a/build/uploads/Dockerfile +++ b/build/uploads/Dockerfile @@ -1,9 +1,11 @@ +# syntax=docker/dockerfile:1 FROM alpine:3.17 EXPOSE 8080 -RUN apk add --no-cache libc6-compat +# RUN apk add --no-cache libc6-compat WORKDIR /app -COPY ../dist/linux_amd64/uploads /app +COPY ./dist/linux_amd64/uploads /app +COPY ./apps/uploads/config/uploads.yml /app CMD ["/app/uploads", "serve"] diff --git a/config/config.go b/config/config.go index c53d3d36..6fa5be44 100644 --- a/config/config.go +++ b/config/config.go @@ -32,6 +32,14 @@ func ReadConfig(configName string) *ConfigWrapper { return c } +func (c DBConfig) GetFullDSN() string { + return c.Connection +} + +func (c DBConfig) GetDBName() string { + return c.DBName +} + func (c *ConfigWrapper) initPaths() { c.Viper.SetConfigName(c.configName) c.Viper.AddConfigPath("./config/") @@ -63,6 +71,7 @@ func (c *ConfigWrapper) GetDatabase() DBConfig { // Override sets a setting key value to whatever you supply. // Useful in tests: +// // config.Override("Lbrynet", "http://www.google.com:8080/api/proxy") // defer config.RestoreOverridden() // ... diff --git a/internal/e2etest/e2etest.go b/internal/e2etest/e2etest.go index 59aaba40..fd8b7712 100644 --- a/internal/e2etest/e2etest.go +++ b/internal/e2etest/e2etest.go @@ -14,6 +14,7 @@ import ( "github.com/OdyseeTeam/odysee-api/models" "github.com/OdyseeTeam/odysee-api/pkg/iapi" "github.com/OdyseeTeam/odysee-api/pkg/migrator" + "github.com/stretchr/testify/require" ) type TestUser struct { @@ -32,64 +33,55 @@ type UserTestHelper struct { } func (s *UserTestHelper) Setup(t *testing.T) error { + require := require.New(t) s.t = t config.Override("LbrynetServers", "") db, dbCleanup, err := migrator.CreateTestDB(migrator.DBConfigFromApp(config.GetDatabase()), storage.MigrationsFS) - if err != nil { - panic(err) - } + require.NoError(err) storage.SetDB(db) - t.Cleanup(func() { dbCleanup() }) - s.DB = db - s.SDKRouter = sdkrouter.New(config.GetLbrynetServers()) + + sdkr := sdkrouter.New(config.GetLbrynetServers()) th, err := test.GetTestTokenHeader() - if err != nil { - return err - } - s.TokenHeader = th + require.NoError(err) auther, err := wallet.NewOauthAuthenticator( config.GetOauthProviderURL(), config.GetOauthClientID(), - config.GetInternalAPIHost(), s.SDKRouter) - if err != nil { - return err - } - s.Auther = auther + config.GetInternalAPIHost(), sdkr) + require.NoError(err) w, err := test.InjectTestingWallet(test.TestUserID) - if err != nil { - return err - } + require.NoError(err) s.t.Logf("set up wallet userid=%v", w.UserID) - t.Cleanup(func() { - w.Unload() - w.RemoveFile() - }) - u, err := auther.Authenticate(s.TokenHeader, "127.0.0.1") - if err != nil { - return err - } + u, err := auther.Authenticate(th, "127.0.0.1") + require.NoError(err) iac, err := iapi.NewClient( iapi.WithOAuthToken(strings.TrimPrefix(th, wallet.TokenPrefix)), iapi.WithRemoteIP("8.8.8.8"), ) - if err != nil { - return err - } + require.NoError(err) cu := auth.NewCurrentUser(u, "8.8.8.8", iac, nil) + t.Cleanup(func() { + dbCleanup() + w.Unload() + w.RemoveFile() + }) + t.Cleanup(config.RestoreOverridden) + + s.Auther = auther + s.SDKRouter = sdkr + s.TokenHeader = th + s.DB = db s.TestUser = &TestUser{ User: u, SDKAddress: sdkrouter.GetSDKAddress(u), CurrentUser: cu, } - - t.Cleanup(config.RestoreOverridden) return nil } diff --git a/oapi.yml b/oapi.yml index 64832c7d..318d4122 100644 --- a/oapi.yml +++ b/oapi.yml @@ -36,6 +36,15 @@ GeoPublishConcurrency: 3 PaidTokenPrivKey: token_privkey.rsa +# Change this key for production! +# You can re-generate the key by running: +# openssl ecparam -genkey -name prime256v1 -noout -out private_key.pem && base64 -i private_key.pem +UploadTokenPrivateKey: >- + LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSUdKMEY4RXNYUDJCbGtZWDhnaXdze + jF4NVU5a0IxTldnV3ZVSmNhVmpmT01vQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFTDN3Q0s2Y2FST2syK0 + o5dU5xQURtMXlKWHJFOFlrS3BjQnZPRFc2WXFWa29XVktYb25hNApwajR2ZEZjeG94bDVFZWJtRjhTa0k + 0djhqcnhEb3hRRlFnPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo= + LbrynetXServer: http://sdk.lbry.tech:5279/api LbrynetXPercentage: 0 diff --git a/pkg/configng/configng.go b/pkg/configng/configng.go index e299bfd6..3998f3fa 100644 --- a/pkg/configng/configng.go +++ b/pkg/configng/configng.go @@ -16,6 +16,7 @@ type S3Config struct { type PostgresConfig struct { DSN string + dbName string AutoMigrations bool } @@ -50,6 +51,10 @@ func (c PostgresConfig) GetFullDSN() string { return c.DSN } +func (c PostgresConfig) GetDBName() string { + return c.dbName +} + func (c PostgresConfig) MigrateOnConnect() bool { return c.AutoMigrations } diff --git a/pkg/keybox/keybox.go b/pkg/keybox/keybox.go index b298a567..71720238 100644 --- a/pkg/keybox/keybox.go +++ b/pkg/keybox/keybox.go @@ -29,24 +29,14 @@ type Validator struct { publicKey crypto.PublicKey } -// GenerateKeyfob generates a new Keyfob containing a public and a private key. -func GenerateKeyfob() (*Keyfob, error) { - pvk, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) - if err != nil { - return nil, fmt.Errorf("unable to generate private key: %e", err) - } - kf, err := NewKeyfob(pvk) - if err != nil { - return nil, err - } - return kf, nil -} - // NewKeyfob creates a new Keyfob from an existing private key. func NewKeyfob(privateKey crypto.PrivateKey) (*Keyfob, error) { + if privateKey == nil { + return nil, errors.New("empty private key supplied") + } edpk, ok := privateKey.(*ecdsa.PrivateKey) if !ok { - return nil, errors.New("private key is not an issue private key") + return nil, errors.New("private key is not an ecdsa private key") } kf := &Keyfob{ privateKey: edpk, @@ -55,14 +45,15 @@ func NewKeyfob(privateKey crypto.PrivateKey) (*Keyfob, error) { return kf, nil } -func KeyfobFromString(privateKey string) (*Keyfob, error) { - pvk, err := privateKeyFromString(privateKey) +// GenerateKeyfob generates a new Keyfob containing a public and a private key. +func GenerateKeyfob() (*Keyfob, error) { + pvk, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) if err != nil { - return nil, fmt.Errorf("unable to load private key from string: %e", err) + return nil, fmt.Errorf("unable to generate private key: %e", err) } kf, err := NewKeyfob(pvk) if err != nil { - return nil, fmt.Errorf("unable to create keyfob: %e", err) + return nil, err } return kf, nil } @@ -76,6 +67,19 @@ func NewValidator(publicKey crypto.PublicKey) (*Validator, error) { return v, nil } +// KeyfobFromString creates a Keyfob from an existing private key supplied as a base64 string. +func KeyfobFromString(privateKey string) (*Keyfob, error) { + pvk, err := privateKeyFromString(privateKey) + if err != nil { + return nil, fmt.Errorf("unable to load private key from string: %e", err) + } + kf, err := NewKeyfob(pvk) + if err != nil { + return nil, fmt.Errorf("unable to create keyfob: %e", err) + } + return kf, nil +} + func ValidatorFromPublicKeyString(publicKey string) (*Validator, error) { v := &Validator{} var err error @@ -86,7 +90,7 @@ func ValidatorFromPublicKeyString(publicKey string) (*Validator, error) { return v, nil } -func NewPublicKeyFromURL(keyURL string) (crypto.PublicKey, error) { +func PublicKeyFromURL(keyURL string) (crypto.PublicKey, error) { r, err := http.Get(keyURL) if err != nil { return nil, fmt.Errorf("unable to retrieve public key: %w", err) @@ -135,6 +139,10 @@ func (kf Keyfob) PublicKey() crypto.PublicKey { return kf.publicKey } +func (kf Keyfob) PrivateKey() crypto.PrivateKey { + return kf.privateKey +} + // NewValidator creates a new Validator from the public key. func (kf Keyfob) Validator() *Validator { return &Validator{publicKey: kf.PublicKey()} diff --git a/pkg/keybox/keybox_test.go b/pkg/keybox/keybox_test.go index b46b445e..585c42e6 100644 --- a/pkg/keybox/keybox_test.go +++ b/pkg/keybox/keybox_test.go @@ -41,7 +41,7 @@ func TestNewValidator(t *testing.T) { assert.Equal(t, base64.StdEncoding.EncodeToString(publicKeyBytes), testPubKey) } -func TestGenerateToken(t *testing.T) { +func TestKeyfobFromStringGenerateToken(t *testing.T) { privateKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) require.NoError(t, err) @@ -74,7 +74,7 @@ func TestPublicKeyHandler(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(PublicKeyHandler(kf.PublicKey()))) defer ts.Close() - pubKey, err := NewPublicKeyFromURL(ts.URL) + pubKey, err := PublicKeyFromURL(ts.URL) require.NoError(err) assert.Equal(pubKey, kf.PublicKey(), "retrieved public key does not match parsed public key") diff --git a/pkg/migrator/db.go b/pkg/migrator/db.go index f0de8ed8..4e6ae67b 100644 --- a/pkg/migrator/db.go +++ b/pkg/migrator/db.go @@ -7,8 +7,8 @@ import ( ) type DSNConfig interface { - DBName() string - FullDSN() string + GetFullDSN() string + GetDBName() string } type DBConfig struct { @@ -47,6 +47,10 @@ func (c *DBConfig) GetFullDSN() string { return fmt.Sprintf("%s/%s?%s", c.dsn, c.dbName, c.connOpts) } +func (c *DBConfig) GetDBName() string { + return c.dbName +} + func ConnectDB(cfg DSNConfig, migrationsFS ...embed.FS) (*sql.DB, error) { var err error db, err := sql.Open("postgres", cfg.GetFullDSN()) @@ -64,5 +68,5 @@ func ConnectDB(cfg DSNConfig, migrationsFS ...embed.FS) (*sql.DB, error) { } func DBConfigFromApp(cfg DSNConfig) *DBConfig { - return DefaultDBConfig().DSN(cfg.GetFullDSN()).Name(cfg.DBName) + return DefaultDBConfig().DSN(cfg.GetFullDSN()).Name(cfg.GetDBName()) }