Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ retry connecting the cloudevents client #280

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 68 additions & 100 deletions cloudevents/generic/agentclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ import (
"context"
"fmt"
"strconv"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"

"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog/v2"

"open-cluster-management.io/api/cloudevents/generic/options"
Expand All @@ -21,14 +19,12 @@ import (
// An agent is a component that handles the deployment of requested resources on the managed cluster and status report
// to the source.
type CloudEventAgentClient[T ResourceObject] struct {
cloudEventsOptions options.CloudEventsOptions
cloudEventsClient cloudevents.Client
lister Lister[T]
codecs map[types.CloudEventsDataType]Codec[T]
statusHashGetter StatusHashGetter[T]
rateLimiter flowcontrol.RateLimiter
agentID string
clusterName string
*baseClient
lister Lister[T]
codecs map[types.CloudEventsDataType]Codec[T]
statusHashGetter StatusHashGetter[T]
agentID string
clusterName string
}

// NewCloudEventAgentClient returns an instance for CloudEventAgentClient. The following arguments are required to
Expand All @@ -45,8 +41,12 @@ func NewCloudEventAgentClient[T ResourceObject](
statusHashGetter StatusHashGetter[T],
codecs ...Codec[T],
) (*CloudEventAgentClient[T], error) {
cloudEventsClient, err := agentOptions.CloudEventsOptions.Client(ctx)
if err != nil {
baseClient := &baseClient{
cloudEventsOptions: agentOptions.CloudEventsOptions,
cloudEventsRateLimiter: NewRateLimiter(agentOptions.EventRateLimit),
}

if err := baseClient.connect(ctx); err != nil {
return nil, err
}

Expand All @@ -56,14 +56,12 @@ func NewCloudEventAgentClient[T ResourceObject](
}

return &CloudEventAgentClient[T]{
cloudEventsOptions: agentOptions.CloudEventsOptions,
cloudEventsClient: cloudEventsClient,
lister: lister,
codecs: evtCodes,
statusHashGetter: statusHashGetter,
rateLimiter: NewRateLimiter(agentOptions.EventRateLimit),
agentID: agentOptions.AgentID,
clusterName: agentOptions.ClusterName,
baseClient: baseClient,
lister: lister,
codecs: evtCodes,
statusHashGetter: statusHashGetter,
agentID: agentOptions.AgentID,
clusterName: agentOptions.ClusterName,
}, nil
}

Expand Down Expand Up @@ -101,12 +99,7 @@ func (c *CloudEventAgentClient[T]) Resync(ctx context.Context) error {
return fmt.Errorf("failed to set data to cloud event: %v", err)
}

sendingContext, err := c.cloudEventsOptions.WithContext(ctx, evt.Context)
if err != nil {
return err
}

if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, evt); err != nil {
if err := c.publish(ctx, evt); err != nil {
return err
}
}
Expand All @@ -130,12 +123,7 @@ func (c *CloudEventAgentClient[T]) Publish(ctx context.Context, eventType types.
return err
}

sendingContext, err := c.cloudEventsOptions.WithContext(ctx, evt.Context)
if err != nil {
return err
}

if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, *evt); err != nil {
if err := c.publish(ctx, *evt); err != nil {
return err
}

Expand All @@ -145,63 +133,67 @@ func (c *CloudEventAgentClient[T]) Publish(ctx context.Context, eventType types.
// Subscribe the events that are from the source status resync request or source resource spec request.
// For status resync request, agent publish the current resources status back as response.
// For resource spec request, agent receives resource spec and handles the spec with resource handlers.
func (c *CloudEventAgentClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T]) error {
return c.cloudEventsClient.StartReceiver(ctx, func(evt cloudevents.Event) {
klog.V(4).Infof("Received event:\n%s", evt)

eventType, err := types.ParseCloudEventsType(evt.Type())
if err != nil {
klog.Errorf("failed to parse cloud event type %s, %v", evt.Type(), err)
return
}
func (c *CloudEventAgentClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T]) {
c.subscribe(ctx, func(ctx context.Context, evt cloudevents.Event) {
c.receive(ctx, evt, handlers...)
})
}

if eventType.Action == types.ResyncRequestAction {
if eventType.SubResource != types.SubResourceStatus {
klog.Warningf("unsupported resync event type %s, ignore", eventType)
return
}
func (c *CloudEventAgentClient[T]) receive(ctx context.Context, evt cloudevents.Event, handlers ...ResourceHandler[T]) {
klog.V(4).Infof("Received event:\n%s", evt)

if err := c.respondResyncStatusRequest(ctx, eventType.CloudEventsDataType, evt); err != nil {
klog.Errorf("failed to resync manifestsstatus, %v", err)
}
eventType, err := types.ParseCloudEventsType(evt.Type())
if err != nil {
klog.Errorf("failed to parse cloud event type %s, %v", evt.Type(), err)
return
}

if eventType.Action == types.ResyncRequestAction {
if eventType.SubResource != types.SubResourceStatus {
klog.Warningf("unsupported resync event type %s, ignore", eventType)
return
}

if eventType.SubResource != types.SubResourceSpec {
klog.Warningf("unsupported event type %s, ignore", eventType)
return
if err := c.respondResyncStatusRequest(ctx, eventType.CloudEventsDataType, evt); err != nil {
klog.Errorf("failed to resync manifestsstatus, %v", err)
}

codec, ok := c.codecs[eventType.CloudEventsDataType]
if !ok {
klog.Warningf("failed to find the codec for event %s, ignore", eventType.CloudEventsDataType)
return
}
return
}

obj, err := codec.Decode(&evt)
if err != nil {
klog.Errorf("failed to decode spec, %v", err)
return
}
if eventType.SubResource != types.SubResourceSpec {
klog.Warningf("unsupported event type %s, ignore", eventType)
return
}

action, err := c.specAction(evt.Source(), obj)
if err != nil {
klog.Errorf("failed to generate spec action %s, %v", evt, err)
return
}
codec, ok := c.codecs[eventType.CloudEventsDataType]
if !ok {
klog.Warningf("failed to find the codec for event %s, ignore", eventType.CloudEventsDataType)
return
}

if len(action) == 0 {
// no action is required, ignore
return
}
obj, err := codec.Decode(&evt)
if err != nil {
klog.Errorf("failed to decode spec, %v", err)
return
}

for _, handler := range handlers {
if err := handler(action, obj); err != nil {
klog.Errorf("failed to handle spec event %s, %v", evt, err)
}
action, err := c.specAction(evt.Source(), obj)
if err != nil {
klog.Errorf("failed to generate spec action %s, %v", evt, err)
return
}

if len(action) == 0 {
// no action is required, ignore
return
}

for _, handler := range handlers {
if err := handler(action, obj); err != nil {
klog.Errorf("failed to handle spec event %s, %v", evt, err)
}
})
}
}

// Upon receiving the status resync event, the agent responds by sending resource status events to the broker as
Expand Down Expand Up @@ -287,30 +279,6 @@ func (c *CloudEventAgentClient[T]) specAction(source string, obj T) (evt types.R
return types.Modified, nil
}

func sendEventWithLimit(sendingCtx context.Context, limiter flowcontrol.RateLimiter,
sender cloudevents.Client, evt cloudevents.Event) error {
now := time.Now()

err := limiter.Wait(sendingCtx)
if err != nil {
return fmt.Errorf("client rate limiter Wait returned an error: %w", err)
}

latency := time.Since(now)
if latency > longThrottleLatency {
klog.Warningf(fmt.Sprintf("Waited for %v due to client-side throttling, not priority and fairness, request: %s",
latency, evt))
}

klog.V(4).Infof("Sent event: %v\n%s", sendingCtx, evt)

if result := sender.Send(sendingCtx, evt); cloudevents.IsUndelivered(result) {
return fmt.Errorf("failed to send event %s, %v", evt, result)
}

return nil
}

func getObj[T ResourceObject](resourceID string, objs []T) (obj T, exists bool) {
for _, obj := range objs {
if string(obj.GetUID()) == resourceID {
Expand Down
10 changes: 3 additions & 7 deletions cloudevents/generic/agentclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,7 @@ func TestStatusResyncResponse(t *testing.T) {
t.Errorf("unexpected error %v", err)
}

if err := agent.Subscribe(context.TODO()); err != nil {
t.Errorf("unexpected error %v", err)
}
agent.receive(context.TODO(), c.requestEvent)

c.validate(client.GetSentEvents())
})
Expand Down Expand Up @@ -440,13 +438,11 @@ func TestReceiveResourceSpec(t *testing.T) {

var actualEvent types.ResourceAction
var actualRes *mockResource
if err := agent.Subscribe(context.TODO(), func(event types.ResourceAction, resource *mockResource) error {
agent.receive(context.TODO(), c.requestEvent, func(event types.ResourceAction, resource *mockResource) error {
actualEvent = event
actualRes = resource
return nil
}); err != nil {
t.Errorf("unexpected error %v", err)
}
})

c.validate(actualEvent, actualRes)
})
Expand Down
Loading
Loading