diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 2345a420..8d4aca79 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -15,7 +15,7 @@ permissions: jobs: e2e: - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 steps: - name: Checkout uses: actions/checkout@v4 @@ -32,7 +32,7 @@ jobs: container_tool: docker SERVER_REPLICAS: 2 e2e-broadcast-subscription: - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 steps: - name: Checkout uses: actions/checkout@v4 @@ -50,7 +50,7 @@ jobs: SERVER_REPLICAS: 2 ENABLE_BROADCAST_SUBSCRIPTION: true e2e-grpc-broker: - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 steps: - name: Checkout uses: actions/checkout@v4 diff --git a/cmd/maestro/servecmd/cmd.go b/cmd/maestro/servecmd/cmd.go index c0f41c30..876da6e6 100755 --- a/cmd/maestro/servecmd/cmd.go +++ b/cmd/maestro/servecmd/cmd.go @@ -11,6 +11,9 @@ import ( "github.com/openshift-online/maestro/cmd/maestro/environments" "github.com/openshift-online/maestro/cmd/maestro/server" + "github.com/openshift-online/maestro/pkg/config" + "github.com/openshift-online/maestro/pkg/dao" + "github.com/openshift-online/maestro/pkg/dispatcher" "github.com/openshift-online/maestro/pkg/event" ) @@ -35,6 +38,8 @@ func runServer(cmd *cobra.Command, args []string) { klog.Fatalf("Unable to initialize environment: %s", err.Error()) } + healthcheckServer := server.NewHealthCheckServer() + // Create event broadcaster to broadcast resource status update events to subscribers eventBroadcaster := event.NewEventBroadcaster() @@ -42,17 +47,34 @@ func runServer(cmd *cobra.Command, args []string) { // For gRPC, create a gRPC broker to handle resource spec and status events. // For MQTT, create a Pulse server to handle resource spec and status events. var eventServer server.EventServer - if environments.Environment().Config.MessageBroker.MessageBrokerType == "grpc" { + switch environments.Environment().Config.MessageBroker.MessageBrokerType { + case "mqtt": + klog.Info("Setting up pulse server") + var statusDispatcher dispatcher.Dispatcher + subscriptionType := environments.Environment().Config.EventServer.SubscriptionType + switch config.SubscriptionType(subscriptionType) { + case config.SharedSubscriptionType: + statusDispatcher = dispatcher.NewNoopDispatcher(dao.NewConsumerDao(&environments.Environment().Database.SessionFactory), environments.Environment().Clients.CloudEventsSource) + case config.BroadcastSubscriptionType: + statusDispatcher = dispatcher.NewHashDispatcher(environments.Environment().Config.MessageBroker.ClientID, dao.NewInstanceDao(&environments.Environment().Database.SessionFactory), + dao.NewConsumerDao(&environments.Environment().Database.SessionFactory), environments.Environment().Clients.CloudEventsSource, environments.Environment().Config.EventServer.ConsistentHashConfig) + default: + klog.Errorf("Unsupported subscription type: %s", subscriptionType) + } + + // Set the status dispatcher for the healthcheck server + healthcheckServer.SetStatusDispatcher(statusDispatcher) + eventServer = server.NewMQTTEventServer(eventBroadcaster, statusDispatcher) + case "grpc": klog.Info("Setting up grpc broker") eventServer = server.NewGRPCBroker(eventBroadcaster) - } else { - klog.Info("Setting up pulse server") - eventServer = server.NewPulseServer(eventBroadcaster) + default: + klog.Errorf("Unsupported message broker type: %s", environments.Environment().Config.MessageBroker.MessageBrokerType) } + // Create the servers apiserver := server.NewAPIServer(eventBroadcaster) metricsServer := server.NewMetricsServer() - healthcheckServer := server.NewHealthCheckServer() controllersServer := server.NewControllersServer(eventServer) ctx, cancel := context.WithCancel(context.Background()) @@ -70,10 +92,6 @@ func runServer(cmd *cobra.Command, args []string) { if err := metricsServer.Stop(); err != nil { klog.Errorf("Failed to stop metrics server, %v", err) } - - if err := healthcheckServer.Stop(); err != nil { - klog.Errorf("Failed to stop healthcheck server, %v", err) - } }() // Start the event broadcaster @@ -82,7 +100,7 @@ func runServer(cmd *cobra.Command, args []string) { // Run the servers go apiserver.Start() go metricsServer.Start() - go healthcheckServer.Start() + go healthcheckServer.Start(ctx) go eventServer.Start(ctx) go controllersServer.Start(ctx) diff --git a/cmd/maestro/server/pulse_server.go b/cmd/maestro/server/event_server.go similarity index 66% rename from cmd/maestro/server/pulse_server.go rename to cmd/maestro/server/event_server.go index e8590c72..77ba990b 100644 --- a/cmd/maestro/server/pulse_server.go +++ b/cmd/maestro/server/event_server.go @@ -3,11 +3,9 @@ package server import ( "context" "fmt" - "time" "github.com/openshift-online/maestro/pkg/api" "github.com/openshift-online/maestro/pkg/client/cloudevents" - "github.com/openshift-online/maestro/pkg/config" "github.com/openshift-online/maestro/pkg/dao" "github.com/openshift-online/maestro/pkg/db" "github.com/openshift-online/maestro/pkg/dispatcher" @@ -15,8 +13,6 @@ import ( "github.com/openshift-online/maestro/pkg/logger" "github.com/openshift-online/maestro/pkg/services" "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/klog/v2" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" "open-cluster-management.io/sdk-go/pkg/cloudevents/work/common" workpayload "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload" @@ -45,16 +41,14 @@ type EventServer interface { OnStatusUpdate(ctx context.Context, eventID, resourceID string) error } -var _ EventServer = &PulseServer{} +var _ EventServer = &MQTTEventServer{} -// PulseServer represents a server responsible for publish resource spec events from +// MQTTEventServer represents a server responsible for publish resource spec events from // resource controller and handle resource status update events from the maestro agent. // It also periodic heartbeat updates and checking the liveness of Maestro instances, // triggering status resync based on instances' status and other conditions. -type PulseServer struct { +type MQTTEventServer struct { instanceID string - pulseInterval int64 - instanceDao dao.InstanceDao eventInstanceDao dao.EventInstanceDao lockFactory db.LockFactory eventBroadcaster *event.EventBroadcaster // event broadcaster to broadcast resource status update events to subscribers @@ -64,22 +58,10 @@ type PulseServer struct { statusDispatcher dispatcher.Dispatcher } -func NewPulseServer(eventBroadcaster *event.EventBroadcaster) EventServer { - var statusDispatcher dispatcher.Dispatcher - switch config.SubscriptionType(env().Config.PulseServer.SubscriptionType) { - case config.SharedSubscriptionType: - statusDispatcher = dispatcher.NewNoopDispatcher(dao.NewConsumerDao(&env().Database.SessionFactory), env().Clients.CloudEventsSource) - case config.BroadcastSubscriptionType: - statusDispatcher = dispatcher.NewHashDispatcher(env().Config.MessageBroker.ClientID, dao.NewInstanceDao(&env().Database.SessionFactory), - dao.NewConsumerDao(&env().Database.SessionFactory), env().Clients.CloudEventsSource, env().Config.PulseServer.ConsistentHashConfig) - default: - klog.Fatalf("Unsupported subscription type: %s", env().Config.PulseServer.SubscriptionType) - } +func NewMQTTEventServer(eventBroadcaster *event.EventBroadcaster, statusDispatcher dispatcher.Dispatcher) EventServer { sessionFactory := env().Database.SessionFactory - return &PulseServer{ + return &MQTTEventServer{ instanceID: env().Config.MessageBroker.ClientID, - pulseInterval: env().Config.PulseServer.PulseInterval, - instanceDao: dao.NewInstanceDao(&sessionFactory), eventInstanceDao: dao.NewEventInstanceDao(&sessionFactory), lockFactory: db.NewAdvisoryLockFactory(sessionFactory), eventBroadcaster: eventBroadcaster, @@ -93,7 +75,7 @@ func NewPulseServer(eventBroadcaster *event.EventBroadcaster) EventServer { // Start initializes and runs the pulse server, updating and checking Maestro instances' liveness, // initializes subscription to status update messages and triggers status resync based on // instances' status and other conditions. -func (s *PulseServer) Start(ctx context.Context) { +func (s *MQTTEventServer) Start(ctx context.Context) { log.Infof("Starting pulse server") // start subscribing to resource status update messages. @@ -101,91 +83,14 @@ func (s *PulseServer) Start(ctx context.Context) { // start the status dispatcher go s.statusDispatcher.Start(ctx) - // start a goroutine to periodically update heartbeat for the current maestro instance - go wait.UntilWithContext(ctx, s.pulse, time.Duration(s.pulseInterval*int64(time.Second))) - - // start a goroutine to periodically check the liveness of maestro instances - go wait.UntilWithContext(ctx, s.checkInstances, time.Duration(s.pulseInterval/3*int64(time.Second))) - // wait until context is canceled <-ctx.Done() log.Infof("Shutting down pulse server") } -func (s *PulseServer) pulse(ctx context.Context) { - log.V(10).Infof("Updating heartbeat for maestro instance: %s", s.instanceID) - instance := &api.ServerInstance{ - Meta: api.Meta{ - ID: s.instanceID, - }, - LastPulse: time.Now(), - } - _, err := s.instanceDao.UpSert(ctx, instance) - if err != nil { - log.Error(fmt.Sprintf("Unable to upsert maestro instance: %s", err.Error())) - } -} - -func (s *PulseServer) checkInstances(ctx context.Context) { - log.V(10).Infof("Checking liveness of maestro instances") - // lock the Instance with a fail-fast advisory lock context. - // this allows concurrent processing of many instances by one or more maestro instances exclusively. - lockOwnerID, acquired, err := s.lockFactory.NewNonBlockingLock(ctx, "maestro-instances-pulse-check", db.Instances) - // Ensure that the transaction related to this lock always end. - defer s.lockFactory.Unlock(ctx, lockOwnerID) - if err != nil { - log.Error(fmt.Sprintf("error obtaining the instance lock: %v", err)) - return - } - // skip if the lock is not acquired - if !acquired { - log.V(4).Infof("failed to acquire the lock as another maestro instance is checking instances, skip") - return - } - - instances, err := s.instanceDao.All(ctx) - if err != nil { - log.Error(fmt.Sprintf("Unable to get all maestro instances: %s", err.Error())) - return - } - - activeInstanceIDs := []string{} - inactiveInstanceIDs := []string{} - for _, instance := range instances { - // Instances pulsing within the last three check intervals are considered as active. - if instance.LastPulse.After(time.Now().Add(time.Duration(int64(-3*time.Second) * s.pulseInterval))) { - if err := s.statusDispatcher.OnInstanceUp(instance.ID); err != nil { - log.Error(fmt.Sprintf("Error to call OnInstanceUp handler for maestro instance %s: %s", instance.ID, err.Error())) - } - // mark the instance as active after it is added to the status dispatcher - activeInstanceIDs = append(activeInstanceIDs, instance.ID) - } else { - if err := s.statusDispatcher.OnInstanceDown(instance.ID); err != nil { - log.Error(fmt.Sprintf("Error to call OnInstanceDown handler for maestro instance %s: %s", instance.ID, err.Error())) - } else { - inactiveInstanceIDs = append(inactiveInstanceIDs, instance.ID) - } - } - } - - if len(activeInstanceIDs) > 0 { - // batch mark active instances - if err := s.instanceDao.MarkReadyByIDs(ctx, activeInstanceIDs); err != nil { - log.Error(fmt.Sprintf("Unable to mark active maestro instances (%s): %s", activeInstanceIDs, err.Error())) - } - } - - if len(inactiveInstanceIDs) > 0 { - // batch delete inactive instances - if err := s.instanceDao.DeleteByIDs(ctx, inactiveInstanceIDs); err != nil { - log.Error(fmt.Sprintf("Unable to delete inactive maestro instances (%s): %s", inactiveInstanceIDs, err.Error())) - } - } -} - // startSubscription initiates the subscription to resource status update messages. // It runs asynchronously in the background until the provided context is canceled. -func (s *PulseServer) startSubscription(ctx context.Context) { +func (s *MQTTEventServer) startSubscription(ctx context.Context) { s.sourceClient.Subscribe(ctx, func(action types.ResourceAction, resource *api.Resource) error { log.V(4).Infof("received action %s for resource %s", action, resource.ID) @@ -210,17 +115,17 @@ func (s *PulseServer) startSubscription(ctx context.Context) { } // OnCreate will be called on each new resource creation event inserted into db. -func (s *PulseServer) OnCreate(ctx context.Context, resourceID string) error { +func (s *MQTTEventServer) OnCreate(ctx context.Context, resourceID string) error { return s.sourceClient.OnCreate(ctx, resourceID) } // OnUpdate will be called on each new resource update event inserted into db. -func (s *PulseServer) OnUpdate(ctx context.Context, resourceID string) error { +func (s *MQTTEventServer) OnUpdate(ctx context.Context, resourceID string) error { return s.sourceClient.OnUpdate(ctx, resourceID) } // OnDelete will be called on each new resource deletion event inserted into db. -func (s *PulseServer) OnDelete(ctx context.Context, resourceID string) error { +func (s *MQTTEventServer) OnDelete(ctx context.Context, resourceID string) error { return s.sourceClient.OnDelete(ctx, resourceID) } @@ -228,7 +133,7 @@ func (s *PulseServer) OnDelete(ctx context.Context, resourceID string) error { // It does two things: // 1. build the resource status and broadcast it to subscribers // 2. add the event instance record to mark the event has been processed by the current instance -func (s *PulseServer) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error { +func (s *MQTTEventServer) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error { statusEvent, sErr := s.statusEventService.Get(ctx, eventID) if sErr != nil { return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error()) diff --git a/cmd/maestro/server/healthcheck_server.go b/cmd/maestro/server/healthcheck_server.go index c4d75db5..07bed664 100755 --- a/cmd/maestro/server/healthcheck_server.go +++ b/cmd/maestro/server/healthcheck_server.go @@ -2,25 +2,32 @@ package server import ( "context" + e "errors" "fmt" - "net" "net/http" + "time" "github.com/gorilla/mux" + "github.com/openshift-online/maestro/pkg/api" "github.com/openshift-online/maestro/pkg/dao" + "github.com/openshift-online/maestro/pkg/db" + "github.com/openshift-online/maestro/pkg/dispatcher" + "gorm.io/gorm" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" ) -var _ Server = &healthCheckServer{} - -type healthCheckServer struct { - httpServer *http.Server - instanceDao dao.InstanceDao - instanceID string - brokerType string +type HealthCheckServer struct { + httpServer *http.Server + statusDispatcher dispatcher.Dispatcher + lockFactory db.LockFactory + instanceDao dao.InstanceDao + instanceID string + heartbeatInterval int + brokerType string } -func NewHealthCheckServer() *healthCheckServer { +func NewHealthCheckServer() *HealthCheckServer { router := mux.NewRouter() srv := &http.Server{ Handler: router, @@ -28,11 +35,13 @@ func NewHealthCheckServer() *healthCheckServer { } sessionFactory := env().Database.SessionFactory - server := &healthCheckServer{ - httpServer: srv, - instanceDao: dao.NewInstanceDao(&sessionFactory), - instanceID: env().Config.MessageBroker.ClientID, - brokerType: env().Config.MessageBroker.MessageBrokerType, + server := &HealthCheckServer{ + httpServer: srv, + lockFactory: db.NewAdvisoryLockFactory(sessionFactory), + instanceDao: dao.NewInstanceDao(&sessionFactory), + instanceID: env().Config.MessageBroker.ClientID, + heartbeatInterval: env().Config.HealthCheck.HeartbeartInterval, + brokerType: env().Config.MessageBroker.MessageBrokerType, } router.HandleFunc("/healthcheck", server.healthCheckHandler).Methods(http.MethodGet) @@ -40,7 +49,19 @@ func NewHealthCheckServer() *healthCheckServer { return server } -func (s healthCheckServer) Start() { +func (s *HealthCheckServer) SetStatusDispatcher(dispatcher dispatcher.Dispatcher) { + s.statusDispatcher = dispatcher +} + +func (s *HealthCheckServer) Start(ctx context.Context) { + klog.Infof("Starting HealthCheck server") + + // start a goroutine to periodically update heartbeat for the current maestro instance + go wait.UntilWithContext(ctx, s.pulse, time.Duration(s.heartbeatInterval*int(time.Second))) + + // start a goroutine to periodically check the liveness of maestro instances + go wait.UntilWithContext(ctx, s.checkInstances, time.Duration(s.heartbeatInterval/3*int(time.Second))) + var err error if env().Config.HealthCheck.EnableHTTPS { if env().Config.HTTPServer.HTTPSCertFile == "" || env().Config.HTTPServer.HTTPSKeyFile == "" { @@ -59,58 +80,137 @@ func (s healthCheckServer) Start() { } check(err, "HealthCheck server terminated with errors") klog.Infof("HealthCheck server terminated") -} -func (s healthCheckServer) Stop() error { - return s.httpServer.Shutdown(context.Background()) -} - -// Unimplemented -func (s healthCheckServer) Listen() (listener net.Listener, err error) { - return nil, nil -} + // wait until context is done + <-ctx.Done() -// Unimplemented -func (s healthCheckServer) Serve(listener net.Listener) { + klog.Infof("Shutting down HealthCheck server") + s.httpServer.Shutdown(context.Background()) } -// healthCheckHandler returns a 200 OK if the instance is ready, 503 Service Unavailable otherwise. -func (s healthCheckServer) healthCheckHandler(w http.ResponseWriter, r *http.Request) { - // For MQTT, check if the instance is ready - if s.brokerType == "mqtt" { - instance, err := s.instanceDao.Get(r.Context(), s.instanceID) - if err != nil { - klog.Errorf("Error getting instance: %v", err) - w.WriteHeader(http.StatusInternalServerError) - _, err := w.Write([]byte(`{"status": "error"}`)) +func (s *HealthCheckServer) pulse(ctx context.Context) { + klog.V(10).Infof("Updating heartbeat for maestro instance: %s", s.instanceID) + // If there are multiple requests at the same time, it will cause the race conditions among these + // requests (read–modify–write), the advisory lock is used here to prevent the race conditions. + lockOwnerID, err := s.lockFactory.NewAdvisoryLock(ctx, s.instanceID, db.Instances) + // Ensure that the transaction related to this lock always end. + defer s.lockFactory.Unlock(ctx, lockOwnerID) + if err != nil { + klog.Errorf("Error obtaining the instance (%s) lock: %v", s.instanceID, err) + return + } + found, err := s.instanceDao.Get(ctx, s.instanceID) + if err != nil { + if e.Is(err, gorm.ErrRecordNotFound) { + // create a new instance if not found + klog.V(10).Infof("Creating new maestro instance: %s", s.instanceID) + instance := &api.ServerInstance{ + Meta: api.Meta{ + ID: s.instanceID, + }, + LastHeartbeat: time.Now(), + } + _, err := s.instanceDao.Create(ctx, instance) if err != nil { - klog.Errorf("Error writing healthcheck response: %v", err) + klog.Errorf("Unable to create maestro instance: %s", err.Error()) } return } - if instance.Ready { - klog.Infof("Instance is ready") - w.WriteHeader(http.StatusOK) - _, err := w.Write([]byte(`{"status": "ok"}`)) - if err != nil { - klog.Errorf("Error writing healthcheck response: %v", err) + klog.Errorf("Unable to get maestro instance: %s", err.Error()) + } + found.LastHeartbeat = time.Now() + _, err = s.instanceDao.Replace(ctx, found) + if err != nil { + klog.Errorf("Unable to update heartbeat for maestro instance: %s", err.Error()) + } +} + +func (s *HealthCheckServer) checkInstances(ctx context.Context) { + klog.V(10).Infof("Checking liveness of maestro instances") + // lock the Instance with a fail-fast advisory lock context. + // this allows concurrent processing of many instances by one or more maestro instances exclusively. + lockOwnerID, acquired, err := s.lockFactory.NewNonBlockingLock(ctx, "maestro-instances-liveness-check", db.Instances) + // Ensure that the transaction related to this lock always end. + defer s.lockFactory.Unlock(ctx, lockOwnerID) + if err != nil { + klog.Errorf("Error obtaining the instance lock: %v", err) + return + } + // skip if the lock is not acquired + if !acquired { + klog.V(10).Infof("failed to acquire the lock as another maestro instance is checking instances, skip") + return + } + + instances, err := s.instanceDao.All(ctx) + if err != nil { + klog.Errorf("Unable to get all maestro instances: %s", err.Error()) + return + } + + activeInstanceIDs := []string{} + inactiveInstanceIDs := []string{} + for _, instance := range instances { + // Instances pulsing within the last three check intervals are considered as active. + if instance.LastHeartbeat.After(time.Now().Add(time.Duration(int(-3*time.Second) * s.heartbeatInterval))) { + if s.brokerType == "mqtt" { + if err := s.statusDispatcher.OnInstanceUp(instance.ID); err != nil { + klog.Errorf("Error to call OnInstanceUp handler for maestro instance %s: %s", instance.ID, err.Error()) + } } - return + // mark the instance as active after it is added to the status dispatcher + activeInstanceIDs = append(activeInstanceIDs, instance.ID) + } else { + if s.brokerType == "mqtt" { + if err := s.statusDispatcher.OnInstanceDown(instance.ID); err != nil { + klog.Errorf("Error to call OnInstanceDown handler for maestro instance %s: %s", instance.ID, err.Error()) + } + } + // mark the instance as inactive after it is removed from the status dispatcher + inactiveInstanceIDs = append(inactiveInstanceIDs, instance.ID) } + } - klog.Infof("Instance not ready") - w.WriteHeader(http.StatusServiceUnavailable) - _, err = w.Write([]byte(`{"status": "not ready"}`)) + if len(activeInstanceIDs) > 0 { + // batch mark active instances + if err := s.instanceDao.MarkReadyByIDs(ctx, activeInstanceIDs); err != nil { + klog.Errorf("Unable to mark active maestro instances (%s): %s", activeInstanceIDs, err.Error()) + } + } + + if len(inactiveInstanceIDs) > 0 { + // batch mark inactive instances + if err := s.instanceDao.MarkUnreadyByIDs(ctx, inactiveInstanceIDs); err != nil { + klog.Errorf("Unable to mark inactive maestro instances (%s): %s", inactiveInstanceIDs, err.Error()) + } + } +} + +// healthCheckHandler returns a 200 OK if the instance is ready, 503 Service Unavailable otherwise. +func (s *HealthCheckServer) healthCheckHandler(w http.ResponseWriter, r *http.Request) { + instance, err := s.instanceDao.Get(r.Context(), s.instanceID) + if err != nil { + klog.Errorf("Error getting instance: %v", err) + w.WriteHeader(http.StatusInternalServerError) + _, err := w.Write([]byte(`{"status": "error"}`)) + if err != nil { + klog.Errorf("Error writing healthcheck response: %v", err) + } + return + } + if instance.Ready { + klog.Infof("Instance is ready") + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte(`{"status": "ok"}`)) if err != nil { klog.Errorf("Error writing healthcheck response: %v", err) } return } - // For gRPC broker, return 200 OK for now - klog.Infof("Instance is ready") - w.WriteHeader(http.StatusOK) - _, err := w.Write([]byte(`{"status": "ok"}`)) + klog.Infof("Instance not ready") + w.WriteHeader(http.StatusServiceUnavailable) + _, err = w.Write([]byte(`{"status": "not ready"}`)) if err != nil { klog.Errorf("Error writing healthcheck response: %v", err) } diff --git a/pkg/api/server_instance.go b/pkg/api/server_instance.go index d737e61d..dd532306 100644 --- a/pkg/api/server_instance.go +++ b/pkg/api/server_instance.go @@ -8,8 +8,8 @@ import "time" // However, it is not meant for direct exposure to end users through the API. type ServerInstance struct { Meta - LastPulse time.Time // LastPulse indicates the last time the instance pulsed. - Ready bool // Ready indicates whether the instance is ready to serve requests. + LastHeartbeat time.Time // LastHeartbeat indicates the last time the instance sent a heartbeat. + Ready bool // Ready indicates whether the instance is ready to serve requests. } type ServerInstanceList []*ServerInstance diff --git a/pkg/config/config.go b/pkg/config/config.go index fbcbd052..f9445036 100755 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -17,7 +17,7 @@ type ApplicationConfig struct { GRPCServer *GRPCServerConfig `json:"grpc_server"` Metrics *MetricsConfig `json:"metrics"` HealthCheck *HealthCheckConfig `json:"health_check"` - PulseServer *PulseServerConfig `json:"pulse_server"` + EventServer *EventServerConfig `json:"event_server"` Database *DatabaseConfig `json:"database"` MessageBroker *MessageBrokerConfig `json:"message_broker"` OCM *OCMConfig `json:"ocm"` @@ -30,7 +30,7 @@ func NewApplicationConfig() *ApplicationConfig { GRPCServer: NewGRPCServerConfig(), Metrics: NewMetricsConfig(), HealthCheck: NewHealthCheckConfig(), - PulseServer: NewPulseServerConfig(), + EventServer: NewEventServerConfig(), Database: NewDatabaseConfig(), MessageBroker: NewMessageBrokerConfig(), OCM: NewOCMConfig(), @@ -44,7 +44,7 @@ func (c *ApplicationConfig) AddFlags(flagset *pflag.FlagSet) { c.GRPCServer.AddFlags(flagset) c.Metrics.AddFlags(flagset) c.HealthCheck.AddFlags(flagset) - c.PulseServer.AddFlags(flagset) + c.EventServer.AddFlags(flagset) c.Database.AddFlags(flagset) c.MessageBroker.AddFlags(flagset) c.OCM.AddFlags(flagset) @@ -61,7 +61,7 @@ func (c *ApplicationConfig) ReadFiles() []string { {c.OCM.ReadFiles, "OCM"}, {c.Metrics.ReadFiles, "Metrics"}, {c.HealthCheck.ReadFiles, "HealthCheck"}, - {c.PulseServer.ReadFiles, "PulseServer"}, + {c.EventServer.ReadFiles, "EventServer"}, {c.Sentry.ReadFiles, "Sentry"}, } messages := []string{} diff --git a/pkg/config/pulse_server.go b/pkg/config/event_server.go similarity index 83% rename from pkg/config/pulse_server.go rename to pkg/config/event_server.go index 3b29b80f..03abb2ff 100644 --- a/pkg/config/pulse_server.go +++ b/pkg/config/event_server.go @@ -11,9 +11,8 @@ const ( BroadcastSubscriptionType SubscriptionType = "broadcast" ) -// PulseServerConfig contains the configuration for the maestro pulse server. -type PulseServerConfig struct { - PulseInterval int64 `json:"pulse_interval"` +// EventServerConfig contains the configuration for the maestro pulse server. +type EventServerConfig struct { SubscriptionType string `json:"subscription_type"` ConsistentHashConfig *ConsistentHashConfig `json:"consistent_hash_config"` } @@ -25,10 +24,9 @@ type ConsistentHashConfig struct { Load float64 `json:"load"` } -// NewPulseServerConfig creates a new PulseServerConfig with default 15 second pulse interval. -func NewPulseServerConfig() *PulseServerConfig { - return &PulseServerConfig{ - PulseInterval: 15, +// NewEventServerConfig creates a new EventServerConfig with default settings. +func NewEventServerConfig() *EventServerConfig { + return &EventServerConfig{ SubscriptionType: "shared", ConsistentHashConfig: NewConsistentHashConfig(), } @@ -46,20 +44,19 @@ func NewConsistentHashConfig() *ConsistentHashConfig { } } -// AddFlags configures the PulseServerConfig with command line flags. +// AddFlags configures the EventServerConfig with command line flags. // It allows users to customize the interval for maestro instance pulses and subscription type. // - "pulse-interval" sets the time between maestro instance pulses (in seconds) to indicate its liveness (default: 15 seconds). // - "subscription-type" specifies the subscription type for resource status updates from message broker, either "shared" or "broadcast". // "shared" subscription type uses MQTT feature to ensure only one Maestro instance receives resource status messages. // "broadcast" subscription type will make all Maestro instances to receive resource status messages and hash the message to determine which instance should process it. // If subscription type is "broadcast", ConsistentHashConfig settings can be configured for the hashing algorithm. -func (c *PulseServerConfig) AddFlags(fs *pflag.FlagSet) { - fs.Int64Var(&c.PulseInterval, "pulse-interval", c.PulseInterval, "Sets the pulse interval for maestro instances (seconds) to indicate liveness") +func (c *EventServerConfig) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&c.SubscriptionType, "subscription-type", c.SubscriptionType, "Sets the subscription type for resource status updates from message broker, Options: \"shared\" (only one instance receives resource status message, MQTT feature ensures exclusivity) or \"broadcast\" (all instances receive messages, hashed to determine processing instance)") c.ConsistentHashConfig.AddFlags(fs) } -func (c *PulseServerConfig) ReadFiles() error { +func (c *EventServerConfig) ReadFiles() error { c.ConsistentHashConfig.ReadFiles() return nil } diff --git a/pkg/config/pulse_server_test.go b/pkg/config/event_server_test.go similarity index 82% rename from pkg/config/pulse_server_test.go rename to pkg/config/event_server_test.go index 803c5dc1..44fd2a7a 100644 --- a/pkg/config/pulse_server_test.go +++ b/pkg/config/event_server_test.go @@ -7,17 +7,16 @@ import ( "github.com/spf13/pflag" ) -func TestPulseServerConfig(t *testing.T) { +func TestEventServerConfig(t *testing.T) { cases := []struct { name string input map[string]string - want *PulseServerConfig + want *EventServerConfig }{ { name: "default subscription type", input: map[string]string{}, - want: &PulseServerConfig{ - PulseInterval: 15, + want: &EventServerConfig{ SubscriptionType: "shared", ConsistentHashConfig: &ConsistentHashConfig{ PartitionCount: 7, @@ -31,8 +30,7 @@ func TestPulseServerConfig(t *testing.T) { input: map[string]string{ "subscription-type": "broadcast", }, - want: &PulseServerConfig{ - PulseInterval: 15, + want: &EventServerConfig{ SubscriptionType: "broadcast", ConsistentHashConfig: &ConsistentHashConfig{ PartitionCount: 7, @@ -49,8 +47,7 @@ func TestPulseServerConfig(t *testing.T) { "consistent-hash-replication-factor": "30", "consistent-hash-load": "1.5", }, - want: &PulseServerConfig{ - PulseInterval: 15, + want: &EventServerConfig{ SubscriptionType: "broadcast", ConsistentHashConfig: &ConsistentHashConfig{ PartitionCount: 10, @@ -61,7 +58,7 @@ func TestPulseServerConfig(t *testing.T) { }, } - config := NewPulseServerConfig() + config := NewEventServerConfig() pflag.NewFlagSet("test", pflag.ContinueOnError) fs := pflag.CommandLine config.AddFlags(fs) @@ -72,7 +69,7 @@ func TestPulseServerConfig(t *testing.T) { fs.Set(key, value) } if !reflect.DeepEqual(config, tc.want) { - t.Errorf("NewPulseServerConfig() = %v; want %v", config, tc.want) + t.Errorf("NewEventServerConfig() = %v; want %v", config, tc.want) } // clear flags fs.VisitAll(func(f *pflag.Flag) { diff --git a/pkg/config/health_check.go b/pkg/config/health_check.go index f811e597..122f4b6e 100755 --- a/pkg/config/health_check.go +++ b/pkg/config/health_check.go @@ -5,20 +5,23 @@ import ( ) type HealthCheckConfig struct { - BindPort string `json:"bind_port"` - EnableHTTPS bool `json:"enable_https"` + BindPort string `json:"bind_port"` + EnableHTTPS bool `json:"enable_https"` + HeartbeartInterval int `json:"heartbeat_interval"` } func NewHealthCheckConfig() *HealthCheckConfig { return &HealthCheckConfig{ - BindPort: "8083", - EnableHTTPS: false, + BindPort: "8083", + EnableHTTPS: false, + HeartbeartInterval: 15, } } func (c *HealthCheckConfig) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&c.BindPort, "health-check-server-bindport", c.BindPort, "Health check server bind port") fs.BoolVar(&c.EnableHTTPS, "enable-health-check-https", c.EnableHTTPS, "Enable HTTPS for health check server") + fs.IntVar(&c.HeartbeartInterval, "heartbeat-interval", c.HeartbeartInterval, "Heartbeat interval for health check server") } func (c *HealthCheckConfig) ReadFiles() error { diff --git a/pkg/dao/instance.go b/pkg/dao/instance.go index e51ee1bb..21e732ca 100644 --- a/pkg/dao/instance.go +++ b/pkg/dao/instance.go @@ -14,8 +14,8 @@ type InstanceDao interface { Get(ctx context.Context, id string) (*api.ServerInstance, error) Create(ctx context.Context, instance *api.ServerInstance) (*api.ServerInstance, error) Replace(ctx context.Context, instance *api.ServerInstance) (*api.ServerInstance, error) - UpSert(ctx context.Context, instance *api.ServerInstance) (*api.ServerInstance, error) MarkReadyByIDs(ctx context.Context, ids []string) error + MarkUnreadyByIDs(ctx context.Context, ids []string) error Delete(ctx context.Context, id string) error DeleteByIDs(ctx context.Context, ids []string) error FindByIDs(ctx context.Context, ids []string) (api.ServerInstanceList, error) @@ -60,18 +60,18 @@ func (d *sqlInstanceDao) Replace(ctx context.Context, instance *api.ServerInstan return instance, nil } -func (d *sqlInstanceDao) UpSert(ctx context.Context, instance *api.ServerInstance) (*api.ServerInstance, error) { +func (d *sqlInstanceDao) MarkReadyByIDs(ctx context.Context, ids []string) error { g2 := (*d.sessionFactory).New(ctx) - if err := g2.Unscoped().Omit(clause.Associations).Save(instance).Error; err != nil { + if err := g2.Model(&api.ServerInstance{}).Where("id in (?)", ids).Update("ready", true).Error; err != nil { db.MarkForRollback(ctx, err) - return nil, err + return err } - return instance, nil + return nil } -func (d *sqlInstanceDao) MarkReadyByIDs(ctx context.Context, ids []string) error { +func (d *sqlInstanceDao) MarkUnreadyByIDs(ctx context.Context, ids []string) error { g2 := (*d.sessionFactory).New(ctx) - if err := g2.Model(&api.ServerInstance{}).Where("id in (?)", ids).Update("ready", true).Error; err != nil { + if err := g2.Model(&api.ServerInstance{}).Where("id in (?)", ids).Update("ready", false).Error; err != nil { db.MarkForRollback(ctx, err) return err } diff --git a/pkg/dao/mocks/instance.go b/pkg/dao/mocks/instance.go index 9723f9ad..b872b337 100644 --- a/pkg/dao/mocks/instance.go +++ b/pkg/dao/mocks/instance.go @@ -78,6 +78,17 @@ func (d *instanceDaoMock) MarkReadyByIDs(ctx context.Context, ids []string) erro return nil } +func (d *instanceDaoMock) MarkUnreadyByIDs(ctx context.Context, ids []string) error { + d.mux.Lock() + defer d.mux.Unlock() + for _, instance := range d.instances { + if contains(ids, instance.ID) { + instance.Ready = false + } + } + return nil +} + func (d *instanceDaoMock) Delete(ctx context.Context, ID string) error { d.mux.Lock() defer d.mux.Unlock() diff --git a/pkg/db/migrations/202401151014_add_server_instances.go b/pkg/db/migrations/202401151014_add_server_instances.go index 3de9fbc8..2a733721 100644 --- a/pkg/db/migrations/202401151014_add_server_instances.go +++ b/pkg/db/migrations/202401151014_add_server_instances.go @@ -11,8 +11,8 @@ import ( func addServerInstances() *gormigrate.Migration { type ServerInstance struct { Model - LastPulse time.Time - Ready bool `gorm:"default:false"` + LastHeartbeat time.Time + Ready bool `gorm:"default:false"` } return &gormigrate.Migration{ diff --git a/test/helper.go b/test/helper.go index bf8b22df..9d4c353b 100755 --- a/test/helper.go +++ b/test/helper.go @@ -13,6 +13,8 @@ import ( "time" "github.com/openshift-online/maestro/pkg/controllers" + "github.com/openshift-online/maestro/pkg/dao" + "github.com/openshift-online/maestro/pkg/dispatcher" "github.com/openshift-online/maestro/pkg/event" "github.com/openshift-online/maestro/pkg/logger" "k8s.io/klog/v2" @@ -70,13 +72,14 @@ type Helper struct { ContextCancelFunc context.CancelFunc EventBroadcaster *event.EventBroadcaster + StatusDispatcher dispatcher.Dispatcher Store *MemoryStore GRPCSourceClient *generic.CloudEventSourceClient[*api.Resource] DBFactory db.SessionFactory AppConfig *config.ApplicationConfig APIServer server.Server MetricsServer server.Server - HealthCheckServer server.Server + HealthCheckServer *server.HealthCheckServer EventServer server.EventServer ControllerManager *server.ControllersServer WorkAgentHolder *work.ClientHolder @@ -95,7 +98,7 @@ func NewHelper(t *testing.T) *Helper { fmt.Println("Unable to read JWT keys - this may affect tests that make authenticated server requests") } - env := environments.Environment() + env := helper.Env() // Manually set environment name, ignoring environment variables env.Name = environments.TestingEnv err = env.AddFlags(pflag.CommandLine) @@ -118,10 +121,12 @@ func NewHelper(t *testing.T) *Helper { Ctx: ctx, ContextCancelFunc: cancel, EventBroadcaster: event.NewEventBroadcaster(), - AppConfig: env.Config, - DBFactory: env.Database.SessionFactory, - JWTPrivateKey: jwtKey, - JWTCA: jwtCA, + StatusDispatcher: dispatcher.NewHashDispatcher(helper.Env().Config.MessageBroker.ClientID, dao.NewInstanceDao(&helper.Env().Database.SessionFactory), + dao.NewConsumerDao(&helper.Env().Database.SessionFactory), helper.Env().Clients.CloudEventsSource, helper.Env().Config.EventServer.ConsistentHashConfig), + AppConfig: env.Config, + DBFactory: env.Database.SessionFactory, + JWTPrivateKey: jwtKey, + JWTCA: jwtCA, } // TODO jwk mock server needs to be refactored out of the helper and into the testing environment @@ -130,14 +135,13 @@ func NewHelper(t *testing.T) *Helper { helper.sendShutdownSignal, helper.stopAPIServer, helper.stopMetricsServer, - helper.stopHealthCheckServer, jwkMockTeardown, } helper.startEventBroadcaster() helper.startAPIServer() helper.startMetricsServer() - helper.startHealthCheckServer() + helper.startHealthCheckServer(helper.Ctx) helper.startEventServer(helper.Ctx) }) helper.T = t @@ -192,31 +196,25 @@ func (helper *Helper) stopMetricsServer() error { return nil } -func (helper *Helper) startHealthCheckServer() { +func (helper *Helper) startHealthCheckServer(ctx context.Context) { + helper.Env().Config.HealthCheck.HeartbeartInterval = 1 helper.HealthCheckServer = server.NewHealthCheckServer() + helper.HealthCheckServer.SetStatusDispatcher(helper.StatusDispatcher) go func() { klog.V(10).Info("Test health check server started") - helper.HealthCheckServer.Start() + helper.HealthCheckServer.Start(ctx) klog.V(10).Info("Test health check server stopped") }() } -func (helper *Helper) stopHealthCheckServer() error { - if err := helper.HealthCheckServer.Stop(); err != nil { - return fmt.Errorf("unable to stop health check server: %s", err.Error()) - } - return nil -} - func (helper *Helper) sendShutdownSignal() error { helper.ContextCancelFunc() return nil } func (helper *Helper) startEventServer(ctx context.Context) { - helper.Env().Config.PulseServer.PulseInterval = 1 - helper.Env().Config.PulseServer.SubscriptionType = "broadcast" - helper.EventServer = server.NewPulseServer(helper.EventBroadcaster) + // helper.Env().Config.EventServer.SubscriptionType = "broadcast" + helper.EventServer = server.NewMQTTEventServer(helper.EventBroadcaster, helper.StatusDispatcher) go func() { klog.V(10).Info("Test event server started") helper.EventServer.Start(ctx) @@ -339,7 +337,7 @@ func (helper *Helper) RestartMetricsServer() { func (helper *Helper) Reset() { klog.Infof("Reseting testing environment") - env := environments.Environment() + env := helper.Env() // Reset the configuration env.Config = config.NewApplicationConfig() diff --git a/test/integration/pulse_server_test.go b/test/integration/status_hash_test.go similarity index 86% rename from test/integration/pulse_server_test.go rename to test/integration/status_hash_test.go index 9556f2bd..6873b5df 100644 --- a/test/integration/pulse_server_test.go +++ b/test/integration/status_hash_test.go @@ -16,7 +16,7 @@ import ( "github.com/openshift-online/maestro/test" ) -func TestPulseServer(t *testing.T) { +func TestEventServer(t *testing.T) { h, _ := test.RegisterIntegration(t) ctx, cancel := context.WithCancel(context.Background()) defer func() { @@ -25,10 +25,11 @@ func TestPulseServer(t *testing.T) { instanceDao := dao.NewInstanceDao(&h.Env().Database.SessionFactory) // insert one existing instances - _, err := instanceDao.UpSert(ctx, &api.ServerInstance{ + _, err := instanceDao.Create(ctx, &api.ServerInstance{ Meta: api.Meta{ ID: "instance1", }, + LastHeartbeat: time.Now(), }) Expect(err).NotTo(HaveOccurred()) @@ -39,13 +40,23 @@ func TestPulseServer(t *testing.T) { return err } - if len(instances) != 1 { + if len(instances) != 2 { return fmt.Errorf("expected 1 instance, got %d", len(instances)) } - instance := instances[0] - if instance.UpdatedAt.IsZero() { - return fmt.Errorf("expected instance.UpdatedAt to be non-zero") + var instance *api.ServerInstance + for _, i := range instances { + if i.ID == *instanceID { + instance = i + } + } + + if instance.LastHeartbeat.IsZero() { + return fmt.Errorf("expected instance.LastHeartbeat to be non-zero") + } + + if !instance.Ready { + return fmt.Errorf("expected instance.Ready to be true") } if instance.ID != *instanceID { @@ -65,10 +76,12 @@ func TestPulseServer(t *testing.T) { // to make sure the consumer is hashed to the new instance firstly. // after the new instance is stale after 3*pulseInterval (3s), the current // instance will take over the consumer and resync the resource status. - _, err = instanceDao.UpSert(ctx, &api.ServerInstance{ + _, err = instanceDao.Create(ctx, &api.ServerInstance{ Meta: api.Meta{ ID: clusterName, }, + LastHeartbeat: time.Now(), + Ready: true, }) Expect(err).NotTo(HaveOccurred())