diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index 5a1e4000fea..1c57dd0c08e 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -22,15 +22,7 @@ Flags: --backup_storage_number_blocks int if backup_storage_compress is true, backup_storage_number_blocks sets the number of blocks that can be processed, in parallel, before the writer blocks, during compression (default is 2). It should be equal to the number of CPUs available for compression. (default 2) --bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system. --binlog-in-memory-decompressor-max-size uint This value sets the uncompressed transaction payload size at which we switch from in-memory buffer based decompression to the slower streaming mode. (default 134217728) - --binlog_host string PITR restore parameter: hostname/IP of binlog server. - --binlog_password string PITR restore parameter: password of binlog server. --binlog_player_protocol string the protocol to download binlogs from a vttablet (default "grpc") - --binlog_port int PITR restore parameter: port of binlog server. - --binlog_ssl_ca string PITR restore parameter: Filename containing TLS CA certificate to verify binlog server TLS certificate against. - --binlog_ssl_cert string PITR restore parameter: Filename containing mTLS client certificate to present to binlog server as authentication. - --binlog_ssl_key string PITR restore parameter: Filename containing mTLS client private key for use in binlog server authentication. - --binlog_ssl_server_name string PITR restore parameter: TLS server name (common name) to verify against for the binlog server we are connecting to (If not set: use the hostname or IP supplied in --binlog_host). - --binlog_user string PITR restore parameter: username of binlog server. --buffer_drain_concurrency int Maximum number of requests retried simultaneously. 
More concurrency will increase the load on the PRIMARY vttablet when draining the buffer. (default 1) --buffer_keyspace_shards string If not empty, limit buffering to these entries (comma separated). Entry format: keyspace or keyspace/shard. Requires --enable_buffer=true. --buffer_max_failover_duration duration Stop buffering completely if a failover takes longer than this duration. (default 20s) @@ -261,7 +253,6 @@ Flags: --onclose_timeout duration wait no more than this for OnClose handlers before stopping (default 10s) --onterm_timeout duration wait no more than this for OnTermSync handlers before stopping (default 10s) --pid_file string If set, the process will write its pid to the named file, and delete it on graceful shutdown. - --pitr_gtid_lookup_timeout duration PITR restore parameter: timeout for fetching gtid from timestamp. (default 1m0s) --planner-version string Sets the default planner to use when the session has not changed it. Valid values are: Gen4, Gen4Greedy, Gen4Left2Right --pool_hostname_resolve_interval duration if set force an update to all hostnames and reconnect if changed, defaults to 0 (disabled) --port int port for the server diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 132c2d06344..48776d0e8e0 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -57,20 +57,12 @@ Flags: --backup_storage_number_blocks int if backup_storage_compress is true, backup_storage_number_blocks sets the number of blocks that can be processed, in parallel, before the writer blocks, during compression (default is 2). It should be equal to the number of CPUs available for compression. (default 2) --bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system. 
--binlog-in-memory-decompressor-max-size uint This value sets the uncompressed transaction payload size at which we switch from in-memory buffer based decompression to the slower streaming mode. (default 134217728) - --binlog_host string PITR restore parameter: hostname/IP of binlog server. - --binlog_password string PITR restore parameter: password of binlog server. --binlog_player_grpc_ca string the server ca to use to validate servers when connecting --binlog_player_grpc_cert string the cert to use to connect --binlog_player_grpc_crl string the server crl to use to validate server certificates when connecting --binlog_player_grpc_key string the key to use to connect --binlog_player_grpc_server_name string the server name to use to validate server certificate --binlog_player_protocol string the protocol to download binlogs from a vttablet (default "grpc") - --binlog_port int PITR restore parameter: port of binlog server. - --binlog_ssl_ca string PITR restore parameter: Filename containing TLS CA certificate to verify binlog server TLS certificate against. - --binlog_ssl_cert string PITR restore parameter: Filename containing mTLS client certificate to present to binlog server as authentication. - --binlog_ssl_key string PITR restore parameter: Filename containing mTLS client private key for use in binlog server authentication. - --binlog_ssl_server_name string PITR restore parameter: TLS server name (common name) to verify against for the binlog server we are connecting to (If not set: use the hostname or IP supplied in --binlog_host). - --binlog_user string PITR restore parameter: username of binlog server. --builtinbackup-file-read-buffer-size uint read files using an IO buffer of this many bytes. Golang defaults are used when set to 0. --builtinbackup-file-write-buffer-size uint write files using an IO buffer of this many bytes. Golang defaults are used when set to 0. 
(default 2097152) --builtinbackup-incremental-restore-path string the directory where incremental restore files, namely binlog files, are extracted to. In k8s environments, this should be set to a directory that is shared between the vttablet and mysqld pods. The path should exist. When empty, the default OS temp dir is assumed. @@ -259,7 +251,6 @@ Flags: --onterm_timeout duration wait no more than this for OnTermSync handlers before stopping (default 10s) --opentsdb_uri string URI of opentsdb /api/put method --pid_file string If set, the process will write its pid to the named file, and delete it on graceful shutdown. - --pitr_gtid_lookup_timeout duration PITR restore parameter: timeout for fetching gtid from timestamp. (default 1m0s) --pool_hostname_resolve_interval duration if set force an update to all hostnames and reconnect if changed, defaults to 0 (disabled) --port int port for the server --pprof strings enable profiling diff --git a/go/mysql/flavor_filepos.go b/go/mysql/flavor_filepos.go index 03d745468be..43278c9a0c4 100644 --- a/go/mysql/flavor_filepos.go +++ b/go/mysql/flavor_filepos.go @@ -84,7 +84,7 @@ func (flv *filePosFlavor) gtidMode(c *Conn) (string, error) { // serverUUID is part of the Flavor interface. func (flv *filePosFlavor) serverUUID(c *Conn) (string, error) { - // keep @@global as lowercase, as some servers like the Ripple binlog server only honors a lowercase `global` value + // keep @@global as lowercase, as some servers (e.g. binlog servers) only honor a lowercase `global` value qr, err := c.ExecuteFetch("SELECT @@global.server_uuid", 1, false) if err != nil { return "", err diff --git a/go/mysql/flavor_mysql.go b/go/mysql/flavor_mysql.go index b109e5ca385..e405dc401ec 100644 --- a/go/mysql/flavor_mysql.go +++ b/go/mysql/flavor_mysql.go @@ -60,7 +60,7 @@ var _ flavor = (*mysqlFlavor82)(nil) // primaryGTIDSet is part of the Flavor interface. 
func (mysqlFlavor) primaryGTIDSet(c *Conn) (replication.GTIDSet, error) { - // keep @@global as lowercase, as some servers like the Ripple binlog server only honors a lowercase `global` value + // keep @@global as lowercase, as some servers (e.g. binlog servers) only honor a lowercase `global` value qr, err := c.ExecuteFetch("SELECT @@global.gtid_executed", 1, false) if err != nil { return nil, err @@ -73,7 +73,7 @@ func (mysqlFlavor) primaryGTIDSet(c *Conn) (replication.GTIDSet, error) { // purgedGTIDSet is part of the Flavor interface. func (mysqlFlavor) purgedGTIDSet(c *Conn) (replication.GTIDSet, error) { - // keep @@global as lowercase, as some servers like the Ripple binlog server only honors a lowercase `global` value + // keep @@global as lowercase, as some servers (e.g. binlog servers) only honor a lowercase `global` value qr, err := c.ExecuteFetch("SELECT @@global.gtid_purged", 1, false) if err != nil { return nil, err @@ -86,7 +86,7 @@ func (mysqlFlavor) purgedGTIDSet(c *Conn) (replication.GTIDSet, error) { // serverUUID is part of the Flavor interface. func (mysqlFlavor) serverUUID(c *Conn) (string, error) { - // keep @@global as lowercase, as some servers like the Ripple binlog server only honors a lowercase `global` value + // keep @@global as lowercase, as some servers (e.g. binlog servers) only honor a lowercase `global` value qr, err := c.ExecuteFetch("SELECT @@global.server_uuid", 1, false) if err != nil { return "", err diff --git a/go/test/endtoend/recovery/pitr/binlog_server.go b/go/test/endtoend/recovery/pitr/binlog_server.go deleted file mode 100644 index 3b78b0d4ad7..00000000000 --- a/go/test/endtoend/recovery/pitr/binlog_server.go +++ /dev/null @@ -1,137 +0,0 @@ -/* -Copyright 2020 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package pitr - -import ( - "fmt" - "os" - "os/exec" - "path" - "strings" - "syscall" - "time" - - "vitess.io/vitess/go/vt/log" -) - -const ( - binlogExecutableName = "rippled" - binlogDataDir = "binlog_dir" - binlogUser = "ripple" - binlogPassword = "ripplepassword" - binlogPasswordHash = "D4CDF66E273494CEA9592162BEBB6D62D94C4168" -) - -type binLogServer struct { - hostname string - port int - username string - password string - passwordHash string - dataDirectory string - executablePath string - - proc *exec.Cmd - exit chan error -} - -type mysqlSource struct { - hostname string - port int - username string - password string -} - -// newBinlogServer returns an instance of binlog server -func newBinlogServer(hostname string, port int) (*binLogServer, error) { - dataDir := path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("%s_%d", binlogDataDir, port)) - fmt.Println(dataDir) - if _, err := os.Stat(dataDir); os.IsNotExist(err) { - err := os.Mkdir(dataDir, 0700) - if err != nil { - log.Error(err) - return nil, err - } - } - return &binLogServer{ - executablePath: path.Join(os.Getenv("EXTRA_BIN"), binlogExecutableName), - dataDirectory: dataDir, - username: binlogUser, - password: binlogPassword, - passwordHash: binlogPasswordHash, - hostname: hostname, - port: port, - }, nil -} - -// start starts the binlog server points to running mysql port -func (bs *binLogServer) start(source mysqlSource) error { - bs.proc = exec.Command( - bs.executablePath, - fmt.Sprintf("-ripple_datadir=%s", bs.dataDirectory), - fmt.Sprintf("-ripple_server_password_hash=%s", bs.passwordHash), - 
fmt.Sprintf("-ripple_master_address=%s", source.hostname), - fmt.Sprintf("-ripple_master_port=%d", source.port), - fmt.Sprintf("-ripple_master_user=%s", source.username), - fmt.Sprintf("-ripple_server_ports=%d", bs.port), - ) - if source.password != "" { - bs.proc.Args = append(bs.proc.Args, fmt.Sprintf("-ripple_master_password=%s", source.password)) - } - - errFile, err := os.Create(path.Join(bs.dataDirectory, "log.txt")) - if err != nil { - log.Errorf("cannot create error log file for binlog server: %v", err) - return err - } - bs.proc.Stderr = errFile - - bs.proc.Env = append(bs.proc.Env, os.Environ()...) - - log.Infof("Running binlog server with command: %v", strings.Join(bs.proc.Args, " ")) - - err = bs.proc.Start() - if err != nil { - return err - } - bs.exit = make(chan error) - go func() { - if bs.proc != nil { - bs.exit <- bs.proc.Wait() - } - }() - return nil -} - -func (bs *binLogServer) stop() error { - if bs.proc == nil || bs.exit == nil { - return nil - } - // Attempt graceful shutdown with SIGTERM first - bs.proc.Process.Signal(syscall.SIGTERM) - - select { - case err := <-bs.exit: - bs.proc = nil - return err - - case <-time.After(10 * time.Second): - bs.proc.Process.Kill() - bs.proc = nil - return <-bs.exit - } -} diff --git a/go/test/endtoend/recovery/pitr/shardedpitr_test.go b/go/test/endtoend/recovery/pitr/shardedpitr_test.go deleted file mode 100644 index 3bb2399737e..00000000000 --- a/go/test/endtoend/recovery/pitr/shardedpitr_test.go +++ /dev/null @@ -1,602 +0,0 @@ -/* -Copyright 2020 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-See the License for the specific language governing permissions and -limitations under the License. -*/ - -package pitr - -import ( - "context" - "fmt" - "os" - "os/exec" - "path" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "vitess.io/vitess/go/constants/sidecar" - "vitess.io/vitess/go/json2" - "vitess.io/vitess/go/mysql" - "vitess.io/vitess/go/test/endtoend/cluster" - "vitess.io/vitess/go/test/endtoend/utils" - "vitess.io/vitess/go/vt/log" - - vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" -) - -var ( - createTable = `create table product (id bigint(20) primary key, name char(10), created bigint(20));` - insertTable = `insert into product (id, name, created) values(%d, '%s', unix_timestamp());` - getCountID = `select count(*) from product` -) - -var ( - clusterInstance *cluster.LocalProcessCluster - - primary *cluster.Vttablet - replica1 *cluster.Vttablet - replica2 *cluster.Vttablet - shard0Primary *cluster.Vttablet - shard0Replica1 *cluster.Vttablet - shard0Replica2 *cluster.Vttablet - shard1Primary *cluster.Vttablet - shard1Replica1 *cluster.Vttablet - shard1Replica2 *cluster.Vttablet - - cell = "zone1" - hostname = "localhost" - binlogHost = "127.0.0.1" - keyspaceName = "ks" - restoreKS1Name = "restoreks1" - restoreKS2Name = "restoreks2" - restoreKS3Name = "restoreks3" - shardName = "0" - shard0Name = "-80" - shard1Name = "80-" - dbName = "vt_ks" - mysqlUserName = "vt_dba" - mysqlPassword = "VtDbaPass" - dbCredentialFile = "" - initDBFileWithPassword = "" - vSchema = `{ - "sharded": true, - "vindexes": { - "hash_index": { - "type": "hash" - } - }, - "tables": { - "product": { - "column_vindexes": [ - { - "column": "id", - "name": "hash_index" - } - ] - } - } - }` - commonTabletArg = []string{ - "--vreplication_retry_delay", "1s", - "--degraded_threshold", "5s", - "--lock_tables_timeout", "5s", - "--watch_replication_stream", - "--serving_state_grace_period", "1s"} - - 
defaultTimeout = 30 * time.Second - defaultTick = 1 * time.Second -) - -// Test pitr (Point in time recovery). -// ------------------------------------------- -// The following test will: -// - create a shard with primary and replica -// - run InitShardPrimary -// - point binlog server to primary -// - insert some data using vtgate (e.g. here we have inserted rows 1,2) -// - verify the replication -// - take backup of replica -// - insert some data using vtgate (e.g. we inserted rows 3 4 5 6), while inserting row-4, note down the time (restoreTime1) -// - perform a resharding to create 2 shards (-80, 80-), and delete the old shard -// - point binlog server to primary of both shards -// - insert some data using vtgate (e.g. we will insert 7 8 9 10) and verify we get required number of rows in -80, 80- shard -// - take backup of both shards -// - insert some more data using vtgate (e.g. we will insert 11 12 13 14 15), while inserting row-13, note down the time (restoreTime2) -// - note down the current time (restoreTime3) - -// - Till now we did all the presetup for assertions - -// - asserting that restoring to restoreTime1 (going from 2 shards to 1 shard) is working, i.e. we should get 4 rows. -// - asserting that while restoring if we give small timeout value, it will restore upto to the last available backup (asserting only -80 shard) -// - asserting that restoring to restoreTime2 (going from 2 shards to 2 shards with past time) is working, it will assert for both shards -// - asserting that restoring to restoreTime3 is working, we should get complete data after restoring, as we have in existing shards. 
-func TestPITRRecovery(t *testing.T) { - initializeCluster(t) - defer clusterInstance.Teardown() - - // start the binlog server and point it to primary - bs := startBinlogServer(t, primary) - defer bs.stop() - - // Creating the table - _, err := primary.VttabletProcess.QueryTablet(createTable, keyspaceName, true) - require.NoError(t, err) - - insertRow(t, 1, "prd-1", false) - insertRow(t, 2, "prd-2", false) - - cluster.VerifyRowsInTabletForTable(t, replica1, keyspaceName, 2, "product") - - // backup the replica - err = clusterInstance.VtctldClientProcess.ExecuteCommand("Backup", replica1.Alias) - require.NoError(t, err) - - // check that the backup shows up in the listing - output, err := clusterInstance.ListBackups("ks/0") - require.NoError(t, err) - assert.Equal(t, 1, len(output)) - - // now insert some more data to simulate the changes after regular backup - // every insert has some time lag/difference to simulate the time gap between rows - // and when we recover to certain time, this time gap will be able to identify the exact eligible row - var restoreTime1 string - for counter := 3; counter <= 6; counter++ { - if counter == 4 { // we want to recovery till this, so noting the time - tm := time.Now().Add(1 * time.Second).UTC() - restoreTime1 = tm.Format(time.RFC3339) - } - insertRow(t, counter, fmt.Sprintf("prd-%d", counter), true) - } - - // starting resharding process - performResharding(t) - - // start the binlog server and point it to shard0Primary - bs0 := startBinlogServer(t, shard0Primary) - defer bs0.stop() - - // start the binlog server and point it to shard1Primary - bs1 := startBinlogServer(t, shard1Primary) - defer bs1.stop() - - for counter := 7; counter <= 10; counter++ { - insertRow(t, counter, fmt.Sprintf("prd-%d", counter), false) - } - - // wait till all the shards have required data - cluster.VerifyRowsInTabletForTable(t, shard0Replica1, keyspaceName, 6, "product") - cluster.VerifyRowsInTabletForTable(t, shard1Replica1, keyspaceName, 4, 
"product") - - // take the backup (to simulate the regular backup) - err = clusterInstance.VtctldClientProcess.ExecuteCommand("Backup", shard0Replica1.Alias) - require.NoError(t, err) - // take the backup (to simulate the regular backup) - err = clusterInstance.VtctldClientProcess.ExecuteCommand("Backup", shard1Replica1.Alias) - require.NoError(t, err) - - backups, err := clusterInstance.ListBackups(keyspaceName + "/-80") - require.NoError(t, err) - require.Equal(t, len(backups), 1) - - backups, err = clusterInstance.ListBackups(keyspaceName + "/80-") - require.NoError(t, err) - require.Equal(t, len(backups), 1) - - // now insert some more data to simulate the changes after regular backup - // every insert has some time lag/difference to simulate the time gap between rows - // and when we recover to certain time, this time gap will be able to identify the exact eligible row - var restoreTime2 string - for counter := 11; counter <= 15; counter++ { - if counter == 13 { // we want to recovery till this, so noting the time - tm := time.Now().Add(1 * time.Second).UTC() - restoreTime2 = tm.Format(time.RFC3339) - } - insertRow(t, counter, fmt.Sprintf("prd-%d", counter), true) - } - restoreTime3 := time.Now().UTC().Format(time.RFC3339) - - // creating restore keyspace with snapshot time as restoreTime1 - createRestoreKeyspace(t, restoreTime1, restoreKS1Name) - - // Launching a recovery tablet which recovers data from the primary till the restoreTime1 - testTabletRecovery(t, bs, "2m", restoreKS1Name, "0", "INT64(4)") - - // create restoreKeyspace with snapshot time as restoreTime2 - createRestoreKeyspace(t, restoreTime2, restoreKS2Name) - - // test the recovery with smaller binlog_lookup_timeout for shard0 - // since we have small lookup timeout, it will just get whatever available in the backup - // mysql> select * from product; - // +----+--------+------------+ - // | id | name | created | - // +----+--------+------------+ - // | 1 | prd-1 | 1597219030 | - // | 2 | prd-2 
| 1597219030 | - // | 3 | prd-3 | 1597219043 | - // | 5 | prd-5 | 1597219045 | - // | 9 | prd-9 | 1597219130 | - // | 10 | prd-10 | 1597219130 | - // +----+--------+------------+ - testTabletRecovery(t, bs0, "1ms", restoreKS2Name, "-80", "INT64(6)") - - // test the recovery with valid binlog_lookup_timeout for shard0 and getting the data till the restoreTime2 - // mysql> select * from product; - // +----+--------+------------+ - // | id | name | created | - // +----+--------+------------+ - // | 1 | prd-1 | 1597219030 | - // | 2 | prd-2 | 1597219030 | - // | 3 | prd-3 | 1597219043 | - // | 5 | prd-5 | 1597219045 | - // | 9 | prd-9 | 1597219130 | - // | 10 | prd-10 | 1597219130 | - // | 13 | prd-13 | 1597219141 | - // +----+--------+------------+ - testTabletRecovery(t, bs0, "2m", restoreKS2Name, "-80", "INT64(7)") - - // test the recovery with valid binlog_lookup_timeout for shard1 and getting the data till the restoreTime2 - // mysql> select * from product; - // +----+--------+------------+ - // | id | name | created | - // +----+--------+------------+ - // | 4 | prd-4 | 1597219044 | - // | 6 | prd-6 | 1597219046 | - // | 7 | prd-7 | 1597219130 | - // | 8 | prd-8 | 1597219130 | - // | 11 | prd-11 | 1597219139 | - // | 12 | prd-12 | 1597219140 | - // +----+--------+------------+ - testTabletRecovery(t, bs1, "2m", restoreKS2Name, "80-", "INT64(6)") - - // test the recovery with timetorecover > (timestmap of last binlog event in binlog server) - createRestoreKeyspace(t, restoreTime3, restoreKS3Name) - - // mysql> select * from product; - // +----+--------+------------+ - // | id | name | created | - // +----+--------+------------+ - // | 1 | prd-1 | 1597219030 | - // | 2 | prd-2 | 1597219030 | - // | 3 | prd-3 | 1597219043 | - // | 5 | prd-5 | 1597219045 | - // | 9 | prd-9 | 1597219130 | - // | 10 | prd-10 | 1597219130 | - // | 13 | prd-13 | 1597219141 | - // | 15 | prd-15 | 1597219142 | - // +----+--------+------------+ - testTabletRecovery(t, bs0, "2m", 
restoreKS3Name, "-80", "INT64(8)") - - // mysql> select * from product; - // +----+--------+------------+ - // | id | name | created | - // +----+--------+------------+ - // | 4 | prd-4 | 1597219044 | - // | 6 | prd-6 | 1597219046 | - // | 7 | prd-7 | 1597219130 | - // | 8 | prd-8 | 1597219130 | - // | 11 | prd-11 | 1597219139 | - // | 12 | prd-12 | 1597219140 | - // | 14 | prd-14 | 1597219142 | - // +----+--------+------------+ - testTabletRecovery(t, bs1, "2m", restoreKS3Name, "80-", "INT64(7)") -} - -func performResharding(t *testing.T) { - err := clusterInstance.VtctldClientProcess.ApplyVSchema(keyspaceName, vSchema) - require.NoError(t, err) - - err = clusterInstance.VtctldClientProcess.ExecuteCommand("Reshard", "create", "--source-shards=0", "--target-shards=-80,80-", "--target-keyspace", "ks", "--workflow", "reshardWorkflow") - require.NoError(t, err) - - waitTimeout := 30 * time.Second - shard0Primary.VttabletProcess.WaitForVReplicationToCatchup(t, "ks.reshardWorkflow", dbName, sidecar.DefaultName, waitTimeout) - shard1Primary.VttabletProcess.WaitForVReplicationToCatchup(t, "ks.reshardWorkflow", dbName, sidecar.DefaultName, waitTimeout) - - waitForNoWorkflowLag(t, clusterInstance, "ks", "reshardWorkflow") - - err = clusterInstance.VtctldClientProcess.ExecuteCommand("Reshard", "SwitchTraffic", "--tablet-types=rdonly", "--target-keyspace", "ks", "--workflow", "reshardWorkflow") - require.NoError(t, err) - - err = clusterInstance.VtctldClientProcess.ExecuteCommand("Reshard", "SwitchTraffic", "--tablet-types=replica", "--target-keyspace", "ks", "--workflow", "reshardWorkflow") - require.NoError(t, err) - - // then serve primary from the split shards - err = clusterInstance.VtctldClientProcess.ExecuteCommand("Reshard", "SwitchTraffic", "--tablet-types=primary", "--target-keyspace", "ks", "--workflow", "reshardWorkflow") - require.NoError(t, err) - - // remove the original tablets in the original shard - removeTablets(t, []*cluster.Vttablet{primary, replica1, 
replica2}) - - for _, tablet := range []*cluster.Vttablet{replica1, replica2} { - err = clusterInstance.VtctldClientProcess.ExecuteCommand("DeleteTablets", tablet.Alias) - require.NoError(t, err) - } - err = clusterInstance.VtctldClientProcess.ExecuteCommand("DeleteTablets", "--allow-primary", primary.Alias) - require.NoError(t, err) - - // rebuild the serving graph, all mentions of the old shards should be gone - err = clusterInstance.VtctldClientProcess.ExecuteCommand("RebuildKeyspaceGraph", "ks") - require.NoError(t, err) - - // delete the original shard - err = clusterInstance.VtctldClientProcess.ExecuteCommand("DeleteShards", "ks/0") - require.NoError(t, err) - - // Restart vtgate process - err = clusterInstance.VtgateProcess.TearDown() - require.NoError(t, err) - - err = clusterInstance.VtgateProcess.Setup() - require.NoError(t, err) - - clusterInstance.WaitForTabletsToHealthyInVtgate() -} - -func startBinlogServer(t *testing.T, primaryTablet *cluster.Vttablet) *binLogServer { - bs, err := newBinlogServer(hostname, clusterInstance.GetAndReservePort()) - require.NoError(t, err) - - err = bs.start(mysqlSource{ - hostname: binlogHost, - port: primaryTablet.MysqlctlProcess.MySQLPort, - username: mysqlUserName, - password: mysqlPassword, - }) - require.NoError(t, err) - return bs -} - -func removeTablets(t *testing.T, tablets []*cluster.Vttablet) { - var mysqlProcs []*exec.Cmd - for _, tablet := range tablets { - proc, _ := tablet.MysqlctlProcess.StopProcess() - mysqlProcs = append(mysqlProcs, proc) - } - for _, proc := range mysqlProcs { - err := proc.Wait() - require.NoError(t, err) - } - for _, tablet := range tablets { - tablet.VttabletProcess.TearDown() - } -} - -func initializeCluster(t *testing.T) { - clusterInstance = cluster.NewCluster(cell, hostname) - - // Start topo server - err := clusterInstance.StartTopo() - require.NoError(t, err) - - // Start keyspace - keyspace := &cluster.Keyspace{ - Name: keyspaceName, - } - clusterInstance.Keyspaces = 
append(clusterInstance.Keyspaces, *keyspace) - - shard := &cluster.Shard{ - Name: shardName, - } - shard0 := &cluster.Shard{ - Name: shard0Name, - } - shard1 := &cluster.Shard{ - Name: shard1Name, - } - - // Defining all the tablets - primary = clusterInstance.NewVttabletInstance("replica", 0, "") - replica1 = clusterInstance.NewVttabletInstance("replica", 0, "") - replica2 = clusterInstance.NewVttabletInstance("replica", 0, "") - shard0Primary = clusterInstance.NewVttabletInstance("replica", 0, "") - shard0Replica1 = clusterInstance.NewVttabletInstance("replica", 0, "") - shard0Replica2 = clusterInstance.NewVttabletInstance("replica", 0, "") - shard1Primary = clusterInstance.NewVttabletInstance("replica", 0, "") - shard1Replica1 = clusterInstance.NewVttabletInstance("replica", 0, "") - shard1Replica2 = clusterInstance.NewVttabletInstance("replica", 0, "") - - shard.Vttablets = []*cluster.Vttablet{primary, replica1, replica2} - shard0.Vttablets = []*cluster.Vttablet{shard0Primary, shard0Replica1, shard0Replica2} - shard1.Vttablets = []*cluster.Vttablet{shard1Primary, shard1Replica1, shard1Replica2} - - dbCredentialFile = cluster.WriteDbCredentialToTmp(clusterInstance.TmpDirectory) - extraArgs := []string{"--db-credentials-file", dbCredentialFile} - commonTabletArg = append(commonTabletArg, "--db-credentials-file", dbCredentialFile) - - clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, commonTabletArg...) 
- clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, "--restore_from_backup") - - err = clusterInstance.SetupCluster(keyspace, []cluster.Shard{*shard, *shard0, *shard1}) - require.NoError(t, err) - vtctldClientProcess := cluster.VtctldClientProcessInstance("localhost", clusterInstance.VtctldProcess.GrpcPort, clusterInstance.TmpDirectory) - out, err := vtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync") - require.NoError(t, err, out) - - initDb, _ := os.ReadFile(path.Join(os.Getenv("VTROOT"), "/config/init_db.sql")) - sql := string(initDb) - // The original init_db.sql does not have any passwords. Here we update the init file with passwords - sql, err = utils.GetInitDBSQL(sql, cluster.GetPasswordUpdateSQL(clusterInstance), "") - require.NoError(t, err, "expected to load init_db file") - initDBFileWithPassword = path.Join(clusterInstance.TmpDirectory, "init_db_with_passwords.sql") - err = os.WriteFile(initDBFileWithPassword, []byte(sql), 0660) - require.NoError(t, err, "expected to load init_db file") - - // Start MySql - var mysqlCtlProcessList []*exec.Cmd - for _, shard := range clusterInstance.Keyspaces[0].Shards { - for _, tablet := range shard.Vttablets { - tablet.MysqlctlProcess.InitDBFile = initDBFileWithPassword - tablet.VttabletProcess.DbPassword = mysqlPassword - tablet.MysqlctlProcess.ExtraArgs = extraArgs - proc, err := tablet.MysqlctlProcess.StartProcess() - require.NoError(t, err) - mysqlCtlProcessList = append(mysqlCtlProcessList, proc) - } - } - - // Wait for mysql processes to start - for _, proc := range mysqlCtlProcessList { - err = proc.Wait() - require.NoError(t, err) - } - - for _, shard := range clusterInstance.Keyspaces[0].Shards { - for _, tablet := range shard.Vttablets { - err = tablet.VttabletProcess.Setup() - require.NoError(t, err) - } - } - - err = clusterInstance.VtctldClientProcess.InitShardPrimary(keyspaceName, shard.Name, cell, 
primary.TabletUID) - require.NoError(t, err) - - err = clusterInstance.VtctldClientProcess.InitShardPrimary(keyspaceName, shard0.Name, cell, shard0Primary.TabletUID) - require.NoError(t, err) - - err = clusterInstance.VtctldClientProcess.InitShardPrimary(keyspaceName, shard1.Name, cell, shard1Primary.TabletUID) - require.NoError(t, err) - - err = clusterInstance.StartVTOrc(keyspaceName) - require.NoError(t, err) - - // Start vtgate - err = clusterInstance.StartVtgate() - require.NoError(t, err) -} - -func insertRow(t *testing.T, id int, productName string, isSlow bool) { - ctx := context.Background() - vtParams := mysql.ConnParams{ - Host: clusterInstance.Hostname, - Port: clusterInstance.VtgateMySQLPort, - } - conn, err := mysql.Connect(ctx, &vtParams) - require.NoError(t, err) - defer conn.Close() - - insertSmt := fmt.Sprintf(insertTable, id, productName) - _, err = conn.ExecuteFetch(insertSmt, 1000, true) - require.NoError(t, err) - - if isSlow { - time.Sleep(1 * time.Second) - } -} - -func createRestoreKeyspace(t *testing.T, timeToRecover, restoreKeyspaceName string) { - output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("CreateKeyspace", - "--type=SNAPSHOT", "--base-keyspace="+keyspaceName, - "--snapshot-timestamp", timeToRecover, restoreKeyspaceName) - log.Info(output) - require.NoError(t, err) -} - -func testTabletRecovery(t *testing.T, binlogServer *binLogServer, lookupTimeout, restoreKeyspaceName, shardName, expectedRows string) { - recoveryTablet := clusterInstance.NewVttabletInstance("replica", 0, cell) - launchRecoveryTablet(t, recoveryTablet, binlogServer, lookupTimeout, restoreKeyspaceName, shardName) - - sqlRes, err := recoveryTablet.VttabletProcess.QueryTablet(getCountID, keyspaceName, true) - require.NoError(t, err) - assert.Equal(t, expectedRows, sqlRes.Rows[0][0].String()) - - defer recoveryTablet.MysqlctlProcess.Stop() - defer recoveryTablet.VttabletProcess.TearDown() -} - -func launchRecoveryTablet(t *testing.T, tablet 
*cluster.Vttablet, binlogServer *binLogServer, lookupTimeout, restoreKeyspaceName, shardName string) { - mysqlctlProcess, err := cluster.MysqlCtlProcessInstance(tablet.TabletUID, tablet.MySQLPort, clusterInstance.TmpDirectory) - require.NoError(t, err) - tablet.MysqlctlProcess = *mysqlctlProcess - extraArgs := []string{"--db-credentials-file", dbCredentialFile} - tablet.MysqlctlProcess.InitDBFile = initDBFileWithPassword - tablet.MysqlctlProcess.ExtraArgs = extraArgs - err = tablet.MysqlctlProcess.Start() - require.NoError(t, err) - - tablet.VttabletProcess = cluster.VttabletProcessInstance( - tablet.HTTPPort, - tablet.GrpcPort, - tablet.TabletUID, - clusterInstance.Cell, - shardName, - keyspaceName, - clusterInstance.VtctldProcess.Port, - tablet.Type, - clusterInstance.TopoProcess.Port, - clusterInstance.Hostname, - clusterInstance.TmpDirectory, - clusterInstance.VtTabletExtraArgs, - clusterInstance.DefaultCharset) - tablet.Alias = tablet.VttabletProcess.TabletPath - tablet.VttabletProcess.DbPassword = mysqlPassword - tablet.VttabletProcess.SupportsBackup = true - tablet.VttabletProcess.Keyspace = restoreKeyspaceName - tablet.VttabletProcess.ExtraArgs = []string{ - "--disable_active_reparents", - "--enable_replication_reporter=false", - "--init_db_name_override", dbName, - "--init_tablet_type", "replica", - "--init_keyspace", restoreKeyspaceName, - "--init_shard", shardName, - "--binlog_host", binlogServer.hostname, - "--binlog_port", fmt.Sprintf("%d", binlogServer.port), - "--binlog_user", binlogServer.username, - "--binlog_password", binlogServer.password, - "--pitr_gtid_lookup_timeout", lookupTimeout, - "--vreplication_retry_delay", "1s", - "--degraded_threshold", "5s", - "--lock_tables_timeout", "5s", - "--watch_replication_stream", - "--serving_state_grace_period", "1s", - "--db-credentials-file", dbCredentialFile, - } - tablet.VttabletProcess.ServingStatus = "" - - err = tablet.VttabletProcess.Setup() - require.NoError(t, err) - - 
tablet.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, 20*time.Second) -} - -// waitForNoWorkflowLag waits for the VReplication workflow's MaxVReplicationTransactionLag -// value to be 0. -func waitForNoWorkflowLag(t *testing.T, vc *cluster.LocalProcessCluster, ks string, workflow string) { - var lag int64 - timer := time.NewTimer(defaultTimeout) - defer timer.Stop() - for { - output, err := vc.VtctldClientProcess.ExecuteCommandWithOutput("Workflow", "--keyspace", ks, "show", "--workflow", workflow) - require.NoError(t, err) - - var resp vtctldatapb.GetWorkflowsResponse - err = json2.UnmarshalPB([]byte(output), &resp) - require.NoError(t, err) - require.GreaterOrEqual(t, len(resp.Workflows), 1, "responce should have at least one workflow") - lag = resp.Workflows[0].MaxVReplicationTransactionLag - if lag == 0 { - return - } - select { - case <-timer.C: - require.FailNow(t, fmt.Sprintf("workflow %q did not eliminate VReplication lag before the timeout of %s; last seen MaxVReplicationTransactionLag: %d", - strings.Join([]string{ks, workflow}, "."), defaultTimeout, lag)) - default: - time.Sleep(defaultTick) - } - } -} diff --git a/go/test/endtoend/vtgate/plan_tests/main_test.go b/go/test/endtoend/vtgate/plan_tests/main_test.go index d3915af0c8d..504ec3ffb26 100644 --- a/go/test/endtoend/vtgate/plan_tests/main_test.go +++ b/go/test/endtoend/vtgate/plan_tests/main_test.go @@ -27,6 +27,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/utils" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vtgate/engine" "vitess.io/vitess/go/vt/vtgate/planbuilder" ) @@ -128,6 +129,31 @@ func start(t *testing.T) (utils.MySQLCompare, func()) { } } +// splitSQL statements - querySQL may be a multi-line sql blob +func splitSQL(querySQL ...string) ([]string, error) { + parser := sqlparser.NewTestParser() + var sqls []string + for _, sql := range querySQL { + split, err := 
parser.SplitStatementToPieces(sql) + if err != nil { + return nil, err + } + sqls = append(sqls, split...) + } + return sqls, nil +} + +func loadSampleData(t *testing.T, mcmp utils.MySQLCompare) { + sampleDataSQL := readFile("sampledata/user.sql") + insertSQL, err := splitSQL(sampleDataSQL) + if err != nil { + require.NoError(t, err) + } + for _, sql := range insertSQL { + mcmp.ExecNoCompare(sql) + } +} + func readJSONTests(filename string) []planbuilder.PlanTest { var output []planbuilder.PlanTest file, err := os.Open(locateFile(filename)) diff --git a/go/test/endtoend/vtgate/plan_tests/plan_e2e_test.go b/go/test/endtoend/vtgate/plan_tests/plan_e2e_test.go index 1594e9b392c..b4d6a2b39f6 100644 --- a/go/test/endtoend/vtgate/plan_tests/plan_e2e_test.go +++ b/go/test/endtoend/vtgate/plan_tests/plan_e2e_test.go @@ -22,21 +22,25 @@ import ( "vitess.io/vitess/go/test/endtoend/utils" ) -func TestSelectCases(t *testing.T) { +func TestE2ECases(t *testing.T) { + e2eTestCaseFiles := []string{"select_cases.json", "filter_cases.json"} mcmp, closer := start(t) defer closer() - tests := readJSONTests("select_cases.json") - for _, test := range tests { - mcmp.Run(test.Comment, func(mcmp *utils.MySQLCompare) { - if test.SkipE2E { - mcmp.AsT().Skip(test.Query) - } - mcmp.Exec(test.Query) - pd := utils.ExecTrace(mcmp.AsT(), mcmp.VtConn, test.Query) - verifyTestExpectations(mcmp.AsT(), pd, test) - if mcmp.VtConn.IsClosed() { - mcmp.AsT().Fatal("vtgate connection is closed") - } - }) + loadSampleData(t, mcmp) + for _, fileName := range e2eTestCaseFiles { + tests := readJSONTests(fileName) + for _, test := range tests { + mcmp.Run(test.Comment, func(mcmp *utils.MySQLCompare) { + if test.SkipE2E { + mcmp.AsT().Skip(test.Query) + } + mcmp.Exec(test.Query) + pd := utils.ExecTrace(mcmp.AsT(), mcmp.VtConn, test.Query) + verifyTestExpectations(mcmp.AsT(), pd, test) + if mcmp.VtConn.IsClosed() { + mcmp.AsT().Fatal("vtgate connection is closed") + } + }) + } } } diff --git a/go/vt/key/key.go 
b/go/vt/key/key.go index dcdcda47f81..89d956bd433 100644 --- a/go/vt/key/key.go +++ b/go/vt/key/key.go @@ -90,6 +90,11 @@ func Empty(id []byte) bool { // KeyRange helper methods // +// NewKeyRange returns a KeyRange with the given start and end. +func NewKeyRange(start []byte, end []byte) *topodatapb.KeyRange { + return &topodatapb.KeyRange{Start: start, End: end} +} + // KeyRangeAdd adds two adjacent KeyRange values (in any order) into a single value. If the values are not adjacent, // it returns false. func KeyRangeAdd(a, b *topodatapb.KeyRange) (*topodatapb.KeyRange, bool) { diff --git a/go/vt/vtgate/engine/routing.go b/go/vt/vtgate/engine/routing.go index e05366c4aeb..067278c1a93 100644 --- a/go/vt/vtgate/engine/routing.go +++ b/go/vt/vtgate/engine/routing.go @@ -51,6 +51,9 @@ const ( // IN is for routing a statement to a multi shard. // Requires: A Vindex, and a multi Values. IN + // Between is for routing a statement to a multi shard. + // Requires: A Vindex, and start and end Value. + Between // MultiEqual is used for routing queries with IN with tuple clause // Requires: A Vindex, and a multi Tuple Values. MultiEqual @@ -78,6 +81,7 @@ var opName = map[Opcode]string{ EqualUnique: "EqualUnique", Equal: "Equal", IN: "IN", + Between: "Between", MultiEqual: "MultiEqual", Scatter: "Scatter", DBA: "DBA", @@ -157,6 +161,14 @@ func (rp *RoutingParameters) findRoute(ctx context.Context, vcursor VCursor, bin default: return rp.in(ctx, vcursor, bindVars) } + case Between: + switch rp.Vindex.(type) { + case vindexes.SingleColumn: + return rp.between(ctx, vcursor, bindVars) + default: + // Only SingleColumn vindex supported. 
+ return nil, nil, vterrors.VT13001("between supported on SingleColumn vindex only") + } case MultiEqual: switch rp.Vindex.(type) { case vindexes.MultiColumn: @@ -396,6 +408,19 @@ func (rp *RoutingParameters) inMultiCol(ctx context.Context, vcursor VCursor, bi return rss, shardVarsMultiCol(bindVars, mapVals, isSingleVal), nil } +func (rp *RoutingParameters) between(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) ([]*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, error) { + env := evalengine.NewExpressionEnv(ctx, bindVars, vcursor) + value, err := env.Evaluate(rp.Values[0]) + if err != nil { + return nil, nil, err + } + rss, values, err := resolveShardsBetween(ctx, vcursor, rp.Vindex.(vindexes.Sequential), rp.Keyspace, value.TupleValues()) + if err != nil { + return nil, nil, err + } + return rss, shardVars(bindVars, values), nil +} + func (rp *RoutingParameters) multiEqual(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) ([]*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, error) { env := evalengine.NewExpressionEnv(ctx, bindVars, vcursor) value, err := env.Evaluate(rp.Values[0]) @@ -520,6 +545,24 @@ func buildMultiColumnVindexValues(shardsValues [][][]sqltypes.Value) [][][]*quer return shardsIds } +func resolveShardsBetween(ctx context.Context, vcursor VCursor, vindex vindexes.Sequential, keyspace *vindexes.Keyspace, vindexKeys []sqltypes.Value) ([]*srvtopo.ResolvedShard, [][]*querypb.Value, error) { + // Convert vindexKeys to []*querypb.Value + ids := make([]*querypb.Value, len(vindexKeys)) + for i, vik := range vindexKeys { + ids[i] = sqltypes.ValueToProto(vik) + } + + // RangeMap using the Vindex + destinations, err := vindex.RangeMap(ctx, vcursor, vindexKeys[0], vindexKeys[1]) + if err != nil { + return nil, nil, err + + } + + // And use the Resolver to map to ResolvedShards. 
+ return vcursor.ResolveDestinations(ctx, keyspace.Name, ids, destinations) +} + func shardVars(bv map[string]*querypb.BindVariable, mapVals [][]*querypb.Value) []map[string]*querypb.BindVariable { shardVars := make([]map[string]*querypb.BindVariable, len(mapVals)) for i, vals := range mapVals { diff --git a/go/vt/vtgate/planbuilder/operators/sharded_routing.go b/go/vt/vtgate/planbuilder/operators/sharded_routing.go index 066cb47d9a9..2c8873dee07 100644 --- a/go/vt/vtgate/planbuilder/operators/sharded_routing.go +++ b/go/vt/vtgate/planbuilder/operators/sharded_routing.go @@ -223,6 +223,9 @@ func (tr *ShardedRouting) resetRoutingLogic(ctx *plancontext.PlanningContext) Ro func (tr *ShardedRouting) searchForNewVindexes(ctx *plancontext.PlanningContext, predicate sqlparser.Expr) (Routing, bool) { newVindexFound := false switch node := predicate.(type) { + case *sqlparser.BetweenExpr: + return tr.planBetweenOp(ctx, node) + case *sqlparser.ComparisonExpr: return tr.planComparison(ctx, node) @@ -234,6 +237,35 @@ func (tr *ShardedRouting) searchForNewVindexes(ctx *plancontext.PlanningContext, return nil, newVindexFound } +func (tr *ShardedRouting) planBetweenOp(ctx *plancontext.PlanningContext, node *sqlparser.BetweenExpr) (routing Routing, foundNew bool) { + column, ok := node.Left.(*sqlparser.ColName) + if !ok { + return nil, false + } + var vdValue sqlparser.ValTuple = sqlparser.ValTuple([]sqlparser.Expr{node.From, node.To}) + + opcode := func(vindex *vindexes.ColumnVindex) engine.Opcode { + if _, ok := vindex.Vindex.(vindexes.Sequential); ok { + return engine.Between + } + return engine.Scatter + } + + sequentialVdx := func(vindex *vindexes.ColumnVindex) vindexes.Vindex { + if _, ok := vindex.Vindex.(vindexes.Sequential); ok { + return vindex.Vindex + } + // if vindex is not of type Sequential, we can't use this vindex at all + return nil + } + + val := makeEvalEngineExpr(ctx, vdValue) + if val == nil { + return nil, false + } + return nil, tr.haveMatchingVindex(ctx, 
node, vdValue, column, val, opcode, sequentialVdx) +} + func (tr *ShardedRouting) planComparison(ctx *plancontext.PlanningContext, cmp *sqlparser.ComparisonExpr) (routing Routing, foundNew bool) { switch cmp.Operator { case sqlparser.EqualOp: @@ -332,6 +364,8 @@ func (tr *ShardedRouting) Cost() int { return 5 case engine.IN: return 10 + case engine.Between: + return 10 case engine.MultiEqual: return 10 case engine.Scatter: @@ -441,6 +475,12 @@ func (tr *ShardedRouting) processMultiColumnVindex( return newVindexFound } + routeOpcode := opcode(v.ColVindex) + vindex := vfunc(v.ColVindex) + if vindex == nil || routeOpcode == engine.Scatter { + return newVindexFound + } + var newOption []*VindexOption for _, op := range v.Options { if op.Ready { diff --git a/go/vt/vtgate/planbuilder/testdata/filter_cases.json b/go/vt/vtgate/planbuilder/testdata/filter_cases.json index edce4ebd0cb..72b6c4ddd46 100644 --- a/go/vt/vtgate/planbuilder/testdata/filter_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/filter_cases.json @@ -699,7 +699,8 @@ "TablesUsed": [ "user.user" ] - } + }, + "skip_e2e": true }, { "comment": "Composite IN: RHS not tuple", @@ -722,7 +723,8 @@ "user.music", "user.user" ] - } + }, + "skip_e2e":true }, { "comment": "Composite IN: RHS has no simple values", @@ -970,7 +972,8 @@ "TablesUsed": [ "user.user" ] - } + }, + "skip_e2e":true }, { "comment": "Merging subqueries should remove keyspace from query", @@ -1095,7 +1098,8 @@ "TablesUsed": [ "user.user" ] - } + }, + "skip_e2e": true }, { "comment": "Multi-table unique vindex constraint", @@ -1252,7 +1256,8 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true }, { "comment": "Multi-route unique vindex route on both routes", @@ -1279,7 +1284,8 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true }, { "comment": "Multi-route with cross-route constraint", @@ -1328,7 +1334,8 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true }, { "comment": "Multi-route with non-route constraint, 
should use first route.", @@ -1373,7 +1380,8 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true }, { "comment": "Route with multiple route constraints, SelectIN is the best constraint.", @@ -1654,7 +1662,8 @@ "main.unsharded", "user.user" ] - } + }, + "skip_e2e":true }, { "comment": "routing rules: choose the redirected table", @@ -1680,7 +1689,8 @@ "TablesUsed": [ "user.user" ] - } + }, + "skip_e2e":true }, { "comment": "subquery", @@ -1729,7 +1739,8 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true }, { "comment": "correlated subquery merge-able into a route of a join tree", @@ -1778,7 +1789,8 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true }, { "comment": "ensure subquery reordering gets us a better plan", @@ -1824,7 +1836,8 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true }, { "comment": "nested subquery", @@ -1873,7 +1886,8 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true }, { "comment": "Correlated subquery in where clause", @@ -1896,7 +1910,8 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true }, { "comment": "outer and inner subquery route by same int val", @@ -1923,7 +1938,8 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true }, { "comment": "outer and inner subquery route by same str val", @@ -1950,7 +1966,8 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true }, { "comment": "outer and inner subquery route by same val arg", @@ -1977,12 +1994,14 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true }, { "comment": "unresolved symbol in inner subquery.", "query": "select id from user where id = :a and user.col in (select user_extra.col from user_extra where user_extra.user_id = :a and foo.id = 1)", - "plan": "column 'foo.id' not found" + "plan": "column 'foo.id' not found", + "skip_e2e":true }, { "comment": "outer and inner subquery route by same outermost column value", @@ -2005,7 +2024,8 @@ "user.user", "user.user_extra" ] - } + }, + 
"skip_e2e":true }, { "comment": "cross-shard subquery in IN clause.\n# Note the improved Underlying plan as SelectIN.", @@ -2290,7 +2310,8 @@ "TablesUsed": [ "user.user" ] - } + }, + "skip_e2e":true }, { "comment": "routing rules subquery pullout", @@ -2339,7 +2360,8 @@ "main.unsharded", "user.user" ] - } + }, + "skip_e2e":true }, { "comment": "Case preservation test", @@ -2366,7 +2388,8 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true }, { "comment": "database() call in where clause.", @@ -2649,7 +2672,8 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true }, { "comment": "solving LIKE query with a CFC prefix vindex", @@ -2701,7 +2725,8 @@ "TablesUsed": [ "user.samecolvin" ] - } + }, + "skip_e2e":true }, { "comment": "non unique predicate on vindex", @@ -2819,7 +2844,8 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true }, { "comment": "SelectDBA with uncorrelated subqueries", @@ -2838,7 +2864,8 @@ "Query": "select t.table_schema from information_schema.`tables` as t where t.table_schema in (select c.column_name from information_schema.`columns` as c)", "Table": "information_schema.`tables`" } - } + }, + "skip_e2e": true }, { "comment": "SelectReference with uncorrelated subqueries", @@ -3040,7 +3067,8 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true }, { "comment": "The outer and second inner are SelectEqualUnique with same Vindex value, the first inner has different Vindex value", @@ -3094,7 +3122,8 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true }, { "comment": "two correlated subqueries that can be merge in a single route", @@ -3117,7 +3146,8 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true }, { "comment": "transitive closures for the win", @@ -3166,7 +3196,8 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true }, { "comment": "not supported transitive closures with equality inside of an OR", @@ -3215,7 +3246,8 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true 
}, { "comment": "routing rules subquery merge with alias", @@ -3237,7 +3269,8 @@ "TablesUsed": [ "user.user" ] - } + }, + "skip_e2e":true }, { "comment": "left join where clauses where we can optimize into an inner join", @@ -3282,12 +3315,14 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true }, { "comment": "this query lead to a nil pointer error", "query": "select user.id from user left join user_extra on user.col = user_extra.col where foo(user_extra.foobar)", - "plan": "expr cannot be translated, not supported: foo(user_extra.foobar)" + "plan": "expr cannot be translated, not supported: foo(user_extra.foobar)", + "skip_e2e" :true }, { "comment": "filter after outer join", @@ -3339,7 +3374,8 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true }, { "comment": "subquery on other table", @@ -3994,7 +4030,8 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true }, { "comment": "conditions following a null safe comparison operator can be used for routing", @@ -4444,7 +4481,8 @@ "TablesUsed": [ "user.user" ] - } + }, + "skip_e2e":true }, { "comment": "two predicates that mean the same thing", @@ -4530,7 +4568,8 @@ "main.unsharded", "user.user" ] - } + }, + "skip_e2e": true }, { "comment": "push filter under aggregation", @@ -4598,7 +4637,8 @@ "user.user", "user.user_extra" ] - } + }, + "skip_e2e":true }, { "comment": "query that would time out because planning was too slow", @@ -4628,7 +4668,8 @@ "TablesUsed": [ "user.user" ] - } + }, + "skip_e2e":true }, { "comment": "union inside subquery. 
all routes can be merged by literal value", @@ -4682,7 +4723,8 @@ "TablesUsed": [ "user.user" ] - } + }, + "skip_e2e": true }, { "comment": "list args: single column vindex on non-zero offset", @@ -4708,7 +4750,8 @@ "TablesUsed": [ "user.user" ] - } + }, + "skip_e2e": true }, { "comment": "list args: multi column vindex", @@ -4735,7 +4778,8 @@ "TablesUsed": [ "user.multicol_tbl" ] - } + }, + "skip_e2e": true }, { "comment": "list args: multi column vindex - subshard", @@ -4761,7 +4805,8 @@ "TablesUsed": [ "user.multicol_tbl" ] - } + }, + "skip_e2e": true }, { "comment": "list args: multi column vindex - more columns", @@ -4788,7 +4833,8 @@ "TablesUsed": [ "user.multicol_tbl" ] - } + }, + "skip_e2e": true }, { "comment": "list args: multi column vindex - columns rearranged", @@ -4815,7 +4861,8 @@ "TablesUsed": [ "user.multicol_tbl" ] - } + }, + "skip_e2e": true }, { "comment": "order by with filter removing the keyspace from order by", @@ -4920,5 +4967,148 @@ "user.authoritative" ] } + }, + { + "comment": "Between clause on primary indexed id column (binary vindex on id)", + "query": "select id from unq_binary_idx where id between 1 and 5", + "plan": { + "QueryType": "SELECT", + "Original": "select id from unq_binary_idx where id between 1 and 5", + "Instructions": { + "OperatorType": "Route", + "Variant": "Between", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select id from unq_binary_idx where 1 != 1", + "Query": "select id from unq_binary_idx where id between 1 and 5", + "Table": "unq_binary_idx", + "Values": [ + "(1, 5)" + ], + "Vindex": "binary" + }, + "TablesUsed": [ + "user.unq_binary_idx" + ] + } + }, +{ + "comment": "Between clause on customer.id column (xxhash vindex on id)", + "query": "select id from customer where id between 1 and 5", + "plan": { + "QueryType": "SELECT", + "Original": "select id from customer where id between 1 and 5", + "Instructions": { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + 
"Name": "user", + "Sharded": true + }, + "FieldQuery": "select id from customer where 1 != 1", + "Query": "select id from customer where id between 1 and 5", + "Table": "customer" + }, + "TablesUsed": [ + "user.customer" + ] + } + }, +{ + "comment": "Between clause on col1 column (there is no vindex on this column)", + "query": "select id, col1 from unq_binary_idx where col1 between 10 and 50", + "plan": { + "QueryType": "SELECT", + "Original": "select id, col1 from unq_binary_idx where col1 between 10 and 50", + "Instructions": { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select id, col1 from unq_binary_idx where 1 != 1", + "Query": "select id, col1 from unq_binary_idx where col1 between 10 and 50", + "Table": "unq_binary_idx" + }, + "TablesUsed": [ + "user.unq_binary_idx" + ] + } +}, +{ + "comment": "Between clause on multicolumn vindex (cola,colb)", + "query": "select cola,colb,colc from multicol_tbl where cola between 1 and 5", + "plan": { + "QueryType": "SELECT", + "Original": "select cola,colb,colc from multicol_tbl where cola between 1 and 5", + "Instructions": { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select cola, colb, colc from multicol_tbl where 1 != 1", + "Query": "select cola, colb, colc from multicol_tbl where cola between 1 and 5", + "Table": "multicol_tbl" + }, + "TablesUsed": [ + "user.multicol_tbl" + ] + } +}, +{ + "comment": "Between clause on a binary vindex field with values from a different table", + "query": "select s.oid,s.col1, se.colb from sales s join sales_extra se on s.col1 = se.cola where s.oid between se.start and se.end", + "plan": { + "QueryType": "SELECT", + "Original": "select s.oid,s.col1, se.colb from sales s join sales_extra se on s.col1 = se.cola where s.oid between se.start and se.end", + "Instructions": { + "OperatorType": "Join", + "Variant": "Join", + 
"JoinColumnIndexes": "R:0,R:1,L:0", + "JoinVars": { + "se_cola": 1, + "se_end": 3, + "se_start": 2 + }, + "TableName": "sales_extra_sales", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select se.colb, se.cola, se.`start`, se.`end` from sales_extra as se where 1 != 1", + "Query": "select se.colb, se.cola, se.`start`, se.`end` from sales_extra as se", + "Table": "sales_extra" + }, + { + "OperatorType": "Route", + "Variant": "Between", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select s.oid, s.col1 from sales as s where 1 != 1", + "Query": "select s.oid, s.col1 from sales as s where s.oid between :se_start /* INT16 */ and :se_end /* INT16 */ and s.col1 = :se_cola /* VARCHAR */", + "Table": "sales", + "Values": [ + "(:se_start, :se_end)" + ], + "Vindex": "binary" + } + ] + }, + "TablesUsed": [ + "user.sales", + "user.sales_extra" + ] } +} ] diff --git a/go/vt/vtgate/planbuilder/testdata/sampledata/user.sql b/go/vt/vtgate/planbuilder/testdata/sampledata/user.sql new file mode 100644 index 00000000000..044a1ee140d --- /dev/null +++ b/go/vt/vtgate/planbuilder/testdata/sampledata/user.sql @@ -0,0 +1,14 @@ +INSERT INTO sales (oid, col1) + VALUES (1, 'a_1'); + +INSERT INTO sales_extra(colx, cola, colb, start, end) +VALUES (11, 'a_1', 'b_1',0, 500); + +INSERT INTO sales_extra(colx, cola, colb, start, end) +VALUES (12, 'a_2', 'b_2',500, 1000); + +INSERT INTO sales_extra(colx, cola, colb, start, end) +VALUES (13, 'a_3', 'b_3',1000, 1500); + +INSERT INTO sales_extra(colx, cola, colb, start, end) +VALUES (14, 'a_4', 'b_4',1500, 2000); \ No newline at end of file diff --git a/go/vt/vtgate/planbuilder/testdata/schemas/main.sql b/go/vt/vtgate/planbuilder/testdata/schemas/main.sql index 8c15b99218c..fb03b69419b 100644 --- a/go/vt/vtgate/planbuilder/testdata/schemas/main.sql +++ b/go/vt/vtgate/planbuilder/testdata/schemas/main.sql @@ -1,12 +1,26 @@ CREATE 
TABLE `unsharded` ( - `id` INT NOT NULL PRIMARY KEY, - `col1` VARCHAR(255) DEFAULT NULL, - `col2` VARCHAR(255) DEFAULT NULL, - `name` VARCHAR(255) DEFAULT NULL + `id` INT NOT NULL PRIMARY KEY, + `col` VARCHAR(255) DEFAULT NULL, + `col1` VARCHAR(255) DEFAULT NULL, + `col2` VARCHAR(255) DEFAULT NULL, + `name` VARCHAR(255) DEFAULT NULL, + `baz` INT ); CREATE TABLE `unsharded_auto` ( - `id` INT NOT NULL PRIMARY KEY, + `id` INT NOT NULL PRIMARY KEY, `col1` VARCHAR(255) DEFAULT NULL, `col2` VARCHAR(255) DEFAULT NULL +); + +CREATE TABLE `unsharded_a` ( + `id` INT NOT NULL PRIMARY KEY, + `col` VARCHAR(255) DEFAULT NULL, + `name` VARCHAR(255) DEFAULT NULL +); + +CREATE TABLE `unsharded_b` ( + `id` INT NOT NULL PRIMARY KEY, + `col` VARCHAR(255) DEFAULT NULL, + `name` VARCHAR(255) DEFAULT NULL ); \ No newline at end of file diff --git a/go/vt/vtgate/planbuilder/testdata/schemas/user.sql b/go/vt/vtgate/planbuilder/testdata/schemas/user.sql index 55f4078557a..818d2508069 100644 --- a/go/vt/vtgate/planbuilder/testdata/schemas/user.sql +++ b/go/vt/vtgate/planbuilder/testdata/schemas/user.sql @@ -1,12 +1,25 @@ CREATE TABLE user ( - id INT PRIMARY KEY, - col BIGINT, - predef1 VARCHAR(255), - predef2 VARCHAR(255), - textcol1 VARCHAR(255), - intcol BIGINT, - textcol2 VARCHAR(255) + id INT PRIMARY KEY, + col BIGINT, + intcol BIGINT, + user_id INT, + id1 INT, + id2 INT, + id3 INT, + m INT, + bar INT, + a INT, + name VARCHAR(255), + col1 VARCHAR(255), + col2 VARCHAR(255), + costly VARCHAR(255), + predef1 VARCHAR(255), + predef2 VARCHAR(255), + textcol1 VARCHAR(255), + textcol2 VARCHAR(255), + someColumn VARCHAR(255), + foo VARCHAR(255) ); CREATE TABLE user_metadata @@ -23,6 +36,10 @@ CREATE TABLE music ( user_id INT, id INT, + col1 VARCHAR(255), + col2 VARCHAR(255), + genre VARCHAR(255), + componist VARCHAR(255), PRIMARY KEY (user_id) ); @@ -35,9 +52,9 @@ CREATE TABLE samecolvin CREATE TABLE multicolvin ( kid INT, - column_a VARCHAR(255), - column_b VARCHAR(255), - column_c 
VARCHAR(255), + column_a INT, + column_b INT, + column_c INT, PRIMARY KEY (kid) ); @@ -97,4 +114,73 @@ CREATE TABLE authoritative col1 VARCHAR(255), col2 bigint, PRIMARY KEY (user_id) -) ENGINE=InnoDB; \ No newline at end of file +) ENGINE=InnoDB; + +CREATE TABLE colb_colc_map +( + colb INT PRIMARY KEY, + colc INT, + keyspace_id VARCHAR(255) +); + +CREATE TABLE seq +( + id INT, + next_id BIGINT, + cache BIGINT, + PRIMARY KEY (id) +) COMMENT 'vitess_sequence'; + +CREATE TABLE user_extra +( + id INT, + user_id INT, + extra_id INT, + col INT, + m2 INT, + PRIMARY KEY (id, extra_id) +); + +CREATE TABLE name_user_map +( + name VARCHAR(255), + keyspace_id VARCHAR(255) +); + +CREATE TABLE name_user_vdx +( + name VARCHAR(255), + keyspace_id VARCHAR(255) +); + +CREATE TABLE costly_map +( + costly VARCHAR(255), + keyspace_id VARCHAR(255) +); + +CREATE TABLE unq_binary_idx +( + id INT PRIMARY KEY, + col1 INT +); + +CREATE TABLE sales +( + oid INT PRIMARY KEY, + col1 VARCHAR(255) +); + +CREATE TABLE sales_extra +( + colx INT PRIMARY KEY, + cola VARCHAR(255), + colb VARCHAR(255), + start INT, + end INT +); + +CREATE TABLE ref +( + col INT PRIMARY KEY +); \ No newline at end of file diff --git a/go/vt/vtgate/planbuilder/testdata/vschemas/schema.json b/go/vt/vtgate/planbuilder/testdata/vschemas/schema.json index a5de9d3697e..aaa11727510 100644 --- a/go/vt/vtgate/planbuilder/testdata/vschemas/schema.json +++ b/go/vt/vtgate/planbuilder/testdata/vschemas/schema.json @@ -214,6 +214,9 @@ "to": "keyspace_id", "write_only": "true" } + }, + "binary": { + "type": "binary" } }, "tables": { @@ -569,6 +572,62 @@ "name": "shard_index" } ] + }, + "unq_binary_idx": { + "column_vindexes" : [ + { + "column" : "id", + "name": "binary" + } + ], + "columns" :[ + { + "name": "col1", + "type": "INT16" + } + ] + }, + "sales": { + "column_vindexes" : [ + { + "column" : "oid", + "name" : "binary" + } + ], + "columns" : [ + { + "name" : "col1", + "type" : "VARCHAR" + } + ] + }, + "sales_extra" : { + 
"column_vindexes": [ + { + "columns": [ + "colx" + ], + "name": "shard_index" + } + ], + "columns" : [ + { + "name" : "cola", + "type" : "VARCHAR" + }, + { + "name" : "colb", + "type" : "VARCHAR" + }, + { + "name" : "start", + "type" : "INT16" + }, + { + "name" : "end", + "type" : "INT16" + } + ] } } }, diff --git a/go/vt/vtgate/vindexes/binary.go b/go/vt/vtgate/vindexes/binary.go index b78451ca1fb..96a72b2c3f4 100644 --- a/go/vt/vtgate/vindexes/binary.go +++ b/go/vt/vtgate/vindexes/binary.go @@ -30,6 +30,7 @@ var ( _ Reversible = (*Binary)(nil) _ Hashing = (*Binary)(nil) _ ParamValidating = (*Binary)(nil) + _ Sequential = (*Binary)(nil) ) // Binary is a vindex that converts binary bits to a keyspace id. @@ -108,6 +109,20 @@ func (*Binary) ReverseMap(_ VCursor, ksids [][]byte) ([]sqltypes.Value, error) { return reverseIds, nil } +// RangeMap can map ids to key.Destination objects. +func (vind *Binary) RangeMap(ctx context.Context, vcursor VCursor, startId sqltypes.Value, endId sqltypes.Value) ([]key.Destination, error) { + startKsId, err := vind.Hash(startId) + if err != nil { + return nil, err + } + endKsId, err := vind.Hash(endId) + if err != nil { + return nil, err + } + out := []key.Destination{&key.DestinationKeyRange{KeyRange: key.NewKeyRange(startKsId, endKsId)}} + return out, nil +} + // UnknownParams implements the ParamValidating interface. 
func (vind *Binary) UnknownParams() []string { return vind.unknownParams diff --git a/go/vt/vtgate/vindexes/binary_test.go b/go/vt/vtgate/vindexes/binary_test.go index 27ae6ceca11..a6556ec958a 100644 --- a/go/vt/vtgate/vindexes/binary_test.go +++ b/go/vt/vtgate/vindexes/binary_test.go @@ -24,6 +24,7 @@ import ( "reflect" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "vitess.io/vitess/go/sqltypes" @@ -145,3 +146,18 @@ func TestBinaryReverseMap(t *testing.T) { t.Errorf("ReverseMap(): %v, want %s", err, wantErr) } } + +// TestBinaryRangeMap takes start and end values, +// and checks against a destination keyrange. +func TestBinaryRangeMap(t *testing.T) { + + startInterval := "0x01" + endInterval := "0x10" + + got, err := binOnlyVindex.(Sequential).RangeMap(context.Background(), nil, sqltypes.NewHexNum([]byte(startInterval)), + sqltypes.NewHexNum([]byte(endInterval))) + require.NoError(t, err) + want := "DestinationKeyRange(01-10)" + assert.Equal(t, want, got[0].String()) + +} diff --git a/go/vt/vtgate/vindexes/numeric.go b/go/vt/vtgate/vindexes/numeric.go index 091807ec2cc..b40df13a997 100644 --- a/go/vt/vtgate/vindexes/numeric.go +++ b/go/vt/vtgate/vindexes/numeric.go @@ -31,6 +31,7 @@ var ( _ Reversible = (*Numeric)(nil) _ Hashing = (*Numeric)(nil) _ ParamValidating = (*Numeric)(nil) + _ Sequential = (*Numeric)(nil) ) // Numeric defines a bit-pattern mapping of a uint64 to the KeyspaceId. @@ -108,6 +109,20 @@ func (*Numeric) ReverseMap(_ VCursor, ksids [][]byte) ([]sqltypes.Value, error) return reverseIds, nil } +// RangeMap implements the Sequential interface. 
+func (vind *Numeric) RangeMap(ctx context.Context, vcursor VCursor, startId sqltypes.Value, endId sqltypes.Value) ([]key.Destination, error) { + startKsId, err := vind.Hash(startId) + if err != nil { + return nil, err + } + endKsId, err := vind.Hash(endId) + if err != nil { + return nil, err + } + out := []key.Destination{&key.DestinationKeyRange{KeyRange: key.NewKeyRange(startKsId, endKsId)}} + return out, nil +} + // UnknownParams implements the ParamValidating interface. func (vind *Numeric) UnknownParams() []string { return vind.unknownParams diff --git a/go/vt/vtgate/vindexes/vindex.go b/go/vt/vtgate/vindexes/vindex.go index e3d5a6d7e4d..947877108e0 100644 --- a/go/vt/vtgate/vindexes/vindex.go +++ b/go/vt/vtgate/vindexes/vindex.go @@ -130,6 +130,13 @@ type ( ReverseMap(vcursor VCursor, ks [][]byte) ([]sqltypes.Value, error) } + // Sequential is an optional interface for a vindex that maps a range of ids to a keyspace range + // instead of a single keyspace id. It's being used to reduce the fan out for + // 'BETWEEN' expressions. + Sequential interface { + RangeMap(ctx context.Context, vcursor VCursor, startId sqltypes.Value, endId sqltypes.Value) ([]key.Destination, error) + } + // A Prefixable vindex is one that maps the prefix of a id to a keyspace range // instead of a single keyspace id. It's being used to reduced the fan out for // 'LIKE' expressions. 
diff --git a/go/vt/vttablet/tabletmanager/restore.go b/go/vt/vttablet/tabletmanager/restore.go index 35587124108..54813e11bf3 100644 --- a/go/vt/vttablet/tabletmanager/restore.go +++ b/go/vt/vttablet/tabletmanager/restore.go @@ -19,7 +19,6 @@ package tabletmanager import ( "context" "fmt" - "io" "time" "github.com/spf13/pflag" @@ -29,22 +28,17 @@ import ( "vitess.io/vitess/go/stats" - "vitess.io/vitess/go/mysql" - "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/hook" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/mysqlctl/backupstats" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/proto/vttime" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" ) // This file handles the initial backup restore upon startup. @@ -80,31 +74,6 @@ func registerIncrementalRestoreFlags(fs *pflag.FlagSet) { fs.StringVar(&restoreToPos, "restore-to-pos", restoreToPos, "(init incremental restore parameter) if set, run a point in time recovery that ends with the given position. 
This will attempt to use one full backup followed by zero or more incremental backups") } -var ( - // Flags for PITR - old iteration - binlogHost string - binlogPort int - binlogUser string - binlogPwd string - timeoutForGTIDLookup = 60 * time.Second - binlogSslCa string - binlogSslCert string - binlogSslKey string - binlogSslServerName string -) - -func registerPointInTimeRestoreFlags(fs *pflag.FlagSet) { - fs.StringVar(&binlogHost, "binlog_host", binlogHost, "PITR restore parameter: hostname/IP of binlog server.") - fs.IntVar(&binlogPort, "binlog_port", binlogPort, "PITR restore parameter: port of binlog server.") - fs.StringVar(&binlogUser, "binlog_user", binlogUser, "PITR restore parameter: username of binlog server.") - fs.StringVar(&binlogPwd, "binlog_password", binlogPwd, "PITR restore parameter: password of binlog server.") - fs.DurationVar(&timeoutForGTIDLookup, "pitr_gtid_lookup_timeout", timeoutForGTIDLookup, "PITR restore parameter: timeout for fetching gtid from timestamp.") - fs.StringVar(&binlogSslCa, "binlog_ssl_ca", binlogSslCa, "PITR restore parameter: Filename containing TLS CA certificate to verify binlog server TLS certificate against.") - fs.StringVar(&binlogSslCert, "binlog_ssl_cert", binlogSslCert, "PITR restore parameter: Filename containing mTLS client certificate to present to binlog server as authentication.") - fs.StringVar(&binlogSslKey, "binlog_ssl_key", binlogSslKey, "PITR restore parameter: Filename containing mTLS client private key for use in binlog server authentication.") - fs.StringVar(&binlogSslServerName, "binlog_ssl_server_name", binlogSslServerName, "PITR restore parameter: TLS server name (common name) to verify against for the binlog server we are connecting to (If not set: use the hostname or IP supplied in --binlog_host).") -} - func init() { servenv.OnParseFor("vtcombo", registerRestoreFlags) servenv.OnParseFor("vttablet", registerRestoreFlags) @@ -112,9 +81,6 @@ func init() { servenv.OnParseFor("vtcombo", 
registerIncrementalRestoreFlags) servenv.OnParseFor("vttablet", registerIncrementalRestoreFlags) - servenv.OnParseFor("vtcombo", registerPointInTimeRestoreFlags) - servenv.OnParseFor("vttablet", registerPointInTimeRestoreFlags) - statsRestoreBackupTime = stats.NewString("RestoredBackupTime") statsRestoreBackupPosition = stats.NewString("RestorePosition") } @@ -299,15 +265,6 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L pos = backupManifest.Position params.Logger.Infof("Restore: pos=%v", replication.EncodePosition(pos)) } - // If SnapshotTime is set , then apply the incremental change - if keyspaceInfo.SnapshotTime != nil { - params.Logger.Infof("Restore: Restoring to time %v from binlog", keyspaceInfo.SnapshotTime) - err = tm.restoreToTimeFromBinlog(ctx, pos, keyspaceInfo.SnapshotTime) - if err != nil { - log.Errorf("unable to restore to the specified time %s, error : %v", keyspaceInfo.SnapshotTime.String(), err) - return nil - } - } switch { case err == nil && backupManifest != nil: // Starting from here we won't be able to recover if we get stopped by a cancelled @@ -365,196 +322,6 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L return tm.tmState.ChangeTabletType(bgCtx, originalType, DBActionNone) } -// restoreToTimeFromBinlog restores to the snapshot time of the keyspace -// currently this works with mysql based database only (as it uses mysql specific queries for restoring) -func (tm *TabletManager) restoreToTimeFromBinlog(ctx context.Context, pos replication.Position, restoreTime *vttime.Time) error { - // validate the minimal settings necessary for connecting to binlog server - if binlogHost == "" || binlogPort <= 0 || binlogUser == "" { - log.Warning("invalid binlog server setting, restoring to last available backup.") - return nil - } - - timeoutCtx, cancelFnc := context.WithTimeout(ctx, timeoutForGTIDLookup) - defer cancelFnc() - - afterGTIDPos, beforeGTIDPos, err := 
tm.getGTIDFromTimestamp(timeoutCtx, pos, restoreTime.Seconds) - if err != nil { - return err - } - - if afterGTIDPos == "" && beforeGTIDPos == "" { - return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, fmt.Sprintf("unable to fetch the GTID for the specified time - %s", restoreTime.String())) - } else if afterGTIDPos == "" && beforeGTIDPos != "" { - log.Info("no afterGTIDPos found, which implies we reached the end of all GTID events") - } - - log.Infof("going to restore upto the GTID - %s", afterGTIDPos) - // when we don't have before GTID, we will take it as current backup pos's last GTID - // this is case where someone tries to restore just to the 1st event after backup - if beforeGTIDPos == "" { - beforeGTIDPos = pos.GTIDSet.Last() - } - err = tm.catchupToGTID(timeoutCtx, afterGTIDPos, beforeGTIDPos) - if err != nil { - return vterrors.Wrapf(err, "unable to replicate upto desired GTID : %s", afterGTIDPos) - } - - return nil -} - -// getGTIDFromTimestamp computes 2 GTIDs based on restoreTime -// afterPos is the GTID of the first event at or after restoreTime. -// beforePos is the GTID of the last event before restoreTime. 
This is the GTID upto which replication will be applied -// afterPos can be used directly in the query `START SLAVE UNTIL SQL_BEFORE_GTIDS = ”` -// beforePos will be used to check if replication was able to catch up from the binlog server -func (tm *TabletManager) getGTIDFromTimestamp(ctx context.Context, pos replication.Position, restoreTime int64) (afterPos string, beforePos string, err error) { - connParams := &mysql.ConnParams{ - Host: binlogHost, - Port: binlogPort, - Uname: binlogUser, - SslCa: binlogSslCa, - SslCert: binlogSslCert, - SslKey: binlogSslKey, - ServerName: binlogSslServerName, - } - if binlogPwd != "" { - connParams.Pass = binlogPwd - } - if binlogSslCa != "" || binlogSslCert != "" { - connParams.EnableSSL() - } - dbCfgs := &dbconfigs.DBConfigs{ - Host: connParams.Host, - Port: connParams.Port, - } - dbCfgs.SetDbParams(*connParams, *connParams, *connParams) - vsClient := vreplication.NewReplicaConnector(tm.Env, connParams) - - filter := &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "/.*", - }}, - } - - // get current lastPos of binlog server, so that if we hit that in vstream, we'll return from there - binlogConn, err := mysql.Connect(ctx, connParams) - if err != nil { - return "", "", err - } - defer binlogConn.Close() - lastPos, err := binlogConn.PrimaryPosition() - if err != nil { - return "", "", err - } - - gtidsChan := make(chan []string, 1) - - go func() { - err := vsClient.VStream(ctx, replication.EncodePosition(pos), filter, func(events []*binlogdatapb.VEvent) error { - for _, event := range events { - if event.Gtid != "" { - // check if we reached the lastPos then return - eventPos, err := replication.DecodePosition(event.Gtid) - if err != nil { - return err - } - - if event.Timestamp >= restoreTime { - afterPos = event.Gtid - gtidsChan <- []string{event.Gtid, beforePos} - return io.EOF - } - - if eventPos.AtLeast(lastPos) { - gtidsChan <- []string{"", beforePos} - return io.EOF - } - beforePos = event.Gtid - } - } - 
return nil - }) - if err != nil && err != io.EOF { - log.Warningf("Error using VStream to find timestamp for GTID position: %v error: %v", pos, err) - gtidsChan <- []string{"", ""} - } - }() - defer vsClient.Close() - select { - case val := <-gtidsChan: - return val[0], val[1], nil - case <-ctx.Done(): - log.Warningf("Can't find the GTID from restore time stamp, exiting.") - return "", beforePos, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "unable to find GTID from the snapshot time as context timed out") - } -} - -// catchupToGTID replicates upto specified GTID from binlog server -// -// copies the data from binlog server by pointing to as replica -// waits till all events to GTID replicated -// once done, it will reset the replication -func (tm *TabletManager) catchupToGTID(ctx context.Context, afterGTIDPos string, beforeGTIDPos string) error { - var afterGTID replication.Position - if afterGTIDPos != "" { - var err error - afterGTID, err = replication.DecodePosition(afterGTIDPos) - if err != nil { - return err - } - } - - beforeGTIDPosParsed, err := replication.DecodePosition(beforeGTIDPos) - if err != nil { - return err - } - - if err := tm.MysqlDaemon.CatchupToGTID(ctx, afterGTID); err != nil { - return vterrors.Wrap(err, fmt.Sprintf("failed to restart the replication until %s GTID", afterGTID.GTIDSet.Last())) - } - log.Infof("Waiting for position to reach", beforeGTIDPosParsed.GTIDSet.Last()) - // Could not use `agent.MysqlDaemon.WaitSourcePos` as replication is stopped with `START REPLICA UNTIL SQL_BEFORE_GTIDS` - // this is as per https://dev.mysql.com/doc/refman/8.0/en/start-replica.html - // We need to wait until replication catches upto the specified afterGTIDPos - chGTIDCaughtup := make(chan bool) - go func() { - timeToWait := time.Now().Add(timeoutForGTIDLookup) - for time.Now().Before(timeToWait) { - pos, err := tm.MysqlDaemon.PrimaryPosition(ctx) - if err != nil { - chGTIDCaughtup <- false - } - - if pos.AtLeast(beforeGTIDPosParsed) { - 
chGTIDCaughtup <- true - } - select { - case <-ctx.Done(): - chGTIDCaughtup <- false - default: - time.Sleep(300 * time.Millisecond) - } - } - }() - select { - case resp := <-chGTIDCaughtup: - if resp { - if err := tm.MysqlDaemon.StopReplication(ctx, nil); err != nil { - return vterrors.Wrap(err, "failed to stop replication") - } - if err := tm.MysqlDaemon.ResetReplicationParameters(ctx); err != nil { - return vterrors.Wrap(err, "failed to reset replication") - } - - return nil - } - return vterrors.Wrap(err, "error while fetching the current GTID position") - case <-ctx.Done(): - log.Warningf("Could not copy up to GTID.") - return vterrors.Wrapf(err, "context timeout while restoring up to specified GTID - %s", beforeGTIDPos) - } -} - // disableReplication stops and resets replication on the mysql server. It moreover sets impossible replication // source params, so that the replica can't possibly reconnect. It would take a `CHANGE [MASTER|REPLICATION SOURCE] TO ...` to // make the mysql server replicate again (available via tm.MysqlDaemon.SetReplicationPosition) diff --git a/test/bin/.keep b/test/bin/.keep new file mode 100644 index 00000000000..7f6eda3a448 --- /dev/null +++ b/test/bin/.keep @@ -0,0 +1 @@ +Do not remove. Ensures existence of directory. diff --git a/test/bin/rippled b/test/bin/rippled deleted file mode 100755 index c7b6bea3b95..00000000000 Binary files a/test/bin/rippled and /dev/null differ