From c9c96444de9ef75bcedd98615979d9e7fcae230d Mon Sep 17 00:00:00 2001 From: Jozef Kralik Date: Thu, 10 Aug 2023 08:39:36 +0000 Subject: [PATCH] fix after CR --- charts/plgd-hub/README.md | 2 +- .../templates/coap-gateway/config.yaml | 3 + charts/plgd-hub/values.yaml | 3 + .../service/deviceSubscriptionHandlers.go | 2 +- coap-gateway/coapconv/coapconv.go | 4 - coap-gateway/config.yaml | 2 +- coap-gateway/service/clientObserver.go | 2 +- coap-gateway/service/config.go | 4 +- .../service/observation/deviceObserver.go | 4 +- .../service/observation/observedResource.go | 25 ++--- .../observedResource_internal_test.go | 102 +++++++----------- .../service/observation/resourcesObserver.go | 39 ++++--- coap-gateway/service/signIn.go | 2 +- coap-gateway/test/test.go | 2 +- grpc-gateway/pb/service.swagger.json | 2 +- grpc-gateway/pb/updateDeviceMetadata.pb.go | 2 +- grpc-gateway/pb/updateDeviceMetadata.proto | 2 +- resource-aggregate/commands/commands.pb.go | 2 +- .../cqrs/eventstore/mongodb/eventstore.go | 4 +- .../cqrs/eventstore/mongodb/save.go | 2 +- resource-aggregate/pb/commands.proto | 2 +- 21 files changed, 95 insertions(+), 117 deletions(-) diff --git a/charts/plgd-hub/README.md b/charts/plgd-hub/README.md index bd2d6f33a7..ac9c3a0255 100644 --- a/charts/plgd-hub/README.md +++ b/charts/plgd-hub/README.md @@ -195,7 +195,7 @@ global: | certmanager.internal.issuer.name | string | `nil` | Name | | certmanager.internal.issuer.spec | string | `nil` | cert-manager issuer spec | | cluster.dns | string | `"cluster.local"` | Cluster internal DNS prefix | -| coapgateway | object | `{"affinity":{},"apis":{"coap":{"authorization":{"deviceIdClaim":null,"ownerClaim":null,"providers":null},"blockwiseTransfer":{"blockSize":"1024","enabled":true},"externalAddress":"","keepAlive":{"timeout":"20s"},"maxMessageSize":262144,"messagePoolSize":1000,"messageQueueSize":16,"ownerCacheExpiration":"1m","protocols":["tcp"],"requireBatchObserveEnabled":true,"subscriptionBufferSize":1000,"tls":{"caPool":null,"certFile":null,"clientCertificateRequired":true,"disconnectOnExpiredCertificate":false,"enabled":true,"identityPropertiesRequired":true,"keyFile":null}}},"clients":{"certificateAuthority":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}},"eventBus":{"nats":{"pendingLimits":{"bytesLimit":"67108864","msgLimit":"524288"},"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false},"url":""}},"identityStore":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}},"ownerClaim":null},"resourceAggregate":{"deviceStatusExpiration":{"enabled":false,"expiresIn":"0s"},"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}},"resourceDirectory":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}}},"config":{"fileName":"service.yaml","mountPath":"/config","volume":"config"},"deploymentAnnotations":{},"deploymentLabels":{},"enabled":true,"extraContainers":{},"extraVolumeMounts":{},"extraVolumes":{},"fullnameOverride":null,"hubId":null,"image":{"imagePullSecrets":{},"pullPolicy":"Always","registry":"ghcr.io/","repository":"plgd-dev/hub/coap-gateway","tag":null},"imagePullSecrets":{},"initContainersTpl":{},"livenessProbe":{},"log":{"dumpBody":false,"encoderConfig":{"timeEncoder":"rfc3339nano"},"encoding":"json","level":"info","stacktrace":{"enabled":false,"level":"warn"}},"name":"coap-gateway","nodeSelector":{},"podAnnotations":{},"podLabels":{},"podSecurityContext":{},"port":5684,"rbac":{"enabled":false,"roleBindingDefinitionTpl":null,"serviceAccountName":"coap-gateway"},"readinessProbe":{},"replicas":1,"resources":{},"restartPolicy":"Always","securityContext":{},"service":{"annotations":{},"labels":{},"nodePort":null,"tcp":{"annotations":{},"labels":{},"name":"coaps-tcp","nodePort":null,"protocol":"TCP","targetPort":"coaps-tcp","type":null},"type":"LoadBalancer","udp":{"annotations":{},"labels":{},"name":"coaps-udp","nodePort":null,"protocol":"UDP","targetPort":"coaps-udp","type":null}},"taskQueue":{"goPoolSize":1600,"maxIdleTime":"10m","size":"2097152"},"tolerations":{}}` | CoAP gateway parameters | +| coapgateway | object | `{"affinity":{},"apis":{"coap":{"authorization":{"deviceIdClaim":null,"ownerClaim":null,"providers":null},"blockwiseTransfer":{"blockSize":"1024","enabled":true},"externalAddress":"","keepAlive":{"timeout":"20s"},"maxMessageSize":262144,"messagePoolSize":1000,"messageQueueSize":16,"ownerCacheExpiration":"1m","protocols":["tcp"],"requireBatchObserveEnabled":true,"subscriptionBufferSize":1000,"tls":{"caPool":null,"certFile":null,"clientCertificateRequired":true,"disconnectOnExpiredCertificate":false,"enabled":true,"identityPropertiesRequired":true,"keyFile":null}}},"clients":{"certificateAuthority":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}},"eventBus":{"nats":{"pendingLimits":{"bytesLimit":"67108864","msgLimit":"524288"},"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false},"url":""}},"identityStore":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}},"ownerClaim":null},"resourceAggregate":{"deviceStatusExpiration":{"enabled":false,"expiresIn":"0s"},"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}},"resourceDirectory":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}}},"config":{"fileName":"service.yaml","mountPath":"/config","volume":"config"},"deploymentAnnotations":{},"deploymentLabels":{},"deviceTwin":{"forceResynchronization":false,"numberOfETAGsForBatchObservation":8},"enabled":true,"extraContainers":{},"extraVolumeMounts":{},"extraVolumes":{},"fullnameOverride":null,"hubId":null,"image":{"imagePullSecrets":{},"pullPolicy":"Always","registry":"ghcr.io/","repository":"plgd-dev/hub/coap-gateway","tag":null},"imagePullSecrets":{},"initContainersTpl":{},"livenessProbe":{},"log":{"dumpBody":false,"encoderConfig":{"timeEncoder":"rfc3339nano"},"encoding":"json","level":"info","stacktrace":{"enabled":false,"level":"warn"}},"name":"coap-gateway","nodeSelector":{},"podAnnotations":{},"podLabels":{},"podSecurityContext":{},"port":5684,"rbac":{"enabled":false,"roleBindingDefinitionTpl":null,"serviceAccountName":"coap-gateway"},"readinessProbe":{},"replicas":1,"resources":{},"restartPolicy":"Always","securityContext":{},"service":{"annotations":{},"labels":{},"nodePort":null,"tcp":{"annotations":{},"labels":{},"name":"coaps-tcp","nodePort":null,"protocol":"TCP","targetPort":"coaps-tcp","type":null},"type":"LoadBalancer","udp":{"annotations":{},"labels":{},"name":"coaps-udp","nodePort":null,"protocol":"UDP","targetPort":"coaps-udp","type":null}},"taskQueue":{"goPoolSize":1600,"maxIdleTime":"10m","size":"2097152"},"tolerations":{}}` | CoAP gateway parameters | | coapgateway.affinity | object | `{}` | Affinity definition | | coapgateway.apis | object | `{"coap":{"authorization":{"deviceIdClaim":null,"ownerClaim":null,"providers":null},"blockwiseTransfer":{"blockSize":"1024","enabled":true},"externalAddress":"","keepAlive":{"timeout":"20s"},"maxMessageSize":262144,"messagePoolSize":1000,"messageQueueSize":16,"ownerCacheExpiration":"1m","protocols":["tcp"],"requireBatchObserveEnabled":true,"subscriptionBufferSize":1000,"tls":{"caPool":null,"certFile":null,"clientCertificateRequired":true,"disconnectOnExpiredCertificate":false,"enabled":true,"identityPropertiesRequired":true,"keyFile":null}}}` | For complete coap-gateway service configuration see [plgd/coap-gateway](https://github.com/plgd-dev/hub/tree/main/coap-gateway) | | coapgateway.apis.coap.tls.disconnectOnExpiredCertificate | bool | `false` | After the certificate expires, the connection will be disconnected | diff --git a/charts/plgd-hub/templates/coap-gateway/config.yaml b/charts/plgd-hub/templates/coap-gateway/config.yaml index d9383b96a8..cf71c80972 100644 --- a/charts/plgd-hub/templates/coap-gateway/config.yaml +++ b/charts/plgd-hub/templates/coap-gateway/config.yaml @@ -172,5 +172,8 @@ data: goPoolSize: {{ .taskQueue.goPoolSize }} size: {{ .taskQueue.size }} maxIdleTime: {{ .taskQueue.maxIdleTime | quote }} + deviceTwin: + numberOfETAGsForBatchObservation: {{ deviceTwin.numberOfETAGsForBatchObservation }} + forceResynchronization: {{ deviceTwin.forceResynchronization }} {{- end }} {{- end }} diff --git a/charts/plgd-hub/values.yaml b/charts/plgd-hub/values.yaml index 20f6e093db..5fe2afb840 100644 --- a/charts/plgd-hub/values.yaml +++ b/charts/plgd-hub/values.yaml @@ -998,6 +998,9 @@ coapgateway: goPoolSize: 1600 size: "2097152" maxIdleTime: "10m" + deviceTwin: + numberOfETAGsForBatchObservation: 8 + forceResynchronization: false identitystore: # -- Enable identity service diff --git a/cloud2cloud-connector/service/deviceSubscriptionHandlers.go b/cloud2cloud-connector/service/deviceSubscriptionHandlers.go index b113e1c216..2f9b0276dd 100644 --- a/cloud2cloud-connector/service/deviceSubscriptionHandlers.go +++ b/cloud2cloud-connector/service/deviceSubscriptionHandlers.go @@ -58,7 +58,7 @@ func (h deviceSubscriptionHandlers) OnDeviceSubscriberReconnectError(err error) type DevicesSubscription struct { ctx context.Context - data *kitSync.Map // //[deviceID]*deviceSubscription + data *kitSync.Map // [deviceID]*deviceSubscription rdClient pb.GrpcGatewayClient raClient raService.ResourceAggregateClient subscriber *subscriber.Subscriber diff --git a/coap-gateway/coapconv/coapconv.go b/coap-gateway/coapconv/coapconv.go index 380ede75fd..08fa672b2c 100644 --- a/coap-gateway/coapconv/coapconv.go +++ b/coap-gateway/coapconv/coapconv.go @@ -350,10 +350,6 @@ func NewNotifyResourceChangedRequestsFromBatchResourceDiscovery(deviceID, connec // so latestETagResource is the last resource in the batch if latestETagResource != nil { requests = append(requests, latestETagResource) - } else if len(requests) > 0 && len(etag) > 0 { - // if there is no latestETagResource we need to set etag to the last resource in the batch - // to make sure that the one resource contains the etag - requests[len(requests)-1].Etag = etag } return requests, nil } diff --git a/coap-gateway/config.yaml b/coap-gateway/config.yaml index b0b44a69eb..f96fadcb79 100644 --- a/coap-gateway/config.yaml +++ b/coap-gateway/config.yaml @@ -161,7 +161,7 @@ clients: certFile: "/secrets/public/cert.crt" useSystemCAPool: false deviceTwin: - latestETAGsForNumbersOfResource: 8 + numberOfETAGsForBatchObservation: 8 forceResynchronization: false taskQueue: goPoolSize: 1600 diff --git a/coap-gateway/service/clientObserver.go b/coap-gateway/service/clientObserver.go index ccc7c63061..582de5a43e 100644 --- a/coap-gateway/service/clientObserver.go +++ b/coap-gateway/service/clientObserver.go @@ -68,7 +68,7 @@ func (c *session) replaceDeviceObserverWithDeviceTwin(ctx context.Context, twinE observation.WithTwinEnabled(twinEnabled), observation.WithObservationType(observationType), observation.WithLogger(c.getLogger()), observation.WithRequireBatchObserveEnabled(c.server.config.APIs.COAP.RequireBatchObserveEnabled), - observation.WithLatestETAGsForNumbersOfResource(c.server.config.DeviceTwin.LatestETAGsForNumbersOfResource), + observation.WithNumberOfETAGsForBatchObservation(c.server.config.DeviceTwin.NumberOfETAGsForBatchObservation), observation.WithTwinForceResynchronization(forceResynchronization || c.server.config.DeviceTwin.ForceResynchronization), ) if err != nil { diff --git a/coap-gateway/service/config.go b/coap-gateway/service/config.go index 87df806066..d17d8be230 100644 --- a/coap-gateway/service/config.go +++ b/coap-gateway/service/config.go @@ -109,8 +109,8 @@ func (c *InjectedCOAPConfig) Validate() error { } type DeviceTwinConfig struct { - LatestETAGsForNumbersOfResource uint32 `yaml:"latestETAGsForNumbersOfResource" json:"latestETAGsForNumbersOfResource"` - ForceResynchronization bool `yaml:"forceResynchronization" json:"forceResynchronization"` + NumberOfETAGsForBatchObservation uint32 `yaml:"numberOfETAGsForBatchObservation" json:"numberOfETAGsForBatchObservation"` + ForceResynchronization bool `yaml:"forceResynchronization" json:"forceResynchronization"` } type COAPConfig struct { diff --git a/coap-gateway/service/observation/deviceObserver.go b/coap-gateway/service/observation/deviceObserver.go index 4610413319..cfcbc79203 100644 --- a/coap-gateway/service/observation/deviceObserver.go +++ b/coap-gateway/service/observation/deviceObserver.go @@ -158,7 +158,7 @@ func (o LimitBatchObserveLatestETagsOpt) Apply(opts *DeviceObserverConfig) { opts.LimitBatchObserveLatestETags = o.limitBatchObserveLatestETags } -func WithLatestETAGsForNumbersOfResource(v uint32) LimitBatchObserveLatestETagsOpt { +func WithNumberOfETAGsForBatchObservation(v uint32) LimitBatchObserveLatestETagsOpt { return LimitBatchObserveLatestETagsOpt{ limitBatchObserveLatestETags: v, } @@ -373,7 +373,7 @@ func createDiscoveryResourceObserver(ctx context.Context, deviceID string, coapC // Create observer with a single observations for all published resources. func createPublishedResourcesObserver(ctx context.Context, deviceID string, coapConn ClientConn, callbacks ResourcesObserverCallbacks, published []*commands.Resource, logger log.Logger) (*resourcesObserver, error) { resourcesObserver := newResourcesObserver(deviceID, coapConn, callbacks, logger) - + // TODO get ETAG for each resource err := resourcesObserver.addResources(ctx, published) if err != nil { resourcesObserver.CleanObservedResources(ctx) diff --git a/coap-gateway/service/observation/observedResource.go b/coap-gateway/service/observation/observedResource.go index 7b64f13121..51d9bfe3f4 100644 --- a/coap-gateway/service/observation/observedResource.go +++ b/coap-gateway/service/observation/observedResource.go @@ -21,7 +21,6 @@ type Observation = interface { type observedResource struct { href string resInterface string - etags [][]byte synced atomic.Bool isObservable bool private struct { // guarded by mutex @@ -30,12 +29,11 @@ type observedResource struct { } } -func newObservedResource(href, resInterface string, etags [][]byte, isObservable bool) *observedResource { +func newObservedResource(href, resInterface string, isObservable bool) *observedResource { return &observedResource{ href: href, resInterface: resInterface, isObservable: isObservable, - etags: etags, } } @@ -51,13 +49,6 @@ func (r *observedResource) Interface() string { return r.resInterface } -func (r *observedResource) ETag() []byte { - if len(r.etags) == 0 { - return nil - } - return r.etags[0] -} - const ( // maxURIQueryLen is the maximum length of a URI query. See https://datatracker.ietf.org/doc/html/rfc7252#section-5.10 maxURIQueryLen = 255 @@ -67,11 +58,10 @@ const ( prefixQueryIncChanges = "incChanges=" ) -func (r *observedResource) EncodeETagsForIncrementChanges() []string { - if len(r.etags) <= 1 { +func encodeETagsForIncrementChanges(etags [][]byte) []string { + if len(etags) < 1 { return nil } - etags := r.etags[1:] etagsStr := make([]string, 0, (len(etags)/15)+1) var b strings.Builder for _, etag := range etags { @@ -113,13 +103,14 @@ func (r *observedResource) isBatchObservation() bool { return r.resInterface == interfaces.OC_IF_B } -func (r *observedResource) toCoapOptions() []message.Option { +func (r *observedResource) toCoapOptions(etags [][]byte) []message.Option { opts := make([]message.Option, 0, 2) - if r.ETag() != nil { + if len(etags) > 0 { opts = append(opts, message.Option{ ID: message.ETag, - Value: r.ETag(), + Value: etags[0], }) + etags = etags[1:] } if r.Interface() != "" { opts = append(opts, message.Option{ @@ -129,7 +120,7 @@ func (r *observedResource) toCoapOptions() []message.Option { } if r.isBatchObservation() { - for _, q := range r.EncodeETagsForIncrementChanges() { + for _, q := range encodeETagsForIncrementChanges(etags) { opts = append(opts, message.Option{ ID: message.URIQuery, Value: []byte(q), diff --git a/coap-gateway/service/observation/observedResource_internal_test.go b/coap-gateway/service/observation/observedResource_internal_test.go index fd2e40d90d..9632bb21d8 100644 --- a/coap-gateway/service/observation/observedResource_internal_test.go +++ b/coap-gateway/service/observation/observedResource_internal_test.go @@ -7,35 +7,23 @@ import ( ) func TestObservedResourceEncodeETagsForIncrementChanged(t *testing.T) { - type fields struct { - etags [][]byte - } tests := []struct { - name string - fields fields - want []string + name string + etags [][]byte + want []string }{ { - name: "empty", - fields: fields{ - etags: nil, - }, + name: "empty", + etags: nil, }, { - name: "only-latest-etag", - fields: fields{ - etags: [][]byte{ - []byte("0"), - }, - }, + name: "not-nil", + etags: [][]byte{}, }, { name: "one-etag", - fields: fields{ - etags: [][]byte{ - []byte("0"), - []byte("01234567"), - }, + etags: [][]byte{ + []byte("01234567"), }, want: []string{ prefixQueryIncChanges + "MDEyMzQ1Njc", @@ -43,12 +31,9 @@ func TestObservedResourceEncodeETagsForIncrementChanged(t *testing.T) { }, { name: "two-etags", - fields: fields{ - etags: [][]byte{ - []byte("0"), - []byte("1"), - []byte("2"), - }, + etags: [][]byte{ + []byte("1"), + []byte("2"), }, want: []string{ prefixQueryIncChanges + "MQ,Mg", @@ -56,13 +41,10 @@ func TestObservedResourceEncodeETagsForIncrementChanged(t *testing.T) { }, { name: "two-etags-invalid-etag", - fields: fields{ - etags: [][]byte{ - []byte("0"), - []byte("1"), - []byte("2"), - []byte("invalid-etag-is-ignored"), - }, + etags: [][]byte{ + []byte("1"), + []byte("2"), + []byte("invalid-etag-is-ignored"), }, want: []string{ prefixQueryIncChanges + "MQ,Mg", @@ -70,31 +52,28 @@ func TestObservedResourceEncodeETagsForIncrementChanged(t *testing.T) { }, { name: "multiple-etags", - fields: fields{ - etags: [][]byte{ - []byte("01234567"), - []byte("01234567"), - []byte("01234567"), - []byte("01234567"), - []byte("01234567"), - []byte("01234567"), - []byte("01234567"), - []byte("01234567"), - []byte("01234567"), - []byte("01234567"), - []byte("01234567"), - []byte("01234567"), - []byte("01234567"), - []byte("01234567"), - []byte("01234567"), - []byte("01234567"), - []byte("01234567"), - []byte("01234567"), - []byte("01234567"), - []byte("01234567"), - []byte("01234567"), - []byte("01234567"), // 22 - }, + etags: [][]byte{ + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), // 21 }, want: []string{ prefixQueryIncChanges + "MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc", @@ -104,10 +83,7 @@ func TestObservedResourceEncodeETagsForIncrementChanged(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - r := &observedResource{ - etags: tt.fields.etags, - } - got := r.EncodeETagsForIncrementChanges() + got := encodeETagsForIncrementChanges(tt.etags) for _, g := range got { assert.Less(t, len(g), 255) // RFC 7641 - Uri-query length } diff --git a/coap-gateway/service/observation/resourcesObserver.go b/coap-gateway/service/observation/resourcesObserver.go index 542956ae34..ffe438afe2 100644 --- a/coap-gateway/service/observation/resourcesObserver.go +++ b/coap-gateway/service/observation/resourcesObserver.go @@ -92,10 +92,10 @@ func newResourcesObserver(deviceID string, coapConn ClientConn, callbacks Resour func (o *resourcesObserver) addResource(ctx context.Context, res *commands.Resource, obsInterface string, etags [][]byte) error { o.private.lock.Lock() defer o.private.lock.Unlock() - obs, err := o.addResourceLocked(res, obsInterface, etags) + obs, err := o.addResourceLocked(res, obsInterface) if err == nil && obs != nil { o.notifyAboutStartTwinSynchronization(ctx) - err = o.performObservationLocked([]*observedResource{obs}) + err = o.performObservationLocked([]*observedResourceWithETags{{observedResource: obs, etags: etags}}) if err != nil { o.private.resources, _ = o.private.resources.removeByHref(obs.Href()) defer o.notifyAboutFinishTwinSynchronization(ctx) @@ -139,7 +139,7 @@ func (o *resourcesObserver) setSynchronizedAtResource(href string) bool { return false } -func (o *resourcesObserver) addResourceLocked(res *commands.Resource, obsInterface string, etags [][]byte) (*observedResource, error) { +func (o *resourcesObserver) addResourceLocked(res *commands.Resource, obsInterface string) (*observedResource, error) { resID := res.GetResourceID() addObservationError := func(err error) error { return fmt.Errorf("cannot add resource observation: %w", err) @@ -154,7 +154,7 @@ func (o *resourcesObserver) addResourceLocked(res *commands.Resource, obsInterfa if o.private.resources.contains(href) { return nil, nil } - obsRes := newObservedResource(href, obsInterface, etags, res.IsObservable()) + obsRes := newObservedResource(href, obsInterface, res.IsObservable()) o.private.resources = o.private.resources.insert(obsRes) return obsRes, nil } @@ -163,24 +163,28 @@ func (o *resourcesObserver) addResourceLocked(res *commands.Resource, obsInterfa // // For observable resources subscribe to observations, for unobservable resources retrieve // their content. -func (o *resourcesObserver) handleResource(ctx context.Context, obsRes *observedResource) error { +func (o *resourcesObserver) handleResource(ctx context.Context, obsRes *observedResource, etags [][]byte) error { if obsRes.Href() == commands.StatusHref { return nil } if obsRes.isObservable { - obs, err := o.observeResource(ctx, obsRes) + obs, err := o.observeResource(ctx, obsRes, etags) if err != nil { return err } obsRes.SetObservation(obs) return nil } - return o.getResourceContent(ctx, obsRes.Href(), obsRes.ETag()) + var etag []byte + if len(etags) > 0 { + etag = etags[0] + } + return o.getResourceContent(ctx, obsRes.Href(), etag) } // Register to COAP-GW resource observation for given resource -func (o *resourcesObserver) observeResource(ctx context.Context, obsRes *observedResource) (Observation, error) { +func (o *resourcesObserver) observeResource(ctx context.Context, obsRes *observedResource, etags [][]byte) (Observation, error) { cannotObserveResourceError := func(deviceID, href string, err error) error { return fmt.Errorf("cannot observe resource /%v%v: %w", deviceID, href, err) } @@ -188,7 +192,7 @@ func (o *resourcesObserver) observeResource(ctx context.Context, obsRes *observe return nil, cannotObserveResourceError(o.deviceID, obsRes.Href(), emptyDeviceIDError()) } - opts := obsRes.toCoapOptions() + opts := obsRes.toCoapOptions(etags) batchObservation := obsRes.isBatchObservation() returnIfNonObservable := batchObservation && obsRes.Href() == resources.ResourceURI @@ -259,10 +263,15 @@ func (o *resourcesObserver) notifyAboutFinishTwinSynchronization(ctx context.Con } } -func (o *resourcesObserver) performObservationLocked(obs []*observedResource) error { +type observedResourceWithETags struct { + *observedResource + etags [][]byte +} + +func (o *resourcesObserver) performObservationLocked(obs []*observedResourceWithETags) error { var errors *multierror.Error for _, obsRes := range obs { - if err := o.handleResource(context.Background(), obsRes); err != nil { + if err := o.handleResource(context.Background(), obsRes.observedResource, obsRes.etags); err != nil { o.private.resources, _ = o.private.resources.removeByHref(obsRes.Href()) errors = multierror.Append(errors, err) } @@ -295,15 +304,15 @@ func (o *resourcesObserver) addResources(ctx context.Context, resources []*comma return errors.ErrorOrNil() } -func (o *resourcesObserver) addResourcesLocked(resources []*commands.Resource) ([]*observedResource, error) { +func (o *resourcesObserver) addResourcesLocked(resources []*commands.Resource) ([]*observedResourceWithETags, error) { var errors *multierror.Error - observedResources := make([]*observedResource, 0, len(resources)) + observedResources := make([]*observedResourceWithETags, 0, len(resources)) for _, resource := range resources { - observedResource, err := o.addResourceLocked(resource, "", nil) + observedResource, err := o.addResourceLocked(resource, "") if err != nil { errors = multierror.Append(errors, err) } else if observedResource != nil { - observedResources = append(observedResources, observedResource) + observedResources = append(observedResources, &observedResourceWithETags{observedResource: observedResource}) } } if errors.ErrorOrNil() != nil { diff --git a/coap-gateway/service/signIn.go b/coap-gateway/service/signIn.go index b4980cdca7..a49500d8a7 100644 --- a/coap-gateway/service/signIn.go +++ b/coap-gateway/service/signIn.go @@ -227,7 +227,7 @@ func asyncCreateDeviceObserver(x asyncCreateDeviceObserverArg) { observation.WithLogger(x.client.getLogger()), observation.WithRequireBatchObserveEnabled(x.client.server.config.APIs.COAP.RequireBatchObserveEnabled), observation.WithTwinEnabled(x.twinEnabled), - observation.WithLatestETAGsForNumbersOfResource(x.client.server.config.DeviceTwin.LatestETAGsForNumbersOfResource), + observation.WithNumberOfETAGsForBatchObservation(x.client.server.config.DeviceTwin.NumberOfETAGsForBatchObservation), observation.WithTwinForceResynchronization(x.client.server.config.DeviceTwin.ForceResynchronization), ) if err != nil { diff --git a/coap-gateway/test/test.go b/coap-gateway/test/test.go index a8b133a5cd..0b1511cfbf 100644 --- a/coap-gateway/test/test.go +++ b/coap-gateway/test/test.go @@ -21,7 +21,7 @@ func MakeConfig(t require.TestingT) service.Config { cfg.TaskQueue.Size = 2 * 1024 * 1024 cfg.APIs.COAP.Addr = config.COAP_GW_HOST cfg.APIs.COAP.RequireBatchObserveEnabled = false - cfg.DeviceTwin.LatestETAGsForNumbersOfResource = 3 + cfg.DeviceTwin.NumberOfETAGsForBatchObservation = 3 cfg.APIs.COAP.ExternalAddress = config.COAP_GW_HOST cfg.APIs.COAP.Protocols = []coapService.Protocol{coapService.TCP} if config.COAP_GATEWAY_UDP_ENABLED { diff --git a/grpc-gateway/pb/service.swagger.json b/grpc-gateway/pb/service.swagger.json index 5ae5a129d8..7a08550cef 100644 --- a/grpc-gateway/pb/service.swagger.json +++ b/grpc-gateway/pb/service.swagger.json @@ -268,7 +268,7 @@ }, "twinForceResynchronization": { "type": "boolean", - "description": "// force resynchronization IoT hub with the device resources and set twin_enabled to true. Use to address potential synchronization issues and prevent operational discrepancies." + "description": "force resynchronization IoT hub with the device resources and set twin_enabled to true. Use to address potential synchronization issues and prevent operational discrepancies." }, "timeToLive": { "type": "string", diff --git a/grpc-gateway/pb/updateDeviceMetadata.pb.go b/grpc-gateway/pb/updateDeviceMetadata.pb.go index 9ef46dbc05..48daa838b6 100644 --- a/grpc-gateway/pb/updateDeviceMetadata.pb.go +++ b/grpc-gateway/pb/updateDeviceMetadata.pb.go @@ -28,7 +28,7 @@ type UpdateDeviceMetadataRequest struct { DeviceId string `protobuf:"bytes,1,opt,name=device_id,json=deviceId,proto3" json:"device_id,omitempty"` TwinEnabled bool `protobuf:"varint,4,opt,name=twin_enabled,json=twinEnabled,proto3" json:"twin_enabled,omitempty"` - TwinForceResynchronization bool `protobuf:"varint,5,opt,name=twin_force_resynchronization,json=twinForceResynchronization,proto3" json:"twin_force_resynchronization,omitempty"` // // force resynchronization IoT hub with the device resources and set twin_enabled to true. Use to address potential synchronization issues and prevent operational discrepancies. + TwinForceResynchronization bool `protobuf:"varint,5,opt,name=twin_force_resynchronization,json=twinForceResynchronization,proto3" json:"twin_force_resynchronization,omitempty"` // force resynchronization IoT hub with the device resources and set twin_enabled to true. Use to address potential synchronization issues and prevent operational discrepancies. TimeToLive int64 `protobuf:"varint,3,opt,name=time_to_live,json=timeToLive,proto3" json:"time_to_live,omitempty"` // command validity in nanoseconds. 0 means forever and minimal value is 100000000 (100ms). } diff --git a/grpc-gateway/pb/updateDeviceMetadata.proto b/grpc-gateway/pb/updateDeviceMetadata.proto index ecca024a0c..ee14d64eb6 100644 --- a/grpc-gateway/pb/updateDeviceMetadata.proto +++ b/grpc-gateway/pb/updateDeviceMetadata.proto @@ -10,7 +10,7 @@ message UpdateDeviceMetadataRequest{ reserved 2; string device_id = 1; bool twin_enabled = 4; - bool twin_force_resynchronization = 5; // // force resynchronization IoT hub with the device resources and set twin_enabled to true. Use to address potential synchronization issues and prevent operational discrepancies. + bool twin_force_resynchronization = 5; // force resynchronization IoT hub with the device resources and set twin_enabled to true. Use to address potential synchronization issues and prevent operational discrepancies. int64 time_to_live = 3; // command validity in nanoseconds. 0 means forever and minimal value is 100000000 (100ms). // ShadowSynchronization shadow_synchronization = 2; replaced by twin_enabled diff --git a/resource-aggregate/commands/commands.pb.go b/resource-aggregate/commands/commands.pb.go index ad75df9381..bef36eda04 100644 --- a/resource-aggregate/commands/commands.pb.go +++ b/resource-aggregate/commands/commands.pb.go @@ -2235,7 +2235,7 @@ type isConfirmDeviceMetadataUpdateRequest_Confirm interface { } type ConfirmDeviceMetadataUpdateRequest_TwinEnabled struct { - TwinEnabled bool `protobuf:"varint,6,opt,name=twin_enabled,json=twinEnabled,proto3,oneof"` // // will set twin_enabled to true and TwinSynchronization.state to OUT_OF_SYNC. + TwinEnabled bool `protobuf:"varint,6,opt,name=twin_enabled,json=twinEnabled,proto3,oneof"` // will set twin_enabled to true and TwinSynchronization.state to OUT_OF_SYNC. } type ConfirmDeviceMetadataUpdateRequest_TwinForceResynchronization struct { diff --git a/resource-aggregate/cqrs/eventstore/mongodb/eventstore.go b/resource-aggregate/cqrs/eventstore/mongodb/eventstore.go index 59416d671a..e2b272b415 100644 --- a/resource-aggregate/cqrs/eventstore/mongodb/eventstore.go +++ b/resource-aggregate/cqrs/eventstore/mongodb/eventstore.go @@ -271,7 +271,7 @@ func makeDBETag(etag *eventstore.ETagData) bson.M { } func makeDBDoc(events []eventstore.Event, marshaler MarshalerFunc) (bson.M, error) { - etag, e, err := makeDBEvents(events, marshaler) + etag, e, err := makeDBEventsAndGetETag(events, marshaler) if err != nil { return nil, fmt.Errorf("cannot insert first events('%v'): %w", events, err) } @@ -339,7 +339,7 @@ func (s *EventStore) Close(ctx context.Context) error { } // newDBEvent returns a new dbEvent for an eventstore. -func makeDBEvents(events []eventstore.Event, marshaler MarshalerFunc) (*eventstore.ETagData, []bson.M, error) { +func makeDBEventsAndGetETag(events []eventstore.Event, marshaler MarshalerFunc) (*eventstore.ETagData, []bson.M, error) { dbEvents := make([]bson.M, 0, len(events)) var etag *eventstore.ETagData for idx, event := range events { diff --git a/resource-aggregate/cqrs/eventstore/mongodb/save.go b/resource-aggregate/cqrs/eventstore/mongodb/save.go index b5a78ce52d..db3c77e9b8 100644 --- a/resource-aggregate/cqrs/eventstore/mongodb/save.go +++ b/resource-aggregate/cqrs/eventstore/mongodb/save.go @@ -39,7 +39,7 @@ func IsDup(err error) bool { } func (s *EventStore) saveEvent(ctx context.Context, col *mongo.Collection, events []eventstore.Event) (status eventstore.SaveStatus, err error) { - etag, e, err := makeDBEvents(events, s.dataMarshaler) + etag, e, err := makeDBEventsAndGetETag(events, s.dataMarshaler) if err != nil { return eventstore.Fail, err } diff --git a/resource-aggregate/pb/commands.proto b/resource-aggregate/pb/commands.proto index 9115582c60..3c8fb5b0b8 100644 --- a/resource-aggregate/pb/commands.proto +++ b/resource-aggregate/pb/commands.proto @@ -516,7 +516,7 @@ message ConfirmDeviceMetadataUpdateRequest { string correlation_id = 2; Status status = 3; oneof confirm { - bool twin_enabled = 6; // // will set twin_enabled to true and TwinSynchronization.state to OUT_OF_SYNC. + bool twin_enabled = 6; // will set twin_enabled to true and TwinSynchronization.state to OUT_OF_SYNC. bool twin_force_resynchronization = 7; // will set twin_enabled to true, set time TwinSynchronization.force_resynchronization_at and TwinSynchronization.state to OUT_OF_SYNC. // ShadowSynchronization shadow_synchronization = 4; replaced by twin_enabled