Skip to content

Commit

Permalink
fix grpc status update.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Jun 25, 2024
1 parent d5654af commit 52299c9
Show file tree
Hide file tree
Showing 16 changed files with 309 additions and 74 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,11 @@ CLIENT_SECRET ?= maestro
ENABLE_JWT ?= true
ENABLE_AUTHZ ?= true
ENABLE_OCM_MOCK ?= false

ENABLE_GRPC ?= false

# default replicas for maestro server
REPLICAS ?= 1

# Enable set images
POSTGRES_IMAGE ?= docker.io/library/postgres:14.2
MQTT_IMAGE ?= docker.io/library/eclipse-mosquitto:2.0.18
Expand Down Expand Up @@ -265,6 +267,7 @@ cmds:
--ignore-unknown-parameters="true" \
--param="ENVIRONMENT=$(OCM_ENV)" \
--param="GLOG_V=$(glog_v)" \
--param="REPLICAS=$(REPLICAS)" \
--param="DATABASE_HOST=$(db_host)" \
--param="DATABASE_NAME=$(db_name)" \
--param="DATABASE_PASSWORD=$(db_password)" \
Expand Down
2 changes: 1 addition & 1 deletion cmd/maestro/servecmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func runServer(cmd *cobra.Command, args []string) {
metricsServer := server.NewMetricsServer()
healthcheckServer := server.NewHealthCheckServer()
pulseServer := server.NewPulseServer(eventBroadcaster)
controllersServer := server.NewControllersServer()
controllersServer := server.NewControllersServer(pulseServer)

ctx, cancel := context.WithCancel(context.Background())

Expand Down
10 changes: 5 additions & 5 deletions cmd/maestro/server/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/openshift-online/maestro/pkg/logger"
)

func NewControllersServer() *ControllersServer {
func NewControllersServer(pulseServer *PulseServer) *ControllersServer {

s := &ControllersServer{
KindControllerManager: controllers.NewKindControllerManager(
Expand All @@ -20,13 +20,13 @@ func NewControllersServer() *ControllersServer {
}

sourceClient := env().Clients.CloudEventsSource

s.KindControllerManager.Add(&controllers.ControllerConfig{
Source: "Resources",
Handlers: map[api.EventType][]controllers.ControllerHandlerFunc{
api.CreateEventType: {sourceClient.OnCreate},
api.UpdateEventType: {sourceClient.OnUpdate},
api.DeleteEventType: {sourceClient.OnDelete},
api.CreateEventType: {sourceClient.OnCreate},
api.UpdateEventType: {sourceClient.OnUpdate},
api.DeleteEventType: {sourceClient.OnDelete},
api.StatusUpdateEventType: {pulseServer.OnStatusUpdate},
},
})

Expand Down
128 changes: 103 additions & 25 deletions cmd/maestro/server/pulse_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ type PulseServer struct {
instanceID string
pulseInterval int64
instanceDao dao.InstanceDao
eventInstanceDao dao.EventInstanceDao
lockFactory db.LockFactory
eventBroadcaster *event.EventBroadcaster
resourceService services.ResourceService
eventService services.EventService
sourceClient cloudevents.SourceClient
statusDispatcher dispatcher.Dispatcher
}
Expand All @@ -53,9 +55,11 @@ func NewPulseServer(eventBroadcaster *event.EventBroadcaster) *PulseServer {
instanceID: env().Config.MessageBroker.ClientID,
pulseInterval: env().Config.PulseServer.PulseInterval,
instanceDao: dao.NewInstanceDao(&sessionFactory),
eventInstanceDao: dao.NewEventInstanceDao(&sessionFactory),
lockFactory: db.NewAdvisoryLockFactory(sessionFactory),
eventBroadcaster: eventBroadcaster,
resourceService: env().Services.Resources(),
eventService: env().Services.Events(),
sourceClient: env().Clients.CloudEventsSource,
statusDispatcher: statusDispatcher,
}
Expand Down Expand Up @@ -175,39 +179,63 @@ func (s *PulseServer) startSubscription(ctx context.Context) {
return nil
}

// convert the resource status to cloudevent
evt, err := api.JSONMAPToCloudEvent(resource.Status)
if err != nil {
return fmt.Errorf("failed to convert resource status to cloudevent: %v", err)
}
// // convert the resource status to cloudevent
// evt, err := api.JSONMAPToCloudEvent(resource.Status)
// if err != nil {
// return fmt.Errorf("failed to convert resource status to cloudevent: %v", err)
// }

// decode the cloudevent data as manifest status
statusPayload := &workpayload.ManifestStatus{}
if err := evt.DataAs(statusPayload); err != nil {
return fmt.Errorf("failed to decode cloudevent data as resource status: %v", err)
}
// // decode the cloudevent data as manifest status
// statusPayload := &workpayload.ManifestStatus{}
// if err := evt.DataAs(statusPayload); err != nil {
// return fmt.Errorf("failed to decode cloudevent data as resource status: %v", err)
// }

// if the resource has been deleted from agent, delete it from maestro
if meta.IsStatusConditionTrue(statusPayload.Conditions, common.ManifestsDeleted) {
if svcErr := s.resourceService.Delete(ctx, resource.ID); svcErr != nil {
return svcErr
}
// // if the resource has been deleted from agent, delete it from maestro
// if meta.IsStatusConditionTrue(statusPayload.Conditions, common.ManifestsDeleted) {
// if svcErr := s.resourceService.Delete(ctx, resource.ID); svcErr != nil {
// return svcErr
// }

log.V(4).Infof("Broadcast:: the resource %s is deleted", resource.ID)
resource.Payload = found.Payload
s.eventBroadcaster.Broadcast(resource)
return nil
}
// log.V(4).Infof("Broadcast:: the resource %s is deleted", resource.ID)
// resource.Payload = found.Payload
// s.eventBroadcaster.Broadcast(resource)
// return nil
// }
// update the resource status
updatedResource, updated, svcErr := s.resourceService.UpdateStatus(ctx, resource)
_, updated, svcErr := s.resourceService.UpdateStatus(ctx, resource)
if svcErr != nil {
return svcErr
return fmt.Errorf("failed to update resource status %s: %s", resource.ID, svcErr.Error())
}

// broadcast the resource status updated only when the resource is updated
// // broadcast the resource status updated only when the resource is updated
// if updated {
// log.V(4).Infof("Broadcast:: the resource %s is updated", resource.ID)
// s.eventBroadcaster.Broadcast(updatedResource)
// }
if updated {
log.V(4).Infof("Broadcast:: the resource %s is updated", resource.ID)
s.eventBroadcaster.Broadcast(updatedResource)
evt, sErr := s.eventService.Create(ctx, &api.Event{
Source: "Resources",
SourceID: resource.ID,
EventType: api.StatusUpdateEventType,
})
if sErr != nil {
return fmt.Errorf("failed to create event for resource status update %s: %s", resource.ID, sErr.Error())
}
instances, err := s.instanceDao.All(ctx)
if err != nil {
return fmt.Errorf("failed to get all maestro instances: %s", err)
}
eventInstanceList := make([]*api.EventInstance, len(instances))
for i, instance := range instances {
eventInstanceList[i] = &api.EventInstance{
EventID: evt.ID,
InstanceID: instance.ID,
}
}
if err := s.eventInstanceDao.CreateList(ctx, eventInstanceList); err != nil {
return fmt.Errorf("failed to create event instances for resource status update %s: %s", resource.ID, err.Error())
}
}
default:
return fmt.Errorf("unsupported action %s", action)
Expand All @@ -216,3 +244,53 @@ func (s *PulseServer) startSubscription(ctx context.Context) {
return nil
})
}

// On StatusUpdate does three things:
// 1. Broadcast the resource status update to subscribers
// 2. Mark the event instance as done
// 3. If the resource has been deleted from agent, delete it from maestro if all event instances are done
func (s *PulseServer) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error {
resource, sErr := s.resourceService.Get(ctx, resourceID)
if sErr != nil {
return fmt.Errorf("failed to get resource %s: %s", resourceID, sErr.Error())
}

// broadcast the resource status updated to subscribers
s.eventBroadcaster.Broadcast(resource)

// mark the event instance as done
if err := s.eventInstanceDao.MarkAsDone(ctx, eventID, s.instanceID); err != nil {
return fmt.Errorf("failed to mark event instance (%s, %s) as done for resource status update %s: %s", eventID, s.instanceID, resourceID, err.Error())
}

// convert the resource status to cloudevent
cloudevt, err := api.JSONMAPToCloudEvent(resource.Status)
if err != nil {
return fmt.Errorf("failed to convert resource status to cloudevent: %v", err)
}

// decode the cloudevent data as manifest status
statusPayload := &workpayload.ManifestStatus{}
if err := cloudevt.DataAs(statusPayload); err != nil {
return fmt.Errorf("failed to decode cloudevent data as resource status: %v", err)
}

// if the resource has been deleted from agent, delete it from maestro
deleted := false
if meta.IsStatusConditionTrue(statusPayload.Conditions, common.ManifestsDeleted) {
deleted = true
}
if deleted {
count, err := s.eventInstanceDao.GetUnhandleEventInstances(ctx, eventID)
if err != nil {
return fmt.Errorf("failed to get unhandled event instances for event %s: %s", eventID, err.Error())
}
if count == 0 {
if sErr := s.resourceService.Delete(ctx, resourceID); sErr != nil {
return fmt.Errorf("failed to delete resource %s: %s", resourceID, sErr.Error())
}
}
}

return nil
}
9 changes: 5 additions & 4 deletions pkg/api/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@ import (
type EventType string

const (
CreateEventType EventType = "Create"
UpdateEventType EventType = "Update"
DeleteEventType EventType = "Delete"
CreateEventType EventType = "Create"
UpdateEventType EventType = "Update"
DeleteEventType EventType = "Delete"
StatusUpdateEventType EventType = "StatusUpdate"
)

type Event struct {
Meta
Source string // MyTable
SourceID string // primary key of MyTable
EventType EventType // Add|Update|Delete
EventType EventType // Add|Update|Delete|StatusUpdate
ReconciledDate *time.Time `json:"gorm:null"`
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/api/event_instances.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package api

type EventInstance struct {
EventID string
InstanceID string
Done bool
}

type EventInstanceList []*EventInstance
18 changes: 9 additions & 9 deletions pkg/client/cloudevents/source_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import (
// SourceClient is an interface for publishing resource events to consumers
// subscribing to and resyncing resource status from consumers.
type SourceClient interface {
OnCreate(ctx context.Context, id string) error
OnUpdate(ctx context.Context, id string) error
OnDelete(ctx context.Context, id string) error
OnCreate(ctx context.Context, eventID, resourceID string) error
OnUpdate(ctx context.Context, eventID, resourceID string) error
OnDelete(ctx context.Context, eventID, resourceID string) error
Subscribe(ctx context.Context, handlers ...cegeneric.ResourceHandler[*api.Resource])
Resync(ctx context.Context, consumers []string) error
ReconnectedChan() <-chan struct{}
Expand Down Expand Up @@ -49,10 +49,10 @@ func NewSourceClient(sourceOptions *ceoptions.CloudEventsSourceOptions, resource
}, nil
}

func (s *SourceClientImpl) OnCreate(ctx context.Context, id string) error {
func (s *SourceClientImpl) OnCreate(ctx context.Context, eventID, resourceID string) error {
logger := logger.NewOCMLogger(ctx)

resource, err := s.ResourceService.Get(ctx, id)
resource, err := s.ResourceService.Get(ctx, resourceID)
if err != nil {
return err
}
Expand All @@ -74,10 +74,10 @@ func (s *SourceClientImpl) OnCreate(ctx context.Context, id string) error {
return nil
}

func (s *SourceClientImpl) OnUpdate(ctx context.Context, id string) error {
func (s *SourceClientImpl) OnUpdate(ctx context.Context, eventID, resourceID string) error {
logger := logger.NewOCMLogger(ctx)

resource, err := s.ResourceService.Get(ctx, id)
resource, err := s.ResourceService.Get(ctx, resourceID)
if err != nil {
return err
}
Expand All @@ -99,10 +99,10 @@ func (s *SourceClientImpl) OnUpdate(ctx context.Context, id string) error {
return nil
}

func (s *SourceClientImpl) OnDelete(ctx context.Context, id string) error {
func (s *SourceClientImpl) OnDelete(ctx context.Context, eventID, resourceID string) error {
logger := logger.NewOCMLogger(ctx)

resource, err := s.ResourceService.Get(ctx, id)
resource, err := s.ResourceService.Get(ctx, resourceID)
if err != nil {
return err
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/client/cloudevents/source_client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func NewSourceClientMock(resourceService services.ResourceService) SourceClient
}
}

func (s *SourceClientMock) OnCreate(ctx context.Context, id string) error {
resource, serviceErr := s.ResourceService.Get(ctx, id)
func (s *SourceClientMock) OnCreate(ctx context.Context, eventID, resourceID string) error {
resource, serviceErr := s.ResourceService.Get(ctx, resourceID)
if serviceErr != nil {
return fmt.Errorf("failed to get resource: %v", serviceErr)
}
Expand Down Expand Up @@ -78,8 +78,8 @@ func (s *SourceClientMock) OnCreate(ctx context.Context, id string) error {
return nil
}

func (s *SourceClientMock) OnUpdate(ctx context.Context, id string) error {
resource, serviceErr := s.ResourceService.Get(ctx, id)
func (s *SourceClientMock) OnUpdate(ctx context.Context, eventID, resourceID string) error {
resource, serviceErr := s.ResourceService.Get(ctx, resourceID)
if serviceErr != nil {
return fmt.Errorf("failed to get resource: %v", serviceErr)
}
Expand Down Expand Up @@ -136,8 +136,8 @@ func (s *SourceClientMock) OnUpdate(ctx context.Context, id string) error {
return nil
}

func (s *SourceClientMock) OnDelete(ctx context.Context, id string) error {
resource, serviceErr := s.ResourceService.Get(ctx, id)
func (s *SourceClientMock) OnDelete(ctx context.Context, eventID, resourceID string) error {
resource, serviceErr := s.ResourceService.Get(ctx, resourceID)
if serviceErr != nil {
return fmt.Errorf("failed to get resource: %v", serviceErr)
}
Expand Down
Loading

0 comments on commit 52299c9

Please sign in to comment.