Skip to content

Commit

Permalink
Closing of sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
Danielius1922 committed Aug 15, 2024
1 parent 3b09c6b commit 59a44ec
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 56 deletions.
22 changes: 14 additions & 8 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@ jobs:
include:
# test with check race with coverage and sonarcloud
- name: test
cmd: test
cmd: test-device-provisioning-service
args: TEST_COAP_GATEWAY_UDP_ENABLED=false
checkRace: "true"
coapGateway:
log:
level: "debug"
dumpBody: "true"

- name: test/cqldb
cmd: test
cmd: test-device-provisioning-service
args: TEST_COAP_GATEWAY_UDP_ENABLED=false
checkRace: "true"
database: "cqldb"
coapGateway:
Expand All @@ -46,14 +48,16 @@ jobs:

# test without check race
- name: test/norace
cmd: test
cmd: test-device-provisioning-service
args: TEST_COAP_GATEWAY_UDP_ENABLED=false
coapGateway:
log:
level: "debug"
dumpBody: "true"

- name: test/norace/cqldb
cmd: test
cmd: test-device-provisioning-service
args: TEST_COAP_GATEWAY_UDP_ENABLED=false
database: "cqldb"
coapGateway:
log:
Expand All @@ -65,16 +69,16 @@ jobs:
# - with ECDSA-SHA256 signature and P384 elliptic curve certificates
# - with TEST_LEAD_RESOURCE_TYPE_FILTER, TEST_LEAD_RESOURCE_TYPE_USE_UUID
- name: test/norace-384
cmd: test
args: CERT_TOOL_SIGN_ALG=ECDSA-SHA384 CERT_TOOL_ELLIPTIC_CURVE=P384 TEST_LEAD_RESOURCE_TYPE_FILTER=last TEST_LEAD_RESOURCE_TYPE_USE_UUID=true
cmd: test-device-provisioning-service
args: TEST_COAP_GATEWAY_UDP_ENABLED=false CERT_TOOL_SIGN_ALG=ECDSA-SHA384 CERT_TOOL_ELLIPTIC_CURVE=P384 TEST_LEAD_RESOURCE_TYPE_FILTER=last TEST_LEAD_RESOURCE_TYPE_USE_UUID=true

# test
# - without check race
# - with TEST_LEAD_RESOURCE_TYPE_FILTER, TEST_LEAD_RESOURCE_TYPE_REGEX_FILTER
# - with logs from all services
- name: test/norace/logs
cmd: test
args: TEST_LEAD_RESOURCE_TYPE_REGEX_FILTER='oic\.wk\.d,^/light/\d+$$' TEST_LEAD_RESOURCE_TYPE_FILTER=first
cmd: test-device-provisioning-service
args: TEST_COAP_GATEWAY_UDP_ENABLED=false TEST_LEAD_RESOURCE_TYPE_REGEX_FILTER='oic\.wk\.d,^/light/\d+$$' TEST_LEAD_RESOURCE_TYPE_FILTER=first
coapGateway:
log:
level: "debug"
Expand Down Expand Up @@ -104,6 +108,8 @@ jobs:
echo "Number of cores: $(nproc)"
echo "Number of threads: $(nproc --all)"
which netstat
# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
- uses: actions/checkout@v4
with:
Expand Down
7 changes: 4 additions & 3 deletions coap-gateway/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@ func SetUp(t require.TestingT) (tearDown func()) {
return New(t, MakeConfig(t))
}

func checkForClosedSockets(t require.TestingT, cfg service.Config) {
func checkForClosedSockets(cfg service.Config) error {
sockets := make(test.ListenSockets, 0, len(cfg.APIs.COAP.Protocols))
for _, protocol := range cfg.APIs.COAP.Protocols {
sockets = append(sockets, test.ListenSocket{
Network: string(protocol),
Address: cfg.APIs.COAP.Addr,
})
}
sockets.CheckForClosedSockets(t)
return sockets.CheckForClosedSockets()
}

// New creates test coap-gateway.
Expand All @@ -106,6 +106,7 @@ func New(t require.TestingT, cfg service.Config) func() {
err = fileWatcher.Close()
require.NoError(t, err)

checkForClosedSockets(t, cfg)
err = checkForClosedSockets(cfg)
require.NoError(t, err)
}
}
2 changes: 1 addition & 1 deletion device-provisioning-service/service/provision_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func TestProvisioningWithPSK(t *testing.T) {
hubTestService.SetUpServicesId|hubTestService.SetUpServicesResourceAggregate|hubTestService.SetUpServicesGrpcGateway)
defer hubShutdown()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*3600)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
ctx = pkgGrpc.CtxWithToken(ctx, oauthTest.GetDefaultAccessToken(t))

Expand Down
22 changes: 11 additions & 11 deletions device-provisioning-service/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"net"
"os/exec"
"time"

"github.com/pion/dtls/v2"
Expand All @@ -27,7 +28,6 @@ import (
otelClient "github.com/plgd-dev/hub/v2/pkg/opentelemetry/collector/client"
"github.com/plgd-dev/hub/v2/pkg/opentelemetry/otelcoap"
"github.com/plgd-dev/hub/v2/pkg/service"
"github.com/plgd-dev/hub/v2/pkg/sync/task/queue"
otelCodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
Expand All @@ -36,7 +36,6 @@ type Service struct {
config Config
ctx context.Context
cancel context.CancelFunc
taskQueue *queue.Queue
messagePool *pool.Pool
linkedHubCache *LinkedHubCache
store *mongodb.Store
Expand Down Expand Up @@ -127,16 +126,10 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg
return nil, fmt.Errorf("cannot create open telemetry collector client: %w", err)
}
otelClient.AddCloseFunc(cancel)
tracerProvider := otelClient.GetTracerProvider()

var closer fn.FuncList
closer.AddFunc(otelClient.Close)
queue, err := queue.New(config.TaskQueue)
if err != nil {
closer.Execute()
return nil, fmt.Errorf("cannot create job queue %w", err)
}
closer.AddFunc(queue.Release)

tracerProvider := otelClient.GetTracerProvider()
store, closeStore, err := NewStore(ctx, config.Clients.Storage.MongoDB, fileWatcher, logger, tracerProvider)
if err != nil {
closer.Execute()
Expand Down Expand Up @@ -180,7 +173,6 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg
s := Service{
config: config,
linkedHubCache: linkedHubCache,
taskQueue: queue,

ctx: ctx,
cancel: cancel,
Expand All @@ -194,6 +186,14 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg
enrollmentGroupsCache: enrollmentGroupsCache,
}

cmd := exec.Command("/usr/bin/netstat", "-tulpn")
stdout, err := cmd.Output()
if err != nil {
fmt.Println(err.Error())
} else {
fmt.Println(string(stdout))
}

ss, err := s.createServices(fileWatcher, logger)
if err != nil {
if httpService != nil {
Expand Down
2 changes: 1 addition & 1 deletion device-provisioning-service/test/provisionHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (h *RequestHandlerWithDps) IsStarted() bool {

func (h *RequestHandlerWithDps) StartDps(opts ...service.Option) {
if h.dpsShutdown != nil {
return
panic("dps already started")
}
h.Logf("start provisioning")
h.dpsShutdown = New(h.t, h.dpsCfg, opts...)
Expand Down
62 changes: 35 additions & 27 deletions device-provisioning-service/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,31 @@ func init() {
},
},
}

if err := checkForClosedSockets(MakeAPIsConfig()); err != nil {
panic(err)
}
}

func MakeAPIsConfig() service.APIsConfig {
var cfg service.APIsConfig
cfg.COAP.Addr = DPSHost
cfg.COAP.MaxMessageSize = 256 * 1024
cfg.COAP.MessagePoolSize = 1000
cfg.COAP.Protocols = []pkgCoapService.Protocol{pkgCoapService.TCP}
if config.DPS_UDP_ENABLED {
cfg.COAP.Protocols = append(cfg.COAP.Protocols, pkgCoapService.UDP)
}
cfg.COAP.InactivityMonitor = &pkgCoapService.InactivityMonitor{
Timeout: time.Second * 20,
}
cfg.COAP.BlockwiseTransfer.Enabled = config.DPS_UDP_ENABLED
cfg.COAP.BlockwiseTransfer.SZX = "1024"
cfg.HTTP = MakeHTTPConfig()
tlsServerCfg := config.MakeTLSServerConfig()
cfg.COAP.TLS.Embedded.CertFile = tlsServerCfg.CertFile
cfg.COAP.TLS.Embedded.KeyFile = tlsServerCfg.KeyFile
return cfg
}

func MakeConfig(t require.TestingT) service.Config {
Expand All @@ -194,27 +219,11 @@ func MakeConfig(t require.TestingT) service.Config {
cfg.Log = log.MakeDefaultConfig()
cfg.TaskQueue.GoPoolSize = 1600
cfg.TaskQueue.Size = 2 * 1024 * 1024
cfg.APIs.COAP.Addr = DPSHost
cfg.APIs.COAP.MaxMessageSize = 256 * 1024
cfg.APIs.COAP.MessagePoolSize = 1000
cfg.APIs.COAP.Protocols = []pkgCoapService.Protocol{pkgCoapService.TCP}
if config.DPS_UDP_ENABLED {
cfg.APIs.COAP.Protocols = append(cfg.APIs.COAP.Protocols, pkgCoapService.UDP)
}
cfg.APIs.COAP.InactivityMonitor = &pkgCoapService.InactivityMonitor{
Timeout: time.Second * 20,
}
cfg.APIs.COAP.BlockwiseTransfer.Enabled = config.DPS_UDP_ENABLED
cfg.APIs.COAP.BlockwiseTransfer.SZX = "1024"
cfg.APIs.HTTP = MakeHTTPConfig()
tlsServerCfg := config.MakeTLSServerConfig()
cfg.APIs.COAP.TLS.Embedded.CertFile = tlsServerCfg.CertFile
cfg.APIs.COAP.TLS.Embedded.KeyFile = tlsServerCfg.KeyFile
cfg.APIs = MakeAPIsConfig()
cfg.Clients.Storage = MakeStorageConfig()
cfg.Clients.OpenTelemetryCollector = pkgHttp.OpenTelemetryCollectorConfig{
Config: config.MakeOpenTelemetryCollectorClient(),
}

cfg.EnrollmentGroups = append(cfg.EnrollmentGroups, MakeEnrollmentGroup())
err := cfg.Validate()
require.NoError(t, err)
Expand Down Expand Up @@ -262,21 +271,21 @@ func New(t *testing.T, cfg service.Config, opts ...service.Option) func() {
return NewWithContext(context.Background(), t, cfg, opts...)
}

func checkForClosedSockets(t require.TestingT, cfg service.Config) {
sockets := make(hubTest.ListenSockets, 0, len(cfg.APIs.COAP.Protocols)+1)
for _, protocol := range cfg.APIs.COAP.Protocols {
func checkForClosedSockets(cfg service.APIsConfig) error {
sockets := make(hubTest.ListenSockets, 0, len(cfg.COAP.Protocols)+1)
for _, protocol := range cfg.COAP.Protocols {
sockets = append(sockets, hubTest.ListenSocket{
Network: string(protocol),
Address: cfg.APIs.COAP.Addr,
Address: cfg.COAP.Addr,
})
}
if cfg.APIs.HTTP.Enabled {
if cfg.HTTP.Enabled {
sockets = append(sockets, hubTest.ListenSocket{
Network: "tcp",
Address: cfg.APIs.HTTP.Config.Connection.Addr,
Address: cfg.HTTP.Config.Connection.Addr,
})
}
sockets.CheckForClosedSockets(t)
return sockets.CheckForClosedSockets()
}

// New creates test dps-gateway.
Expand All @@ -302,9 +311,8 @@ func NewWithContext(ctx context.Context, t *testing.T, cfg service.Config, opts
err = fileWatcher.Close()
require.NoError(t, err)

checkForClosedSockets(t, cfg)
// wait for all connections to be closed
time.Sleep(time.Millisecond * 500)
err = checkForClosedSockets(cfg.APIs)
require.NoError(t, err)
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/net/coap/service/tcpServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func newTCPServer(config Config, serviceOpts Options, fileWatcher *fsnotify.Watc
if err != nil {
return nil, fmt.Errorf("cannot create listener: %w", err)
}
fmt.Printf("tcp listerer(%v) opened\n", config.Addr)
tcpOpts := make([]coapTcpServer.Option, 0, 3)
if serviceOpts.OnNewConnection != nil {
tcpOpts = append(tcpOpts, options.WithOnNewConn(func(cc *coapTcpClient.Conn) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/net/coap/service/udpServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func newDTLSServer(config Config, serviceOpts Options, fileWatcher *fsnotify.Wat
if err != nil {
return nil, fmt.Errorf("cannot create listener: %w", err)
}
fmt.Printf("dtls listerer(%v) opened\n", config.Addr)
dtlsOpts := make([]coapDtlsServer.Option, 0, 4)
if serviceOpts.OnNewConnection != nil {
dtlsOpts = append(dtlsOpts, options.WithOnNewConn(func(coapConn *coapUdpClient.Conn) {
Expand Down Expand Up @@ -177,6 +178,7 @@ func newUDPServer(config Config, serviceOpts Options, logger log.Logger, opts ..
if err != nil {
return nil, fmt.Errorf("cannot create listener: %w", err)
}
fmt.Printf("udp listerer(%v) opened\n", config.Addr)
udpOpts := make([]coapUdpServer.Option, 0, 4)
if serviceOpts.OnNewConnection != nil {
udpOpts = append(udpOpts, options.WithOnNewConn(func(coapConn *coapUdpClient.Conn) {
Expand Down
14 changes: 9 additions & 5 deletions test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1012,7 +1012,6 @@ func (ls *ListenSocket) IsClosed() (bool, error) {
}
c, err := net.ListenUDP(ls.Network, addr)
if err != nil {
fmt.Printf("ListenUDP error: %v\n", err)
return false, nil

Check failure on line 1015 in test/test.go

View workflow job for this annotation

GitHub Actions / lint

error is not nil (line 1013) but it returns nil (nilerr)
}
err = c.Close()
Expand All @@ -1028,7 +1027,6 @@ func (ls *ListenSocket) IsClosed() (bool, error) {
}
c, err := net.ListenTCP(ls.Network, addr)
if err != nil {
fmt.Printf("ListenTCP error: %v\n", err)
return false, nil

Check failure on line 1030 in test/test.go

View workflow job for this annotation

GitHub Actions / lint

error is not nil (line 1028) but it returns nil (nilerr)
}
err = c.Close()
Expand All @@ -1040,7 +1038,7 @@ func (ls *ListenSocket) IsClosed() (bool, error) {

type ListenSockets []ListenSocket

func (ls ListenSockets) CheckForClosedSockets(t require.TestingT) {
func (ls ListenSockets) CheckForClosedSockets() error {
// wait for all sockets to be closed - max 3 minutes = 900*200
socketClosed := make([]bool, len(ls))
for j := 0; j < 900; j++ {
Expand All @@ -1049,17 +1047,23 @@ func (ls ListenSockets) CheckForClosedSockets(t require.TestingT) {
if socketClosed[i] {
continue
}
fmt.Printf("Checking socket %v:%v\n", socket.Network, socket.Address)
closed, err := socket.IsClosed()
require.NoError(t, err)
if err != nil {
fmt.Printf("Socket %v:%v error\n", socket.Network, socket.Address)
return err
}
socketClosed[i] = closed
if socketClosed[i] {
fmt.Printf("Socket %v:%v closed\n", socket.Network, socket.Address)
continue
}
allClosed = false
}
if allClosed {
break
return nil
}
time.Sleep(time.Millisecond * 200)
}
return errors.New("ports not closed")
}

0 comments on commit 59a44ec

Please sign in to comment.