Skip to content

Commit

Permalink
refactor: change internal api to be unimported and add enable for met…
Browse files Browse the repository at this point in the history
…rics
  • Loading branch information
yudhasubki committed Dec 29, 2023
1 parent b5c3611 commit e439ab5
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 17 deletions.
12 changes: 9 additions & 3 deletions blockqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/yudhasubki/blockqueue/pkg/cas"
"github.com/yudhasubki/blockqueue/pkg/core"
"github.com/yudhasubki/blockqueue/pkg/etcd"
bqio "github.com/yudhasubki/blockqueue/pkg/io"
"github.com/yudhasubki/blockqueue/pkg/metric"
"github.com/yudhasubki/blockqueue/pkg/sqlite"
"github.com/yudhasubki/eventpool"
)

Expand All @@ -34,12 +36,12 @@ func init() {
prometheus.Register(metric.MessagePublished)
}

func New[V chan bqio.ResponseMessages](db *db, bucket *kv) *BlockQueue[V] {
func New[V chan bqio.ResponseMessages](db *sqlite.SQLite, kv *etcd.Etcd) *BlockQueue[V] {
blockqueue := &BlockQueue[V]{
db: db,
db: newDb(db),
mtx: cas.New(),
jobs: make(map[string]*Job[V]),
kv: bucket,
kv: newKv(kv),
}
pool := eventpool.New()
pool.Submit(eventpool.EventpoolListener{
Expand Down Expand Up @@ -223,3 +225,7 @@ func (q *BlockQueue[V]) storeJob(name string, message io.Reader) error {
func (q *BlockQueue[V]) Close() {
q.pool.Close()
}

func (q *BlockQueue[V]) getTopics(ctx context.Context, filter core.FilterTopic) (core.Topics, error) {
return q.db.getTopics(ctx, filter)
}
6 changes: 2 additions & 4 deletions blockqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,13 @@ func runBlockQueueTest(t *testing.T, test func(bq *BlockQueue[chan bqio.Response

require.NoError(t, err)

db := NewDb(sqlite)
runMigrate(t, db.SQLite)
runMigrate(t, sqlite)

bucket, err := etcd.New(persistenceBucketPath)
defer os.RemoveAll(persistenceBucketPath)
require.NoError(t, err)

kv := NewKV(bucket)
bq := New(db, kv)
bq := New(sqlite, bucket)

test(bq)

Expand Down
9 changes: 5 additions & 4 deletions cmd/blockqueue/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func (h *Http) Run(ctx context.Context, args []string) error {
slog.Error("failed to open database", "error", err)
return err
}
db := blockqueue.NewDb(sqlite)

etcd, err := etcd.New(
cfg.Etcd.Path,
Expand All @@ -58,7 +57,7 @@ func (h *Http) Run(ctx context.Context, args []string) error {

ctx, cancel := context.WithCancel(ctx)

stream := blockqueue.New(db, blockqueue.NewKV(etcd))
stream := blockqueue.New(sqlite, etcd)

err = stream.Run(ctx)
if err != nil {
Expand All @@ -69,9 +68,11 @@ func (h *Http) Run(ctx context.Context, args []string) error {
mux := chi.NewRouter()
mux.Mount("/", (&blockqueue.Http{
Stream: stream,
Db: db,
}).Router())
mux.Mount("/prometheus/metrics", promhttp.Handler())

if cfg.Metric.Enable {
mux.Mount("/prometheus/metrics", promhttp.Handler())
}

engine := nbhttp.NewEngine(nbhttp.Config{
Network: "tcp",
Expand Down
5 changes: 5 additions & 0 deletions cmd/blockqueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type Config struct {
Logging LoggingConfig `yaml:"logging"`
SQLite SQLiteConfig `yaml:"sqlite"`
Job JobConfig `yaml:"job"`
Metric MetricConfig `yaml:"metric"`
}

func ReadConfigFile(filename string) (_ Config, err error) {
Expand Down Expand Up @@ -149,3 +150,7 @@ type EtcdConfig struct {
type JobConfig struct {
Interval time.Duration `yaml:"interval"`
}

type MetricConfig struct {
Enable bool `yaml:"enable"`
}
4 changes: 3 additions & 1 deletion config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ sqlite:
busy_timeout: 5000
etcd:
path: "etcdb"
sync: false
sync: false
metric:
enable: false
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type db struct {
*sqlite.SQLite
}

func NewDb(sqlite *sqlite.SQLite) *db {
func newDb(sqlite *sqlite.SQLite) *db {
return &db{
SQLite: sqlite,
}
Expand Down
5 changes: 2 additions & 3 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

type Http struct {
Stream *BlockQueue[chan io.ResponseMessages]
Db *db
}

type ctxKeyTopicName string
Expand Down Expand Up @@ -59,7 +58,7 @@ func (h *Http) createTopic(w http.ResponseWriter, r *http.Request) {
return
}

topics, err := h.Db.getTopics(r.Context(), core.FilterTopic{
topics, err := h.Stream.getTopics(r.Context(), core.FilterTopic{
Name: []string{request.Name},
})
if err != nil {
Expand Down Expand Up @@ -273,7 +272,7 @@ func (h *Http) topicExist(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
topicName := chi.URLParam(r, "topicName")

topics, err := h.Db.getTopics(r.Context(), core.FilterTopic{
topics, err := h.Stream.getTopics(r.Context(), core.FilterTopic{
Name: []string{topicName},
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type kv struct {
db *etcd.Etcd
}

func NewKV(etcd *etcd.Etcd) *kv {
func newKv(etcd *etcd.Etcd) *kv {
return &kv{
db: etcd,
}
Expand Down

0 comments on commit e439ab5

Please sign in to comment.