Skip to content

Commit

Permalink
allow to set always forceResynchronization in coap-gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
jkralik committed Aug 3, 2023
1 parent d01851a commit 2094305
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 20 deletions.
4 changes: 3 additions & 1 deletion coap-gateway/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ apis:
subscriptionBufferSize: 1000
messagePoolSize: 1000
requireBatchObserveEnabled: true
limitBatchObserveLatestETags: 8
messageQueueSize: 16
keepAlive:
timeout: 20s
Expand Down Expand Up @@ -161,6 +160,9 @@ clients:
keyFile: "/secrets/private/cert.key"
certFile: "/secrets/public/cert.crt"
useSystemCAPool: false
deviceTwin:
latestETAGsForNumbersOfResource: 8
forceResynchronization: false
taskQueue:
goPoolSize: 1600
size: 2097152
Expand Down
4 changes: 2 additions & 2 deletions coap-gateway/service/clientObserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ func (c *session) replaceDeviceObserverWithDeviceTwin(ctx context.Context, twinE
observation.WithTwinEnabled(twinEnabled), observation.WithObservationType(observationType),
observation.WithLogger(c.getLogger()),
observation.WithRequireBatchObserveEnabled(c.server.config.APIs.COAP.RequireBatchObserveEnabled),
observation.WithLimitBatchObserveLatestETags(c.server.config.APIs.COAP.LimitBatchObserveLatestETags),
observation.WithTwinForceResynchronization(forceResynchronization),
observation.WithLatestETAGsForNumbersOfResource(c.server.config.DeviceTwin.LatestETAGsForNumbersOfResource),
observation.WithTwinForceResynchronization(forceResynchronization || c.server.config.DeviceTwin.ForceResynchronization),
)
if err != nil {
setDeviceObserver(nil, err)
Expand Down
30 changes: 18 additions & 12 deletions coap-gateway/service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import (

// Config represent application configuration
type Config struct {
Log LogConfig `yaml:"log" json:"log"`
APIs APIsConfig `yaml:"apis" json:"apis"`
Clients ClientsConfig `yaml:"clients" json:"clients"`
TaskQueue queue.Config `yaml:"taskQueue" json:"taskQueue"`
Log LogConfig `yaml:"log" json:"log"`
APIs APIsConfig `yaml:"apis" json:"apis"`
Clients ClientsConfig `yaml:"clients" json:"clients"`
DeviceTwin DeviceTwinConfig `yaml:"deviceTwin" json:"deviceTwin"`
TaskQueue queue.Config `yaml:"taskQueue" json:"taskQueue"`
}

func (c *Config) Validate() error {
Expand Down Expand Up @@ -107,15 +108,20 @@ func (c *InjectedCOAPConfig) Validate() error {
return nil
}

type DeviceTwinConfig struct {
LatestETAGsForNumbersOfResource uint32 `yaml:"latestETAGsForNumbersOfResource" json:"latestETAGsForNumbersOfResource"`
ForceResynchronization bool `yaml:"forceResynchronization" json:"forceResynchronization"`
}

type COAPConfig struct {
coapService.Config `yaml:",inline" json:",inline"`
ExternalAddress string `yaml:"externalAddress" json:"externalAddress"`
Authorization AuthorizationConfig `yaml:"authorization" json:"authorization"`
OwnerCacheExpiration time.Duration `yaml:"ownerCacheExpiration" json:"ownerCacheExpiration"`
SubscriptionBufferSize int `yaml:"subscriptionBufferSize" json:"subscriptionBufferSize"`
RequireBatchObserveEnabled bool `yaml:"requireBatchObserveEnabled" json:"requireBatchObserveEnabled"`
LimitBatchObserveLatestETags uint32 `yaml:"limitBatchObserveLatestETags " json:"limitBatchObserveLatestETags"`
InjectedCOAPConfig InjectedCOAPConfig `yaml:"-" json:"-"`
coapService.Config `yaml:",inline" json:",inline"`
ExternalAddress string `yaml:"externalAddress" json:"externalAddress"`
Authorization AuthorizationConfig `yaml:"authorization" json:"authorization"`
OwnerCacheExpiration time.Duration `yaml:"ownerCacheExpiration" json:"ownerCacheExpiration"`
SubscriptionBufferSize int `yaml:"subscriptionBufferSize" json:"subscriptionBufferSize"`
RequireBatchObserveEnabled bool `yaml:"requireBatchObserveEnabled" json:"requireBatchObserveEnabled"`

InjectedCOAPConfig InjectedCOAPConfig `yaml:"-" json:"-"`
}

type COAPConfigMarshalerUnmarshaler struct {
Expand Down
2 changes: 1 addition & 1 deletion coap-gateway/service/observation/deviceObserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (o LimitBatchObserveLatestETagsOpt) Apply(opts *DeviceObserverConfig) {
opts.LimitBatchObserveLatestETags = o.limitBatchObserveLatestETags
}

func WithLimitBatchObserveLatestETags(v uint32) LimitBatchObserveLatestETagsOpt {
func WithLatestETAGsForNumbersOfResource(v uint32) LimitBatchObserveLatestETagsOpt {
return LimitBatchObserveLatestETagsOpt{
limitBatchObserveLatestETags: v,
}
Expand Down
4 changes: 2 additions & 2 deletions coap-gateway/service/signIn.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,8 @@ func asyncCreateDeviceObserver(x asyncCreateDeviceObserverArg) {
observation.WithLogger(x.client.getLogger()),
observation.WithRequireBatchObserveEnabled(x.client.server.config.APIs.COAP.RequireBatchObserveEnabled),
observation.WithTwinEnabled(x.twinEnabled),
observation.WithLimitBatchObserveLatestETags(x.client.server.config.APIs.COAP.LimitBatchObserveLatestETags),
observation.WithTwinForceResynchronization(false),
observation.WithLatestETAGsForNumbersOfResource(x.client.server.config.DeviceTwin.LatestETAGsForNumbersOfResource),
observation.WithTwinForceResynchronization(x.client.server.config.DeviceTwin.ForceResynchronization),
)
if err != nil {
x.client.Close()
Expand Down
2 changes: 1 addition & 1 deletion coap-gateway/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func MakeConfig(t require.TestingT) service.Config {
cfg.TaskQueue.Size = 2 * 1024 * 1024
cfg.APIs.COAP.Addr = config.COAP_GW_HOST
cfg.APIs.COAP.RequireBatchObserveEnabled = false
cfg.APIs.COAP.LimitBatchObserveLatestETags = 3
cfg.DeviceTwin.LatestETAGsForNumbersOfResource = 3
cfg.APIs.COAP.ExternalAddress = config.COAP_GW_HOST
cfg.APIs.COAP.Protocols = []coapService.Protocol{coapService.TCP}
if config.COAP_GATEWAY_UDP_ENABLED {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/panjf2000/ants/v2 v2.8.0
github.com/pion/dtls/v2 v2.2.7
github.com/pion/logging v0.2.2
github.com/plgd-dev/device/v2 v2.2.1-0.20230721085651-480a4a0dfa33
github.com/plgd-dev/device/v2 v2.2.1-0.20230801145938-032719b097fc
github.com/plgd-dev/go-coap/v3 v3.1.4-0.20230703210849-6d484eba5ff0
github.com/plgd-dev/kit/v2 v2.0.0-20211006190727-057b33161b90
github.com/pseudomuto/protoc-gen-doc v1.5.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/plgd-dev/device/v2 v2.2.1-0.20230721085651-480a4a0dfa33 h1:TW83npOUZQvSTNJJatlCkV5jgO3fC7J5Z7Bxyj2YoH8=
github.com/plgd-dev/device/v2 v2.2.1-0.20230721085651-480a4a0dfa33/go.mod h1:CKAr4/h0Hsx2JVOeB4exO276Rg5kANOm50rrzVl4joA=
github.com/plgd-dev/device/v2 v2.2.1-0.20230801145938-032719b097fc h1:j81j+HXz1rRb8nomjQnb/s2o5KW5QyuYXvBNNISEkz4=
github.com/plgd-dev/device/v2 v2.2.1-0.20230801145938-032719b097fc/go.mod h1:CKAr4/h0Hsx2JVOeB4exO276Rg5kANOm50rrzVl4joA=
github.com/plgd-dev/go-coap/v2 v2.0.4-0.20200819112225-8eb712b901bc/go.mod h1:+tCi9Q78H/orWRtpVWyBgrr4vKFo2zYtbbxUllerBp4=
github.com/plgd-dev/go-coap/v2 v2.4.1-0.20210517130748-95c37ac8e1fa/go.mod h1:rA7fc7ar+B/qa+Q0hRqv7yj/EMtIlmo1l7vkQGSrHPU=
github.com/plgd-dev/go-coap/v3 v3.1.4-0.20230703210849-6d484eba5ff0 h1:uXDNp7DGePUdt3gvzYC8oA8C02fIQJEFq3mbAZQUPV4=
Expand Down
105 changes: 105 additions & 0 deletions grpc-gateway/service/getResourceFromDevice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ import (
"testing"
"time"

"github.com/plgd-dev/device/v2/pkg/codec/cbor"
"github.com/plgd-dev/device/v2/pkg/net/coap"
"github.com/plgd-dev/device/v2/schema"
"github.com/plgd-dev/device/v2/schema/device"
"github.com/plgd-dev/device/v2/schema/interfaces"
"github.com/plgd-dev/device/v2/test/resource/types"
"github.com/plgd-dev/go-coap/v3/message"
coapTest "github.com/plgd-dev/hub/v2/coap-gateway/test"
"github.com/plgd-dev/hub/v2/grpc-gateway/pb"
kitNetGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc"
Expand Down Expand Up @@ -165,3 +168,105 @@ func TestRequestHandlerGetResourceFromDevice(t *testing.T) {
})
}
}

func validateETags(ctx context.Context, t *testing.T, c pb.GrpcGatewayClient, deviceID, href string) {
sdkClient, err := test.NewSDKClient()
require.NoError(t, err)

defer func() {
err := sdkClient.Close(context.Background())
require.NoError(t, err)
}()

cfg1 := coap.DetailedResponse[interface{}]{}
err = sdkClient.GetResource(ctx, deviceID, href, &cfg1)
require.NoError(t, err)

cfg2, err := c.GetResourceFromDevice(ctx, &pb.GetResourceFromDeviceRequest{
ResourceId: commands.NewResourceID(deviceID, href),
TimeToLive: int64(time.Hour),
})
require.NoError(t, err)
require.Equal(t, cfg1.ETag, cfg2.GetData().GetEtag())
var body2 interface{}
err = cbor.Decode(cfg2.GetData().GetContent().GetData(), &body2)
require.NoError(t, err)
require.Equal(t, cfg1.Body, body2)
/*
TODO: uncomment this block when etag will be supported in by coap notifications
clients, err := c.GetResources(ctx, &pb.GetResourcesRequest{
ResourceIdFilter: []string{
commands.NewResourceID(deviceID, href).ToString(),
},
})
require.NoError(t, err)
var etag3 []byte
var body3 interface{}
for {
res, err := clients.Recv()
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
etag3 = res.GetData().GetEtag()
err = cbor.Decode(res.GetData().GetContent().GetData(), &body3)
require.NoError(t, err)
}
require.Equal(t, cfg1.Body, body3)
require.Equal(t, cfg1.ETag, etag3)
*/
}

func TestRequestHandlerCheckResourceETag(t *testing.T) {
deviceID := test.MustFindDeviceByName(test.TestDeviceName)

ctx, cancel := context.WithTimeout(context.Background(), config.TEST_TIMEOUT)
defer cancel()

coapCfg := coapTest.MakeConfig(t)
tearDown := service.SetUp(ctx, t, service.WithCOAPGWConfig(coapCfg))
defer tearDown()

ctx = kitNetGrpc.CtxWithToken(ctx, oauthTest.GetDefaultAccessToken(t))

conn, err := grpc.Dial(config.GRPC_GW_HOST, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{
RootCAs: test.GetRootCertificatePool(t),
})))
require.NoError(t, err)
defer func() {
_ = conn.Close()
}()
c := pb.NewGrpcGatewayClient(conn)

_, shutdownDevSim := test.OnboardDevSim(ctx, t, c, deviceID, config.ACTIVE_COAP_SCHEME+"://"+config.COAP_GW_HOST, test.GetAllBackendResourceLinks())
defer shutdownDevSim()

href := test.TestResourceLightInstanceHref("1")
validateETags(ctx, t, c, deviceID, href)
v := test.LightResourceRepresentation{Power: 99}
_, err = c.UpdateResource(ctx, &pb.UpdateResourceRequest{
ResourceId: commands.NewResourceID(deviceID, href),
Content: &pb.Content{
ContentType: message.AppOcfCbor.String(),
Data: test.EncodeToCbor(t, v),
},
})
require.NoError(t, err)
time.Sleep(time.Second)
validateETags(ctx, t, c, deviceID, href)
v = test.LightResourceRepresentation{Power: 0}
_, err = c.UpdateResource(ctx, &pb.UpdateResourceRequest{
ResourceId: commands.NewResourceID(deviceID, href),
Content: &pb.Content{
ContentType: message.AppOcfCbor.String(),
Data: test.EncodeToCbor(t, v),
},
})
require.NoError(t, err)
time.Sleep(time.Second)
validateETags(ctx, t, c, deviceID, href)
}

0 comments on commit 2094305

Please sign in to comment.