diff --git a/cmd/maestro/environments/framework.go b/cmd/maestro/environments/framework.go index 9f54cfe6..4a158f06 100755 --- a/cmd/maestro/environments/framework.go +++ b/cmd/maestro/environments/framework.go @@ -140,6 +140,7 @@ func (e *Env) LoadServices() { e.Services.Generic = NewGenericServiceLocator(e) e.Services.Resources = NewResourceServiceLocator(e) e.Services.Events = NewEventServiceLocator(e) + e.Services.StatusEvents = NewStatusEventServiceLocator(e) e.Services.Consumers = NewConsumerServiceLocator(e) } diff --git a/cmd/maestro/environments/service_types.go b/cmd/maestro/environments/service_types.go index 648f563d..f47a2b41 100755 --- a/cmd/maestro/environments/service_types.go +++ b/cmd/maestro/environments/service_types.go @@ -34,6 +34,14 @@ func NewEventServiceLocator(env *Env) EventServiceLocator { } } +type StatusEventServiceLocator func() services.StatusEventService + +func NewStatusEventServiceLocator(env *Env) StatusEventServiceLocator { + return func() services.StatusEventService { + return services.NewStatusEventService(dao.NewStatusEventDao(&env.Database.SessionFactory)) + } +} + type ConsumerServiceLocator func() services.ConsumerService func NewConsumerServiceLocator(env *Env) ConsumerServiceLocator { diff --git a/cmd/maestro/environments/types.go b/cmd/maestro/environments/types.go index c9a52028..52b54a6c 100755 --- a/cmd/maestro/environments/types.go +++ b/cmd/maestro/environments/types.go @@ -48,10 +48,11 @@ type Handlers struct { } type Services struct { - Resources ResourceServiceLocator - Generic GenericServiceLocator - Events EventServiceLocator - Consumers ConsumerServiceLocator + Resources ResourceServiceLocator + Generic GenericServiceLocator + Events EventServiceLocator + StatusEvents StatusEventServiceLocator + Consumers ConsumerServiceLocator } type Clients struct { diff --git a/cmd/maestro/server/controllers.go b/cmd/maestro/server/controllers.go index b826eb7e..e69daa8b 100755 --- a/cmd/maestro/server/controllers.go +++ 
b/cmd/maestro/server/controllers.go @@ -17,25 +17,34 @@ func NewControllersServer(pulseServer *PulseServer) *ControllersServer { db.NewAdvisoryLockFactory(env().Database.SessionFactory), env().Services.Events(), ), + StatusController: controllers.NewStatusController( + env().Services.StatusEvents(), + ), } sourceClient := env().Clients.CloudEventsSource s.KindControllerManager.Add(&controllers.ControllerConfig{ Source: "Resources", Handlers: map[api.EventType][]controllers.ControllerHandlerFunc{ - api.CreateEventType: {sourceClient.OnCreate}, - api.UpdateEventType: {sourceClient.OnUpdate}, - api.DeleteEventType: {sourceClient.OnDelete}, - api.StatusUpdateEventType: {pulseServer.OnStatusUpdate}, + api.CreateEventType: {sourceClient.OnCreate}, + api.UpdateEventType: {sourceClient.OnUpdate}, + api.DeleteEventType: {sourceClient.OnDelete}, }, }) + s.StatusController.Add(map[api.StatusEventType][]controllers.StatusHandlerFunc{ + api.StatusUpdateEventType: {pulseServer.OnStatusUpdate}, + api.StatusDeleteEventType: {pulseServer.OnStatusUpdate}, + }) + return s } type ControllersServer struct { KindControllerManager *controllers.KindControllerManager - DB db.SessionFactory + StatusController *controllers.StatusController + + DB db.SessionFactory } // Start is a blocking call that starts this controller server @@ -44,8 +53,14 @@ func (s ControllersServer) Start(ctx context.Context) { log.Infof("Kind controller handling events") go s.KindControllerManager.Run(ctx.Done()) + log.Infof("Status controller handling events") + go s.StatusController.Run(ctx.Done()) log.Infof("Kind controller listening for events") - // blocking call - env().Database.SessionFactory.NewListener(ctx, "events", s.KindControllerManager.AddEvent) + go env().Database.SessionFactory.NewListener(ctx, "events", s.KindControllerManager.AddEvent) + log.Infof("Status controller listening for events") + go env().Database.SessionFactory.NewListener(ctx, "status_events", s.StatusController.AddStatusEvent) + + // 
block until the context is done + <-ctx.Done() } diff --git a/cmd/maestro/server/pulse_server.go b/cmd/maestro/server/pulse_server.go index 59791159..3238cd03 100644 --- a/cmd/maestro/server/pulse_server.go +++ b/cmd/maestro/server/pulse_server.go @@ -28,16 +28,16 @@ var log = logger.NewOCMLogger(context.Background()) // checking the liveness of Maestro instances, triggering status resync based on // instances' status and other conditions. type PulseServer struct { - instanceID string - pulseInterval int64 - instanceDao dao.InstanceDao - eventInstanceDao dao.EventInstanceDao - lockFactory db.LockFactory - eventBroadcaster *event.EventBroadcaster - resourceService services.ResourceService - eventService services.EventService - sourceClient cloudevents.SourceClient - statusDispatcher dispatcher.Dispatcher + instanceID string + pulseInterval int64 + instanceDao dao.InstanceDao + eventInstanceDao dao.EventInstanceDao + lockFactory db.LockFactory + eventBroadcaster *event.EventBroadcaster + resourceService services.ResourceService + statusEventService services.StatusEventService + sourceClient cloudevents.SourceClient + statusDispatcher dispatcher.Dispatcher } func NewPulseServer(eventBroadcaster *event.EventBroadcaster) *PulseServer { @@ -52,16 +52,16 @@ func NewPulseServer(eventBroadcaster *event.EventBroadcaster) *PulseServer { } sessionFactory := env().Database.SessionFactory return &PulseServer{ - instanceID: env().Config.MessageBroker.ClientID, - pulseInterval: env().Config.PulseServer.PulseInterval, - instanceDao: dao.NewInstanceDao(&sessionFactory), - eventInstanceDao: dao.NewEventInstanceDao(&sessionFactory), - lockFactory: db.NewAdvisoryLockFactory(sessionFactory), - eventBroadcaster: eventBroadcaster, - resourceService: env().Services.Resources(), - eventService: env().Services.Events(), - sourceClient: env().Clients.CloudEventsSource, - statusDispatcher: statusDispatcher, + instanceID: env().Config.MessageBroker.ClientID, + pulseInterval: 
env().Config.PulseServer.PulseInterval, + instanceDao: dao.NewInstanceDao(&sessionFactory), + eventInstanceDao: dao.NewEventInstanceDao(&sessionFactory), + lockFactory: db.NewAdvisoryLockFactory(sessionFactory), + eventBroadcaster: eventBroadcaster, + resourceService: env().Services.Resources(), + statusEventService: env().Services.StatusEvents(), + sourceClient: env().Clients.CloudEventsSource, + statusDispatcher: statusDispatcher, } } @@ -179,62 +179,47 @@ func (s *PulseServer) startSubscription(ctx context.Context) { return nil } - // // convert the resource status to cloudevent - // evt, err := api.JSONMAPToCloudEvent(resource.Status) - // if err != nil { - // return fmt.Errorf("failed to convert resource status to cloudevent: %v", err) - // } - - // // decode the cloudevent data as manifest status - // statusPayload := &workpayload.ManifestStatus{} - // if err := evt.DataAs(statusPayload); err != nil { - // return fmt.Errorf("failed to decode cloudevent data as resource status: %v", err) - // } - - // // if the resource has been deleted from agent, delete it from maestro - // if meta.IsStatusConditionTrue(statusPayload.Conditions, common.ManifestsDeleted) { - // if svcErr := s.resourceService.Delete(ctx, resource.ID); svcErr != nil { - // return svcErr - // } - - // log.V(4).Infof("Broadcast:: the resource %s is deleted", resource.ID) - // resource.Payload = found.Payload - // s.eventBroadcaster.Broadcast(resource) - // return nil - // } + // convert the resource status to cloudevent + evt, err := api.JSONMAPToCloudEvent(resource.Status) + if err != nil { + return fmt.Errorf("failed to convert resource status to cloudevent: %v", err) + } + + // decode the cloudevent data as manifest status + statusPayload := &workpayload.ManifestStatus{} + if err := evt.DataAs(statusPayload); err != nil { + return fmt.Errorf("failed to decode cloudevent data as resource status: %v", err) + } + + // if the resource has been deleted from agent, create status event and delete it 
from maestro + if meta.IsStatusConditionTrue(statusPayload.Conditions, common.ManifestsDeleted) { + _, sErr := s.statusEventService.Create(ctx, &api.StatusEvent{ + ResourceID: resource.ID, + Status: resource.Status, + StatusEventType: api.StatusDeleteEventType, + }) + if sErr != nil { + return fmt.Errorf("failed to create status event for resource status delete %s: %s", resource.ID, sErr.Error()) + } + if svcErr := s.resourceService.Delete(ctx, resource.ID); svcErr != nil { + return fmt.Errorf("failed to delete resource %s: %s", resource.ID, svcErr.Error()) + } + } + // update the resource status _, updated, svcErr := s.resourceService.UpdateStatus(ctx, resource) if svcErr != nil { return fmt.Errorf("failed to update resource status %s: %s", resource.ID, svcErr.Error()) } - // // broadcast the resource status updated only when the resource is updated - // if updated { - // log.V(4).Infof("Broadcast:: the resource %s is updated", resource.ID) - // s.eventBroadcaster.Broadcast(updatedResource) - // } + // create the status event only when the resource is updated if updated { - evt, sErr := s.eventService.Create(ctx, &api.Event{ - Source: "Resources", - SourceID: resource.ID, - EventType: api.StatusUpdateEventType, + _, sErr := s.statusEventService.Create(ctx, &api.StatusEvent{ + ResourceID: resource.ID, + StatusEventType: api.StatusUpdateEventType, }) if sErr != nil { - return fmt.Errorf("failed to create event for resource status update %s: %s", resource.ID, sErr.Error()) - } - instances, err := s.instanceDao.All(ctx) - if err != nil { - return fmt.Errorf("failed to get all maestro instances: %s", err) - } - eventInstanceList := make([]*api.EventInstance, len(instances)) - for i, instance := range instances { - eventInstanceList[i] = &api.EventInstance{ - EventID: evt.ID, - InstanceID: instance.ID, - } - } - if err := s.eventInstanceDao.CreateList(ctx, eventInstanceList); err != nil { - return fmt.Errorf("failed to create event instances for resource status update 
%s: %s", resource.ID, err.Error()) + return fmt.Errorf("failed to create status event for resource status update %s: %s", resource.ID, sErr.Error()) } } default: @@ -245,52 +230,42 @@ func (s *PulseServer) startSubscription(ctx context.Context) { }) } -// On StatusUpdate does three things: -// 1. Broadcast the resource status update to subscribers -// 2. Mark the event instance as done -// 3. If the resource has been deleted from agent, delete it from maestro if all event instances are done +// On StatusUpdate will be called on each new status event inserted into db. +// It does two things: +// 1. build the resource status and broadcast it to subscribers +// 2. add the event instance record to mark the event has been processed by the current instance func (s *PulseServer) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error { - resource, sErr := s.resourceService.Get(ctx, resourceID) + statusEvent, sErr := s.statusEventService.Get(ctx, eventID) if sErr != nil { - return fmt.Errorf("failed to get resource %s: %s", resourceID, sErr.Error()) - } - - // broadcast the resource status updated to subscribers - s.eventBroadcaster.Broadcast(resource) - - // mark the event instance as done - if err := s.eventInstanceDao.MarkAsDone(ctx, eventID, s.instanceID); err != nil { - return fmt.Errorf("failed to mark event instance (%s, %s) as done for resource status update %s: %s", eventID, s.instanceID, resourceID, err.Error()) - } - - // convert the resource status to cloudevent - cloudevt, err := api.JSONMAPToCloudEvent(resource.Status) - if err != nil { - return fmt.Errorf("failed to convert resource status to cloudevent: %v", err) - } - - // decode the cloudevent data as manifest status - statusPayload := &workpayload.ManifestStatus{} - if err := cloudevt.DataAs(statusPayload); err != nil { - return fmt.Errorf("failed to decode cloudevent data as resource status: %v", err) + return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error()) } - // if 
the resource has been deleted from agent, delete it from maestro - deleted := false - if meta.IsStatusConditionTrue(statusPayload.Conditions, common.ManifestsDeleted) { - deleted = true - } - if deleted { - count, err := s.eventInstanceDao.GetUnhandleEventInstances(ctx, eventID) - if err != nil { - return fmt.Errorf("failed to get unhandled event instances for event %s: %s", eventID, err.Error()) + var resource *api.Resource + // check if the status event is delete event + if statusEvent.StatusEventType == api.StatusDeleteEventType { + // build resource with resource id and delete status + resource = &api.Resource{ + Meta: api.Meta{ + ID: resourceID, + }, + Status: statusEvent.Status, } - if count == 0 { - if sErr := s.resourceService.Delete(ctx, resourceID); sErr != nil { - return fmt.Errorf("failed to delete resource %s: %s", resourceID, sErr.Error()) - } + } else { + resource, sErr = s.resourceService.Get(ctx, resourceID) + if sErr != nil { + return fmt.Errorf("failed to get resource %s: %s", resourceID, sErr.Error()) } } - return nil + // broadcast the resource status to subscribers + log.V(4).Infof("Broadcast the resource status %s", resource.ID) + s.eventBroadcaster.Broadcast(resource) + + // add the event instance record + _, err := s.eventInstanceDao.Create(ctx, &api.EventInstance{ + EventID: eventID, + InstanceID: s.instanceID, + }) + + return err } diff --git a/pkg/api/event.go b/pkg/api/event.go index 659adb30..870de24f 100755 --- a/pkg/api/event.go +++ b/pkg/api/event.go @@ -9,10 +9,9 @@ import ( type EventType string const ( - CreateEventType EventType = "Create" - UpdateEventType EventType = "Update" - DeleteEventType EventType = "Delete" - StatusUpdateEventType EventType = "StatusUpdate" + CreateEventType EventType = "Create" + UpdateEventType EventType = "Update" + DeleteEventType EventType = "Delete" ) type Event struct { diff --git a/pkg/api/event_instances.go b/pkg/api/event_instances.go index e0be9631..413147e8 100644 --- 
a/pkg/api/event_instances.go +++ b/pkg/api/event_instances.go @@ -3,7 +3,6 @@ package api type EventInstance struct { EventID string InstanceID string - Done bool } type EventInstanceList []*EventInstance diff --git a/pkg/api/status_event.go b/pkg/api/status_event.go new file mode 100755 index 00000000..d9d31cbd --- /dev/null +++ b/pkg/api/status_event.go @@ -0,0 +1,39 @@ +package api + +import ( + "time" + + "gorm.io/datatypes" + "gorm.io/gorm" +) + +type StatusEventType string + +const ( + StatusUpdateEventType StatusEventType = "StatusUpdate" + StatusDeleteEventType StatusEventType = "StatusDelete" +) + +type StatusEvent struct { + Meta + ResourceID string + Status datatypes.JSONMap + StatusEventType StatusEventType // Update|Delete + ReconciledDate *time.Time `json:"gorm:null"` +} + +type StatusEventList []*StatusEvent +type StatusEventIndex map[string]*StatusEvent + +func (l StatusEventList) Index() StatusEventIndex { + index := StatusEventIndex{} + for _, o := range l { + index[o.ID] = o + } + return index +} + +func (e *StatusEvent) BeforeCreate(tx *gorm.DB) error { + e.ID = NewID() + return nil +} diff --git a/pkg/client/cloudevents/source_client.go b/pkg/client/cloudevents/source_client.go index 7b63a540..2245f4f3 100644 --- a/pkg/client/cloudevents/source_client.go +++ b/pkg/client/cloudevents/source_client.go @@ -17,9 +17,9 @@ import ( // SourceClient is an interface for publishing resource events to consumers // subscribing to and resyncing resource status from consumers. 
type SourceClient interface { - OnCreate(ctx context.Context, eventID, resourceID string) error - OnUpdate(ctx context.Context, eventID, resourceID string) error - OnDelete(ctx context.Context, eventID, resourceID string) error + OnCreate(ctx context.Context, id string) error + OnUpdate(ctx context.Context, id string) error + OnDelete(ctx context.Context, id string) error Subscribe(ctx context.Context, handlers ...cegeneric.ResourceHandler[*api.Resource]) Resync(ctx context.Context, consumers []string) error ReconnectedChan() <-chan struct{} @@ -49,10 +49,10 @@ func NewSourceClient(sourceOptions *ceoptions.CloudEventsSourceOptions, resource }, nil } -func (s *SourceClientImpl) OnCreate(ctx context.Context, eventID, resourceID string) error { +func (s *SourceClientImpl) OnCreate(ctx context.Context, id string) error { logger := logger.NewOCMLogger(ctx) - resource, err := s.ResourceService.Get(ctx, resourceID) + resource, err := s.ResourceService.Get(ctx, id) if err != nil { return err } @@ -74,10 +74,10 @@ func (s *SourceClientImpl) OnCreate(ctx context.Context, eventID, resourceID str return nil } -func (s *SourceClientImpl) OnUpdate(ctx context.Context, eventID, resourceID string) error { +func (s *SourceClientImpl) OnUpdate(ctx context.Context, id string) error { logger := logger.NewOCMLogger(ctx) - resource, err := s.ResourceService.Get(ctx, resourceID) + resource, err := s.ResourceService.Get(ctx, id) if err != nil { return err } @@ -99,10 +99,10 @@ func (s *SourceClientImpl) OnUpdate(ctx context.Context, eventID, resourceID str return nil } -func (s *SourceClientImpl) OnDelete(ctx context.Context, eventID, resourceID string) error { +func (s *SourceClientImpl) OnDelete(ctx context.Context, id string) error { logger := logger.NewOCMLogger(ctx) - resource, err := s.ResourceService.Get(ctx, resourceID) + resource, err := s.ResourceService.Get(ctx, id) if err != nil { return err } diff --git a/pkg/client/cloudevents/source_client_mock.go 
b/pkg/client/cloudevents/source_client_mock.go index 1b59c866..d8566099 100644 --- a/pkg/client/cloudevents/source_client_mock.go +++ b/pkg/client/cloudevents/source_client_mock.go @@ -39,8 +39,8 @@ func NewSourceClientMock(resourceService services.ResourceService) SourceClient } } -func (s *SourceClientMock) OnCreate(ctx context.Context, eventID, resourceID string) error { - resource, serviceErr := s.ResourceService.Get(ctx, resourceID) +func (s *SourceClientMock) OnCreate(ctx context.Context, id string) error { + resource, serviceErr := s.ResourceService.Get(ctx, id) if serviceErr != nil { return fmt.Errorf("failed to get resource: %v", serviceErr) } @@ -78,8 +78,8 @@ func (s *SourceClientMock) OnCreate(ctx context.Context, eventID, resourceID str return nil } -func (s *SourceClientMock) OnUpdate(ctx context.Context, eventID, resourceID string) error { - resource, serviceErr := s.ResourceService.Get(ctx, resourceID) +func (s *SourceClientMock) OnUpdate(ctx context.Context, id string) error { + resource, serviceErr := s.ResourceService.Get(ctx, id) if serviceErr != nil { return fmt.Errorf("failed to get resource: %v", serviceErr) } @@ -136,8 +136,8 @@ func (s *SourceClientMock) OnUpdate(ctx context.Context, eventID, resourceID str return nil } -func (s *SourceClientMock) OnDelete(ctx context.Context, eventID, resourceID string) error { - resource, serviceErr := s.ResourceService.Get(ctx, resourceID) +func (s *SourceClientMock) OnDelete(ctx context.Context, id string) error { + resource, serviceErr := s.ResourceService.Get(ctx, id) if serviceErr != nil { return fmt.Errorf("failed to get resource: %v", serviceErr) } diff --git a/pkg/controllers/framework.go b/pkg/controllers/framework.go index 5001320c..6ed920b4 100755 --- a/pkg/controllers/framework.go +++ b/pkg/controllers/framework.go @@ -45,7 +45,7 @@ var logger = maestrologger.NewOCMLogger(context.Background()) // events sync will help us to handle unexpected errors (e.g. 
sever restart), it ensures we will not miss any events var defaultEventsSyncPeriod = 10 * time.Hour -type ControllerHandlerFunc func(ctx context.Context, eventID, sourceID string) error +type ControllerHandlerFunc func(ctx context.Context, sourceID string) error type ControllerConfig struct { Source string @@ -131,7 +131,7 @@ func (km *KindControllerManager) handleEvent(id string) error { return fmt.Errorf("error getting event with id(%s): %s", id, svcErr) } - if !acquired && event.EventType != api.StatusUpdateEventType { + if !acquired { logger.Infof("Event %s is processed by another worker, continue to process the next", id) return nil } @@ -153,20 +153,18 @@ func (km *KindControllerManager) handleEvent(id string) error { } for _, fn := range handlerFns { - err := fn(reqContext, id, event.SourceID) + err := fn(reqContext, event.SourceID) if err != nil { return fmt.Errorf("error handling event %s, %s, %s: %s", event.Source, event.EventType, id, err) } } // all handlers successfully executed - if event.EventType != api.StatusUpdateEventType { - now := time.Now() - event.ReconciledDate = &now - _, svcErr = km.events.Replace(reqContext, event) - if svcErr != nil { - return fmt.Errorf("error updating event with id(%s): %s", id, svcErr) - } + now := time.Now() + event.ReconciledDate = &now + _, svcErr = km.events.Replace(reqContext, event) + if svcErr != nil { + return fmt.Errorf("error updating event with id(%s): %s", id, svcErr) } return nil } diff --git a/pkg/controllers/framework_test.go b/pkg/controllers/framework_test.go index a0f3aedb..3365073b 100755 --- a/pkg/controllers/framework_test.go +++ b/pkg/controllers/framework_test.go @@ -28,17 +28,17 @@ type exampleController struct { deleteCounter int } -func (d *exampleController) OnAdd(ctx context.Context, eventID, sourceID string) error { +func (d *exampleController) OnAdd(ctx context.Context, id string) error { d.addCounter++ return nil } -func (d *exampleController) OnUpdate(ctx context.Context, eventID, 
sourceID string) error { +func (d *exampleController) OnUpdate(ctx context.Context, id string) error { d.updateCounter++ return nil } -func (d *exampleController) OnDelete(ctx context.Context, eventID, sourceID string) error { +func (d *exampleController) OnDelete(ctx context.Context, id string) error { d.deleteCounter++ return nil } diff --git a/pkg/controllers/status_controller.go b/pkg/controllers/status_controller.go new file mode 100644 index 00000000..fcb6cf45 --- /dev/null +++ b/pkg/controllers/status_controller.go @@ -0,0 +1,133 @@ +package controllers + +import ( + "context" + "fmt" + "time" + + "github.com/openshift-online/maestro/pkg/api" + "github.com/openshift-online/maestro/pkg/services" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/workqueue" +) + +const StatusEventID ControllerHandlerContextKey = "status_event" + +type StatusHandlerFunc func(ctx context.Context, eventID, sourceID string) error + +type StatusController struct { + controllers map[api.StatusEventType][]StatusHandlerFunc + statusEvents services.StatusEventService + eventsQueue workqueue.RateLimitingInterface +} + +func NewStatusController(statusEvents services.StatusEventService) *StatusController { + return &StatusController{ + controllers: map[api.StatusEventType][]StatusHandlerFunc{}, + statusEvents: statusEvents, + eventsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "status-event-controller"), + } +} + +// AddStatusEvent adds a status event to the queue to be processed. 
+func (sc *StatusController) AddStatusEvent(id string) { + sc.eventsQueue.Add(id) +} + +func (sc *StatusController) Run(stopCh <-chan struct{}) { + logger.Infof("Starting status event controller") + defer sc.eventsQueue.ShutDown() + + // TODO: start a goroutine to sync all status events periodically + // use a jitter to avoid multiple instances syncing the events at the same time + // go wait.JitterUntil(sc.syncStatusEvents, defaultEventsSyncPeriod, 0.25, true, stopCh) + + // start a goroutine to handle the status event from the event queue + // the .Until will re-kick the runWorker one second after the runWorker completes + go wait.Until(sc.runWorker, time.Second, stopCh) + + // wait until we're told to stop + <-stopCh + logger.Infof("Shutting down status event controller") +} + +func (sm *StatusController) runWorker() { + // hot loop until we're told to stop. processNextEvent will automatically wait until there's work available, so + // we don't worry about secondary waits + for sm.processNextEvent() { + } +} + +// processNextEvent deals with one key off the queue. +func (sm *StatusController) processNextEvent() bool { + // pull the next status event item from queue. 
+ // events queue blocks until it can return an item to be processed + key, quit := sm.eventsQueue.Get() + if quit { + // the current queue is shutdown and becomes empty, quit this process + return false + } + defer sm.eventsQueue.Done(key) + + if err := sm.handleStatusEvent(key.(string)); err != nil { + logger.Error(fmt.Sprintf("Failed to handle the event %v, %v ", key, err)) + + // we failed to handle the status event, we should requeue the item to work on later + // this method will add a backoff to avoid hotlooping on particular items + sm.eventsQueue.AddRateLimited(key) + return true + } + + // we handle the status event successfully, tell the queue to stop tracking history for this status event + sm.eventsQueue.Forget(key) + return true +} + +// syncStatusEvents handles the status event with the given ID. +// It reads the status event from the database and is called on each replica +// without locking, ensuring the status event is broadcast to all subscribers. +func (sc *StatusController) handleStatusEvent(id string) error { + ctx := context.Background() + reqContext := context.WithValue(ctx, StatusEventID, id) + statusEvent, svcErr := sc.statusEvents.Get(reqContext, id) + if svcErr != nil { + if svcErr.Is404() { + // the status event is already deleted, we can ignore it + return nil + } + return fmt.Errorf("error getting status event with id(%s): %s", id, svcErr) + } + + if statusEvent.ReconciledDate != nil { + return nil + } + + handlerFns, found := sc.controllers[statusEvent.StatusEventType] + if !found { + logger.Infof("No handler functions found for status event '%s'\n", statusEvent.StatusEventType) + return nil + } + + for _, fn := range handlerFns { + err := fn(reqContext, id, statusEvent.ResourceID) + if err != nil { + return fmt.Errorf("error handling status event %s, %s, %s: %s", statusEvent.StatusEventType, id, statusEvent.ResourceID, err) + } + } + + return nil +} + +func (sc *StatusController) Add(handlers 
map[api.StatusEventType][]StatusHandlerFunc) { + for ev, fn := range handlers { + sc.add(ev, fn) + } +} + +func (sc *StatusController) add(ev api.StatusEventType, fns []StatusHandlerFunc) { + if _, exists := sc.controllers[ev]; !exists { + sc.controllers[ev] = []StatusHandlerFunc{} + } + + sc.controllers[ev] = append(sc.controllers[ev], fns...) +} diff --git a/pkg/dao/event_instances.go b/pkg/dao/event_instances.go index 382eca14..c7722b16 100644 --- a/pkg/dao/event_instances.go +++ b/pkg/dao/event_instances.go @@ -12,9 +12,6 @@ import ( type EventInstanceDao interface { Get(ctx context.Context, eventID, instanceID string) (*api.EventInstance, error) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) - CreateList(ctx context.Context, eventInstanceList api.EventInstanceList) error - MarkAsDone(ctx context.Context, eventID, instanceID string) error - GetUnhandleEventInstances(ctx context.Context, eventID string) (int64, error) } var _ EventInstanceDao = &sqlEventInstanceDao{} @@ -38,17 +35,6 @@ func (d *sqlEventInstanceDao) Get(ctx context.Context, eventID, instanceID strin return &eventInstance, nil } -func (d *sqlEventInstanceDao) GetUnhandleEventInstances(ctx context.Context, eventID string) (int64, error) { - g2 := (*d.sessionFactory).New(ctx) - var count int64 - err := g2.Model(&api.EventInstance{}).Where("event_id = ? 
AND done = ?", eventID, false).Count(&count).Error - if err != nil { - return 0, err - } - - return count, nil -} - func (d *sqlEventInstanceDao) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) { g2 := (*d.sessionFactory).New(ctx) if err := g2.Omit(clause.Associations).Create(eventInstance).Error; err != nil { @@ -57,21 +43,3 @@ func (d *sqlEventInstanceDao) Create(ctx context.Context, eventInstance *api.Eve } return eventInstance, nil } - -func (d *sqlEventInstanceDao) CreateList(ctx context.Context, eventInstanceList api.EventInstanceList) error { - g2 := (*d.sessionFactory).New(ctx) - if err := g2.Omit(clause.Associations).CreateInBatches(eventInstanceList, len(eventInstanceList)).Error; err != nil { - db.MarkForRollback(ctx, err) - return err - } - return nil -} - -func (d *sqlEventInstanceDao) MarkAsDone(ctx context.Context, eventID, instanceID string) error { - g2 := (*d.sessionFactory).New(ctx) - if err := g2.Omit(clause.Associations).Table("event_instances").Where("event_id = ? 
AND instance_id = ?", eventID, instanceID).Update("done", true).Error; err != nil { - db.MarkForRollback(ctx, err) - return err - } - return nil -} diff --git a/pkg/dao/status_event.go b/pkg/dao/status_event.go new file mode 100755 index 00000000..03916480 --- /dev/null +++ b/pkg/dao/status_event.go @@ -0,0 +1,113 @@ +package dao + +import ( + "context" + "fmt" + + "gorm.io/gorm/clause" + + "github.com/openshift-online/maestro/pkg/api" + "github.com/openshift-online/maestro/pkg/db" +) + +type StatusEventDao interface { + Get(ctx context.Context, id string) (*api.StatusEvent, error) + Create(ctx context.Context, statusEvent *api.StatusEvent) (*api.StatusEvent, error) + Replace(ctx context.Context, statusEvent *api.StatusEvent) (*api.StatusEvent, error) + Delete(ctx context.Context, id string) error + FindByIDs(ctx context.Context, ids []string) (api.StatusEventList, error) + All(ctx context.Context) (api.StatusEventList, error) + + DeleteAllReconciledEvents(ctx context.Context) error + FindAllUnreconciledEvents(ctx context.Context) (api.StatusEventList, error) +} + +var _ StatusEventDao = &sqlStatusEventDao{} + +type sqlStatusEventDao struct { + sessionFactory *db.SessionFactory +} + +func NewStatusEventDao(sessionFactory *db.SessionFactory) StatusEventDao { + return &sqlStatusEventDao{sessionFactory: sessionFactory} +} + +func (d *sqlStatusEventDao) Get(ctx context.Context, id string) (*api.StatusEvent, error) { + g2 := (*d.sessionFactory).New(ctx) + var statusEvent api.StatusEvent + if err := g2.Take(&statusEvent, "id = ?", id).Error; err != nil { + return nil, err + } + return &statusEvent, nil +} + +func (d *sqlStatusEventDao) Create(ctx context.Context, statusEvent *api.StatusEvent) (*api.StatusEvent, error) { + g2 := (*d.sessionFactory).New(ctx) + if err := g2.Omit(clause.Associations).Create(statusEvent).Error; err != nil { + db.MarkForRollback(ctx, err) + return nil, err + } + + notify := fmt.Sprintf("select pg_notify('%s', '%s')", "status_events", 
statusEvent.ID) + + err := g2.Exec(notify).Error + if err != nil { + return nil, err + } + + return statusEvent, nil +} + +func (d *sqlStatusEventDao) Replace(ctx context.Context, statusEvent *api.StatusEvent) (*api.StatusEvent, error) { + g2 := (*d.sessionFactory).New(ctx) + if err := g2.Omit(clause.Associations).Save(statusEvent).Error; err != nil { + db.MarkForRollback(ctx, err) + return nil, err + } + return statusEvent, nil +} + +func (d *sqlStatusEventDao) Delete(ctx context.Context, id string) error { + g2 := (*d.sessionFactory).New(ctx) + if err := g2.Unscoped().Omit(clause.Associations).Delete(&api.StatusEvent{Meta: api.Meta{ID: id}}).Error; err != nil { + db.MarkForRollback(ctx, err) + return err + } + return nil +} + +func (d *sqlStatusEventDao) FindByIDs(ctx context.Context, ids []string) (api.StatusEventList, error) { + g2 := (*d.sessionFactory).New(ctx) + statusEvents := api.StatusEventList{} + if err := g2.Where("id in (?)", ids).Find(&statusEvents).Error; err != nil { + return nil, err + } + return statusEvents, nil +} + +func (d *sqlStatusEventDao) DeleteAllReconciledEvents(ctx context.Context) error { + g2 := (*d.sessionFactory).New(ctx) + if err := g2.Unscoped().Omit(clause.Associations).Where("reconciled_date IS NOT NULL").Delete(&api.StatusEvent{}).Error; err != nil { + db.MarkForRollback(ctx, err) + return err + } + return nil +} + +func (d *sqlStatusEventDao) FindAllUnreconciledEvents(ctx context.Context) (api.StatusEventList, error) { + g2 := (*d.sessionFactory).New(ctx) + statusEvents := api.StatusEventList{} + if err := g2.Where("reconciled_date IS NULL").Find(&statusEvents).Error; err != nil { + return nil, err + } + return statusEvents, nil +} + +func (d *sqlStatusEventDao) All(ctx context.Context) (api.StatusEventList, error) { + g2 := (*d.sessionFactory).New(ctx) + statusEvents := api.StatusEventList{} + if err := g2.Find(&statusEvents).Error; err != nil { + return nil, err + } + return statusEvents, nil +} diff --git 
a/pkg/db/migrations/202406211556_add_event_instances.go b/pkg/db/migrations/202406211556_add_event_instances.go
deleted file mode 100644
index 72353394..00000000
--- a/pkg/db/migrations/202406211556_add_event_instances.go
+++ /dev/null
@@ -1,63 +0,0 @@
-package migrations
-
-import (
-	"gorm.io/gorm"
-
-	"github.com/go-gormigrate/gormigrate/v2"
-)
-
-func addEventInstances() *gormigrate.Migration {
-	type EventInstance struct {
-		EventID    string `gorm:"index"` // primary key of events table
-		InstanceID string `gorm:"index"` // primary key of server_instances table
-		Done       bool
-	}
-
-	return &gormigrate.Migration{
-		ID: "202406211556",
-		Migrate: func(tx *gorm.DB) error {
-			if err := tx.AutoMigrate(&EventInstance{}); err != nil {
-				return err
-			}
-			// Step 1: Create the Trigger Function
-			triggerFunctionSQL := `
-			CREATE OR REPLACE FUNCTION check_event_instances_done()
-			RETURNS TRIGGER AS $$
-			BEGIN
-				RAISE NOTICE 'Checking event_id: %, undone count: %', NEW.event_id,
-				(SELECT COUNT(*) FROM event_instances WHERE event_id = NEW.event_id AND done = false);
-				IF (SELECT COUNT(*) FROM event_instances WHERE event_id = NEW.event_id AND done = false) = 0 THEN
-					RAISE NOTICE 'All instances done, updating reconciled_date for event_id: %', NEW.event_id;
-					UPDATE events SET reconciled_date = NOW() WHERE id = NEW.event_id;
-				END IF;
-				RETURN NEW;
-			END;
-			$$ LANGUAGE plpgsql;
-			`
-			if err := tx.Exec(triggerFunctionSQL).Error; err != nil {
-				return err
-			}
-			// Step 2: Create the Trigger
-			triggerSQL := `
-			CREATE TRIGGER trg_check_event_instances_done
-			AFTER UPDATE ON event_instances
-			FOR EACH ROW
-			EXECUTE FUNCTION check_event_instances_done();
-			`
-			return tx.Exec(triggerSQL).Error
-		},
-		Rollback: func(tx *gorm.DB) error {
-			// Rollback function to drop the trigger and trigger function
-			dropTriggerSQL := `DROP TRIGGER IF EXISTS trg_check_event_instances_done ON event_instances;`
-			if err := tx.Exec(dropTriggerSQL).Error; err != nil {
-				return err
-			}
-			dropFunctionSQL := `DROP FUNCTION IF EXISTS check_event_instances_done;`
-			if err := tx.Exec(dropFunctionSQL).Error; err != nil {
-				return err
-			}
-
-			return tx.Migrator().DropTable(&EventInstance{})
-		},
-	}
-}
diff --git a/pkg/db/migrations/202406241426_add_status_events.go b/pkg/db/migrations/202406241426_add_status_events.go
new file mode 100644
index 00000000..7326e268
--- /dev/null
+++ b/pkg/db/migrations/202406241426_add_status_events.go
@@ -0,0 +1,36 @@
+package migrations
+
+import (
+	"time"
+
+	"gorm.io/datatypes"
+	"gorm.io/gorm"
+
+	"github.com/go-gormigrate/gormigrate/v2"
+)
+
+// addStatusEvents returns migration 202406241426, which auto-migrates the
+// StatusEvent model declared below and drops its table on rollback.
+func addStatusEvents() *gormigrate.Migration {
+	// StatusEvent is the table shape this migration hands to AutoMigrate.
+	type StatusEvent struct {
+		Model
+		ResourceID      string         `gorm:"index"` // resource id
+		Status          datatypes.JSON `gorm:"type:json"`
+		StatusEventType string         // Update|Delete, any string
+		ReconciledDate  *time.Time     `gorm:"null;index"`
+	}
+
+	up := func(tx *gorm.DB) error {
+		return tx.AutoMigrate(&StatusEvent{})
+	}
+	down := func(tx *gorm.DB) error {
+		return tx.Migrator().DropTable(&StatusEvent{})
+	}
+
+	return &gormigrate.Migration{
+		ID:       "202406241426",
+		Migrate:  up,
+		Rollback: down,
+	}
+}
diff --git a/pkg/db/migrations/202406241506_add_event_instances.go b/pkg/db/migrations/202406241506_add_event_instances.go
new file mode 100644
index 00000000..cdcfda14
--- /dev/null
+++ b/pkg/db/migrations/202406241506_add_event_instances.go
@@ -0,0 +1,30 @@
+package migrations
+
+import (
+	"gorm.io/gorm"
+
+	"github.com/go-gormigrate/gormigrate/v2"
+)
+
+// addEventInstances returns migration 202406241506, which auto-migrates the
+// EventInstance model declared below and drops its table on rollback.
+func addEventInstances() *gormigrate.Migration {
+	// EventInstance pairs an event ID with a server instance ID.
+	type EventInstance struct {
+		EventID    string `gorm:"index"` // primary key of events table
+		InstanceID string `gorm:"index"` // primary key of server_instances table
+	}
+
+	up := func(tx *gorm.DB) error {
+		return tx.AutoMigrate(&EventInstance{})
+	}
+	down := func(tx *gorm.DB) error {
+		return tx.Migrator().DropTable(&EventInstance{})
+	}
+
+	return &gormigrate.Migration{
+		ID:       "202406241506",
+		Migrate:  up,
+		Rollback: down,
+	}
+}
diff --git a/pkg/db/migrations/migration_structs.go
b/pkg/db/migrations/migration_structs.go
index 927ffc99..2a5e652b 100755
--- a/pkg/db/migrations/migration_structs.go
+++ b/pkg/db/migrations/migration_structs.go
@@ -32,6 +32,7 @@ var MigrationList = []*gormigrate.Migration{
 	addConsumers(),
 	dropDinosaurs(),
 	addServerInstances(),
+	addStatusEvents(),
 	addEventInstances(),
 }
 
diff --git a/pkg/services/status_event.go b/pkg/services/status_event.go
new file mode 100755
index 00000000..c39916c8
--- /dev/null
+++ b/pkg/services/status_event.go
@@ -0,0 +1,115 @@
+package services
+
+import (
+	"context"
+
+	"github.com/openshift-online/maestro/pkg/api"
+	"github.com/openshift-online/maestro/pkg/dao"
+	"github.com/openshift-online/maestro/pkg/errors"
+)
+
+// StatusEventService is the service-layer API for api.StatusEvent records.
+// Every method delegates to a dao.StatusEventDao and reports failures as
+// *errors.ServiceError.
+type StatusEventService interface {
+	Get(ctx context.Context, id string) (*api.StatusEvent, *errors.ServiceError)
+	Create(ctx context.Context, event *api.StatusEvent) (*api.StatusEvent, *errors.ServiceError)
+	Replace(ctx context.Context, event *api.StatusEvent) (*api.StatusEvent, *errors.ServiceError)
+	Delete(ctx context.Context, id string) *errors.ServiceError
+	All(ctx context.Context) (api.StatusEventList, *errors.ServiceError)
+	FindByIDs(ctx context.Context, ids []string) (api.StatusEventList, *errors.ServiceError)
+
+	FindAllUnreconciledEvents(ctx context.Context) (api.StatusEventList, *errors.ServiceError)
+	DeleteAllReconciledEvents(ctx context.Context) *errors.ServiceError
+}
+
+// NewStatusEventService returns a StatusEventService backed by statusEventDao.
+func NewStatusEventService(statusEventDao dao.StatusEventDao) StatusEventService {
+	return &sqlStatusEventService{
+		statusEventDao: statusEventDao,
+	}
+}
+
+// Compile-time check that sqlStatusEventService implements StatusEventService.
+var _ StatusEventService = &sqlStatusEventService{}
+
+// sqlStatusEventService implements StatusEventService on top of a
+// dao.StatusEventDao.
+type sqlStatusEventService struct {
+	statusEventDao dao.StatusEventDao
+}
+
+// Get returns the status event with the given id; a DAO error is translated
+// by handleGetError.
+func (s *sqlStatusEventService) Get(ctx context.Context, id string) (*api.StatusEvent, *errors.ServiceError) {
+	event, err := s.statusEventDao.Get(ctx, id)
+	if err != nil {
+		return nil, handleGetError("StatusEvent", "id", id, err)
+	}
+	return event, nil
+}
+
+// Create stores statusEvent; a DAO error is translated by handleCreateError.
+func (s *sqlStatusEventService) Create(ctx context.Context, statusEvent *api.StatusEvent) (*api.StatusEvent, *errors.ServiceError) {
+	event, err := s.statusEventDao.Create(ctx, statusEvent)
+	if err != nil {
+		return nil, handleCreateError("StatusEvent", err)
+	}
+	return event, nil
+}
+
+// Replace saves statusEvent; a DAO error is translated by handleUpdateError.
+func (s *sqlStatusEventService) Replace(ctx context.Context, statusEvent *api.StatusEvent) (*api.StatusEvent, *errors.ServiceError) {
+	replaced, err := s.statusEventDao.Replace(ctx, statusEvent)
+	if err != nil {
+		return nil, handleUpdateError("StatusEvent", err)
+	}
+	return replaced, nil
+}
+
+// Delete removes the status event with the given id; a DAO error is wrapped
+// in a GeneralError and passed through handleDeleteError.
+func (s *sqlStatusEventService) Delete(ctx context.Context, id string) *errors.ServiceError {
+	if err := s.statusEventDao.Delete(ctx, id); err != nil {
+		return handleDeleteError("StatusEvent", errors.GeneralError("Unable to delete status event: %s", err))
+	}
+	return nil
+}
+
+// FindByIDs returns the status events whose id is in ids.
+func (s *sqlStatusEventService) FindByIDs(ctx context.Context, ids []string) (api.StatusEventList, *errors.ServiceError) {
+	statusEvents, err := s.statusEventDao.FindByIDs(ctx, ids)
+	if err != nil {
+		return nil, errors.GeneralError("Unable to get status events by ids: %s", err)
+	}
+	return statusEvents, nil
+}
+
+// All returns every stored status event.
+func (s *sqlStatusEventService) All(ctx context.Context) (api.StatusEventList, *errors.ServiceError) {
+	statusEvents, err := s.statusEventDao.All(ctx)
+	if err != nil {
+		return nil, errors.GeneralError("Unable to get all status events: %s", err)
+	}
+	return statusEvents, nil
+}
+
+// FindAllUnreconciledEvents returns the status events the DAO reports as
+// unreconciled.
+func (s *sqlStatusEventService) FindAllUnreconciledEvents(ctx context.Context) (api.StatusEventList, *errors.ServiceError) {
+	statusEvents, err := s.statusEventDao.FindAllUnreconciledEvents(ctx)
+	if err != nil {
+		return nil, errors.GeneralError("Unable to get unreconciled status events: %s", err)
+	}
+	return statusEvents, nil
+}
+
+// DeleteAllReconciledEvents removes the status events the DAO reports as
+// reconciled; a DAO error is wrapped in a GeneralError and passed through
+// handleDeleteError.
+func (s *sqlStatusEventService) DeleteAllReconciledEvents(ctx context.Context) *errors.ServiceError {
+	if err := s.statusEventDao.DeleteAllReconciledEvents(ctx); err != nil {
+		return handleDeleteError("StatusEvent", errors.GeneralError("Unable to delete reconciled status events: %s", err))
+	}
+	return nil
+}
diff --git a/test/integration/controller_test.go b/test/integration/controller_test.go
index 5122eeee..010151c6 100755
--- a/test/integration/controller_test.go
+++ b/test/integration/controller_test.go
@@ -27,14 +27,14 @@ func TestControllerRacing(t *testing.T) {
 	// the event with create type. Due to the event lock, each create event
 	// should be only processed once.
 	var proccessedEvent []string
-	onUpsert := func(ctx context.Context, eventID, resourceID string) error {
+	onUpsert := func(ctx context.Context, id string) error {
 		events, err := dao.All(ctx)
 		if err != nil {
 			return err
 		}
 
 		for _, evt := range events {
-			if evt.SourceID != resourceID {
+			if evt.SourceID != id {
 				continue
 			}
 			if evt.EventType != api.CreateEventType {
@@ -44,7 +44,7 @@ func TestControllerRacing(t *testing.T) {
 			if evt.ReconciledDate != nil {
 				continue
 			}
-			proccessedEvent = append(proccessedEvent, resourceID)
+			proccessedEvent = append(proccessedEvent, id)
 		}
 
 		return nil
@@ -100,7 +100,7 @@ func TestControllerReconcile(t *testing.T) {
 	processedTimes := 0
 	// this handler will return an error at the first time to simulate an error happened when handing an event,
 	// and then, the controller will requeue this event, at that time, we handle this event successfully.
- onUpsert := func(ctx context.Context, eventID, resourceID string) error { + onUpsert := func(ctx context.Context, id string) error { processedTimes = processedTimes + 1 if processedTimes == 1 { return fmt.Errorf("failed to process the event") @@ -196,9 +196,9 @@ func TestControllerSync(t *testing.T) { } var proccessedEvents []string - onUpsert := func(ctx context.Context, eventID, resourceID string) error { + onUpsert := func(ctx context.Context, id string) error { // we just record the processed event - proccessedEvents = append(proccessedEvents, resourceID) + proccessedEvents = append(proccessedEvents, id) return nil }