Skip to content

Commit

Permalink
separate the clientid and sourceid for mqtt source client
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
skeeey committed Dec 20, 2023
1 parent a2f58d6 commit 0738fe1
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
6 changes: 4 additions & 2 deletions cloudevents/generic/options/mqtt/sourceoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ type mqttSourceOptions struct {
MQTTOptions
errorChan chan error
sourceID string
clientID string
}

func NewSourceOptions(mqttOptions *MQTTOptions, sourceID string) *options.CloudEventsSourceOptions {
func NewSourceOptions(mqttOptions *MQTTOptions, clientID, sourceID string) *options.CloudEventsSourceOptions {
return &options.CloudEventsSourceOptions{
CloudEventsOptions: &mqttSourceOptions{
MQTTOptions: *mqttOptions,
errorChan: make(chan error),
sourceID: sourceID,
clientID: clientID,
},
SourceID: sourceID,
}
Expand Down Expand Up @@ -56,7 +58,7 @@ func (o *mqttSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents.
func (o *mqttSourceOptions) Client(ctx context.Context) (cloudevents.Client, error) {
receiver, err := o.GetCloudEventsClient(
ctx,
fmt.Sprintf("%s-client", o.sourceID),
o.clientID,
func(err error) {
o.errorChan <- err
},
Expand Down
3 changes: 2 additions & 1 deletion test/integration/cloudevents/source/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,10 @@ func statusHashGetter(obj *Resource) (string, error) {
}

func StartMQTTResourceSourceClient(ctx context.Context, config *mqtt.MQTTOptions, eventHub *EventHub) (generic.CloudEventsClient[*Resource], error) {
sourceID := "integration-test"
client, err := generic.NewCloudEventSourceClient[*Resource](
ctx,
mqtt.NewSourceOptions(config, "integration-test"),
mqtt.NewSourceOptions(config, fmt.Sprintf("%s-client", sourceID), sourceID),
&resourceLister{},
statusHashGetter,
&resourceCodec{},
Expand Down

0 comments on commit 0738fe1

Please sign in to comment.