Skip to content

Commit

Permalink
global db support (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
ice-cronus authored Jul 19, 2024
1 parent 47dd6f8 commit b0b5afa
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 139 deletions.
30 changes: 20 additions & 10 deletions connectors/storage/v2/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,33 @@ type (
)

// Private API.
var (
//nolint:gochecknoglobals // .
globalDB *DB
)

const (
globalDBYamlKey = "global"
)

type (
lb struct {
replicas []*pgxpool.Pool
currentIndex uint64
}
config struct {
WintrStorage struct {
Credentials struct {
User string `yaml:"user"`
Password string `yaml:"password"`
} `yaml:"credentials" mapstructure:"credentials"`
Timeout string `yaml:"timeout" mapstructure:"timeout"`
PrimaryURL string `yaml:"primaryURL" mapstructure:"primaryURL"` //nolint:tagliatelle // Nope.
ReplicaURLs []string `yaml:"replicaURLs" mapstructure:"replicaURLs"` //nolint:tagliatelle // Nope.
RunDDL bool `yaml:"runDDL" mapstructure:"runDDL"` //nolint:tagliatelle // Nope.
} `yaml:"wintr/connectors/storage/v2" mapstructure:"wintr/connectors/storage/v2"` //nolint:tagliatelle // Nope.
WintrStorage storageCfg `yaml:"wintr/connectors/storage/v2" mapstructure:"wintr/connectors/storage/v2"` //nolint:tagliatelle // Nope.
}
storageCfg struct {
Credentials struct {
User string `yaml:"user"`
Password string `yaml:"password"`
} `yaml:"credentials" mapstructure:"credentials"`
Timeout string `yaml:"timeout" mapstructure:"timeout"`
PrimaryURL string `yaml:"primaryURL" mapstructure:"primaryURL"` //nolint:tagliatelle // Nope.
ReplicaURLs []string `yaml:"replicaURLs" mapstructure:"replicaURLs"` //nolint:tagliatelle // Nope.
RunDDL bool `yaml:"runDDL" mapstructure:"runDDL"` //nolint:tagliatelle // Nope.
IgnoreGlobal bool `yaml:"ignoreGlobal" mapstructure:"ignoreGlobal"` //nolint:tagliatelle // Nope.
}
advisoryLockMutex struct {
conn *pgxpool.Conn
Expand Down
45 changes: 35 additions & 10 deletions connectors/storage/v2/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,56 @@ import (
"github.com/ice-blockchain/wintr/log"
)

//nolint:gochecknoinits // GlobalDB is single instance, we initialize it here.
func init() {
var cfg config
appcfg.MustLoadFromKey(globalDBYamlKey, &cfg)
if cfg.WintrStorage.PrimaryURL != "" || len(cfg.WintrStorage.ReplicaURLs) > 0 {
globalDB = mustConnectWithCfg(context.Background(), &cfg.WintrStorage, "")
}
}

func MustConnect(ctx context.Context, ddl, applicationYAMLKey string) *DB {
var cfg config
appcfg.MustLoadFromKey(applicationYAMLKey, &cfg)
if globalDB != nil && !cfg.WintrStorage.IgnoreGlobal {
if globalDB.master != nil {
mustRunDDL(ctx, globalDB.master, ddl)
}

return globalDB
}

return mustConnectWithCfg(ctx, &cfg.WintrStorage, ddl)
}

func mustConnectWithCfg(ctx context.Context, cfg *storageCfg, ddl string) *DB {
var replicas []*pgxpool.Pool
var master *pgxpool.Pool
if cfg.WintrStorage.PrimaryURL != "" {
master = mustConnectPool(ctx, cfg.WintrStorage.Timeout, cfg.WintrStorage.Credentials.User, cfg.WintrStorage.Credentials.Password, cfg.WintrStorage.PrimaryURL) //nolint:lll // .
if cfg.PrimaryURL != "" {
master = mustConnectPool(ctx, cfg.Timeout, cfg.Credentials.User, cfg.Credentials.Password, cfg.PrimaryURL)
}
for ix, url := range cfg.WintrStorage.ReplicaURLs {
for ix, url := range cfg.ReplicaURLs {
if ix == 0 {
replicas = make([]*pgxpool.Pool, len(cfg.WintrStorage.ReplicaURLs)) //nolint:makezero // Not needed, we know the size.
replicas = make([]*pgxpool.Pool, len(cfg.ReplicaURLs)) //nolint:makezero // Not needed, we know the size.
}
replicas[ix] = mustConnectPool(ctx, cfg.WintrStorage.Timeout, cfg.WintrStorage.Credentials.User, cfg.WintrStorage.Credentials.Password, url)
replicas[ix] = mustConnectPool(ctx, cfg.Timeout, cfg.Credentials.User, cfg.Credentials.Password, url)
}
if master != nil && ddl != "" && cfg.WintrStorage.RunDDL {
for _, statement := range strings.Split(ddl, "----") {
_, err := master.Exec(ctx, statement)
log.Panic(errors.Wrapf(err, "failed to run statement: %v", statement))
}
if master != nil && ddl != "" && cfg.RunDDL {
mustRunDDL(ctx, master, ddl)
}
db := &DB{master: master, lb: &lb{replicas: replicas}, acquiredLocks: make(map[int64]*pgxpool.Conn)}

return db
}

func mustRunDDL(ctx context.Context, master *pgxpool.Pool, ddl string) {
for _, statement := range strings.Split(ddl, "----") {
_, err := master.Exec(ctx, statement)
log.Panic(errors.Wrapf(err, "failed to run statement: %v", statement))
}
}

//nolint:mnd,gomnd // Configuration.
func mustConnectPool(ctx context.Context, timeout, user, pass, url string) (db *pgxpool.Pool) {
poolConfig, err := pgxpool.ParseConfig(url)
Expand Down
112 changes: 0 additions & 112 deletions connectors/storage/v3/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -1889,118 +1889,6 @@ func (l *lb) BitFieldRO(ctx context.Context, key string, values ...interface{})
return l.instance().BitFieldRO(ctx, key, values...)
}

func (l *lb) FTAggregate(ctx context.Context, index, query string) *redis.MapStringInterfaceCmd {
return l.instance().FTAggregate(ctx, index, query)
}

func (l *lb) FT_List(ctx context.Context) *redis.StringSliceCmd { //nolint:stylecheck,revive // Comes from interface.
return l.instance().FT_List(ctx)
}

func (l *lb) FTAggregateWithArgs(ctx context.Context, index, query string, options *redis.FTAggregateOptions) *redis.AggregateCmd {
return l.instance().FTAggregateWithArgs(ctx, index, query, options)
}

func (l *lb) FTAliasAdd(ctx context.Context, index, alias string) *redis.StatusCmd {
return l.instance().FTAliasAdd(ctx, index, alias)
}

func (l *lb) FTAliasDel(ctx context.Context, alias string) *redis.StatusCmd {
return l.instance().FTAliasDel(ctx, alias)
}

func (l *lb) FTAliasUpdate(ctx context.Context, index, alias string) *redis.StatusCmd {
return l.instance().FTAliasUpdate(ctx, index, alias)
}

func (l *lb) FTAlter(ctx context.Context, index string, skipInitalScan bool, definition []any) *redis.StatusCmd {
return l.instance().FTAlter(ctx, index, skipInitalScan, definition)
}

func (l *lb) FTConfigGet(ctx context.Context, option string) *redis.MapMapStringInterfaceCmd {
return l.instance().FTConfigGet(ctx, option)
}

func (l *lb) FTConfigSet(ctx context.Context, option string, value any) *redis.StatusCmd {
return l.instance().FTConfigSet(ctx, option, value)
}

func (l *lb) FTCreate(ctx context.Context, index string, options *redis.FTCreateOptions, schema ...*redis.FieldSchema) *redis.StatusCmd {
return l.instance().FTCreate(ctx, index, options, schema...)
}

func (l *lb) FTCursorDel(ctx context.Context, index string, cursorID int) *redis.StatusCmd {
return l.instance().FTCursorDel(ctx, index, cursorID)
}

func (l *lb) FTCursorRead(ctx context.Context, index string, cursorID, count int) *redis.MapStringInterfaceCmd {
return l.instance().FTCursorRead(ctx, index, cursorID, count)
}

func (l *lb) FTDictAdd(ctx context.Context, dict string, term ...any) *redis.IntCmd {
return l.instance().FTDictAdd(ctx, dict, term...)
}

func (l *lb) FTDictDel(ctx context.Context, dict string, term ...any) *redis.IntCmd {
return l.instance().FTDictDel(ctx, dict, term...)
}

func (l *lb) FTDictDump(ctx context.Context, dict string) *redis.StringSliceCmd {
return l.instance().FTDictDump(ctx, dict)
}

func (l *lb) FTDropIndex(ctx context.Context, index string) *redis.StatusCmd {
return l.instance().FTDropIndex(ctx, index)
}

func (l *lb) FTDropIndexWithArgs(ctx context.Context, index string, options *redis.FTDropIndexOptions) *redis.StatusCmd {
return l.instance().FTDropIndexWithArgs(ctx, index, options)
}

func (l *lb) FTExplain(ctx context.Context, index, query string) *redis.StringCmd {
return l.instance().FTExplain(ctx, index, query)
}

func (l *lb) FTExplainWithArgs(ctx context.Context, index, query string, options *redis.FTExplainOptions) *redis.StringCmd {
return l.instance().FTExplainWithArgs(ctx, index, query, options)
}

func (l *lb) FTInfo(ctx context.Context, index string) *redis.FTInfoCmd {
return l.instance().FTInfo(ctx, index)
}

func (l *lb) FTSpellCheck(ctx context.Context, index, query string) *redis.FTSpellCheckCmd {
return l.instance().FTSpellCheck(ctx, index, query)
}

func (l *lb) FTSpellCheckWithArgs(ctx context.Context, index, query string, options *redis.FTSpellCheckOptions) *redis.FTSpellCheckCmd {
return l.instance().FTSpellCheckWithArgs(ctx, index, query, options)
}

func (l *lb) FTSearch(ctx context.Context, index, query string) *redis.FTSearchCmd {
return l.instance().FTSearch(ctx, index, query)
}

func (l *lb) FTSearchWithArgs(ctx context.Context, index, query string, options *redis.FTSearchOptions) *redis.FTSearchCmd {
return l.instance().FTSearchWithArgs(ctx, index, query, options)
}

func (l *lb) FTSynDump(ctx context.Context, index string) *redis.FTSynDumpCmd {
return l.instance().FTSynDump(ctx, index)
}

func (l *lb) FTSynUpdate(ctx context.Context, index string, synGroupID any, terms []any) *redis.StatusCmd {
return l.instance().FTSynUpdate(ctx, index, synGroupID, terms)
}

func (l *lb) FTSynUpdateWithArgs(ctx context.Context, index string, synGroupID any, options *redis.FTSynUpdateOptions, terms []any) *redis.StatusCmd {
return l.instance().FTSynUpdateWithArgs(ctx, index, synGroupID, options, terms)
}

func (l *lb) FTTagVals(ctx context.Context, index, field string) *redis.StringSliceCmd {
return l.instance().FTTagVals(ctx, index, field)
}

func (l *lb) HExpire(ctx context.Context, key string, expiration stdlibtime.Duration, fields ...string) *redis.IntSliceCmd {
return l.instance().HExpire(ctx, key, expiration, fields...)
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/jackc/pgx/v5 v5.6.0
github.com/joho/godotenv v1.5.1
github.com/pkg/errors v0.9.1
github.com/redis/go-redis/v9 v9.5.4
github.com/redis/go-redis/v9 v9.6.0
github.com/rs/zerolog v1.33.0
github.com/sendgrid/rest v2.6.9+incompatible
github.com/sendgrid/sendgrid-go v3.14.0+incompatible
Expand Down Expand Up @@ -112,7 +112,7 @@ require (
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/sys/mount v0.3.3 // indirect
github.com/moby/sys/mount v0.3.4 // indirect
github.com/moby/sys/mountinfo v0.7.2 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand Down
9 changes: 4 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,8 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/moby/sys/mount v0.3.3 h1:fX1SVkXFJ47XWDoeFW4Sq7PdQJnV2QIDZAqjNqgEjUs=
github.com/moby/sys/mount v0.3.3/go.mod h1:PBaEorSNTLG5t/+4EgukEQVlAvVEc6ZjTySwKdqp5K0=
github.com/moby/sys/mountinfo v0.6.2/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI=
github.com/moby/sys/mount v0.3.4 h1:yn5jq4STPztkkzSKpZkLcmjue+bZJ0u2AuQY1iNI1Ww=
github.com/moby/sys/mount v0.3.4/go.mod h1:KcQJMbQdJHPlq5lcYT+/CjatWM4PuxKe+XLSVS4J6Os=
github.com/moby/sys/mountinfo v0.7.2 h1:1shs6aH5s4o5H2zQLn796ADW1wMrIwHsyJ2v9KouLrg=
github.com/moby/sys/mountinfo v0.7.2/go.mod h1:1YOa8w8Ih7uW0wALDUgT1dTTSBrZ+HiBLGws92L2RU4=
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
Expand Down Expand Up @@ -302,8 +301,8 @@ github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo=
github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A=
github.com/quic-go/quic-go v0.45.1 h1:tPfeYCk+uZHjmDRwHHQmvHRYL2t44ROTujLeFVBmjCA=
github.com/quic-go/quic-go v0.45.1/go.mod h1:1dLehS7TIR64+vxGR70GDcatWTOtMX2PUtnKsjbTurI=
github.com/redis/go-redis/v9 v9.5.4 h1:vOFYDKKVgrI5u++QvnMT7DksSMYg7Aw/Np4vLJLKLwY=
github.com/redis/go-redis/v9 v9.5.4/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/redis/go-redis/v9 v9.6.0 h1:NLck+Rab3AOTHw21CGRpvQpgTrAU4sgdCswqGtlhGRA=
github.com/redis/go-redis/v9 v9.6.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/refraction-networking/utls v1.6.7 h1:zVJ7sP1dJx/WtVuITug3qYUq034cDq9B2MR1K67ULZM=
github.com/refraction-networking/utls v1.6.7/go.mod h1:BC3O4vQzye5hqpmDTWUqi4P5DDhzJfkV1tdqtawQIH0=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
Expand Down

0 comments on commit b0b5afa

Please sign in to comment.