Skip to content

Commit

Permalink
hub: efficient Device Twin Synchronization
Browse files Browse the repository at this point in the history
In order to monitor resource changes and determine if a resource has been modified on the device, the CoAP gateway utilizes the Entity Tag (ETAG) mechanism.

For Batch Observation, the ETAG is associated with the overall state of resources. Prior to initiating resource observation, the CoAP gateway retrieves the latest ETAG among all device resources from the Hub Database. When initiating the resource observation, the CoAP gateway sends the ETAG to the device. If the received ETAG matches the highest ETAG among the device resources, the device responds with a code VALID. However, if the received ETAG does not match, the device responds with a code CONTENT and includes the current ETAG. Consequently, when a resource changes, the device sends the updated ETAG back to the CoAP gateway via a notification. The CoAP gateway transmits the ETAG together with the Content by using the NotifyResourceChanged method to the resource-aggregate. This command is then converted into a ResourceChanged event, which is saved in a database and distributed through the event bus. In cases where multiple resources change simultaneously, the CoAP gateway updates all affected resources with the same timestamp and ETAG.

The special query to the database efficiently retrieves the latest ETAG value from all device resources without loading the complete set of data. This optimized query solely focuses on performance and retrieves only the required ETAG value, excluding any additional information.
  • Loading branch information
jkralik committed Jul 21, 2023
1 parent 9dccb80 commit b68e708
Show file tree
Hide file tree
Showing 64 changed files with 2,592 additions and 797 deletions.
38 changes: 35 additions & 3 deletions coap-gateway/coapconv/coapconv.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ func NewCoapResourceRetrieveRequest(ctx context.Context, messagePool *pool.Pool,
if event.GetResourceInterface() != "" {
req.AddOptionString(message.URIQuery, "if="+event.GetResourceInterface())
}
if len(event.GetEtag()) > 0 {
if err := req.AddETag(event.GetEtag()); err != nil {
return nil, err
}
}
return req, nil
}

Expand Down Expand Up @@ -186,6 +191,14 @@ func NewCommandMetadata(sequenceNumber uint64, connectionID string) *commands.Co
}
}

func getETagFromMessage(msg interface{ ETag() ([]byte, error) }) []byte {
etag, err := msg.ETag()
if err != nil {
etag = nil
}
return etag
}

func NewConfirmResourceRetrieveRequest(resourceID *commands.ResourceId, correlationID, connectionID string, req *pool.Message) *commands.ConfirmResourceRetrieveRequest {
content := NewContent(req.Options(), req.Body())
metadata := NewCommandMetadata(req.Sequence(), connectionID)
Expand All @@ -196,6 +209,7 @@ func NewConfirmResourceRetrieveRequest(resourceID *commands.ResourceId, correlat
Status: CoapCodeToStatus(req.Code()),
Content: content,
CommandMetadata: metadata,
Etag: getETagFromMessage(req),
}
}

Expand Down Expand Up @@ -256,6 +270,7 @@ func NewNotifyResourceChangedRequest(resourceID *commands.ResourceId, connection
Content: content,
CommandMetadata: metadata,
Status: CoapCodeToStatus(req.Code()),
Etag: getETagFromMessage(req),
}
}

Expand Down Expand Up @@ -298,6 +313,8 @@ func NewNotifyResourceChangedRequestsFromBatchResourceDiscovery(deviceID, connec
}

requests := make([]*commands.NotifyResourceChangedRequest, 0, len(rs))
etag := getETagFromMessage(req)
var latestETagResource *commands.NotifyResourceChangedRequest
for _, r := range rs {
isEmpty, filterOut := filterOutEmptyResource(r)
if filterOut {
Expand All @@ -312,8 +329,7 @@ func NewNotifyResourceChangedRequestsFromBatchResourceDiscovery(deviceID, connec
data = nil
code = commands.Status_NOT_FOUND
}

requests = append(requests, &commands.NotifyResourceChangedRequest{
resourceChangedReq := &commands.NotifyResourceChangedRequest{
ResourceId: commands.NewResourceID(deviceID, r.Href()),
Content: &commands.Content{
ContentType: getContentFormatString(ct),
Expand All @@ -322,7 +338,22 @@ func NewNotifyResourceChangedRequestsFromBatchResourceDiscovery(deviceID, connec
},
CommandMetadata: metadata,
Status: code,
})
Etag: r.ETag,
}
if len(etag) > 0 && bytes.Equal(etag, r.ETag) {
latestETagResource = resourceChangedReq
continue
}
requests = append(requests, resourceChangedReq)
}
// send latestETagResource need to be send as last because the resource are applied in order in resource aggregate
// so latestETagResource is the last resource in the batch
if latestETagResource != nil {
requests = append(requests, latestETagResource)
} else if len(requests) > 0 && len(etag) > 0 {
// if there is no latestETagResource we need to set etag to the last resource in the batch
// to make sure that the one resource contains the etag
requests[len(requests)-1].Etag = etag
}
return requests, nil
}
Expand Down Expand Up @@ -379,6 +410,7 @@ func NewRetrieveResourceRequest(resourceID *commands.ResourceId, req *mux.Messag
CorrelationId: correlationID,
ResourceInterface: resourceInterface,
CommandMetadata: metadata,
Etag: getETagFromMessage(req),
}, nil
}

Expand Down
1 change: 1 addition & 0 deletions coap-gateway/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ apis:
subscriptionBufferSize: 1000
messagePoolSize: 1000
requireBatchObserveEnabled: true
limitBatchObserveLatestETags: 8
messageQueueSize: 16
keepAlive:
timeout: 20s
Expand Down
1 change: 1 addition & 0 deletions coap-gateway/service/clientObserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (c *session) replaceDeviceObserverWithDeviceTwin(ctx context.Context, twinE
observation.WithTwinEnabled(twinEnabled), observation.WithObservationType(observationType),
observation.WithLogger(c.getLogger()),
observation.WithRequireBatchObserveEnabled(c.server.config.APIs.COAP.RequireBatchObserveEnabled),
observation.WithLimitBatchObserveLatestETags(c.server.config.APIs.COAP.LimitBatchObserveLatestETags),
)
if err != nil {
setDeviceObserver(nil, err)
Expand Down
25 changes: 16 additions & 9 deletions coap-gateway/service/clientRetrieveHandler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package service

import (
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -39,14 +40,17 @@ func clientRetrieveHandler(req *mux.Message, client *session) (*pool.Message, er
var content *commands.Content
var code coapCodes.Code
resourceInterface := message.GetResourceInterface(req)
etag, err := req.ETag()
if err != nil {
etag = nil
}
if resourceInterface == "" {
content, code, err = clientRetrieveFromResourceTwinHandler(req.Context(), client, deviceID, href)
content, code, err = clientRetrieveFromResourceTwinHandler(req.Context(), client, deviceID, href, etag)
if err != nil {
return nil, statusErrorf(code, errFmtRetrieveResource, fmt.Sprintf(" /%v%v from resource twin", deviceID, href), err)
}
} else {
code = coapCodes.Content
content, err = clientRetrieveFromDeviceHandler(req, client, deviceID, href)
content, code, err = clientRetrieveFromDeviceHandler(req, client, deviceID, href)
if err != nil {
code = coapconv.GrpcErr2CoapCode(err, coapconv.Retrieve)
return nil, statusErrorf(code, errFmtRetrieveResource, fmt.Sprintf(" /%v%v from device", deviceID, href), err)
Expand All @@ -63,7 +67,7 @@ func clientRetrieveHandler(req *mux.Message, client *session) (*pool.Message, er
return client.createResponse(code, req.Token(), mediaType, content.Data), nil
}

func clientRetrieveFromResourceTwinHandler(ctx context.Context, client *session, deviceID, href string) (*commands.Content, coapCodes.Code, error) {
func clientRetrieveFromResourceTwinHandler(ctx context.Context, client *session, deviceID, href string, etag []byte) (*commands.Content, coapCodes.Code, error) {
RetrieveResourcesClient, err := client.server.rdClient.GetResources(ctx, &pbGRPC.GetResourcesRequest{
ResourceIdFilter: []string{
commands.NewResourceID(deviceID, href).ToString(),
Expand All @@ -86,26 +90,29 @@ func clientRetrieveFromResourceTwinHandler(ctx context.Context, client *session,
return nil, coapconv.GrpcErr2CoapCode(err, coapconv.Retrieve), err
}
if resourceValue.GetData().GetResourceId().GetDeviceId() == deviceID && resourceValue.GetData().GetResourceId().GetHref() == href && resourceValue.GetData().Content != nil {
if etag != nil && bytes.Equal(etag, resourceValue.GetData().GetEtag()) {
return nil, coapCodes.Valid, nil
}
return resourceValue.GetData().Content, coapCodes.Content, nil
}
}
return nil, coapCodes.NotFound, fmt.Errorf("not found")
}

func clientRetrieveFromDeviceHandler(req *mux.Message, client *session, deviceID, href string) (*commands.Content, error) {
func clientRetrieveFromDeviceHandler(req *mux.Message, client *session, deviceID, href string) (*commands.Content, coapCodes.Code, error) {
retrieveCommand, err := coapconv.NewRetrieveResourceRequest(commands.NewResourceID(deviceID, href), req, client.RemoteAddr().String())
if err != nil {
return nil, err
return nil, coapCodes.ServiceUnavailable, err
}

retrievedEvent, err := client.server.raClient.SyncRetrieveResource(req.Context(), "*", retrieveCommand)
if err != nil {
return nil, err
return nil, coapCodes.ServiceUnavailable, err
}
content, err := commands.EventContentToContent(retrievedEvent)
if err != nil {
return nil, err
return nil, coapCodes.ServiceUnavailable, err
}

return content, nil
return content, coapconv.StatusToCoapCode(retrievedEvent.GetStatus(), coapconv.Retrieve), nil
}
29 changes: 28 additions & 1 deletion coap-gateway/service/clientRetrieveHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@ import (

"github.com/plgd-dev/go-coap/v3/message"
coapCodes "github.com/plgd-dev/go-coap/v3/message/codes"
coapgwTest "github.com/plgd-dev/hub/v2/coap-gateway/test"
"github.com/plgd-dev/hub/v2/coap-gateway/uri"
"github.com/plgd-dev/hub/v2/pkg/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestClientRetrieveHandler(t *testing.T) {
shutdown := setUp(t)
coapgwCfg := coapgwTest.MakeConfig(t)
coapgwCfg.Log.DumpBody = true
coapgwCfg.Log.Level = log.DebugLevel
shutdown := setUp(t, coapgwCfg)
defer shutdown()

co := testCoapDial(t, "", true, true, time.Now().Add(time.Minute))
Expand All @@ -30,6 +35,7 @@ func TestClientRetrieveHandler(t *testing.T) {
type args struct {
path string
query string
etag []byte
}
tests := []struct {
name string
Expand Down Expand Up @@ -65,6 +71,24 @@ func TestClientRetrieveHandler(t *testing.T) {
},
wantsCode: coapCodes.Content,
},
{
name: "found with etag",
args: args{
path: uri.ResourceRoute + "/" + CertIdentity + TestAResourceHref,
etag: []byte(TestETag),
},
wantsCode: coapCodes.Valid,
},

{
name: "found with with interface,etag",
args: args{
path: uri.ResourceRoute + "/" + CertIdentity + TestAResourceHref,
etag: []byte(TestETag),
query: "if=oic.if.baseline",
},
wantsCode: coapCodes.Valid,
},
}

testPrepareDevice(t, co)
Expand All @@ -75,6 +99,9 @@ func TestClientRetrieveHandler(t *testing.T) {
defer cancel()
req, err := co.NewGetRequest(ctx, tt.args.path)
require.NoError(t, err)
if tt.args.etag != nil {
req.SetOptionBytes(message.ETag, tt.args.etag)
}
if tt.args.query != "" {
req.SetOptionString(message.URIQuery, tt.args.query)
}
Expand Down
15 changes: 8 additions & 7 deletions coap-gateway/service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,14 @@ func (c *InjectedCOAPConfig) Validate() error {
}

type COAPConfig struct {
coapService.Config `yaml:",inline" json:",inline"`
ExternalAddress string `yaml:"externalAddress" json:"externalAddress"`
Authorization AuthorizationConfig `yaml:"authorization" json:"authorization"`
OwnerCacheExpiration time.Duration `yaml:"ownerCacheExpiration" json:"ownerCacheExpiration"`
SubscriptionBufferSize int `yaml:"subscriptionBufferSize" json:"subscriptionBufferSize"`
RequireBatchObserveEnabled bool `yaml:"requireBatchObserveEnabled" json:"requireBatchObserveEnabled"`
InjectedCOAPConfig InjectedCOAPConfig `yaml:"-" json:"-"`
coapService.Config `yaml:",inline" json:",inline"`
ExternalAddress string `yaml:"externalAddress" json:"externalAddress"`
Authorization AuthorizationConfig `yaml:"authorization" json:"authorization"`
OwnerCacheExpiration time.Duration `yaml:"ownerCacheExpiration" json:"ownerCacheExpiration"`
SubscriptionBufferSize int `yaml:"subscriptionBufferSize" json:"subscriptionBufferSize"`
RequireBatchObserveEnabled bool `yaml:"requireBatchObserveEnabled" json:"requireBatchObserveEnabled"`
LimitBatchObserveLatestETags uint32 `yaml:"limitBatchObserveLatestETags " json:"limitBatchObserveLatestETags"`
InjectedCOAPConfig InjectedCOAPConfig `yaml:"-" json:"-"`
}

type COAPConfigMarshalerUnmarshaler struct {
Expand Down
9 changes: 8 additions & 1 deletion coap-gateway/service/message/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ type JsonCoapMessage struct {
Observe *uint32 `json:"observe,omitempty"`
ContentFormat string `json:"contentFormat,omitempty"`
Body interface{} `json:"body,omitempty"`
Etag []byte `json:"etag,omitempty"`
}

func (c JsonCoapMessage) IsEmpty() bool {
return c.Code == "" && c.Path == "" && c.Token == "" && len(c.Queries) == 0 && c.Observe == nil && c.ContentFormat == "" && c.Body == nil
return c.Code == "" && c.Path == "" && c.Token == "" && len(c.Queries) == 0 && c.Observe == nil && c.ContentFormat == "" && c.Body == nil && c.Etag == nil
}

func readBody(r io.ReadSeeker) []byte {
Expand Down Expand Up @@ -102,6 +103,11 @@ func ToJson(m *pool.Message, withBody, withToken bool) JsonCoapMessage {
if withToken {
token = m.Token().String()
}
var etag []byte
e, err := m.ETag()
if err == nil {
etag = e
}

msg := JsonCoapMessage{
Code: m.Code().String(),
Expand All @@ -110,6 +116,7 @@ func ToJson(m *pool.Message, withBody, withToken bool) JsonCoapMessage {
Queries: queries,
Observe: obs,
Body: body,
Etag: etag,
}
return msg
}
Loading

0 comments on commit b68e708

Please sign in to comment.