Skip to content

Commit

Permalink
Merge pull request #33 from gocardless/lawrence-support-sync-replication
Browse files Browse the repository at this point in the history
stolon-proxy --synchronous-replica
  • Loading branch information
lawrencejones authored Feb 10, 2020
2 parents b266e6d + 3d5ac9f commit 2acac17
Showing 1 changed file with 39 additions and 22 deletions.
61 changes: 39 additions & 22 deletions cmd/proxy/cmd/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ var CmdProxy = &cobra.Command{
type config struct {
cmd.CommonConfig

listenAddress string
port string
stopListening bool
debug bool
listenAddress string
port string
stopListening bool
synchronousReplica bool
debug bool

keepAliveIdle int
keepAliveCount int
Expand All @@ -66,6 +67,7 @@ func init() {
CmdProxy.PersistentFlags().StringVar(&cfg.listenAddress, "listen-address", "127.0.0.1", "proxy listening address")
CmdProxy.PersistentFlags().StringVar(&cfg.port, "port", "5432", "proxy listening port")
CmdProxy.PersistentFlags().BoolVar(&cfg.stopListening, "stop-listening", true, "stop listening on store error")
CmdProxy.PersistentFlags().BoolVar(&cfg.synchronousReplica, "synchronous-replica", false, "proxy to synchronous replica only, not master")
CmdProxy.PersistentFlags().BoolVar(&cfg.debug, "debug", false, "enable debug logging")
CmdProxy.PersistentFlags().IntVar(&cfg.keepAliveIdle, "tcp-keepalive-idle", 0, "set tcp keepalive idle (seconds)")
CmdProxy.PersistentFlags().IntVar(&cfg.keepAliveCount, "tcp-keepalive-count", 0, "set tcp keepalive probe count number")
Expand All @@ -79,7 +81,8 @@ type ClusterChecker struct {
listenAddress string
port string

stopListening bool
stopListening bool
synchronousReplica bool

listener *net.TCPListener
pp *pollon.Proxy
Expand All @@ -96,12 +99,13 @@ func NewClusterChecker(uid string, cfg config) (*ClusterChecker, error) {
}

return &ClusterChecker{
uid: uid,
listenAddress: cfg.listenAddress,
port: cfg.port,
stopListening: cfg.stopListening,
e: e,
endPollonProxyCh: make(chan error),
uid: uid,
listenAddress: cfg.listenAddress,
port: cfg.port,
stopListening: cfg.stopListening,
synchronousReplica: cfg.synchronousReplica,
e: e,
endPollonProxyCh: make(chan error),
}, nil
}

Expand Down Expand Up @@ -190,7 +194,7 @@ func (c *ClusterChecker) Check() error {

log.Debugf("cd dump: %s", spew.Sdump(cd))
if cd == nil {
log.Infow("no clusterdata available, closing connections to master")
log.Infow("no clusterdata available, closing connections to target")
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
return nil
}
Expand All @@ -205,7 +209,7 @@ func (c *ClusterChecker) Check() error {

proxy := cd.Proxy
if proxy == nil {
log.Infow("no proxy object available, closing connections to master")
log.Infow("no proxy object available, closing connections to target")
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
// ignore errors on setting proxy info
if err = c.SetProxyInfo(c.e, cluster.NoGeneration, 2*cluster.DefaultProxyTimeoutInterval); err != nil {
Expand All @@ -214,9 +218,22 @@ func (c *ClusterChecker) Check() error {
return nil
}

db, ok := cd.DBs[proxy.Spec.MasterDBUID]
var db *cluster.DB
var ok bool

// Unless we've specified we want to proxy to a sync replica
if c.synchronousReplica {
if len(db.Status.SynchronousStandbys) > 0 {
db, ok = cd.DBs[db.Status.SynchronousStandbys[0]]
} else {
log.Infow("no synchronous standbys available on master")
}
} else {
db, ok = cd.DBs[proxy.Spec.MasterDBUID]
}

if !ok {
log.Infow("no db object available, closing connections to master", "db", proxy.Spec.MasterDBUID)
log.Infow("no db object available, closing connections")
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
// ignore errors on setting proxy info
if err = c.SetProxyInfo(c.e, proxy.Generation, 2*cluster.DefaultProxyTimeoutInterval); err != nil {
Expand All @@ -231,22 +248,22 @@ func (c *ClusterChecker) Check() error {
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
return nil
}
log.Infow("master address", "address", addr)
log.Infow("target address", "address", addr)
if err = c.SetProxyInfo(c.e, proxy.Generation, 2*cluster.DefaultProxyTimeoutInterval); err != nil {
// if we failed to update our proxy info when a master is defined we
// cannot ignore this error since the sentinel won't know that we exist
// and are sending connections to a master so, when electing a new
// master, it'll not wait for us to close connections to the old one.
// if we failed to update our proxy info when a target is defined we cannot
// ignore this error since the sentinel won't know that we exist and are
// sending connections to a potential master so, when electing a new master,
// it'll not wait for us to close connections to the old one.
return fmt.Errorf("failed to update proxyInfo: %v", err)
}

// start proxing only if we are inside enabledProxies, this ensures that the
// sentinel has read our proxyinfo and knows we are alive
if util.StringInSlice(proxy.Spec.EnabledProxies, c.uid) {
log.Infow("proxying to master address", "address", addr)
log.Infow("proxying to target address", "address", addr)
c.sendPollonConfData(pollon.ConfData{DestAddr: addr})
} else {
log.Infow("not proxying to master address since we aren't in the enabled proxies list", "address", addr)
log.Infow("not proxying since we aren't in the enabled proxies list", "address", addr)
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
}

Expand Down

0 comments on commit 2acac17

Please sign in to comment.