Skip to content

Commit

Permalink
Fix CleanSession usage in ReconnectClient and add AlwaysResubscribe o…
Browse files Browse the repository at this point in the history
…ption (#226)

- Fix session persistence in ReconnectClient (CleanSession usage)
- Add WithAlwaysResubscribe option to re-subscribe all previously subscribed topics
- Fix unintended resubscribe on initial connect
  • Loading branch information
at-wat authored Apr 14, 2023
1 parent c039343 commit 0d2d878
Show file tree
Hide file tree
Showing 2 changed files with 227 additions and 43 deletions.
54 changes: 33 additions & 21 deletions reconnclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ func NewReconnectClient(dialer Dialer, opts ...ReconnectOption) (ReconnectClient
// The function returns after establishing a first connection, which can be canceled by the context.
// Once after establishing the connection, the retry loop is not affected by the context.
func (c *reconnectClient) Connect(ctx context.Context, clientID string, opts ...ConnectOption) (bool, error) {
connOptions := &ConnectOptions{
CleanSession: true,
}
connOptions := &ConnectOptions{}
for _, opt := range opts {
if err := opt(connOptions); err != nil {
return false, err
Expand All @@ -78,49 +76,48 @@ func (c *reconnectClient) Connect(ctx context.Context, clientID string, opts ...

var errDial, errConnect firstError

done := make(chan struct{})
done := make(chan bool, 1)
var doneOnce sync.Once
var sessionPresent bool
go func(ctx context.Context) {
defer func() {
close(c.done)
}()
clean := connOptions.CleanSession
reconnWait := c.options.ReconnectWaitBase
var initialized bool
for {
if baseCli, err := c.dialer.DialContext(ctx); err == nil {
optsCurr := append([]ConnectOption{}, opts...)
optsCurr = append(optsCurr, WithCleanSession(clean))
clean = false // Clean only first time.
c.RetryClient.SetClient(ctx, baseCli)

var ctxTimeout context.Context
var cancel func()
var ctxConnect context.Context
var cancelConnect func()
if c.options.Timeout == 0 {
ctxTimeout, cancel = ctx, func() {}
ctxConnect, cancelConnect = ctx, func() {}
} else {
ctxTimeout, cancel = context.WithTimeout(ctx, c.options.Timeout)
ctxConnect, cancelConnect = context.WithTimeout(ctx, c.options.Timeout)
}

if sessionPresent, err := c.RetryClient.Connect(ctxTimeout, clientID, optsCurr...); err == nil {
cancel()
if sessionPresent, err := c.RetryClient.Connect(ctxConnect, clientID, opts...); err == nil {
cancelConnect()

reconnWait = c.options.ReconnectWaitBase // Reset reconnect wait.
doneOnce.Do(func() {
ctx = context.Background()
done <- sessionPresent
close(done)
})

if !sessionPresent {
if initialized && (!sessionPresent || c.options.AlwaysResubscribe) {
c.RetryClient.Resubscribe(ctx)
}
c.RetryClient.Retry(ctx)
initialized = true

ctxKeepAlive, cancelKeepAlive := context.WithCancel(ctx)
if c.options.PingInterval > time.Duration(0) {
// Start keep alive.
go func() {
if err := KeepAlive(
ctx, baseCli,
ctxKeepAlive, baseCli,
c.options.PingInterval,
c.options.Timeout,
); err != nil {
Expand All @@ -133,20 +130,23 @@ func (c *reconnectClient) Connect(ctx context.Context, clientID string, opts ...
}
select {
case <-baseCli.Done():
cancelKeepAlive()
if err := baseCli.Err(); err == nil {
// Disconnected as expected; don't restart.
return
}
case <-ctx.Done():
cancelKeepAlive()
// User cancelled; don't restart.
return
case <-c.disconnected:
cancelKeepAlive()
return
}
} else if err != ctxTimeout.Err() {
} else if err != ctxConnect.Err() {
errConnect.Store(err) // Hold first connect error excepting context cancel.
}
cancel()
cancelConnect()
} else if err != ctx.Err() {
errDial.Store(err) // Hold first dial error excepting context cancel.
}
Expand All @@ -165,7 +165,8 @@ func (c *reconnectClient) Connect(ctx context.Context, clientID string, opts ...
}
}(ctx)
select {
case <-done:
case sessionPresent := <-done:
return sessionPresent, nil
case <-ctx.Done():
var actualErrs []string
if err := errDial.Load(); err != nil {
Expand All @@ -180,7 +181,6 @@ func (c *reconnectClient) Connect(ctx context.Context, clientID string, opts ...
}
return false, wrapErrorf(ctx.Err(), "establishing first connection%s", errStr)
}
return sessionPresent, nil
}

// Disconnect from the broker.
Expand All @@ -203,6 +203,7 @@ type ReconnectOptions struct {
ReconnectWaitMax time.Duration
PingInterval time.Duration
RetryClient *RetryClient
AlwaysResubscribe bool
}

// ReconnectOption sets option for Connect.
Expand Down Expand Up @@ -243,3 +244,14 @@ func WithRetryClient(cli *RetryClient) ReconnectOption {
return nil
}
}

// WithAlwaysResubscribe enables or disables re-subscribe on reconnect.
// Default value is false.
// This option can be used to ensure all subscriptions are restored
// even if the server is buggy.
func WithAlwaysResubscribe(always bool) ReconnectOption {
return func(o *ReconnectOptions) error {
o.AlwaysResubscribe = always
return nil
}
}
Loading

0 comments on commit 0d2d878

Please sign in to comment.