Skip to content

Commit

Permalink
Ensure the maestro instance is ready after it is added to the hash ring.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Dec 12, 2024
1 parent bc2f131 commit a322718
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 26 deletions.
73 changes: 54 additions & 19 deletions cmd/maestro/server/healthcheck_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,38 @@ import (
"net"
"net/http"

health "github.com/docker/go-healthcheck"
"github.com/gorilla/mux"
"github.com/openshift-online/maestro/pkg/dao"
"k8s.io/klog/v2"
)

var (
// updater is the status updater registered under "maintenance_status";
// upHandler and downHandler call Update on it.
updater = health.NewStatusUpdater()
)

var _ Server = &healthCheckServer{}

// healthCheckServer serves the /healthcheck endpoint over HTTP.
// NOTE(review): httpServer is listed twice in this rendering (old and
// re-aligned diff lines); only one declaration can exist in the real file.
type healthCheckServer struct {
httpServer *http.Server
httpServer *http.Server
// instanceDao is used by the health check handler to look up the instance record.
instanceDao dao.InstanceDao
// instanceID is populated from the message broker client ID configuration.
instanceID string
// brokerType is the configured message broker type; the handler performs
// the instance readiness lookup only when it equals "mqtt".
brokerType string
}

// NewHealthCheckServer builds the health check server: a mux router served by
// an http.Server bound to the configured HTTP hostname and health-check port.
// NOTE(review): this span is a rendered diff, so old and new lines are
// interleaved (the go-healthcheck registry and /healthcheck/up|down routes,
// and the bare `return &healthCheckServer{`, presumably belong to the removed
// side) — confirm against the real file before relying on it.
func NewHealthCheckServer() *healthCheckServer {
router := mux.NewRouter()
health.DefaultRegistry = health.NewRegistry()
health.Register("maintenance_status", updater)
router.HandleFunc("/healthcheck", health.StatusHandler).Methods(http.MethodGet)
router.HandleFunc("/healthcheck/down", downHandler).Methods(http.MethodPost)
router.HandleFunc("/healthcheck/up", upHandler).Methods(http.MethodPost)

srv := &http.Server{
Handler: router,
Addr: env().Config.HTTPServer.Hostname + ":" + env().Config.HealthCheck.BindPort,
}

return &healthCheckServer{
httpServer: srv,
// The instance DAO is built on the environment's database session factory.
sessionFactory := env().Database.SessionFactory
server := &healthCheckServer{
httpServer: srv,
instanceDao: dao.NewInstanceDao(&sessionFactory),
instanceID: env().Config.MessageBroker.ClientID,
brokerType: env().Config.MessageBroker.MessageBrokerType,
}

// The handler is a method on server, so it is registered after server is constructed.
router.HandleFunc("/healthcheck", server.healthCheckHandler).Methods(http.MethodGet)

return server
}

func (s healthCheckServer) Start() {
Expand Down Expand Up @@ -73,10 +74,44 @@ func (s healthCheckServer) Listen() (listener net.Listener, err error) {
// Serve has an empty body; the listener argument is not used.
// NOTE(review): presumably present only to satisfy the Server interface — confirm.
func (s healthCheckServer) Serve(listener net.Listener) {
}

// upHandler serves POST /healthcheck/up by updating the shared status updater with nil.
// NOTE(review): presumably a nil status clears maintenance mode in go-healthcheck — confirm.
func upHandler(w http.ResponseWriter, r *http.Request) {
updater.Update(nil)
}
// healthCheckHandler reports instance readiness as a small JSON body.
// For the "mqtt" broker type it looks up this instance via the instance DAO and
// responds 200 {"status": "ok"} when the record is Ready, 503
// {"status": "not ready"} when it is not, and 500 {"status": "error"} when the
// lookup fails. For any other broker type it always responds 200.
// Failures to write the response body are logged and otherwise ignored.
// NOTE(review): the `func downHandler` lines in the middle of this span come
// from the diff rendering (old code shown beside new) — confirm against the real file.
func (s healthCheckServer) healthCheckHandler(w http.ResponseWriter, r *http.Request) {
// For MQTT, check if the instance is ready
if s.brokerType == "mqtt" {
instance, err := s.instanceDao.Get(r.Context(), s.instanceID)
if err != nil {
// Lookup failure is reported as a server error rather than "not ready".
klog.Errorf("Error getting instance: %v", err)
w.WriteHeader(http.StatusInternalServerError)
_, err := w.Write([]byte(`{"status": "error"}`))
if err != nil {
klog.Errorf("Error writing healthcheck response: %v", err)
}
return
}
if instance.Ready {
klog.Infof("Instance is ready")
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(`{"status": "ok"}`))
if err != nil {
klog.Errorf("Error writing healthcheck response: %v", err)
}
return
}

// The record exists but has not been marked ready yet.
klog.Infof("Instance not ready")
w.WriteHeader(http.StatusServiceUnavailable)
_, err = w.Write([]byte(`{"status": "not ready"}`))
if err != nil {
klog.Errorf("Error writing healthcheck response: %v", err)
}
return
}

func downHandler(w http.ResponseWriter, r *http.Request) {
updater.Update(fmt.Errorf("maintenance mode"))
// For gRPC broker, return 200 OK for now
klog.Infof("Instance is ready")
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(`{"status": "ok"}`))
if err != nil {
klog.Errorf("Error writing healthcheck response: %v", err)
}
}
16 changes: 13 additions & 3 deletions cmd/maestro/server/pulse_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ func (s *PulseServer) pulse(ctx context.Context) {
log.V(10).Infof("Updating heartbeat for maestro instance: %s", s.instanceID)
instance := &api.ServerInstance{
Meta: api.Meta{
ID: s.instanceID,
UpdatedAt: time.Now(),
ID: s.instanceID,
},
LastPulse: time.Now(),
}
_, err := s.instanceDao.UpSert(ctx, instance)
if err != nil {
Expand Down Expand Up @@ -149,13 +149,16 @@ func (s *PulseServer) checkInstances(ctx context.Context) {
return
}

activeInstanceIDs := []string{}
inactiveInstanceIDs := []string{}
for _, instance := range instances {
// Instances pulsing within the last three check intervals are considered as active.
if instance.UpdatedAt.After(time.Now().Add(time.Duration(int64(-3*time.Second) * s.pulseInterval))) {
if instance.LastPulse.After(time.Now().Add(time.Duration(int64(-3*time.Second) * s.pulseInterval))) {
if err := s.statusDispatcher.OnInstanceUp(instance.ID); err != nil {
log.Error(fmt.Sprintf("Error to call OnInstanceUp handler for maestro instance %s: %s", instance.ID, err.Error()))
}
// mark the instance as active after it is added to the status dispatcher
activeInstanceIDs = append(activeInstanceIDs, instance.ID)
} else {
if err := s.statusDispatcher.OnInstanceDown(instance.ID); err != nil {
log.Error(fmt.Sprintf("Error to call OnInstanceDown handler for maestro instance %s: %s", instance.ID, err.Error()))
Expand All @@ -165,6 +168,13 @@ func (s *PulseServer) checkInstances(ctx context.Context) {
}
}

if len(activeInstanceIDs) > 0 {
// batch mark active instances
if err := s.instanceDao.MarkReadyByIDs(ctx, activeInstanceIDs); err != nil {
log.Error(fmt.Sprintf("Unable to mark active maestro instances (%s): %s", activeInstanceIDs, err.Error()))
}
}

if len(inactiveInstanceIDs) > 0 {
// batch delete inactive instances
if err := s.instanceDao.DeleteByIDs(ctx, inactiveInstanceIDs); err != nil {
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ require (
github.com/cespare/xxhash v1.1.0
github.com/cloudevents/sdk-go/v2 v2.15.3-0.20240911135016-682f3a9684e4
github.com/deckarep/golang-set/v2 v2.6.0
github.com/docker/go-healthcheck v0.1.0
github.com/evanphx/json-patch v5.9.0+incompatible
github.com/getsentry/sentry-go v0.20.0
github.com/ghodss/yaml v1.0.0
Expand Down Expand Up @@ -79,7 +78,6 @@ require (
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/eclipse/paho.golang v0.21.0 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/felixge/fgprof v0.9.4 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ github.com/docker/docker v20.10.17+incompatible h1:JYCuMrWaVNophQTOrMMoSwudOVEfc
github.com/docker/docker v20.10.17+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/go-healthcheck v0.1.0 h1:6ZrRr63F5LLsPwSlbZgjgoxNu+o1VlMIhCQWgbfrgU0=
github.com/docker/go-healthcheck v0.1.0/go.mod h1:3v7a0338vhH6WnYFtUd66S+9QK3M6xK4sKr7gGrht6o=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
Expand Down
4 changes: 4 additions & 0 deletions pkg/api/server_instance.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package api

import "time"

// ServerInstance is employed by Maestro to discover active server instances. The LastPulse field
// determines the liveness of the instance; an instance that has not pulsed within the last three
// check intervals (30 seconds by default) is considered inactive.
// Ready is a separate flag from liveness and records whether the instance may serve requests.
// However, it is not meant for direct exposure to end users through the API.
type ServerInstance struct {
Meta
LastPulse time.Time // LastPulse indicates the last time the instance pulsed.
Ready bool // Ready indicates whether the instance is ready to serve requests.
}

// ServerInstanceList is a slice of ServerInstance pointers.
type ServerInstanceList []*ServerInstance
Expand Down
10 changes: 10 additions & 0 deletions pkg/dao/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type InstanceDao interface {
Create(ctx context.Context, instance *api.ServerInstance) (*api.ServerInstance, error)
Replace(ctx context.Context, instance *api.ServerInstance) (*api.ServerInstance, error)
UpSert(ctx context.Context, instance *api.ServerInstance) (*api.ServerInstance, error)
MarkReadyByIDs(ctx context.Context, ids []string) error
Delete(ctx context.Context, id string) error
DeleteByIDs(ctx context.Context, ids []string) error
FindByIDs(ctx context.Context, ids []string) (api.ServerInstanceList, error)
Expand Down Expand Up @@ -68,6 +69,15 @@ func (d *sqlInstanceDao) UpSert(ctx context.Context, instance *api.ServerInstanc
return instance, nil
}

// MarkReadyByIDs sets the "ready" column to true for every server instance
// whose id is contained in ids. If the update fails, db.MarkForRollback is
// called with the error and the error is returned; otherwise nil is returned.
func (d *sqlInstanceDao) MarkReadyByIDs(ctx context.Context, ids []string) error {
	session := (*d.sessionFactory).New(ctx)
	result := session.
		Model(&api.ServerInstance{}).
		Where("id in (?)", ids).
		Update("ready", true)
	if result.Error == nil {
		return nil
	}

	db.MarkForRollback(ctx, result.Error)
	return result.Error
}

func (d *sqlInstanceDao) Delete(ctx context.Context, id string) error {
g2 := (*d.sessionFactory).New(ctx)
if err := g2.Omit(clause.Associations).Delete(&api.ServerInstance{Meta: api.Meta{ID: id}}).Error; err != nil {
Expand Down
11 changes: 11 additions & 0 deletions pkg/dao/mocks/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ func (d *instanceDaoMock) UpSert(ctx context.Context, instance *api.ServerInstan
return instance, nil
}

// MarkReadyByIDs flips Ready to true on every stored instance whose ID is in
// ids, holding the mock's mutex for the duration. It always returns nil.
func (d *instanceDaoMock) MarkReadyByIDs(ctx context.Context, ids []string) error {
	d.mux.Lock()
	defer d.mux.Unlock()

	for _, candidate := range d.instances {
		if !contains(ids, candidate.ID) {
			continue
		}
		candidate.Ready = true
	}

	return nil
}

func (d *instanceDaoMock) Delete(ctx context.Context, ID string) error {
d.mux.Lock()
defer d.mux.Unlock()
Expand Down
4 changes: 4 additions & 0 deletions pkg/db/migrations/202401151014_add_server_instances.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package migrations

import (
"time"

"gorm.io/gorm"

"github.com/go-gormigrate/gormigrate/v2"
Expand All @@ -9,6 +11,8 @@ import (
func addServerInstances() *gormigrate.Migration {
type ServerInstance struct {
Model
LastPulse time.Time
Ready bool `gorm:"default:false"`
}

return &gormigrate.Migration{
Expand Down

0 comments on commit a322718

Please sign in to comment.