add kafka column selector integration test.
3AceShowHand committed Nov 1, 2023
1 parent 982ae00 commit c547766
Showing 5 changed files with 149 additions and 8 deletions.
7 changes: 7 additions & 0 deletions tests/integration_tests/kafka_column_selector/conf/changefeed.toml
@@ -0,0 +1,7 @@
[sink]
column-selectors = [
{matcher = ['test.t1'], columns = ['a', 'b']},
{matcher = ['test.*'], columns = ["*", "!b"]},

{matcher = ['test1.t1'], columns = ['column*', '!column1']},
]
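
The three rules above drive the whole test: test.t1 keeps only columns a and b, the broader test.* rule keeps everything except b for the remaining tables in the test database, and test1.t1 keeps the column* family minus column1. As a concrete illustration, here is a minimal, self-contained Go sketch of these matching semantics (first matching selector wins, a leading "!" excludes, a trailing "*" is a wildcard). It mirrors the intended outcome only and is not TiCDC's actual filter implementation; the helpers match and keep are invented for this example.

package main

import (
	"fmt"
	"strings"
)

// selector mirrors one entry of the [sink] column-selectors list: every
// table matching `matcher` emits only the columns allowed by `columns`.
type selector struct {
	matcher string   // e.g. "test.*"
	columns []string // e.g. ["*", "!b"]
}

// match does simple trailing-wildcard matching ("test.*", "column*").
func match(pattern, name string) bool {
	if strings.HasSuffix(pattern, "*") {
		return strings.HasPrefix(name, strings.TrimSuffix(pattern, "*"))
	}
	return pattern == name
}

// keep reports whether a column survives a rule list: it must match at
// least one allow pattern and no "!" exclusion.
func keep(column string, rules []string) bool {
	allowed := false
	for _, r := range rules {
		if strings.HasPrefix(r, "!") {
			if match(r[1:], column) {
				return false
			}
		} else if match(r, column) {
			allowed = true
		}
	}
	return allowed
}

func main() {
	selectors := []selector{
		{"test.t1", []string{"a", "b"}},
		{"test.*", []string{"*", "!b"}},
		{"test1.t1", []string{"column*", "!column1"}},
	}
	tables := []struct {
		name string
		cols []string
	}{
		{"test.t1", []string{"a", "b", "c"}},
		{"test.t2", []string{"a", "b", "c"}},
		{"test1.t1", []string{"column0", "column1", "column2"}},
	}
	for _, t := range tables {
		for _, s := range selectors { // first matching selector wins
			if !match(s.matcher, t.name) {
				continue
			}
			var kept []string
			for _, c := range t.cols {
				if keep(c, s.columns) {
					kept = append(kept, c)
				}
			}
			fmt.Printf("%s -> %v\n", t.name, kept)
			break
		}
	}
}

Run as-is, the sketch prints test.t1 -> [a b], test.t2 -> [a c], and test1.t1 -> [column0 column2], which is exactly the shape the checksum checker verifies downstream at the end of run.sh.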
49 changes: 49 additions & 0 deletions tests/integration_tests/kafka_column_selector/data/data.sql
@@ -0,0 +1,49 @@
drop database if exists `test`;
create database `test`;
use `test`;

create table t1 (
a int primary key,
b int,
c int
);

insert into t1 values (1, 2, 3);
insert into t1 values (2, 3, 4);
insert into t1 values (3, 4, 5);

create table t2 (
a int primary key,
b int,
c int
);

insert into t2 values (1, 2, 3);
insert into t2 values (2, 3, 4);
insert into t2 values (3, 4, 5);

create table t3 (
a int primary key,
b int,
c int
);

insert into t3 values (1, 2, 3);
insert into t3 values (2, 3, 4);
insert into t3 values (3, 4, 5);

drop database if exists `test1`;
create database `test1`;
use `test1`;

create table t1 (
column0 int primary key,
column1 int,
column2 int
);

insert into t1 values (1, 2, 3);
insert into t1 values (2, 3, 4);
insert into t1 values (3, 4, 5);

-- marker table: run.sh waits for test1.finishmark to replicate downstream before comparing checksums
create table finishmark(id int primary key);
52 changes: 52 additions & 0 deletions tests/integration_tests/kafka_column_selector/run.sh
@@ -0,0 +1,52 @@
#!/bin/bash

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
# test kafka sink only in this case
if [ "$SINK_TYPE" != "kafka" ]; then
return
fi

rm -rf $WORK_DIR && mkdir -p $WORK_DIR
start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR

# record tso before we create tables to skip the system table DDLs
start_ts=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

changefeed_id="test"
TOPIC_NAME="column-selector-test"
SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=canal-json&partition-num=1&enable-tidb-extension=true"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c ${changefeed_id} --config="$CUR/conf/changefeed.toml"

# pipe the topic into the downstream TiDB; the changefeed config is passed so the consumer honors the same column selectors
cdc_kafka_consumer --upstream-uri $SINK_URI --downstream-uri="mysql://root@127.0.0.1:3306/?safe-mode=true&batch-dml-enable=false" --upstream-tidb-dsn="root@tcp(${UP_TIDB_HOST}:${UP_TIDB_PORT})/?" --config="$CUR/conf/changefeed.toml" 2>&1 &

run_sql_file $CUR/data/data.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}

echo "Starting build checksum checker..."
cd $CUR/../../utils/checksum_checker
if [ ! -f ./checksum_checker ]; then
GO111MODULE=on go build
fi

check_table_exists "test1.finishmark" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

# compare per-table checksums between upstream and downstream, honoring the column selectors in the changefeed config
./checksum_checker --upstream-uri "root@tcp(${UP_TIDB_HOST}:${UP_TIDB_PORT})/" --downstream-uri "root@tcp(${DOWN_TIDB_HOST}:${DOWN_TIDB_PORT})/" --databases "test,test1" --config="$CUR/conf/changefeed.toml"

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
Binary file added tests/utils/checksum_checker/checksum_checker
49 changes: 41 additions & 8 deletions tests/utils/checksum_checker/main.go
@@ -15,7 +15,9 @@ package main
 
 import (
 	"database/sql"
+	"flag"
 	"fmt"
+	"os"
 	"strings"
 	"time"
@@ -27,25 +29,55 @@ import (
 	"go.uber.org/zap"
 )
 
+type options struct {
+	upstreamURI   string
+	downstreamURI string
+	dbNames       string
+	configFile    string
+}
+
+func (o *options) validate() error {
+	if o.upstreamURI == "" {
+		return errors.New("upstreamURI is required")
+	}
+	if o.downstreamURI == "" {
+		return errors.New("downstreamURI is required")
+	}
+	if len(o.dbNames) == 0 {
+		return errors.New("dbNames is required")
+	}
+	return nil
+}
+
 func main() {
-	var upstreamURI string
-	var downstreamURI string
-	var dbNames []string
-	var configFile string
-
-	upstreamDB, err := openDB(upstreamURI)
+	o := &options{}
+
+	flags := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
+	flags.StringVar(&o.upstreamURI, "upstream-uri", "", "upstream database uri")
+	flags.StringVar(&o.downstreamURI, "downstream-uri", "", "downstream database uri")
+	flags.StringVar(&o.dbNames, "databases", "", "comma-separated database names")
+	flags.StringVar(&o.configFile, "config", "", "config file")
+	if err := flags.Parse(os.Args[1:]); err != nil {
+		log.Panic("parse args failed", zap.Error(err))
+	}
+	if err := o.validate(); err != nil {
+		log.Panic("invalid options", zap.Error(err))
+	}
+
+	upstreamDB, err := openDB(o.upstreamURI)
 	if err != nil {
 		log.Panic("cannot open db for the upstream", zap.Error(err))
 	}
 
-	downstreamDB, err := openDB(downstreamURI)
+	downstreamDB, err := openDB(o.downstreamURI)
 	if err != nil {
 		log.Panic("cannot open db for the downstream", zap.Error(err))
 	}
 
 	replicaConfig := config.GetDefaultReplicaConfig()
-	if configFile != "" {
-		err = cmdUtil.StrictDecodeFile(configFile, "checksum checker", replicaConfig)
+	if o.configFile != "" {
+		err = cmdUtil.StrictDecodeFile(o.configFile, "checksum checker", replicaConfig)
 		if err != nil {
 			log.Panic("cannot decode config file", zap.Error(err))
 		}
@@ -56,6 +88,7 @@ func main() {
 		log.Panic("cannot create column filter", zap.Error(err))
 	}
 
+	dbNames := strings.Split(o.dbNames, ",")
 	err = compareCRC32CheckSum(upstreamDB, downstreamDB, dbNames, columnFilter)
 	if err != nil {
 		log.Panic("compare checksum failed", zap.Error(err))
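
The remainder of main.go (folded in the diff above) is untouched by this commit. For orientation, compareCRC32CheckSum takes the two connections, the database list, and the column filter built from the changefeed config, and the test fails on any mismatch. The helper below is a hypothetical sketch of how such a per-table CRC32 could be computed; it is not the code in this repository.

package main

import (
	"database/sql"
	"fmt"
	"hash/crc32"
	"strings"

	_ "github.com/go-sql-driver/mysql"
)

// tableChecksum streams the selected columns of one table in primary-key
// order and folds every value into a single CRC32. Running it against the
// upstream and the downstream and comparing the two sums tells us whether
// the column-filtered data matches.
func tableChecksum(db *sql.DB, table string, columns []string) (uint32, error) {
	// identifiers are interpolated for brevity; acceptable in a test
	// utility, not something to do with untrusted input
	query := fmt.Sprintf("SELECT %s FROM %s ORDER BY 1", strings.Join(columns, ", "), table)
	rows, err := db.Query(query)
	if err != nil {
		return 0, err
	}
	defer rows.Close()

	h := crc32.NewIEEE()
	values := make([]sql.RawBytes, len(columns))
	dest := make([]interface{}, len(columns))
	for i := range values {
		dest[i] = &values[i]
	}
	for rows.Next() {
		if err := rows.Scan(dest...); err != nil {
			return 0, err
		}
		for _, v := range values {
			h.Write(v)
			h.Write([]byte{0}) // separator so ("ab","c") and ("a","bc") differ
		}
	}
	return h.Sum32(), rows.Err()
}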
