diff --git a/pkg/sink/cloudstorage/path.go b/pkg/sink/cloudstorage/path.go index d2ea2545a82..dae3f5a8a0c 100644 --- a/pkg/sink/cloudstorage/path.go +++ b/pkg/sink/cloudstorage/path.go @@ -225,12 +225,7 @@ func (f *FilePathGenerator) CheckOrWriteSchema( } // Case 2: the table meta path is not empty. - if schemaFileCnt != 0 { - if lastVersion == 0 { - log.Warn("no table schema file found in an non-empty meta path", - zap.Any("versionedTableName", table), - zap.Uint32("checksum", checksum)) - } + if schemaFileCnt != 0 && lastVersion != 0 { f.versionMap[table] = lastVersion return nil } @@ -238,6 +233,11 @@ func (f *FilePathGenerator) CheckOrWriteSchema( // Case 3: the table meta path is empty, which happens when: // a. the table is existed before changefeed started. We need to write schema file to external storage. // b. the schema file is deleted by the consumer. We write schema file to external storage too. + if schemaFileCnt != 0 && lastVersion == 0 { + log.Warn("no table schema file found in an non-empty meta path", + zap.Any("versionedTableName", table), + zap.Uint32("checksum", checksum)) + } encodedDetail, err := def.MarshalWithQuery() if err != nil { return err diff --git a/pkg/sink/cloudstorage/path_test.go b/pkg/sink/cloudstorage/path_test.go index 03a5ba55bc7..f8650441626 100644 --- a/pkg/sink/cloudstorage/path_test.go +++ b/pkg/sink/cloudstorage/path_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/google/uuid" timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/types" @@ -325,9 +326,25 @@ func TestCheckOrWriteSchema(t *testing.T) { require.Equal(t, tableInfo.Version, f.versionMap[table]) dir = filepath.Join(dir, "test/table1/meta") - cnt, err := os.ReadDir(dir) + files, err := os.ReadDir(dir) require.NoError(t, err) - require.Equal(t, 1, len(cnt)) + require.Equal(t, 1, len(files)) + + // test schema file is invalid + err = os.WriteFile(filepath.Join(dir, + fmt.Sprintf("%s.tmp.%s", files[0].Name(), uuid.NewString())), + []byte("invalid"), 0o644) + require.NoError(t, err) + err = os.Remove(filepath.Join(dir, files[0].Name())) + require.NoError(t, err) + delete(f.versionMap, table) + err = f.CheckOrWriteSchema(ctx, table, tableInfo) + require.NoError(t, err) + require.Equal(t, table.TableInfoVersion, f.versionMap[table]) + + files, err = os.ReadDir(dir) + require.NoError(t, err) + require.Equal(t, 2, len(files)) } func TestRemoveExpiredFilesWithoutPartition(t *testing.T) {