From 52299c9b17d29e988ca8fd5fd796b1091f7c0576 Mon Sep 17 00:00:00 2001 From: morvencao Date: Fri, 21 Jun 2024 13:00:28 +0000 Subject: [PATCH] fix grpc status update. Signed-off-by: morvencao --- Makefile | 5 +- cmd/maestro/servecmd/cmd.go | 2 +- cmd/maestro/server/controllers.go | 10 +- cmd/maestro/server/pulse_server.go | 128 ++++++++++++++---- pkg/api/event.go | 9 +- pkg/api/event_instances.go | 9 ++ pkg/client/cloudevents/source_client.go | 18 +-- pkg/client/cloudevents/source_client_mock.go | 12 +- pkg/controllers/framework.go | 28 ++-- pkg/controllers/framework_test.go | 6 +- pkg/dao/event_instances.go | 77 +++++++++++ pkg/db/migrations/202309020925_add_events.go | 2 +- .../202406211556_add_event_instances.go | 63 +++++++++ pkg/db/migrations/migration_structs.go | 1 + test/e2e/setup/e2e_setup.sh | 1 + test/integration/controller_test.go | 12 +- 16 files changed, 309 insertions(+), 74 deletions(-) create mode 100644 pkg/api/event_instances.go create mode 100644 pkg/dao/event_instances.go create mode 100644 pkg/db/migrations/202406211556_add_event_instances.go diff --git a/Makefile b/Makefile index ffb1794e..5e0778b7 100755 --- a/Makefile +++ b/Makefile @@ -76,9 +76,11 @@ CLIENT_SECRET ?= maestro ENABLE_JWT ?= true ENABLE_AUTHZ ?= true ENABLE_OCM_MOCK ?= false - ENABLE_GRPC ?= false +# default replicas for maestro server +REPLICAS ?= 1 + # Enable set images POSTGRES_IMAGE ?= docker.io/library/postgres:14.2 MQTT_IMAGE ?= docker.io/library/eclipse-mosquitto:2.0.18 @@ -265,6 +267,7 @@ cmds: --ignore-unknown-parameters="true" \ --param="ENVIRONMENT=$(OCM_ENV)" \ --param="GLOG_V=$(glog_v)" \ + --param="REPLICAS=$(REPLICAS)" \ --param="DATABASE_HOST=$(db_host)" \ --param="DATABASE_NAME=$(db_name)" \ --param="DATABASE_PASSWORD=$(db_password)" \ diff --git a/cmd/maestro/servecmd/cmd.go b/cmd/maestro/servecmd/cmd.go index 9f239855..3b2f51ed 100755 --- a/cmd/maestro/servecmd/cmd.go +++ b/cmd/maestro/servecmd/cmd.go @@ -43,7 +43,7 @@ func runServer(cmd *cobra.Command, args []string) { metricsServer := server.NewMetricsServer() healthcheckServer := server.NewHealthCheckServer() pulseServer := server.NewPulseServer(eventBroadcaster) - controllersServer := server.NewControllersServer() + controllersServer := server.NewControllersServer(pulseServer) ctx, cancel := context.WithCancel(context.Background()) diff --git a/cmd/maestro/server/controllers.go b/cmd/maestro/server/controllers.go index f453d6a0..b826eb7e 100755 --- a/cmd/maestro/server/controllers.go +++ b/cmd/maestro/server/controllers.go @@ -10,7 +10,7 @@ import ( "github.com/openshift-online/maestro/pkg/logger" ) -func NewControllersServer() *ControllersServer { +func NewControllersServer(pulseServer *PulseServer) *ControllersServer { s := &ControllersServer{ KindControllerManager: controllers.NewKindControllerManager( @@ -20,13 +20,13 @@ func NewControllersServer() *ControllersServer { } sourceClient := env().Clients.CloudEventsSource - s.KindControllerManager.Add(&controllers.ControllerConfig{ Source: "Resources", Handlers: map[api.EventType][]controllers.ControllerHandlerFunc{ - api.CreateEventType: {sourceClient.OnCreate}, - api.UpdateEventType: {sourceClient.OnUpdate}, - api.DeleteEventType: {sourceClient.OnDelete}, + api.CreateEventType: {sourceClient.OnCreate}, + api.UpdateEventType: {sourceClient.OnUpdate}, + api.DeleteEventType: {sourceClient.OnDelete}, + api.StatusUpdateEventType: {pulseServer.OnStatusUpdate}, }, }) diff --git a/cmd/maestro/server/pulse_server.go b/cmd/maestro/server/pulse_server.go index e7c86a70..59791159 100644 --- a/cmd/maestro/server/pulse_server.go +++ b/cmd/maestro/server/pulse_server.go @@ -31,9 +31,11 @@ type PulseServer struct { instanceID string pulseInterval int64 instanceDao dao.InstanceDao + eventInstanceDao dao.EventInstanceDao lockFactory db.LockFactory eventBroadcaster *event.EventBroadcaster resourceService services.ResourceService + eventService services.EventService sourceClient cloudevents.SourceClient statusDispatcher dispatcher.Dispatcher } @@ -53,9 +55,11 @@ func NewPulseServer(eventBroadcaster *event.EventBroadcaster) *PulseServer { instanceID: env().Config.MessageBroker.ClientID, pulseInterval: env().Config.PulseServer.PulseInterval, instanceDao: dao.NewInstanceDao(&sessionFactory), + eventInstanceDao: dao.NewEventInstanceDao(&sessionFactory), lockFactory: db.NewAdvisoryLockFactory(sessionFactory), eventBroadcaster: eventBroadcaster, resourceService: env().Services.Resources(), + eventService: env().Services.Events(), sourceClient: env().Clients.CloudEventsSource, statusDispatcher: statusDispatcher, } @@ -175,39 +179,63 @@ func (s *PulseServer) startSubscription(ctx context.Context) { return nil } - // convert the resource status to cloudevent - evt, err := api.JSONMAPToCloudEvent(resource.Status) - if err != nil { - return fmt.Errorf("failed to convert resource status to cloudevent: %v", err) - } + // // convert the resource status to cloudevent + // evt, err := api.JSONMAPToCloudEvent(resource.Status) + // if err != nil { + // return fmt.Errorf("failed to convert resource status to cloudevent: %v", err) + // } - // decode the cloudevent data as manifest status - statusPayload := &workpayload.ManifestStatus{} - if err := evt.DataAs(statusPayload); err != nil { - return fmt.Errorf("failed to decode cloudevent data as resource status: %v", err) - } + // // decode the cloudevent data as manifest status + // statusPayload := &workpayload.ManifestStatus{} + // if err := evt.DataAs(statusPayload); err != nil { + // return fmt.Errorf("failed to decode cloudevent data as resource status: %v", err) + // } - // if the resource has been deleted from agent, delete it from maestro - if meta.IsStatusConditionTrue(statusPayload.Conditions, common.ManifestsDeleted) { - if svcErr := s.resourceService.Delete(ctx, resource.ID); svcErr != nil { - return svcErr - } + // // if the resource has been deleted from agent, delete it from maestro + // if meta.IsStatusConditionTrue(statusPayload.Conditions, common.ManifestsDeleted) { + // if svcErr := s.resourceService.Delete(ctx, resource.ID); svcErr != nil { + // return svcErr + // } - log.V(4).Infof("Broadcast:: the resource %s is deleted", resource.ID) - resource.Payload = found.Payload - s.eventBroadcaster.Broadcast(resource) - return nil - } + // log.V(4).Infof("Broadcast:: the resource %s is deleted", resource.ID) + // resource.Payload = found.Payload + // s.eventBroadcaster.Broadcast(resource) + // return nil + // } // update the resource status - updatedResource, updated, svcErr := s.resourceService.UpdateStatus(ctx, resource) + _, updated, svcErr := s.resourceService.UpdateStatus(ctx, resource) if svcErr != nil { - return svcErr + return fmt.Errorf("failed to update resource status %s: %s", resource.ID, svcErr.Error()) } - // broadcast the resource status updated only when the resource is updated + // // broadcast the resource status updated only when the resource is updated + // if updated { + // log.V(4).Infof("Broadcast:: the resource %s is updated", resource.ID) + // s.eventBroadcaster.Broadcast(updatedResource) + // } if updated { - log.V(4).Infof("Broadcast:: the resource %s is updated", resource.ID) - s.eventBroadcaster.Broadcast(updatedResource) + evt, sErr := s.eventService.Create(ctx, &api.Event{ + Source: "Resources", + SourceID: resource.ID, + EventType: api.StatusUpdateEventType, + }) + if sErr != nil { + return fmt.Errorf("failed to create event for resource status update %s: %s", resource.ID, sErr.Error()) + } + instances, err := s.instanceDao.All(ctx) + if err != nil { + return fmt.Errorf("failed to get all maestro instances: %s", err) + } + eventInstanceList := make([]*api.EventInstance, len(instances)) + for i, instance := range instances { + eventInstanceList[i] = &api.EventInstance{ + EventID: evt.ID, + InstanceID: instance.ID, + } + } + if err := s.eventInstanceDao.CreateList(ctx, eventInstanceList); err != nil { + return fmt.Errorf("failed to create event instances for resource status update %s: %s", resource.ID, err.Error()) + } } default: return fmt.Errorf("unsupported action %s", action) @@ -216,3 +244,53 @@ func (s *PulseServer) startSubscription(ctx context.Context) { return nil }) } + +// On StatusUpdate does three things: +// 1. Broadcast the resource status update to subscribers +// 2. Mark the event instance as done +// 3. If the resource has been deleted from agent, delete it from maestro if all event instances are done +func (s *PulseServer) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error { + resource, sErr := s.resourceService.Get(ctx, resourceID) + if sErr != nil { + return fmt.Errorf("failed to get resource %s: %s", resourceID, sErr.Error()) + } + + // broadcast the resource status updated to subscribers + s.eventBroadcaster.Broadcast(resource) + + // mark the event instance as done + if err := s.eventInstanceDao.MarkAsDone(ctx, eventID, s.instanceID); err != nil { + return fmt.Errorf("failed to mark event instance (%s, %s) as done for resource status update %s: %s", eventID, s.instanceID, resourceID, err.Error()) + } + + // convert the resource status to cloudevent + cloudevt, err := api.JSONMAPToCloudEvent(resource.Status) + if err != nil { + return fmt.Errorf("failed to convert resource status to cloudevent: %v", err) + } + + // decode the cloudevent data as manifest status + statusPayload := &workpayload.ManifestStatus{} + if err := cloudevt.DataAs(statusPayload); err != nil { + return fmt.Errorf("failed to decode cloudevent data as resource status: %v", err) + } + + // if the resource has been deleted from agent, delete it from maestro + deleted := false + if meta.IsStatusConditionTrue(statusPayload.Conditions, common.ManifestsDeleted) { + deleted = true + } + if deleted { + count, err := s.eventInstanceDao.GetUnhandleEventInstances(ctx, eventID) + if err != nil { + return fmt.Errorf("failed to get unhandled event instances for event %s: %s", eventID, err.Error()) + } + if count == 0 { + if sErr := s.resourceService.Delete(ctx, resourceID); sErr != nil { + return fmt.Errorf("failed to delete resource %s: %s", resourceID, sErr.Error()) + } + } + } + + return nil +} diff --git a/pkg/api/event.go b/pkg/api/event.go index b2b726e9..659adb30 100755 --- a/pkg/api/event.go +++ b/pkg/api/event.go @@ -9,16 +9,17 @@ import ( type EventType string const ( - CreateEventType EventType = "Create" - UpdateEventType EventType = "Update" - DeleteEventType EventType = "Delete" + CreateEventType EventType = "Create" + UpdateEventType EventType = "Update" + DeleteEventType EventType = "Delete" + StatusUpdateEventType EventType = "StatusUpdate" ) type Event struct { Meta Source string // MyTable SourceID string // primary key of MyTable - EventType EventType // Add|Update|Delete + EventType EventType // Add|Update|Delete|StatusUpdate ReconciledDate *time.Time `json:"gorm:null"` } diff --git a/pkg/api/event_instances.go b/pkg/api/event_instances.go new file mode 100644 index 00000000..e0be9631 --- /dev/null +++ b/pkg/api/event_instances.go @@ -0,0 +1,9 @@ +package api + +type EventInstance struct { + EventID string + InstanceID string + Done bool +} + +type EventInstanceList []*EventInstance diff --git a/pkg/client/cloudevents/source_client.go b/pkg/client/cloudevents/source_client.go index 2245f4f3..7b63a540 100644 --- a/pkg/client/cloudevents/source_client.go +++ b/pkg/client/cloudevents/source_client.go @@ -17,9 +17,9 @@ import ( // SourceClient is an interface for publishing resource events to consumers // subscribing to and resyncing resource status from consumers. type SourceClient interface { - OnCreate(ctx context.Context, id string) error - OnUpdate(ctx context.Context, id string) error - OnDelete(ctx context.Context, id string) error + OnCreate(ctx context.Context, eventID, resourceID string) error + OnUpdate(ctx context.Context, eventID, resourceID string) error + OnDelete(ctx context.Context, eventID, resourceID string) error Subscribe(ctx context.Context, handlers ...cegeneric.ResourceHandler[*api.Resource]) Resync(ctx context.Context, consumers []string) error ReconnectedChan() <-chan struct{} @@ -49,10 +49,10 @@ func NewSourceClient(sourceOptions *ceoptions.CloudEventsSourceOptions, resource }, nil } -func (s *SourceClientImpl) OnCreate(ctx context.Context, id string) error { +func (s *SourceClientImpl) OnCreate(ctx context.Context, eventID, resourceID string) error { logger := logger.NewOCMLogger(ctx) - resource, err := s.ResourceService.Get(ctx, id) + resource, err := s.ResourceService.Get(ctx, resourceID) if err != nil { return err } @@ -74,10 +74,10 @@ func (s *SourceClientImpl) OnCreate(ctx context.Context, id string) error { return nil } -func (s *SourceClientImpl) OnUpdate(ctx context.Context, id string) error { +func (s *SourceClientImpl) OnUpdate(ctx context.Context, eventID, resourceID string) error { logger := logger.NewOCMLogger(ctx) - resource, err := s.ResourceService.Get(ctx, id) + resource, err := s.ResourceService.Get(ctx, resourceID) if err != nil { return err } @@ -99,10 +99,10 @@ func (s *SourceClientImpl) OnUpdate(ctx context.Context, id string) error { return nil } -func (s *SourceClientImpl) OnDelete(ctx context.Context, id string) error { +func (s *SourceClientImpl) OnDelete(ctx context.Context, eventID, resourceID string) error { logger := logger.NewOCMLogger(ctx) - resource, err := s.ResourceService.Get(ctx, id) + resource, err := s.ResourceService.Get(ctx, resourceID) if err != nil { return err } diff --git a/pkg/client/cloudevents/source_client_mock.go b/pkg/client/cloudevents/source_client_mock.go index d8566099..1b59c866 100644 --- a/pkg/client/cloudevents/source_client_mock.go +++ b/pkg/client/cloudevents/source_client_mock.go @@ -39,8 +39,8 @@ func NewSourceClientMock(resourceService services.ResourceService) SourceClient } } -func (s *SourceClientMock) OnCreate(ctx context.Context, id string) error { - resource, serviceErr := s.ResourceService.Get(ctx, id) +func (s *SourceClientMock) OnCreate(ctx context.Context, eventID, resourceID string) error { + resource, serviceErr := s.ResourceService.Get(ctx, resourceID) if serviceErr != nil { return fmt.Errorf("failed to get resource: %v", serviceErr) } @@ -78,8 +78,8 @@ func (s *SourceClientMock) OnCreate(ctx context.Context, id string) error { return nil } -func (s *SourceClientMock) OnUpdate(ctx context.Context, id string) error { - resource, serviceErr := s.ResourceService.Get(ctx, id) +func (s *SourceClientMock) OnUpdate(ctx context.Context, eventID, resourceID string) error { + resource, serviceErr := s.ResourceService.Get(ctx, resourceID) if serviceErr != nil { return fmt.Errorf("failed to get resource: %v", serviceErr) } @@ -136,8 +136,8 @@ func (s *SourceClientMock) OnUpdate(ctx context.Context, id string) error { return nil } -func (s *SourceClientMock) OnDelete(ctx context.Context, id string) error { - resource, serviceErr := s.ResourceService.Get(ctx, id) +func (s *SourceClientMock) OnDelete(ctx context.Context, eventID, resourceID string) error { + resource, serviceErr := s.ResourceService.Get(ctx, resourceID) if serviceErr != nil { return fmt.Errorf("failed to get resource: %v", serviceErr) } diff --git a/pkg/controllers/framework.go b/pkg/controllers/framework.go index 608c56be..5001320c 100755 --- a/pkg/controllers/framework.go +++ b/pkg/controllers/framework.go @@ -45,7 +45,7 @@ var logger = maestrologger.NewOCMLogger(context.Background()) // events sync will help us to handle unexpected errors (e.g. sever restart), it ensures we will not miss any events var defaultEventsSyncPeriod = 10 * time.Hour -type ControllerHandlerFunc func(ctx context.Context, id string) error +type ControllerHandlerFunc func(ctx context.Context, eventID, sourceID string) error type ControllerConfig struct { Source string @@ -120,11 +120,6 @@ func (km *KindControllerManager) handleEvent(id string) error { return fmt.Errorf("error obtaining the event lock: %v", err) } - if !acquired { - logger.Infof("Event %s is processed by another worker, continue to process the next", id) - return nil - } - reqContext := context.WithValue(ctx, EventID, id) event, svcErr := km.events.Get(reqContext, id) @@ -136,6 +131,11 @@ func (km *KindControllerManager) handleEvent(id string) error { return fmt.Errorf("error getting event with id(%s): %s", id, svcErr) } + if !acquired && event.EventType != api.StatusUpdateEventType { + logger.Infof("Event %s is processed by another worker, continue to process the next", id) + return nil + } + if event.ReconciledDate != nil { return nil } @@ -153,18 +153,20 @@ func (km *KindControllerManager) handleEvent(id string) error { } for _, fn := range handlerFns { - err := fn(reqContext, event.SourceID) + err := fn(reqContext, id, event.SourceID) if err != nil { - return fmt.Errorf("error handing event %s, %s, %s: %s", event.Source, event.EventType, id, err) + return fmt.Errorf("error handling event %s, %s, %s: %s", event.Source, event.EventType, id, err) } } // all handlers successfully executed - now := time.Now() - event.ReconciledDate = &now - _, svcErr = km.events.Replace(reqContext, event) - if svcErr != nil { - return fmt.Errorf("error updating event with id(%s): %s", id, svcErr) + if event.EventType != api.StatusUpdateEventType { + now := time.Now() + event.ReconciledDate = &now + _, svcErr = km.events.Replace(reqContext, event) + if svcErr != nil { + return fmt.Errorf("error updating event with id(%s): %s", id, svcErr) + } } return nil } diff --git a/pkg/controllers/framework_test.go b/pkg/controllers/framework_test.go index 3365073b..a0f3aedb 100755 --- a/pkg/controllers/framework_test.go +++ b/pkg/controllers/framework_test.go @@ -28,17 +28,17 @@ type exampleController struct { deleteCounter int } -func (d *exampleController) OnAdd(ctx context.Context, id string) error { +func (d *exampleController) OnAdd(ctx context.Context, eventID, sourceID string) error { d.addCounter++ return nil } -func (d *exampleController) OnUpdate(ctx context.Context, id string) error { +func (d *exampleController) OnUpdate(ctx context.Context, eventID, sourceID string) error { d.updateCounter++ return nil } -func (d *exampleController) OnDelete(ctx context.Context, id string) error { +func (d *exampleController) OnDelete(ctx context.Context, eventID, sourceID string) error { d.deleteCounter++ return nil } diff --git a/pkg/dao/event_instances.go b/pkg/dao/event_instances.go new file mode 100644 index 00000000..382eca14 --- /dev/null +++ b/pkg/dao/event_instances.go @@ -0,0 +1,77 @@ +package dao + +import ( + "context" + + "gorm.io/gorm/clause" + + "github.com/openshift-online/maestro/pkg/api" + "github.com/openshift-online/maestro/pkg/db" +) + +type EventInstanceDao interface { + Get(ctx context.Context, eventID, instanceID string) (*api.EventInstance, error) + Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) + CreateList(ctx context.Context, eventInstanceList api.EventInstanceList) error + MarkAsDone(ctx context.Context, eventID, instanceID string) error + GetUnhandleEventInstances(ctx context.Context, eventID string) (int64, error) +} + +var _ EventInstanceDao = &sqlEventInstanceDao{} + +type sqlEventInstanceDao struct { + sessionFactory *db.SessionFactory +} + +func NewEventInstanceDao(sessionFactory *db.SessionFactory) EventInstanceDao { + return &sqlEventInstanceDao{sessionFactory: sessionFactory} +} + +func (d *sqlEventInstanceDao) Get(ctx context.Context, eventID, instanceID string) (*api.EventInstance, error) { + g2 := (*d.sessionFactory).New(ctx) + eventInstance := api.EventInstance{} + err := g2.Take(&eventInstance, "event_id = ? AND instance_id = ?", eventID, instanceID).Error + if err != nil { + return nil, err + } + + return &eventInstance, nil +} + +func (d *sqlEventInstanceDao) GetUnhandleEventInstances(ctx context.Context, eventID string) (int64, error) { + g2 := (*d.sessionFactory).New(ctx) + var count int64 + err := g2.Model(&api.EventInstance{}).Where("event_id = ? AND done = ?", eventID, false).Count(&count).Error + if err != nil { + return 0, err + } + + return count, nil +} + +func (d *sqlEventInstanceDao) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) { + g2 := (*d.sessionFactory).New(ctx) + if err := g2.Omit(clause.Associations).Create(eventInstance).Error; err != nil { + db.MarkForRollback(ctx, err) + return nil, err + } + return eventInstance, nil +} + +func (d *sqlEventInstanceDao) CreateList(ctx context.Context, eventInstanceList api.EventInstanceList) error { + g2 := (*d.sessionFactory).New(ctx) + if err := g2.Omit(clause.Associations).CreateInBatches(eventInstanceList, len(eventInstanceList)).Error; err != nil { + db.MarkForRollback(ctx, err) + return err + } + return nil +} + +func (d *sqlEventInstanceDao) MarkAsDone(ctx context.Context, eventID, instanceID string) error { + g2 := (*d.sessionFactory).New(ctx) + if err := g2.Omit(clause.Associations).Table("event_instances").Where("event_id = ? AND instance_id = ?", eventID, instanceID).Update("done", true).Error; err != nil { + db.MarkForRollback(ctx, err) + return err + } + return nil +} diff --git a/pkg/db/migrations/202309020925_add_events.go b/pkg/db/migrations/202309020925_add_events.go index 3cc3728b..cd83bc9c 100755 --- a/pkg/db/migrations/202309020925_add_events.go +++ b/pkg/db/migrations/202309020925_add_events.go @@ -15,7 +15,7 @@ func addEvents() *gormigrate.Migration { // SourceID must be an indexable key for querying, *not* a json data payload. // an indexed column of data json data would explode SourceID string `gorm:"index"` // primary key of MyTable - EventType string // Add|Update|Delete, any string + EventType string // Add|Update|Delete|StatusUpdate, any string ReconciledDate *time.Time `gorm:"null;index"` } diff --git a/pkg/db/migrations/202406211556_add_event_instances.go b/pkg/db/migrations/202406211556_add_event_instances.go new file mode 100644 index 00000000..72353394 --- /dev/null +++ b/pkg/db/migrations/202406211556_add_event_instances.go @@ -0,0 +1,63 @@ +package migrations + +import ( + "gorm.io/gorm" + + "github.com/go-gormigrate/gormigrate/v2" +) + +func addEventInstances() *gormigrate.Migration { + type EventInstance struct { + EventID string `gorm:"index"` // primary key of events table + InstanceID string `gorm:"index"` // primary key of server_instances table + Done bool + } + + return &gormigrate.Migration{ + ID: "202406211556", + Migrate: func(tx *gorm.DB) error { + if err := tx.AutoMigrate(&EventInstance{}); err != nil { + return err + } + // Step 1: Create the Trigger Function + triggerFunctionSQL := ` + CREATE OR REPLACE FUNCTION check_event_instances_done() + RETURNS TRIGGER AS $$ + BEGIN + RAISE NOTICE 'Checking event_id: %, undone count: %', NEW.event_id, + (SELECT COUNT(*) FROM event_instances WHERE event_id = NEW.event_id AND done = false); + IF (SELECT COUNT(*) FROM event_instances WHERE event_id = NEW.event_id AND done = false) = 0 THEN + RAISE NOTICE 'All instances done, updating reconciled_date for event_id: %', NEW.event_id; + UPDATE events SET reconciled_date = NOW() WHERE id = NEW.event_id; + END IF; + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + ` + if err := tx.Exec(triggerFunctionSQL).Error; err != nil { + return err + } + // Step 2: Create the Trigger + triggerSQL := ` + CREATE TRIGGER trg_check_event_instances_done + AFTER UPDATE ON event_instances + FOR EACH ROW + EXECUTE FUNCTION check_event_instances_done(); + ` + return tx.Exec(triggerSQL).Error + }, + Rollback: func(tx *gorm.DB) error { + // Rollback function to drop the trigger and trigger function + dropTriggerSQL := `DROP TRIGGER IF EXISTS trg_check_event_instances_done ON event_instances;` + if err := tx.Exec(dropTriggerSQL).Error; err != nil { + return err + } + dropFunctionSQL := `DROP FUNCTION IF EXISTS check_event_instances_done;` + if err := tx.Exec(dropFunctionSQL).Error; err != nil { + return err + } + + return tx.Migrator().DropTable(&EventInstance{}) + }, + } +} diff --git a/pkg/db/migrations/migration_structs.go b/pkg/db/migrations/migration_structs.go index f41484a2..927ffc99 100755 --- a/pkg/db/migrations/migration_structs.go +++ b/pkg/db/migrations/migration_structs.go @@ -32,6 +32,7 @@ var MigrationList = []*gormigrate.Migration{ addConsumers(), dropDinosaurs(), addServerInstances(), + addEventInstances(), } // Model represents the base model struct. All entities will have this struct embedded. diff --git a/test/e2e/setup/e2e_setup.sh b/test/e2e/setup/e2e_setup.sh index cfde4ee6..db2985bb 100755 --- a/test/e2e/setup/e2e_setup.sh +++ b/test/e2e/setup/e2e_setup.sh @@ -76,6 +76,7 @@ kubectl $1 apply -f ./test/e2e/setup/service-ca/ export ENABLE_JWT=false export ENABLE_OCM_MOCK=true export ENABLE_GRPC=true +export REPLICAS=2 kubectl create namespace $namespace || true make template \ deploy-secrets \ diff --git a/test/integration/controller_test.go b/test/integration/controller_test.go index 010151c6..5122eeee 100755 --- a/test/integration/controller_test.go +++ b/test/integration/controller_test.go @@ -27,14 +27,14 @@ func TestControllerRacing(t *testing.T) { // the event with create type. Due to the event lock, each create event // should be only processed once. var proccessedEvent []string - onUpsert := func(ctx context.Context, id string) error { + onUpsert := func(ctx context.Context, eventID, resourceID string) error { events, err := dao.All(ctx) if err != nil { return err } for _, evt := range events { - if evt.SourceID != id { + if evt.SourceID != resourceID { continue } if evt.EventType != api.CreateEventType { @@ -44,7 +44,7 @@ func TestControllerRacing(t *testing.T) { if evt.ReconciledDate != nil { continue } - proccessedEvent = append(proccessedEvent, id) + proccessedEvent = append(proccessedEvent, resourceID) } return nil @@ -100,7 +100,7 @@ func TestControllerReconcile(t *testing.T) { processedTimes := 0 // this handler will return an error at the first time to simulate an error happened when handing an event, // and then, the controller will requeue this event, at that time, we handle this event successfully. - onUpsert := func(ctx context.Context, id string) error { + onUpsert := func(ctx context.Context, eventID, resourceID string) error { processedTimes = processedTimes + 1 if processedTimes == 1 { return fmt.Errorf("failed to process the event") @@ -196,9 +196,9 @@ func TestControllerSync(t *testing.T) { } var proccessedEvents []string - onUpsert := func(ctx context.Context, id string) error { + onUpsert := func(ctx context.Context, eventID, resourceID string) error { // we just record the processed event - proccessedEvents = append(proccessedEvents, id) + proccessedEvents = append(proccessedEvents, resourceID) return nil }