From 8d7264059ebd4ece4dfb280d53d2bc37544a1b86 Mon Sep 17 00:00:00 2001 From: Andriy Biletsky Date: Tue, 26 Sep 2023 21:49:00 +0700 Subject: [PATCH 1/3] Fix asynquery duplicates --- app/asynquery/asynquery.go | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/app/asynquery/asynquery.go b/app/asynquery/asynquery.go index ff36c62d..45bd471a 100644 --- a/app/asynquery/asynquery.go +++ b/app/asynquery/asynquery.go @@ -3,6 +3,7 @@ package asynquery import ( "context" "crypto/md5" + "database/sql" "encoding/json" "errors" "fmt" @@ -95,14 +96,15 @@ func (m *CallManager) Shutdown() { // Call accepts JSON-RPC request for later asynchronous processing. func (m *CallManager) Call(userID int, req *jsonrpc.RPCRequest) (*models.Asynquery, error) { var ( - fp string - fpp any - floc *FileLocation - ok bool + filePath string + filePathParam any + fileLoc *FileLocation + ok bool ) p := req.Params.(map[string]any) + // Metadata-only update - if fpp, ok = p[FilePathParam]; !ok { + if filePathParam, ok = p[FilePathParam]; !ok { aq, err := m.createQueryRecord(queryParams{userID: userID}, req) if err != nil { m.logger.Warn("error adding query record", "err", err, "user_id", userID) @@ -120,20 +122,25 @@ func (m *CallManager) Call(userID int, req *jsonrpc.RPCRequest) (*models.Asynque m.logger.Info("query added and queued", "id", aq.ID, "user_id", userID) return aq, nil } - if fp, ok = fpp.(string); !ok { + if filePath, ok = filePathParam.(string); !ok { return nil, errors.New("invalid file path") } - floc, err := parseFilePath(fp) + fileLoc, err := parseFilePath(filePath) if err != nil { return nil, err } - aq, err := m.createQueryRecord(queryParams{userID: userID, uploadID: floc.UploadID}, req) + if aq, err := m.getQueryRecord(context.TODO(), queryParams{userID: userID, uploadID: fileLoc.UploadID}); err != nil && !errors.Is(err, sql.ErrNoRows) { + return nil, err + } else if aq.Status != models.AsynqueryStatusFailed { + return aq, nil + } + aq, err := m.createQueryRecord(queryParams{userID: userID, uploadID: fileLoc.UploadID}, req) if err != nil { m.logger.Warn("error adding query record", "err", err, "user_id", userID) return nil, fmt.Errorf("error adding query record: %w", err) } - m.logger.Info("query added", "id", aq.ID, "user_id", userID, "upload_id", floc.UploadID) + m.logger.Info("query added", "id", aq.ID, "user_id", userID, "upload_id", fileLoc.UploadID) return aq, nil } From 19f9e50debbd1e7df1907fd016b189143b7d3295 Mon Sep 17 00:00:00 2001 From: Andriy Biletsky Date: Sun, 15 Oct 2023 00:50:19 +0700 Subject: [PATCH 2/3] Add asynquery.Call test --- app/asynquery/asynquery.go | 11 ++-- app/asynquery/asynquery_test.go | 98 ++++++++++++++++++++++++++++- app/asynquery/http_handlers_test.go | 7 ++- 3 files changed, 108 insertions(+), 8 deletions(-) diff --git a/app/asynquery/asynquery.go b/app/asynquery/asynquery.go index 45bd471a..98fbecd8 100644 --- a/app/asynquery/asynquery.go +++ b/app/asynquery/asynquery.go @@ -130,12 +130,15 @@ func (m *CallManager) Call(userID int, req *jsonrpc.RPCRequest) (*models.Asynque return nil, err } - if aq, err := m.getQueryRecord(context.TODO(), queryParams{userID: userID, uploadID: fileLoc.UploadID}); err != nil && !errors.Is(err, sql.ErrNoRows) { + aq, err := m.getQueryRecord(context.TODO(), queryParams{userID: userID, uploadID: fileLoc.UploadID}) + // If query record exists and is not failed, return it + if err != nil && !errors.Is(err, sql.ErrNoRows) { return nil, err - } else if aq.Status != models.AsynqueryStatusFailed { + } + if err == nil && aq.Status != models.AsynqueryStatusFailed { return aq, nil } - aq, err := m.createQueryRecord(queryParams{userID: userID, uploadID: fileLoc.UploadID}, req) + aq, err = m.createQueryRecord(queryParams{userID: userID, uploadID: fileLoc.UploadID}, req) if err != nil { m.logger.Warn("error adding query record", "err", err, "user_id", userID) return nil, fmt.Errorf("error adding query record: %w", err) @@ -306,7 +309,7 @@ func (m *CallManager) createQueryRecord(params queryParams, request *jsonrpc.RPC if err != nil { return nil, fmt.Errorf("error marshaling request: %w", err) } - q.ID = fmt.Sprintf("%x", md5.Sum(rb)) + q.ID = fmt.Sprintf("%x%v", md5.Sum(rb), time.Now().UnixMilli()) q.Body = null.JSONFrom(rb) return &q, q.Insert(m.db, boil.Infer()) } diff --git a/app/asynquery/asynquery_test.go b/app/asynquery/asynquery_test.go index 2f921727..bec85bb0 100644 --- a/app/asynquery/asynquery_test.go +++ b/app/asynquery/asynquery_test.go @@ -1,11 +1,16 @@ package asynquery import ( + "reflect" "testing" + "github.com/OdyseeTeam/odysee-api/app/query" "github.com/OdyseeTeam/odysee-api/apps/lbrytv/config" "github.com/OdyseeTeam/odysee-api/internal/e2etest" + "github.com/OdyseeTeam/odysee-api/models" "github.com/OdyseeTeam/odysee-api/pkg/logging/zapadapter" + "github.com/Pallinder/go-randomdata" + "github.com/ybbus/jsonrpc" "github.com/stretchr/testify/suite" ) @@ -17,10 +22,100 @@ type asynquerySuite struct { userHelper *e2etest.UserTestHelper } +func TestParseFilePath(t *testing.T) { + shortID := randomdata.Alphanumeric(32) + longID := randomdata.Alphanumeric(64) + tests := []struct { + filePath string + want *FileLocation + wantErr bool + }{ + { + filePath: "https://uploads.odysee.com/v1/uploads/" + shortID, + want: &FileLocation{Server: "uploads.odysee.com", UploadID: shortID}, + wantErr: false, + }, + { + filePath: "https://uploads.odysee.com/v1/uploads/" + longID, + want: &FileLocation{Server: "uploads.odysee.com", UploadID: longID}, + wantErr: false, + }, + { + filePath: "invalidpath", + want: nil, + wantErr: true, + }, + { + filePath: "https://uploads.odysee.com/v1/uploads/123", + want: nil, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.filePath, func(t *testing.T) { + got, err := parseFilePath(tt.filePath) + if (err != nil) != tt.wantErr { + t.Errorf("parseFilePath() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("parseFilePath() = %v, want %v", got, tt.want) + } + }) + } +} + func TestAsynquerySuite(t *testing.T) { suite.Run(t, new(asynquerySuite)) } +func (s *asynquerySuite) TestCall() { + uploadID := randomdata.Alphanumeric(64) + req := jsonrpc.NewRequest(query.MethodStreamCreate, map[string]any{ + "name": "publish2test-dummymd", + "title": "Publish v2 test for dummy.md", + "description": "", + "locations": []string{}, + "bid": "0.01000000", + "languages": []string{"en"}, + "tags": []string{"c:disable-comments"}, + "thumbnail_url": "https://thumbs.odycdn.com/92399dc6df41af6f7c61def97335dfa5.webp", + "release_time": 1661882701, + "blocking": true, + "preview": false, + "license": "None", + "channel_id": "febc557fcfbe5c1813eb621f7d38a80bc4355085", + FilePathParam: "https://uploads-v4.api.na-backend.odysee.com/v1/uploads/" + uploadID, + }) + req.ID = randomdata.Number(1, 999999999) + + // Making sure there are no duplicate entries afterwards + for i := 0; i < 10; i++ { + _, err := s.manager.Call(s.userHelper.UserID(), req) + s.Require().NoError(err) + } + aqs, err := models.Asynqueries( + models.AsynqueryWhere.UploadID.EQ(uploadID), + models.AsynqueryWhere.UserID.EQ(s.userHelper.UserID()), + ).All(s.userHelper.DB) + s.Require().NoError(err) + s.EqualValues(1, len(aqs)) + + aq := aqs[0] + s.EqualValues(uploadID, aq.UploadID) + s.EqualValues(s.userHelper.UserID(), aq.UserID) + s.EqualValues(models.AsynqueryStatusReceived, aq.Status) + + dreq := &jsonrpc.RPCRequest{} + s.Require().NoError(aq.Body.Unmarshal(dreq)) + dparams := dreq.Params.(map[string]any) + params := req.Params.(map[string]any) + s.Equal(params["name"], dparams["name"]) + s.Equal(params["title"], dparams["title"]) + s.Equal(params[FilePathParam], dparams[FilePathParam]) +} + func (s *asynquerySuite) SetupSuite() { s.userHelper = &e2etest.UserTestHelper{} s.Require().NoError(s.userHelper.Setup(s.T())) @@ -30,7 +125,8 @@ func (s *asynquerySuite) SetupSuite() { m, err := NewCallManager(ro, s.userHelper.DB, zapadapter.NewKV(nil)) s.Require().NoError(err) s.manager = m - go m.Start() + // This should be called per-test, if needed + // go m.Start() } func (s *asynquerySuite) TearDownSuite() { diff --git a/app/asynquery/http_handlers_test.go b/app/asynquery/http_handlers_test.go index 0201bbd7..760e5c3f 100644 --- a/app/asynquery/http_handlers_test.go +++ b/app/asynquery/http_handlers_test.go @@ -98,8 +98,8 @@ func (s *asynqueryHandlerSuite) TestCreateUpload() { func (s *asynqueryHandlerSuite) TestCreate() { require := s.Require() ts := httptest.NewServer(s.router) - uploadID := randomdata.Alphanumeric(64) + uploadID := randomdata.Alphanumeric(64) req := jsonrpc.NewRequest(query.MethodStreamCreate, map[string]any{ "name": "publish2test-dummymd", "title": "Publish v2 test for dummy.md", @@ -121,7 +121,7 @@ func (s *asynqueryHandlerSuite) TestCreate() { streamCreateReq, err := json.Marshal(req) require.NoError(err) - resp := (&test.HTTPTest{ + createRequest := &test.HTTPTest{ Method: http.MethodPost, URL: ts.URL + "/api/v1/asynqueries/", ReqHeader: map[string]string{ @@ -129,7 +129,8 @@ func (s *asynqueryHandlerSuite) TestCreate() { }, ReqBody: bytes.NewReader(streamCreateReq), Code: http.StatusCreated, - }).Run(s.router, s.T()) + } + resp := createRequest.Run(s.router, s.T()) var query *models.Asynquery e2etest.Wait(s.T(), "query settling in the database", 5*time.Second, 1000*time.Millisecond, func() error { From 60df04180c87441322e544d071c3b3e9e2bc0a23 Mon Sep 17 00:00:00 2001 From: Andriy Biletsky Date: Mon, 16 Oct 2023 23:49:02 +0700 Subject: [PATCH 3/3] Bump golang.org/x/net --- go.mod | 8 ++++---- go.sum | 14 +++++++++----- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index a4c96b35..ce911a3d 100644 --- a/go.mod +++ b/go.mod @@ -194,11 +194,11 @@ require ( go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.8.0 // indirect golang.org/x/arch v0.3.0 // indirect - golang.org/x/crypto v0.9.0 // indirect + golang.org/x/crypto v0.14.0 // indirect golang.org/x/mod v0.8.0 // indirect - golang.org/x/net v0.10.0 // indirect - golang.org/x/sys v0.8.0 // indirect - golang.org/x/text v0.9.0 // indirect + golang.org/x/net v0.17.0 // indirect + golang.org/x/sys v0.13.0 // indirect + golang.org/x/text v0.13.0 // indirect golang.org/x/time v0.3.0 // indirect golang.org/x/tools v0.6.0 // indirect google.golang.org/appengine v1.6.7 // indirect diff --git a/go.sum b/go.sum index b4674c53..01321dc4 100644 --- a/go.sum +++ b/go.sum @@ -2011,8 +2011,9 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220518034528-6f7dac969898/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g= golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= +golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -2161,8 +2162,9 @@ golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= -golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -2344,8 +2346,9 @@ golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -2353,8 +2356,8 @@ golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= -golang.org/x/term v0.8.0 h1:n5xxQn2i3PC0yLAbjTpNT85q/Kgzcr2gIoX9OrJUols= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -2370,8 +2373,9 @@ golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=