Skip to content

Commit

Permalink
server up when is connected but has replication errors
Browse files Browse the repository at this point in the history
  • Loading branch information
vitalbh authored and Victor committed Jul 22, 2019
1 parent 005554f commit 7990328
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 90 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
language: go
go:
- 1.11
- 1.12
go_import_path: github.com/StudioSol/balancer
install:
- make upgrade
Expand Down
8 changes: 7 additions & 1 deletion balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ type bySecondsBehindMaster Servers
func (a bySecondsBehindMaster) Len() int { return len(a) }
func (a bySecondsBehindMaster) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a bySecondsBehindMaster) Less(i, j int) bool {
if a[i].health.secondsBehindMaster == nil && a[j].health.secondsBehindMaster == nil {
return false
}
if a[i].health.secondsBehindMaster == nil && a[j].health.secondsBehindMaster != nil {
return false
}
if a[i].health.secondsBehindMaster != nil && a[j].health.secondsBehindMaster == nil {
return true
}

return *a[i].health.secondsBehindMaster < *a[j].health.secondsBehindMaster
}

Expand All @@ -27,7 +31,6 @@ type byConnections Servers
func (a byConnections) Len() int { return len(a) }
func (a byConnections) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byConnections) Less(i, j int) bool {

if a[i].health.runningConnections == nil && a[j].health.runningConnections != nil {
return false
}
Expand All @@ -37,6 +40,9 @@ func (a byConnections) Less(i, j int) bool {

if a[i].health.runningConnections == a[j].health.runningConnections {

if a[i].health.openConnections == nil && a[j].health.openConnections == nil {
return false
}
if a[i].health.openConnections == nil && a[j].health.openConnections != nil {
return false
}
Expand Down
100 changes: 61 additions & 39 deletions balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (
)

var (
ServerDownDueToMySQLConnection, ServerDownDueToMySQLSlaveStatus, ServerDownDueToMySQLThreadStatus *Server
ServerDownDueToMySQLConnection *Server
ServerUPWithMySQLSlaveStatusError, ServerUPWithMySQLThreadStatusError *Server
ServerUP, ServerUPWithDelay, ServerUPWithHighThreadConnections, ServerUPWithDelayAndHighThreadConnections *Server
ServerUPWithHighRunningConnections *Server
)
Expand All @@ -28,51 +29,51 @@ func init() {
errors.New("__MYSQL_CONNECTION_ERROR__"), intNilHelper, intNilHelper, intNilHelper,
)

ServerDownDueToMySQLSlaveStatus = &Server{
name: "ServerDownDueToMySQLSlaveStatus",
ServerUPWithMySQLSlaveStatusError = &Server{
name: "ServerUPWithMySQLSlaveStatusError",
health: &ServerHealth{},
}
ServerDownDueToMySQLSlaveStatus.health.setDown(
ServerUPWithMySQLSlaveStatusError.health.setUP(
errors.New("__MYSQL_SLAVE_STATUS_ERROR__"), intNilHelper, intNilHelper, intNilHelper,
)

ServerDownDueToMySQLThreadStatus = &Server{
name: "ServerDownDueToMySQLThreadStatus",
ServerUPWithMySQLThreadStatusError = &Server{
name: "ServerUPWithMySQLThreadStatusError",
health: &ServerHealth{},
}
ServerDownDueToMySQLThreadStatus.health.setDown(
errors.New("__MYSQL_THREADS_STATUS_ERROR__"), &zeroHelper, intNilHelper, intNilHelper,
ServerUPWithMySQLThreadStatusError.health.setUP(
errors.New("__MYSQL_THREADS_STATUS_ERROR__"), intNilHelper, intNilHelper, intNilHelper,
)

ServerUP = &Server{
name: "ServerUP",
health: &ServerHealth{},
}
ServerUP.health.setUP(&zeroHelper, &oneHelper, &oneHelper)
ServerUP.health.setUP(nil, &zeroHelper, &oneHelper, &oneHelper)

ServerUPWithDelay = &Server{
name: "ServerUPWithDelay",
health: &ServerHealth{},
}
ServerUPWithDelay.health.setUP(&thousandHelper, &oneHelper, &oneHelper)
ServerUPWithDelay.health.setUP(nil, &thousandHelper, &oneHelper, &oneHelper)

ServerUPWithHighThreadConnections = &Server{
name: "ServerUPWithHighThreadConnections",
health: &ServerHealth{},
}
ServerUPWithHighThreadConnections.health.setUP(&zeroHelper, &thousandHelper, &oneHelper)
ServerUPWithHighThreadConnections.health.setUP(nil, &zeroHelper, &thousandHelper, &oneHelper)

ServerUPWithDelayAndHighThreadConnections = &Server{
name: "ServerUPWithDelayAndHighThreadConnections",
health: &ServerHealth{},
}
ServerUPWithDelayAndHighThreadConnections.health.setUP(&thousandHelper, &thousandHelper, &oneHelper)
ServerUPWithDelayAndHighThreadConnections.health.setUP(nil, &thousandHelper, &thousandHelper, &oneHelper)

ServerUPWithHighRunningConnections = &Server{
name: "ServerUPWithHighRunningConnections",
health: &ServerHealth{},
}
ServerUPWithHighRunningConnections.health.setUP(&zeroHelper, &thousandHelper, &thousandHelper)
ServerUPWithHighRunningConnections.health.setUP(nil, &zeroHelper, &thousandHelper, &thousandHelper)
}

func TestBalancer(t *testing.T) {
Expand All @@ -86,18 +87,18 @@ func TestBalancer(t *testing.T) {
So(balancer.PickServer(), ShouldBeNil)
})

Convey("It fails when the server is down due to error acquiring slave status", func() {
Convey("It succeeds when the server is up due to error acquiring slave status", func() {
balancer := &Balancer{config: defaultConfig, servers: []*Server{
ServerDownDueToMySQLSlaveStatus,
ServerUPWithMySQLSlaveStatusError,
}}
So(balancer.PickServer(), ShouldBeNil)
So(balancer.PickServer(), ShouldPointTo, ServerUPWithMySQLSlaveStatusError)
})

Convey("It fails when the server is down due to error acquiring thread status", func() {
Convey("It succeeds when the server is up due to error acquiring thread status", func() {
balancer := &Balancer{config: defaultConfig, servers: []*Server{
ServerDownDueToMySQLThreadStatus,
ServerUPWithMySQLThreadStatusError,
}}
So(balancer.PickServer(), ShouldBeNil)
So(balancer.PickServer(), ShouldPointTo, ServerUPWithMySQLThreadStatusError)
})

Convey("It succeeds when the server is healthy", func() {
Expand All @@ -124,34 +125,34 @@ func TestBalancer(t *testing.T) {
})

Convey("Given a balancer with more than one server", t, func() {
Convey("It fails when all servers are down no matter the reason", func() {
balancer := &Balancer{config: defaultConfig, servers: []*Server{
ServerDownDueToMySQLConnection,
ServerDownDueToMySQLSlaveStatus,
ServerDownDueToMySQLThreadStatus,
}}
So(balancer.PickServer(), ShouldBeNil)

balancer = &Balancer{config: defaultConfig, servers: []*Server{
Convey("It fails when all servers are down with connection problem", func() {
balancer := &Balancer{config: defaultConfig, servers: []*Server{
ServerDownDueToMySQLConnection,
ServerDownDueToMySQLConnection,
ServerDownDueToMySQLConnection,
}}
So(balancer.PickServer(), ShouldBeNil)
})

balancer = &Balancer{config: defaultConfig, servers: []*Server{
ServerDownDueToMySQLSlaveStatus,
ServerDownDueToMySQLSlaveStatus,
ServerDownDueToMySQLSlaveStatus,
}}
So(balancer.PickServer(), ShouldBeNil)
Convey("It succeds when all servers are with slave errors but has connection available", func() {

Convey("It succeds when one server has connection available", func() {
balancer := &Balancer{config: defaultConfig, servers: []*Server{
ServerDownDueToMySQLConnection,
ServerUPWithMySQLSlaveStatusError,
}}
So(balancer.PickServer(), ShouldPointTo, ServerUPWithMySQLSlaveStatusError)
})
Convey("It succeds when all server has connection available", func() {
balancer := &Balancer{config: defaultConfig, servers: []*Server{
ServerUPWithMySQLSlaveStatusError,
ServerUPWithMySQLThreadStatusError,
}}

So(balancer.PickServer(), ShouldNotBeNil)
})

balancer = &Balancer{config: defaultConfig, servers: []*Server{
ServerDownDueToMySQLThreadStatus,
ServerDownDueToMySQLThreadStatus,
ServerDownDueToMySQLThreadStatus,
}}
So(balancer.PickServer(), ShouldBeNil)
})

Convey("In the case of one healthy slave", func() {
Expand All @@ -178,6 +179,13 @@ func TestBalancer(t *testing.T) {
ServerUP,
}}
So(balancer.PickServer(), ShouldPointTo, ServerUP)

balancer = &Balancer{config: defaultConfig, servers: []*Server{
ServerDownDueToMySQLConnection,
ServerDownDueToMySQLConnection,
ServerUPWithMySQLThreadStatusError,
}}
So(balancer.PickServer(), ShouldPointTo, ServerUPWithMySQLThreadStatusError)
})
})

Expand All @@ -192,6 +200,7 @@ func TestBalancer(t *testing.T) {
ServerUPWithDelay,
ServerUPWithHighThreadConnections,
ServerUPWithDelayAndHighThreadConnections,
ServerUPWithMySQLThreadStatusError,
}}
So(balancer.PickServer(), ShouldPointTo, ServerUP)

Expand All @@ -202,6 +211,7 @@ func TestBalancer(t *testing.T) {
&ServerUP2,
ServerUP,
ServerUPWithDelayAndHighThreadConnections,
ServerUPWithMySQLThreadStatusError,
}}
So(balancer.PickServer(), ShouldPointTo, &ServerUP2)

Expand All @@ -210,6 +220,7 @@ func TestBalancer(t *testing.T) {
ServerUPWithDelay,
ServerUPWithHighThreadConnections,
ServerUPWithDelayAndHighThreadConnections,
ServerUPWithMySQLThreadStatusError,
ServerUP,
&ServerUP2,
}}
Expand All @@ -220,13 +231,15 @@ func TestBalancer(t *testing.T) {
ServerUPWithDelayAndHighThreadConnections,
ServerUPWithDelay,
ServerUPWithHighThreadConnections,
ServerUPWithMySQLThreadStatusError,
}}
So(balancer.PickServer(), ShouldPointTo, ServerUPWithHighThreadConnections)

balancer = &Balancer{config: defaultConfig, servers: []*Server{
ServerDownDueToMySQLConnection,
ServerUPWithDelayAndHighThreadConnections,
ServerUPWithDelay,
ServerUPWithMySQLThreadStatusError,
}}
So(balancer.PickServer(), ShouldPointTo, ServerUPWithDelay)

Expand All @@ -240,6 +253,15 @@ func TestBalancer(t *testing.T) {
ServerDownDueToMySQLConnection,
ServerUPWithHighThreadConnections,
ServerUPWithHighRunningConnections,
ServerUPWithMySQLThreadStatusError,
}}
So(balancer.PickServer(), ShouldPointTo, ServerUPWithHighThreadConnections)

balancer = &Balancer{config: defaultConfig, servers: []*Server{
ServerDownDueToMySQLConnection,
ServerUPWithDelayAndHighThreadConnections,
ServerUPWithHighThreadConnections,
ServerUPWithMySQLThreadStatusError,
}}
So(balancer.PickServer(), ShouldPointTo, ServerUPWithHighThreadConnections)

Expand Down
19 changes: 8 additions & 11 deletions health.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,21 @@ func (h *ServerHealth) GetRunningConnections() *int {
return h.runningConnections
}

func (h *ServerHealth) setUP(secondsBehindMaster, openConnections, runningConnections *int) {
func (h *ServerHealth) setStatus(up bool, err error, secondsBehindMaster, openConnections, runningConnections *int) {
h.Lock()
defer h.Unlock()
h.up = true
h.err = nil
h.up = up
h.err = err
h.secondsBehindMaster = secondsBehindMaster
h.openConnections = openConnections
h.runningConnections = runningConnections
h.lastUpdate = time.Now()
}

func (h *ServerHealth) setUP(err error, secondsBehindMaster, openConnections, runningConnections *int) {
h.setStatus(true, err, secondsBehindMaster, openConnections, runningConnections)
}

func (h *ServerHealth) setDown(err error, secondsBehindMaster, openConnections, runningConnections *int) {
h.Lock()
defer h.Unlock()
h.up = false
h.err = err
h.secondsBehindMaster = secondsBehindMaster
h.openConnections = openConnections
h.runningConnections = runningConnections
h.lastUpdate = time.Now()
h.setStatus(false, err, secondsBehindMaster, openConnections, runningConnections)
}
Loading

0 comments on commit 7990328

Please sign in to comment.