Skip to content

Commit

Permalink
chore(pkg): let other services use config cache
Browse files Browse the repository at this point in the history
This is the first general step toward fixing #3892.
It allows for accessing config cache service from other
services. Actually using config cache service will be
implemented in separate PRs.

Ref #3892
  • Loading branch information
Michal-Leszczynski committed Dec 13, 2024
1 parent 92775c3 commit dd51843
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 7 deletions.
3 changes: 3 additions & 0 deletions pkg/cmd/scylla-manager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (s *server) makeServices(ctx context.Context) error {
metrics.NewRepairMetrics().MustRegister(),
s.clusterSvc.Client,
s.clusterSvc.GetSession,
s.configCacheSvc,
s.logger.Named("repair"),
)
if err != nil {
Expand All @@ -115,6 +116,7 @@ func (s *server) makeServices(ctx context.Context) error {
s.clusterSvc.GetClusterName,
s.clusterSvc.Client,
s.clusterSvc.GetSession,
s.configCacheSvc,
s.logger.Named("backup"),
)
if err != nil {
Expand All @@ -128,6 +130,7 @@ func (s *server) makeServices(ctx context.Context) error {
metrics.NewRestoreMetrics().MustRegister(),
s.clusterSvc.Client,
s.clusterSvc.GetSession,
s.configCacheSvc,
s.logger.Named("restore"),
)
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions pkg/service/backup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/service/cluster"
"github.com/scylladb/scylla-manager/v3/pkg/service/configcache"
"github.com/scylladb/scylla-manager/v3/pkg/service/scheduler"
"github.com/scylladb/scylla-manager/v3/pkg/util"
"github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/dcfilter"
Expand All @@ -45,13 +46,15 @@ type Service struct {
clusterName cluster.NameFunc
scyllaClient scyllaclient.ProviderFunc
clusterSession cluster.SessionFunc
configCache configcache.ConfigCacher
logger log.Logger

dth deduplicateTestHooks
}

func NewService(session gocqlx.Session, config Config, metrics metrics.BackupMetrics,
clusterName cluster.NameFunc, scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, logger log.Logger,
func NewService(session gocqlx.Session, config Config, metrics metrics.BackupMetrics, clusterName cluster.NameFunc,
scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, configCache configcache.ConfigCacher,
logger log.Logger,
) (*Service, error) {
if session.Session == nil || session.Closed() {
return nil, errors.New("invalid session")
Expand All @@ -76,6 +79,7 @@ func NewService(session gocqlx.Session, config Config, metrics metrics.BackupMet
clusterName: clusterName,
scyllaClient: scyllaClient,
clusterSession: clusterSession,
configCache: configCache,
logger: logger,
}, nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/service/backup/service_backup_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func newTestServiceWithUser(t *testing.T, session gocqlx.Session, client *scylla
}
return CreateManagedClusterSession(t, false, client, user, pass), nil
},
NewTestConfigCacheSvc(t, client.Config().Hosts),
logger.Named("backup"),
)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion pkg/service/repair/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/scylladb/scylla-manager/v3/pkg/schema/table"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/service/cluster"
"github.com/scylladb/scylla-manager/v3/pkg/service/configcache"
"github.com/scylladb/scylla-manager/v3/pkg/util"
"github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/dcfilter"
"github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/ksfilter"
Expand All @@ -38,14 +39,16 @@ type Service struct {

scyllaClient scyllaclient.ProviderFunc
clusterSession cluster.SessionFunc
configCache configcache.ConfigCacher
logger log.Logger

intensityHandlers map[uuid.UUID]*intensityParallelHandler
mu sync.Mutex
}

func NewService(session gocqlx.Session, config Config, metrics metrics.RepairMetrics,
scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, logger log.Logger,
scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, configCache configcache.ConfigCacher,
logger log.Logger,
) (*Service, error) {
if err := config.Validate(); err != nil {
return nil, errors.Wrap(err, "invalid config")
Expand All @@ -61,6 +64,7 @@ func NewService(session gocqlx.Session, config Config, metrics metrics.RepairMet
metrics: metrics,
scyllaClient: scyllaClient,
clusterSession: clusterSession,
configCache: configCache,
logger: logger,
intensityHandlers: make(map[uuid.UUID]*intensityParallelHandler),
}, nil
Expand Down
2 changes: 2 additions & 0 deletions pkg/service/repair/service_repair_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return gocqlx.Session{}, errors.New("not implemented")
},
NewTestConfigCacheSvc(t, client.Config().Hosts),
logger.Named("repair"),
)
if err != nil {
Expand All @@ -424,6 +425,7 @@ func newTestServiceWithClusterSession(t *testing.T, session gocqlx.Session, clie
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateSession(t, client), nil
},
NewTestConfigCacheSvc(t, client.Config().Hosts),
logger.Named("repair"),
)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions pkg/service/restore/helper_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func newBackupSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient.
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateSession(t, client), nil
},
NewTestConfigCacheSvc(t, client.Config().Hosts),
log.NewDevelopmentWithLevel(zapcore.ErrorLevel).Named("backup"),
)
if err != nil {
Expand All @@ -144,6 +145,8 @@ func newBackupSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient.
}

func newRestoreSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient.Client, user, pass string) *Service {
configCacheSvc := NewTestConfigCacheSvc(t, client.Config().Hosts)

repairSvc, err := repair.NewService(
mgrSession,
repair.DefaultConfig(),
Expand All @@ -154,6 +157,7 @@ func newRestoreSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateSession(t, client), nil
},
configCacheSvc,
log.NewDevelopmentWithLevel(zapcore.ErrorLevel).Named("repair"),
)
if err != nil {
Expand All @@ -171,6 +175,7 @@ func newRestoreSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateManagedClusterSession(t, false, client, user, pass), nil
},
configCacheSvc,
log.NewDevelopmentWithLevel(zapcore.InfoLevel).Named("restore"),
)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion pkg/service/restore/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/scylladb/scylla-manager/v3/pkg/schema/table"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/service/cluster"
"github.com/scylladb/scylla-manager/v3/pkg/service/configcache"
"github.com/scylladb/scylla-manager/v3/pkg/service/repair"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)
Expand All @@ -29,11 +30,13 @@ type Service struct {

scyllaClient scyllaclient.ProviderFunc
clusterSession cluster.SessionFunc
configCache configcache.ConfigCacher
logger log.Logger
}

func NewService(repairSvc *repair.Service, session gocqlx.Session, config Config, metrics metrics.RestoreMetrics,
scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, logger log.Logger,
scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, configCache configcache.ConfigCacher,
logger log.Logger,
) (*Service, error) {
if session.Session == nil || session.Closed() {
return nil, errors.New("invalid session")
Expand All @@ -53,6 +56,7 @@ func NewService(repairSvc *repair.Service, session gocqlx.Session, config Config
metrics: metrics,
scyllaClient: scyllaClient,
clusterSession: clusterSession,
configCache: configCache,
logger: logger,
}, nil
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/service/restore/service_restore_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ func newTestClient(t *testing.T, hrt *HackableRoundTripper, logger log.Logger, c
func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c Config, logger log.Logger, user, pass string) (*Service, *backup.Service) {
t.Helper()

configCacheSvc := NewTestConfigCacheSvc(t, client.Config().Hosts)

repairSvc, err := repair.NewService(
session,
repair.DefaultConfig(),
Expand All @@ -119,6 +121,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateManagedClusterSession(t, false, client, user, pass), nil
},
configCacheSvc,
log.NewDevelopmentWithLevel(zapcore.ErrorLevel).Named("repair"),
)
if err != nil {
Expand All @@ -138,6 +141,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateManagedClusterSession(t, false, client, user, pass), nil
},
configCacheSvc,
log.NewDevelopmentWithLevel(zapcore.ErrorLevel).Named("backup"),
)
if err != nil {
Expand All @@ -155,6 +159,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateManagedClusterSession(t, false, client, user, pass), nil
},
configCacheSvc,
logger.Named("restore"),
)
if err != nil {
Expand Down
38 changes: 35 additions & 3 deletions pkg/testutils/testhelper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,28 @@ package testhelper
import (
"context"
"testing"
"time"

"github.com/scylladb/go-log"
"github.com/scylladb/gocqlx/v2"
"github.com/scylladb/scylla-manager/v3/pkg/config/server"
"github.com/scylladb/scylla-manager/v3/pkg/metrics"
"github.com/scylladb/scylla-manager/v3/pkg/schema/table"
"github.com/scylladb/scylla-manager/v3/pkg/service/cluster"
"github.com/scylladb/scylla-manager/v3/pkg/service/configcache"
"github.com/scylladb/scylla-manager/v3/pkg/store"

"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/testutils"
. "github.com/scylladb/scylla-manager/v3/pkg/testutils"
. "github.com/scylladb/scylla-manager/v3/pkg/testutils/db"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)

// CommonTestHelper common tester object for backups and repairs.
type CommonTestHelper struct {
Logger log.Logger
Session gocqlx.Session
Hrt *testutils.HackableRoundTripper
Hrt *HackableRoundTripper
Client *scyllaclient.Client

ClusterID uuid.UUID
Expand Down Expand Up @@ -57,11 +65,35 @@ func (h *CommonTestHelper) RestartAgents() {
func execOnAllHosts(h *CommonTestHelper, cmd string) {
h.T.Helper()
for _, host := range h.GetAllHosts() {
stdout, stderr, err := testutils.ExecOnHost(host, cmd)
stdout, stderr, err := ExecOnHost(host, cmd)
if err != nil {
h.T.Log("stdout", stdout)
h.T.Log("stderr", stderr)
h.T.Fatal("Command failed on host", host, err)
}
}
}

// NewTestConfigCacheSvc creates default config cache service which can be used
// for testing other services relaying on it.
func NewTestConfigCacheSvc(t *testing.T, hosts []string) configcache.ConfigCacher {
t.Helper()

session := CreateScyllaManagerDBSession(t)
secretsStore := store.NewTableStore(session, table.Secrets)

clusterSvc, err := cluster.NewService(session, metrics.NewClusterMetrics(), secretsStore,
scyllaclient.DefaultTimeoutConfig(), server.DefaultConfig().ClientCacheTimeout, log.NewDevelopment())
if err != nil {
t.Fatal(err)
}

scyllaClientProvider := func(context.Context, uuid.UUID) (*scyllaclient.Client, error) {
sc := scyllaclient.TestConfig(hosts, AgentAuthToken())
sc.Timeout = time.Second
sc.Transport = NewHackableRoundTripper(scyllaclient.DefaultTransport())
return scyllaclient.NewClient(sc, log.NewDevelopment())
}

return configcache.NewService(configcache.DefaultConfig(), clusterSvc, scyllaClientProvider, secretsStore, log.NewDevelopment())
}

0 comments on commit dd51843

Please sign in to comment.