Skip to content

Commit

Permalink
fix some test.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Nov 27, 2023
1 parent d07a9c1 commit d69d582
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 83 deletions.
15 changes: 7 additions & 8 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
codecCommon "github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/sqlmodel"
"github.com/pingcap/tiflow/pkg/testkit"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -262,7 +261,7 @@ func testMounterDisableOldValue(t *testing.T, tc struct {
},
) {
replicaConfig := config.GetDefaultReplicaConfig()
tk := testkit.New(t, replicaConfig)
tk := NewTestKit(t, replicaConfig)
defer tk.Close()

tk.MustExec("set @@tidb_enable_clustered_index=1;")
Expand Down Expand Up @@ -1008,7 +1007,7 @@ func TestE2ERowLevelChecksum(t *testing.T) {
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness

tk := testkit.New(t, replicaConfig)
tk := NewTestKit(t, replicaConfig)
defer tk.Close()

// upstream TiDB enable checksum functionality
Expand Down Expand Up @@ -1180,7 +1179,7 @@ func TestDecodeRowEnableChecksum(t *testing.T) {
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness

tk := testkit.New(t, replicaConfig)
tk := NewTestKit(t, replicaConfig)
defer tk.Close()

tk.MustExec("set global tidb_enable_row_level_checksum = 1")
Expand Down Expand Up @@ -1308,7 +1307,7 @@ func TestDecodeRowEnableChecksum(t *testing.T) {
func TestDecodeRow(t *testing.T) {
replicaConfig := config.GetDefaultReplicaConfig()

tk := testkit.New(t, replicaConfig)
tk := NewTestKit(t, replicaConfig)
defer tk.Close()

tk.MustExec("set @@tidb_enable_clustered_index=1;")
Expand Down Expand Up @@ -1336,7 +1335,7 @@ func TestDecodeRow(t *testing.T) {

schemaStorage.AdvanceResolvedTs(ver.Ver)

mounter := NewMounter(schemaStorage, changefeed, time.Local, filter, cfg.Integrity).(*mounter)
mounter := NewMounter(schemaStorage, changefeed, time.Local, filter, replicaConfig.Integrity).(*mounter)

tk.MustExec(`insert into student values(1, "dongmen", 20, "male")`)
tk.MustExec(`update student set age = 27 where id = 1`)
Expand Down Expand Up @@ -1387,7 +1386,7 @@ func TestDecodeEventIgnoreRow(t *testing.T) {
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Filter.Rules = []string{"test.student", "test.computer"}

tk := testkit.New(t, replicaConfig)
tk := NewTestKit(t, replicaConfig)
defer tk.Close()
tk.MustExec("use test;")

Expand Down Expand Up @@ -1416,7 +1415,7 @@ func TestDecodeEventIgnoreRow(t *testing.T) {

ts := schemaStorage.GetLastSnapshot().CurrentTs()
schemaStorage.AdvanceResolvedTs(ver.Ver)
mounter := NewMounter(schemaStorage, cfID, time.Local, f, cfg.Integrity).(*mounter)
mounter := NewMounter(schemaStorage, cfID, time.Local, f, replicaConfig.Integrity).(*mounter)

type testCase struct {
schema string
Expand Down
11 changes: 5 additions & 6 deletions cdc/entry/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/testkit"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -658,7 +657,7 @@ func TestMultiVersionStorage(t *testing.T) {
}

func TestCreateSnapFromMeta(t *testing.T) {
tk := testkit.New(t)
tk := NewTestKit(t, config.GetDefaultReplicaConfig())
defer tk.Close()

tk.MustExec("create database test2")
Expand Down Expand Up @@ -686,7 +685,7 @@ func TestCreateSnapFromMeta(t *testing.T) {
}

func TestExplicitTables(t *testing.T) {
tk := testkit.New(t)
tk := NewTestKit(t, config.GetDefaultReplicaConfig())
defer tk.Close()
ver1, err := tk.Storage().CurrentVersion(oracle.GlobalTxnScope)
require.NoError(t, err)
Expand Down Expand Up @@ -828,7 +827,7 @@ func TestSchemaStorage(t *testing.T) {
}}

testOneGroup := func(tc []string) {
tk := testkit.New(t)
tk := NewTestKit(t, config.GetDefaultReplicaConfig())
defer tk.Close()
tk.MustExec("set global tidb_enable_clustered_index = 'int_only';")
for _, ddlSQL := range tc {
Expand Down Expand Up @@ -874,7 +873,7 @@ func TestSchemaStorage(t *testing.T) {
// 2. If the table has not null unique key, the handleKey is the first column of the unique key.
// 3. If the table has no primary key and no not null unique key, it has no handleKey.
func TestHandleKey(t *testing.T) {
tk := testkit.New(t)
tk := NewTestKit(t, config.GetDefaultReplicaConfig())
defer tk.Close()
tk.MustExec("create database test2")
tk.MustExec("create table test.simple_test1 (id bigint primary key)")
Expand Down Expand Up @@ -921,7 +920,7 @@ func TestHandleKey(t *testing.T) {
}

func TestGetPrimaryKey(t *testing.T) {
tk := testkit.New(t)
tk := NewTestKit(t, config.GetDefaultReplicaConfig())
defer tk.Close()

sql := `create table test.t1(a int primary key, b int)`
Expand Down
25 changes: 12 additions & 13 deletions pkg/testkit/testkit.go → cdc/entry/testkit.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package testkit
package entry

import (
"context"
Expand All @@ -25,13 +25,12 @@ import (
tiddl "github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
timeta "github.com/pingcap/tidb/meta"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/errors"
Expand All @@ -49,12 +48,12 @@ type TestKit struct {
storage kv.Storage
domain *domain.Domain

schemaStorage entry.SchemaStorage
mounter entry.Mounter
schemaStorage SchemaStorage
mounter Mounter
}

// New return a new testkit
func New(t *testing.T, replicaConfig *config.ReplicaConfig) *TestKit {
// NewTestKit return a new testkit
func NewTestKit(t *testing.T, replicaConfig *config.ReplicaConfig) *TestKit {
store, err := mockstore.NewMockStore()
require.NoError(t, err)
ticonfig.UpdateGlobal(func(conf *ticonfig.Config) {
Expand All @@ -75,13 +74,13 @@ func New(t *testing.T, replicaConfig *config.ReplicaConfig) *TestKit {

changefeedID := model.DefaultChangeFeedID("changefeed-testkit")

meta := meta.NewSnapshotMeta(store.GetSnapshot(ver))
schemaStorage, err := entry.NewSchemaStorage(
meta := timeta.NewSnapshotMeta(store.GetSnapshot(ver))
schemaStorage, err := NewSchemaStorage(
meta, ver.Ver, replicaConfig.ForceReplicate,
changefeedID, util.RoleTester, filter)
require.NoError(t, err)

mounter := entry.NewMounter(schemaStorage, changefeedID, time.Local,
mounter := NewMounter(schemaStorage, changefeedID, time.Local,
filter, replicaConfig.Integrity)

return &TestKit{
Expand Down Expand Up @@ -217,10 +216,10 @@ func (tk *TestKit) Storage() kv.Storage {
}

// GetCurrentMeta return the current meta snapshot
func (tk *TestKit) GetCurrentMeta() *meta.Meta {
func (tk *TestKit) GetCurrentMeta() *timeta.Meta {
ver, err := tk.storage.CurrentVersion(oracle.GlobalTxnScope)
require.Nil(tk.t, err)
return meta.NewSnapshotMeta(tk.storage.GetSnapshot(ver))
return timeta.NewSnapshotMeta(tk.storage.GetSnapshot(ver))
}

// Close closes the helper
Expand All @@ -245,7 +244,7 @@ func (tk *TestKit) GetAllHistoryDDLJob(f filter.Filter) ([]*timodel.Job, error)
return nil, errors.Trace(err)
}
defer txn.Rollback() //nolint:errcheck
txnMeta := meta.NewMeta(txn)
txnMeta := timeta.NewMeta(txn)

jobs, err := tiddl.GetAllHistoryDDLJobs(txnMeta)
res := make([]*timodel.Job, 0)
Expand Down
9 changes: 5 additions & 4 deletions pkg/sink/codec/canal/canal_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
Expand Down Expand Up @@ -92,11 +93,11 @@ var (
)

func TestCanalBatchEncoder(t *testing.T) {
tk := testkit.New(t)
tk := entry.NewTestKit(t, config.GetDefaultReplicaConfig())
defer tk.Close()

sql := `create table test.t(a varchar(10) primary key)`
job := helper.DDL2Job(sql)
job := tk.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)

for _, cs := range rowCases {
Expand Down Expand Up @@ -150,11 +151,11 @@ func TestCanalBatchEncoder(t *testing.T) {
}

func TestCanalAppendRowChangedEventWithCallback(t *testing.T) {
tk := testkit.New(t)
tk := entry.NewTestKit(t, config.GetDefaultReplicaConfig())
defer tk.Close()

sql := `create table test.t(a varchar(10) primary key)`
job := helper.DDL2Job(sql)
job := tk.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)

_, _, colInfo := tableInfo.GetRowColInfos()
Expand Down
8 changes: 4 additions & 4 deletions pkg/sink/codec/canal/canal_entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ import (
"github.com/golang/protobuf/proto"
mm "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/pingcap/tiflow/pkg/sink/codec/internal"
"github.com/pingcap/tiflow/pkg/testkit"
canal "github.com/pingcap/tiflow/proto/canal"
"github.com/stretchr/testify/require"
"golang.org/x/text/encoding/charmap"
)

func TestInsert(t *testing.T) {
tk := testkit.New(t)
tk := entry.NewTestKit(t, config.GetDefaultReplicaConfig())
defer tk.Close()

sql := `create table test.t(
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestInsert(t *testing.T) {
}

func TestUpdate(t *testing.T) {
tk := testkit.New(t)
tk := entry.NewTestKit(t, config.GetDefaultReplicaConfig())
defer tk.Close()

sql := `create table test.t(id int primary key, name varchar(32))`
Expand Down Expand Up @@ -212,7 +212,7 @@ func TestUpdate(t *testing.T) {
}

func TestDelete(t *testing.T) {
tk := testkit.New(t)
tk := entry.NewTestKit(t, config.GetDefaultReplicaConfig())
defer tk.Close()

sql := `create table test.t(id int primary key)`
Expand Down
9 changes: 5 additions & 4 deletions pkg/sink/codec/canal/canal_json_row_event_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/compression"
"github.com/pingcap/tiflow/pkg/config"
Expand Down Expand Up @@ -580,11 +581,11 @@ func TestDDLEventWithExtensionValueMarshal(t *testing.T) {
}

func TestCanalJSONAppendRowChangedEventWithCallback(t *testing.T) {
tk := testkit.New(t)
tk := entry.NewTestKit(t, config.GetDefaultReplicaConfig())
defer tk.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
job := tk.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)

_, _, colInfos := tableInfo.GetRowColInfos()
Expand Down Expand Up @@ -672,11 +673,11 @@ func TestCanalJSONAppendRowChangedEventWithCallback(t *testing.T) {
}

func TestMaxMessageBytes(t *testing.T) {
tk := testkit.New(t)
tk := entry.NewTestKit(t, config.GetDefaultReplicaConfig())
defer tk.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
job := tk.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)

_, _, colInfos := tableInfo.GetRowColInfos()
Expand Down
6 changes: 3 additions & 3 deletions pkg/sink/codec/canal/canal_json_txn_event_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import (
"testing"

"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/pingcap/tiflow/pkg/testkit"
"github.com/stretchr/testify/require"
)

Expand All @@ -35,7 +35,7 @@ func TestBuildCanalJSONTxnEventEncoder(t *testing.T) {
}

func TestCanalJSONTxnEventEncoderMaxMessageBytes(t *testing.T) {
tk := testkit.New(t)
tk := entry.NewTestKit(t, config.GetDefaultReplicaConfig())
defer tk.Close()

sql := `create table test.t(a varchar(255) primary key)`
Expand Down Expand Up @@ -77,7 +77,7 @@ func TestCanalJSONTxnEventEncoderMaxMessageBytes(t *testing.T) {
}

func TestCanalJSONAppendTxnEventEncoderWithCallback(t *testing.T) {
tk := testkit.New(t)
tk := entry.NewTestKit(t, config.GetDefaultReplicaConfig())
defer tk.Close()

sql := `create table test.t(a varchar(255) primary key)`
Expand Down
5 changes: 3 additions & 2 deletions pkg/sink/codec/canal/canal_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ import (

mm "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/testkit"
"github.com/pingcap/tiflow/pkg/config"
)

type testColumnTuple struct {
Expand Down Expand Up @@ -383,7 +384,7 @@ func collectExpectedDecodedValue(columns []*testColumnTuple) map[string]interfac
}

func newLargeEvent4Test(t *testing.T) (*model.RowChangedEvent, *model.RowChangedEvent, *model.RowChangedEvent) {
tk := testkit.New(t)
tk := entry.NewTestKit(t, config.GetDefaultReplicaConfig())
defer tk.Close()

sql := `create table test.t(
Expand Down
Loading

0 comments on commit d69d582

Please sign in to comment.