diff --git a/blockqueue.go b/blockqueue.go index ddb9f82..0cf8362 100644 --- a/blockqueue.go +++ b/blockqueue.go @@ -12,8 +12,10 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/yudhasubki/blockqueue/pkg/cas" "github.com/yudhasubki/blockqueue/pkg/core" + "github.com/yudhasubki/blockqueue/pkg/etcd" bqio "github.com/yudhasubki/blockqueue/pkg/io" "github.com/yudhasubki/blockqueue/pkg/metric" + "github.com/yudhasubki/blockqueue/pkg/sqlite" "github.com/yudhasubki/eventpool" ) @@ -34,12 +36,12 @@ func init() { prometheus.Register(metric.MessagePublished) } -func New[V chan bqio.ResponseMessages](db *db, bucket *kv) *BlockQueue[V] { +func New[V chan bqio.ResponseMessages](db *sqlite.SQLite, kv *etcd.Etcd) *BlockQueue[V] { blockqueue := &BlockQueue[V]{ - db: db, + db: newDb(db), mtx: cas.New(), jobs: make(map[string]*Job[V]), - kv: bucket, + kv: newKv(kv), } pool := eventpool.New() pool.Submit(eventpool.EventpoolListener{ @@ -223,3 +225,7 @@ func (q *BlockQueue[V]) storeJob(name string, message io.Reader) error { func (q *BlockQueue[V]) Close() { q.pool.Close() } + +func (q *BlockQueue[V]) getTopics(ctx context.Context, filter core.FilterTopic) (core.Topics, error) { + return q.db.getTopics(ctx, filter) +} diff --git a/blockqueue_test.go b/blockqueue_test.go index dfa0984..6f25191 100644 --- a/blockqueue_test.go +++ b/blockqueue_test.go @@ -35,15 +35,13 @@ func runBlockQueueTest(t *testing.T, test func(bq *BlockQueue[chan bqio.Response require.NoError(t, err) - db := NewDb(sqlite) - runMigrate(t, db.SQLite) + runMigrate(t, sqlite) bucket, err := etcd.New(persistenceBucketPath) defer os.RemoveAll(persistenceBucketPath) require.NoError(t, err) - kv := NewKV(bucket) - bq := New(db, kv) + bq := New(sqlite, bucket) test(bq) diff --git a/cmd/blockqueue/http.go b/cmd/blockqueue/http.go index 4bd6c1f..43636c4 100644 --- a/cmd/blockqueue/http.go +++ b/cmd/blockqueue/http.go @@ -45,7 +45,6 @@ func (h *Http) Run(ctx context.Context, args []string) error { slog.Error("failed to open database", "error", err) return err } - db := blockqueue.NewDb(sqlite) etcd, err := etcd.New( cfg.Etcd.Path, @@ -58,7 +57,7 @@ func (h *Http) Run(ctx context.Context, args []string) error { ctx, cancel := context.WithCancel(ctx) - stream := blockqueue.New(db, blockqueue.NewKV(etcd)) + stream := blockqueue.New(sqlite, etcd) err = stream.Run(ctx) if err != nil { @@ -69,9 +68,11 @@ func (h *Http) Run(ctx context.Context, args []string) error { mux := chi.NewRouter() mux.Mount("/", (&blockqueue.Http{ Stream: stream, - Db: db, }).Router()) - mux.Mount("/prometheus/metrics", promhttp.Handler()) + + if cfg.Metric.Enable { + mux.Mount("/prometheus/metrics", promhttp.Handler()) + } engine := nbhttp.NewEngine(nbhttp.Config{ Network: "tcp", diff --git a/cmd/blockqueue/main.go b/cmd/blockqueue/main.go index 01f12b3..59c2a4e 100644 --- a/cmd/blockqueue/main.go +++ b/cmd/blockqueue/main.go @@ -72,6 +72,7 @@ type Config struct { Logging LoggingConfig `yaml:"logging"` SQLite SQLiteConfig `yaml:"sqlite"` Job JobConfig `yaml:"job"` + Metric MetricConfig `yaml:"metric"` } func ReadConfigFile(filename string) (_ Config, err error) { @@ -149,3 +150,7 @@ type EtcdConfig struct { type JobConfig struct { Interval time.Duration `yaml:"interval"` } + +type MetricConfig struct { + Enable bool `yaml:"enable"` +} diff --git a/config.yaml.example b/config.yaml.example index ae1134b..a3d35e3 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -9,4 +9,6 @@ sqlite: busy_timeout: 5000 etcd: path: "etcdb" - sync: false \ No newline at end of file + sync: false +metric: + enable: false \ No newline at end of file diff --git a/db.go b/db.go index 0433110..9ed163b 100644 --- a/db.go +++ b/db.go @@ -13,7 +13,7 @@ type db struct { *sqlite.SQLite } -func NewDb(sqlite *sqlite.SQLite) *db { +func newDb(sqlite *sqlite.SQLite) *db { return &db{ SQLite: sqlite, } diff --git a/http.go b/http.go index 1da1f5d..b265b58 100644 --- a/http.go +++ b/http.go @@ -15,7 +15,6 @@ import ( type Http struct { Stream *BlockQueue[chan io.ResponseMessages] - Db *db } type ctxKeyTopicName string @@ -59,7 +58,7 @@ func (h *Http) createTopic(w http.ResponseWriter, r *http.Request) { return } - topics, err := h.Db.getTopics(r.Context(), core.FilterTopic{ + topics, err := h.Stream.getTopics(r.Context(), core.FilterTopic{ Name: []string{request.Name}, }) if err != nil { @@ -273,7 +272,7 @@ func (h *Http) topicExist(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { topicName := chi.URLParam(r, "topicName") - topics, err := h.Db.getTopics(r.Context(), core.FilterTopic{ + topics, err := h.Stream.getTopics(r.Context(), core.FilterTopic{ Name: []string{topicName}, }) if err != nil { diff --git a/kv.go b/kv.go index cac316c..66dbb98 100644 --- a/kv.go +++ b/kv.go @@ -12,7 +12,7 @@ type kv struct { db *etcd.Etcd } -func NewKV(etcd *etcd.Etcd) *kv { +func newKv(etcd *etcd.Etcd) *kv { return &kv{ db: etcd, }