Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hub: efficient Device Twin Synchronization #1005

Merged
merged 7 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,15 @@ define CLEAN-DOCKER-DEVICE
sudo rm -rf $(WORKING_DIRECTORY)/.tmp/$(1) || true
endef

simulators:
simulators/clean:
$(call CLEAN-DOCKER-DEVICE,$(DEVICE_SIMULATOR_NAME))
$(call CLEAN-DOCKER-DEVICE,$(DEVICE_SIMULATOR_RES_OBSERVABLE_NAME))
.PHONY: simulators/clean

simulators: simulators/clean
$(call RUN-DOCKER-DEVICE,$(DEVICE_SIMULATOR_NAME),$(DEVICE_SIMULATOR_IMG))
$(call RUN-DOCKER-DEVICE,$(DEVICE_SIMULATOR_RES_OBSERVABLE_NAME),$(DEVICE_SIMULATOR_RES_OBSERVABLE_IMG))
.PHONY: simulators

env/test/mem: clean certificates nats mongo privateKeys
.PHONY: env/test/mem
Expand Down Expand Up @@ -292,12 +298,10 @@ $(test-targets): %: env

build: $(SUBDIRS)

clean:
clean: simulators/clean
docker rm -f mongo || true
docker rm -f nats || true
docker rm -f nats-cloud-connector || true
$(call CLEAN-DOCKER-DEVICE,$(DEVICE_SIMULATOR_NAME))
$(call CLEAN-DOCKER-DEVICE,$(DEVICE_SIMULATOR_RES_OBSERVABLE_NAME))
sudo rm -rf ./.tmp/certs || true
sudo rm -rf ./.tmp/mongo || true
sudo rm -rf ./.tmp/home || true
Expand Down
2 changes: 1 addition & 1 deletion charts/plgd-hub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ global:
| certmanager.internal.issuer.name | string | `nil` | Name |
| certmanager.internal.issuer.spec | string | `nil` | cert-manager issuer spec |
| cluster.dns | string | `"cluster.local"` | Cluster internal DNS prefix |
| coapgateway | object | `{"affinity":{},"apis":{"coap":{"authorization":{"deviceIdClaim":null,"ownerClaim":null,"providers":null},"blockwiseTransfer":{"blockSize":"1024","enabled":true},"externalAddress":"","keepAlive":{"timeout":"20s"},"maxMessageSize":262144,"messagePoolSize":1000,"messageQueueSize":16,"ownerCacheExpiration":"1m","protocols":["tcp"],"requireBatchObserveEnabled":true,"subscriptionBufferSize":1000,"tls":{"caPool":null,"certFile":null,"clientCertificateRequired":true,"disconnectOnExpiredCertificate":false,"enabled":true,"identityPropertiesRequired":true,"keyFile":null}}},"clients":{"certificateAuthority":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}},"eventBus":{"nats":{"pendingLimits":{"bytesLimit":"67108864","msgLimit":"524288"},"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false},"url":""}},"identityStore":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}},"ownerClaim":null},"resourceAggregate":{"deviceStatusExpiration":{"enabled":false,"expiresIn":"0s"},"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}},"resourceDirectory":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}}},"config":{"fileName":"service.yaml","mountPath":"/config","volume":"config"},"deploymentAnnotations":{},"deploymentLabels":{},"enabled":true,"extraContainers":{},"extraVolumeMounts":{},"extraVolumes":{},"fullnameOverride":null,"hubId":null,"image":{"imagePullSecrets":{},"pullPolicy":"Always","registry":"ghcr.io/","repository":"plgd-dev/hub/coap-gateway","tag":null},"imagePullSecrets":{},"initContainersTpl":{},"livenessProbe":{},"log":{"dumpBody":false,"encoderConfig":{"timeEncoder":"rfc3339nano"},"encoding":"json","level":"info","stacktrace":{"enabled":false,"level":"warn"}},"name":"coap-gateway","nodeSelector":{},"podAnnotations":{},"podLabels":{},"podSecurityContext":{},"port":5684,"rbac":{"enabled":false,"roleBindingDefinitionTpl":null,"serviceAccountName":"coap-gateway"},"readinessProbe":{},"replicas":1,"resources":{},"restartPolicy":"Always","securityContext":{},"service":{"annotations":{},"labels":{},"nodePort":null,"tcp":{"annotations":{},"labels":{},"name":"coaps-tcp","nodePort":null,"protocol":"TCP","targetPort":"coaps-tcp","type":null},"type":"LoadBalancer","udp":{"annotations":{},"labels":{},"name":"coaps-udp","nodePort":null,"protocol":"UDP","targetPort":"coaps-udp","type":null}},"taskQueue":{"goPoolSize":1600,"maxIdleTime":"10m","size":"2097152"},"tolerations":{}}` | CoAP gateway parameters |
| coapgateway | object | `{"affinity":{},"apis":{"coap":{"authorization":{"deviceIdClaim":null,"ownerClaim":null,"providers":null},"blockwiseTransfer":{"blockSize":"1024","enabled":true},"externalAddress":"","keepAlive":{"timeout":"20s"},"maxMessageSize":262144,"messagePoolSize":1000,"messageQueueSize":16,"ownerCacheExpiration":"1m","protocols":["tcp"],"requireBatchObserveEnabled":true,"subscriptionBufferSize":1000,"tls":{"caPool":null,"certFile":null,"clientCertificateRequired":true,"disconnectOnExpiredCertificate":false,"enabled":true,"identityPropertiesRequired":true,"keyFile":null}}},"clients":{"certificateAuthority":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}},"eventBus":{"nats":{"pendingLimits":{"bytesLimit":"67108864","msgLimit":"524288"},"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false},"url":""}},"identityStore":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}},"ownerClaim":null},"resourceAggregate":{"deviceStatusExpiration":{"enabled":false,"expiresIn":"0s"},"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}},"resourceDirectory":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}}},"config":{"fileName":"service.yaml","mountPath":"/config","volume":"config"},"deploymentAnnotations":{},"deploymentLabels":{},"deviceTwin":{"maxETagsCountInRequest":8,"useETags":true},"enabled":true,"extraContainers":{},"extraVolumeMounts":{},"extraVolumes":{},"fullnameOverride":null,"hubId":null,"image":{"imagePullSecrets":{},"pullPolicy":"Always","registry":"ghcr.io/","repository":"plgd-dev/hub/coap-gateway","tag":null},"imagePullSecrets":{},"initContainersTpl":{},"livenessProbe":{},"log":{"dumpBody":false,"encoderConfig":{"timeEncoder":"rfc3339nano"},"encoding":"json","level":"info","stacktrace":{"enabled":false,"level":"warn"}},"name":"coap-gateway","nodeSelector":{},"podAnnotations":{},"podLabels":{},"podSecurityContext":{},"port":5684,"rbac":{"enabled":false,"roleBindingDefinitionTpl":null,"serviceAccountName":"coap-gateway"},"readinessProbe":{},"replicas":1,"resources":{},"restartPolicy":"Always","securityContext":{},"service":{"annotations":{},"labels":{},"nodePort":null,"tcp":{"annotations":{},"labels":{},"name":"coaps-tcp","nodePort":null,"protocol":"TCP","targetPort":"coaps-tcp","type":null},"type":"LoadBalancer","udp":{"annotations":{},"labels":{},"name":"coaps-udp","nodePort":null,"protocol":"UDP","targetPort":"coaps-udp","type":null}},"taskQueue":{"goPoolSize":1600,"maxIdleTime":"10m","size":"2097152"},"tolerations":{}}` | CoAP gateway parameters |
| coapgateway.affinity | object | `{}` | Affinity definition |
| coapgateway.apis | object | `{"coap":{"authorization":{"deviceIdClaim":null,"ownerClaim":null,"providers":null},"blockwiseTransfer":{"blockSize":"1024","enabled":true},"externalAddress":"","keepAlive":{"timeout":"20s"},"maxMessageSize":262144,"messagePoolSize":1000,"messageQueueSize":16,"ownerCacheExpiration":"1m","protocols":["tcp"],"requireBatchObserveEnabled":true,"subscriptionBufferSize":1000,"tls":{"caPool":null,"certFile":null,"clientCertificateRequired":true,"disconnectOnExpiredCertificate":false,"enabled":true,"identityPropertiesRequired":true,"keyFile":null}}}` | For complete coap-gateway service configuration see [plgd/coap-gateway](https://github.com/plgd-dev/hub/tree/main/coap-gateway) |
| coapgateway.apis.coap.tls.disconnectOnExpiredCertificate | bool | `false` | After the certificate expires, the connection will be disconnected |
Expand Down
3 changes: 3 additions & 0 deletions charts/plgd-hub/templates/coap-gateway/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -172,5 +172,8 @@ data:
goPoolSize: {{ .taskQueue.goPoolSize }}
size: {{ .taskQueue.size }}
maxIdleTime: {{ .taskQueue.maxIdleTime | quote }}
deviceTwin:
maxETagsCountInRequest: {{ deviceTwin.maxETagsCountInRequest }}
useETags: {{ deviceTwin.useETags }}
{{- end }}
{{- end }}
3 changes: 3 additions & 0 deletions charts/plgd-hub/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,9 @@ coapgateway:
goPoolSize: 1600
size: "2097152"
maxIdleTime: "10m"
deviceTwin:
maxETagsCountInRequest: 8
useETags: true

identitystore:
# -- Enable identity service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (h deviceSubscriptionHandlers) OnDeviceSubscriberReconnectError(err error)

type DevicesSubscription struct {
ctx context.Context
data *kitSync.Map // //[deviceID]*deviceSubscription
data *kitSync.Map // [deviceID]*deviceSubscription
rdClient pb.GrpcGatewayClient
raClient raService.ResourceAggregateClient
subscriber *subscriber.Subscriber
Expand Down
34 changes: 31 additions & 3 deletions coap-gateway/coapconv/coapconv.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ func NewCoapResourceRetrieveRequest(ctx context.Context, messagePool *pool.Pool,
if event.GetResourceInterface() != "" {
req.AddOptionString(message.URIQuery, "if="+event.GetResourceInterface())
}
if len(event.GetEtag()) > 0 {
if err := req.AddETag(event.GetEtag()); err != nil {
return nil, err
}
}
return req, nil
}

Expand Down Expand Up @@ -186,6 +191,14 @@ func NewCommandMetadata(sequenceNumber uint64, connectionID string) *commands.Co
}
}

func getETagFromMessage(msg interface{ ETag() ([]byte, error) }) []byte {
etag, err := msg.ETag()
if err != nil {
etag = nil
}
return etag
}

func NewConfirmResourceRetrieveRequest(resourceID *commands.ResourceId, correlationID, connectionID string, req *pool.Message) *commands.ConfirmResourceRetrieveRequest {
content := NewContent(req.Options(), req.Body())
metadata := NewCommandMetadata(req.Sequence(), connectionID)
Expand All @@ -196,6 +209,7 @@ func NewConfirmResourceRetrieveRequest(resourceID *commands.ResourceId, correlat
Status: CoapCodeToStatus(req.Code()),
Content: content,
CommandMetadata: metadata,
Etag: getETagFromMessage(req),
}
}

Expand Down Expand Up @@ -256,6 +270,7 @@ func NewNotifyResourceChangedRequest(resourceID *commands.ResourceId, connection
Content: content,
CommandMetadata: metadata,
Status: CoapCodeToStatus(req.Code()),
Etag: getETagFromMessage(req),
}
}

Expand Down Expand Up @@ -298,6 +313,8 @@ func NewNotifyResourceChangedRequestsFromBatchResourceDiscovery(deviceID, connec
}

requests := make([]*commands.NotifyResourceChangedRequest, 0, len(rs))
etag := getETagFromMessage(req)
var latestETagResource *commands.NotifyResourceChangedRequest
for _, r := range rs {
isEmpty, filterOut := filterOutEmptyResource(r)
if filterOut {
Expand All @@ -312,8 +329,7 @@ func NewNotifyResourceChangedRequestsFromBatchResourceDiscovery(deviceID, connec
data = nil
code = commands.Status_NOT_FOUND
}

requests = append(requests, &commands.NotifyResourceChangedRequest{
resourceChangedReq := &commands.NotifyResourceChangedRequest{
ResourceId: commands.NewResourceID(deviceID, r.Href()),
Content: &commands.Content{
ContentType: getContentFormatString(ct),
Expand All @@ -322,7 +338,18 @@ func NewNotifyResourceChangedRequestsFromBatchResourceDiscovery(deviceID, connec
},
CommandMetadata: metadata,
Status: code,
})
Etag: r.ETag,
}
if len(etag) > 0 && bytes.Equal(etag, r.ETag) {
latestETagResource = resourceChangedReq
continue
}
requests = append(requests, resourceChangedReq)
}
// send latestETagResource need to be send as last because the resource are applied in order in resource aggregate
// so latestETagResource is the last resource in the batch
if latestETagResource != nil {
requests = append(requests, latestETagResource)
}
return requests, nil
}
Expand Down Expand Up @@ -379,6 +406,7 @@ func NewRetrieveResourceRequest(resourceID *commands.ResourceId, req *mux.Messag
CorrelationId: correlationID,
ResourceInterface: resourceInterface,
CommandMetadata: metadata,
Etag: getETagFromMessage(req),
}, nil
}

Expand Down
3 changes: 3 additions & 0 deletions coap-gateway/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ clients:
keyFile: "/secrets/private/cert.key"
certFile: "/secrets/public/cert.crt"
useSystemCAPool: false
deviceTwin:
maxETagsCountInRequest: 8
useETags: false
taskQueue:
goPoolSize: 1600
size: 2097152
Expand Down
7 changes: 5 additions & 2 deletions coap-gateway/service/clientObserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,16 @@ func (c *session) replaceDeviceObserver(deviceObserverFuture *future.Future) *fu
}

// Replace deviceObserver instance in the client if Device Twin setting was changed for the device.
func (c *session) replaceDeviceObserverWithDeviceTwin(ctx context.Context, twinEnabled bool) (bool, error) {
func (c *session) replaceDeviceObserverWithDeviceTwin(ctx context.Context, twinEnabled, twinForceSynchronization bool) (bool, error) {
obs, err := c.getDeviceObserver(ctx)
if err != nil {
return false, err
}
prevTwinEnabled := obs.GetTwinEnabled()
deviceID := obs.GetDeviceID()
observationType := obs.GetObservationType()
if prevTwinEnabled == twinEnabled {
twinEnabled = twinEnabled || twinForceSynchronization
if !twinForceSynchronization && prevTwinEnabled == twinEnabled {
return prevTwinEnabled, nil
}
deviceObserverFuture, setDeviceObserver := future.New()
Expand All @@ -67,6 +68,8 @@ func (c *session) replaceDeviceObserverWithDeviceTwin(ctx context.Context, twinE
observation.WithTwinEnabled(twinEnabled), observation.WithObservationType(observationType),
observation.WithLogger(c.getLogger()),
observation.WithRequireBatchObserveEnabled(c.server.config.APIs.COAP.RequireBatchObserveEnabled),
observation.WithMaxETagsCountInRequest(c.server.config.DeviceTwin.MaxETagsCountInRequest),
observation.WithUseETags(!twinForceSynchronization && c.server.config.DeviceTwin.UseETags),
)
if err != nil {
setDeviceObserver(nil, err)
Expand Down
25 changes: 16 additions & 9 deletions coap-gateway/service/clientRetrieveHandler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package service

import (
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -39,14 +40,17 @@ func clientRetrieveHandler(req *mux.Message, client *session) (*pool.Message, er
var content *commands.Content
var code coapCodes.Code
resourceInterface := message.GetResourceInterface(req)
etag, err := req.ETag()
if err != nil {
etag = nil
}
if resourceInterface == "" {
content, code, err = clientRetrieveFromResourceTwinHandler(req.Context(), client, deviceID, href)
content, code, err = clientRetrieveFromResourceTwinHandler(req.Context(), client, deviceID, href, etag)
if err != nil {
return nil, statusErrorf(code, errFmtRetrieveResource, fmt.Sprintf(" /%v%v from resource twin", deviceID, href), err)
}
} else {
code = coapCodes.Content
content, err = clientRetrieveFromDeviceHandler(req, client, deviceID, href)
content, code, err = clientRetrieveFromDeviceHandler(req, client, deviceID, href)
if err != nil {
code = coapconv.GrpcErr2CoapCode(err, coapconv.Retrieve)
return nil, statusErrorf(code, errFmtRetrieveResource, fmt.Sprintf(" /%v%v from device", deviceID, href), err)
Expand All @@ -63,7 +67,7 @@ func clientRetrieveHandler(req *mux.Message, client *session) (*pool.Message, er
return client.createResponse(code, req.Token(), mediaType, content.Data), nil
}

func clientRetrieveFromResourceTwinHandler(ctx context.Context, client *session, deviceID, href string) (*commands.Content, coapCodes.Code, error) {
func clientRetrieveFromResourceTwinHandler(ctx context.Context, client *session, deviceID, href string, etag []byte) (*commands.Content, coapCodes.Code, error) {
RetrieveResourcesClient, err := client.server.rdClient.GetResources(ctx, &pbGRPC.GetResourcesRequest{
ResourceIdFilter: []string{
commands.NewResourceID(deviceID, href).ToString(),
Expand All @@ -86,26 +90,29 @@ func clientRetrieveFromResourceTwinHandler(ctx context.Context, client *session,
return nil, coapconv.GrpcErr2CoapCode(err, coapconv.Retrieve), err
}
if resourceValue.GetData().GetResourceId().GetDeviceId() == deviceID && resourceValue.GetData().GetResourceId().GetHref() == href && resourceValue.GetData().Content != nil {
if etag != nil && bytes.Equal(etag, resourceValue.GetData().GetEtag()) {
return nil, coapCodes.Valid, nil
}
return resourceValue.GetData().Content, coapCodes.Content, nil
}
}
return nil, coapCodes.NotFound, fmt.Errorf("not found")
}

func clientRetrieveFromDeviceHandler(req *mux.Message, client *session, deviceID, href string) (*commands.Content, error) {
func clientRetrieveFromDeviceHandler(req *mux.Message, client *session, deviceID, href string) (*commands.Content, coapCodes.Code, error) {
retrieveCommand, err := coapconv.NewRetrieveResourceRequest(commands.NewResourceID(deviceID, href), req, client.RemoteAddr().String())
if err != nil {
return nil, err
return nil, coapCodes.ServiceUnavailable, err
}

retrievedEvent, err := client.server.raClient.SyncRetrieveResource(req.Context(), "*", retrieveCommand)
if err != nil {
return nil, err
return nil, coapCodes.ServiceUnavailable, err
}
content, err := commands.EventContentToContent(retrievedEvent)
if err != nil {
return nil, err
return nil, coapCodes.ServiceUnavailable, err
}

return content, nil
return content, coapconv.StatusToCoapCode(retrievedEvent.GetStatus(), coapconv.Retrieve), nil
}
29 changes: 28 additions & 1 deletion coap-gateway/service/clientRetrieveHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@ import (

"github.com/plgd-dev/go-coap/v3/message"
coapCodes "github.com/plgd-dev/go-coap/v3/message/codes"
coapgwTest "github.com/plgd-dev/hub/v2/coap-gateway/test"
"github.com/plgd-dev/hub/v2/coap-gateway/uri"
"github.com/plgd-dev/hub/v2/pkg/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestClientRetrieveHandler(t *testing.T) {
shutdown := setUp(t)
coapgwCfg := coapgwTest.MakeConfig(t)
coapgwCfg.Log.DumpBody = true
coapgwCfg.Log.Level = log.DebugLevel
shutdown := setUp(t, coapgwCfg)
defer shutdown()

co := testCoapDial(t, "", true, true, time.Now().Add(time.Minute))
Expand All @@ -30,6 +35,7 @@ func TestClientRetrieveHandler(t *testing.T) {
type args struct {
path string
query string
etag []byte
}
tests := []struct {
name string
Expand Down Expand Up @@ -65,6 +71,24 @@ func TestClientRetrieveHandler(t *testing.T) {
},
wantsCode: coapCodes.Content,
},
{
name: "found with etag",
args: args{
path: uri.ResourceRoute + "/" + CertIdentity + TestAResourceHref,
etag: []byte(TestETag),
},
wantsCode: coapCodes.Valid,
},

{
name: "found with with interface,etag",
args: args{
path: uri.ResourceRoute + "/" + CertIdentity + TestAResourceHref,
etag: []byte(TestETag),
query: "if=oic.if.baseline",
},
wantsCode: coapCodes.Valid,
},
}

testPrepareDevice(t, co)
Expand All @@ -75,6 +99,9 @@ func TestClientRetrieveHandler(t *testing.T) {
defer cancel()
req, err := co.NewGetRequest(ctx, tt.args.path)
require.NoError(t, err)
if tt.args.etag != nil {
req.SetOptionBytes(message.ETag, tt.args.etag)
}
if tt.args.query != "" {
req.SetOptionString(message.URIQuery, tt.args.query)
}
Expand Down
Loading
Loading