Skip to content

Commit

Permalink
feat: add postgresql compatible
Browse files Browse the repository at this point in the history
  • Loading branch information
yudhasubki committed Jul 15, 2024
1 parent aa5d4f9 commit fa2ed59
Show file tree
Hide file tree
Showing 14 changed files with 179 additions and 22 deletions.
17 changes: 17 additions & 0 deletions cmd/blockqueue/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
blockqueue "github.com/yudhasubki/blockqueue"
"github.com/yudhasubki/blockqueue/pkg/etcd"
"github.com/yudhasubki/blockqueue/pkg/postgre"
"github.com/yudhasubki/blockqueue/pkg/sqlite"
"github.com/yudhasubki/blockqueue/pkg/turso"
)
Expand Down Expand Up @@ -47,6 +48,22 @@ func (h *Http) Run(ctx context.Context, args []string) error {
return err
}
driver = turso
case "pgsql":
pg, err := postgre.New(postgre.Config{
Host: cfg.PgSQL.Host,
Username: cfg.PgSQL.Username,
Password: cfg.PgSQL.Password,
Name: cfg.PgSQL.Name,
Port: cfg.PgSQL.Port,
Timezone: cfg.PgSQL.Timezone,
MaxOpenConns: cfg.PgSQL.MaxOpenConns,
MaxIdleConns: cfg.PgSQL.MaxIdleConns,
})
if err != nil {
slog.Error("failed to open database", "error", err)
return err
}
driver = pg
case "sqlite", "":
sqlite, err := sqlite.New(cfg.SQLite.DatabaseName, sqlite.Config{
BusyTimeout: cfg.SQLite.BusyTimeout,
Expand Down
11 changes: 11 additions & 0 deletions cmd/blockqueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type Config struct {
Logging LoggingConfig `yaml:"logging"`
SQLite SQLiteConfig `yaml:"sqlite"`
Turso TursoConfig `yaml:"turso"`
PgSQL PostgreConfig `yaml:"pgsql"`
Job JobConfig `yaml:"job"`
Metric MetricConfig `yaml:"metric"`
}
Expand Down Expand Up @@ -152,6 +153,16 @@ type TursoConfig struct {
URL string `yaml:"url"`
}

type PostgreConfig struct {
Host string `yaml:"host"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Name string `yaml:"name"`
Port int `yaml:"port"`
Timezone string `yaml:"timezone"`
MaxOpenConns int `yaml:"max_open_conns"`
MaxIdleConns int `yaml:"max_idle_conns"`
}
type EtcdConfig struct {
Path string `yaml:"path"`
Sync bool `yaml:"sync"`
Expand Down
25 changes: 23 additions & 2 deletions cmd/blockqueue/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path/filepath"

blockqueue "github.com/yudhasubki/blockqueue"
"github.com/yudhasubki/blockqueue/pkg/postgre"
"github.com/yudhasubki/blockqueue/pkg/sqlite"
"github.com/yudhasubki/blockqueue/pkg/turso"
)
Expand All @@ -36,14 +37,34 @@ func (m *Migrate) Run(ctx context.Context, args []string) error {
return err
}

var driver blockqueue.Driver
var (
driver blockqueue.Driver
migrationPath = "migration/sqlite"
)
switch cfg.Http.Driver {
case "turso":
turso, err := turso.New(cfg.Turso.URL)
if err != nil {
return err
}
driver = turso
case "pgsql":
pg, err := postgre.New(postgre.Config{
Host: cfg.PgSQL.Host,
Username: cfg.PgSQL.Username,
Password: cfg.PgSQL.Password,
Name: cfg.PgSQL.Name,
Port: cfg.PgSQL.Port,
Timezone: cfg.PgSQL.Timezone,
MaxOpenConns: cfg.PgSQL.MaxOpenConns,
MaxIdleConns: cfg.PgSQL.MaxIdleConns,
})
if err != nil {
slog.Error("failed to open database", "error", err)
return err
}
driver = pg
migrationPath = "migration/pgsql"
case "sqlite", "":
sqlite, err := sqlite.New(cfg.SQLite.DatabaseName, sqlite.Config{
BusyTimeout: cfg.SQLite.BusyTimeout,
Expand All @@ -56,7 +77,7 @@ func (m *Migrate) Run(ctx context.Context, args []string) error {
driver = sqlite
}

_ = filepath.Walk("migration/", func(path string, info os.FileInfo, err error) error {
_ = filepath.Walk(migrationPath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
Expand Down
11 changes: 10 additions & 1 deletion config.yaml.example
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
http:
port: 8080
shutdown: "30s"
driver: "sqlite"
driver: "sqlite|turso|pgsql"
logging:
level: "debug"
type: "json"
Expand All @@ -10,6 +10,15 @@ sqlite:
busy_timeout: 5000
turso:
url: "libsql://tursodatabase-username.turso.io"
pgsql:
host: "localhost"
username: "postgres"
password: ""
name: "blockqueue"
port: 5432
timezone: "UTC"
max_open_conns: 10
max_idle_conns: 10
etcd:
path: "etcdb"
sync: false
Expand Down
6 changes: 3 additions & 3 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (d *db) getSubscribers(ctx context.Context, filter core.FilterSubscriber) (
}

func (d *db) createTxTopic(ctx context.Context, tx *sqlx.Tx, topic core.Topic) error {
stmt, err := tx.PrepareNamedContext(ctx, "INSERT INTO topics (`id`, `name`) VALUES (:id, :name)")
stmt, err := tx.PrepareNamedContext(ctx, "INSERT INTO topics (id, name) VALUES (:id, :name)")
if err != nil {
return err
}
Expand Down Expand Up @@ -129,7 +129,7 @@ func (d *db) deleteTxSubscribers(ctx context.Context, tx *sqlx.Tx, topic core.Su
}

func (d *db) createTxSubscribers(ctx context.Context, tx *sqlx.Tx, subscribers core.Subscribers) error {
_, err := tx.NamedExecContext(ctx, "INSERT INTO topic_subscribers (`id`, `topic_id`, `name`, `option`) VALUES (:id, :topic_id, :name, :option)", subscribers)
_, err := tx.NamedExecContext(ctx, "INSERT INTO topic_subscribers (id, topic_id, name, option) VALUES (:id, :topic_id, :name, :option)", subscribers)
if err != nil {
return err
}
Expand All @@ -138,7 +138,7 @@ func (d *db) createTxSubscribers(ctx context.Context, tx *sqlx.Tx, subscribers c
}

func (d *db) createMessages(ctx context.Context, message core.Message) error {
stmt, err := d.Database.Conn().PrepareNamedContext(ctx, "INSERT INTO topic_messages (`id`, `topic_id`, `message`, `status`) VALUES (:id, :topic_id, :message, :status)")
stmt, err := d.Database.Conn().PrepareNamedContext(ctx, "INSERT INTO topic_messages (id, topic_id, message, status) VALUES (:id, :topic_id, :message, :status)")
if err != nil {
return err
}
Expand Down
12 changes: 7 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ require (
github.com/google/uuid v1.4.0
github.com/jmoiron/sqlx v1.3.5
github.com/lesismal/nbio v1.3.20
github.com/lib/pq v1.2.0
github.com/mattn/go-sqlite3 v1.14.18
github.com/nutsdb/nutsdb v1.0.3-0.20231216092931-133fd88373b5
github.com/prometheus/client_golang v1.17.0
github.com/stretchr/testify v1.8.4
github.com/tursodatabase/libsql-client-go v0.0.0-20240416075003-747366ff79c4
github.com/yudhasubki/eventpool v0.1.5
gopkg.in/guregu/null.v4 v4.0.0
gopkg.in/yaml.v3 v3.0.1
Expand All @@ -25,23 +29,21 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gofrs/flock v0.8.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lesismal/llib v1.1.12 // indirect
github.com/libsql/sqlite-antlr4-parser v0.0.0-20240327125255-dbf53b6cbf06 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.17.0 // indirect
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/tidwall/btree v1.6.0 // indirect
github.com/tursodatabase/libsql-client-go v0.0.0-20240416075003-747366ff79c4 // indirect
github.com/xujiajun/mmap-go v1.0.1 // indirect
github.com/xujiajun/utils v0.0.0-20220904132955-5f7c5b914235 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/sys v0.22.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
nhooyr.io/websocket v1.8.10 // indirect
)
25 changes: 15 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqy
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -26,10 +27,16 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g=
github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lesismal/llib v1.1.12 h1:KJFB8bL02V+QGIvILEw/w7s6bKj9Ps9Px97MZP2EOk0=
github.com/lesismal/llib v1.1.12/go.mod h1:70tFXXe7P1FZ02AU9l8LgSOK7d7sRrpnkUr3rd3gKSg=
github.com/lesismal/nbio v1.3.20 h1:btQdW4u8yAo2xg1PeU/gOWR0IPj2wUK+ZeVc5zHIEn4=
Expand All @@ -43,10 +50,6 @@ github.com/mattn/go-sqlite3 v1.14.18 h1:JL0eqdCOq6DJVNPSvArO/bIV9/P7fbGrV00LZHc+
github.com/mattn/go-sqlite3 v1.14.18/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/nutsdb/nutsdb v1.0.2-0.20231210073356-fec3b20ddfa0 h1:7JLN1uW/tG5WMKqx965lJoicIagxCWca10uQLPOfMrk=
github.com/nutsdb/nutsdb v1.0.2-0.20231210073356-fec3b20ddfa0/go.mod h1:jIbbpBXajzTMZ0o33Yn5zoYIo3v0Dz4WstkVce+sYuQ=
github.com/nutsdb/nutsdb v1.0.2-0.20231211145816-3468fe949461 h1:wb5O82PuENmF1H0+Mv3IUYY66YS5z0cPgCSX/PNbSqE=
github.com/nutsdb/nutsdb v1.0.2-0.20231211145816-3468fe949461/go.mod h1:jIbbpBXajzTMZ0o33Yn5zoYIo3v0Dz4WstkVce+sYuQ=
github.com/nutsdb/nutsdb v1.0.3-0.20231216092931-133fd88373b5 h1:eSMyRgjdRSGrnH0jzuRIdusDFGRRBXOOrUSIDqoAiBk=
github.com/nutsdb/nutsdb v1.0.3-0.20231216092931-133fd88373b5/go.mod h1:jIbbpBXajzTMZ0o33Yn5zoYIo3v0Dz4WstkVce+sYuQ=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand All @@ -61,10 +64,10 @@ github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdO
github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI=
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/tidwall/btree v1.6.0 h1:LDZfKfQIBHGHWSwckhXI0RPSXzlo+KYdjK7FWSqOzzg=
Expand All @@ -81,8 +84,9 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210513122933-cd7d49e622d5/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 h1:aAcj0Da7eBAtrTp03QXWvm88pSyOt+UgdZw2BFZ+lEw=
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8/go.mod h1:CQ1k9gNrJ50XIzaKCRR2hssIjF07kZFEiieALBM/ARQ=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
Expand All @@ -102,8 +106,8 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
Expand All @@ -121,8 +125,9 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXadIrXTM=
gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE=
gopkg.in/guregu/null.v4 v4.0.0 h1:1Wm3S1WEA2I26Kq+6vcW+w0gcDo44YKYD7YIEJNHDjg=
Expand Down
10 changes: 10 additions & 0 deletions migration/pgsql/1721051631_create_topic_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
CREATE TABLE IF NOT EXISTS topics (
id VARCHAR(36) PRIMARY KEY,
name VARCHAR(150) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
deleted_at TIMESTAMP DEFAULT NULL
);

CREATE UNIQUE INDEX idx_topic_name_unique ON topics(name) WHERE deleted_at IS NULL;
CREATE INDEX idx_topic_search_by_id ON topics (id);
CREATE INDEX idx_topic_search_by_name ON topics (name, deleted_at);
17 changes: 17 additions & 0 deletions migration/pgsql/1721051648_create_topic_subscribers_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@

CREATE TABLE IF NOT EXISTS topic_subscribers (
id VARCHAR(36) PRIMARY KEY,
topic_id VARCHAR(36) NOT NULL,
name VARCHAR(150) NOT NULL,
option JSON DEFAULT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
deleted_at TIMESTAMP DEFAULT NULL,
FOREIGN KEY (topic_id)
REFERENCES topics (id)
ON DELETE CASCADE
ON UPDATE NO ACTION
);

CREATE UNIQUE INDEX idx_subscriber_name_unique ON topic_subscribers(name, topic_id) WHERE deleted_at IS NULL;
CREATE INDEX idx_topic_subscribers_search_by_topic_id ON topic_subscribers (topic_id, deleted_at);
CREATE INDEX idx_topic_subscribers_search_by_name ON topic_subscribers (name, deleted_at);
13 changes: 13 additions & 0 deletions migration/pgsql/1721051691_create_topic_message_tables.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
CREATE TABLE IF NOT EXISTS topic_messages (
id VARCHAR(36) PRIMARY KEY,
topic_id VARCHAR(36) NOT NULL,
message TEXT NOT NULL,
status VARCHAR DEFAULT 'waiting',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (topic_id)
REFERENCES topics (id)
ON DELETE CASCADE
ON UPDATE NO ACTION
);

CREATE INDEX idx_topic_messages_search_by_topic_and_status ON topic_messages (topic_id, status);
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ CREATE TABLE IF NOT EXISTS topic_messages (
`id` VARCHAR(36) PRIMARY KEY,
`topic_id` VARCHAR(36) NOT NULL,
`message` TEXT NOT NULL,
`status` BOOLEAN DEFAULT 0,
`status` VARCHAR(15) DEFAULT 'waiting',
`created_at` DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (`topic_id`)
REFERENCES `topics` (`id`)
Expand Down
52 changes: 52 additions & 0 deletions pkg/postgre/postgre.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package postgre

import (
"fmt"
"log/slog"
"net/url"

"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
)

type Postgre struct {
Database *sqlx.DB
}

type Config struct {
Host string
Username string
Password string
Name string
Port int
Timezone string
MaxOpenConns int
MaxIdleConns int
}

func New(config Config) (*Postgre, error) {
db, err := sqlx.Connect("postgres", fmt.Sprintf(
"host=%s user=%s password=%s dbname=%s port=%d sslmode=disable TimeZone=%s",
config.Host,
config.Username,
config.Password,
config.Name,
config.Port,
url.QueryEscape(config.Timezone)))
if err != nil {
slog.Error("[pgx.NewConnPool] failed to connect to database", "error", err)
return &Postgre{}, nil
}

return &Postgre{
Database: db,
}, nil
}

func (pg *Postgre) Conn() *sqlx.DB {
return pg.Database
}

func (pg *Postgre) Close() error {
return pg.Database.Close()
}

0 comments on commit fa2ed59

Please sign in to comment.