Skip to content

Commit

Permalink
TCP connection is established only if a CONN packet can be built
Browse files Browse the repository at this point in the history
  • Loading branch information
dimitrov-anasoft committed Oct 10, 2024
1 parent 88e86ee commit b7d62b9
Showing 1 changed file with 33 additions and 34 deletions.
67 changes: 33 additions & 34 deletions autopaho/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
// context is cancelled (in which case nil will be returned).
func establishServerConnection(ctx context.Context, cfg ClientConfig, firstConnection bool) (*paho.Client, *paho.Connack) {
// Note: We do not touch b.cli in order to avoid adding thread safety issues.
var err error

var attempt int = 0
for {
Expand All @@ -53,51 +52,51 @@ func establishServerConnection(ctx context.Context, cfg ClientConfig, firstConne
return nil, nil
}
for _, u := range cfg.ServerUrls {
connectionCtx, cancelConnCtx := context.WithTimeout(ctx, cfg.ConnectTimeout)

if cfg.AttemptConnection != nil { // Use custom function if it is provided
cfg.Conn, err = cfg.AttemptConnection(ctx, cfg, u)
} else {
switch strings.ToLower(u.Scheme) {
case "mqtt", "tcp", "":
cfg.Conn, err = attemptTCPConnection(connectionCtx, u.Host)
case "ssl", "tls", "mqtts", "mqtt+ssl", "tcps":
cfg.Conn, err = attemptTLSConnection(connectionCtx, cfg.TlsCfg, u.Host)
case "ws":
cfg.Conn, err = attemptWebsocketConnection(connectionCtx, nil, cfg.WebSocketCfg, u)
case "wss":
cfg.Conn, err = attemptWebsocketConnection(connectionCtx, cfg.TlsCfg, cfg.WebSocketCfg, u)
default:
if cfg.OnConnectError != nil {
cfg.OnConnectError(fmt.Errorf("unsupported scheme (%s) user in url %s", u.Scheme, u.String()))
}
cancelConnCtx()
continue
}
}

var connack *paho.Connack
if err == nil {
cli := paho.NewClient(cfg.ClientConfig)
if cfg.PahoDebug != nil {
cli.SetDebugLogger(cfg.PahoDebug)
}

if cfg.PahoErrors != nil {
cli.SetErrorLogger(cfg.PahoErrors)
cp, err := cfg.buildConnectPacket(firstConnection, u)
if err == nil {
connectionCtx, cancelConnCtx := context.WithTimeout(ctx, cfg.ConnectTimeout)

if cfg.AttemptConnection != nil { // Use custom function if it is provided
cfg.Conn, err = cfg.AttemptConnection(ctx, cfg, u)
} else {
switch strings.ToLower(u.Scheme) {
case "mqtt", "tcp", "":
cfg.Conn, err = attemptTCPConnection(connectionCtx, u.Host)
case "ssl", "tls", "mqtts", "mqtt+ssl", "tcps":
cfg.Conn, err = attemptTLSConnection(connectionCtx, cfg.TlsCfg, u.Host)
case "ws":
cfg.Conn, err = attemptWebsocketConnection(connectionCtx, nil, cfg.WebSocketCfg, u)
case "wss":
cfg.Conn, err = attemptWebsocketConnection(connectionCtx, cfg.TlsCfg, cfg.WebSocketCfg, u)
default:
if cfg.OnConnectError != nil {
cfg.OnConnectError(fmt.Errorf("unsupported scheme (%s) user in url %s", u.Scheme, u.String()))
}
cancelConnCtx()
continue
}
}

var cp *paho.Connect
cp, err = cfg.buildConnectPacket(firstConnection, u)
if err == nil {
cli := paho.NewClient(cfg.ClientConfig)
if cfg.PahoDebug != nil {
cli.SetDebugLogger(cfg.PahoDebug)
}

if cfg.PahoErrors != nil {
cli.SetErrorLogger(cfg.PahoErrors)
}

connack, err = cli.Connect(connectionCtx, cp) // will return an error if the connection is unsuccessful (checks the reason code)
if err == nil { // Successfully connected
cancelConnCtx()
return cli, connack
}
}
cancelConnCtx()
}
cancelConnCtx()

// Possible failure was due to outer context being cancelled
if ctx.Err() != nil {
Expand Down

0 comments on commit b7d62b9

Please sign in to comment.