Skip to content

Commit

Permalink
Merge pull request #505 from OdyseeTeam/fix-asynquery-dupe
Browse files Browse the repository at this point in the history
Fix asynquery dupe
  • Loading branch information
anbsky authored Oct 17, 2023
2 parents 6b9a6a5 + 60df041 commit 44b8204
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 23 deletions.
30 changes: 20 additions & 10 deletions app/asynquery/asynquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package asynquery
import (
"context"
"crypto/md5"
"database/sql"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -95,14 +96,15 @@ func (m *CallManager) Shutdown() {
// Call accepts JSON-RPC request for later asynchronous processing.
func (m *CallManager) Call(userID int, req *jsonrpc.RPCRequest) (*models.Asynquery, error) {
var (
fp string
fpp any
floc *FileLocation
ok bool
filePath string
filePathParam any
fileLoc *FileLocation
ok bool
)
p := req.Params.(map[string]any)

// Metadata-only update
if fpp, ok = p[FilePathParam]; !ok {
if filePathParam, ok = p[FilePathParam]; !ok {
aq, err := m.createQueryRecord(queryParams{userID: userID}, req)
if err != nil {
m.logger.Warn("error adding query record", "err", err, "user_id", userID)
Expand All @@ -120,20 +122,28 @@ func (m *CallManager) Call(userID int, req *jsonrpc.RPCRequest) (*models.Asynque
m.logger.Info("query added and queued", "id", aq.ID, "user_id", userID)
return aq, nil
}
if fp, ok = fpp.(string); !ok {
if filePath, ok = filePathParam.(string); !ok {
return nil, errors.New("invalid file path")
}
floc, err := parseFilePath(fp)
fileLoc, err := parseFilePath(filePath)
if err != nil {
return nil, err
}

aq, err := m.createQueryRecord(queryParams{userID: userID, uploadID: floc.UploadID}, req)
aq, err := m.getQueryRecord(context.TODO(), queryParams{userID: userID, uploadID: fileLoc.UploadID})
// If query record exists and is not failed, return it
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return nil, err
}
if err == nil && aq.Status != models.AsynqueryStatusFailed {
return aq, nil
}
aq, err = m.createQueryRecord(queryParams{userID: userID, uploadID: fileLoc.UploadID}, req)
if err != nil {
m.logger.Warn("error adding query record", "err", err, "user_id", userID)
return nil, fmt.Errorf("error adding query record: %w", err)
}
m.logger.Info("query added", "id", aq.ID, "user_id", userID, "upload_id", floc.UploadID)
m.logger.Info("query added", "id", aq.ID, "user_id", userID, "upload_id", fileLoc.UploadID)

return aq, nil
}
Expand Down Expand Up @@ -299,7 +309,7 @@ func (m *CallManager) createQueryRecord(params queryParams, request *jsonrpc.RPC
if err != nil {
return nil, fmt.Errorf("error marshaling request: %w", err)
}
q.ID = fmt.Sprintf("%x", md5.Sum(rb))
q.ID = fmt.Sprintf("%x%v", md5.Sum(rb), time.Now().UnixMilli())
q.Body = null.JSONFrom(rb)
return &q, q.Insert(m.db, boil.Infer())
}
Expand Down
98 changes: 97 additions & 1 deletion app/asynquery/asynquery_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package asynquery

import (
"reflect"
"testing"

"github.com/OdyseeTeam/odysee-api/app/query"
"github.com/OdyseeTeam/odysee-api/apps/lbrytv/config"
"github.com/OdyseeTeam/odysee-api/internal/e2etest"
"github.com/OdyseeTeam/odysee-api/models"
"github.com/OdyseeTeam/odysee-api/pkg/logging/zapadapter"
"github.com/Pallinder/go-randomdata"
"github.com/ybbus/jsonrpc"

"github.com/stretchr/testify/suite"
)
Expand All @@ -17,10 +22,100 @@ type asynquerySuite struct {
userHelper *e2etest.UserTestHelper
}

func TestParseFilePath(t *testing.T) {
shortID := randomdata.Alphanumeric(32)
longID := randomdata.Alphanumeric(64)
tests := []struct {
filePath string
want *FileLocation
wantErr bool
}{
{
filePath: "https://uploads.odysee.com/v1/uploads/" + shortID,
want: &FileLocation{Server: "uploads.odysee.com", UploadID: shortID},
wantErr: false,
},
{
filePath: "https://uploads.odysee.com/v1/uploads/" + longID,
want: &FileLocation{Server: "uploads.odysee.com", UploadID: longID},
wantErr: false,
},
{
filePath: "invalidpath",
want: nil,
wantErr: true,
},
{
filePath: "https://uploads.odysee.com/v1/uploads/123",
want: nil,
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.filePath, func(t *testing.T) {
got, err := parseFilePath(tt.filePath)
if (err != nil) != tt.wantErr {
t.Errorf("parseFilePath() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("parseFilePath() = %v, want %v", got, tt.want)
}
})
}
}

func TestAsynquerySuite(t *testing.T) {
suite.Run(t, new(asynquerySuite))
}

func (s *asynquerySuite) TestCall() {
uploadID := randomdata.Alphanumeric(64)
req := jsonrpc.NewRequest(query.MethodStreamCreate, map[string]any{
"name": "publish2test-dummymd",
"title": "Publish v2 test for dummy.md",
"description": "",
"locations": []string{},
"bid": "0.01000000",
"languages": []string{"en"},
"tags": []string{"c:disable-comments"},
"thumbnail_url": "https://thumbs.odycdn.com/92399dc6df41af6f7c61def97335dfa5.webp",
"release_time": 1661882701,
"blocking": true,
"preview": false,
"license": "None",
"channel_id": "febc557fcfbe5c1813eb621f7d38a80bc4355085",
FilePathParam: "https://uploads-v4.api.na-backend.odysee.com/v1/uploads/" + uploadID,
})
req.ID = randomdata.Number(1, 999999999)

// Making sure there are no duplicate entries afterwards
for i := 0; i < 10; i++ {
_, err := s.manager.Call(s.userHelper.UserID(), req)
s.Require().NoError(err)
}
aqs, err := models.Asynqueries(
models.AsynqueryWhere.UploadID.EQ(uploadID),
models.AsynqueryWhere.UserID.EQ(s.userHelper.UserID()),
).All(s.userHelper.DB)
s.Require().NoError(err)
s.EqualValues(1, len(aqs))

aq := aqs[0]
s.EqualValues(uploadID, aq.UploadID)
s.EqualValues(s.userHelper.UserID(), aq.UserID)
s.EqualValues(models.AsynqueryStatusReceived, aq.Status)

dreq := &jsonrpc.RPCRequest{}
s.Require().NoError(aq.Body.Unmarshal(dreq))
dparams := dreq.Params.(map[string]any)
params := req.Params.(map[string]any)
s.Equal(params["name"], dparams["name"])
s.Equal(params["title"], dparams["title"])
s.Equal(params[FilePathParam], dparams[FilePathParam])
}

func (s *asynquerySuite) SetupSuite() {
s.userHelper = &e2etest.UserTestHelper{}
s.Require().NoError(s.userHelper.Setup(s.T()))
Expand All @@ -30,7 +125,8 @@ func (s *asynquerySuite) SetupSuite() {
m, err := NewCallManager(ro, s.userHelper.DB, zapadapter.NewKV(nil))
s.Require().NoError(err)
s.manager = m
go m.Start()
// This should be called per-test, if needed
// go m.Start()
}

func (s *asynquerySuite) TearDownSuite() {
Expand Down
7 changes: 4 additions & 3 deletions app/asynquery/http_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func (s *asynqueryHandlerSuite) TestCreateUpload() {
func (s *asynqueryHandlerSuite) TestCreate() {
require := s.Require()
ts := httptest.NewServer(s.router)
uploadID := randomdata.Alphanumeric(64)

uploadID := randomdata.Alphanumeric(64)
req := jsonrpc.NewRequest(query.MethodStreamCreate, map[string]any{
"name": "publish2test-dummymd",
"title": "Publish v2 test for dummy.md",
Expand All @@ -121,15 +121,16 @@ func (s *asynqueryHandlerSuite) TestCreate() {
streamCreateReq, err := json.Marshal(req)
require.NoError(err)

resp := (&test.HTTPTest{
createRequest := &test.HTTPTest{
Method: http.MethodPost,
URL: ts.URL + "/api/v1/asynqueries/",
ReqHeader: map[string]string{
wallet.AuthorizationHeader: s.userHelper.TokenHeader,
},
ReqBody: bytes.NewReader(streamCreateReq),
Code: http.StatusCreated,
}).Run(s.router, s.T())
}
resp := createRequest.Run(s.router, s.T())

var query *models.Asynquery
e2etest.Wait(s.T(), "query settling in the database", 5*time.Second, 1000*time.Millisecond, func() error {
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,11 @@ require (
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.6.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
14 changes: 9 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2011,8 +2011,9 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220518034528-6f7dac969898/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down Expand Up @@ -2161,8 +2162,9 @@ golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -2344,17 +2346,18 @@ golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc=
golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
golang.org/x/term v0.8.0 h1:n5xxQn2i3PC0yLAbjTpNT85q/Kgzcr2gIoX9OrJUols=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand All @@ -2370,8 +2373,9 @@ golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down

0 comments on commit 44b8204

Please sign in to comment.