Skip to content

Commit

Permalink
feat: add metric publish message to the job
Browse files Browse the repository at this point in the history
  • Loading branch information
yudhasubki committed Dec 14, 2023
1 parent 93b7314 commit 4a2ad4e
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 4 deletions.
9 changes: 8 additions & 1 deletion blockqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (

"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/prometheus/client_golang/prometheus"
"github.com/yudhasubki/blockqueue/pkg/cas"
"github.com/yudhasubki/blockqueue/pkg/core"
bqio "github.com/yudhasubki/blockqueue/pkg/io"
"github.com/yudhasubki/blockqueue/pkg/metric"
"github.com/yudhasubki/eventpool"
)

Expand All @@ -28,6 +30,10 @@ type BlockQueue[V chan bqio.ResponseMessages] struct {
pool *eventpool.Eventpool
}

func init() {
prometheus.Register(metric.MessagePublished)
}

func New[V chan bqio.ResponseMessages](db *db, bucket *kv) *BlockQueue[V] {
blockqueue := &BlockQueue[V]{
db: db,
Expand Down Expand Up @@ -207,8 +213,9 @@ func (q *BlockQueue[V]) storeJob(name string, message io.Reader) error {
return err
}

return q.db.tx(context.Background(), func(ctx context.Context, tx *sqlx.Tx) error {
go metric.MessagePublished.Inc()

return q.db.tx(context.Background(), func(ctx context.Context, tx *sqlx.Tx) error {
return q.db.createMessages(ctx, request)
})
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/blockqueue/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/go-chi/chi/v5"
"github.com/lesismal/nbio/nbhttp"
"github.com/prometheus/client_golang/prometheus/promhttp"
blockqueue "github.com/yudhasubki/blockqueue"
"github.com/yudhasubki/blockqueue/pkg/etcd"
"github.com/yudhasubki/blockqueue/pkg/sqlite"
Expand Down Expand Up @@ -67,6 +68,7 @@ func (h *Http) Run(ctx context.Context, args []string) error {
Stream: stream,
Db: db,
}).Router())
mux.Mount("/prometheus/metrics", promhttp.Handler())

engine := nbhttp.NewEngine(nbhttp.Config{
Network: "tcp",
Expand Down
3 changes: 0 additions & 3 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,16 @@ import (

"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/yudhasubki/blockqueue/pkg/cas"
"github.com/yudhasubki/blockqueue/pkg/core"
"github.com/yudhasubki/blockqueue/pkg/sqlite"
)

type db struct {
mtx *cas.SpinLock
*sqlite.SQLite
}

func NewDb(sqlite *sqlite.SQLite) *db {
return &db{
mtx: cas.New(),
SQLite: sqlite,
}
}
Expand Down
9 changes: 9 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,25 @@ require (
require (
github.com/antlabs/stl v0.0.1 // indirect
github.com/antlabs/timer v0.0.11 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bwmarrin/snowflake v0.3.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gofrs/flock v0.8.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/lesismal/llib v1.1.12 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.17.0 // indirect
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/tidwall/btree v1.6.0 // indirect
github.com/xujiajun/mmap-go v1.0.1 // indirect
github.com/xujiajun/utils v0.0.0-20220904132955-5f7c5b914235 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/sys v0.15.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
)
25 changes: 25 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ github.com/antlabs/stl v0.0.1 h1:TRD3csCrjREeLhLoQ/supaoCvFhNLBTNIwuRGrDIs6Q=
github.com/antlabs/stl v0.0.1/go.mod h1:wvVwP1loadLG3cRjxUxK8RL4Co5xujGaZlhbztmUEqQ=
github.com/antlabs/timer v0.0.11 h1:z75oGFLeTqJHMOcWzUPBKsBbQAz4Ske3AfqJ7bsdcwU=
github.com/antlabs/timer v0.0.11/go.mod h1:JNV8J3yGvMKhCavGXgj9HXrVZkfdQyKCcqXBT8RdyuU=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0=
github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -15,6 +19,11 @@ github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfC
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g=
Expand All @@ -28,6 +37,8 @@ github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-sqlite3 v1.14.18 h1:JL0eqdCOq6DJVNPSvArO/bIV9/P7fbGrV00LZHc+5aI=
github.com/mattn/go-sqlite3 v1.14.18/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/nutsdb/nutsdb v1.0.2-0.20231210073356-fec3b20ddfa0 h1:7JLN1uW/tG5WMKqx965lJoicIagxCWca10uQLPOfMrk=
github.com/nutsdb/nutsdb v1.0.2-0.20231210073356-fec3b20ddfa0/go.mod h1:jIbbpBXajzTMZ0o33Yn5zoYIo3v0Dz4WstkVce+sYuQ=
github.com/nutsdb/nutsdb v1.0.2-0.20231211145816-3468fe949461 h1:wb5O82PuENmF1H0+Mv3IUYY66YS5z0cPgCSX/PNbSqE=
Expand All @@ -36,6 +47,14 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q=
github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY=
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 h1:v7DLqVdK4VrYkVD5diGdl4sxJurKJEMnODWRJlxV9oM=
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU=
github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY=
github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI=
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
Expand All @@ -62,6 +81,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand All @@ -86,6 +106,11 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXadIrXTM=
Expand Down
14 changes: 14 additions & 0 deletions pkg/metric/metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package metric

import (
"fmt"

"github.com/prometheus/client_golang/prometheus"
)

var (
MessagePublished = prometheus.NewCounter(prometheus.CounterOpts{
Name: fmt.Sprintf("message_store_to_the_topic_watcher"),
Help: "The total number succesfully published to the topic",
})
)

0 comments on commit 4a2ad4e

Please sign in to comment.