diff --git a/Containerfile.rhtap b/Containerfile.rhtap index 0e9534f9..8d4a14c4 100755 --- a/Containerfile.rhtap +++ b/Containerfile.rhtap @@ -1,28 +1,18 @@ -FROM brew.registry.redhat.io/rh-osbs/openshift-golang-builder:rhel_8_1.22 AS builder +FROM brew.registry.redhat.io/rh-osbs/openshift-golang-builder:rhel_9_1.22 AS builder ENV SOURCE_DIR=/maestro WORKDIR $SOURCE_DIR COPY . $SOURCE_DIR -ENV GOFLAGS="" -RUN make binary -RUN pwd +ENV GOEXPERIMENT=strictfipsruntime +ENV CGO_ENABLED=1 +RUN make binary BUILD_OPTS="-tags strictfipsruntime" FROM registry.access.redhat.com/ubi9/ubi-minimal:latest -RUN \ - microdnf update -y \ - && \ - microdnf install -y util-linux \ - && \ - microdnf clean all - -COPY --from=builder \ - /maestro/maestro \ - /usr/local/bin/ - +RUN microdnf update -y && microdnf install -y util-linux && microdnf clean all +COPY --from=builder /maestro/maestro /usr/local/bin/ EXPOSE 8000 - ENTRYPOINT ["/usr/local/bin/maestro", "server"] LABEL name="maestro" \ diff --git a/Makefile b/Makefile index cca7740f..90ad5c2e 100755 --- a/Makefile +++ b/Makefile @@ -178,7 +178,8 @@ lint: # Build binaries # NOTE it may be necessary to use CGO_ENABLED=0 for backwards compatibility with centos7 if not using centos7 binary: check-gopath - ${GO} build -tags=kafka ./cmd/maestro + ${GO} mod vendor + ${GO} build $(BUILD_OPTS) ./cmd/maestro .PHONY: binary # Install diff --git a/cmd/maestro/servecmd/cmd.go b/cmd/maestro/servecmd/cmd.go index c0f41c30..1b3611df 100755 --- a/cmd/maestro/servecmd/cmd.go +++ b/cmd/maestro/servecmd/cmd.go @@ -11,6 +11,9 @@ import ( "github.com/openshift-online/maestro/cmd/maestro/environments" "github.com/openshift-online/maestro/cmd/maestro/server" + "github.com/openshift-online/maestro/pkg/config" + "github.com/openshift-online/maestro/pkg/dao" + "github.com/openshift-online/maestro/pkg/dispatcher" "github.com/openshift-online/maestro/pkg/event" ) @@ -35,24 +38,40 @@ func runServer(cmd *cobra.Command, args []string) { klog.Fatalf("Unable to initialize environment: %s", err.Error()) } + healthcheckServer := server.NewHealthCheckServer() + // Create event broadcaster to broadcast resource status update events to subscribers eventBroadcaster := event.NewEventBroadcaster() // Create the event server based on the message broker type: // For gRPC, create a gRPC broker to handle resource spec and status events. - // For MQTT, create a Pulse server to handle resource spec and status events. + // For MQTT/Kafka, create a message queue based event server to handle resource spec and status events. 
var eventServer server.EventServer if environments.Environment().Config.MessageBroker.MessageBrokerType == "grpc" { klog.Info("Setting up grpc broker") eventServer = server.NewGRPCBroker(eventBroadcaster) } else { - klog.Info("Setting up pulse server") - eventServer = server.NewPulseServer(eventBroadcaster) + klog.Info("Setting up message queue event server") + var statusDispatcher dispatcher.Dispatcher + subscriptionType := environments.Environment().Config.EventServer.SubscriptionType + switch config.SubscriptionType(subscriptionType) { + case config.SharedSubscriptionType: + statusDispatcher = dispatcher.NewNoopDispatcher(dao.NewConsumerDao(&environments.Environment().Database.SessionFactory), environments.Environment().Clients.CloudEventsSource) + case config.BroadcastSubscriptionType: + statusDispatcher = dispatcher.NewHashDispatcher(environments.Environment().Config.MessageBroker.ClientID, dao.NewInstanceDao(&environments.Environment().Database.SessionFactory), + dao.NewConsumerDao(&environments.Environment().Database.SessionFactory), environments.Environment().Clients.CloudEventsSource, environments.Environment().Config.EventServer.ConsistentHashConfig) + default: + klog.Errorf("Unsupported subscription type: %s", subscriptionType) + } + + // Set the status dispatcher for the healthcheck server + healthcheckServer.SetStatusDispatcher(statusDispatcher) + eventServer = server.NewMessageQueueEventServer(eventBroadcaster, statusDispatcher) } + // Create the servers apiserver := server.NewAPIServer(eventBroadcaster) metricsServer := server.NewMetricsServer() - healthcheckServer := server.NewHealthCheckServer() controllersServer := server.NewControllersServer(eventServer) ctx, cancel := context.WithCancel(context.Background()) @@ -70,10 +89,6 @@ func runServer(cmd *cobra.Command, args []string) { if err := metricsServer.Stop(); err != nil { klog.Errorf("Failed to stop metrics server, %v", err) } - - if err := healthcheckServer.Stop(); err != nil { - klog.Errorf("Failed to stop healthcheck server, %v", err) - } }() // Start the event broadcaster @@ -82,7 +97,7 @@ func runServer(cmd *cobra.Command, args []string) { // Run the servers go apiserver.Start() go metricsServer.Start() - go healthcheckServer.Start() + go healthcheckServer.Start(ctx) go eventServer.Start(ctx) go controllersServer.Start(ctx) diff --git a/cmd/maestro/server/controllers.go b/cmd/maestro/server/controllers.go index d7d9cb68..dcf713b4 100755 --- a/cmd/maestro/server/controllers.go +++ b/cmd/maestro/server/controllers.go @@ -5,6 +5,7 @@ import ( "github.com/openshift-online/maestro/pkg/api" "github.com/openshift-online/maestro/pkg/controllers" + "github.com/openshift-online/maestro/pkg/dao" "github.com/openshift-online/maestro/pkg/db" "github.com/openshift-online/maestro/pkg/logger" @@ -18,6 +19,8 @@ func NewControllersServer(eventServer EventServer) *ControllersServer { ), StatusController: controllers.NewStatusController( env().Services.StatusEvents(), + dao.NewInstanceDao(&env().Database.SessionFactory), + dao.NewEventInstanceDao(&env().Database.SessionFactory), ), } diff --git a/cmd/maestro/server/pulse_server.go b/cmd/maestro/server/event_server.go similarity index 62% rename from cmd/maestro/server/pulse_server.go rename to cmd/maestro/server/event_server.go index 35214cb0..5514a3a3 100644 --- a/cmd/maestro/server/pulse_server.go +++ b/cmd/maestro/server/event_server.go @@ -3,11 +3,9 @@ package server import ( "context" "fmt" - "time" "github.com/openshift-online/maestro/pkg/api" 
"github.com/openshift-online/maestro/pkg/client/cloudevents" - "github.com/openshift-online/maestro/pkg/config" "github.com/openshift-online/maestro/pkg/dao" "github.com/openshift-online/maestro/pkg/db" "github.com/openshift-online/maestro/pkg/dispatcher" @@ -15,8 +13,6 @@ import ( "github.com/openshift-online/maestro/pkg/logger" "github.com/openshift-online/maestro/pkg/services" "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/klog/v2" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" "open-cluster-management.io/sdk-go/pkg/cloudevents/work/common" workpayload "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload" @@ -45,16 +41,14 @@ type EventServer interface { OnStatusUpdate(ctx context.Context, eventID, resourceID string) error } -var _ EventServer = &PulseServer{} +var _ EventServer = &MessageQueueEventServer{} -// PulseServer represents a server responsible for publish resource spec events from -// resource controller and handle resource status update events from the maestro agent. -// It also periodic heartbeat updates and checking the liveness of Maestro instances, -// triggering status resync based on instances' status and other conditions. -type PulseServer struct { +// MessageQueueEventServer represents a event server responsible for publish resource spec events +// from resource controller and handle resource status update events from the message queue. +// It also maintains a status dispatcher to dispatch status update events to the corresponding +// maestro instances. +type MessageQueueEventServer struct { instanceID string - pulseInterval int64 - instanceDao dao.InstanceDao eventInstanceDao dao.EventInstanceDao lockFactory db.LockFactory eventBroadcaster *event.EventBroadcaster // event broadcaster to broadcast resource status update events to subscribers @@ -64,22 +58,10 @@ type PulseServer struct { statusDispatcher dispatcher.Dispatcher } -func NewPulseServer(eventBroadcaster *event.EventBroadcaster) EventServer { - var statusDispatcher dispatcher.Dispatcher - switch config.SubscriptionType(env().Config.PulseServer.SubscriptionType) { - case config.SharedSubscriptionType: - statusDispatcher = dispatcher.NewNoopDispatcher(dao.NewConsumerDao(&env().Database.SessionFactory), env().Clients.CloudEventsSource) - case config.BroadcastSubscriptionType: - statusDispatcher = dispatcher.NewHashDispatcher(env().Config.MessageBroker.ClientID, dao.NewInstanceDao(&env().Database.SessionFactory), - dao.NewConsumerDao(&env().Database.SessionFactory), env().Clients.CloudEventsSource, env().Config.PulseServer.ConsistentHashConfig) - default: - klog.Fatalf("Unsupported subscription type: %s", env().Config.PulseServer.SubscriptionType) - } +func NewMessageQueueEventServer(eventBroadcaster *event.EventBroadcaster, statusDispatcher dispatcher.Dispatcher) EventServer { sessionFactory := env().Database.SessionFactory - return &PulseServer{ + return &MessageQueueEventServer{ instanceID: env().Config.MessageBroker.ClientID, - pulseInterval: env().Config.PulseServer.PulseInterval, - instanceDao: dao.NewInstanceDao(&sessionFactory), eventInstanceDao: dao.NewEventInstanceDao(&sessionFactory), lockFactory: db.NewAdvisoryLockFactory(sessionFactory), eventBroadcaster: eventBroadcaster, @@ -90,92 +72,24 @@ func NewPulseServer(eventBroadcaster *event.EventBroadcaster) EventServer { } } -// Start initializes and runs the pulse server, updating and checking Maestro instances' liveness, -// initializes subscription to status update messages and 
triggers status resync based on -// instances' status and other conditions. -func (s *PulseServer) Start(ctx context.Context) { - log.Infof("Starting pulse server") +// Start initializes and runs the event server. It starts the subscription +// to resource status update messages and the status dispatcher. +func (s *MessageQueueEventServer) Start(ctx context.Context) { + log.Infof("Starting message queue event server") // start subscribing to resource status update messages. s.startSubscription(ctx) // start the status dispatcher go s.statusDispatcher.Start(ctx) - // start a goroutine to periodically update heartbeat for the current maestro instance - go wait.UntilWithContext(ctx, s.pulse, time.Duration(s.pulseInterval*int64(time.Second))) - - // start a goroutine to periodically check the liveness of maestro instances - go wait.UntilWithContext(ctx, s.checkInstances, time.Duration(s.pulseInterval/3*int64(time.Second))) - // wait until context is canceled <-ctx.Done() - log.Infof("Shutting down pulse server") -} - -func (s *PulseServer) pulse(ctx context.Context) { - log.V(10).Infof("Updating heartbeat for maestro instance: %s", s.instanceID) - instance := &api.ServerInstance{ - Meta: api.Meta{ - ID: s.instanceID, - UpdatedAt: time.Now(), - }, - } - _, err := s.instanceDao.UpSert(ctx, instance) - if err != nil { - log.Error(fmt.Sprintf("Unable to upsert maestro instance: %s", err.Error())) - } -} - -func (s *PulseServer) checkInstances(ctx context.Context) { - log.V(10).Infof("Checking liveness of maestro instances") - // lock the Instance with a fail-fast advisory lock context. - // this allows concurrent processing of many instances by one or more maestro instances exclusively. - lockOwnerID, acquired, err := s.lockFactory.NewNonBlockingLock(ctx, "maestro-instances-pulse-check", db.Instances) - // Ensure that the transaction related to this lock always end. - defer s.lockFactory.Unlock(ctx, lockOwnerID) - if err != nil { - log.Error(fmt.Sprintf("error obtaining the instance lock: %v", err)) - return - } - // skip if the lock is not acquired - if !acquired { - log.V(4).Infof("failed to acquire the lock as another maestro instance is checking instances, skip") - return - } - - instances, err := s.instanceDao.All(ctx) - if err != nil { - log.Error(fmt.Sprintf("Unable to get all maestro instances: %s", err.Error())) - return - } - - inactiveInstanceIDs := []string{} - for _, instance := range instances { - // Instances pulsing within the last three check intervals are considered as active. - if instance.UpdatedAt.After(time.Now().Add(time.Duration(int64(-3*time.Second) * s.pulseInterval))) { - if err := s.statusDispatcher.OnInstanceUp(instance.ID); err != nil { - log.Error(fmt.Sprintf("Error to call OnInstanceUp handler for maestro instance %s: %s", instance.ID, err.Error())) - } - } else { - if err := s.statusDispatcher.OnInstanceDown(instance.ID); err != nil { - log.Error(fmt.Sprintf("Error to call OnInstanceDown handler for maestro instance %s: %s", instance.ID, err.Error())) - } else { - inactiveInstanceIDs = append(inactiveInstanceIDs, instance.ID) - } - } - } - - if len(inactiveInstanceIDs) > 0 { - // batch delete inactive instances - if err := s.instanceDao.DeleteByIDs(ctx, inactiveInstanceIDs); err != nil { - log.Error(fmt.Sprintf("Unable to delete inactive maestro instances (%s): %s", inactiveInstanceIDs, err.Error())) - } - } + log.Infof("Shutting down message queue event server") } // startSubscription initiates the subscription to resource status update messages. 
// It runs asynchronously in the background until the provided context is canceled. -func (s *PulseServer) startSubscription(ctx context.Context) { +func (s *MessageQueueEventServer) startSubscription(ctx context.Context) { s.sourceClient.Subscribe(ctx, func(action types.ResourceAction, resource *api.Resource) error { log.V(4).Infof("received action %s for resource %s", action, resource.ID) @@ -200,17 +114,17 @@ func (s *PulseServer) startSubscription(ctx context.Context) { } // OnCreate will be called on each new resource creation event inserted into db. -func (s *PulseServer) OnCreate(ctx context.Context, resourceID string) error { +func (s *MessageQueueEventServer) OnCreate(ctx context.Context, resourceID string) error { return s.sourceClient.OnCreate(ctx, resourceID) } // OnUpdate will be called on each new resource update event inserted into db. -func (s *PulseServer) OnUpdate(ctx context.Context, resourceID string) error { +func (s *MessageQueueEventServer) OnUpdate(ctx context.Context, resourceID string) error { return s.sourceClient.OnUpdate(ctx, resourceID) } // OnDelete will be called on each new resource deletion event inserted into db. -func (s *PulseServer) OnDelete(ctx context.Context, resourceID string) error { +func (s *MessageQueueEventServer) OnDelete(ctx context.Context, resourceID string) error { return s.sourceClient.OnDelete(ctx, resourceID) } @@ -218,42 +132,17 @@ func (s *PulseServer) OnDelete(ctx context.Context, resourceID string) error { // It does two things: // 1. build the resource status and broadcast it to subscribers // 2. add the event instance record to mark the event has been processed by the current instance -func (s *PulseServer) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error { - statusEvent, sErr := s.statusEventService.Get(ctx, eventID) - if sErr != nil { - return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error()) - } - - var resource *api.Resource - // check if the status event is delete event - if statusEvent.StatusEventType == api.StatusDeleteEventType { - // build resource with resource id and delete status - resource = &api.Resource{ - Meta: api.Meta{ - ID: resourceID, - }, - Source: statusEvent.ResourceSource, - Type: statusEvent.ResourceType, - Status: statusEvent.Status, - } - } else { - resource, sErr = s.resourceService.Get(ctx, resourceID) - if sErr != nil { - return fmt.Errorf("failed to get resource %s: %s", resourceID, sErr.Error()) - } - } - - // broadcast the resource status to subscribers - log.V(4).Infof("Broadcast the resource status %s", resource.ID) - s.eventBroadcaster.Broadcast(resource) - - // add the event instance record - _, err := s.eventInstanceDao.Create(ctx, &api.EventInstance{ - EventID: eventID, - InstanceID: s.instanceID, - }) - - return err +func (s *MessageQueueEventServer) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error { + return broadcastStatusEvent( + ctx, + s.statusEventService, + s.resourceService, + s.eventInstanceDao, + s.eventBroadcaster, + s.instanceID, + eventID, + resourceID, + ) } // handleStatusUpdate processes the resource status update from the agent. 
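Both OnStatusUpdate implementations (the message queue event server above and the gRPC broker later in this diff) now delegate to a shared broadcastStatusEvent helper, which looks up the status event, rebuilds the resource, and hands it to the event broadcaster for fan-out to subscribers. A minimal, self-contained Go sketch of that fan-out pattern is shown below; the broadcaster type and the JSON payload are hypothetical stand-ins for illustration, not the project's event.EventBroadcaster API.

package main

import (
	"fmt"
	"sync"
)

// broadcaster is a hypothetical stand-in that fans one status update out to
// every registered subscriber, the same shape of pattern broadcastStatusEvent
// relies on when it calls eventBroadcaster.Broadcast(resource).
type broadcaster struct {
	mu   sync.Mutex
	subs []chan string
}

func (b *broadcaster) Subscribe() <-chan string {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan string, 1)
	b.subs = append(b.subs, ch)
	return ch
}

func (b *broadcaster) Broadcast(status string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subs {
		ch <- status // buffered here; a real server must handle slow subscribers
	}
}

func main() {
	b := &broadcaster{}
	sub := b.Subscribe()
	b.Broadcast(`{"resource":"r1","status":"Applied"}`)
	fmt.Println(<-sub)
}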
@@ -317,6 +206,7 @@ func handleStatusUpdate(ctx context.Context, resource *api.Resource, resourceSer ResourceID: resource.ID, ResourceSource: resource.Source, ResourceType: resource.Type, + Payload: found.Payload, Status: resource.Status, StatusEventType: api.StatusDeleteEventType, }) @@ -347,3 +237,47 @@ func handleStatusUpdate(ctx context.Context, resource *api.Resource, resourceSer return nil } + +func broadcastStatusEvent(ctx context.Context, + statusEventService services.StatusEventService, + resourceService services.ResourceService, + eventInstanceDao dao.EventInstanceDao, + eventBroadcaster *event.EventBroadcaster, + instanceID, eventID, resourceID string) error { + statusEvent, sErr := statusEventService.Get(ctx, eventID) + if sErr != nil { + return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error()) + } + + var resource *api.Resource + // check if the status event is delete event + if statusEvent.StatusEventType == api.StatusDeleteEventType { + // build resource with resource id and delete status + resource = &api.Resource{ + Meta: api.Meta{ + ID: resourceID, + }, + Source: statusEvent.ResourceSource, + Type: statusEvent.ResourceType, + Payload: statusEvent.Payload, + Status: statusEvent.Status, + } + } else { + resource, sErr = resourceService.Get(ctx, resourceID) + if sErr != nil { + return fmt.Errorf("failed to get resource %s: %s", resourceID, sErr.Error()) + } + } + + // broadcast the resource status to subscribers + log.V(4).Infof("Broadcast the resource status %s", resource.ID) + eventBroadcaster.Broadcast(resource) + + // add the event instance record + _, err := eventInstanceDao.Create(ctx, &api.EventInstance{ + EventID: eventID, + InstanceID: instanceID, + }) + + return err +} diff --git a/cmd/maestro/server/grpc_broker.go b/cmd/maestro/server/grpc_broker.go index 450adc70..cf301fd1 100644 --- a/cmd/maestro/server/grpc_broker.go +++ b/cmd/maestro/server/grpc_broker.go @@ -430,41 +430,16 @@ func (bkr *GRPCBroker) OnDelete(ctx context.Context, id string) error { // 1. build the resource status and broadcast it to subscribers // 2. 
add the event instance record to mark the event has been processed by the current instance func (bkr *GRPCBroker) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error { - statusEvent, sErr := bkr.statusEventService.Get(ctx, eventID) - if sErr != nil { - return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error()) - } - - var resource *api.Resource - // check if the status event is delete event - if statusEvent.StatusEventType == api.StatusDeleteEventType { - // build resource with resource id and delete status - resource = &api.Resource{ - Meta: api.Meta{ - ID: resourceID, - }, - Source: statusEvent.ResourceSource, - Type: statusEvent.ResourceType, - Status: statusEvent.Status, - } - } else { - resource, sErr = bkr.resourceService.Get(ctx, resourceID) - if sErr != nil { - return fmt.Errorf("failed to get resource %s: %s", resourceID, sErr.Error()) - } - } - - // broadcast the resource status to subscribers - log.V(4).Infof("Broadcast the resource status %s", resource.ID) - bkr.eventBroadcaster.Broadcast(resource) - - // add the event instance record - _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{ - EventID: eventID, - InstanceID: bkr.instanceID, - }) - - return err + return broadcastStatusEvent( + ctx, + bkr.statusEventService, + bkr.resourceService, + bkr.eventInstanceDao, + bkr.eventBroadcaster, + bkr.instanceID, + eventID, + resourceID, + ) } // IsConsumerSubscribed returns true if the consumer is subscribed to the broker for resource spec. diff --git a/cmd/maestro/server/healthcheck_server.go b/cmd/maestro/server/healthcheck_server.go index ed099c1a..22f01875 100755 --- a/cmd/maestro/server/healthcheck_server.go +++ b/cmd/maestro/server/healthcheck_server.go @@ -2,44 +2,66 @@ package server import ( "context" + e "errors" "fmt" - "net" "net/http" + "time" - health "github.com/docker/go-healthcheck" "github.com/gorilla/mux" + "github.com/openshift-online/maestro/pkg/api" + "github.com/openshift-online/maestro/pkg/dao" + "github.com/openshift-online/maestro/pkg/db" + "github.com/openshift-online/maestro/pkg/dispatcher" + "gorm.io/gorm" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" ) -var ( - updater = health.NewStatusUpdater() -) - -var _ Server = &healthCheckServer{} - -type healthCheckServer struct { - httpServer *http.Server +type HealthCheckServer struct { + httpServer *http.Server + statusDispatcher dispatcher.Dispatcher + lockFactory db.LockFactory + instanceDao dao.InstanceDao + instanceID string + heartbeatInterval int + brokerType string } -func NewHealthCheckServer() *healthCheckServer { +func NewHealthCheckServer() *HealthCheckServer { router := mux.NewRouter() - health.DefaultRegistry = health.NewRegistry() - health.Register("maintenance_status", updater) - router.HandleFunc("/healthcheck", health.StatusHandler).Methods(http.MethodGet) - router.HandleFunc("/healthcheck/down", downHandler).Methods(http.MethodPost) - router.HandleFunc("/healthcheck/up", upHandler).Methods(http.MethodPost) - srv := &http.Server{ Handler: router, Addr: env().Config.HTTPServer.Hostname + ":" + env().Config.HealthCheck.BindPort, } - return &healthCheckServer{ - httpServer: srv, + sessionFactory := env().Database.SessionFactory + server := &HealthCheckServer{ + httpServer: srv, + lockFactory: db.NewAdvisoryLockFactory(sessionFactory), + instanceDao: dao.NewInstanceDao(&sessionFactory), + instanceID: env().Config.MessageBroker.ClientID, + heartbeatInterval: env().Config.HealthCheck.HeartbeartInterval, + brokerType: 
env().Config.MessageBroker.MessageBrokerType, } + + router.HandleFunc("/healthcheck", server.healthCheckHandler).Methods(http.MethodGet) + + return server +} + +func (s *HealthCheckServer) SetStatusDispatcher(dispatcher dispatcher.Dispatcher) { + s.statusDispatcher = dispatcher } -func (s healthCheckServer) Start() { +func (s *HealthCheckServer) Start(ctx context.Context) { + klog.Infof("Starting HealthCheck server") + + // start a goroutine to periodically update heartbeat for the current maestro instance + go wait.UntilWithContext(ctx, s.pulse, time.Duration(s.heartbeatInterval*int(time.Second))) + + // start a goroutine to periodically check the liveness of maestro instances + go wait.UntilWithContext(ctx, s.checkInstances, time.Duration(s.heartbeatInterval/3*int(time.Second))) + var err error if env().Config.HealthCheck.EnableHTTPS { if env().Config.HTTPServer.HTTPSCertFile == "" || env().Config.HTTPServer.HTTPSKeyFile == "" { @@ -58,25 +80,139 @@ func (s healthCheckServer) Start() { } check(err, "HealthCheck server terminated with errors") klog.Infof("HealthCheck server terminated") -} -func (s healthCheckServer) Stop() error { - return s.httpServer.Shutdown(context.Background()) -} + // wait until context is done + <-ctx.Done() -// Unimplemented -func (s healthCheckServer) Listen() (listener net.Listener, err error) { - return nil, nil + klog.Infof("Shutting down HealthCheck server") + s.httpServer.Shutdown(context.Background()) } -// Unimplemented -func (s healthCheckServer) Serve(listener net.Listener) { +func (s *HealthCheckServer) pulse(ctx context.Context) { + klog.V(10).Infof("Updating heartbeat for maestro instance: %s", s.instanceID) + // If there are multiple requests at the same time, it will cause the race conditions among these + // requests (read–modify–write), the advisory lock is used here to prevent the race conditions. + lockOwnerID, err := s.lockFactory.NewAdvisoryLock(ctx, s.instanceID, db.Instances) + // Ensure that the transaction related to this lock always end. + defer s.lockFactory.Unlock(ctx, lockOwnerID) + if err != nil { + klog.Errorf("Error obtaining the instance (%s) lock: %v", s.instanceID, err) + return + } + found, err := s.instanceDao.Get(ctx, s.instanceID) + if err != nil { + if e.Is(err, gorm.ErrRecordNotFound) { + // create a new instance if not found + klog.V(10).Infof("Creating new maestro instance: %s", s.instanceID) + instance := &api.ServerInstance{ + Meta: api.Meta{ + ID: s.instanceID, + }, + LastHeartbeat: time.Now(), + } + _, err := s.instanceDao.Create(ctx, instance) + if err != nil { + klog.Errorf("Unable to create maestro instance: %s", err.Error()) + } + return + } + klog.Errorf("Unable to get maestro instance: %s", err.Error()) + return + } + found.LastHeartbeat = time.Now() + _, err = s.instanceDao.Replace(ctx, found) + if err != nil { + klog.Errorf("Unable to update heartbeat for maestro instance: %s", err.Error()) + } } -func upHandler(w http.ResponseWriter, r *http.Request) { - updater.Update(nil) +func (s *HealthCheckServer) checkInstances(ctx context.Context) { + klog.V(10).Infof("Checking liveness of maestro instances") + // lock the Instance with a fail-fast advisory lock context. + // this allows concurrent processing of many instances by one or more maestro instances exclusively. + lockOwnerID, acquired, err := s.lockFactory.NewNonBlockingLock(ctx, "maestro-instances-liveness-check", db.Instances) + // Ensure that the transaction related to this lock always end. 
+ defer s.lockFactory.Unlock(ctx, lockOwnerID) + if err != nil { + klog.Errorf("Error obtaining the instance lock: %v", err) + return + } + // skip if the lock is not acquired + if !acquired { + klog.V(10).Infof("failed to acquire the lock as another maestro instance is checking instances, skip") + return + } + + instances, err := s.instanceDao.All(ctx) + if err != nil { + klog.Errorf("Unable to get all maestro instances: %s", err.Error()) + return + } + + activeInstanceIDs := []string{} + inactiveInstanceIDs := []string{} + for _, instance := range instances { + // Instances pulsing within the last three check intervals are considered as active. + if instance.LastHeartbeat.After(time.Now().Add(time.Duration(int(-3*time.Second) * s.heartbeatInterval))) { + if s.brokerType == "mqtt" { + if err := s.statusDispatcher.OnInstanceUp(instance.ID); err != nil { + klog.Errorf("Error to call OnInstanceUp handler for maestro instance %s: %s", instance.ID, err.Error()) + } + } + // mark the instance as active after it is added to the status dispatcher + activeInstanceIDs = append(activeInstanceIDs, instance.ID) + } else { + if s.brokerType == "mqtt" { + if err := s.statusDispatcher.OnInstanceDown(instance.ID); err != nil { + klog.Errorf("Error to call OnInstanceDown handler for maestro instance %s: %s", instance.ID, err.Error()) + } + } + // mark the instance as inactive after it is removed from the status dispatcher + inactiveInstanceIDs = append(inactiveInstanceIDs, instance.ID) + } + } + + if len(activeInstanceIDs) > 0 { + // batch mark active instances + if err := s.instanceDao.MarkReadyByIDs(ctx, activeInstanceIDs); err != nil { + klog.Errorf("Unable to mark active maestro instances (%s): %s", activeInstanceIDs, err.Error()) + } + } + + if len(inactiveInstanceIDs) > 0 { + // batch mark inactive instances + if err := s.instanceDao.MarkUnreadyByIDs(ctx, inactiveInstanceIDs); err != nil { + klog.Errorf("Unable to mark inactive maestro instances (%s): %s", inactiveInstanceIDs, err.Error()) + } + } } -func downHandler(w http.ResponseWriter, r *http.Request) { - updater.Update(fmt.Errorf("maintenance mode")) +// healthCheckHandler returns a 200 OK if the instance is ready, 503 Service Unavailable otherwise. 
+func (s *HealthCheckServer) healthCheckHandler(w http.ResponseWriter, r *http.Request) { + instance, err := s.instanceDao.Get(r.Context(), s.instanceID) + if err != nil { + klog.Errorf("Error getting instance: %v", err) + w.WriteHeader(http.StatusInternalServerError) + _, err := w.Write([]byte(`{"status": "error"}`)) + if err != nil { + klog.Errorf("Error writing healthcheck response: %v", err) + } + return + } + if instance.Ready { + klog.Infof("Instance is ready") + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte(`{"status": "ok"}`)) + if err != nil { + klog.Errorf("Error writing healthcheck response: %v", err) + } + return + } + + klog.Infof("Instance not ready") + w.WriteHeader(http.StatusServiceUnavailable) + _, err = w.Write([]byte(`{"status": "not ready"}`)) + if err != nil { + klog.Errorf("Error writing healthcheck response: %v", err) + } } diff --git a/go.mod b/go.mod index cbc03d3d..fdb11b00 100755 --- a/go.mod +++ b/go.mod @@ -13,7 +13,6 @@ require ( github.com/cespare/xxhash v1.1.0 github.com/cloudevents/sdk-go/v2 v2.15.3-0.20240911135016-682f3a9684e4 github.com/deckarep/golang-set/v2 v2.6.0 - github.com/docker/go-healthcheck v0.1.0 github.com/evanphx/json-patch v5.9.0+incompatible github.com/getsentry/sentry-go v0.20.0 github.com/ghodss/yaml v1.0.0 @@ -24,6 +23,7 @@ require ( github.com/google/uuid v1.6.0 github.com/gorilla/handlers v1.5.1 github.com/gorilla/mux v1.8.1 + github.com/jackc/pgx/v5 v5.3.0 github.com/jinzhu/inflection v1.0.0 github.com/lib/pq v1.10.7 github.com/mendsley/gojwk v0.0.0-20141217222730-4d5ec6e58103 @@ -55,7 +55,7 @@ require ( k8s.io/klog/v2 v2.130.1 open-cluster-management.io/api v0.15.1-0.20241120090202-cb7ce98ab874 open-cluster-management.io/ocm v0.15.1-0.20241125065026-7a190f1a2b18 - open-cluster-management.io/sdk-go v0.15.1-0.20241125015855-1536c3970f8f + open-cluster-management.io/sdk-go v0.15.1-0.20241224013925-71378a533f22 sigs.k8s.io/yaml v1.4.0 ) @@ -79,7 +79,6 @@ require ( github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect - github.com/docker/distribution v2.8.1+incompatible // indirect github.com/eclipse/paho.golang v0.21.0 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/felixge/fgprof v0.9.4 // indirect @@ -109,7 +108,6 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect - github.com/jackc/pgx/v5 v5.3.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect diff --git a/go.sum b/go.sum index 8167e651..63622520 100644 --- a/go.sum +++ b/go.sum @@ -113,8 +113,6 @@ github.com/docker/docker v20.10.17+incompatible h1:JYCuMrWaVNophQTOrMMoSwudOVEfc github.com/docker/docker v20.10.17+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= -github.com/docker/go-healthcheck v0.1.0 h1:6ZrRr63F5LLsPwSlbZgjgoxNu+o1VlMIhCQWgbfrgU0= -github.com/docker/go-healthcheck v0.1.0/go.mod h1:3v7a0338vhH6WnYFtUd66S+9QK3M6xK4sKr7gGrht6o= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod 
h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= @@ -896,6 +894,8 @@ open-cluster-management.io/ocm v0.15.1-0.20241125065026-7a190f1a2b18 h1:sLg+JNnj open-cluster-management.io/ocm v0.15.1-0.20241125065026-7a190f1a2b18/go.mod h1:083SWgAjjvkc0TcOwf8wI8HCrCYaUWP860YTs+y8zXY= open-cluster-management.io/sdk-go v0.15.1-0.20241125015855-1536c3970f8f h1:zeC7QrFNarfK2zY6jGtd+mX+yDrQQmnH/J8A7n5Nh38= open-cluster-management.io/sdk-go v0.15.1-0.20241125015855-1536c3970f8f/go.mod h1:fi5WBsbC5K3txKb8eRLuP0Sim/Oqz/PHX18skAEyjiA= +open-cluster-management.io/sdk-go v0.15.1-0.20241224013925-71378a533f22 h1:w15NHc6cBfYxKHtF6zGLeQ1iTUqtN53sdONi9XXy5Xc= +open-cluster-management.io/sdk-go v0.15.1-0.20241224013925-71378a533f22/go.mod h1:fi5WBsbC5K3txKb8eRLuP0Sim/Oqz/PHX18skAEyjiA= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3 h1:2770sDpzrjjsAtVhSeUFseziht227YAWYHLGNM8QPwY= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3/go.mod h1:Ve9uj1L+deCXFrPOk1LpFXqTg7LCFzFso6PA48q/XZw= sigs.k8s.io/controller-runtime v0.19.2 h1:3sPrF58XQEPzbE8T81TN6selQIMGbtYwuaJ6eDssDF8= diff --git a/pkg/api/server_instance.go b/pkg/api/server_instance.go index f9f74fef..dd532306 100644 --- a/pkg/api/server_instance.go +++ b/pkg/api/server_instance.go @@ -1,11 +1,15 @@ package api +import "time" + // ServerInstance is employed by Maestro to discover active server instances. The updatedAt field // determines the liveness of the instance; if the instance remains unchanged for three consecutive // check intervals (30 seconds by default), it is marked as dead. // However, it is not meant for direct exposure to end users through the API. type ServerInstance struct { Meta + LastHeartbeat time.Time // LastHeartbeat indicates the last time the instance sent a heartbeat. + Ready bool // Ready indicates whether the instance is ready to serve requests. 
} type ServerInstanceList []*ServerInstance diff --git a/pkg/api/status_event.go b/pkg/api/status_event.go index f876abce..3e519ae0 100755 --- a/pkg/api/status_event.go +++ b/pkg/api/status_event.go @@ -19,6 +19,7 @@ type StatusEvent struct { ResourceID string ResourceSource string ResourceType ResourceType + Payload datatypes.JSONMap Status datatypes.JSONMap StatusEventType StatusEventType // Update|Delete ReconciledDate *time.Time `json:"gorm:null"` diff --git a/pkg/config/config.go b/pkg/config/config.go index fbcbd052..f9445036 100755 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -17,7 +17,7 @@ type ApplicationConfig struct { GRPCServer *GRPCServerConfig `json:"grpc_server"` Metrics *MetricsConfig `json:"metrics"` HealthCheck *HealthCheckConfig `json:"health_check"` - PulseServer *PulseServerConfig `json:"pulse_server"` + EventServer *EventServerConfig `json:"event_server"` Database *DatabaseConfig `json:"database"` MessageBroker *MessageBrokerConfig `json:"message_broker"` OCM *OCMConfig `json:"ocm"` @@ -30,7 +30,7 @@ func NewApplicationConfig() *ApplicationConfig { GRPCServer: NewGRPCServerConfig(), Metrics: NewMetricsConfig(), HealthCheck: NewHealthCheckConfig(), - PulseServer: NewPulseServerConfig(), + EventServer: NewEventServerConfig(), Database: NewDatabaseConfig(), MessageBroker: NewMessageBrokerConfig(), OCM: NewOCMConfig(), @@ -44,7 +44,7 @@ func (c *ApplicationConfig) AddFlags(flagset *pflag.FlagSet) { c.GRPCServer.AddFlags(flagset) c.Metrics.AddFlags(flagset) c.HealthCheck.AddFlags(flagset) - c.PulseServer.AddFlags(flagset) + c.EventServer.AddFlags(flagset) c.Database.AddFlags(flagset) c.MessageBroker.AddFlags(flagset) c.OCM.AddFlags(flagset) @@ -61,7 +61,7 @@ func (c *ApplicationConfig) ReadFiles() []string { {c.OCM.ReadFiles, "OCM"}, {c.Metrics.ReadFiles, "Metrics"}, {c.HealthCheck.ReadFiles, "HealthCheck"}, - {c.PulseServer.ReadFiles, "PulseServer"}, + {c.EventServer.ReadFiles, "EventServer"}, {c.Sentry.ReadFiles, "Sentry"}, } messages := []string{} diff --git a/pkg/config/db.go b/pkg/config/db.go index 08ab1837..20df3cfa 100755 --- a/pkg/config/db.go +++ b/pkg/config/db.go @@ -1,13 +1,12 @@ package config import ( - "context" "fmt" - "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" - "github.com/Azure/azure-sdk-for-go/sdk/azidentity" - "github.com/openshift-online/maestro/pkg/constants" + "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/spf13/pflag" + + "github.com/openshift-online/maestro/pkg/constants" ) type DatabaseConfig struct { @@ -31,6 +30,7 @@ type DatabaseConfig struct { AuthMethod string `json:"auth_method"` TokenRequestScope string `json:"token_request_scope"` + Token *azcore.AccessToken } func NewDatabaseConfig() *DatabaseConfig { @@ -83,22 +83,7 @@ func (c *DatabaseConfig) ReadFiles() error { return err } - if c.AuthMethod == constants.AuthMethodMicrosoftEntra { - // ARO-HCP environment variable configuration is set by the Azure workload identity webhook. - // Use [WorkloadIdentityCredential] directly when not using the webhook or needing more control over its configuration. - cred, err := azidentity.NewDefaultAzureCredential(nil) - if err != nil { - return err - } - // The access token can be expired. but the existing connections are not invalidated. - // TODO: how to reconnect due to the network is broken etc. Right now, gorm does not have this feature. - // refer to https://github.com/go-gorm/gorm/issues/5602 & https://github.com/go-gorm/gorm/pull/1721. 
- token, err := cred.GetToken(context.Background(), policy.TokenRequestOptions{Scopes: []string{c.TokenRequestScope}}) - if err != nil { - return err - } - c.Password = token.Token - } else { + if c.AuthMethod == constants.AuthMethodPassword { err = readFileValueString(c.PasswordFile, &c.Password) if err != nil { return err @@ -117,13 +102,13 @@ func (c *DatabaseConfig) ConnectionStringWithName(name string, withSSL bool) str var cmd string if withSSL { cmd = fmt.Sprintf( - "host=%s port=%d user=%s password='%s' dbname=%s sslmode=%s sslrootcert=%s", - c.Host, c.Port, c.Username, c.Password, name, c.SSLMode, c.RootCertFile, + "host=%s port=%d user=%s dbname=%s sslmode=%s sslrootcert='%s'", + c.Host, c.Port, c.Username, name, c.SSLMode, c.RootCertFile, ) } else { cmd = fmt.Sprintf( - "host=%s port=%d user=%s password='%s' dbname=%s sslmode=disable", - c.Host, c.Port, c.Username, c.Password, name, + "host=%s port=%d user=%s dbname=%s sslmode=disable", + c.Host, c.Port, c.Username, name, ) } diff --git a/pkg/config/pulse_server.go b/pkg/config/event_server.go similarity index 77% rename from pkg/config/pulse_server.go rename to pkg/config/event_server.go index 3b29b80f..87bc5f67 100644 --- a/pkg/config/pulse_server.go +++ b/pkg/config/event_server.go @@ -11,9 +11,8 @@ const ( BroadcastSubscriptionType SubscriptionType = "broadcast" ) -// PulseServerConfig contains the configuration for the maestro pulse server. -type PulseServerConfig struct { - PulseInterval int64 `json:"pulse_interval"` +// EventServerConfig contains the configuration for the message queue event server. +type EventServerConfig struct { SubscriptionType string `json:"subscription_type"` ConsistentHashConfig *ConsistentHashConfig `json:"consistent_hash_config"` } @@ -25,10 +24,9 @@ type ConsistentHashConfig struct { Load float64 `json:"load"` } -// NewPulseServerConfig creates a new PulseServerConfig with default 15 second pulse interval. -func NewPulseServerConfig() *PulseServerConfig { - return &PulseServerConfig{ - PulseInterval: 15, +// NewEventServerConfig creates a new EventServerConfig with default settings. +func NewEventServerConfig() *EventServerConfig { + return &EventServerConfig{ SubscriptionType: "shared", ConsistentHashConfig: NewConsistentHashConfig(), } @@ -46,20 +44,18 @@ func NewConsistentHashConfig() *ConsistentHashConfig { } } -// AddFlags configures the PulseServerConfig with command line flags. -// It allows users to customize the interval for maestro instance pulses and subscription type. -// - "pulse-interval" sets the time between maestro instance pulses (in seconds) to indicate its liveness (default: 15 seconds). +// AddFlags configures the EventServerConfig with command line flags. +// It allows users to customize the subscription type and ConsistentHashConfig settings. // - "subscription-type" specifies the subscription type for resource status updates from message broker, either "shared" or "broadcast". // "shared" subscription type uses MQTT feature to ensure only one Maestro instance receives resource status messages. // "broadcast" subscription type will make all Maestro instances to receive resource status messages and hash the message to determine which instance should process it. // If subscription type is "broadcast", ConsistentHashConfig settings can be configured for the hashing algorithm. 
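For the "broadcast" subscription type described above, the ConsistentHashConfig knobs (partition count, replication factor, load) are the classic parameters of a bounded-load consistent-hash ring, for example github.com/buraksezer/consistent. The sketch below shows how such a ring maps a consumer name to the maestro instance that should process its status messages; it only illustrates the hashing idea and is not the project's HashDispatcher code — the library choice, member type, and values are assumptions (xxhash is already a dependency in go.mod).

package main

import (
	"fmt"

	"github.com/buraksezer/consistent"
	"github.com/cespare/xxhash"
)

// instance is a ring member identified by a maestro instance ID (assumed shape).
type instance string

func (i instance) String() string { return string(i) }

// hasher adapts xxhash to the consistent.Hasher interface.
type hasher struct{}

func (hasher) Sum64(data []byte) uint64 { return xxhash.Sum64(data) }

func main() {
	members := []consistent.Member{instance("maestro-0"), instance("maestro-1"), instance("maestro-2")}
	ring := consistent.New(members, consistent.Config{
		PartitionCount:    7, // illustrative values only
		ReplicationFactor: 20,
		Load:              1.25,
		Hasher:            hasher{},
	})

	// In broadcast mode every instance sees every status message; only the
	// instance that owns the consumer's partition processes it.
	owner := ring.LocateKey([]byte("consumer-cluster-a"))
	fmt.Println("consumer-cluster-a is handled by", owner.String())
}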
-func (c *PulseServerConfig) AddFlags(fs *pflag.FlagSet) { - fs.Int64Var(&c.PulseInterval, "pulse-interval", c.PulseInterval, "Sets the pulse interval for maestro instances (seconds) to indicate liveness") +func (c *EventServerConfig) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&c.SubscriptionType, "subscription-type", c.SubscriptionType, "Sets the subscription type for resource status updates from message broker, Options: \"shared\" (only one instance receives resource status message, MQTT feature ensures exclusivity) or \"broadcast\" (all instances receive messages, hashed to determine processing instance)") c.ConsistentHashConfig.AddFlags(fs) } -func (c *PulseServerConfig) ReadFiles() error { +func (c *EventServerConfig) ReadFiles() error { c.ConsistentHashConfig.ReadFiles() return nil } diff --git a/pkg/config/pulse_server_test.go b/pkg/config/event_server_test.go similarity index 82% rename from pkg/config/pulse_server_test.go rename to pkg/config/event_server_test.go index 803c5dc1..44fd2a7a 100644 --- a/pkg/config/pulse_server_test.go +++ b/pkg/config/event_server_test.go @@ -7,17 +7,16 @@ import ( "github.com/spf13/pflag" ) -func TestPulseServerConfig(t *testing.T) { +func TestEventServerConfig(t *testing.T) { cases := []struct { name string input map[string]string - want *PulseServerConfig + want *EventServerConfig }{ { name: "default subscription type", input: map[string]string{}, - want: &PulseServerConfig{ - PulseInterval: 15, + want: &EventServerConfig{ SubscriptionType: "shared", ConsistentHashConfig: &ConsistentHashConfig{ PartitionCount: 7, @@ -31,8 +30,7 @@ func TestPulseServerConfig(t *testing.T) { input: map[string]string{ "subscription-type": "broadcast", }, - want: &PulseServerConfig{ - PulseInterval: 15, + want: &EventServerConfig{ SubscriptionType: "broadcast", ConsistentHashConfig: &ConsistentHashConfig{ PartitionCount: 7, @@ -49,8 +47,7 @@ func TestPulseServerConfig(t *testing.T) { "consistent-hash-replication-factor": "30", "consistent-hash-load": "1.5", }, - want: &PulseServerConfig{ - PulseInterval: 15, + want: &EventServerConfig{ SubscriptionType: "broadcast", ConsistentHashConfig: &ConsistentHashConfig{ PartitionCount: 10, @@ -61,7 +58,7 @@ func TestPulseServerConfig(t *testing.T) { }, } - config := NewPulseServerConfig() + config := NewEventServerConfig() pflag.NewFlagSet("test", pflag.ContinueOnError) fs := pflag.CommandLine config.AddFlags(fs) @@ -72,7 +69,7 @@ func TestPulseServerConfig(t *testing.T) { fs.Set(key, value) } if !reflect.DeepEqual(config, tc.want) { - t.Errorf("NewPulseServerConfig() = %v; want %v", config, tc.want) + t.Errorf("NewEventServerConfig() = %v; want %v", config, tc.want) } // clear flags fs.VisitAll(func(f *pflag.Flag) { diff --git a/pkg/config/health_check.go b/pkg/config/health_check.go index f811e597..122f4b6e 100755 --- a/pkg/config/health_check.go +++ b/pkg/config/health_check.go @@ -5,20 +5,23 @@ import ( ) type HealthCheckConfig struct { - BindPort string `json:"bind_port"` - EnableHTTPS bool `json:"enable_https"` + BindPort string `json:"bind_port"` + EnableHTTPS bool `json:"enable_https"` + HeartbeartInterval int `json:"heartbeat_interval"` } func NewHealthCheckConfig() *HealthCheckConfig { return &HealthCheckConfig{ - BindPort: "8083", - EnableHTTPS: false, + BindPort: "8083", + EnableHTTPS: false, + HeartbeartInterval: 15, } } func (c *HealthCheckConfig) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&c.BindPort, "health-check-server-bindport", c.BindPort, "Health check server bind port") fs.BoolVar(&c.EnableHTTPS, 
"enable-health-check-https", c.EnableHTTPS, "Enable HTTPS for health check server") + fs.IntVar(&c.HeartbeartInterval, "heartbeat-interval", c.HeartbeartInterval, "Heartbeat interval for health check server") } func (c *HealthCheckConfig) ReadFiles() error { diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 30877d12..c44409ef 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -5,4 +5,8 @@ const ( AuthMethodPassword = "password" // Standard postgres username/password authentication. AuthMethodMicrosoftEntra = "az-entra" // Microsoft Entra ID-based token authentication. + + // MinTokenLifeThreshold defines the minimum remaining lifetime (in seconds) of the access token before + // it should be refreshed. + MinTokenLifeThreshold = 60.0 ) diff --git a/pkg/controllers/framework.go b/pkg/controllers/framework.go index 608c56be..f06f592a 100755 --- a/pkg/controllers/framework.go +++ b/pkg/controllers/framework.go @@ -202,6 +202,7 @@ func (km *KindControllerManager) processNextEvent() bool { } func (km *KindControllerManager) syncEvents() { + logger.Infof("purge all reconciled events") // delete the reconciled events from the database firstly if err := km.events.DeleteAllReconciledEvents(context.Background()); err != nil { // this process is called periodically, so if the error happened, we will wait for the next cycle to handle @@ -210,6 +211,7 @@ func (km *KindControllerManager) syncEvents() { return } + logger.Infof("sync all unreconciled events") unreconciledEvents, err := km.events.FindAllUnreconciledEvents(context.Background()) if err != nil { logger.Error(fmt.Sprintf("Failed to list unreconciled events from db, %v", err)) diff --git a/pkg/controllers/status_controller.go b/pkg/controllers/status_controller.go index fcb6cf45..7aea2b60 100644 --- a/pkg/controllers/status_controller.go +++ b/pkg/controllers/status_controller.go @@ -6,6 +6,7 @@ import ( "time" "github.com/openshift-online/maestro/pkg/api" + "github.com/openshift-online/maestro/pkg/dao" "github.com/openshift-online/maestro/pkg/services" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" @@ -16,16 +17,22 @@ const StatusEventID ControllerHandlerContextKey = "status_event" type StatusHandlerFunc func(ctx context.Context, eventID, sourceID string) error type StatusController struct { - controllers map[api.StatusEventType][]StatusHandlerFunc - statusEvents services.StatusEventService - eventsQueue workqueue.RateLimitingInterface + controllers map[api.StatusEventType][]StatusHandlerFunc + statusEvents services.StatusEventService + instanceDao dao.InstanceDao + eventInstanceDao dao.EventInstanceDao + eventsQueue workqueue.RateLimitingInterface } -func NewStatusController(statusEvents services.StatusEventService) *StatusController { +func NewStatusController(statusEvents services.StatusEventService, + instanceDao dao.InstanceDao, + eventInstanceDao dao.EventInstanceDao) *StatusController { return &StatusController{ - controllers: map[api.StatusEventType][]StatusHandlerFunc{}, - statusEvents: statusEvents, - eventsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "status-event-controller"), + controllers: map[api.StatusEventType][]StatusHandlerFunc{}, + statusEvents: statusEvents, + instanceDao: instanceDao, + eventInstanceDao: eventInstanceDao, + eventsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "status-event-controller"), } } @@ -38,9 +45,8 @@ func (sc *StatusController) Run(stopCh <-chan 
struct{}) { logger.Infof("Starting status event controller") defer sc.eventsQueue.ShutDown() - // TODO: start a goroutine to sync all status events periodically // use a jitter to avoid multiple instances syncing the events at the same time - // go wait.JitterUntil(sc.syncStatusEvents, defaultEventsSyncPeriod, 0.25, true, stopCh) + go wait.JitterUntil(sc.syncStatusEvents, defaultEventsSyncPeriod, 0.25, true, stopCh) // start a goroutine to handle the status event from the event queue // the .Until will re-kick the runWorker one second after the runWorker completes @@ -51,35 +57,35 @@ func (sc *StatusController) Run(stopCh <-chan struct{}) { logger.Infof("Shutting down status event controller") } -func (sm *StatusController) runWorker() { +func (sc *StatusController) runWorker() { // hot loop until we're told to stop. processNextEvent will automatically wait until there's work available, so // we don't worry about secondary waits - for sm.processNextEvent() { + for sc.processNextEvent() { } } // processNextEvent deals with one key off the queue. -func (sm *StatusController) processNextEvent() bool { +func (sc *StatusController) processNextEvent() bool { // pull the next status event item from queue. // events queue blocks until it can return an item to be processed - key, quit := sm.eventsQueue.Get() + key, quit := sc.eventsQueue.Get() if quit { // the current queue is shutdown and becomes empty, quit this process return false } - defer sm.eventsQueue.Done(key) + defer sc.eventsQueue.Done(key) - if err := sm.handleStatusEvent(key.(string)); err != nil { + if err := sc.handleStatusEvent(key.(string)); err != nil { logger.Error(fmt.Sprintf("Failed to handle the event %v, %v ", key, err)) // we failed to handle the status event, we should requeue the item to work on later // this method will add a backoff to avoid hotlooping on particular items - sm.eventsQueue.AddRateLimited(key) + sc.eventsQueue.AddRateLimited(key) return true } // we handle the status event successfully, tell the queue to stop tracking history for this status event - sm.eventsQueue.Forget(key) + sc.eventsQueue.Forget(key) return true } @@ -131,3 +137,42 @@ func (sc *StatusController) add(ev api.StatusEventType, fns []StatusHandlerFunc) sc.controllers[ev] = append(sc.controllers[ev], fns...) 
} + +func (sc *StatusController) syncStatusEvents() { + ctx := context.Background() + + readyInstanceIDs, err := sc.instanceDao.FindReadyIDs(ctx) + if err != nil { + logger.Error(fmt.Sprintf("Failed to find ready instances from db, %v", err)) + return + } + logger.Infof("purge status events on the ready instances: %s", readyInstanceIDs) + + // find the status events that already were dispatched to all ready instances + statusEventIDs, err := sc.eventInstanceDao.GetEventsAssociatedWithInstances(ctx, readyInstanceIDs) + if err != nil { + logger.Error(fmt.Sprintf("Failed to find handled status events from db, %v", err)) + return + } + + // batch delete the handled status events + batches := batchStatusEventIDs(statusEventIDs, 500) + for _, batch := range batches { + if err := sc.statusEvents.DeleteAllEvents(ctx, batch); err != nil { + logger.Error(fmt.Sprintf("Failed to delete handled status events from db, %v", err)) + return + } + } +} + +func batchStatusEventIDs(statusEventIDs []string, batchSize int) [][]string { + batches := [][]string{} + for i := 0; i < len(statusEventIDs); i += batchSize { + end := i + batchSize + if end > len(statusEventIDs) { + end = len(statusEventIDs) + } + batches = append(batches, statusEventIDs[i:end]) + } + return batches +} diff --git a/pkg/controllers/status_controller_test.go b/pkg/controllers/status_controller_test.go new file mode 100644 index 00000000..c49736c4 --- /dev/null +++ b/pkg/controllers/status_controller_test.go @@ -0,0 +1,64 @@ +package controllers + +import ( + "testing" +) + +func TestBatchStatusEventIDs(t *testing.T) { + const batchSize = 500 + + cases := []struct { + name string + statusEventIDs []string + expected [][]string + }{ + { + name: "empty input", + statusEventIDs: []string{}, + expected: [][]string{}, + }, + { + name: "single batch less than batch size", + statusEventIDs: make([]string, 499), + expected: [][]string{make([]string, 499)}, + }, + { + name: "single batch equal to batch size", + statusEventIDs: make([]string, batchSize), + expected: [][]string{make([]string, batchSize)}, + }, + { + name: "multiple batches full", + statusEventIDs: make([]string, batchSize*2), + expected: [][]string{make([]string, batchSize), make([]string, batchSize)}, + }, + { + name: "multiple batches partial last", + statusEventIDs: make([]string, batchSize+100), + expected: [][]string{make([]string, batchSize), make([]string, 100)}, + }, + { + name: "multiple batches full partial last", + statusEventIDs: make([]string, batchSize*2+300), + expected: [][]string{make([]string, batchSize), make([]string, batchSize), make([]string, 300)}, + }, + } + + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + result := batchStatusEventIDs(tt.statusEventIDs, batchSize) + + // Ensure the number of batches is correct + if len(result) != len(tt.expected) { + t.Errorf("number of batches mismatch, got %d, want %d", len(result), len(tt.expected)) + } + + // Check the length of each batch + for i := range result { + if len(result[i]) != len(tt.expected[i]) { + t.Errorf("length of batch %d mismatch, got %d, want %d", i+1, len(result[i]), len(tt.expected[i])) + } + } + }) + } +} diff --git a/pkg/dao/event_instances.go b/pkg/dao/event_instances.go index c7722b16..7ec1eef5 100644 --- a/pkg/dao/event_instances.go +++ b/pkg/dao/event_instances.go @@ -12,6 +12,9 @@ import ( type EventInstanceDao interface { Get(ctx context.Context, eventID, instanceID string) (*api.EventInstance, error) Create(ctx context.Context, eventInstance *api.EventInstance) 
(*api.EventInstance, error) + + FindStatusEvents(ctx context.Context, ids []string) (api.EventInstanceList, error) + GetEventsAssociatedWithInstances(ctx context.Context, instanceIDs []string) ([]string, error) } var _ EventInstanceDao = &sqlEventInstanceDao{} @@ -43,3 +46,36 @@ func (d *sqlEventInstanceDao) Create(ctx context.Context, eventInstance *api.Eve } return eventInstance, nil } + +func (d *sqlEventInstanceDao) FindStatusEvents(ctx context.Context, ids []string) (api.EventInstanceList, error) { + g2 := (*d.sessionFactory).New(ctx) + eventInstances := api.EventInstanceList{} + if err := g2.Where("event_id in (?)", ids).Find(&eventInstances).Error; err != nil { + return nil, err + } + return eventInstances, nil +} + +func (d *sqlEventInstanceDao) GetEventsAssociatedWithInstances(ctx context.Context, instanceIDs []string) ([]string, error) { + var eventIDs []string + + instanceCount := len(instanceIDs) + if instanceCount == 0 { + return eventIDs, nil + } + + g2 := (*d.sessionFactory).New(ctx) + + // Currently, the instance table should be small; if it becomes too large, + // consider using a join to optimize + if err := g2.Table("event_instances"). + Select("event_id"). + Where("instance_id IN ?", instanceIDs). + Group("event_id"). + Having("COUNT(DISTINCT instance_id) = ?", instanceCount). + Scan(&eventIDs).Error; err != nil { + return nil, err + } + + return eventIDs, nil +} diff --git a/pkg/dao/instance.go b/pkg/dao/instance.go index db8e35ad..8be26a3c 100644 --- a/pkg/dao/instance.go +++ b/pkg/dao/instance.go @@ -14,11 +14,13 @@ type InstanceDao interface { Get(ctx context.Context, id string) (*api.ServerInstance, error) Create(ctx context.Context, instance *api.ServerInstance) (*api.ServerInstance, error) Replace(ctx context.Context, instance *api.ServerInstance) (*api.ServerInstance, error) - UpSert(ctx context.Context, instance *api.ServerInstance) (*api.ServerInstance, error) + MarkReadyByIDs(ctx context.Context, ids []string) error + MarkUnreadyByIDs(ctx context.Context, ids []string) error Delete(ctx context.Context, id string) error DeleteByIDs(ctx context.Context, ids []string) error FindByIDs(ctx context.Context, ids []string) (api.ServerInstanceList, error) FindByUpdatedTime(ctx context.Context, updatedTime time.Time) (api.ServerInstanceList, error) + FindReadyIDs(ctx context.Context) ([]string, error) All(ctx context.Context) (api.ServerInstanceList, error) } @@ -59,13 +61,22 @@ func (d *sqlInstanceDao) Replace(ctx context.Context, instance *api.ServerInstan return instance, nil } -func (d *sqlInstanceDao) UpSert(ctx context.Context, instance *api.ServerInstance) (*api.ServerInstance, error) { +func (d *sqlInstanceDao) MarkReadyByIDs(ctx context.Context, ids []string) error { g2 := (*d.sessionFactory).New(ctx) - if err := g2.Unscoped().Omit(clause.Associations).Save(instance).Error; err != nil { + if err := g2.Model(&api.ServerInstance{}).Where("id in (?)", ids).Update("ready", true).Error; err != nil { db.MarkForRollback(ctx, err) - return nil, err + return err } - return instance, nil + return nil +} + +func (d *sqlInstanceDao) MarkUnreadyByIDs(ctx context.Context, ids []string) error { + g2 := (*d.sessionFactory).New(ctx) + if err := g2.Model(&api.ServerInstance{}).Where("id in (?)", ids).Update("ready", false).Error; err != nil { + db.MarkForRollback(ctx, err) + return err + } + return nil } func (d *sqlInstanceDao) Delete(ctx context.Context, id string) error { @@ -117,3 +128,16 @@ func (d *sqlInstanceDao) All(ctx context.Context)
(api.ServerInstanceList, error } return instances, nil } + +func (d *sqlInstanceDao) FindReadyIDs(ctx context.Context) ([]string, error) { + g2 := (*d.sessionFactory).New(ctx) + instances := api.ServerInstanceList{} + if err := g2.Where("ready = ?", true).Find(&instances).Error; err != nil { + return nil, err + } + ids := make([]string, len(instances)) + for i, instance := range instances { + ids[i] = instance.ID + } + return ids, nil +} diff --git a/pkg/dao/mocks/instance.go b/pkg/dao/mocks/instance.go index 7d3076bf..73ff4e21 100644 --- a/pkg/dao/mocks/instance.go +++ b/pkg/dao/mocks/instance.go @@ -67,6 +67,28 @@ func (d *instanceDaoMock) UpSert(ctx context.Context, instance *api.ServerInstan return instance, nil } +func (d *instanceDaoMock) MarkReadyByIDs(ctx context.Context, ids []string) error { + d.mux.Lock() + defer d.mux.Unlock() + for _, instance := range d.instances { + if contains(ids, instance.ID) { + instance.Ready = true + } + } + return nil +} + +func (d *instanceDaoMock) MarkUnreadyByIDs(ctx context.Context, ids []string) error { + d.mux.Lock() + defer d.mux.Unlock() + for _, instance := range d.instances { + if contains(ids, instance.ID) { + instance.Ready = false + } + } + return nil +} + func (d *instanceDaoMock) Delete(ctx context.Context, ID string) error { d.mux.Lock() defer d.mux.Unlock() @@ -114,6 +136,19 @@ func (d *instanceDaoMock) FindByUpdatedTime(ctx context.Context, updatedTime tim return instances, nil } +func (d *instanceDaoMock) FindReadyIDs(ctx context.Context) ([]string, error) { + d.mux.RLock() + defer d.mux.RUnlock() + + ids := []string{} + for _, instance := range d.instances { + if instance.Ready { + ids = append(ids, instance.ID) + } + } + return ids, nil +} + func (d *instanceDaoMock) All(ctx context.Context) (api.ServerInstanceList, error) { d.mux.RLock() defer d.mux.RUnlock() diff --git a/pkg/dao/status_event.go b/pkg/dao/status_event.go index 03916480..298f9dc4 100755 --- a/pkg/dao/status_event.go +++ b/pkg/dao/status_event.go @@ -19,6 +19,7 @@ type StatusEventDao interface { All(ctx context.Context) (api.StatusEventList, error) DeleteAllReconciledEvents(ctx context.Context) error + DeleteAllEvents(ctx context.Context, eventIDs []string) error FindAllUnreconciledEvents(ctx context.Context) (api.StatusEventList, error) } @@ -94,6 +95,19 @@ func (d *sqlStatusEventDao) DeleteAllReconciledEvents(ctx context.Context) error return nil } +func (d *sqlStatusEventDao) DeleteAllEvents(ctx context.Context, eventIDs []string) error { + if len(eventIDs) == 0 { + return nil + } + + g2 := (*d.sessionFactory).New(ctx) + if err := g2.Unscoped().Omit(clause.Associations).Where("id IN ?", eventIDs).Delete(&api.StatusEvent{}).Error; err != nil { + db.MarkForRollback(ctx, err) + return err + } + return nil +} + func (d *sqlStatusEventDao) FindAllUnreconciledEvents(ctx context.Context) (api.StatusEventList, error) { g2 := (*d.sessionFactory).New(ctx) statusEvents := api.StatusEventList{} diff --git a/pkg/db/db_session/default.go b/pkg/db/db_session/default.go index 6d39fde4..9c557a40 100755 --- a/pkg/db/db_session/default.go +++ b/pkg/db/db_session/default.go @@ -6,13 +6,18 @@ import ( "fmt" "time" + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + "github.com/Azure/azure-sdk-for-go/sdk/azidentity" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/stdlib" + "github.com/lib/pq" "gorm.io/driver/postgres" "gorm.io/gorm" "gorm.io/gorm/logger" - "github.com/lib/pq" - 
"github.com/openshift-online/maestro/pkg/config" + "github.com/openshift-online/maestro/pkg/constants" "github.com/openshift-online/maestro/pkg/db" ocmlogger "github.com/openshift-online/maestro/pkg/logger" ) @@ -48,20 +53,16 @@ func (f *Default) Init(config *config.DatabaseConfig) { err error ) - // Open connection to DB via standard library - dbx, err = sql.Open(config.Dialect, config.ConnectionString(config.SSLMode != disable)) + connConfig, err := pgx.ParseConfig(config.ConnectionString(config.SSLMode != disable)) if err != nil { - dbx, err = sql.Open(config.Dialect, config.ConnectionString(false)) - if err != nil { - panic(fmt.Sprintf( - "SQL failed to connect to %s database %s with connection string: %s\nError: %s", - config.Dialect, - config.Name, - config.LogSafeConnectionString(config.SSLMode != disable), - err.Error(), - )) - } + panic(fmt.Sprintf( + "GORM failed to parse the connection string: %s\nError: %s", + config.LogSafeConnectionString(config.SSLMode != disable), + err.Error(), + )) } + + dbx = stdlib.OpenDB(*connConfig, stdlib.OptionBeforeConnect(setPassword(config))) dbx.SetMaxOpenConns(config.MaxOpenConnections) // Connect GORM to use the same connection @@ -93,6 +94,46 @@ func (f *Default) Init(config *config.DatabaseConfig) { }) } +func setPassword(dbConfig *config.DatabaseConfig) func(ctx context.Context, connConfig *pgx.ConnConfig) error { + return func(ctx context.Context, connConfig *pgx.ConnConfig) error { + if dbConfig.AuthMethod == constants.AuthMethodPassword { + connConfig.Password = dbConfig.Password + return nil + } else if dbConfig.AuthMethod == constants.AuthMethodMicrosoftEntra { + if isExpired(dbConfig.Token) { + token, err := getAccessToken(ctx, dbConfig) + if err != nil { + return err + } + connConfig.Password = token.Token + dbConfig.Token = token + } else { + connConfig.Password = dbConfig.Token.Token + } + } + return nil + } +} + +func getAccessToken(ctx context.Context, dbConfig *config.DatabaseConfig) (*azcore.AccessToken, error) { + // ARO-HCP environment variable configuration is set by the Azure workload identity webhook. + // Use [WorkloadIdentityCredential] directly when not using the webhook or needing more control over its configuration. + cred, err := azidentity.NewDefaultAzureCredential(nil) + if err != nil { + return nil, err + } + token, err := cred.GetToken(ctx, policy.TokenRequestOptions{Scopes: []string{dbConfig.TokenRequestScope}}) + if err != nil { + return nil, err + } + return &token, nil +} + +func isExpired(accessToken *azcore.AccessToken) bool { + return accessToken == nil || + time.Until(accessToken.ExpiresOn).Seconds() < constants.MinTokenLifeThreshold +} + func (f *Default) DirectDB() *sql.DB { return f.db } @@ -112,13 +153,17 @@ func waitForNotification(ctx context.Context, l *pq.Listener, callback func(id s case <-time.After(10 * time.Second): logger.V(10).Infof("Received no events on channel during interval. Pinging source") go func() { - l.Ping() + // TODO: Need to handle the error, especially in cases of network failure. 
+                err := l.Ping()
+                if err != nil {
+                    logger.Error(err.Error())
+                }
             }()
         }
     }
 }
 
-func newListener(ctx context.Context, connstr, channel string, callback func(id string)) {
+func newListener(ctx context.Context, dbConfig *config.DatabaseConfig, channel string, callback func(id string)) {
     logger := ocmlogger.NewOCMLogger(ctx)
 
     plog := func(ev pq.ListenerEventType, err error) {
@@ -126,6 +171,18 @@ func newListener(ctx context.Context, connstr, channel string, callback func(id
             logger.Error(err.Error())
         }
     }
+    connstr := dbConfig.ConnectionString(true)
+    // append the password to the connection string
+    if dbConfig.AuthMethod == constants.AuthMethodPassword {
+        connstr += fmt.Sprintf(" password='%s'", dbConfig.Password)
+    } else if dbConfig.AuthMethod == constants.AuthMethodMicrosoftEntra {
+        token, err := getAccessToken(ctx, dbConfig)
+        if err != nil {
+            panic(err)
+        }
+        connstr += fmt.Sprintf(" password='%s'", token.Token)
+    }
+
     listener := pq.NewListener(connstr, 10*time.Second, time.Minute, plog)
     err := listener.Listen(channel)
     if err != nil {
@@ -137,7 +194,7 @@ func newListener(ctx context.Context, connstr, channel string, callback func(id
 }
 
 func (f *Default) NewListener(ctx context.Context, channel string, callback func(id string)) {
-    newListener(ctx, f.config.ConnectionString(true), channel, callback)
+    newListener(ctx, f.config, channel, callback)
 }
 
 func (f *Default) New(ctx context.Context) *gorm.DB {
diff --git a/pkg/db/db_session/test.go b/pkg/db/db_session/test.go
index 672afe94..f21c3358 100755
--- a/pkg/db/db_session/test.go
+++ b/pkg/db/db_session/test.go
@@ -11,6 +11,8 @@ import (
     "gorm.io/gorm/logger"
     "k8s.io/klog/v2"
 
+    "github.com/jackc/pgx/v5"
+    "github.com/jackc/pgx/v5/stdlib"
     "github.com/openshift-online/maestro/pkg/config"
     "github.com/openshift-online/maestro/pkg/db"
 )
@@ -41,7 +43,7 @@ func NewTestFactory(config *config.DatabaseConfig) *Test {
 
 // Init will:
 // - initialize a template1 DB with migrations
-// - rebuild AMS DB from template1
+// - rebuild Maestro DB from template1
 // - return a new connection factory
 // Go includes database connection pooling in the platform. Gorm uses the same and provides a method to
 // clone a connection via New(), which is safe for use by concurrent Goroutines.
@@ -81,7 +83,7 @@ func resetDB(config *config.DatabaseConfig) error {
     dbx, _, cleanup := connect("postgres", config)
     defer cleanup()
 
-    // Drop `all` connections to both `template1` and AMS DB, so it can be dropped and created
+    // Drop `all` connections to both `template1` and Maestro DB, so it can be dropped and created
     if err := dropConnections(dbx, "template1"); err != nil {
         return err
     }
@@ -89,7 +91,7 @@ func resetDB(config *config.DatabaseConfig) error {
         return err
     }
 
-    // Rebuild AMS DB
+    // Rebuild Maestro DB
     query := fmt.Sprintf("DROP DATABASE IF EXISTS %s", config.Name)
     if _, err := dbx.Exec(query); err != nil {
         return fmt.Errorf("SQL failed to DROP database %s: %s", config.Name, err.Error())
@@ -98,7 +100,7 @@ func resetDB(config *config.DatabaseConfig) error {
     if _, err := dbx.Exec(query); err != nil {
         return fmt.Errorf("SQL failed to CREATE database %s: %s", config.Name, err.Error())
     }
-    // As `template1` had all migrations, so now AMS DB has them too!
+    // As `template1` had all migrations, so now Maestro DB has them too!
     return nil
 }
 
@@ -110,20 +112,17 @@ func connect(name string, config *config.DatabaseConfig) (*sql.DB, *gorm.DB, fun
         err error
     )
 
-    dbx, err = sql.Open(config.Dialect, config.ConnectionStringWithName(name, config.SSLMode != disable))
+    connConfig, err := pgx.ParseConfig(config.ConnectionStringWithName(name, config.SSLMode != disable))
     if err != nil {
-        dbx, err = sql.Open(config.Dialect, config.ConnectionStringWithName(name, false))
-        if err != nil {
-            panic(fmt.Sprintf(
-                "SQL failed to connect to %s database %s with connection string: %s\nError: %s",
-                config.Dialect,
-                name,
-                config.LogSafeConnectionStringWithName(name, config.SSLMode != disable),
-                err.Error(),
-            ))
-        }
+        panic(fmt.Sprintf(
+            "GORM failed to parse the connection string: %s\nError: %s",
+            config.LogSafeConnectionStringWithName(name, config.SSLMode != disable),
+            err.Error(),
+        ))
     }
+    dbx = stdlib.OpenDB(*connConfig, stdlib.OptionBeforeConnect(setPassword(config)))
+
     // Connect GORM to use the same connection
     conf := &gorm.Config{
         PrepareStmt: false,
@@ -217,5 +216,5 @@ func (f *Test) ResetDB() {
 }
 
 func (f *Test) NewListener(ctx context.Context, channel string, callback func(id string)) {
-    newListener(ctx, f.config.ConnectionString(true), channel, callback)
+    newListener(ctx, f.config, channel, callback)
 }
diff --git a/pkg/db/migrations/202311151856_add_consumers.go b/pkg/db/migrations/202311151856_add_consumers.go
index 0a764d25..f3dcf565 100755
--- a/pkg/db/migrations/202311151856_add_consumers.go
+++ b/pkg/db/migrations/202311151856_add_consumers.go
@@ -22,7 +22,7 @@ func addConsumers() *gormigrate.Migration {
             }
 
             if err := CreateFK(tx, fkMigration{
-                "resources", "consumers", "consumer_name", "consumers(name)",
+                "resources", "consumers", "consumer_name", "consumers(name)", "ON DELETE RESTRICT ON UPDATE RESTRICT",
             }); err != nil {
                 return err
             }
diff --git a/pkg/db/migrations/202406241426_add_status_events.go b/pkg/db/migrations/202406241426_add_status_events.go
index cc165b12..2bc4ef2c 100644
--- a/pkg/db/migrations/202406241426_add_status_events.go
+++ b/pkg/db/migrations/202406241426_add_status_events.go
@@ -15,6 +15,7 @@ func addStatusEvents() *gormigrate.Migration {
         ResourceID      string `gorm:"index"` // resource id
         ResourceSource  string
         ResourceType    string
+        Payload         datatypes.JSON `gorm:"type:json"`
         Status          datatypes.JSON `gorm:"type:json"`
         StatusEventType string // Update|Delete, any string
         ReconciledDate  *time.Time `gorm:"null;index"`
diff --git a/pkg/db/migrations/202412171429_add_last_heartbeat_and_ready_column_in_server_instances_tables.go b/pkg/db/migrations/202412171429_add_last_heartbeat_and_ready_column_in_server_instances_tables.go
new file mode 100644
index 00000000..6565e5a0
--- /dev/null
+++ b/pkg/db/migrations/202412171429_add_last_heartbeat_and_ready_column_in_server_instances_tables.go
@@ -0,0 +1,30 @@
+package migrations
+
+import (
+    "time"
+
+    "gorm.io/gorm"
+
+    "github.com/go-gormigrate/gormigrate/v2"
+)
+
+func addLastHeartBeatAndReadyColumnInServerInstancesTable() *gormigrate.Migration {
+    type ServerInstance struct {
+        LastHeartbeat time.Time
+        Ready         bool `gorm:"default:false"`
+    }
+
+    return &gormigrate.Migration{
+        ID: "202412171429",
+        Migrate: func(tx *gorm.DB) error {
+            return tx.AutoMigrate(&ServerInstance{})
+        },
+        Rollback: func(tx *gorm.DB) error {
+            err := tx.Migrator().DropColumn(&ServerInstance{}, "ready")
+            if err != nil {
+                return err
+            }
+            return tx.Migrator().DropColumn(&ServerInstance{}, "last_heartbeat")
+        },
+    }
+}
diff --git a/pkg/db/migrations/202412181141_alter_event_instances.go b/pkg/db/migrations/202412181141_alter_event_instances.go
new file mode 100644
index 00000000..d07c5cc0
--- /dev/null
+++ b/pkg/db/migrations/202412181141_alter_event_instances.go
@@ -0,0 +1,51 @@
+package migrations
+
+import (
+    "gorm.io/gorm"
+
+    "github.com/go-gormigrate/gormigrate/v2"
+)
+
+func alterEventInstances() *gormigrate.Migration {
+    type EventInstance struct {
+        EventID     string `gorm:"index:idx_status_event_instance"` // primary key of status_events table
+        InstanceID  string `gorm:"index:idx_status_event_instance"` // primary key of server_instances table
+        SpecEventID string `gorm:"index"` // primary key of events table
+    }
+
+    return &gormigrate.Migration{
+        ID: "202412181141",
+        Migrate: func(tx *gorm.DB) error {
+            if err := tx.AutoMigrate(&EventInstance{}); err != nil {
+                return err
+            }
+
+            return CreateFK(tx, fkMigration{
+                "event_instances", "server_instances", "instance_id", "server_instances(id)", "ON DELETE CASCADE",
+            }, fkMigration{
+                "event_instances", "status_events", "event_id", "status_events(id)", "ON DELETE CASCADE",
+            }, fkMigration{
+                "event_instances", "events", "spec_event_id", "events(id)", "ON DELETE CASCADE",
+            })
+        },
+        Rollback: func(tx *gorm.DB) error {
+            if err := tx.Migrator().DropColumn(&EventInstance{}, "spec_event_id"); err != nil {
+                return err
+            }
+
+            if err := tx.Migrator().DropIndex(&EventInstance{}, "idx_status_event_instance"); err != nil {
+                return err
+            }
+
+            if err := tx.Migrator().DropConstraint(&EventInstance{}, fkName("event_instances", "server_instances")); err != nil {
+                return err
+            }
+
+            if err := tx.Migrator().DropConstraint(&EventInstance{}, fkName("event_instances", "status_events")); err != nil {
+                return err
+            }
+
+            return tx.Migrator().DropConstraint(&EventInstance{}, fkName("event_instances", "events"))
+        },
+    }
+}
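[Editor's note, illustrative only; not part of the patch] The fkMigration entries above are expanded by the updated CreateFK helper in pkg/db/migrations/migration_structs.go (next file) into plain ALTER TABLE statements, named "fk_<model>_<dest>" by the new fkName helper. A minimal sketch of that expansion for one entry; the variable names below are invented for the example:

package main

import "fmt"

// Mirrors the SQL template used by the updated CreateFK and the fkName naming scheme.
func main() {
    model, dest := "event_instances", "status_events"
    field, reference, constraint := "event_id", "status_events(id)", "ON DELETE CASCADE"
    name := fmt.Sprintf("fk_%s_%s", model, dest)
    fmt.Printf("ALTER TABLE %s ADD CONSTRAINT %s FOREIGN KEY (%s) REFERENCES %s %s;\n",
        model, name, field, reference, constraint)
    // Output:
    // ALTER TABLE event_instances ADD CONSTRAINT fk_event_instances_status_events FOREIGN KEY (event_id) REFERENCES status_events(id) ON DELETE CASCADE;
}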
diff --git a/pkg/db/migrations/migration_structs.go b/pkg/db/migrations/migration_structs.go
index 2a5e652b..79c79b70 100755
--- a/pkg/db/migrations/migration_structs.go
+++ b/pkg/db/migrations/migration_structs.go
@@ -34,6 +34,8 @@ var MigrationList = []*gormigrate.Migration{
     addServerInstances(),
     addStatusEvents(),
     addEventInstances(),
+    addLastHeartBeatAndReadyColumnInServerInstancesTable(),
+    alterEventInstances(),
 }
 
 // Model represents the base model struct. All entities will have this struct embedded.
@@ -45,23 +47,28 @@
 }
 
 type fkMigration struct {
-    Model     string
-    Dest      string
-    Field     string
-    Reference string
+    Model      string
+    Dest       string
+    Field      string
+    Reference  string
+    Constraint string
 }
 
 func CreateFK(g2 *gorm.DB, fks ...fkMigration) error {
-    var query = `ALTER TABLE %s ADD CONSTRAINT %s FOREIGN KEY (%s) REFERENCES %s ON DELETE RESTRICT ON UPDATE RESTRICT;`
     var drop = `ALTER TABLE %s DROP CONSTRAINT IF EXISTS %s;`
     for _, fk := range fks {
-        name := fmt.Sprintf("fk_%s_%s", fk.Model, fk.Dest)
+        name := fkName(fk.Model, fk.Dest)
         g2.Exec(fmt.Sprintf(drop, fk.Model, name))
-        if err := g2.Exec(fmt.Sprintf(query, fk.Model, name, fk.Field, fk.Reference)).Error; err != nil {
+        if err := g2.Exec(fmt.Sprintf(`ALTER TABLE %s ADD CONSTRAINT %s FOREIGN KEY (%s) REFERENCES %s %s;`,
+            fk.Model, name, fk.Field, fk.Reference, fk.Constraint)).Error; err != nil {
             return err
         }
     }
 
     return nil
 }
+
+func fkName(model, dest string) string {
+    return fmt.Sprintf("fk_%s_%s", model, dest)
+}
diff --git a/pkg/services/status_event.go b/pkg/services/status_event.go
index c39916c8..f69c80b2 100755
--- a/pkg/services/status_event.go
+++ b/pkg/services/status_event.go
@@ -18,6 +18,7 @@
     FindAllUnreconciledEvents(ctx context.Context) (api.StatusEventList, *errors.ServiceError)
 
     DeleteAllReconciledEvents(ctx context.Context) *errors.ServiceError
+    DeleteAllEvents(ctx context.Context, eventIDs []string) *errors.ServiceError
 }
 
 func NewStatusEventService(statusEventDao dao.StatusEventDao) StatusEventService {
@@ -93,3 +94,10 @@ func (s *sqlStatusEventService) DeleteAllReconciledEvents(ctx context.Context) *
     }
     return nil
 }
+
+func (s *sqlStatusEventService) DeleteAllEvents(ctx context.Context, eventIDs []string) *errors.ServiceError {
+    if err := s.statusEventDao.DeleteAllEvents(ctx, eventIDs); err != nil {
+        return handleDeleteError("StatusEvent", errors.GeneralError("Unable to delete events %s: %s", eventIDs, err))
+    }
+    return nil
+}
diff --git a/test/e2e/pkg/sourceclient_test.go b/test/e2e/pkg/sourceclient_test.go
index 3a1c9086..9f40225d 100644
--- a/test/e2e/pkg/sourceclient_test.go
+++ b/test/e2e/pkg/sourceclient_test.go
@@ -452,6 +452,10 @@ func AssertWatchResult(result *WatchedResult) error {
         }
 
         if meta.IsStatusConditionTrue(watchedWork.Status.Conditions, common.ManifestsDeleted) && !watchedWork.DeletionTimestamp.IsZero() {
+            if len(watchedWork.Spec.Workload.Manifests) == 0 {
+                return fmt.Errorf("expected the deleted work to have a spec, but got %v", watchedWork.Spec)
+            }
+
             hasDeletedWork = true
         }
     }
diff --git a/test/helper.go b/test/helper.go
index bf8b22df..e9fcfe5a 100755
--- a/test/helper.go
+++ b/test/helper.go
@@ -13,6 +13,8 @@ import (
     "time"
 
     "github.com/openshift-online/maestro/pkg/controllers"
+    "github.com/openshift-online/maestro/pkg/dao"
+    "github.com/openshift-online/maestro/pkg/dispatcher"
     "github.com/openshift-online/maestro/pkg/event"
     "github.com/openshift-online/maestro/pkg/logger"
     "k8s.io/klog/v2"
@@ -70,13 +72,14 @@ type Helper struct {
     ContextCancelFunc context.CancelFunc
 
     EventBroadcaster  *event.EventBroadcaster
+    StatusDispatcher  dispatcher.Dispatcher
     Store             *MemoryStore
     GRPCSourceClient  *generic.CloudEventSourceClient[*api.Resource]
     DBFactory         db.SessionFactory
     AppConfig         *config.ApplicationConfig
     APIServer         server.Server
     MetricsServer     server.Server
-    HealthCheckServer server.Server
+    HealthCheckServer *server.HealthCheckServer
     EventServer       server.EventServer
     ControllerManager *server.ControllersServer
     WorkAgentHolder   *work.ClientHolder
@@ -95,7 +98,7 @@ func NewHelper(t *testing.T) *Helper {
             fmt.Println("Unable to read JWT keys - this may affect tests that make authenticated server requests")
         }
 
-        env := environments.Environment()
+        env := helper.Env()
         // Manually set environment name, ignoring environment variables
         env.Name = environments.TestingEnv
         err = env.AddFlags(pflag.CommandLine)
@@ -118,10 +121,17 @@
             Ctx:               ctx,
             ContextCancelFunc: cancel,
             EventBroadcaster:  event.NewEventBroadcaster(),
-            AppConfig:         env.Config,
-            DBFactory:         env.Database.SessionFactory,
-            JWTPrivateKey:     jwtKey,
-            JWTCA:             jwtCA,
+            StatusDispatcher: dispatcher.NewHashDispatcher(
+                helper.Env().Config.MessageBroker.ClientID,
+                dao.NewInstanceDao(&helper.Env().Database.SessionFactory),
+                dao.NewConsumerDao(&helper.Env().Database.SessionFactory),
+                helper.Env().Clients.CloudEventsSource,
+                helper.Env().Config.EventServer.ConsistentHashConfig,
+            ),
+            AppConfig:     env.Config,
+            DBFactory:     env.Database.SessionFactory,
+            JWTPrivateKey: jwtKey,
+            JWTCA:         jwtCA,
         }
 
         // TODO jwk mock server needs to be refactored out of the helper and into the testing environment
@@ -130,14 +140,17 @@ func NewHelper(t *testing.T) *Helper {
             helper.sendShutdownSignal,
             helper.stopAPIServer,
             helper.stopMetricsServer,
-            helper.stopHealthCheckServer,
             jwkMockTeardown,
         }
 
+        if err := helper.MigrateDB(); err != nil {
+            panic(err)
+        }
+
        helper.startEventBroadcaster()
        helper.startAPIServer()
        helper.startMetricsServer()
-        helper.startHealthCheckServer()
+        helper.startHealthCheckServer(helper.Ctx)
        helper.startEventServer(helper.Ctx)
     })
     helper.T = t
@@ -192,31 +205,25 @@ func (helper *Helper) stopMetricsServer() error {
     return nil
 }
 
-func (helper *Helper) startHealthCheckServer() {
+func (helper *Helper) startHealthCheckServer(ctx context.Context) {
+    helper.Env().Config.HealthCheck.HeartbeartInterval = 1
     helper.HealthCheckServer = server.NewHealthCheckServer()
+    helper.HealthCheckServer.SetStatusDispatcher(helper.StatusDispatcher)
     go func() {
         klog.V(10).Info("Test health check server started")
-        helper.HealthCheckServer.Start()
+        helper.HealthCheckServer.Start(ctx)
         klog.V(10).Info("Test health check server stopped")
     }()
 }
 
-func (helper *Helper) stopHealthCheckServer() error {
-    if err := helper.HealthCheckServer.Stop(); err != nil {
-        return fmt.Errorf("unable to stop health check server: %s", err.Error())
-    }
-    return nil
-}
-
 func (helper *Helper) sendShutdownSignal() error {
     helper.ContextCancelFunc()
     return nil
 }
 
 func (helper *Helper) startEventServer(ctx context.Context) {
-    helper.Env().Config.PulseServer.PulseInterval = 1
-    helper.Env().Config.PulseServer.SubscriptionType = "broadcast"
-    helper.EventServer = server.NewPulseServer(helper.EventBroadcaster)
+    // helper.Env().Config.EventServer.SubscriptionType = "broadcast"
+    helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, helper.StatusDispatcher)
     go func() {
         klog.V(10).Info("Test event server started")
         helper.EventServer.Start(ctx)
@@ -240,6 +247,8 @@ func (helper *Helper) StartControllerManager(ctx context.Context) {
         ),
         StatusController: controllers.NewStatusController(
             helper.Env().Services.StatusEvents(),
+            dao.NewInstanceDao(&helper.Env().Database.SessionFactory),
+            dao.NewEventInstanceDao(&helper.Env().Database.SessionFactory),
         ),
     }
 
@@ -339,7 +348,7 @@ func (helper *Helper) RestartMetricsServer() {
 
 func (helper *Helper) Reset() {
     klog.Infof("Reseting testing environment")
 
-    env := environments.Environment()
+    env := helper.Env()
     // Reset the configuration
     env.Config = config.NewApplicationConfig()
@@ -483,11 +492,9 @@
     for _, table := range []string{
         "events",
         "status_events",
-        "event_instances",
         "resources",
         "consumers",
         "server_instances",
-        "migrations",
     } {
         if g2.Migrator().HasTable(table) {
             // remove table contents instead of dropping table
@@ -506,10 +513,6 @@
         return err
     }
 
-    if err := helper.MigrateDB(); err != nil {
-        return err
-    }
-
     return nil
 }
diff --git a/test/integration/controller_test.go b/test/integration/controller_test.go
index a6ce56cc..1285eb60 100755
--- a/test/integration/controller_test.go
+++ b/test/integration/controller_test.go
@@ -85,6 +85,8 @@ func TestControllerRacing(t *testing.T) {
             ),
             StatusController: controllers.NewStatusController(
                 h.Env().Services.StatusEvents(),
+                dao.NewInstanceDao(&h.Env().Database.SessionFactory),
+                dao.NewEventInstanceDao(&h.Env().Database.SessionFactory),
             ),
         }
 
@@ -181,6 +183,8 @@ func TestControllerReconcile(t *testing.T) {
             ),
             StatusController: controllers.NewStatusController(
                 h.Env().Services.StatusEvents(),
+                dao.NewInstanceDao(&h.Env().Database.SessionFactory),
+                dao.NewEventInstanceDao(&h.Env().Database.SessionFactory),
             ),
         }
 
@@ -312,6 +316,8 @@ func TestControllerSync(t *testing.T) {
             ),
             StatusController: controllers.NewStatusController(
                 h.Env().Services.StatusEvents(),
+                dao.NewInstanceDao(&h.Env().Database.SessionFactory),
+                dao.NewEventInstanceDao(&h.Env().Database.SessionFactory),
             ),
         }
 
@@ -347,3 +353,154 @@ func TestControllerSync(t *testing.T) {
     // cancel the context to stop the controller manager
     cancel()
 }
+
+func TestStatusControllerSync(t *testing.T) {
+    h, _ := test.RegisterIntegration(t)
+
+    account := h.NewRandAccount()
+    ctx, cancel := context.WithCancel(h.NewAuthenticatedContext(account))
+
+    instanceDao := dao.NewInstanceDao(&h.Env().Database.SessionFactory)
+    statusEventDao := dao.NewStatusEventDao(&h.Env().Database.SessionFactory)
+    eventInstanceDao := dao.NewEventInstanceDao(&h.Env().Database.SessionFactory)
+
+    // prepare instances
+    if _, err := instanceDao.Create(ctx, &api.ServerInstance{
+        Meta: api.Meta{ID: "i1"}, Ready: true, LastHeartbeat: time.Now()}); err != nil {
+        t.Fatal(err)
+    }
+    if _, err := instanceDao.Create(ctx, &api.ServerInstance{Meta: api.Meta{ID: "i2"}}); err != nil {
+        t.Fatal(err)
+    }
+    if _, err := instanceDao.Create(ctx, &api.ServerInstance{
+        Meta: api.Meta{ID: "i3"}, Ready: true, LastHeartbeat: time.Now()}); err != nil {
+        t.Fatal(err)
+    }
+
+    // prepare events
+    evt1, err := statusEventDao.Create(ctx, &api.StatusEvent{})
+    if err != nil {
+        t.Fatal(err)
+    }
+    evt2, err := statusEventDao.Create(ctx, &api.StatusEvent{})
+    if err != nil {
+        t.Fatal(err)
+    }
+    evt3, err := statusEventDao.Create(ctx, &api.StatusEvent{})
+    if err != nil {
+        t.Fatal(err)
+    }
+    evt4, err := statusEventDao.Create(ctx, &api.StatusEvent{})
+    if err != nil {
+        t.Fatal(err)
+    }
+    evt5, err := statusEventDao.Create(ctx, &api.StatusEvent{})
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    readyInstances, err := instanceDao.FindReadyIDs(ctx)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    // prepare event-instances
+    for _, id := range readyInstances {
+        if _, err := eventInstanceDao.Create(ctx, &api.EventInstance{InstanceID: id, EventID: evt1.ID}); err != nil {
+            t.Fatal(err)
+        }
+        if _, err := eventInstanceDao.Create(ctx, &api.EventInstance{InstanceID: id, EventID: evt2.ID}); err != nil {
+            t.Fatal(err)
+        }
+    }
+    if _, err := eventInstanceDao.Create(ctx, &api.EventInstance{InstanceID: "i2", EventID: evt1.ID}); err != nil {
+        t.Fatal(err)
+    }
+    if _, err := eventInstanceDao.Create(ctx, &api.EventInstance{InstanceID: "i1", EventID: evt3.ID}); err != nil {
+        t.Fatal(err)
+    }
+    if _, err := eventInstanceDao.Create(ctx, &api.EventInstance{InstanceID: "i2", EventID: evt3.ID}); err != nil {
+        t.Fatal(err)
+    }
+    if _, err := eventInstanceDao.Create(ctx, &api.EventInstance{InstanceID: "i1", EventID: evt4.ID}); err != nil {
+        t.Fatal(err)
+    }
+    if _, err := eventInstanceDao.Create(ctx, &api.EventInstance{InstanceID: "i3", EventID: evt5.ID}); err != nil {
+        t.Fatal(err)
+    }
+
+    // start the controller
+    go func() {
+        s := &server.ControllersServer{
+            KindControllerManager: controllers.NewKindControllerManager(
+                db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory),
+                h.Env().Services.Events(),
+            ),
+            StatusController: controllers.NewStatusController(
+                h.Env().Services.StatusEvents(),
+                dao.NewInstanceDao(&h.Env().Database.SessionFactory),
+                dao.NewEventInstanceDao(&h.Env().Database.SessionFactory),
+            ),
+        }
+
+        s.Start(ctx)
+    }()
+
+    purged := []string{evt1.ID, evt2.ID}
+    remained := []string{evt3.ID, evt4.ID, evt5.ID}
+    Eventually(func() error {
+        events, err := statusEventDao.FindByIDs(ctx, remained)
+        if err != nil {
+            return err
+        }
+
+        if len(events) != 3 {
+            return fmt.Errorf("should have events %s remained, but got %v", remained, events)
+        }
+
+        events, err = statusEventDao.FindByIDs(ctx, purged)
+        if err != nil {
+            return err
+        }
+
+        if len(events) != 0 {
+            return fmt.Errorf("should purge the events %s, but got %+v", purged, events)
+        }
+
+        eventInstances, err := eventInstanceDao.FindStatusEvents(ctx, purged)
+        if err != nil {
+            return err
+        }
+        if len(eventInstances) != 0 {
+            return fmt.Errorf("should purge the event-instances %s, but got %+v", purged, eventInstances)
+        }
+
+        if _, err := eventInstanceDao.Get(ctx, evt3.ID, "i1"); err != nil {
+            return fmt.Errorf("%s-%s is not found", "e3", "i1")
+        }
+        if _, err := eventInstanceDao.Get(ctx, evt3.ID, "i2"); err != nil {
+            return fmt.Errorf("%s-%s is not found", "e3", "i2")
+        }
+        if _, err := eventInstanceDao.Get(ctx, evt4.ID, "i1"); err != nil {
+            return fmt.Errorf("%s-%s is not found", "e4", "i1")
+        }
+        if _, err := eventInstanceDao.Get(ctx, evt5.ID, "i3"); err != nil {
+            return fmt.Errorf("%s-%s is not found", "e5", "i3")
+        }
+
+        return nil
+    }, 5*time.Second, 1*time.Second).Should(Succeed())
+
+    // cleanup
+    for _, evtID := range remained {
+        if err := statusEventDao.Delete(ctx, evtID); err != nil {
+            t.Fatal(err)
+        }
+    }
+    if err := instanceDao.DeleteByIDs(ctx, []string{"i1", "i2", "i3"}); err != nil {
+        t.Fatal(err)
+    }
+
+    // cancel the context to stop the controller manager
+    cancel()
+}
diff --git a/test/integration/pulse_server_test.go b/test/integration/status_hash_test.go
similarity index 82%
rename from test/integration/pulse_server_test.go
rename to test/integration/status_hash_test.go
index 9556f2bd..000f33aa 100644
--- a/test/integration/pulse_server_test.go
+++ b/test/integration/status_hash_test.go
@@ -16,7 +16,7 @@ import (
     "github.com/openshift-online/maestro/test"
 )
 
-func TestPulseServer(t *testing.T) {
+func TestEventServer(t *testing.T) {
     h, _ := test.RegisterIntegration(t)
     ctx, cancel := context.WithCancel(context.Background())
     defer func() {
@@ -25,10 +25,11 @@
     instanceDao := dao.NewInstanceDao(&h.Env().Database.SessionFactory)
     // insert one existing instances
-    _, err := instanceDao.UpSert(ctx, &api.ServerInstance{
+    _, err := instanceDao.Create(ctx, &api.ServerInstance{
         Meta: api.Meta{
             ID: "instance1",
         },
+        LastHeartbeat: time.Now(),
     })
     Expect(err).NotTo(HaveOccurred())
@@ -39,13 +40,23 @@
             return err
         }
 
-        if len(instances) != 1 {
-            return fmt.Errorf("expected 1 instance, got %d", len(instances))
+        if len(instances) != 2 {
+            return fmt.Errorf("expected 2 instances, got %d", len(instances))
         }
 
-        instance := instances[0]
-        if instance.UpdatedAt.IsZero() {
-            return fmt.Errorf("expected instance.UpdatedAt to be non-zero")
+        var instance *api.ServerInstance
+        for _, i := range instances {
+            if i.ID == *instanceID {
+                instance = i
+            }
+        }
+
+        if instance.LastHeartbeat.IsZero() {
+            return fmt.Errorf("expected instance.LastHeartbeat to be non-zero")
+        }
+
+        if !instance.Ready {
+            return fmt.Errorf("expected instance.Ready to be true")
         }
 
         if instance.ID != *instanceID {
@@ -57,18 +68,20 @@
 
     // the cluster1 name cannot be changed, because consistent hash makes it allocate to different instance.
     // the case here we want to the new consumer allocate to new instance(cluster1) which is a fake instance.
-    // after 3*pulseInterval (3s), it will relocate to maestro instance.
+    // after 3*heartbeatInterval (3s), it will be relocated to the maestro instance.
     clusterName := "cluster1"
     consumer := h.CreateConsumer(clusterName)
 
     // insert a new instance with the same name to consumer name
     // to make sure the consumer is hashed to the new instance firstly.
-    // after the new instance is stale after 3*pulseInterval (3s), the current
+    // after the new instance becomes stale after 3*heartbeatInterval (3s), the current
     // instance will take over the consumer and resync the resource status.
-    _, err = instanceDao.UpSert(ctx, &api.ServerInstance{
+    _, err = instanceDao.Create(ctx, &api.ServerInstance{
         Meta: api.Meta{
             ID: clusterName,
         },
+        LastHeartbeat: time.Now(),
+        Ready:         true,
     })
     Expect(err).NotTo(HaveOccurred())
diff --git a/test/registration.go b/test/registration.go
index 1897a89a..f7e6804b 100755
--- a/test/registration.go
+++ b/test/registration.go
@@ -15,8 +15,12 @@ func RegisterIntegration(t *testing.T) (*Helper, *openapi.APIClient) {
     gm.RegisterTestingT(t)
     // Create a new helper
     helper := NewHelper(t)
+
     // Reset the database to a seeded blank state
-    helper.ResetDB()
+    if err := helper.ResetDB(); err != nil {
+        panic(err)
+    }
+
     // Create an api client
     client := helper.NewApiClient()
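[Editor's note, illustrative only; not part of the patch] TestStatusControllerSync above exercises a purge path whose StatusController changes are not shown in this section. As a reading aid, a hedged sketch of how the new DAO methods are expected to compose; the function name and wiring are assumptions, not the actual controller code:

package sketch

import (
    "context"

    "github.com/openshift-online/maestro/pkg/dao"
)

// purgeHandledStatusEvents deletes the status events that every ready maestro instance has
// already handled. The ON DELETE CASCADE constraint added in 202412181141_alter_event_instances.go
// removes the matching event_instances rows together with the deleted status_events.
func purgeHandledStatusEvents(ctx context.Context, instances dao.InstanceDao,
    eventInstances dao.EventInstanceDao, statusEvents dao.StatusEventDao) error {
    readyIDs, err := instances.FindReadyIDs(ctx)
    if err != nil {
        return err
    }
    // Only events recorded against all ready instances are returned
    // (the COUNT(DISTINCT instance_id) check in GetEventsAssociatedWithInstances).
    eventIDs, err := eventInstances.GetEventsAssociatedWithInstances(ctx, readyIDs)
    if err != nil {
        return err
    }
    // DeleteAllEvents is a no-op for an empty slice; otherwise it is a hard (unscoped) delete.
    return statusEvents.DeleteAllEvents(ctx, eventIDs)
}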