diff --git a/.travis.yml b/.travis.yml index 3de8427..d7aa2de 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: go go: -- 1.11 +- 1.12 go_import_path: github.com/StudioSol/balancer install: - make upgrade diff --git a/balancer.go b/balancer.go index 9ce3463..0df1559 100644 --- a/balancer.go +++ b/balancer.go @@ -13,12 +13,16 @@ type bySecondsBehindMaster Servers func (a bySecondsBehindMaster) Len() int { return len(a) } func (a bySecondsBehindMaster) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a bySecondsBehindMaster) Less(i, j int) bool { + if a[i].health.secondsBehindMaster == nil && a[j].health.secondsBehindMaster == nil { + return false + } if a[i].health.secondsBehindMaster == nil && a[j].health.secondsBehindMaster != nil { return false } if a[i].health.secondsBehindMaster != nil && a[j].health.secondsBehindMaster == nil { return true } + return *a[i].health.secondsBehindMaster < *a[j].health.secondsBehindMaster } @@ -27,7 +31,6 @@ type byConnections Servers func (a byConnections) Len() int { return len(a) } func (a byConnections) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a byConnections) Less(i, j int) bool { - if a[i].health.runningConnections == nil && a[j].health.runningConnections != nil { return false } @@ -37,6 +40,9 @@ func (a byConnections) Less(i, j int) bool { if a[i].health.runningConnections == a[j].health.runningConnections { + if a[i].health.openConnections == nil && a[j].health.openConnections == nil { + return false + } if a[i].health.openConnections == nil && a[j].health.openConnections != nil { return false } diff --git a/balancer_test.go b/balancer_test.go index b52d4c1..275bfa5 100644 --- a/balancer_test.go +++ b/balancer_test.go @@ -9,7 +9,8 @@ import ( ) var ( - ServerDownDueToMySQLConnection, ServerDownDueToMySQLSlaveStatus, ServerDownDueToMySQLThreadStatus *Server + ServerDownDueToMySQLConnection *Server + ServerUPWithMySQLSlaveStatusError, ServerUPWithMySQLThreadStatusError *Server ServerUP, ServerUPWithDelay, ServerUPWithHighThreadConnections, ServerUPWithDelayAndHighThreadConnections *Server ServerUPWithHighRunningConnections *Server ) @@ -28,51 +29,51 @@ func init() { errors.New("__MYSQL_CONNECTION_ERROR__"), intNilHelper, intNilHelper, intNilHelper, ) - ServerDownDueToMySQLSlaveStatus = &Server{ - name: "ServerDownDueToMySQLSlaveStatus", + ServerUPWithMySQLSlaveStatusError = &Server{ + name: "ServerUPWithMySQLSlaveStatusError", health: &ServerHealth{}, } - ServerDownDueToMySQLSlaveStatus.health.setDown( + ServerUPWithMySQLSlaveStatusError.health.setUP( errors.New("__MYSQL_SLAVE_STATUS_ERROR__"), intNilHelper, intNilHelper, intNilHelper, ) - ServerDownDueToMySQLThreadStatus = &Server{ - name: "ServerDownDueToMySQLThreadStatus", + ServerUPWithMySQLThreadStatusError = &Server{ + name: "ServerUPWithMySQLThreadStatusError", health: &ServerHealth{}, } - ServerDownDueToMySQLThreadStatus.health.setDown( - errors.New("__MYSQL_THREADS_STATUS_ERROR__"), &zeroHelper, intNilHelper, intNilHelper, + ServerUPWithMySQLThreadStatusError.health.setUP( + errors.New("__MYSQL_THREADS_STATUS_ERROR__"), intNilHelper, intNilHelper, intNilHelper, ) ServerUP = &Server{ name: "ServerUP", health: &ServerHealth{}, } - ServerUP.health.setUP(&zeroHelper, &oneHelper, &oneHelper) + ServerUP.health.setUP(nil, &zeroHelper, &oneHelper, &oneHelper) ServerUPWithDelay = &Server{ name: "ServerUPWithDelay", health: &ServerHealth{}, } - ServerUPWithDelay.health.setUP(&thousandHelper, &oneHelper, &oneHelper) + ServerUPWithDelay.health.setUP(nil, &thousandHelper, &oneHelper, &oneHelper) ServerUPWithHighThreadConnections = &Server{ name: "ServerUPWithHighThreadConnections", health: &ServerHealth{}, } - ServerUPWithHighThreadConnections.health.setUP(&zeroHelper, &thousandHelper, &oneHelper) + ServerUPWithHighThreadConnections.health.setUP(nil, &zeroHelper, &thousandHelper, &oneHelper) ServerUPWithDelayAndHighThreadConnections = &Server{ name: "ServerUPWithDelayAndHighThreadConnections", health: &ServerHealth{}, } - ServerUPWithDelayAndHighThreadConnections.health.setUP(&thousandHelper, &thousandHelper, &oneHelper) + ServerUPWithDelayAndHighThreadConnections.health.setUP(nil, &thousandHelper, &thousandHelper, &oneHelper) ServerUPWithHighRunningConnections = &Server{ name: "ServerUPWithHighRunningConnections", health: &ServerHealth{}, } - ServerUPWithHighRunningConnections.health.setUP(&zeroHelper, &thousandHelper, &thousandHelper) + ServerUPWithHighRunningConnections.health.setUP(nil, &zeroHelper, &thousandHelper, &thousandHelper) } func TestBalancer(t *testing.T) { @@ -86,18 +87,18 @@ func TestBalancer(t *testing.T) { So(balancer.PickServer(), ShouldBeNil) }) - Convey("It fails when the server is down due to error acquiring slave status", func() { + Convey("It succeeds when the server is up due to error acquiring slave status", func() { balancer := &Balancer{config: defaultConfig, servers: []*Server{ - ServerDownDueToMySQLSlaveStatus, + ServerUPWithMySQLSlaveStatusError, }} - So(balancer.PickServer(), ShouldBeNil) + So(balancer.PickServer(), ShouldPointTo, ServerUPWithMySQLSlaveStatusError) }) - Convey("It fails when the server is down due to error acquiring thread status", func() { + Convey("It succeeds when the server is up due to error acquiring thread status", func() { balancer := &Balancer{config: defaultConfig, servers: []*Server{ - ServerDownDueToMySQLThreadStatus, + ServerUPWithMySQLThreadStatusError, }} - So(balancer.PickServer(), ShouldBeNil) + So(balancer.PickServer(), ShouldPointTo, ServerUPWithMySQLThreadStatusError) }) Convey("It succeeds when the server is healthy", func() { @@ -124,34 +125,34 @@ func TestBalancer(t *testing.T) { }) Convey("Given a balancer with more than one server", t, func() { - Convey("It fails when all servers are down no matter the reason", func() { - balancer := &Balancer{config: defaultConfig, servers: []*Server{ - ServerDownDueToMySQLConnection, - ServerDownDueToMySQLSlaveStatus, - ServerDownDueToMySQLThreadStatus, - }} - So(balancer.PickServer(), ShouldBeNil) - balancer = &Balancer{config: defaultConfig, servers: []*Server{ + Convey("It fails when all servers are down with connection problem", func() { + balancer := &Balancer{config: defaultConfig, servers: []*Server{ ServerDownDueToMySQLConnection, ServerDownDueToMySQLConnection, ServerDownDueToMySQLConnection, }} So(balancer.PickServer(), ShouldBeNil) + }) - balancer = &Balancer{config: defaultConfig, servers: []*Server{ - ServerDownDueToMySQLSlaveStatus, - ServerDownDueToMySQLSlaveStatus, - ServerDownDueToMySQLSlaveStatus, - }} - So(balancer.PickServer(), ShouldBeNil) + Convey("It succeds when all servers are with slave errors but has connection available", func() { + + Convey("It succeds when one server has connection available", func() { + balancer := &Balancer{config: defaultConfig, servers: []*Server{ + ServerDownDueToMySQLConnection, + ServerUPWithMySQLSlaveStatusError, + }} + So(balancer.PickServer(), ShouldPointTo, ServerUPWithMySQLSlaveStatusError) + }) + Convey("It succeds when all server has connection available", func() { + balancer := &Balancer{config: defaultConfig, servers: []*Server{ + ServerUPWithMySQLSlaveStatusError, + ServerUPWithMySQLThreadStatusError, + }} + + So(balancer.PickServer(), ShouldNotBeNil) + }) - balancer = &Balancer{config: defaultConfig, servers: []*Server{ - ServerDownDueToMySQLThreadStatus, - ServerDownDueToMySQLThreadStatus, - ServerDownDueToMySQLThreadStatus, - }} - So(balancer.PickServer(), ShouldBeNil) }) Convey("In the case of one healthy slave", func() { @@ -178,6 +179,13 @@ func TestBalancer(t *testing.T) { ServerUP, }} So(balancer.PickServer(), ShouldPointTo, ServerUP) + + balancer = &Balancer{config: defaultConfig, servers: []*Server{ + ServerDownDueToMySQLConnection, + ServerDownDueToMySQLConnection, + ServerUPWithMySQLThreadStatusError, + }} + So(balancer.PickServer(), ShouldPointTo, ServerUPWithMySQLThreadStatusError) }) }) @@ -192,6 +200,7 @@ func TestBalancer(t *testing.T) { ServerUPWithDelay, ServerUPWithHighThreadConnections, ServerUPWithDelayAndHighThreadConnections, + ServerUPWithMySQLThreadStatusError, }} So(balancer.PickServer(), ShouldPointTo, ServerUP) @@ -202,6 +211,7 @@ func TestBalancer(t *testing.T) { &ServerUP2, ServerUP, ServerUPWithDelayAndHighThreadConnections, + ServerUPWithMySQLThreadStatusError, }} So(balancer.PickServer(), ShouldPointTo, &ServerUP2) @@ -210,6 +220,7 @@ func TestBalancer(t *testing.T) { ServerUPWithDelay, ServerUPWithHighThreadConnections, ServerUPWithDelayAndHighThreadConnections, + ServerUPWithMySQLThreadStatusError, ServerUP, &ServerUP2, }} @@ -220,6 +231,7 @@ func TestBalancer(t *testing.T) { ServerUPWithDelayAndHighThreadConnections, ServerUPWithDelay, ServerUPWithHighThreadConnections, + ServerUPWithMySQLThreadStatusError, }} So(balancer.PickServer(), ShouldPointTo, ServerUPWithHighThreadConnections) @@ -227,6 +239,7 @@ func TestBalancer(t *testing.T) { ServerDownDueToMySQLConnection, ServerUPWithDelayAndHighThreadConnections, ServerUPWithDelay, + ServerUPWithMySQLThreadStatusError, }} So(balancer.PickServer(), ShouldPointTo, ServerUPWithDelay) @@ -240,6 +253,15 @@ func TestBalancer(t *testing.T) { ServerDownDueToMySQLConnection, ServerUPWithHighThreadConnections, ServerUPWithHighRunningConnections, + ServerUPWithMySQLThreadStatusError, + }} + So(balancer.PickServer(), ShouldPointTo, ServerUPWithHighThreadConnections) + + balancer = &Balancer{config: defaultConfig, servers: []*Server{ + ServerDownDueToMySQLConnection, + ServerUPWithDelayAndHighThreadConnections, + ServerUPWithHighThreadConnections, + ServerUPWithMySQLThreadStatusError, }} So(balancer.PickServer(), ShouldPointTo, ServerUPWithHighThreadConnections) diff --git a/health.go b/health.go index 4726ae6..b7b94a9 100644 --- a/health.go +++ b/health.go @@ -45,24 +45,21 @@ func (h *ServerHealth) GetRunningConnections() *int { return h.runningConnections } -func (h *ServerHealth) setUP(secondsBehindMaster, openConnections, runningConnections *int) { +func (h *ServerHealth) setStatus(up bool, err error, secondsBehindMaster, openConnections, runningConnections *int) { h.Lock() defer h.Unlock() - h.up = true - h.err = nil + h.up = up + h.err = err h.secondsBehindMaster = secondsBehindMaster h.openConnections = openConnections h.runningConnections = runningConnections h.lastUpdate = time.Now() } +func (h *ServerHealth) setUP(err error, secondsBehindMaster, openConnections, runningConnections *int) { + h.setStatus(true, err, secondsBehindMaster, openConnections, runningConnections) +} + func (h *ServerHealth) setDown(err error, secondsBehindMaster, openConnections, runningConnections *int) { - h.Lock() - defer h.Unlock() - h.up = false - h.err = err - h.secondsBehindMaster = secondsBehindMaster - h.openConnections = openConnections - h.runningConnections = runningConnections - h.lastUpdate = time.Now() + h.setStatus(false, err, secondsBehindMaster, openConnections, runningConnections) } diff --git a/server.go b/server.go index a452540..8fbee05 100644 --- a/server.go +++ b/server.go @@ -79,44 +79,23 @@ func (s *Server) CheckHealth(traceOn bool, logger Logger) { s.isChecking = false }() - if err := s.connectIfNecessary(traceOn, logger); err != nil { + if err := s.connectReadUser(traceOn, logger); err != nil { s.health.setDown( err, secondsBehindMaster, openConnections, runningConnections, ) return } - slaveStatusResult, err := s.rawQuery("SHOW SLAVE STATUS", logger) - if err != nil { - s.health.setDown( + if err := s.connectReplicationUser(traceOn, logger); err != nil { + s.health.setUP( err, secondsBehindMaster, openConnections, runningConnections, ) return } - rawSecondsBehindMaster := strings.TrimSpace(slaveStatusResult["Seconds_Behind_Master"]) - if rawSecondsBehindMaster == "" || strings.ToLower(rawSecondsBehindMaster) == "null" { - s.health.setDown( - fmt.Errorf("empty or null value for Seconds_Behind_Master returned from MySQL: %s", err), - secondsBehindMaster, openConnections, runningConnections, - ) - return - } - - tmp, err := strconv.Atoi(rawSecondsBehindMaster) - if err != nil { - s.health.setDown( - fmt.Errorf("unexpected value for Seconds_Behind_Master returned from MySQL (conversion error): %s", err), - secondsBehindMaster, openConnections, runningConnections, - ) - return - } - - secondsBehindMaster = &tmp - threadsConnectedResult, err := s.rawQuery("SHOW STATUS LIKE 'Threads_connected'", logger) if err != nil { - s.health.setDown( + s.health.setUP( fmt.Errorf("failed acquiring MySQL thread connected status: %s", err), secondsBehindMaster, openConnections, runningConnections, ) @@ -126,7 +105,7 @@ func (s *Server) CheckHealth(traceOn bool, logger Logger) { threadsConnected := threadsConnectedResult["Value"] tmp2, err := strconv.Atoi(threadsConnected) if err != nil { - s.health.setDown( + s.health.setUP( fmt.Errorf("unexpected value for Threads_connected returned from MySQL: %s", err), secondsBehindMaster, openConnections, runningConnections, ) @@ -137,7 +116,7 @@ func (s *Server) CheckHealth(traceOn bool, logger Logger) { threadsRunningResult, err := s.rawQuery("SHOW STATUS LIKE 'Threads_running'", logger) if err != nil { - s.health.setDown( + s.health.setUP( fmt.Errorf("failed acquiring MySQL thread running status: %s", err), secondsBehindMaster, openConnections, runningConnections, ) @@ -147,7 +126,7 @@ func (s *Server) CheckHealth(traceOn bool, logger Logger) { threadsRunning := threadsRunningResult["Value"] tmp3, err := strconv.Atoi(threadsRunning) if err != nil { - s.health.setDown( + s.health.setUP( fmt.Errorf("unexpected value for Threads_running returned from MySQL: %s", err), secondsBehindMaster, openConnections, runningConnections, ) @@ -156,10 +135,38 @@ func (s *Server) CheckHealth(traceOn bool, logger Logger) { runningConnections = &tmp3 - s.health.setUP(secondsBehindMaster, openConnections, runningConnections) + slaveStatusResult, err := s.rawQuery("SHOW SLAVE STATUS", logger) + if err != nil { + s.health.setUP( + err, secondsBehindMaster, openConnections, runningConnections, + ) + return + } + + rawSecondsBehindMaster := strings.TrimSpace(slaveStatusResult["Seconds_Behind_Master"]) + if rawSecondsBehindMaster == "" || strings.ToLower(rawSecondsBehindMaster) == "null" { + s.health.setUP( + fmt.Errorf("empty or null value for Seconds_Behind_Master returned from MySQL: %s", err), + secondsBehindMaster, openConnections, runningConnections, + ) + return + } + + tmp, err := strconv.Atoi(rawSecondsBehindMaster) + if err != nil { + s.health.setUP( + fmt.Errorf("unexpected value for Seconds_Behind_Master returned from MySQL (conversion error): %s", err), + secondsBehindMaster, openConnections, runningConnections, + ) + return + } + + secondsBehindMaster = &tmp + + s.health.setUP(nil, secondsBehindMaster, openConnections, runningConnections) } -func (s *Server) connectIfNecessary(traceOn bool, logger Logger) error { +func (s *Server) connectReadUser(traceOn bool, logger Logger) error { mutex.Lock() defer mutex.Unlock() @@ -171,6 +178,13 @@ func (s *Server) connectIfNecessary(traceOn bool, logger Logger) error { s.connection = conn } + return nil +} + +func (s *Server) connectReplicationUser(traceOn bool, logger Logger) error { + mutex.Lock() + defer mutex.Unlock() + if s.replicationConnection == nil { conn, err := s.connect(s.serverSettings.ReplicationDSN, traceOn, logger) if err != nil { diff --git a/server_test.go b/server_test.go index 0b8ebde..5324c4c 100644 --- a/server_test.go +++ b/server_test.go @@ -51,14 +51,15 @@ func getMock(t *testing.T) (*gorp.DbMap, sqlmock.Sqlmock) { func mockHealthQueries(t *testing.T, mock sqlmock.Sqlmock, secondsBehindMaster, openConnections, runningConnections driver.Value) { t.Helper() - mock.ExpectQuery("SHOW SLAVE STATUS").WillReturnRows( - sqlmock.NewRows([]string{"Seconds_Behind_Master"}).AddRow(secondsBehindMaster)) - mock.ExpectQuery("SHOW STATUS LIKE 'Threads_connected'").WillReturnRows( sqlmock.NewRows([]string{"Value"}).AddRow(openConnections)) mock.ExpectQuery("SHOW STATUS LIKE 'Threads_running'").WillReturnRows( sqlmock.NewRows([]string{"Value"}).AddRow(runningConnections)) + + mock.ExpectQuery("SHOW SLAVE STATUS").WillReturnRows( + sqlmock.NewRows([]string{"Seconds_Behind_Master"}).AddRow(secondsBehindMaster)) + } func TestServerAttributes(t *testing.T) { @@ -154,7 +155,6 @@ func TestCheckHealth(t *testing.T) { db, mock := getMock(t) logger := newLoggerMock() health := new(ServerHealth) - server := Server{ connection: db, replicationConnection: db, @@ -167,6 +167,8 @@ func TestCheckHealth(t *testing.T) { Convey("It should succeed without errors", func() { server.CheckHealth(false, logger) + So(health.up, ShouldBeTrue) + So(health.runningConnections, ShouldNotBeNil) So(*health.runningConnections, ShouldEqual, 1) @@ -183,9 +185,10 @@ func TestCheckHealth(t *testing.T) { Convey("When slave status are empty", func() { mockHealthQueries(t, mock, nil, 2, 1) - Convey("It should set health down", func() { + Convey("It should set error on check", func() { server.CheckHealth(false, logger) + So(health.up, ShouldBeTrue) So(health.err, ShouldNotBeNil) So(health.err.Error(), ShouldContainSubstring, "empty or null value for Seconds_Behind_Master") @@ -196,9 +199,10 @@ func TestCheckHealth(t *testing.T) { Convey("When openConnections are empty", func() { mockHealthQueries(t, mock, 0, nil, 1) - Convey("It should set health down", func() { + Convey("It should set error on check", func() { server.CheckHealth(false, logger) + So(health.up, ShouldBeTrue) So(health.err, ShouldNotBeNil) So(health.err.Error(), ShouldContainSubstring, "unexpected value for Threads_connected") @@ -209,9 +213,9 @@ func TestCheckHealth(t *testing.T) { Convey("When runningConnections are empty", func() { mockHealthQueries(t, mock, 0, 1, nil) - Convey("It should set health down", func() { + Convey("It should set error on check", func() { server.CheckHealth(false, logger) - + So(health.up, ShouldBeTrue) So(health.err, ShouldNotBeNil) So(health.err.Error(), ShouldContainSubstring, "unexpected value for Threads_running") @@ -219,4 +223,50 @@ func TestCheckHealth(t *testing.T) { }) }) }) + + Convey("Given a invalid server", t, func() { + logger := newLoggerMock() + health := new(ServerHealth) + server := Server{ + connection: nil, + replicationConnection: nil, + health: health, + } + + Convey("When slave connection is nil", func() { + + Convey("It should fail with server down and errors", func() { + server.CheckHealth(false, logger) + + So(health.up, ShouldBeFalse) + So(health.err, ShouldNotBeNil) + + }) + }) + + }) + + Convey("Given a valid server without replication connection", t, func() { + db, _ := getMock(t) + logger := newLoggerMock() + health := new(ServerHealth) + server := Server{ + connection: db, + replicationConnection: nil, + health: health, + } + + Convey("When slave connection is nil", func() { + + Convey("It should succeed with server down and errors", func() { + server.CheckHealth(false, logger) + + So(health.up, ShouldBeTrue) + So(health.err, ShouldNotBeNil) + + }) + }) + + }) + }