Skip to content

Commit

Permalink
fix(service): close forgotten clients and sessions
Browse files Browse the repository at this point in the history
Fixes #3769
  • Loading branch information
Michal-Leszczynski committed Mar 29, 2024
1 parent 2905429 commit 1640dcd
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 13 deletions.
3 changes: 2 additions & 1 deletion pkg/scyllaclient/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/scylladb/go-log"
"github.com/scylladb/scylla-manager/v3/pkg/util/logutil"
"github.com/scylladb/scylla-manager/v3/pkg/util/timeutc"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)
Expand Down Expand Up @@ -88,7 +89,7 @@ func (p *CachedProvider) Close() error {

for clusterID, c := range p.clients {
delete(p.clients, clusterID)
c.client.Close()
logutil.LogOnError(context.Background(), p.logger, c.client.Close, "Couldn't close scylla client")
}

return nil
Expand Down
11 changes: 5 additions & 6 deletions pkg/service/cluster/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/scylladb/scylla-manager/v3/pkg/secrets"
"github.com/scylladb/scylla-manager/v3/pkg/service"
"github.com/scylladb/scylla-manager/v3/pkg/store"
"github.com/scylladb/scylla-manager/v3/pkg/util/logutil"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
"go.uber.org/multierr"
)
Expand Down Expand Up @@ -141,11 +142,7 @@ func (s *Service) createClient(ctx context.Context, clusterID uuid.UUID) (*scyll
if err != nil {
return nil, err
}
defer func() {
if err := client.Close(); err != nil {
s.logger.Error(ctx, "Couldn't close scylla client", "error", err)
}
}()
defer logutil.LogOnError(ctx, s.logger, client.Close, "Couldn't close scylla client")

for _, host := range config.Hosts {
knownHosts, err := s.discoverHosts(scyllaclient.ClientContextWithSelectedHost(ctx, host), client)
Expand Down Expand Up @@ -458,7 +455,7 @@ func (s *Service) validateHostsConnectivity(ctx context.Context, c *Cluster) err
if err != nil {
return errors.Wrap(err, "create client")
}
defer client.Close()
defer logutil.LogOnError(ctx, s.logger, client.Close, "Couldn't close scylla client")

status, err := client.Status(ctx)
if err != nil {
Expand Down Expand Up @@ -544,6 +541,7 @@ func (s *Service) ListNodes(ctx context.Context, clusterID uuid.UUID) ([]Node, e
if err != nil {
return nil, err
}
defer logutil.LogOnError(ctx, s.logger, client.Close, "Couldn't close scylla client")

dcs, err := client.Datacenters(ctx)
if err != nil {
Expand Down Expand Up @@ -578,6 +576,7 @@ func (s *Service) GetSession(ctx context.Context, clusterID uuid.UUID) (session
if err != nil {
return session, errors.Wrap(err, "get client")
}
defer logutil.LogOnError(ctx, s.logger, client.Close, "Couldn't close scylla client")

ni, err := client.AnyNodeInfo(ctx)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/service/repair/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,11 @@ func (s *Service) GetTarget(ctx context.Context, clusterID uuid.UUID, properties
// Sort plan
p.SizeSort()
p.PrioritySort(NewInternalTablePreference())

if clusterSession, err := s.clusterSession(ctx, clusterID); err != nil {
s.logger.Info(ctx, "No cluster credentials, couldn't ensure repairing base table before its views", "error", err)
} else {
defer clusterSession.Close()
views, err := query.GetAllViews(clusterSession)
if err != nil {
return t, errors.Wrap(err, "get cluster views")
Expand Down
12 changes: 6 additions & 6 deletions pkg/service/repair/service_repair_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ func newRepairTestHelper(t *testing.T, session gocqlx.Session, config repair.Con
}
}

func newRepairWithClusterSessionTestHelper(t *testing.T, session gocqlx.Session, clusterSession gocqlx.Session,
func newRepairWithClusterSessionTestHelper(t *testing.T, session gocqlx.Session,
hrt *HackableRoundTripper, c *scyllaclient.Client, config repair.Config) *repairTestHelper {
t.Helper()

logger := log.NewDevelopmentWithLevel(zapcore.InfoLevel)
s := newTestServiceWithClusterSession(t, session, clusterSession, c, config, logger)
s := newTestServiceWithClusterSession(t, session, c, config, logger)

return &repairTestHelper{
CommonTestHelper: &CommonTestHelper{
Expand Down Expand Up @@ -419,7 +419,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C
return s
}

func newTestServiceWithClusterSession(t *testing.T, session gocqlx.Session, clusterSession gocqlx.Session, client *scyllaclient.Client, c repair.Config, logger log.Logger) *repair.Service {
func newTestServiceWithClusterSession(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c repair.Config, logger log.Logger) *repair.Service {
t.Helper()

s, err := repair.NewService(
Expand All @@ -430,7 +430,7 @@ func newTestServiceWithClusterSession(t *testing.T, session gocqlx.Session, clus
return client, nil
},
func(ctx context.Context, clusterID uuid.UUID) (gocqlx.Session, error) {
return clusterSession, nil
return CreateSession(t, client), nil
},
logger.Named("repair"),
)
Expand Down Expand Up @@ -667,7 +667,7 @@ func TestServiceRepairOrderIntegration(t *testing.T) {

session := CreateScyllaManagerDBSession(t)
clusterSession := CreateSessionAndDropAllKeyspaces(t, c)
h := newRepairWithClusterSessionTestHelper(t, session, clusterSession, hrt, c, repair.DefaultConfig())
h := newRepairWithClusterSessionTestHelper(t, session, hrt, c, repair.DefaultConfig())

// Add prefixes ruining lexicographic order
const (
Expand Down Expand Up @@ -881,7 +881,7 @@ func TestServiceRepairResumeAllRangesIntegration(t *testing.T) {
clusterSession := CreateSessionAndDropAllKeyspaces(t, c)
cfg := repair.DefaultConfig()
cfg.GracefulStopTimeout = time.Second
h := newRepairWithClusterSessionTestHelper(t, session, clusterSession, hrt, c, cfg)
h := newRepairWithClusterSessionTestHelper(t, session, hrt, c, cfg)

const (
ks1 = "test_repair_1"
Expand Down
1 change: 1 addition & 0 deletions pkg/service/restore/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (s *Service) Restore(ctx context.Context, clusterID, taskID, runID uuid.UUI
if err != nil {
return errors.Wrap(err, "create worker")
}
defer w.clusterSession.Close()
w.setRunInfo(taskID, runID)

if err := w.initTarget(ctx, properties); err != nil {
Expand Down

0 comments on commit 1640dcd

Please sign in to comment.