Skip to content

Commit

Permalink
add(restore_test): test full restore of alternator table
Browse files Browse the repository at this point in the history
  • Loading branch information
Michal-Leszczynski committed Nov 8, 2023
1 parent 7498cf0 commit ced7799
Showing 1 changed file with 160 additions and 0 deletions.
160 changes: 160 additions & 0 deletions pkg/service/restore/service_restore_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ import (
"testing"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
dbsession "github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/cenkalti/backoff/v4"
"github.com/gocql/gocql"
"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -1353,6 +1357,162 @@ func restoreAllTables(t *testing.T, schemaTarget, tablesTarget Target, keyspace
dstH.validateRestoreSuccess(dstSession, srcSession, tablesTarget, toValidate)
}

func TestRestoreFullAlternatorIntegration(t *testing.T) {
testBucket, _, testUser := getBucketKeyspaceUser(t)
const (
testTable = "table-with-dash"
testKeyspace = "alternator_" + testTable
testBatchSize = 1
testParallel = 3
testAlternatorPort = 8000
)

locs := []Location{
{
DC: "dc1",
Provider: S3,
Path: testBucket,
},
}

schemaTarget := Target{
Location: locs,
BatchSize: testBatchSize,
Parallel: testParallel,
RestoreSchema: true,
}

tablesTarget := Target{
Location: locs,
BatchSize: testBatchSize,
Parallel: testParallel,
RestoreTables: true,
}

restoreAlternator(t, schemaTarget, tablesTarget, testKeyspace, testTable, testUser, testAlternatorPort)
}

func restoreAlternator(t *testing.T, schemaTarget, tablesTarget Target, keyspace, table, user string, alternatorPort int) {
var (
ctx = context.Background()
cfg = DefaultConfig()
srcClientCfg = scyllaclient.TestConfig(ManagedSecondClusterHosts(), AgentAuthToken())
mgrSession = CreateScyllaManagerDBSession(t)
dstH = newRestoreTestHelper(t, mgrSession, cfg, schemaTarget.Location[0], nil, "", "")
srcH = newRestoreTestHelper(t, mgrSession, cfg, schemaTarget.Location[0], &srcClientCfg, "", "")
dstSession = CreateSessionAndDropAllKeyspaces(t, dstH.Client)
srcSession = CreateSessionAndDropAllKeyspaces(t, srcH.Client)
)

// Restore should be performed on user with limited permissions
if err := createUser(dstSession, user, "pass"); err != nil {
t.Fatal(err)
}
dstH = newRestoreTestHelper(t, mgrSession, cfg, schemaTarget.Location[0], nil, user, "pass")

createAlternatorTable(t, ManagedSecondClusterHosts()[0], alternatorPort, table)
fillAlternatorTable(t, ManagedSecondClusterHosts()[0], alternatorPort, table)

schemaTarget.SnapshotTag = srcH.simpleBackup(schemaTarget.Location[0])
if err := grantPermissionsToUser(dstSession, schemaTarget, user); err != nil {
t.Fatal(err)
}

Print("Restore schema on different cluster")
if err := dstH.service.Restore(ctx, dstH.ClusterID, dstH.TaskID, dstH.RunID, dstH.targetToProperties(schemaTarget)); err != nil {
t.Fatal(err)
}

toValidate := []validateTable{
{Keyspace: keyspace, Table: table, Column: "key"},
}

dstH.validateRestoreSuccess(dstSession, srcSession, schemaTarget, toValidate)

tablesTarget.SnapshotTag = schemaTarget.SnapshotTag
dstH.ClusterID = uuid.MustRandom()
dstH.RunID = uuid.MustRandom()

if err := grantPermissionsToUser(dstSession, tablesTarget, user); err != nil {
t.Fatal(err)
}

Print("Restore tables on different cluster")
if err := dstH.service.Restore(ctx, dstH.ClusterID, dstH.TaskID, dstH.RunID, dstH.targetToProperties(tablesTarget)); err != nil {
t.Fatal(err)
}

dstH.validateRestoreSuccess(dstSession, srcSession, tablesTarget, toValidate)
}

func createAlternatorTable(t *testing.T, host string, alternatorPort int, table string) {
svc := createDynamoDBService(t, host, alternatorPort)
createTable := &dynamodb.CreateTableInput{
AttributeDefinitions: []*dynamodb.AttributeDefinition{
{
AttributeName: aws.String("key"),
AttributeType: aws.String("S"),
},
},
BillingMode: aws.String("PAY_PER_REQUEST"),
KeySchema: []*dynamodb.KeySchemaElement{
{
AttributeName: aws.String("key"),
KeyType: aws.String("HASH"),
},
},
TableName: aws.String(table),
}

Print("When: create alternator table")
_, err := svc.CreateTable(createTable)
if err != nil {
t.Fatal(err)
}
}

func fillAlternatorTable(t *testing.T, host string, alternatorPort int, table string) {
svc := createDynamoDBService(t, host, alternatorPort)
insertData := &dynamodb.BatchWriteItemInput{
RequestItems: map[string][]*dynamodb.WriteRequest{
table: {
{
PutRequest: &dynamodb.PutRequest{
Item: map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String("test"),
},
},
},
},
},
},
}

Print("When: insert alternator row")
_, err := svc.BatchWriteItem(insertData)
if err != nil {
t.Fatal(err)
}
}

func createDynamoDBService(t *testing.T, host string, alternatorPort int) *dynamodb.DynamoDB {
awsCfg := &aws.Config{
Endpoint: aws.String(fmt.Sprintf("http://%s:%d", host, alternatorPort)),
Credentials: credentials.NewStaticCredentialsFromCreds(credentials.Value{
AccessKeyID: "None",
SecretAccessKey: "None",
}),
Region: aws.String("None"),
}
dbs, err := dbsession.NewSession(awsCfg)
if err != nil {
t.Fatal(err)
}

return dynamodb.New(dbs)
}

func (h *restoreTestHelper) targetToProperties(target Target) json.RawMessage {
props, err := json.Marshal(target)
if err != nil {
Expand Down

0 comments on commit ced7799

Please sign in to comment.