diff --git a/cloudevents/generic/agentclient.go b/cloudevents/generic/agentclient.go index 412e0bc3f..95691d660 100644 --- a/cloudevents/generic/agentclient.go +++ b/cloudevents/generic/agentclient.go @@ -4,11 +4,9 @@ import ( "context" "fmt" "strconv" - "time" cloudevents "github.com/cloudevents/sdk-go/v2" - "k8s.io/client-go/util/flowcontrol" "k8s.io/klog/v2" "open-cluster-management.io/api/cloudevents/generic/options" @@ -21,14 +19,12 @@ import ( // An agent is a component that handles the deployment of requested resources on the managed cluster and status report // to the source. type CloudEventAgentClient[T ResourceObject] struct { - cloudEventsOptions options.CloudEventsOptions - cloudEventsClient cloudevents.Client - lister Lister[T] - codecs map[types.CloudEventsDataType]Codec[T] - statusHashGetter StatusHashGetter[T] - rateLimiter flowcontrol.RateLimiter - agentID string - clusterName string + *baseClient + lister Lister[T] + codecs map[types.CloudEventsDataType]Codec[T] + statusHashGetter StatusHashGetter[T] + agentID string + clusterName string } // NewCloudEventAgentClient returns an instance for CloudEventAgentClient. The following arguments are required to @@ -45,8 +41,12 @@ func NewCloudEventAgentClient[T ResourceObject]( statusHashGetter StatusHashGetter[T], codecs ...Codec[T], ) (*CloudEventAgentClient[T], error) { - cloudEventsClient, err := agentOptions.CloudEventsOptions.Client(ctx) - if err != nil { + baseClient := &baseClient{ + cloudEventsOptions: agentOptions.CloudEventsOptions, + cloudEventsRateLimiter: NewRateLimiter(agentOptions.EventRateLimit), + } + + if err := baseClient.connect(ctx); err != nil { return nil, err } @@ -56,14 +56,12 @@ func NewCloudEventAgentClient[T ResourceObject]( } return &CloudEventAgentClient[T]{ - cloudEventsOptions: agentOptions.CloudEventsOptions, - cloudEventsClient: cloudEventsClient, - lister: lister, - codecs: evtCodes, - statusHashGetter: statusHashGetter, - rateLimiter: NewRateLimiter(agentOptions.EventRateLimit), - agentID: agentOptions.AgentID, - clusterName: agentOptions.ClusterName, + baseClient: baseClient, + lister: lister, + codecs: evtCodes, + statusHashGetter: statusHashGetter, + agentID: agentOptions.AgentID, + clusterName: agentOptions.ClusterName, }, nil } @@ -101,12 +99,7 @@ func (c *CloudEventAgentClient[T]) Resync(ctx context.Context) error { return fmt.Errorf("failed to set data to cloud event: %v", err) } - sendingContext, err := c.cloudEventsOptions.WithContext(ctx, evt.Context) - if err != nil { - return err - } - - if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, evt); err != nil { + if err := c.publish(ctx, evt); err != nil { return err } } @@ -130,12 +123,7 @@ func (c *CloudEventAgentClient[T]) Publish(ctx context.Context, eventType types. return err } - sendingContext, err := c.cloudEventsOptions.WithContext(ctx, evt.Context) - if err != nil { - return err - } - - if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, *evt); err != nil { + if err := c.publish(ctx, *evt); err != nil { return err } @@ -145,63 +133,67 @@ func (c *CloudEventAgentClient[T]) Publish(ctx context.Context, eventType types. // Subscribe the events that are from the source status resync request or source resource spec request. // For status resync request, agent publish the current resources status back as response. // For resource spec request, agent receives resource spec and handles the spec with resource handlers. -func (c *CloudEventAgentClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T]) error { - return c.cloudEventsClient.StartReceiver(ctx, func(evt cloudevents.Event) { - klog.V(4).Infof("Received event:\n%s", evt) - - eventType, err := types.ParseCloudEventsType(evt.Type()) - if err != nil { - klog.Errorf("failed to parse cloud event type %s, %v", evt.Type(), err) - return - } +func (c *CloudEventAgentClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T]) { + c.subscribe(ctx, func(ctx context.Context, evt cloudevents.Event) { + c.receive(ctx, evt, handlers...) + }) +} - if eventType.Action == types.ResyncRequestAction { - if eventType.SubResource != types.SubResourceStatus { - klog.Warningf("unsupported resync event type %s, ignore", eventType) - return - } +func (c *CloudEventAgentClient[T]) receive(ctx context.Context, evt cloudevents.Event, handlers ...ResourceHandler[T]) { + klog.V(4).Infof("Received event:\n%s", evt) - if err := c.respondResyncStatusRequest(ctx, eventType.CloudEventsDataType, evt); err != nil { - klog.Errorf("failed to resync manifestsstatus, %v", err) - } + eventType, err := types.ParseCloudEventsType(evt.Type()) + if err != nil { + klog.Errorf("failed to parse cloud event type %s, %v", evt.Type(), err) + return + } + if eventType.Action == types.ResyncRequestAction { + if eventType.SubResource != types.SubResourceStatus { + klog.Warningf("unsupported resync event type %s, ignore", eventType) return } - if eventType.SubResource != types.SubResourceSpec { - klog.Warningf("unsupported event type %s, ignore", eventType) - return + if err := c.respondResyncStatusRequest(ctx, eventType.CloudEventsDataType, evt); err != nil { + klog.Errorf("failed to resync manifestsstatus, %v", err) } - codec, ok := c.codecs[eventType.CloudEventsDataType] - if !ok { - klog.Warningf("failed to find the codec for event %s, ignore", eventType.CloudEventsDataType) - return - } + return + } - obj, err := codec.Decode(&evt) - if err != nil { - klog.Errorf("failed to decode spec, %v", err) - return - } + if eventType.SubResource != types.SubResourceSpec { + klog.Warningf("unsupported event type %s, ignore", eventType) + return + } - action, err := c.specAction(evt.Source(), obj) - if err != nil { - klog.Errorf("failed to generate spec action %s, %v", evt, err) - return - } + codec, ok := c.codecs[eventType.CloudEventsDataType] + if !ok { + klog.Warningf("failed to find the codec for event %s, ignore", eventType.CloudEventsDataType) + return + } - if len(action) == 0 { - // no action is required, ignore - return - } + obj, err := codec.Decode(&evt) + if err != nil { + klog.Errorf("failed to decode spec, %v", err) + return + } - for _, handler := range handlers { - if err := handler(action, obj); err != nil { - klog.Errorf("failed to handle spec event %s, %v", evt, err) - } + action, err := c.specAction(evt.Source(), obj) + if err != nil { + klog.Errorf("failed to generate spec action %s, %v", evt, err) + return + } + + if len(action) == 0 { + // no action is required, ignore + return + } + + for _, handler := range handlers { + if err := handler(action, obj); err != nil { + klog.Errorf("failed to handle spec event %s, %v", evt, err) } - }) + } } // Upon receiving the status resync event, the agent responds by sending resource status events to the broker as @@ -287,30 +279,6 @@ func (c *CloudEventAgentClient[T]) specAction(source string, obj T) (evt types.R return types.Modified, nil } -func sendEventWithLimit(sendingCtx context.Context, limiter flowcontrol.RateLimiter, - sender cloudevents.Client, evt cloudevents.Event) error { - now := time.Now() - - err := limiter.Wait(sendingCtx) - if err != nil { - return fmt.Errorf("client rate limiter Wait returned an error: %w", err) - } - - latency := time.Since(now) - if latency > longThrottleLatency { - klog.Warningf(fmt.Sprintf("Waited for %v due to client-side throttling, not priority and fairness, request: %s", - latency, evt)) - } - - klog.V(4).Infof("Sent event: %v\n%s", sendingCtx, evt) - - if result := sender.Send(sendingCtx, evt); cloudevents.IsUndelivered(result) { - return fmt.Errorf("failed to send event %s, %v", evt, result) - } - - return nil -} - func getObj[T ResourceObject](resourceID string, objs []T) (obj T, exists bool) { for _, obj := range objs { if string(obj.GetUID()) == resourceID { diff --git a/cloudevents/generic/agentclient_test.go b/cloudevents/generic/agentclient_test.go index d6a12f90e..86bd7fdfd 100644 --- a/cloudevents/generic/agentclient_test.go +++ b/cloudevents/generic/agentclient_test.go @@ -273,9 +273,7 @@ func TestStatusResyncResponse(t *testing.T) { t.Errorf("unexpected error %v", err) } - if err := agent.Subscribe(context.TODO()); err != nil { - t.Errorf("unexpected error %v", err) - } + agent.receive(context.TODO(), c.requestEvent) c.validate(client.GetSentEvents()) }) @@ -440,13 +438,11 @@ func TestReceiveResourceSpec(t *testing.T) { var actualEvent types.ResourceAction var actualRes *mockResource - if err := agent.Subscribe(context.TODO(), func(event types.ResourceAction, resource *mockResource) error { + agent.receive(context.TODO(), c.requestEvent, func(event types.ResourceAction, resource *mockResource) error { actualEvent = event actualRes = resource return nil - }); err != nil { - t.Errorf("unexpected error %v", err) - } + }) c.validate(actualEvent, actualRes) }) diff --git a/cloudevents/generic/baseclient.go b/cloudevents/generic/baseclient.go new file mode 100644 index 000000000..be9e6b7fd --- /dev/null +++ b/cloudevents/generic/baseclient.go @@ -0,0 +1,209 @@ +package generic + +import ( + "context" + "fmt" + "sync" + "time" + + cloudevents "github.com/cloudevents/sdk-go/v2" + + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/flowcontrol" + "k8s.io/klog/v2" + "k8s.io/utils/clock" + + "open-cluster-management.io/api/cloudevents/generic/options" +) + +const ( + restartReceiverSignal = iota + stopReceiverSignal +) + +type receiveFn func(ctx context.Context, evt cloudevents.Event) + +type baseClient struct { + sync.RWMutex + cloudEventsOptions options.CloudEventsOptions + cloudEventsClient cloudevents.Client + cloudEventsRateLimiter flowcontrol.RateLimiter + receiverChan chan int +} + +func (c *baseClient) connect(ctx context.Context) error { + var err error + c.cloudEventsClient, err = c.cloudEventsOptions.Client(ctx) + if err != nil { + return err + } + + // start a go routine to handle cloudevents client connection errors + go func() { + var err error + + // the reconnect backoff will stop at [1,5) min interval. If we don't backoff for 10min, we reset the backoff. + connBackoffManager := wait.NewExponentialBackoffManager(5*time.Second, 1*time.Minute, 10*time.Minute, 5.0, 1.0, &clock.RealClock{}) + cloudEventsClient := c.cloudEventsClient + + for { + if cloudEventsClient == nil { + klog.V(4).Infof("reconnecting the cloudevents client") + cloudEventsClient, err = c.cloudEventsOptions.Client(ctx) + // TODO enhance the cloudevents SKD to avoid wrapping the error type to distinguish the net connection + // errors + if err != nil { + // failed to reconnect, try agin + runtime.HandleError(fmt.Errorf("the cloudevents client reconnect failed, %v", err)) + <-connBackoffManager.Backoff().C() + continue + } + + // the cloudevents network connection is back, refresh the current cloudevents client and send the + // receiver restart signal + klog.V(4).Infof("the cloudevents client is reconnected") + c.resetClient(cloudEventsClient) + c.sendReceiverSignal(restartReceiverSignal) + } + + select { + case <-ctx.Done(): + return + case err, ok := <-c.cloudEventsOptions.ErrorChan(): + if !ok { + // error channel is closed, do nothing + return + } + + runtime.HandleError(fmt.Errorf("the cloudevents client is disconnected, %v", err)) + + // the cloudevents client network connection is closed, send the receiver stop signal, set the current + // client to nil and retry + c.sendReceiverSignal(stopReceiverSignal) + + cloudEventsClient = nil + c.resetClient(cloudEventsClient) + + <-connBackoffManager.Backoff().C() + } + } + }() + + return nil +} + +func (c *baseClient) publish(ctx context.Context, evt cloudevents.Event) error { + now := time.Now() + + if err := c.cloudEventsRateLimiter.Wait(ctx); err != nil { + return fmt.Errorf("client rate limiter Wait returned an error: %w", err) + } + + latency := time.Since(now) + if latency > longThrottleLatency { + klog.Warningf(fmt.Sprintf("Waited for %v due to client-side throttling, not priority and fairness, request: %s", + latency, evt)) + } + + sendingCtx, err := c.cloudEventsOptions.WithContext(ctx, evt.Context) + if err != nil { + return err + } + + klog.V(4).Infof("Sent event: %v\n%s", ctx, evt) + + // make sure the current client is the newest + c.RLock() + defer c.RUnlock() + + if c.cloudEventsClient == nil { + return fmt.Errorf("the cloudevents client is not ready") + } + + if result := c.cloudEventsClient.Send(sendingCtx, evt); cloudevents.IsUndelivered(result) { + return fmt.Errorf("failed to send event %s, %v", evt, result) + } + + return nil +} + +func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) { + c.Lock() + defer c.Unlock() + + // make sure there is only one subscription go routine starting for one client. + if c.receiverChan != nil { + klog.Warningf("the subscription has already started") + return + } + + c.receiverChan = make(chan int) + + // start a go routine to handle cloudevents subscription + go func() { + receiverCtx, receiverCancel := context.WithCancel(context.TODO()) + cloudEventsClient := c.cloudEventsClient + + for { + if cloudEventsClient != nil { + // TODO send a resync request + + go func() { + if err := cloudEventsClient.StartReceiver(receiverCtx, func(evt cloudevents.Event) { + receive(receiverCtx, evt) + }); err != nil { + runtime.HandleError(fmt.Errorf("failed to receive cloudevents, %v", err)) + } + }() + } + + select { + case <-ctx.Done(): + receiverCancel() + close(c.receiverChan) + return + case signal, ok := <-c.receiverChan: + if !ok { + // receiver channel is closed, stop the receiver + receiverCancel() + return + } + + switch signal { + case restartReceiverSignal: + klog.V(4).Infof("restart the cloudevents receiver") + // make sure the current client is the newest + c.RLock() + cloudEventsClient = c.cloudEventsClient + c.RUnlock() + + // rebuild the receiver context + receiverCtx, receiverCancel = context.WithCancel(context.TODO()) + case stopReceiverSignal: + klog.V(4).Infof("stop the cloudevents receiver") + receiverCancel() + cloudEventsClient = nil + default: + runtime.HandleError(fmt.Errorf("unknown receiver signal %d", signal)) + } + } + } + }() +} + +func (c *baseClient) resetClient(client cloudevents.Client) { + c.Lock() + defer c.Unlock() + + c.cloudEventsClient = client +} + +func (c *baseClient) sendReceiverSignal(signal int) { + c.RLock() + defer c.RUnlock() + + if c.receiverChan != nil { + c.receiverChan <- signal + } +} diff --git a/cloudevents/generic/interface.go b/cloudevents/generic/interface.go index c3c9a0725..484b99255 100644 --- a/cloudevents/generic/interface.go +++ b/cloudevents/generic/interface.go @@ -61,5 +61,5 @@ type CloudEventsClient[T ResourceObject] interface { // Subscribe the resources status/spec event to the broker to receive the resources status/spec and use // ResourceHandler to handle them. - Subscribe(ctx context.Context, handlers ...ResourceHandler[T]) error + Subscribe(ctx context.Context, handlers ...ResourceHandler[T]) } diff --git a/cloudevents/generic/options/fake/fakeoptions.go b/cloudevents/generic/options/fake/fakeoptions.go index 0d9574f58..11eff3c1e 100644 --- a/cloudevents/generic/options/fake/fakeoptions.go +++ b/cloudevents/generic/options/fake/fakeoptions.go @@ -38,6 +38,10 @@ func (o *CloudEventsFakeOptions) Client(ctx context.Context) (cloudevents.Client return o.client, nil } +func (o *CloudEventsFakeOptions) ErrorChan() <-chan error { + return nil +} + type CloudEventsFakeClient struct { sentEvents []cloudevents.Event receivedEvents []cloudevents.Event diff --git a/cloudevents/generic/options/mqtt/agentoptions.go b/cloudevents/generic/options/mqtt/agentoptions.go index 1525ff68c..3f9361870 100644 --- a/cloudevents/generic/options/mqtt/agentoptions.go +++ b/cloudevents/generic/options/mqtt/agentoptions.go @@ -16,6 +16,7 @@ import ( type mqttAgentOptions struct { MQTTOptions + errorChan chan error clusterName string agentID string } @@ -24,6 +25,7 @@ func NewAgentOptions(mqttOptions *MQTTOptions, clusterName, agentID string) *opt return &options.CloudEventsAgentOptions{ CloudEventsOptions: &mqttAgentOptions{ MQTTOptions: *mqttOptions, + errorChan: make(chan error), clusterName: clusterName, agentID: agentID, }, @@ -59,6 +61,9 @@ func (o *mqttAgentOptions) Client(ctx context.Context) (cloudevents.Client, erro receiver, err := o.GetCloudEventsClient( ctx, fmt.Sprintf("%s-client", o.agentID), + func(err error) { + o.errorChan <- err + }, cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}), cloudeventsmqtt.WithSubscribe( &paho.Subscribe{ @@ -76,3 +81,7 @@ func (o *mqttAgentOptions) Client(ctx context.Context) (cloudevents.Client, erro } return receiver, nil } + +func (o *mqttAgentOptions) ErrorChan() <-chan error { + return o.errorChan +} diff --git a/cloudevents/generic/options/mqtt/options.go b/cloudevents/generic/options/mqtt/options.go index e79e0af62..4c6212b50 100644 --- a/cloudevents/generic/options/mqtt/options.go +++ b/cloudevents/generic/options/mqtt/options.go @@ -124,15 +124,18 @@ func (o *MQTTOptions) GetMQTTConnectOption(clientID string) *paho.Connect { func (o *MQTTOptions) GetCloudEventsClient( ctx context.Context, clientID string, - clientOpts ...cloudeventsmqtt.Option) (cloudevents.Client, error) { + errorHandler func(error), + clientOpts ...cloudeventsmqtt.Option, +) (cloudevents.Client, error) { netConn, err := o.GetNetConn() if err != nil { return nil, err } config := &paho.ClientConfig{ - ClientID: clientID, - Conn: netConn, + ClientID: clientID, + Conn: netConn, + OnClientError: errorHandler, } opts := []cloudeventsmqtt.Option{cloudeventsmqtt.WithConnect(o.GetMQTTConnectOption(clientID))} diff --git a/cloudevents/generic/options/mqtt/sourceoptions.go b/cloudevents/generic/options/mqtt/sourceoptions.go index 095c13522..5e4f20898 100644 --- a/cloudevents/generic/options/mqtt/sourceoptions.go +++ b/cloudevents/generic/options/mqtt/sourceoptions.go @@ -16,13 +16,15 @@ import ( type mqttSourceOptions struct { MQTTOptions - sourceID string + errorChan chan error + sourceID string } func NewSourceOptions(mqttOptions *MQTTOptions, sourceID string) *options.CloudEventsSourceOptions { return &options.CloudEventsSourceOptions{ CloudEventsOptions: &mqttSourceOptions{ MQTTOptions: *mqttOptions, + errorChan: make(chan error), sourceID: sourceID, }, SourceID: sourceID, @@ -55,6 +57,9 @@ func (o *mqttSourceOptions) Client(ctx context.Context) (cloudevents.Client, err receiver, err := o.GetCloudEventsClient( ctx, fmt.Sprintf("%s-client", o.sourceID), + func(err error) { + o.errorChan <- err + }, cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}), cloudeventsmqtt.WithSubscribe( &paho.Subscribe{ @@ -72,3 +77,7 @@ func (o *mqttSourceOptions) Client(ctx context.Context) (cloudevents.Client, err } return receiver, nil } + +func (o *mqttSourceOptions) ErrorChan() <-chan error { + return o.errorChan +} diff --git a/cloudevents/generic/options/options.go b/cloudevents/generic/options/options.go index 68c9dec44..57fbf14e9 100644 --- a/cloudevents/generic/options/options.go +++ b/cloudevents/generic/options/options.go @@ -18,6 +18,10 @@ type CloudEventsOptions interface { // Client returns a cloudevents client for sending and receiving cloudevents Client(ctx context.Context) (cloudevents.Client, error) + + // ErrorChan returns a chan which will receive the cloudevents connection error. The source/agent client will try to + // reconnect the when this error occurs. + ErrorChan() <-chan error } // EventRateLimit for limiting the event sending rate. diff --git a/cloudevents/generic/sourceclient.go b/cloudevents/generic/sourceclient.go index 25f790dd5..d0dc09f9c 100644 --- a/cloudevents/generic/sourceclient.go +++ b/cloudevents/generic/sourceclient.go @@ -8,7 +8,6 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/util/flowcontrol" "k8s.io/klog/v2" "open-cluster-management.io/api/cloudevents/generic/options" @@ -21,13 +20,11 @@ import ( // A source is a component that runs on a server, it can be a controller on the hub cluster or a RESTful service // handling resource requests. type CloudEventSourceClient[T ResourceObject] struct { - cloudEventsOptions options.CloudEventsOptions - cloudEventsClient cloudevents.Client - lister Lister[T] - codecs map[types.CloudEventsDataType]Codec[T] - statusHashGetter StatusHashGetter[T] - rateLimiter flowcontrol.RateLimiter - sourceID string + *baseClient + lister Lister[T] + codecs map[types.CloudEventsDataType]Codec[T] + statusHashGetter StatusHashGetter[T] + sourceID string } // NewCloudEventSourceClient returns an instance for CloudEventSourceClient. The following arguments are required to @@ -44,8 +41,12 @@ func NewCloudEventSourceClient[T ResourceObject]( statusHashGetter StatusHashGetter[T], codecs ...Codec[T], ) (*CloudEventSourceClient[T], error) { - cloudEventsClient, err := sourceOptions.CloudEventsOptions.Client(ctx) - if err != nil { + baseClient := &baseClient{ + cloudEventsOptions: sourceOptions.CloudEventsOptions, + cloudEventsRateLimiter: NewRateLimiter(sourceOptions.EventRateLimit), + } + + if err := baseClient.connect(ctx); err != nil { return nil, err } @@ -55,13 +56,11 @@ func NewCloudEventSourceClient[T ResourceObject]( } return &CloudEventSourceClient[T]{ - cloudEventsOptions: sourceOptions.CloudEventsOptions, - cloudEventsClient: cloudEventsClient, - lister: lister, - codecs: evtCodes, - statusHashGetter: statusHashGetter, - rateLimiter: NewRateLimiter(sourceOptions.EventRateLimit), - sourceID: sourceOptions.SourceID, + baseClient: baseClient, + lister: lister, + codecs: evtCodes, + statusHashGetter: statusHashGetter, + sourceID: sourceOptions.SourceID, }, nil } @@ -99,12 +98,7 @@ func (c *CloudEventSourceClient[T]) Resync(ctx context.Context) error { return fmt.Errorf("failed to set data to cloud event: %v", err) } - sendingContext, err := c.cloudEventsOptions.WithContext(ctx, evt.Context) - if err != nil { - return err - } - - if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, evt); err != nil { + if err := c.publish(ctx, evt); err != nil { return err } } @@ -128,12 +122,7 @@ func (c *CloudEventSourceClient[T]) Publish(ctx context.Context, eventType types return err } - sendingContext, err := c.cloudEventsOptions.WithContext(ctx, evt.Context) - if err != nil { - return err - } - - if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, *evt); err != nil { + if err := c.publish(ctx, *evt); err != nil { return err } @@ -143,73 +132,73 @@ func (c *CloudEventSourceClient[T]) Publish(ctx context.Context, eventType types // Subscribe the events that are from the agent spec resync request or agent resource status request. // For spec resync request, source publish the current resources spec back as response. // For resource status request, source receives resource status and handles the status with resource handlers. -func (c *CloudEventSourceClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T]) error { - if err := c.cloudEventsClient.StartReceiver(ctx, func(evt cloudevents.Event) { - klog.V(4).Infof("Received event:\n%s", evt) - - eventType, err := types.ParseCloudEventsType(evt.Type()) - if err != nil { - klog.Errorf("failed to parse cloud event type, %v", err) - return - } +func (c *CloudEventSourceClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T]) { + c.subscribe(ctx, func(ctx context.Context, evt cloudevents.Event) { + c.receive(ctx, evt, handlers...) + }) +} - if eventType.Action == types.ResyncRequestAction { - if eventType.SubResource != types.SubResourceSpec { - klog.Warningf("unsupported event type %s, ignore", eventType) - return - } +func (c *CloudEventSourceClient[T]) receive(ctx context.Context, evt cloudevents.Event, handlers ...ResourceHandler[T]) { + klog.V(4).Infof("Received event:\n%s", evt) - if err := c.respondResyncSpecRequest(ctx, eventType.CloudEventsDataType, evt); err != nil { - klog.Errorf("failed to resync resources spec, %v", err) - } + eventType, err := types.ParseCloudEventsType(evt.Type()) + if err != nil { + klog.Errorf("failed to parse cloud event type, %v", err) + return + } + if eventType.Action == types.ResyncRequestAction { + if eventType.SubResource != types.SubResourceSpec { + klog.Warningf("unsupported event type %s, ignore", eventType) return } - codec, ok := c.codecs[eventType.CloudEventsDataType] - if !ok { - klog.Warningf("failed to find the codec for event %s, ignore", eventType.CloudEventsDataType) - return + if err := c.respondResyncSpecRequest(ctx, eventType.CloudEventsDataType, evt); err != nil { + klog.Errorf("failed to resync resources spec, %v", err) } - if eventType.SubResource != types.SubResourceStatus { - klog.Warningf("unsupported event type %s, ignore", eventType) - return - } + return + } - clusterName, err := evt.Context.GetExtension(types.ExtensionClusterName) - if err != nil { - klog.Errorf("failed to find cluster name, %v", err) - return - } + codec, ok := c.codecs[eventType.CloudEventsDataType] + if !ok { + klog.Warningf("failed to find the codec for event %s, ignore", eventType.CloudEventsDataType) + return + } - obj, err := codec.Decode(&evt) - if err != nil { - klog.Errorf("failed to decode status, %v", err) - return - } + if eventType.SubResource != types.SubResourceStatus { + klog.Warningf("unsupported event type %s, ignore", eventType) + return + } - action, err := c.statusAction(fmt.Sprintf("%s", clusterName), obj) - if err != nil { - klog.Errorf("failed to generate status event %s, %v", evt, err) - return - } + clusterName, err := evt.Context.GetExtension(types.ExtensionClusterName) + if err != nil { + klog.Errorf("failed to find cluster name, %v", err) + return + } - if len(action) == 0 { - // no action is required, ignore - return - } + obj, err := codec.Decode(&evt) + if err != nil { + klog.Errorf("failed to decode status, %v", err) + return + } - for _, handler := range handlers { - if err := handler(action, obj); err != nil { - klog.Errorf("failed to handle status event %s, %v", evt, err) - } - } - }); err != nil { - return err + action, err := c.statusAction(fmt.Sprintf("%s", clusterName), obj) + if err != nil { + klog.Errorf("failed to generate status event %s, %v", evt, err) + return } - return nil + if len(action) == 0 { + // no action is required, ignore + return + } + + for _, handler := range handlers { + if err := handler(action, obj); err != nil { + klog.Errorf("failed to handle status event %s, %v", evt, err) + } + } } // Upon receiving the spec resync event, the source responds by sending resource status events to the broker as follows: @@ -271,13 +260,7 @@ func (c *CloudEventSourceClient[T]) respondResyncSpecRequest( WithClusterName(fmt.Sprintf("%s", clusterName)). WithDeletionTimestamp(metav1.Now().Time). NewEvent() - - sendingContext, err := c.cloudEventsOptions.WithContext(ctx, evt.Context) - if err != nil { - return err - } - - if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, evt); err != nil { + if err := c.publish(ctx, evt); err != nil { return err } } diff --git a/cloudevents/generic/sourceclient_test.go b/cloudevents/generic/sourceclient_test.go index 0b834130f..1d41b4a76 100644 --- a/cloudevents/generic/sourceclient_test.go +++ b/cloudevents/generic/sourceclient_test.go @@ -273,9 +273,7 @@ func TestSpecResyncResponse(t *testing.T) { t.Errorf("unexpected error %v", err) } - if err := source.Subscribe(context.TODO()); err != nil { - t.Errorf("unexpected error %v", err) - } + source.receive(context.TODO(), c.requestEvent) c.validate(fakeClient.GetSentEvents()) }) @@ -408,13 +406,11 @@ func TestReceiveResourceStatus(t *testing.T) { var actualEvent types.ResourceAction var actualRes *mockResource - if err := source.Subscribe(context.TODO(), func(event types.ResourceAction, resource *mockResource) error { + source.receive(context.TODO(), c.requestEvent, func(event types.ResourceAction, resource *mockResource) error { actualEvent = event actualRes = resource return nil - }); err != nil { - t.Errorf("unexpected error %v", err) - } + }) c.validate(actualEvent, actualRes) }) diff --git a/cloudevents/work/clientbuilder.go b/cloudevents/work/clientbuilder.go index ca8a6a5aa..810bf2758 100644 --- a/cloudevents/work/clientbuilder.go +++ b/cloudevents/work/clientbuilder.go @@ -6,7 +6,6 @@ import ( "time" "k8s.io/client-go/rest" - "k8s.io/klog/v2" workclientset "open-cluster-management.io/api/client/work/clientset/versioned" workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1" @@ -144,12 +143,7 @@ func (b *ClientHolderBuilder) newAgentClients(ctx context.Context, config *mqtt. // only written from the server. we may need to revisit the implementation in the future. manifestWorkClient.SetLister(namespacedLister) - go func() { - err := cloudEventsClient.Subscribe(ctx, agenthandler.NewManifestWorkAgentHandler(namespacedLister, watcher)) - if err != nil { - klog.Errorf("failed to subscribe to %s, %v", config.BrokerHost, err) - } - }() + cloudEventsClient.Subscribe(ctx, agenthandler.NewManifestWorkAgentHandler(namespacedLister, watcher)) return &ClientHolder{ workClient: workClient, diff --git a/test/integration/cloudevents/source/client.go b/test/integration/cloudevents/source/client.go index c2099a4d7..8407a93b6 100644 --- a/test/integration/cloudevents/source/client.go +++ b/test/integration/cloudevents/source/client.go @@ -9,7 +9,6 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" cloudeventstypes "github.com/cloudevents/sdk-go/v2/types" - "k8s.io/klog/v2" "open-cluster-management.io/api/cloudevents/generic" "open-cluster-management.io/api/cloudevents/generic/options/mqtt" @@ -126,13 +125,9 @@ func StartResourceSourceClient(ctx context.Context, config *mqtt.MQTTOptions) (g return nil, err } - go func() { - if err := client.Subscribe(ctx, func(action types.ResourceAction, resource *Resource) error { - return GetStore().UpdateStatus(resource) - }); err != nil { - klog.Fatalf("failed to subscribe to mqtt broker, %v", err) - } - }() + client.Subscribe(ctx, func(action types.ResourceAction, resource *Resource) error { + return GetStore().UpdateStatus(resource) + }) return client, nil }