diff --git a/pkg/pgctl/pgctl.go b/pkg/pgctl/pgctl.go index 0727d9c1..2243434e 100644 --- a/pkg/pgctl/pgctl.go +++ b/pkg/pgctl/pgctl.go @@ -7,6 +7,8 @@ import ( "net/url" "github.com/go-logr/logr" + "github.com/infobloxopen/db-controller/pkg/metrics" + "github.com/lib/pq" ) const ( @@ -262,18 +264,52 @@ func (s *copy_schema_state) Execute() (State, error) { log := s.config.Log.WithValues("state", s.String()) log.Info("started") + //grant rds_superuser to target db user temporarily + //this is required to copy schema from source to target and take ownership of the objects + log.Info("grant temp superuser to ", "user", s.config.SourceDBAdminDsn) + + var ( + err error + targetDBAdmin *sql.DB + ) + + if targetDBAdmin, err = getDB(s.config.TargetDBAdminDsn, nil); err != nil { + log.Error(err, "connection test failed for targetDBAdmin") + + return nil, err + } + defer closeDB(log, targetDBAdmin) + + url, err := url.Parse(s.config.TargetDBUserDsn) + if err != nil { + return nil, err + } + rolename := url.User.Username() + //check if rolename ends with "_a" or "_b" and remove the last 2 characters + if len(rolename) > 2 { + if string(rolename[len(rolename)-2]) == "_" { + rolename = rolename[:len(rolename)-2] + } + } + + _, err = targetDBAdmin.Exec(fmt.Sprintf("GRANT rds_superuser TO %s;", pq.QuoteIdentifier(rolename))) + if err != nil { + log.Error(err, "could not grant super user acesss role to"+rolename) + metrics.UsersUpdatedErrors.WithLabelValues("grant error").Inc() + return nil, err + } + dump := NewDump(s.config.SourceDBAdminDsn) dump.SetupFormat("p") dump.SetPath(s.config.ExportFilePath) - dump.EnableVerbose() - dump.SetOptions([]string{ "--schema-only", "--no-publication", "--no-subscriptions", "--no-privileges", + "--no-owner", }) dumpExec := dump.Exec(ExecOptions{StreamPrint: true}) @@ -283,9 +319,8 @@ func (s *copy_schema_state) Execute() (State, error) { return nil, dumpExec.Error.Err } - restore := NewRestore(s.config.TargetDBAdminDsn) + restore := NewRestore(s.config.TargetDBUserDsn) restore.EnableVerbose() - restore.Path = s.config.ExportFilePath restoreExec := restore.Exec(dumpExec.FileName, ExecOptions{StreamPrint: true}) @@ -294,6 +329,13 @@ func (s *copy_schema_state) Execute() (State, error) { if restoreExec.Error != nil { return nil, restoreExec.Error.Err } + _, err = targetDBAdmin.Exec(fmt.Sprintf("REVOKE rds_superuser FROM %s;", pq.QuoteIdentifier(rolename))) + if err != nil { + log.Error(err, "could not revoke super user acesss role from"+rolename) + metrics.UsersUpdatedErrors.WithLabelValues("revoke error").Inc() + return nil, err + } + log.Info("completed") return &create_subscription_state{ config: s.config, @@ -611,17 +653,22 @@ func (s *disable_source_access_state) Execute() (State, error) { return nil, err } - //if user name is sample_user_a, the role inherited is sample_user + //if rolename name is sample_user_a, the role inherited is sample_user //remove "_x" to get role name //the role has all the permission - which needs to be removed now - user := u.User.Username() + rolename := u.User.Username() + if len(rolename) > 2 { + if string(rolename[len(rolename)-2]) == "_" { + rolename = rolename[:len(rolename)-2] + } + } stmt := ` REVOKE insert, delete, update - ON ALL TABLES IN SCHEMA PUBLIC FROM %s` + ON ALL TABLES IN SCHEMA PUBLIC FROM %s;` - _, err = sourceDBAdmin.Exec(fmt.Sprintf(stmt, user[:len(user)-2])) + _, err = sourceDBAdmin.Exec(fmt.Sprintf(stmt, pq.QuoteIdentifier(rolename))) if err != nil { - log.Error(err, "failed revoking access for source db") + log.Error(err, "failed revoking access for source db - "+rolename) return nil, err } log.Info("completed") @@ -661,19 +708,21 @@ func (s *validate_migration_status_state) Execute() (State, error) { defer closeDB(log, sourceDBUser) allTableCountQ := ` - WITH tbl AS ( - SELECT table_schema, table_name - FROM information_schema.tables - WHERE TABLE_NAME not like 'pg_%' - AND table_type = 'BASE TABLE' - AND table_schema = 'public' - ) - SELECT table_name, - (xpath('/row/c/text()', - query_to_xml(format('select count(*) as c from %I.%I', table_schema, TABLE_NAME), FALSE, TRUE, '') - ) - )[1]::text::int AS table_count - FROM tbl + WITH tbl AS ( + SELECT table_schema, table_name + FROM information_schema.tables t, pg_catalog.pg_tables pt + WHERE t.TABLE_NAME not like 'pg_%' + AND t.table_type = 'BASE TABLE' + AND t.table_schema = 'public' + AND t.table_name = pt.tablename + AND pt.tableowner != 'rdsadmin' + ) + SELECT table_name, + (xpath('/row/c/text()', + query_to_xml(format('select count(*) as c from %I.%I', table_schema, TABLE_NAME), FALSE, TRUE, '') + ) + )[1]::text::int AS table_count + FROM tbl ` tableCountQ := "SELECT count(*) From %s" diff --git a/pkg/pgctl/pgctl_test.go b/pkg/pgctl/pgctl_test.go index 47851223..f7eb9356 100644 --- a/pkg/pgctl/pgctl_test.go +++ b/pkg/pgctl/pgctl_test.go @@ -113,6 +113,10 @@ func realTestMain(m *testing.M) int { fmt.Println(err) return 1 } + if err = loadTargetTestData(TargetDBAdminDsn); err != nil { + fmt.Println(err) + return 1 + } rc := m.Run() @@ -235,7 +239,15 @@ func setUpDatabase(pool *dockertest.Pool, pgInfo *PgInfo) (string, *dockertest.R } func loadSourceTestData(dsn string) error { - _, err := Exec("psql", dsn, "-f", "./test/pgctl_test.sql", "-v", "end=50") + _, err := Exec("psql", dsn, "-f", "./test/pgctl_source_test_data.sql", "-v", "end=50") + if err != nil { + return err + } + return nil +} + +func loadTargetTestData(dsn string) error { + _, err := Exec("psql", dsn, "-f", "./test/pgctl_target_test_data.sql") if err != nil { return err } diff --git a/pkg/pgctl/test/pgctl_test.sql b/pkg/pgctl/test/pgctl_source_test_data.sql similarity index 99% rename from pkg/pgctl/test/pgctl_test.sql rename to pkg/pgctl/test/pgctl_source_test_data.sql index ac92e458..ae492ef6 100644 --- a/pkg/pgctl/test/pgctl_test.sql +++ b/pkg/pgctl/test/pgctl_source_test_data.sql @@ -43,6 +43,8 @@ set rds.logical_replication static parameter to 1 -- DROP DATABASE IF EXISTS pub; -- CREATE DATABASE pub; -- \c pub +CREATE ROLE rds_superuser WITH SUPERUSER; + CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; CREATE EXTENSION IF NOT EXISTS "hstore"; @@ -302,6 +304,7 @@ CREATE OPERATOR CLASS soa_serial_number_ops DEFAULT FOR TYPE soa_serial_number select * from pg_stat_replication; SELECT * FROM pg_publication; +drop publication mypub; select * from pg_create_logical_replication_slot('test_slot', 'test_decoding') select * from pg_replication_slots diff --git a/pkg/pgctl/test/pgctl_target_test_data.sql b/pkg/pgctl/test/pgctl_target_test_data.sql new file mode 100644 index 00000000..3b524a22 --- /dev/null +++ b/pkg/pgctl/test/pgctl_target_test_data.sql @@ -0,0 +1,6 @@ +\set ON_ERROR_STOP on +-- DROP DATABASE IF EXISTS pub; +-- CREATE DATABASE pub; +-- \c pub +CREATE ROLE rds_superuser WITH SUPERUSER; +