Skip to content

Commit

Permalink
fix after CR
Browse files Browse the repository at this point in the history
  • Loading branch information
jkralik committed Aug 10, 2023
1 parent b95d062 commit 102c5b5
Show file tree
Hide file tree
Showing 20 changed files with 95 additions and 113 deletions.
2 changes: 1 addition & 1 deletion charts/plgd-hub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ global:
| certmanager.internal.issuer.name | string | `nil` | Name |
| certmanager.internal.issuer.spec | string | `nil` | cert-manager issuer spec |
| cluster.dns | string | `"cluster.local"` | Cluster internal DNS prefix |
| coapgateway | object | `{"affinity":{},"apis":{"coap":{"authorization":{"deviceIdClaim":null,"ownerClaim":null,"providers":null},"blockwiseTransfer":{"blockSize":"1024","enabled":true},"externalAddress":"","keepAlive":{"timeout":"20s"},"maxMessageSize":262144,"messagePoolSize":1000,"messageQueueSize":16,"ownerCacheExpiration":"1m","protocols":["tcp"],"requireBatchObserveEnabled":true,"subscriptionBufferSize":1000,"tls":{"caPool":null,"certFile":null,"clientCertificateRequired":true,"disconnectOnExpiredCertificate":false,"enabled":true,"identityPropertiesRequired":true,"keyFile":null}}},"clients":{"certificateAuthority":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}},"eventBus":{"nats":{"pendingLimits":{"bytesLimit":"67108864","msgLimit":"524288"},"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false},"url":""}},"identityStore":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}},"ownerClaim":null},"resourceAggregate":{"deviceStatusExpiration":{"enabled":false,"expiresIn":"0s"},"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}},"resourceDirectory":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}}},"config":{"fileName":"service.yaml","mountPath":"/config","volume":"config"},"deploymentAnnotations":{},"deploymentLabels":{},"enabled":true,"extraContainers":{},"extraVolumeMounts":{},"extraVolumes":{},"fullnameOverride":null,"hubId":null,"image":{"imagePullSecrets":{},"pullPolicy":"Always","registry":"ghcr.io/","repository":"plgd-dev/hub/coap-gateway","tag":null},"imagePullSecrets":{},"initContainersTpl":{},"livenessProbe":{},"log":{"dumpBody":false,"encoderConfig":{"timeEncoder":"rfc3339nano"},"encoding":"json","level":"info","stacktrace":{"enabled":false,"level":"warn"}},"name":"coap-gateway","nodeSelector":{},"podAnnotations":{},"podLabels":{},"podSecurityContext":{},"port":5684,"rbac":{"enabled":false,"roleBindingDefinitionTpl":null,"serviceAccountName":"coap-gateway"},"readinessProbe":{},"replicas":1,"resources":{},"restartPolicy":"Always","securityContext":{},"service":{"annotations":{},"labels":{},"nodePort":null,"tcp":{"annotations":{},"labels":{},"name":"coaps-tcp","nodePort":null,"protocol":"TCP","targetPort":"coaps-tcp","type":null},"type":"LoadBalancer","udp":{"annotations":{},"labels":{},"name":"coaps-udp","nodePort":null,"protocol":"UDP","targetPort":"coaps-udp","type":null}},"taskQueue":{"goPoolSize":1600,"maxIdleTime":"10m","size":"2097152"},"tolerations":{}}` | CoAP gateway parameters |
| coapgateway | object | `{"affinity":{},"apis":{"coap":{"authorization":{"deviceIdClaim":null,"ownerClaim":null,"providers":null},"blockwiseTransfer":{"blockSize":"1024","enabled":true},"externalAddress":"","keepAlive":{"timeout":"20s"},"maxMessageSize":262144,"messagePoolSize":1000,"messageQueueSize":16,"ownerCacheExpiration":"1m","protocols":["tcp"],"requireBatchObserveEnabled":true,"subscriptionBufferSize":1000,"tls":{"caPool":null,"certFile":null,"clientCertificateRequired":true,"disconnectOnExpiredCertificate":false,"enabled":true,"identityPropertiesRequired":true,"keyFile":null}}},"clients":{"certificateAuthority":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}},"eventBus":{"nats":{"pendingLimits":{"bytesLimit":"67108864","msgLimit":"524288"},"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false},"url":""}},"identityStore":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}},"ownerClaim":null},"resourceAggregate":{"deviceStatusExpiration":{"enabled":false,"expiresIn":"0s"},"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}},"resourceDirectory":{"grpc":{"address":"","keepAlive":{"permitWithoutStream":true,"time":"10s","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":false}}}},"config":{"fileName":"service.yaml","mountPath":"/config","volume":"config"},"deploymentAnnotations":{},"deploymentLabels":{},"deviceTwin":{"forceResynchronization":false,"numberOfETAGsForBatchObservation":8},"enabled":true,"extraContainers":{},"extraVolumeMounts":{},"extraVolumes":{},"fullnameOverride":null,"hubId":null,"image":{"imagePullSecrets":{},"pullPolicy":"Always","registry":"ghcr.io/","repository":"plgd-dev/hub/coap-gateway","tag":null},"imagePullSecrets":{},"initContainersTpl":{},"livenessProbe":{},"log":{"dumpBody":false,"encoderConfig":{"timeEncoder":"rfc3339nano"},"encoding":"json","level":"info","stacktrace":{"enabled":false,"level":"warn"}},"name":"coap-gateway","nodeSelector":{},"podAnnotations":{},"podLabels":{},"podSecurityContext":{},"port":5684,"rbac":{"enabled":false,"roleBindingDefinitionTpl":null,"serviceAccountName":"coap-gateway"},"readinessProbe":{},"replicas":1,"resources":{},"restartPolicy":"Always","securityContext":{},"service":{"annotations":{},"labels":{},"nodePort":null,"tcp":{"annotations":{},"labels":{},"name":"coaps-tcp","nodePort":null,"protocol":"TCP","targetPort":"coaps-tcp","type":null},"type":"LoadBalancer","udp":{"annotations":{},"labels":{},"name":"coaps-udp","nodePort":null,"protocol":"UDP","targetPort":"coaps-udp","type":null}},"taskQueue":{"goPoolSize":1600,"maxIdleTime":"10m","size":"2097152"},"tolerations":{}}` | CoAP gateway parameters |
| coapgateway.affinity | object | `{}` | Affinity definition |
| coapgateway.apis | object | `{"coap":{"authorization":{"deviceIdClaim":null,"ownerClaim":null,"providers":null},"blockwiseTransfer":{"blockSize":"1024","enabled":true},"externalAddress":"","keepAlive":{"timeout":"20s"},"maxMessageSize":262144,"messagePoolSize":1000,"messageQueueSize":16,"ownerCacheExpiration":"1m","protocols":["tcp"],"requireBatchObserveEnabled":true,"subscriptionBufferSize":1000,"tls":{"caPool":null,"certFile":null,"clientCertificateRequired":true,"disconnectOnExpiredCertificate":false,"enabled":true,"identityPropertiesRequired":true,"keyFile":null}}}` | For complete coap-gateway service configuration see [plgd/coap-gateway](https://github.com/plgd-dev/hub/tree/main/coap-gateway) |
| coapgateway.apis.coap.tls.disconnectOnExpiredCertificate | bool | `false` | After the certificate expires, the connection will be disconnected |
Expand Down
3 changes: 3 additions & 0 deletions charts/plgd-hub/templates/coap-gateway/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -172,5 +172,8 @@ data:
goPoolSize: {{ .taskQueue.goPoolSize }}
size: {{ .taskQueue.size }}
maxIdleTime: {{ .taskQueue.maxIdleTime | quote }}
deviceTwin:
numberOfETAGsForBatchObservation: {{ deviceTwin.numberOfETAGsForBatchObservation }}
forceResynchronization: {{ deviceTwin.forceResynchronization }}
{{- end }}
{{- end }}
3 changes: 3 additions & 0 deletions charts/plgd-hub/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,9 @@ coapgateway:
goPoolSize: 1600
size: "2097152"
maxIdleTime: "10m"
deviceTwin:
numberOfETAGsForBatchObservation: 8
forceResynchronization: false

identitystore:
# -- Enable identity service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (h deviceSubscriptionHandlers) OnDeviceSubscriberReconnectError(err error)

type DevicesSubscription struct {
ctx context.Context
data *kitSync.Map // //[deviceID]*deviceSubscription
data *kitSync.Map //[deviceID]*deviceSubscription
rdClient pb.GrpcGatewayClient
raClient raService.ResourceAggregateClient
subscriber *subscriber.Subscriber
Expand Down
2 changes: 1 addition & 1 deletion coap-gateway/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ clients:
certFile: "/secrets/public/cert.crt"
useSystemCAPool: false
deviceTwin:
latestETAGsForNumbersOfResource: 8
numberOfETAGsForBatchObservation: 8
forceResynchronization: false
taskQueue:
goPoolSize: 1600
Expand Down
2 changes: 1 addition & 1 deletion coap-gateway/service/clientObserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (c *session) replaceDeviceObserverWithDeviceTwin(ctx context.Context, twinE
observation.WithTwinEnabled(twinEnabled), observation.WithObservationType(observationType),
observation.WithLogger(c.getLogger()),
observation.WithRequireBatchObserveEnabled(c.server.config.APIs.COAP.RequireBatchObserveEnabled),
observation.WithLatestETAGsForNumbersOfResource(c.server.config.DeviceTwin.LatestETAGsForNumbersOfResource),
observation.WithNumberOfETAGsForBatchObservation(c.server.config.DeviceTwin.NumberOfETAGsForBatchObservation),
observation.WithTwinForceResynchronization(forceResynchronization || c.server.config.DeviceTwin.ForceResynchronization),
)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions coap-gateway/service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ func (c *InjectedCOAPConfig) Validate() error {
}

type DeviceTwinConfig struct {
LatestETAGsForNumbersOfResource uint32 `yaml:"latestETAGsForNumbersOfResource" json:"latestETAGsForNumbersOfResource"`
ForceResynchronization bool `yaml:"forceResynchronization" json:"forceResynchronization"`
NumberOfETAGsForBatchObservation uint32 `yaml:"numberOfETAGsForBatchObservation" json:"numberOfETAGsForBatchObservation"`
ForceResynchronization bool `yaml:"forceResynchronization" json:"forceResynchronization"`
}

type COAPConfig struct {
Expand Down
4 changes: 2 additions & 2 deletions coap-gateway/service/observation/deviceObserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (o LimitBatchObserveLatestETagsOpt) Apply(opts *DeviceObserverConfig) {
opts.LimitBatchObserveLatestETags = o.limitBatchObserveLatestETags
}

func WithLatestETAGsForNumbersOfResource(v uint32) LimitBatchObserveLatestETagsOpt {
func WithNumberOfETAGsForBatchObservation(v uint32) LimitBatchObserveLatestETagsOpt {
return LimitBatchObserveLatestETagsOpt{
limitBatchObserveLatestETags: v,
}
Expand Down Expand Up @@ -373,7 +373,7 @@ func createDiscoveryResourceObserver(ctx context.Context, deviceID string, coapC
// Create observer with a single observations for all published resources.
func createPublishedResourcesObserver(ctx context.Context, deviceID string, coapConn ClientConn, callbacks ResourcesObserverCallbacks, published []*commands.Resource, logger log.Logger) (*resourcesObserver, error) {
resourcesObserver := newResourcesObserver(deviceID, coapConn, callbacks, logger)

// TODO get ETAG for each resource
err := resourcesObserver.addResources(ctx, published)
if err != nil {
resourcesObserver.CleanObservedResources(ctx)
Expand Down
25 changes: 8 additions & 17 deletions coap-gateway/service/observation/observedResource.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ type Observation = interface {
type observedResource struct {
href string
resInterface string
etags [][]byte
synced atomic.Bool
isObservable bool
private struct { // guarded by mutex
Expand All @@ -30,12 +29,11 @@ type observedResource struct {
}
}

func newObservedResource(href, resInterface string, etags [][]byte, isObservable bool) *observedResource {
func newObservedResource(href, resInterface string, isObservable bool) *observedResource {
return &observedResource{
href: href,
resInterface: resInterface,
isObservable: isObservable,
etags: etags,
}
}

Expand All @@ -51,13 +49,6 @@ func (r *observedResource) Interface() string {
return r.resInterface
}

func (r *observedResource) ETag() []byte {
if len(r.etags) == 0 {
return nil
}
return r.etags[0]
}

const (
// maxURIQueryLen is the maximum length of a URI query. See https://datatracker.ietf.org/doc/html/rfc7252#section-5.10
maxURIQueryLen = 255
Expand All @@ -67,11 +58,10 @@ const (
prefixQueryIncChanges = "incChanges="
)

func (r *observedResource) EncodeETagsForIncrementChanges() []string {
if len(r.etags) <= 1 {
func encodeETagsForIncrementChanges(etags [][]byte) []string {
if len(etags) < 1 {
return nil
}
etags := r.etags[1:]
etagsStr := make([]string, 0, (len(etags)/15)+1)
var b strings.Builder
for _, etag := range etags {
Expand Down Expand Up @@ -113,13 +103,14 @@ func (r *observedResource) isBatchObservation() bool {
return r.resInterface == interfaces.OC_IF_B
}

func (r *observedResource) toCoapOptions() []message.Option {
func (r *observedResource) toCoapOptions(etags [][]byte) []message.Option {
opts := make([]message.Option, 0, 2)
if r.ETag() != nil {
if len(etags) > 0 {
opts = append(opts, message.Option{
ID: message.ETag,
Value: r.ETag(),
Value: etags[0],
})
etags = etags[1:]
}
if r.Interface() != "" {
opts = append(opts, message.Option{
Expand All @@ -129,7 +120,7 @@ func (r *observedResource) toCoapOptions() []message.Option {
}

if r.isBatchObservation() {
for _, q := range r.EncodeETagsForIncrementChanges() {
for _, q := range encodeETagsForIncrementChanges(etags) {
opts = append(opts, message.Option{
ID: message.URIQuery,
Value: []byte(q),
Expand Down
102 changes: 39 additions & 63 deletions coap-gateway/service/observation/observedResource_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,94 +7,73 @@ import (
)

func TestObservedResourceEncodeETagsForIncrementChanged(t *testing.T) {
type fields struct {
etags [][]byte
}
tests := []struct {
name string
fields fields
want []string
name string
etags [][]byte
want []string
}{
{
name: "empty",
fields: fields{
etags: nil,
},
name: "empty",
etags: nil,
},
{
name: "only-latest-etag",
fields: fields{
etags: [][]byte{
[]byte("0"),
},
},
name: "not-nil",
etags: [][]byte{},
},
{
name: "one-etag",
fields: fields{
etags: [][]byte{
[]byte("0"),
[]byte("01234567"),
},
etags: [][]byte{
[]byte("01234567"),
},
want: []string{
prefixQueryIncChanges + "MDEyMzQ1Njc",
},
},
{
name: "two-etags",
fields: fields{
etags: [][]byte{
[]byte("0"),
[]byte("1"),
[]byte("2"),
},
etags: [][]byte{
[]byte("1"),
[]byte("2"),
},
want: []string{
prefixQueryIncChanges + "MQ,Mg",
},
},
{
name: "two-etags-invalid-etag",
fields: fields{
etags: [][]byte{
[]byte("0"),
[]byte("1"),
[]byte("2"),
[]byte("invalid-etag-is-ignored"),
},
etags: [][]byte{
[]byte("1"),
[]byte("2"),
[]byte("invalid-etag-is-ignored"),
},
want: []string{
prefixQueryIncChanges + "MQ,Mg",
},
},
{
name: "multiple-etags",
fields: fields{
etags: [][]byte{
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"), // 22
},
etags: [][]byte{
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"),
[]byte("01234567"), // 21
},
want: []string{
prefixQueryIncChanges + "MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc,MDEyMzQ1Njc",
Expand All @@ -104,10 +83,7 @@ func TestObservedResourceEncodeETagsForIncrementChanged(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &observedResource{
etags: tt.fields.etags,
}
got := r.EncodeETagsForIncrementChanges()
got := encodeETagsForIncrementChanges(tt.etags)
for _, g := range got {
assert.Less(t, len(g), 255) // RFC 7641 - Uri-query length
}
Expand Down
Loading

0 comments on commit 102c5b5

Please sign in to comment.