diff --git a/Makefile b/Makefile index 981aabfbb..0e4e55ce0 100644 --- a/Makefile +++ b/Makefile @@ -147,9 +147,15 @@ define CLEAN-DOCKER-DEVICE sudo rm -rf $(WORKING_DIRECTORY)/.tmp/$(1) || true endef -simulators: +simulators/clean: + $(call CLEAN-DOCKER-DEVICE,$(DEVICE_SIMULATOR_NAME)) + $(call CLEAN-DOCKER-DEVICE,$(DEVICE_SIMULATOR_RES_OBSERVABLE_NAME)) +.PHONY: simulators/clean + +simulators: simulators/clean $(call RUN-DOCKER-DEVICE,$(DEVICE_SIMULATOR_NAME),$(DEVICE_SIMULATOR_IMG)) $(call RUN-DOCKER-DEVICE,$(DEVICE_SIMULATOR_RES_OBSERVABLE_NAME),$(DEVICE_SIMULATOR_RES_OBSERVABLE_IMG)) +.PHONY: simulators env/test/mem: clean certificates nats mongo privateKeys .PHONY: env/test/mem @@ -292,12 +298,10 @@ $(test-targets): %: env build: $(SUBDIRS) -clean: +clean: simulators/clean docker rm -f mongo || true docker rm -f nats || true docker rm -f nats-cloud-connector || true - $(call CLEAN-DOCKER-DEVICE,$(DEVICE_SIMULATOR_NAME)) - $(call CLEAN-DOCKER-DEVICE,$(DEVICE_SIMULATOR_RES_OBSERVABLE_NAME)) sudo rm -rf ./.tmp/certs || true sudo rm -rf ./.tmp/mongo || true sudo rm -rf ./.tmp/home || true diff --git a/charts/plgd-hub/README.md b/charts/plgd-hub/README.md index 9e2a135e4..e183f220d 100644 --- a/charts/plgd-hub/README.md +++ b/charts/plgd-hub/README.md @@ -197,7 +197,7 @@ global: | certmanager.internal.issuer.name | string | `nil` | Name | | certmanager.internal.issuer.spec | string | `nil` | cert-manager issuer spec | | cluster.dns | string | `"cluster.local"` | Cluster internal DNS prefix | -| coapgateway | object | `{"affinity":{},"apis":{"coap":{"authorization":{"deviceIdClaim":null,"ownerClaim":null,"providers":null},"blockwiseTransfer":{"blockSize":"1024","enabled":true},"externalAddress":"","keepAlive":{"timeout":"20s"},"maxMessageSize":262144,"messagePoolSize":1000,"messageQueueSize":16,"ownerCacheExpiration":"1m","protocols":["tcp"],"requireBatchObserveEnabled":true,"subscriptionBufferSize":1000,"tls":{"caPool":null,"certFile":null,"clientCertificateRequired":true,"disconnectOnExpiredCertificate":false,"enabled":true,"identityPropertiesRequired":true,"keyFile":null}}},"clients":{"certificateAuthority":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}},"eventBus":{"nats":{"pendingLimits":{"bytesLimit":"67108864","msgLimit":"524288"},"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false},"url":""}},"identityStore":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}},"ownerClaim":null},"resourceAggregate":{"deviceStatusExpiration":{"enabled":false,"expiresIn":"0s"},"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}},"resourceDirectory":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}}},"config":{"fileName":"service.yaml","mountPath":"/config","volume":"config"},"deploymentAnnotations":{},"deploymentLabels":{},"enabled":true,"extraContainers":{},"extraVolumeMounts":{},"extraVolumes":{},"fullnameOverride":null,"hubId":null,"image":{"imagePullSecrets":{},"pullPolicy":"Always","registry":"ghcr.io/","repository":"plgd-dev/hub/coap-gateway","tag":null},"imagePullSecrets":{},"initContainersTpl":{},"livenessProbe":{},"log":{"dumpBody":false,"encoderConfig":{"timeEncoder":"rfc3339nano"},"encoding":"json","level":"info","stacktrace":{"enabled":false,"level":"warn"}},"name":"coap-gateway","nodeSelector":{},"podAnnotations":{},"podLabels":{},"podSecurityContext":{},"port":5684,"rbac":{"enabled":false,"roleBindingDefinitionTpl":null,"serviceAccountName":"coap-gateway"},"readinessProbe":{},"replicas":1,"resources":{},"restartPolicy":"Always","securityContext":{},"service":{"annotations":{},"labels":{},"nodePort":null,"tcp":{"annotations":{},"labels":{},"name":"coaps-tcp","nodePort":null,"protocol":"TCP","targetPort":"coaps-tcp","type":null},"type":"LoadBalancer","udp":{"annotations":{},"labels":{},"name":"coaps-udp","nodePort":null,"protocol":"UDP","targetPort":"coaps-udp","type":null}},"taskQueue":{"goPoolSize":1600,"maxIdleTime":"10m","size":"2097152"},"tolerations":{}}` | CoAP gateway parameters | +| coapgateway | object | `{"affinity":{},"apis":{"coap":{"authorization":{"deviceIdClaim":null,"ownerClaim":null,"providers":null},"blockwiseTransfer":{"blockSize":"1024","enabled":true},"externalAddress":"","keepAlive":{"timeout":"20s"},"maxMessageSize":262144,"messagePoolSize":1000,"messageQueueSize":16,"ownerCacheExpiration":"1m","protocols":["tcp"],"requireBatchObserveEnabled":true,"subscriptionBufferSize":1000,"tls":{"caPool":null,"certFile":null,"clientCertificateRequired":true,"disconnectOnExpiredCertificate":false,"enabled":true,"identityPropertiesRequired":true,"keyFile":null}}},"clients":{"certificateAuthority":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}},"eventBus":{"nats":{"pendingLimits":{"bytesLimit":"67108864","msgLimit":"524288"},"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false},"url":""}},"identityStore":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}},"ownerClaim":null},"resourceAggregate":{"deviceStatusExpiration":{"enabled":false,"expiresIn":"0s"},"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}},"resourceDirectory":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}}},"config":{"fileName":"service.yaml","mountPath":"/config","volume":"config"},"deploymentAnnotations":{},"deploymentLabels":{},"deviceTwin":{"maxETagsCountInRequest":8,"useETags":true},"enabled":true,"extraContainers":{},"extraVolumeMounts":{},"extraVolumes":{},"fullnameOverride":null,"hubId":null,"image":{"imagePullSecrets":{},"pullPolicy":"Always","registry":"ghcr.io/","repository":"plgd-dev/hub/coap-gateway","tag":null},"imagePullSecrets":{},"initContainersTpl":{},"livenessProbe":{},"log":{"dumpBody":false,"encoderConfig":{"timeEncoder":"rfc3339nano"},"encoding":"json","level":"info","stacktrace":{"enabled":false,"level":"warn"}},"name":"coap-gateway","nodeSelector":{},"podAnnotations":{},"podLabels":{},"podSecurityContext":{},"port":5684,"rbac":{"enabled":false,"roleBindingDefinitionTpl":null,"serviceAccountName":"coap-gateway"},"readinessProbe":{},"replicas":1,"resources":{},"restartPolicy":"Always","securityContext":{},"service":{"annotations":{},"labels":{},"nodePort":null,"tcp":{"annotations":{},"labels":{},"name":"coaps-tcp","nodePort":null,"protocol":"TCP","targetPort":"coaps-tcp","type":null},"type":"LoadBalancer","udp":{"annotations":{},"labels":{},"name":"coaps-udp","nodePort":null,"protocol":"UDP","targetPort":"coaps-udp","type":null}},"taskQueue":{"goPoolSize":1600,"maxIdleTime":"10m","size":"2097152"},"tolerations":{}}` | CoAP gateway parameters | | coapgateway.affinity | object | `{}` | Affinity definition | | coapgateway.apis | object | `{"coap":{"authorization":{"deviceIdClaim":null,"ownerClaim":null,"providers":null},"blockwiseTransfer":{"blockSize":"1024","enabled":true},"externalAddress":"","keepAlive":{"timeout":"20s"},"maxMessageSize":262144,"messagePoolSize":1000,"messageQueueSize":16,"ownerCacheExpiration":"1m","protocols":["tcp"],"requireBatchObserveEnabled":true,"subscriptionBufferSize":1000,"tls":{"caPool":null,"certFile":null,"clientCertificateRequired":true,"disconnectOnExpiredCertificate":false,"enabled":true,"identityPropertiesRequired":true,"keyFile":null}}}` | For complete coap-gateway service configuration see [plgd/coap-gateway](https://github.com/plgd-dev/hub/tree/main/coap-gateway) | | coapgateway.apis.coap.tls.disconnectOnExpiredCertificate | bool | `false` | After the certificate expires, the connection will be disconnected | diff --git a/charts/plgd-hub/templates/coap-gateway/config.yaml b/charts/plgd-hub/templates/coap-gateway/config.yaml index d9383b96a..5748d784a 100644 --- a/charts/plgd-hub/templates/coap-gateway/config.yaml +++ b/charts/plgd-hub/templates/coap-gateway/config.yaml @@ -172,5 +172,8 @@ data: goPoolSize: {{ .taskQueue.goPoolSize }} size: {{ .taskQueue.size }} maxIdleTime: {{ .taskQueue.maxIdleTime | quote }} + deviceTwin: + maxETagsCountInRequest: {{ deviceTwin.maxETagsCountInRequest }} + useETags: {{ deviceTwin.useETags }} {{- end }} {{- end }} diff --git a/charts/plgd-hub/values.yaml b/charts/plgd-hub/values.yaml index 038b469c0..aa99cd3d1 100644 --- a/charts/plgd-hub/values.yaml +++ b/charts/plgd-hub/values.yaml @@ -1004,6 +1004,9 @@ coapgateway: goPoolSize: 1600 size: "2097152" maxIdleTime: "10m" + deviceTwin: + maxETagsCountInRequest: 8 + useETags: true identitystore: # -- Enable identity service diff --git a/cloud2cloud-connector/service/deviceSubscriptionHandlers.go b/cloud2cloud-connector/service/deviceSubscriptionHandlers.go index b113e1c21..2f9b0276d 100644 --- a/cloud2cloud-connector/service/deviceSubscriptionHandlers.go +++ b/cloud2cloud-connector/service/deviceSubscriptionHandlers.go @@ -58,7 +58,7 @@ func (h deviceSubscriptionHandlers) OnDeviceSubscriberReconnectError(err error) type DevicesSubscription struct { ctx context.Context - data *kitSync.Map // //[deviceID]*deviceSubscription + data *kitSync.Map // [deviceID]*deviceSubscription rdClient pb.GrpcGatewayClient raClient raService.ResourceAggregateClient subscriber *subscriber.Subscriber diff --git a/coap-gateway/coapconv/coapconv.go b/coap-gateway/coapconv/coapconv.go index c2c4d0aea..08fa672b2 100644 --- a/coap-gateway/coapconv/coapconv.go +++ b/coap-gateway/coapconv/coapconv.go @@ -133,6 +133,11 @@ func NewCoapResourceRetrieveRequest(ctx context.Context, messagePool *pool.Pool, if event.GetResourceInterface() != "" { req.AddOptionString(message.URIQuery, "if="+event.GetResourceInterface()) } + if len(event.GetEtag()) > 0 { + if err := req.AddETag(event.GetEtag()); err != nil { + return nil, err + } + } return req, nil } @@ -186,6 +191,14 @@ func NewCommandMetadata(sequenceNumber uint64, connectionID string) *commands.Co } } +func getETagFromMessage(msg interface{ ETag() ([]byte, error) }) []byte { + etag, err := msg.ETag() + if err != nil { + etag = nil + } + return etag +} + func NewConfirmResourceRetrieveRequest(resourceID *commands.ResourceId, correlationID, connectionID string, req *pool.Message) *commands.ConfirmResourceRetrieveRequest { content := NewContent(req.Options(), req.Body()) metadata := NewCommandMetadata(req.Sequence(), connectionID) @@ -196,6 +209,7 @@ func NewConfirmResourceRetrieveRequest(resourceID *commands.ResourceId, correlat Status: CoapCodeToStatus(req.Code()), Content: content, CommandMetadata: metadata, + Etag: getETagFromMessage(req), } } @@ -256,6 +270,7 @@ func NewNotifyResourceChangedRequest(resourceID *commands.ResourceId, connection Content: content, CommandMetadata: metadata, Status: CoapCodeToStatus(req.Code()), + Etag: getETagFromMessage(req), } } @@ -298,6 +313,8 @@ func NewNotifyResourceChangedRequestsFromBatchResourceDiscovery(deviceID, connec } requests := make([]*commands.NotifyResourceChangedRequest, 0, len(rs)) + etag := getETagFromMessage(req) + var latestETagResource *commands.NotifyResourceChangedRequest for _, r := range rs { isEmpty, filterOut := filterOutEmptyResource(r) if filterOut { @@ -312,8 +329,7 @@ func NewNotifyResourceChangedRequestsFromBatchResourceDiscovery(deviceID, connec data = nil code = commands.Status_NOT_FOUND } - - requests = append(requests, &commands.NotifyResourceChangedRequest{ + resourceChangedReq := &commands.NotifyResourceChangedRequest{ ResourceId: commands.NewResourceID(deviceID, r.Href()), Content: &commands.Content{ ContentType: getContentFormatString(ct), @@ -322,7 +338,18 @@ func NewNotifyResourceChangedRequestsFromBatchResourceDiscovery(deviceID, connec }, CommandMetadata: metadata, Status: code, - }) + Etag: r.ETag, + } + if len(etag) > 0 && bytes.Equal(etag, r.ETag) { + latestETagResource = resourceChangedReq + continue + } + requests = append(requests, resourceChangedReq) + } + // send latestETagResource need to be send as last because the resource are applied in order in resource aggregate + // so latestETagResource is the last resource in the batch + if latestETagResource != nil { + requests = append(requests, latestETagResource) } return requests, nil } @@ -379,6 +406,7 @@ func NewRetrieveResourceRequest(resourceID *commands.ResourceId, req *mux.Messag CorrelationId: correlationID, ResourceInterface: resourceInterface, CommandMetadata: metadata, + Etag: getETagFromMessage(req), }, nil } diff --git a/coap-gateway/config.yaml b/coap-gateway/config.yaml index ac2157c10..f7f342030 100644 --- a/coap-gateway/config.yaml +++ b/coap-gateway/config.yaml @@ -160,6 +160,9 @@ clients: keyFile: "/secrets/private/cert.key" certFile: "/secrets/public/cert.crt" useSystemCAPool: false +deviceTwin: + maxETagsCountInRequest: 8 + useETags: false taskQueue: goPoolSize: 1600 size: 2097152 diff --git a/coap-gateway/service/clientObserver.go b/coap-gateway/service/clientObserver.go index d46ef527a..cfdb12f32 100644 --- a/coap-gateway/service/clientObserver.go +++ b/coap-gateway/service/clientObserver.go @@ -45,7 +45,7 @@ func (c *session) replaceDeviceObserver(deviceObserverFuture *future.Future) *fu } // Replace deviceObserver instance in the client if Device Twin setting was changed for the device. -func (c *session) replaceDeviceObserverWithDeviceTwin(ctx context.Context, twinEnabled bool) (bool, error) { +func (c *session) replaceDeviceObserverWithDeviceTwin(ctx context.Context, twinEnabled, twinForceSynchronization bool) (bool, error) { obs, err := c.getDeviceObserver(ctx) if err != nil { return false, err @@ -53,7 +53,8 @@ func (c *session) replaceDeviceObserverWithDeviceTwin(ctx context.Context, twinE prevTwinEnabled := obs.GetTwinEnabled() deviceID := obs.GetDeviceID() observationType := obs.GetObservationType() - if prevTwinEnabled == twinEnabled { + twinEnabled = twinEnabled || twinForceSynchronization + if !twinForceSynchronization && prevTwinEnabled == twinEnabled { return prevTwinEnabled, nil } deviceObserverFuture, setDeviceObserver := future.New() @@ -67,6 +68,8 @@ func (c *session) replaceDeviceObserverWithDeviceTwin(ctx context.Context, twinE observation.WithTwinEnabled(twinEnabled), observation.WithObservationType(observationType), observation.WithLogger(c.getLogger()), observation.WithRequireBatchObserveEnabled(c.server.config.APIs.COAP.RequireBatchObserveEnabled), + observation.WithMaxETagsCountInRequest(c.server.config.DeviceTwin.MaxETagsCountInRequest), + observation.WithUseETags(!twinForceSynchronization && c.server.config.DeviceTwin.UseETags), ) if err != nil { setDeviceObserver(nil, err) diff --git a/coap-gateway/service/clientRetrieveHandler.go b/coap-gateway/service/clientRetrieveHandler.go index ebdf508c6..f63e364f3 100644 --- a/coap-gateway/service/clientRetrieveHandler.go +++ b/coap-gateway/service/clientRetrieveHandler.go @@ -1,6 +1,7 @@ package service import ( + "bytes" "context" "errors" "fmt" @@ -39,14 +40,17 @@ func clientRetrieveHandler(req *mux.Message, client *session) (*pool.Message, er var content *commands.Content var code coapCodes.Code resourceInterface := message.GetResourceInterface(req) + etag, err := req.ETag() + if err != nil { + etag = nil + } if resourceInterface == "" { - content, code, err = clientRetrieveFromResourceTwinHandler(req.Context(), client, deviceID, href) + content, code, err = clientRetrieveFromResourceTwinHandler(req.Context(), client, deviceID, href, etag) if err != nil { return nil, statusErrorf(code, errFmtRetrieveResource, fmt.Sprintf(" /%v%v from resource twin", deviceID, href), err) } } else { - code = coapCodes.Content - content, err = clientRetrieveFromDeviceHandler(req, client, deviceID, href) + content, code, err = clientRetrieveFromDeviceHandler(req, client, deviceID, href) if err != nil { code = coapconv.GrpcErr2CoapCode(err, coapconv.Retrieve) return nil, statusErrorf(code, errFmtRetrieveResource, fmt.Sprintf(" /%v%v from device", deviceID, href), err) @@ -63,7 +67,7 @@ func clientRetrieveHandler(req *mux.Message, client *session) (*pool.Message, er return client.createResponse(code, req.Token(), mediaType, content.Data), nil } -func clientRetrieveFromResourceTwinHandler(ctx context.Context, client *session, deviceID, href string) (*commands.Content, coapCodes.Code, error) { +func clientRetrieveFromResourceTwinHandler(ctx context.Context, client *session, deviceID, href string, etag []byte) (*commands.Content, coapCodes.Code, error) { RetrieveResourcesClient, err := client.server.rdClient.GetResources(ctx, &pbGRPC.GetResourcesRequest{ ResourceIdFilter: []string{ commands.NewResourceID(deviceID, href).ToString(), @@ -86,26 +90,29 @@ func clientRetrieveFromResourceTwinHandler(ctx context.Context, client *session, return nil, coapconv.GrpcErr2CoapCode(err, coapconv.Retrieve), err } if resourceValue.GetData().GetResourceId().GetDeviceId() == deviceID && resourceValue.GetData().GetResourceId().GetHref() == href && resourceValue.GetData().Content != nil { + if etag != nil && bytes.Equal(etag, resourceValue.GetData().GetEtag()) { + return nil, coapCodes.Valid, nil + } return resourceValue.GetData().Content, coapCodes.Content, nil } } return nil, coapCodes.NotFound, fmt.Errorf("not found") } -func clientRetrieveFromDeviceHandler(req *mux.Message, client *session, deviceID, href string) (*commands.Content, error) { +func clientRetrieveFromDeviceHandler(req *mux.Message, client *session, deviceID, href string) (*commands.Content, coapCodes.Code, error) { retrieveCommand, err := coapconv.NewRetrieveResourceRequest(commands.NewResourceID(deviceID, href), req, client.RemoteAddr().String()) if err != nil { - return nil, err + return nil, coapCodes.ServiceUnavailable, err } retrievedEvent, err := client.server.raClient.SyncRetrieveResource(req.Context(), "*", retrieveCommand) if err != nil { - return nil, err + return nil, coapCodes.ServiceUnavailable, err } content, err := commands.EventContentToContent(retrievedEvent) if err != nil { - return nil, err + return nil, coapCodes.ServiceUnavailable, err } - return content, nil + return content, coapconv.StatusToCoapCode(retrievedEvent.GetStatus(), coapconv.Retrieve), nil } diff --git a/coap-gateway/service/clientRetrieveHandler_test.go b/coap-gateway/service/clientRetrieveHandler_test.go index 716ac278d..1484fed73 100644 --- a/coap-gateway/service/clientRetrieveHandler_test.go +++ b/coap-gateway/service/clientRetrieveHandler_test.go @@ -10,13 +10,18 @@ import ( "github.com/plgd-dev/go-coap/v3/message" coapCodes "github.com/plgd-dev/go-coap/v3/message/codes" + coapgwTest "github.com/plgd-dev/hub/v2/coap-gateway/test" "github.com/plgd-dev/hub/v2/coap-gateway/uri" + "github.com/plgd-dev/hub/v2/pkg/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestClientRetrieveHandler(t *testing.T) { - shutdown := setUp(t) + coapgwCfg := coapgwTest.MakeConfig(t) + coapgwCfg.Log.DumpBody = true + coapgwCfg.Log.Level = log.DebugLevel + shutdown := setUp(t, coapgwCfg) defer shutdown() co := testCoapDial(t, "", true, true, time.Now().Add(time.Minute)) @@ -30,6 +35,7 @@ func TestClientRetrieveHandler(t *testing.T) { type args struct { path string query string + etag []byte } tests := []struct { name string @@ -65,6 +71,24 @@ func TestClientRetrieveHandler(t *testing.T) { }, wantsCode: coapCodes.Content, }, + { + name: "found with etag", + args: args{ + path: uri.ResourceRoute + "/" + CertIdentity + TestAResourceHref, + etag: []byte(TestETag), + }, + wantsCode: coapCodes.Valid, + }, + + { + name: "found with with interface,etag", + args: args{ + path: uri.ResourceRoute + "/" + CertIdentity + TestAResourceHref, + etag: []byte(TestETag), + query: "if=oic.if.baseline", + }, + wantsCode: coapCodes.Valid, + }, } testPrepareDevice(t, co) @@ -75,6 +99,9 @@ func TestClientRetrieveHandler(t *testing.T) { defer cancel() req, err := co.NewGetRequest(ctx, tt.args.path) require.NoError(t, err) + if tt.args.etag != nil { + req.SetOptionBytes(message.ETag, tt.args.etag) + } if tt.args.query != "" { req.SetOptionString(message.URIQuery, tt.args.query) } diff --git a/coap-gateway/service/config.go b/coap-gateway/service/config.go index c2fa93376..c52159687 100644 --- a/coap-gateway/service/config.go +++ b/coap-gateway/service/config.go @@ -19,10 +19,11 @@ import ( // Config represent application configuration type Config struct { - Log LogConfig `yaml:"log" json:"log"` - APIs APIsConfig `yaml:"apis" json:"apis"` - Clients ClientsConfig `yaml:"clients" json:"clients"` - TaskQueue queue.Config `yaml:"taskQueue" json:"taskQueue"` + Log LogConfig `yaml:"log" json:"log"` + APIs APIsConfig `yaml:"apis" json:"apis"` + Clients ClientsConfig `yaml:"clients" json:"clients"` + DeviceTwin DeviceTwinConfig `yaml:"deviceTwin" json:"deviceTwin"` + TaskQueue queue.Config `yaml:"taskQueue" json:"taskQueue"` } func (c *Config) Validate() error { @@ -107,6 +108,11 @@ func (c *InjectedCOAPConfig) Validate() error { return nil } +type DeviceTwinConfig struct { + MaxETagsCountInRequest uint32 `yaml:"maxETagsCountInRequest" json:"maxETagsCountInRequest"` + UseETags bool `yaml:"useETags" json:"useETags"` +} + type COAPConfig struct { coapService.Config `yaml:",inline" json:",inline"` ExternalAddress string `yaml:"externalAddress" json:"externalAddress"` @@ -114,7 +120,8 @@ type COAPConfig struct { OwnerCacheExpiration time.Duration `yaml:"ownerCacheExpiration" json:"ownerCacheExpiration"` SubscriptionBufferSize int `yaml:"subscriptionBufferSize" json:"subscriptionBufferSize"` RequireBatchObserveEnabled bool `yaml:"requireBatchObserveEnabled" json:"requireBatchObserveEnabled"` - InjectedCOAPConfig InjectedCOAPConfig `yaml:"-" json:"-"` + + InjectedCOAPConfig InjectedCOAPConfig `yaml:"-" json:"-"` } type COAPConfigMarshalerUnmarshaler struct { diff --git a/coap-gateway/service/message/log.go b/coap-gateway/service/message/log.go index 1ae8271f2..2e10e17c4 100644 --- a/coap-gateway/service/message/log.go +++ b/coap-gateway/service/message/log.go @@ -17,10 +17,11 @@ type JsonCoapMessage struct { Observe *uint32 `json:"observe,omitempty"` ContentFormat string `json:"contentFormat,omitempty"` Body interface{} `json:"body,omitempty"` + Etag []byte `json:"etag,omitempty"` } func (c JsonCoapMessage) IsEmpty() bool { - return c.Code == "" && c.Path == "" && c.Token == "" && len(c.Queries) == 0 && c.Observe == nil && c.ContentFormat == "" && c.Body == nil + return c.Code == "" && c.Path == "" && c.Token == "" && len(c.Queries) == 0 && c.Observe == nil && c.ContentFormat == "" && c.Body == nil && c.Etag == nil } func readBody(r io.ReadSeeker) []byte { @@ -102,6 +103,11 @@ func ToJson(m *pool.Message, withBody, withToken bool) JsonCoapMessage { if withToken { token = m.Token().String() } + var etag []byte + e, err := m.ETag() + if err == nil { + etag = e + } msg := JsonCoapMessage{ Code: m.Code().String(), @@ -110,6 +116,7 @@ func ToJson(m *pool.Message, withBody, withToken bool) JsonCoapMessage { Queries: queries, Observe: obs, Body: body, + Etag: etag, } return msg } diff --git a/coap-gateway/service/observation/deviceObserver.go b/coap-gateway/service/observation/deviceObserver.go index 687110cff..7020d318c 100644 --- a/coap-gateway/service/observation/deviceObserver.go +++ b/coap-gateway/service/observation/deviceObserver.go @@ -17,6 +17,7 @@ import ( "github.com/plgd-dev/hub/v2/pkg/log" pkgStrings "github.com/plgd-dev/hub/v2/pkg/strings" "github.com/plgd-dev/hub/v2/resource-aggregate/commands" + pbRD "github.com/plgd-dev/hub/v2/resource-directory/pb" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -27,6 +28,7 @@ type ObservationType int type GrpcGatewayClient interface { GetDevicesMetadata(ctx context.Context, in *pb.GetDevicesMetadataRequest, opts ...grpc.CallOption) (pb.GrpcGateway_GetDevicesMetadataClient, error) GetResourceLinks(ctx context.Context, in *pb.GetResourceLinksRequest, opts ...grpc.CallOption) (pb.GrpcGateway_GetResourceLinksClient, error) + GetLatestDeviceETags(ctx context.Context, in *pbRD.GetLatestDeviceETagsRequest, opts ...grpc.CallOption) (*pbRD.GetLatestDeviceETagsResponse, error) } type ResourceAggregateClient interface { @@ -55,7 +57,9 @@ type DeviceObserverConfig struct { ObservationType ObservationType TwinEnabled bool TwinEnabledSet bool + UseETags bool RequireBatchObserveEnabled bool + MaxETagsCountInRequest uint32 } type ClientConn interface { @@ -115,6 +119,21 @@ func WithTwinEnabled(twinEnabled bool) TwinEnabledOpt { } } +// Force twinEnabled value +type UseETagsOpt struct { + useETags bool +} + +func (o UseETagsOpt) Apply(opts *DeviceObserverConfig) { + opts.UseETags = o.useETags +} + +func WithUseETags(useETags bool) UseETagsOpt { + return UseETagsOpt{ + useETags: useETags, + } +} + // Set logger option type LoggerOpt struct { logger log.Logger @@ -130,6 +149,21 @@ func (o LoggerOpt) Apply(opts *DeviceObserverConfig) { opts.Logger = o.logger } +// Limit of the number of latest etags acquired from event store. +type MaxETagsCountInRequestOpt struct { + maxETagsCountInRequest uint32 +} + +func (o MaxETagsCountInRequestOpt) Apply(opts *DeviceObserverConfig) { + opts.MaxETagsCountInRequest = o.maxETagsCountInRequest +} + +func WithMaxETagsCountInRequest(v uint32) MaxETagsCountInRequestOpt { + return MaxETagsCountInRequestOpt{ + maxETagsCountInRequest: v, + } +} + func prepareSetupDeviceObserver(ctx context.Context, deviceID string, coapConn ClientConn, rdClient GrpcGatewayClient, raClient ResourceAggregateClient, cfg DeviceObserverConfig) (DeviceObserverConfig, []*commands.Resource, error) { links, sequence, err := GetResourceLinks(ctx, coapConn, resources.ResourceURI) switch { @@ -164,6 +198,21 @@ func prepareSetupDeviceObserver(ctx context.Context, deviceID string, coapConn C return cfg, published, nil } +func getETags(ctx context.Context, deviceID string, rdClient GrpcGatewayClient, cfg DeviceObserverConfig) [][]byte { + if !cfg.UseETags { + return nil + } + r, err := rdClient.GetLatestDeviceETags(ctx, &pbRD.GetLatestDeviceETagsRequest{ + DeviceId: deviceID, + Limit: cfg.MaxETagsCountInRequest, + }) + if err != nil { + cfg.Logger.Debugf("NewDeviceObserver: failed to get latest device(%v) etag: %v", deviceID, err) + return nil + } + return r.GetEtags() +} + // Create new deviceObserver with given settings func NewDeviceObserver(ctx context.Context, deviceID string, coapConn ClientConn, rdClient GrpcGatewayClient, raClient ResourceAggregateClient, callbacks ResourcesObserverCallbacks, opts ...Option) (*DeviceObserver, error) { createError := func(err error) error { @@ -174,7 +223,9 @@ func NewDeviceObserver(ctx context.Context, deviceID string, coapConn ClientConn } cfg := DeviceObserverConfig{ - Logger: log.Get(), + Logger: log.Get(), + MaxETagsCountInRequest: 8, + UseETags: true, } for _, o := range opts { o.Apply(&cfg) @@ -202,7 +253,8 @@ func NewDeviceObserver(ctx context.Context, deviceID string, coapConn ClientConn } if cfg.ObservationType == ObservationType_PerDevice { - resourcesObserver, err := createDiscoveryResourceObserver(ctx, deviceID, coapConn, callbacks, cfg.Logger) + etags := getETags(ctx, deviceID, rdClient, cfg) + resourcesObserver, err := createDiscoveryResourceObserver(ctx, deviceID, coapConn, callbacks, etags, cfg.Logger) if err == nil { return &DeviceObserver{ deviceID: deviceID, @@ -305,13 +357,13 @@ func loadTwinEnabled(ctx context.Context, rdClient GrpcGatewayClient, deviceID s } // Create observer with a single observation for /oic/res resource. -func createDiscoveryResourceObserver(ctx context.Context, deviceID string, coapConn ClientConn, callbacks ResourcesObserverCallbacks, logger log.Logger) (*resourcesObserver, error) { +func createDiscoveryResourceObserver(ctx context.Context, deviceID string, coapConn ClientConn, callbacks ResourcesObserverCallbacks, etags [][]byte, logger log.Logger) (*resourcesObserver, error) { resourcesObserver := newResourcesObserver(deviceID, coapConn, callbacks, logger) err := resourcesObserver.addResource(ctx, &commands.Resource{ DeviceId: resourcesObserver.deviceID, Href: resources.ResourceURI, Policy: &commands.Policy{BitFlags: int32(schema.Observable)}, - }, interfaces.OC_IF_B) + }, interfaces.OC_IF_B, etags) if err != nil { resourcesObserver.CleanObservedResources(ctx) return nil, err @@ -322,7 +374,7 @@ func createDiscoveryResourceObserver(ctx context.Context, deviceID string, coapC // Create observer with a single observations for all published resources. func createPublishedResourcesObserver(ctx context.Context, deviceID string, coapConn ClientConn, callbacks ResourcesObserverCallbacks, published []*commands.Resource, logger log.Logger) (*resourcesObserver, error) { resourcesObserver := newResourcesObserver(deviceID, coapConn, callbacks, logger) - + // TODO get ETAG for each resource err := resourcesObserver.addResources(ctx, published) if err != nil { resourcesObserver.CleanObservedResources(ctx) diff --git a/coap-gateway/service/observation/deviceObserver_test.go b/coap-gateway/service/observation/deviceObserver_test.go index ad554c9bf..ffac68de1 100644 --- a/coap-gateway/service/observation/deviceObserver_test.go +++ b/coap-gateway/service/observation/deviceObserver_test.go @@ -29,6 +29,7 @@ import ( "github.com/plgd-dev/hub/v2/resource-aggregate/commands" raPb "github.com/plgd-dev/hub/v2/resource-aggregate/service" raTest "github.com/plgd-dev/hub/v2/resource-aggregate/test" + pbRD "github.com/plgd-dev/hub/v2/resource-directory/pb" "github.com/plgd-dev/hub/v2/test" coapgwTestService "github.com/plgd-dev/hub/v2/test/coap-gateway/service" coapgwTest "github.com/plgd-dev/hub/v2/test/coap-gateway/test" @@ -46,8 +47,13 @@ import ( "google.golang.org/grpc/credentials" ) +type RDClient struct { + pb.GrpcGatewayClient + pbRD.ResourceDirectoryClient +} + type deviceObserverFactory struct { - rdClient pb.GrpcGatewayClient + rdClient RDClient raClient raPb.ResourceAggregateClient deviceID string } @@ -395,7 +401,12 @@ func runTestDeviceObserverRegister(ctx context.Context, t *testing.T, deviceID s defer func() { _ = rdConn.Close() }() - rdClient := pb.NewGrpcGatewayClient(rdConn.GRPC()) + grpcC := pb.NewGrpcGatewayClient(rdConn.GRPC()) + rdC := pbRD.NewResourceDirectoryClient(rdConn.GRPC()) + rdClient := RDClient{ + GrpcGatewayClient: grpcC, + ResourceDirectoryClient: rdC, + } raConn, err := grpcClient.New(config.MakeGrpcClientConfig(config.RESOURCE_AGGREGATE_HOST), fileWatcher, log.Get(), trace.NewNoopTracerProvider()) require.NoError(t, err) diff --git a/coap-gateway/service/observation/observedResource.go b/coap-gateway/service/observation/observedResource.go index 5b2d948c1..51d9bfe3f 100644 --- a/coap-gateway/service/observation/observedResource.go +++ b/coap-gateway/service/observation/observedResource.go @@ -2,9 +2,13 @@ package observation import ( "context" + "encoding/base64" "sort" + "strings" "sync" + "github.com/plgd-dev/device/v2/schema/interfaces" + "github.com/plgd-dev/go-coap/v3/message" "go.uber.org/atomic" ) @@ -45,6 +49,42 @@ func (r *observedResource) Interface() string { return r.resInterface } +const ( + // maxURIQueryLen is the maximum length of a URI query. See https://datatracker.ietf.org/doc/html/rfc7252#section-5.10 + maxURIQueryLen = 255 + // maxETagLen is the maximum length of an ETag. See https://datatracker.ietf.org/doc/html/rfc7252#section-5.10 + maxETagLen = 8 + // prefixQueryIncChanged is the prefix of the URI query for the "incremental changed" option. See https://docs.plgd.dev/docs/features/control-plane/entity-tag/#etag-batch-interface-for-oicres + prefixQueryIncChanges = "incChanges=" +) + +func encodeETagsForIncrementChanges(etags [][]byte) []string { + if len(etags) < 1 { + return nil + } + etagsStr := make([]string, 0, (len(etags)/15)+1) + var b strings.Builder + for _, etag := range etags { + if len(etag) > maxETagLen { + continue + } + if b.Len() == 0 { + b.WriteString(prefixQueryIncChanges) + } else { + b.WriteString(",") + } + b.WriteString(base64.RawURLEncoding.EncodeToString(etag)) + if b.Len() >= maxURIQueryLen-(maxETagLen*2) { + etagsStr = append(etagsStr, b.String()) + b.Reset() + } + } + if b.Len() > 0 { + etagsStr = append(etagsStr, b.String()) + } + return etagsStr +} + func (r *observedResource) SetObservation(o Observation) { r.private.mutex.Lock() defer r.private.mutex.Unlock() @@ -59,6 +99,37 @@ func (r *observedResource) PopObservation() Observation { return o } +func (r *observedResource) isBatchObservation() bool { + return r.resInterface == interfaces.OC_IF_B +} + +func (r *observedResource) toCoapOptions(etags [][]byte) []message.Option { + opts := make([]message.Option, 0, 2) + if len(etags) > 0 { + opts = append(opts, message.Option{ + ID: message.ETag, + Value: etags[0], + }) + etags = etags[1:] + } + if r.Interface() != "" { + opts = append(opts, message.Option{ + ID: message.URIQuery, + Value: []byte("if=" + r.Interface()), + }) + } + + if r.isBatchObservation() { + for _, q := range encodeETagsForIncrementChanges(etags) { + opts = append(opts, message.Option{ + ID: message.URIQuery, + Value: []byte(q), + }) + } + } + return opts +} + type observedResources []*observedResource func (o observedResources) contains(href string) bool { diff --git a/coap-gateway/service/observation/observedResource_internal_test.go b/coap-gateway/service/observation/observedResource_internal_test.go new file mode 100644 index 000000000..9632bb21d --- /dev/null +++ b/coap-gateway/service/observation/observedResource_internal_test.go @@ -0,0 +1,93 @@ +package observation + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestObservedResourceEncodeETagsForIncrementChanged(t *testing.T) { + tests := []struct { + name string + etags [][]byte + want []string + }{ + { + name: "empty", + etags: nil, + }, + { + name: "not-nil", + etags: [][]byte{}, + }, + { + name: "one-etag", + etags: [][]byte{ + []byte("01234567"), + }, + want: []string{ + prefixQueryIncChanges + "MDEyMzQ1Njc", + }, + }, + { + name: "two-etags", + etags: [][]byte{ + []byte("1"), + []byte("2"), + }, + want: []string{ + prefixQueryIncChanges + "MQ,Mg", + }, + }, + { + name: "two-etags-invalid-etag", + etags: [][]byte{ + []byte("1"), + []byte("2"), + []byte("invalid-etag-is-ignored"), + }, + want: []string{ + prefixQueryIncChanges + "MQ,Mg", + }, + }, + { + name: "multiple-etags", + etags: [][]byte{ + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), + []byte("01234567"), // 21 + }, + want: []string{ + prefixQueryIncChanges + "MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc", + prefixQueryIncChanges + "MDEyMzQ1Njc", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := encodeETagsForIncrementChanges(tt.etags) + for _, g := range got { + assert.Less(t, len(g), 255) // RFC 7641 - Uri-query length + } + assert.Equal(t, tt.want, got) + }) + } +} diff --git a/coap-gateway/service/observation/resourcesObserver.go b/coap-gateway/service/observation/resourcesObserver.go index 7f3f5da7e..ffe438afe 100644 --- a/coap-gateway/service/observation/resourcesObserver.go +++ b/coap-gateway/service/observation/resourcesObserver.go @@ -7,7 +7,6 @@ import ( "time" "github.com/hashicorp/go-multierror" - "github.com/plgd-dev/device/v2/schema/interfaces" "github.com/plgd-dev/device/v2/schema/resources" "github.com/plgd-dev/go-coap/v3/message" "github.com/plgd-dev/go-coap/v3/message/pool" @@ -90,13 +89,13 @@ func newResourcesObserver(deviceID string, coapConn ClientConn, callbacks Resour } // Add resource to observer with given interface and wait for initialization message. -func (o *resourcesObserver) addResource(ctx context.Context, res *commands.Resource, obsInterface string) error { +func (o *resourcesObserver) addResource(ctx context.Context, res *commands.Resource, obsInterface string, etags [][]byte) error { o.private.lock.Lock() defer o.private.lock.Unlock() obs, err := o.addResourceLocked(res, obsInterface) if err == nil && obs != nil { o.notifyAboutStartTwinSynchronization(ctx) - err = o.performObservationLocked([]*observedResource{obs}) + err = o.performObservationLocked([]*observedResourceWithETags{{observedResource: obs, etags: etags}}) if err != nil { o.private.resources, _ = o.private.resources.removeByHref(obs.Href()) defer o.notifyAboutFinishTwinSynchronization(ctx) @@ -164,24 +163,28 @@ func (o *resourcesObserver) addResourceLocked(res *commands.Resource, obsInterfa // // For observable resources subscribe to observations, for unobservable resources retrieve // their content. -func (o *resourcesObserver) handleResource(ctx context.Context, obsRes *observedResource) error { +func (o *resourcesObserver) handleResource(ctx context.Context, obsRes *observedResource, etags [][]byte) error { if obsRes.Href() == commands.StatusHref { return nil } if obsRes.isObservable { - obs, err := o.observeResource(ctx, obsRes) + obs, err := o.observeResource(ctx, obsRes, etags) if err != nil { return err } obsRes.SetObservation(obs) return nil } - return o.getResourceContent(ctx, obsRes.Href()) + var etag []byte + if len(etags) > 0 { + etag = etags[0] + } + return o.getResourceContent(ctx, obsRes.Href(), etag) } // Register to COAP-GW resource observation for given resource -func (o *resourcesObserver) observeResource(ctx context.Context, obsRes *observedResource) (Observation, error) { +func (o *resourcesObserver) observeResource(ctx context.Context, obsRes *observedResource, etags [][]byte) (Observation, error) { cannotObserveResourceError := func(deviceID, href string, err error) error { return fmt.Errorf("cannot observe resource /%v%v: %w", deviceID, href, err) } @@ -189,15 +192,8 @@ func (o *resourcesObserver) observeResource(ctx context.Context, obsRes *observe return nil, cannotObserveResourceError(o.deviceID, obsRes.Href(), emptyDeviceIDError()) } - var opts []message.Option - if obsRes.Interface() != "" { - opts = append(opts, message.Option{ - ID: message.URIQuery, - Value: []byte("if=" + obsRes.Interface()), - }) - } - - batchObservation := obsRes.resInterface == interfaces.OC_IF_B + opts := obsRes.toCoapOptions(etags) + batchObservation := obsRes.isBatchObservation() returnIfNonObservable := batchObservation && obsRes.Href() == resources.ResourceURI obs, err := o.coapConn.Observe(ctx, obsRes.Href(), func(msg *pool.Message) { @@ -223,14 +219,22 @@ func (o *resourcesObserver) observeResource(ctx context.Context, obsRes *observe } // Request resource content form COAP-GW -func (o *resourcesObserver) getResourceContent(ctx context.Context, href string) error { +func (o *resourcesObserver) getResourceContent(ctx context.Context, href string, etag []byte) error { cannotGetResourceError := func(deviceID, href string, err error) error { return fmt.Errorf("cannot get resource /%v%v content: %w", deviceID, href, err) } if o.deviceID == "" { return cannotGetResourceError(o.deviceID, href, emptyDeviceIDError()) } - resp, err := o.coapConn.Get(ctx, href) + opts := make([]message.Option, 0, 1) + if etag != nil { + // we use only first etag + opts = append(opts, message.Option{ + ID: message.ETag, + Value: etag, + }) + } + resp, err := o.coapConn.Get(ctx, href, opts...) if err != nil { return cannotGetResourceError(o.deviceID, href, err) } @@ -259,10 +263,15 @@ func (o *resourcesObserver) notifyAboutFinishTwinSynchronization(ctx context.Con } } -func (o *resourcesObserver) performObservationLocked(obs []*observedResource) error { +type observedResourceWithETags struct { + *observedResource + etags [][]byte +} + +func (o *resourcesObserver) performObservationLocked(obs []*observedResourceWithETags) error { var errors *multierror.Error for _, obsRes := range obs { - if err := o.handleResource(context.Background(), obsRes); err != nil { + if err := o.handleResource(context.Background(), obsRes.observedResource, obsRes.etags); err != nil { o.private.resources, _ = o.private.resources.removeByHref(obsRes.Href()) errors = multierror.Append(errors, err) } @@ -295,15 +304,15 @@ func (o *resourcesObserver) addResources(ctx context.Context, resources []*comma return errors.ErrorOrNil() } -func (o *resourcesObserver) addResourcesLocked(resources []*commands.Resource) ([]*observedResource, error) { +func (o *resourcesObserver) addResourcesLocked(resources []*commands.Resource) ([]*observedResourceWithETags, error) { var errors *multierror.Error - observedResources := make([]*observedResource, 0, len(resources)) + observedResources := make([]*observedResourceWithETags, 0, len(resources)) for _, resource := range resources { observedResource, err := o.addResourceLocked(resource, "") if err != nil { errors = multierror.Append(errors, err) } else if observedResource != nil { - observedResources = append(observedResources, observedResource) + observedResources = append(observedResources, &observedResourceWithETags{observedResource: observedResource}) } } if errors.ErrorOrNil() != nil { diff --git a/coap-gateway/service/service.go b/coap-gateway/service/service.go index b4cdd96f7..afe849ade 100644 --- a/coap-gateway/service/service.go +++ b/coap-gateway/service/service.go @@ -42,6 +42,7 @@ import ( natsClient "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus/nats/client" "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus/nats/subscriber" "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/utils" + pbRD "github.com/plgd-dev/hub/v2/resource-directory/pb" otelCodes "go.opentelemetry.io/otel/codes" semconv "go.opentelemetry.io/otel/semconv/v1.10.0" "go.opentelemetry.io/otel/trace" @@ -49,13 +50,18 @@ import ( var authCtxKey = "AuthCtx" +type resourceDirectoryClient struct { + pbGRPC.GrpcGatewayClient + pbRD.ResourceDirectoryClient +} + // Service is a configuration of coap-gateway type Service struct { ctx context.Context tracerProvider trace.TracerProvider logger log.Logger isClient pbIS.IdentityStoreClient - rdClient pbGRPC.GrpcGatewayClient + rdClient *resourceDirectoryClient certificateAuthorityClient pbCA.CertificateAuthorityClient devicesStatusUpdater *devicesStatusUpdater providers map[string]*oauth2.PlgdProvider @@ -143,7 +149,7 @@ func newIdentityStoreClient(config IdentityStoreConfig, fileWatcher *fsnotify.Wa return isClient, closeIsConn, nil } -func newResourceDirectoryClient(config GrpcServerConfig, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (pbGRPC.GrpcGatewayClient, func(), error) { +func newResourceDirectoryClient(config GrpcServerConfig, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*resourceDirectoryClient, func(), error) { rdConn, err := grpcClient.New(config.Connection, fileWatcher, logger, tracerProvider) if err != nil { return nil, nil, fmt.Errorf("cannot create connection to resource-directory: %w", err) @@ -156,8 +162,11 @@ func newResourceDirectoryClient(config GrpcServerConfig, fileWatcher *fsnotify.W logger.Errorf("error occurs during close connection to resource-directory: %v", err) } } - rdClient := pbGRPC.NewGrpcGatewayClient(rdConn.GRPC()) - return rdClient, closeRdConn, nil + + return &resourceDirectoryClient{ + GrpcGatewayClient: pbGRPC.NewGrpcGatewayClient(rdConn.GRPC()), + ResourceDirectoryClient: pbRD.NewResourceDirectoryClient(rdConn.GRPC()), + }, closeRdConn, nil } func newCertificateAuthorityClient(config GrpcServerConfig, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (pbCA.CertificateAuthorityClient, func(), error) { diff --git a/coap-gateway/service/session.go b/coap-gateway/service/session.go index c24c5b912..c0d994d1b 100644 --- a/coap-gateway/service/session.go +++ b/coap-gateway/service/session.go @@ -32,6 +32,7 @@ import ( "github.com/plgd-dev/hub/v2/pkg/sync/task/future" "github.com/plgd-dev/hub/v2/resource-aggregate/commands" "github.com/plgd-dev/hub/v2/resource-aggregate/events" + pbRD "github.com/plgd-dev/hub/v2/resource-directory/pb" kitSync "github.com/plgd-dev/kit/v2/sync" otelCodes "go.opentelemetry.io/otel/codes" semconv "go.opentelemetry.io/otel/semconv/v1.10.0" @@ -260,6 +261,15 @@ func (c *session) GetDevicesMetadata(ctx context.Context, in *pb.GetDevicesMetad return c.server.rdClient.GetDevicesMetadata(ctx, in, opts...) } +func (c *session) GetLatestDeviceETags(ctx context.Context, in *pbRD.GetLatestDeviceETagsRequest, opts ...grpc.CallOption) (*pbRD.GetLatestDeviceETagsResponse, error) { + authCtx, err := c.GetAuthorizationContext() + if err != nil { + return nil, err + } + ctx = kitNetGrpc.CtxWithToken(ctx, authCtx.GetAccessToken()) + return c.server.rdClient.GetLatestDeviceETags(ctx, in, opts...) +} + func (c *session) GetResourceLinks(ctx context.Context, in *pb.GetResourceLinksRequest, opts ...grpc.CallOption) (pb.GrpcGateway_GetResourceLinksClient, error) { authCtx, err := c.GetAuthorizationContext() if err != nil { @@ -524,6 +534,15 @@ func (c *session) notifyContentChanged(deviceID, href string, batch bool, notifi // we want to log only observations c.logNotificationFromClient(href, notification) } + // the content of the resource is up to date, codes.Valid is used to indicate that the resource has not changed for GET with the etag. + bodySize, err := notification.BodySize() + if err != nil { + return notifyError(deviceID, href, err) + } + if notification.Code() == codes.Valid && bodySize == 0 { + c.Debugf("resource /%v%v content is up to date", deviceID, href) + return nil + } ctx := kitNetGrpc.CtxWithToken(c.Context(), authCtx.GetAccessToken()) if batch && href == resources.ResourceURI { err = c.batchNotifyContentChanged(ctx, deviceID, notification) @@ -908,18 +927,25 @@ func (c *session) GetContext() context.Context { } func (c *session) confirmDeviceMetadataUpdate(ctx context.Context, event *events.DeviceMetadataUpdatePending) error { - _, err := c.server.raClient.ConfirmDeviceMetadataUpdate(ctx, &commands.ConfirmDeviceMetadataUpdateRequest{ + r := &commands.ConfirmDeviceMetadataUpdateRequest{ DeviceId: event.GetDeviceId(), CorrelationId: event.GetAuditContext().GetCorrelationId(), - Confirm: &commands.ConfirmDeviceMetadataUpdateRequest_TwinEnabled{ - TwinEnabled: event.GetTwinEnabled(), - }, CommandMetadata: &commands.CommandMetadata{ ConnectionId: c.RemoteAddr().String(), Sequence: c.coapConn.Sequence(), }, Status: commands.Status_OK, - }) + } + if event.GetTwinForceSynchronization() { + r.Confirm = &commands.ConfirmDeviceMetadataUpdateRequest_TwinForceSynchronization{ + TwinForceSynchronization: true, + } + } else { + r.Confirm = &commands.ConfirmDeviceMetadataUpdateRequest_TwinEnabled{ + TwinEnabled: event.GetTwinEnabled(), + } + } + _, err := c.server.raClient.ConfirmDeviceMetadataUpdate(ctx, r) return err } @@ -934,21 +960,24 @@ func (c *session) UpdateDeviceMetadata(ctx context.Context, event *events.Device c.Close() return fmt.Errorf("cannot update device('%v') metadata: %w", event.GetDeviceId(), err) } - if _, ok := event.GetUpdatePending().(*events.DeviceMetadataUpdatePending_TwinEnabled); !ok { + switch event.GetUpdatePending().(type) { + case *events.DeviceMetadataUpdatePending_TwinEnabled: + case *events.DeviceMetadataUpdatePending_TwinForceSynchronization: + default: return nil } sendConfirmCtx := authCtx.ToContext(ctx) var errObs error var previous bool - if event.GetTwinEnabled() { + if event.GetTwinEnabled() || event.GetTwinForceSynchronization() { // if twin is enabled, we need to first update twin synchronization state to sync out // and then synchronization state will be updated by other replaceDeviceObserverWithDeviceTwin err = c.confirmDeviceMetadataUpdate(sendConfirmCtx, event) - previous, errObs = c.replaceDeviceObserverWithDeviceTwin(sendConfirmCtx, event.GetTwinEnabled()) + previous, errObs = c.replaceDeviceObserverWithDeviceTwin(sendConfirmCtx, event.GetTwinEnabled(), event.GetTwinForceSynchronization()) } else { // if twin is disabled, we to stop observation resources to disable all update twin synchronization state - previous, errObs = c.replaceDeviceObserverWithDeviceTwin(sendConfirmCtx, event.GetTwinEnabled()) + previous, errObs = c.replaceDeviceObserverWithDeviceTwin(sendConfirmCtx, event.GetTwinEnabled(), false) // and then we need to update twin synchronization state to disabled err = c.confirmDeviceMetadataUpdate(sendConfirmCtx, event) } @@ -957,7 +986,7 @@ func (c *session) UpdateDeviceMetadata(ctx context.Context, event *events.Device return fmt.Errorf("cannot update device('%v') metadata: %w", event.GetDeviceId(), errObs) } if err != nil && !errors.Is(err, context.Canceled) { - _, errObs := c.replaceDeviceObserverWithDeviceTwin(sendConfirmCtx, previous) + _, errObs := c.replaceDeviceObserverWithDeviceTwin(sendConfirmCtx, previous, false) if errObs != nil { c.Close() c.Errorf("update device('%v') metadata error: %w", event.GetDeviceId(), errObs) diff --git a/coap-gateway/service/signIn.go b/coap-gateway/service/signIn.go index d7ba930bf..c503d3629 100644 --- a/coap-gateway/service/signIn.go +++ b/coap-gateway/service/signIn.go @@ -227,6 +227,8 @@ func asyncCreateDeviceObserver(x asyncCreateDeviceObserverArg) { observation.WithLogger(x.client.getLogger()), observation.WithRequireBatchObserveEnabled(x.client.server.config.APIs.COAP.RequireBatchObserveEnabled), observation.WithTwinEnabled(x.twinEnabled), + observation.WithMaxETagsCountInRequest(x.client.server.config.DeviceTwin.MaxETagsCountInRequest), + observation.WithUseETags(x.client.server.config.DeviceTwin.UseETags), ) if err != nil { x.client.Close() diff --git a/coap-gateway/service/utils_test.go b/coap-gateway/service/utils_test.go index 9895cab9e..9db149c2b 100644 --- a/coap-gateway/service/utils_test.go +++ b/coap-gateway/service/utils_test.go @@ -354,7 +354,12 @@ func testCoapDial(t *testing.T, deviceID string, withTLS, identityCert bool, val case codes.POST: err = w.SetResponse(codes.Changed, message.TextPlain, bytes.NewReader(resp)) case codes.GET: - err = w.SetResponse(codes.Content, message.TextPlain, bytes.NewReader(resp)) + etag, err := r.ETag() + if err == nil && bytes.Equal(etag, []byte(TestETag)) { + err = w.SetResponse(codes.Valid, message.TextPlain, nil) + } else { + err = w.SetResponse(codes.Content, message.TextPlain, bytes.NewReader(resp), message.Option{ID: message.ETag, Value: []byte(TestETag)}) + } case codes.PUT: err = w.SetResponse(codes.Created, message.TextPlain, bytes.NewReader(resp)) case codes.DELETE: @@ -391,4 +396,5 @@ var ( TestExchangeTimeout = time.Second * 15 TestLogDebug = true + TestETag = "12345678" ) diff --git a/coap-gateway/test/test.go b/coap-gateway/test/test.go index 613874629..999ecb29b 100644 --- a/coap-gateway/test/test.go +++ b/coap-gateway/test/test.go @@ -21,6 +21,7 @@ func MakeConfig(t require.TestingT) service.Config { cfg.TaskQueue.Size = 2 * 1024 * 1024 cfg.APIs.COAP.Addr = config.COAP_GW_HOST cfg.APIs.COAP.RequireBatchObserveEnabled = false + cfg.DeviceTwin.MaxETagsCountInRequest = 3 cfg.APIs.COAP.ExternalAddress = config.COAP_GW_HOST cfg.APIs.COAP.Protocols = []coapService.Protocol{coapService.TCP} if config.COAP_GATEWAY_UDP_ENABLED { diff --git a/go.mod b/go.mod index 1795689ab..71efb681b 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/panjf2000/ants/v2 v2.8.1 github.com/pion/dtls/v2 v2.2.7 github.com/pion/logging v0.2.2 - github.com/plgd-dev/device/v2 v2.2.1-0.20230802115723-ba0b9a78abfd + github.com/plgd-dev/device/v2 v2.2.1-0.20230801145938-032719b097fc github.com/plgd-dev/go-coap/v3 v3.1.4-0.20230802114331-351cd00bab2d github.com/plgd-dev/kit/v2 v2.0.0-20211006190727-057b33161b90 github.com/pseudomuto/protoc-gen-doc v1.5.1 diff --git a/go.sum b/go.sum index 14921a3f6..c4f6d3859 100644 --- a/go.sum +++ b/go.sum @@ -252,8 +252,8 @@ github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsK github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/plgd-dev/device/v2 v2.2.1-0.20230802115723-ba0b9a78abfd h1:jGNpQzY5gswbvCdPJj8voZktfA3w404YDSWi/AAFzZ0= -github.com/plgd-dev/device/v2 v2.2.1-0.20230802115723-ba0b9a78abfd/go.mod h1:lv93I6qbzHIDtXYdNBUkfTxpa7CdRia92OfVnjdU6ls= +github.com/plgd-dev/device/v2 v2.2.1-0.20230801145938-032719b097fc h1:j81j+HXz1rRb8nomjQnb/s2o5KW5QyuYXvBNNISEkz4= +github.com/plgd-dev/device/v2 v2.2.1-0.20230801145938-032719b097fc/go.mod h1:CKAr4/h0Hsx2JVOeB4exO276Rg5kANOm50rrzVl4joA= github.com/plgd-dev/go-coap/v2 v2.0.4-0.20200819112225-8eb712b901bc/go.mod h1:+tCi9Q78H/orWRtpVWyBgrr4vKFo2zYtbbxUllerBp4= github.com/plgd-dev/go-coap/v2 v2.4.1-0.20210517130748-95c37ac8e1fa/go.mod h1:rA7fc7ar+B/qa+Q0hRqv7yj/EMtIlmo1l7vkQGSrHPU= github.com/plgd-dev/go-coap/v3 v3.1.4-0.20230802114331-351cd00bab2d h1:dHabSkw9dQxuphK8s0lkm3EMlkyPo5C3ESQKbVrKT18= diff --git a/grpc-gateway/pb/README.md b/grpc-gateway/pb/README.md index 5185dee79..4b8c306ab 100644 --- a/grpc-gateway/pb/README.md +++ b/grpc-gateway/pb/README.md @@ -1089,6 +1089,7 @@ Certain filters perform a logical "or" operation among the elements of t | ----- | ---- | ----- | ----------- | | device_id | [string](#string) | | | | twin_enabled | [bool](#bool) | | | +| twin_force_synchronization | [bool](#bool) | | force synchronization IoT hub with the device resources and set twin_enabled to true. Use to address potential synchronization issues and prevent operational discrepancies. | | time_to_live | [int64](#int64) | | command validity in nanoseconds. 0 means forever and minimal value is 100000000 (100ms). | @@ -1298,6 +1299,7 @@ https://github.com/openconnectivityfoundation/core/blob/master/schemas/oic.links | ----- | ---- | ----- | ----------- | | device_id | [string](#string) | | | | twin_enabled | [bool](#bool) | | | +| twin_force_synchronization | [bool](#bool) | | | | audit_context | [AuditContext](#resourceaggregate-pb-AuditContext) | | | | event_metadata | [EventMetadata](#resourceaggregate-pb-EventMetadata) | | | | valid_until | [int64](#int64) | | unix timestamp in nanoseconds (https://golang.org/pkg/time/#Time.UnixNano) when pending event is considered as expired. 0 means forever. | @@ -1394,6 +1396,7 @@ https://github.com/openconnectivityfoundation/core/blob/master/schemas/oic.links | status | [Status](#resourceaggregate-pb-Status) | | | | audit_context | [AuditContext](#resourceaggregate-pb-AuditContext) | | | | event_metadata | [EventMetadata](#resourceaggregate-pb-EventMetadata) | | | +| etag | [bytes](#bytes) | | etag of the resource used by twin synchronization | | open_telemetry_carrier | [ResourceChanged.OpenTelemetryCarrierEntry](#resourceaggregate-pb-ResourceChanged-OpenTelemetryCarrierEntry) | repeated | Open telemetry data propagated to asynchronous events | @@ -1677,6 +1680,7 @@ https://github.com/openconnectivityfoundation/cloud-services/blob/master/swagger | audit_context | [AuditContext](#resourceaggregate-pb-AuditContext) | | | | event_metadata | [EventMetadata](#resourceaggregate-pb-EventMetadata) | | | | valid_until | [int64](#int64) | | unix timestamp in nanoseconds (https://golang.org/pkg/time/#Time.UnixNano) when pending event is considered as expired. 0 means forever. | +| etag | [bytes](#bytes) | | | | open_telemetry_carrier | [ResourceRetrievePending.OpenTelemetryCarrierEntry](#resourceaggregate-pb-ResourceRetrievePending-OpenTelemetryCarrierEntry) | repeated | Open telemetry data propagated to asynchronous events | @@ -1713,6 +1717,7 @@ https://github.com/openconnectivityfoundation/cloud-services/blob/master/swagger | content | [Content](#resourceaggregate-pb-Content) | | | | audit_context | [AuditContext](#resourceaggregate-pb-AuditContext) | | | | event_metadata | [EventMetadata](#resourceaggregate-pb-EventMetadata) | | | +| etag | [bytes](#bytes) | | | | open_telemetry_carrier | [ResourceRetrieved.OpenTelemetryCarrierEntry](#resourceaggregate-pb-ResourceRetrieved-OpenTelemetryCarrierEntry) | repeated | Open telemetry data propagated to asynchronous events | diff --git a/grpc-gateway/pb/doc.html b/grpc-gateway/pb/doc.html index d2ba673ce..0133578a8 100644 --- a/grpc-gateway/pb/doc.html +++ b/grpc-gateway/pb/doc.html @@ -3041,6 +3041,13 @@
force synchronization IoT hub with the device resources and set twin_enabled to true. Use to address potential synchronization issues and prevent operational discrepancies.
etag of the resource used by twin synchronization
unix timestamp in nanoseconds (https://golang.org/pkg/time/#Time.UnixNano) when pending event is considered as expired. 0 means forever.
Field | Type | Label | Description |
device_id | +string | ++ | device id |
+
limit | +uint32 | ++ | limit of the number of etags, 0 means no limit |
+
Field | Type | Label | Description |
etags | +bytes | +repeated | +the most recent device etags, each corresponding to a different resource in order of most recent to least recent. |
+
Internal API for Resource Directory
+Method Name | Request Type | Response Type | Description |
GetLatestDeviceETags | +GetLatestDeviceETagsRequest | +GetLatestDeviceETagsResponse | +Get the most recent device etags, each corresponding to a different resource in order of most recent to least recent. |
+
.proto Type | Notes | C++ | Java | Python | Go | C# | PHP | Ruby |
double | ++ | double | +double | +float | +float64 | +double | +float | +Float | +
float | ++ | float | +float | +float | +float32 | +float | +float | +Float | +
int32 | +Uses variable-length encoding. Inefficient for encoding negative numbers – if your field is likely to have negative values, use sint32 instead. | +int32 | +int | +int | +int32 | +int | +integer | +Bignum or Fixnum (as required) | +
int64 | +Uses variable-length encoding. Inefficient for encoding negative numbers – if your field is likely to have negative values, use sint64 instead. | +int64 | +long | +int/long | +int64 | +long | +integer/string | +Bignum | +
uint32 | +Uses variable-length encoding. | +uint32 | +int | +int/long | +uint32 | +uint | +integer | +Bignum or Fixnum (as required) | +
uint64 | +Uses variable-length encoding. | +uint64 | +long | +int/long | +uint64 | +ulong | +integer/string | +Bignum or Fixnum (as required) | +
sint32 | +Uses variable-length encoding. Signed int value. These more efficiently encode negative numbers than regular int32s. | +int32 | +int | +int | +int32 | +int | +integer | +Bignum or Fixnum (as required) | +
sint64 | +Uses variable-length encoding. Signed int value. These more efficiently encode negative numbers than regular int64s. | +int64 | +long | +int/long | +int64 | +long | +integer/string | +Bignum | +
fixed32 | +Always four bytes. More efficient than uint32 if values are often greater than 2^28. | +uint32 | +int | +int | +uint32 | +uint | +integer | +Bignum or Fixnum (as required) | +
fixed64 | +Always eight bytes. More efficient than uint64 if values are often greater than 2^56. | +uint64 | +long | +int/long | +uint64 | +ulong | +integer/string | +Bignum | +
sfixed32 | +Always four bytes. | +int32 | +int | +int | +int32 | +int | +integer | +Bignum or Fixnum (as required) | +
sfixed64 | +Always eight bytes. | +int64 | +long | +int/long | +int64 | +long | +integer/string | +Bignum | +
bool | ++ | bool | +boolean | +boolean | +bool | +bool | +boolean | +TrueClass/FalseClass | +
string | +A string must always contain UTF-8 encoded or 7-bit ASCII text. | +string | +String | +str/unicode | +string | +string | +string | +String (UTF-8) | +
bytes | +May contain any arbitrary sequence of bytes. | +string | +ByteString | +str | +[]byte | +ByteString | +string | +String (ASCII-8BIT) | +