Skip to content

Commit

Permalink
feat: update nutsdb version and change flow RPush new job message
Browse files Browse the repository at this point in the history
  • Loading branch information
yudhasubki committed Dec 16, 2023
1 parent 421220e commit 5233a80
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 29 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/jmoiron/sqlx v1.3.5
github.com/lesismal/nbio v1.3.20
github.com/mattn/go-sqlite3 v1.14.18
github.com/nutsdb/nutsdb v1.0.2-0.20231211145816-3468fe949461
github.com/nutsdb/nutsdb v1.0.3-0.20231216092931-133fd88373b5
github.com/yudhasubki/eventpool v0.1.5
gopkg.in/guregu/null.v4 v4.0.0
gopkg.in/yaml.v3 v3.0.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ github.com/nutsdb/nutsdb v1.0.2-0.20231210073356-fec3b20ddfa0 h1:7JLN1uW/tG5WMKq
github.com/nutsdb/nutsdb v1.0.2-0.20231210073356-fec3b20ddfa0/go.mod h1:jIbbpBXajzTMZ0o33Yn5zoYIo3v0Dz4WstkVce+sYuQ=
github.com/nutsdb/nutsdb v1.0.2-0.20231211145816-3468fe949461 h1:wb5O82PuENmF1H0+Mv3IUYY66YS5z0cPgCSX/PNbSqE=
github.com/nutsdb/nutsdb v1.0.2-0.20231211145816-3468fe949461/go.mod h1:jIbbpBXajzTMZ0o33Yn5zoYIo3v0Dz4WstkVce+sYuQ=
github.com/nutsdb/nutsdb v1.0.3-0.20231216092931-133fd88373b5 h1:eSMyRgjdRSGrnH0jzuRIdusDFGRRBXOOrUSIDqoAiBk=
github.com/nutsdb/nutsdb v1.0.3-0.20231216092931-133fd88373b5/go.mod h1:jIbbpBXajzTMZ0o33Yn5zoYIo3v0Dz4WstkVce+sYuQ=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
48 changes: 20 additions & 28 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,36 +471,28 @@ func (listener *Listener[V]) jobCatcher(name string, message io.Reader) error {
return err
}

err = backoff.Retry(func() error {
return listener.kv.updateBucketTx(func(tx *nutsdb.Tx) error {
for idx, message := range messages {
var (
id = fmt.Sprintf("%d_%d", prefix, idx)
)

b, err := json.Marshal(bucket.MessageVisibility{
Message: bucket.Message{
Id: id,
Message: message.Message,
},
RetryPolicy: bucket.MessageVisibilityRetryPolicy{
MaxAttempts: 0,
NextIter: time.Now().Add(listener.option.VisibilityDuration),
},
})
if err != nil {
return err
}
messageBytes := make([][]byte, 0, len(messages))
for idx, message := range messages {
b, err := json.Marshal(bucket.MessageVisibility{
Message: bucket.Message{
Id: fmt.Sprintf("%d_%d", prefix, idx),
Message: message.Message,
},
RetryPolicy: bucket.MessageVisibilityRetryPolicy{
MaxAttempts: 0,
NextIter: time.Now().Add(listener.option.VisibilityDuration),
},
})
if err != nil {
return err
}

err = tx.RPush(listener.JobId, listener.messageBucket(), b)
if err != nil {
return err
}
}
messageBytes = append(messageBytes, b)
}

return nil
})
}, backoff.NewExponentialBackOff())
err = listener.kv.updateBucketTx(func(tx *nutsdb.Tx) error {
return tx.RPush(listener.JobId, listener.messageBucket(), messageBytes...)
})
if err != nil {
slog.Error(
"error insert bucket",
Expand Down

0 comments on commit 5233a80

Please sign in to comment.