Skip to content

Commit

Permalink
add status controller and status events table.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Jun 25, 2024
1 parent 52299c9 commit 0410db8
Show file tree
Hide file tree
Showing 21 changed files with 588 additions and 252 deletions.
1 change: 1 addition & 0 deletions cmd/maestro/environments/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func (e *Env) LoadServices() {
e.Services.Generic = NewGenericServiceLocator(e)
e.Services.Resources = NewResourceServiceLocator(e)
e.Services.Events = NewEventServiceLocator(e)
e.Services.StatusEvents = NewStatusEventServiceLocator(e)
e.Services.Consumers = NewConsumerServiceLocator(e)
}

Expand Down
8 changes: 8 additions & 0 deletions cmd/maestro/environments/service_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ func NewEventServiceLocator(env *Env) EventServiceLocator {
}
}

// StatusEventServiceLocator is a function that yields a
// services.StatusEventService when invoked.
type StatusEventServiceLocator func() services.StatusEventService

// NewStatusEventServiceLocator returns a locator bound to env. Each call of the
// returned locator passes the address of env.Database.SessionFactory to
// dao.NewStatusEventDao and hands the resulting DAO to
// services.NewStatusEventService.
func NewStatusEventServiceLocator(env *Env) StatusEventServiceLocator {
	locate := func() services.StatusEventService {
		statusEventDao := dao.NewStatusEventDao(&env.Database.SessionFactory)
		return services.NewStatusEventService(statusEventDao)
	}
	return locate
}

type ConsumerServiceLocator func() services.ConsumerService

func NewConsumerServiceLocator(env *Env) ConsumerServiceLocator {
Expand Down
9 changes: 5 additions & 4 deletions cmd/maestro/environments/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ type Handlers struct {
}

type Services struct {
Resources ResourceServiceLocator
Generic GenericServiceLocator
Events EventServiceLocator
Consumers ConsumerServiceLocator
Resources ResourceServiceLocator
Generic GenericServiceLocator
Events EventServiceLocator
StatusEvents StatusEventServiceLocator
Consumers ConsumerServiceLocator
}

type Clients struct {
Expand Down
29 changes: 22 additions & 7 deletions cmd/maestro/server/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,34 @@ func NewControllersServer(pulseServer *PulseServer) *ControllersServer {
db.NewAdvisoryLockFactory(env().Database.SessionFactory),
env().Services.Events(),
),
StatusController: controllers.NewStatusController(
env().Services.StatusEvents(),
),
}

sourceClient := env().Clients.CloudEventsSource
s.KindControllerManager.Add(&controllers.ControllerConfig{
Source: "Resources",
Handlers: map[api.EventType][]controllers.ControllerHandlerFunc{
api.CreateEventType: {sourceClient.OnCreate},
api.UpdateEventType: {sourceClient.OnUpdate},
api.DeleteEventType: {sourceClient.OnDelete},
api.StatusUpdateEventType: {pulseServer.OnStatusUpdate},
api.CreateEventType: {sourceClient.OnCreate},
api.UpdateEventType: {sourceClient.OnUpdate},
api.DeleteEventType: {sourceClient.OnDelete},
},
})

s.StatusController.Add(map[api.StatusEventType][]controllers.StatusHandlerFunc{
api.StatusUpdateEventType: {pulseServer.OnStatusUpdate},
api.StatusDeleteEventType: {pulseServer.OnStatusUpdate},
})

return s
}

type ControllersServer struct {
KindControllerManager *controllers.KindControllerManager
DB db.SessionFactory
StatusController *controllers.StatusController

DB db.SessionFactory
}

// Start is a blocking call that starts this controller server
Expand All @@ -44,8 +53,14 @@ func (s ControllersServer) Start(ctx context.Context) {

log.Infof("Kind controller handling events")
go s.KindControllerManager.Run(ctx.Done())
log.Infof("Status controller handling events")
go s.StatusController.Run(ctx.Done())

log.Infof("Kind controller listening for events")
// blocking call
env().Database.SessionFactory.NewListener(ctx, "events", s.KindControllerManager.AddEvent)
go env().Database.SessionFactory.NewListener(ctx, "events", s.KindControllerManager.AddEvent)
log.Infof("Status controller listening for events")
go env().Database.SessionFactory.NewListener(ctx, "status_events", s.StatusController.AddStatusEvent)

// block until the context is done
<-ctx.Done()
}
189 changes: 82 additions & 107 deletions cmd/maestro/server/pulse_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ var log = logger.NewOCMLogger(context.Background())
// checking the liveness of Maestro instances, triggering status resync based on
// instances' status and other conditions.
type PulseServer struct {
instanceID string
pulseInterval int64
instanceDao dao.InstanceDao
eventInstanceDao dao.EventInstanceDao
lockFactory db.LockFactory
eventBroadcaster *event.EventBroadcaster
resourceService services.ResourceService
eventService services.EventService
sourceClient cloudevents.SourceClient
statusDispatcher dispatcher.Dispatcher
instanceID string
pulseInterval int64
instanceDao dao.InstanceDao
eventInstanceDao dao.EventInstanceDao
lockFactory db.LockFactory
eventBroadcaster *event.EventBroadcaster
resourceService services.ResourceService
statusEventService services.StatusEventService
sourceClient cloudevents.SourceClient
statusDispatcher dispatcher.Dispatcher
}

func NewPulseServer(eventBroadcaster *event.EventBroadcaster) *PulseServer {
Expand All @@ -52,16 +52,16 @@ func NewPulseServer(eventBroadcaster *event.EventBroadcaster) *PulseServer {
}
sessionFactory := env().Database.SessionFactory
return &PulseServer{
instanceID: env().Config.MessageBroker.ClientID,
pulseInterval: env().Config.PulseServer.PulseInterval,
instanceDao: dao.NewInstanceDao(&sessionFactory),
eventInstanceDao: dao.NewEventInstanceDao(&sessionFactory),
lockFactory: db.NewAdvisoryLockFactory(sessionFactory),
eventBroadcaster: eventBroadcaster,
resourceService: env().Services.Resources(),
eventService: env().Services.Events(),
sourceClient: env().Clients.CloudEventsSource,
statusDispatcher: statusDispatcher,
instanceID: env().Config.MessageBroker.ClientID,
pulseInterval: env().Config.PulseServer.PulseInterval,
instanceDao: dao.NewInstanceDao(&sessionFactory),
eventInstanceDao: dao.NewEventInstanceDao(&sessionFactory),
lockFactory: db.NewAdvisoryLockFactory(sessionFactory),
eventBroadcaster: eventBroadcaster,
resourceService: env().Services.Resources(),
statusEventService: env().Services.StatusEvents(),
sourceClient: env().Clients.CloudEventsSource,
statusDispatcher: statusDispatcher,
}
}

Expand Down Expand Up @@ -179,62 +179,47 @@ func (s *PulseServer) startSubscription(ctx context.Context) {
return nil
}

// // convert the resource status to cloudevent
// evt, err := api.JSONMAPToCloudEvent(resource.Status)
// if err != nil {
// return fmt.Errorf("failed to convert resource status to cloudevent: %v", err)
// }

// // decode the cloudevent data as manifest status
// statusPayload := &workpayload.ManifestStatus{}
// if err := evt.DataAs(statusPayload); err != nil {
// return fmt.Errorf("failed to decode cloudevent data as resource status: %v", err)
// }

// // if the resource has been deleted from agent, delete it from maestro
// if meta.IsStatusConditionTrue(statusPayload.Conditions, common.ManifestsDeleted) {
// if svcErr := s.resourceService.Delete(ctx, resource.ID); svcErr != nil {
// return svcErr
// }

// log.V(4).Infof("Broadcast:: the resource %s is deleted", resource.ID)
// resource.Payload = found.Payload
// s.eventBroadcaster.Broadcast(resource)
// return nil
// }
// convert the resource status to cloudevent
evt, err := api.JSONMAPToCloudEvent(resource.Status)
if err != nil {
return fmt.Errorf("failed to convert resource status to cloudevent: %v", err)
}

// decode the cloudevent data as manifest status
statusPayload := &workpayload.ManifestStatus{}
if err := evt.DataAs(statusPayload); err != nil {
return fmt.Errorf("failed to decode cloudevent data as resource status: %v", err)
}

// if the resource has been deleted from agent, create status event and delete it from maestro
if meta.IsStatusConditionTrue(statusPayload.Conditions, common.ManifestsDeleted) {
_, sErr := s.statusEventService.Create(ctx, &api.StatusEvent{
ResourceID: resource.ID,
Status: resource.Status,
StatusEventType: api.StatusDeleteEventType,
})
if sErr != nil {
return fmt.Errorf("failed to create status event for resource status delete %s: %s", resource.ID, sErr.Error())
}
if svcErr := s.resourceService.Delete(ctx, resource.ID); svcErr != nil {
return fmt.Errorf("failed to delete resource %s: %s", resource.ID, svcErr.Error())
}
}

// update the resource status
_, updated, svcErr := s.resourceService.UpdateStatus(ctx, resource)
if svcErr != nil {
return fmt.Errorf("failed to update resource status %s: %s", resource.ID, svcErr.Error())
}

// // broadcast the resource status updated only when the resource is updated
// if updated {
// log.V(4).Infof("Broadcast:: the resource %s is updated", resource.ID)
// s.eventBroadcaster.Broadcast(updatedResource)
// }
// create the status event only when the resource is updated
if updated {
evt, sErr := s.eventService.Create(ctx, &api.Event{
Source: "Resources",
SourceID: resource.ID,
EventType: api.StatusUpdateEventType,
_, sErr := s.statusEventService.Create(ctx, &api.StatusEvent{
ResourceID: resource.ID,
StatusEventType: api.StatusUpdateEventType,
})
if sErr != nil {
return fmt.Errorf("failed to create event for resource status update %s: %s", resource.ID, sErr.Error())
}
instances, err := s.instanceDao.All(ctx)
if err != nil {
return fmt.Errorf("failed to get all maestro instances: %s", err)
}
eventInstanceList := make([]*api.EventInstance, len(instances))
for i, instance := range instances {
eventInstanceList[i] = &api.EventInstance{
EventID: evt.ID,
InstanceID: instance.ID,
}
}
if err := s.eventInstanceDao.CreateList(ctx, eventInstanceList); err != nil {
return fmt.Errorf("failed to create event instances for resource status update %s: %s", resource.ID, err.Error())
return fmt.Errorf("failed to create status event for resource status update %s: %s", resource.ID, sErr.Error())
}
}
default:
Expand All @@ -245,52 +230,42 @@ func (s *PulseServer) startSubscription(ctx context.Context) {
})
}

// On StatusUpdate does three things:
// 1. Broadcast the resource status update to subscribers
// 2. Mark the event instance as done
// 3. If the resource has been deleted from agent, delete it from maestro if all event instances are done
// OnStatusUpdate is called for each new status event inserted into the database.
// It does two things:
// 1. builds the resource status and broadcasts it to subscribers
// 2. adds an event instance record to mark the event as processed by the current instance
func (s *PulseServer) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error {
resource, sErr := s.resourceService.Get(ctx, resourceID)
statusEvent, sErr := s.statusEventService.Get(ctx, eventID)
if sErr != nil {
return fmt.Errorf("failed to get resource %s: %s", resourceID, sErr.Error())
}

// broadcast the resource status updated to subscribers
s.eventBroadcaster.Broadcast(resource)

// mark the event instance as done
if err := s.eventInstanceDao.MarkAsDone(ctx, eventID, s.instanceID); err != nil {
return fmt.Errorf("failed to mark event instance (%s, %s) as done for resource status update %s: %s", eventID, s.instanceID, resourceID, err.Error())
}

// convert the resource status to cloudevent
cloudevt, err := api.JSONMAPToCloudEvent(resource.Status)
if err != nil {
return fmt.Errorf("failed to convert resource status to cloudevent: %v", err)
}

// decode the cloudevent data as manifest status
statusPayload := &workpayload.ManifestStatus{}
if err := cloudevt.DataAs(statusPayload); err != nil {
return fmt.Errorf("failed to decode cloudevent data as resource status: %v", err)
return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error())
}

// if the resource has been deleted from agent, delete it from maestro
deleted := false
if meta.IsStatusConditionTrue(statusPayload.Conditions, common.ManifestsDeleted) {
deleted = true
}
if deleted {
count, err := s.eventInstanceDao.GetUnhandleEventInstances(ctx, eventID)
if err != nil {
return fmt.Errorf("failed to get unhandled event instances for event %s: %s", eventID, err.Error())
var resource *api.Resource
// check if the status event is delete event
if statusEvent.StatusEventType == api.StatusDeleteEventType {
// build resource with resource id and delete status
resource = &api.Resource{
Meta: api.Meta{
ID: resourceID,
},
Status: statusEvent.Status,
}
if count == 0 {
if sErr := s.resourceService.Delete(ctx, resourceID); sErr != nil {
return fmt.Errorf("failed to delete resource %s: %s", resourceID, sErr.Error())
}
} else {
resource, sErr = s.resourceService.Get(ctx, resourceID)
if sErr != nil {
return fmt.Errorf("failed to get resource %s: %s", resourceID, sErr.Error())
}
}

return nil
// broadcast the resource status to subscribers
log.V(4).Infof("Broadcast the resource status %s", resource.ID)
s.eventBroadcaster.Broadcast(resource)

// add the event instance record
_, err := s.eventInstanceDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: s.instanceID,
})

return err
}
7 changes: 3 additions & 4 deletions pkg/api/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ import (
type EventType string

const (
CreateEventType EventType = "Create"
UpdateEventType EventType = "Update"
DeleteEventType EventType = "Delete"
StatusUpdateEventType EventType = "StatusUpdate"
CreateEventType EventType = "Create"
UpdateEventType EventType = "Update"
DeleteEventType EventType = "Delete"
)

type Event struct {
Expand Down
1 change: 0 additions & 1 deletion pkg/api/event_instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package api
type EventInstance struct {
EventID string
InstanceID string
Done bool
}

type EventInstanceList []*EventInstance
39 changes: 39 additions & 0 deletions pkg/api/status_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package api

import (
"time"

"gorm.io/datatypes"
"gorm.io/gorm"
)

// StatusEventType is the string-typed kind of a status event.
type StatusEventType string

// Known status event kinds.
const (
	// StatusUpdateEventType is the kind used for resource status updates.
	StatusUpdateEventType = StatusEventType("StatusUpdate")
	// StatusDeleteEventType is the kind used for resource status deletions.
	StatusDeleteEventType = StatusEventType("StatusDelete")
)

// StatusEvent is a record describing a status change of a resource. It pairs
// the ID of the affected resource with the kind of change and, optionally, a
// status payload.
type StatusEvent struct {
// Meta is embedded and supplies the ID field that BeforeCreate assigns.
Meta
// ResourceID is the ID of the resource this event refers to.
ResourceID string
// Status holds a resource status payload as a JSON map.
Status datatypes.JSONMap
StatusEventType StatusEventType // StatusUpdate|StatusDelete
// NOTE(review): the tag key is `json` but the value reads like a GORM
// directive. As written it only sets the JSON field name to "gorm:null";
// confirm whether a `gorm:"..."` tag was intended.
ReconciledDate *time.Time `json:"gorm:null"`
}

// StatusEventList is a slice of status event pointers.
type StatusEventList []*StatusEvent

// StatusEventIndex maps a status event ID to its event.
type StatusEventIndex map[string]*StatusEvent

// Index returns a non-nil map of the list's events keyed by each event's ID.
// When several events share an ID, the one latest in the list is kept.
func (l StatusEventList) Index() StatusEventIndex {
	byID := make(StatusEventIndex, len(l))
	for i := range l {
		byID[l[i].ID] = l[i]
	}
	return byID
}

// BeforeCreate assigns the event an ID obtained from NewID and always returns
// nil. The tx argument is not used.
// NOTE(review): the name and signature match GORM's BeforeCreate hook, so this
// is presumably invoked by GORM before the record is inserted — confirm.
func (e *StatusEvent) BeforeCreate(tx *gorm.DB) error {
e.ID = NewID()
return nil
}
Loading

0 comments on commit 0410db8

Please sign in to comment.