From f384fe44649f1a6abf3d536aadfda48149e6ca43 Mon Sep 17 00:00:00 2001 From: Shreya Khajanchi Date: Thu, 6 Jun 2024 12:09:59 +0530 Subject: [PATCH 1/2] adding integration test for forward transformation --- .../templates/DataStreamToSpannerDDLIT.java | 186 ++++++- .../DataStreamToSpannerEventsIT.java | 3 +- .../templates/DataStreamToSpannerITBase.java | 35 +- .../DataStreamToSpannerSessionIT.java | 3 +- ...MigrationWithMigrationShardIdColumnIT.java | 100 +++- ...rationWithoutMigrationShardIdColumnIT.java | 6 +- ...ql-backfill-AllDatatypeTransformation.avro | Bin 0 -> 4172 bytes .../mysql-cdc-AllDatatypeTransformation.avro | Bin 0 -> 3845 bytes .../DataStreamToSpannerDDLIT/mysql-schema.sql | 24 +- .../spanner-schema.sql | 23 +- .../DataStreamToSpannerDDLIT/statements.sql | 76 +++ .../Customers-shard1.avro | Bin 0 -> 1696 bytes .../Customers-shard2.avro | Bin 0 -> 1696 bytes .../mysql-schema.sql | 7 + .../mysql-session.json | 502 ++++++++++++++---- .../mysql-statements.sql | 11 +- .../spanner-schema.sql | 8 + .../CustomTransformationWithShardForIT.java | 106 ++++ 18 files changed, 961 insertions(+), 129 deletions(-) create mode 100644 v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerDDLIT/mysql-backfill-AllDatatypeTransformation.avro create mode 100644 v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerDDLIT/mysql-cdc-AllDatatypeTransformation.avro create mode 100644 v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/Customers-shard1.avro create mode 100644 v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/Customers-shard2.avro create mode 100644 v2/spanner-custom-shard/src/main/java/com/custom/CustomTransformationWithShardForIT.java diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerDDLIT.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerDDLIT.java index a371df55d6..39e85c388e 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerDDLIT.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerDDLIT.java @@ -19,6 +19,7 @@ import com.google.cloud.teleport.metadata.SkipDirectRunnerTest; import com.google.cloud.teleport.metadata.TemplateIntegrationTest; +import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; @@ -60,6 +61,8 @@ public class DataStreamToSpannerDDLIT extends DataStreamToSpannerITBase { private static final String TABLE6 = "Books"; private static final String TABLE7 = "Authors"; + private static final String TRANSFORMATION_TABLE = "AllDatatypeTransformation"; + private static HashSet testInstances = new HashSet<>(); private static PipelineLauncher.LaunchInfo jobInfo; @@ -72,7 +75,7 @@ public class DataStreamToSpannerDDLIT extends DataStreamToSpannerITBase { * @throws IOException */ @Before - public void setUp() throws IOException { + public void setUp() throws IOException, InterruptedException { // Prevent cleaning up of dataflow job after a test method is executed. skipBaseCleanup = true; synchronized (DataStreamToSpannerDDLIT.class) { @@ -81,6 +84,11 @@ public void setUp() throws IOException { spannerResourceManager = setUpSpannerResourceManager(); pubsubResourceManager = setUpPubSubResourceManager(); createSpannerDDL(spannerResourceManager, SPANNER_DDL_RESOURCE); + createAndUploadJarToGcs("DatatypeIT"); + CustomTransformation customTransformation = + CustomTransformation.builder( + "customTransformation.jar", "com.custom.CustomTransformationWithShardForIT") + .build(); jobInfo = launchDataflowJob( getClass().getSimpleName(), @@ -93,7 +101,8 @@ public void setUp() throws IOException { { put("inputFileFormat", "avro"); } - }); + }, + customTransformation); } } } @@ -222,6 +231,59 @@ public void migrationTestWithAllDatatypeDefaultMapping() { assertAllDatatypeColumns2TableCdcContents(); } + @Test + public void migrationTestWithAllDatatypeTransformation() { + // Construct a ChainedConditionCheck with 4 stages. + // 1. Send initial wave of events + // 2. Wait on Spanner to have events + ChainedConditionCheck conditionCheck = + ChainedConditionCheck.builder( + List.of( + uploadDataStreamFile( + jobInfo, + TRANSFORMATION_TABLE, + "backfill.avro", + "DataStreamToSpannerDDLIT/mysql-backfill-AllDatatypeTransformation.avro"), + SpannerRowsCheck.builder(spannerResourceManager, TRANSFORMATION_TABLE) + .setMinRows(3) + .setMaxRows(3) + .build())) + .build(); + + // Wait for conditions + PipelineOperator.Result result = + pipelineOperator() + .waitForCondition(createConfig(jobInfo, Duration.ofMinutes(8)), conditionCheck); + + // Assert Conditions + assertThatResult(result).meetsConditions(); + + assertAllDatatypeTransformationTableBackfillContents(); + + conditionCheck = + ChainedConditionCheck.builder( + List.of( + uploadDataStreamFile( + jobInfo, + TRANSFORMATION_TABLE, + "cdc.avro", + "DataStreamToSpannerDDLIT/mysql-cdc-AllDatatypeTransformation.avro"), + SpannerRowsCheck.builder(spannerResourceManager, TRANSFORMATION_TABLE) + .setMinRows(2) + .setMaxRows(2) + .build())) + .build(); + + result = + pipelineOperator() + .waitForCondition(createConfig(jobInfo, Duration.ofMinutes(8)), conditionCheck); + + // Assert Conditions + assertThatResult(result).meetsConditions(); + + assertAllDatatypeTransformationTableCdcContents(); + } + @Test public void migrationTestWithDatatypeSizeConversion() { // Construct a ChainedConditionCheck with 4 stages. @@ -544,6 +606,126 @@ private void assertAllDatatypeColumns2TableBackfillContents() { .hasRecordsUnorderedCaseInsensitiveColumns(events); } + private void assertAllDatatypeTransformationTableBackfillContents() { + List> events = new ArrayList<>(); + + Map row = new HashMap<>(); + row.put("varchar_column", "example2"); + row.put("tinyint_column", 21); + row.put("text_column", "Some text 2 append"); + row.put("date_column", "2023-01-01"); + row.put("int_column", 201); + row.put("bigint_column", 987655); + row.put("float_column", 24.45); + row.put("double_column", 235.567); + row.put("decimal_column", 23457.78); + row.put("datetime_column", "2022-12-31T23:59:58Z"); + row.put("timestamp_column", "2022-12-31T23:59:58Z"); + row.put("time_column", "86399001000"); + row.put("year_column", "2023"); + row.put("blob_column", "V29ybWQ="); + row.put("enum_column", "1"); + row.put("bool_column", true); + row.put("binary_column", "AQIDBAUGBwgJCgsMDQ4PEBESExQ="); + row.put("bit_column", "Ew=="); + events.add(row); + + row = new HashMap<>(); + row.put("varchar_column", "example3"); + row.put("tinyint_column", 31); + row.put("text_column", "Some text 3 append"); + row.put("date_column", "2024-01-02"); + row.put("int_column", 301); + row.put("bigint_column", 112234); + row.put("float_column", 35.56); + row.put("double_column", 346.678); + row.put("decimal_column", 34568.89); + row.put("datetime_column", "2023-12-31T23:59:59Z"); + row.put("timestamp_column", "2023-12-31T23:59:59Z"); + row.put("time_column", "1000"); + row.put("year_column", "2025"); + row.put("blob_column", "V29ybWQ="); + row.put("enum_column", "1"); + row.put("bool_column", true); + row.put("binary_column", "AQIDBAUGBwgJCgsMDQ4PEBESExQ="); + row.put("bit_column", "Ew=="); + events.add(row); + + row = new HashMap<>(); + row.put("varchar_column", "example4"); + row.put("tinyint_column", 41); + row.put("text_column", "Some text 4 append"); + row.put("date_column", "2021-11-12"); + row.put("int_column", 401); + row.put("bigint_column", 223345); + row.put("float_column", 46.67); + row.put("double_column", 457.789); + row.put("decimal_column", 45679.90); + row.put("datetime_column", "2021-11-11T11:11:10Z"); + row.put("timestamp_column", "2021-11-11T11:11:10Z"); + row.put("time_column", "40271001000"); + row.put("year_column", "2022"); + row.put("blob_column", "V29ybWQ="); + row.put("enum_column", "1"); + row.put("bool_column", true); + row.put("binary_column", "AQIDBAUGBwgJCgsMDQ4PEBESExQ="); + row.put("bit_column", "Ew=="); + events.add(row); + + SpannerAsserts.assertThatStructs( + spannerResourceManager.runQuery("select* from AllDatatypeTransformation")) + .hasRecordsUnorderedCaseInsensitiveColumns(events); + } + + private void assertAllDatatypeTransformationTableCdcContents() { + List> events = new ArrayList<>(); + Map row = new HashMap<>(); + row.put("varchar_column", "example2"); + row.put("tinyint_column", 25); + row.put("text_column", "Updated text 2"); + row.put("date_column", "2023-01-01"); + row.put("int_column", 250); + row.put("bigint_column", 56789); + row.put("float_column", 25.45); + row.put("double_column", 345.678); + row.put("decimal_column", 23456.79); + row.put("datetime_column", "2023-01-01T12:00:00Z"); + row.put("timestamp_column", "2023-01-01T12:00:00Z"); + row.put("time_column", "43200000000"); + row.put("year_column", "2023"); + row.put("blob_column", "EjRWeJCrze8="); + row.put("enum_column", "3"); + row.put("bool_column", true); + row.put("binary_column", "EjRWeJCrze8SNFZ4kKvN7xI0Vng="); + row.put("bit_column", "ASc="); + events.add(row); + + row = new HashMap<>(); + row.put("varchar_column", "example3"); + row.put("tinyint_column", 35); + row.put("text_column", "Updated text 3"); + row.put("date_column", "2024-01-02"); + row.put("int_column", 350); + row.put("bigint_column", 88000); + row.put("float_column", 35.67); + row.put("double_column", 456.789); + row.put("decimal_column", 34567.90); + row.put("datetime_column", "2024-01-02T00:00:00Z"); + row.put("timestamp_column", "2024-01-02T00:00:00Z"); + row.put("time_column", "3600000000"); + row.put("year_column", "2025"); + row.put("blob_column", "q83vEjRWeJA="); + row.put("enum_column", "1"); + row.put("bool_column", false); + row.put("binary_column", "q83vEjRWeJCrze8SNFZ4kKvN7xI="); + row.put("bit_column", "AA=="); + events.add(row); + + SpannerAsserts.assertThatStructs( + spannerResourceManager.runQuery("select * from AllDatatypeTransformation")) + .hasRecordsUnorderedCaseInsensitiveColumns(events); + } + private void assertAllDatatypeColumns2TableCdcContents() { List> events = new ArrayList<>(); Map row = new HashMap<>(); diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerEventsIT.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerEventsIT.java index 4cc2d4df56..f4eb77c45c 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerEventsIT.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerEventsIT.java @@ -98,7 +98,8 @@ public void setUp() throws IOException { { put("inputFileFormat", "avro"); } - }); + }, + null); } } } diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerITBase.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerITBase.java index 8c8b6cb29b..a569729091 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerITBase.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerITBase.java @@ -17,6 +17,7 @@ import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline; +import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation; import com.google.common.io.Resources; import com.google.pubsub.v1.SubscriptionName; import com.google.pubsub.v1.TopicName; @@ -28,11 +29,14 @@ import java.util.Map.Entry; import org.apache.beam.it.common.PipelineLauncher.LaunchConfig; import org.apache.beam.it.common.PipelineLauncher.LaunchInfo; +import org.apache.beam.it.common.utils.IORedirectUtil; import org.apache.beam.it.common.utils.PipelineUtils; import org.apache.beam.it.conditions.ConditionCheck; import org.apache.beam.it.gcp.TemplateTestBase; import org.apache.beam.it.gcp.pubsub.PubsubResourceManager; import org.apache.beam.it.gcp.spanner.SpannerResourceManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Base class for DataStreamToSpanner integration tests. It provides helper functions related to @@ -42,6 +46,7 @@ public abstract class DataStreamToSpannerITBase extends TemplateTestBase { // Format of avro file path in GCS - {table}/2023/12/20/06/57/{fileName} public static final String DATA_STREAM_EVENT_FILES_PATH_FORMAT_IN_GCS = "%s/2023/12/20/06/57/%s"; + private static final Logger LOG = LoggerFactory.getLogger(DataStreamToSpannerITBase.class); public PubsubResourceManager setUpPubSubResourceManager() throws IOException { return PubsubResourceManager.builder(testName, PROJECT, credentialsProvider).build(); @@ -162,7 +167,8 @@ protected LaunchInfo launchDataflowJob( String gcsPathPrefix, SpannerResourceManager spannerResourceManager, PubsubResourceManager pubsubResourceManager, - Map jobParameters) + Map jobParameters, + CustomTransformation customTransformation) throws IOException { if (sessionFileResourceName != null) { @@ -217,6 +223,13 @@ protected LaunchInfo launchDataflowJob( getGcsPath(gcsPathPrefix + "/transformationContext.json")); } + if (customTransformation != null) { + params.put( + "transformationJarPath", + getGcsPath(gcsPathPrefix + "/" + customTransformation.jarPath())); + params.put("transformationClassName", customTransformation.classPath()); + } + // overridden parameters if (jobParameters != null) { for (Entry entry : jobParameters.entrySet()) { @@ -236,4 +249,24 @@ protected LaunchInfo launchDataflowJob( return jobInfo; } + + public void createAndUploadJarToGcs(String gcsPathPrefix) + throws IOException, InterruptedException { + String[] commands = {"cd ../spanner-custom-shard", "mvn install"}; + + // Join the commands with && to execute them sequentially + String[] shellCommand = {"/bin/bash", "-c", String.join(" && ", commands)}; + + Process exec = Runtime.getRuntime().exec(shellCommand); + + IORedirectUtil.redirectLinesLog(exec.getInputStream(), LOG); + IORedirectUtil.redirectLinesLog(exec.getErrorStream(), LOG); + + if (exec.waitFor() != 0) { + throw new RuntimeException("Error staging template, check Maven logs."); + } + gcsClient.uploadArtifact( + gcsPathPrefix + "/customTransformation.jar", + "../spanner-custom-shard/target/spanner-custom-shard-1.0-SNAPSHOT.jar"); + } } diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerSessionIT.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerSessionIT.java index 0ec8084a2c..510aa3c3c8 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerSessionIT.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerSessionIT.java @@ -91,7 +91,8 @@ public void setUp() throws IOException { { put("inputFileFormat", "avro"); } - }); + }, + null); } } } diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT.java index 949dfb06ec..92ecb540ca 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT.java @@ -19,6 +19,7 @@ import com.google.cloud.teleport.metadata.SkipDirectRunnerTest; import com.google.cloud.teleport.metadata.TemplateIntegrationTest; +import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; @@ -61,6 +62,8 @@ public class DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT private static final String TABLE = "Users"; private static final String MOVIE_TABLE = "Movie"; + private static final String CUSTOMERS_TABLE = "Customers"; + private static final String SESSION_FILE_RESOURCE = "DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/mysql-session.json"; @@ -86,7 +89,7 @@ public class DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT * @throws IOException */ @Before - public void setUp() throws IOException { + public void setUp() throws IOException, InterruptedException { // Prevent cleaning up of dataflow job after a test method is executed. skipBaseCleanup = true; synchronized (DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT.class) { @@ -98,6 +101,11 @@ public void setUp() throws IOException { if (pubsubResourceManager == null) { pubsubResourceManager = setUpPubSubResourceManager(); } + createAndUploadJarToGcs("shard1"); + CustomTransformation customTransformation = + CustomTransformation.builder( + "customTransformation.jar", "com.custom.CustomTransformationWithShardForIT") + .build(); if (jobInfo1 == null) { jobInfo1 = launchDataflowJob( @@ -111,7 +119,8 @@ public void setUp() throws IOException { { put("inputFileFormat", "avro"); } - }); + }, + customTransformation); } if (jobInfo2 == null) { jobInfo2 = @@ -126,7 +135,8 @@ public void setUp() throws IOException { { put("inputFileFormat", "avro"); } - }); + }, + null); } } } @@ -272,6 +282,90 @@ public void pkReorderedMultiShardMigration() { assertMovieTableContents(); } + @Test + public void customTransformationMultiShardMigration() { + // Migrates Customer table from 2 logical shards. Asserts data from all the shards are going to + // Spanner. This test case changes populates spanner column value based on the following logic + // full_name = first_name + last_name + // and migration_shard_id = id + logical_shard_id + // It verifies that the migration respects both the custom transformation and the addition of + // new column(full_name) and migration_shard_id. + ChainedConditionCheck conditionCheck = + ChainedConditionCheck.builder( + List.of( + uploadDataStreamFile( + jobInfo1, + CUSTOMERS_TABLE, + "Customers-shard1.avro", + "DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/Customers-shard1.avro"), + uploadDataStreamFile( + jobInfo1, + CUSTOMERS_TABLE, + "Customers-shard2.avro", + "DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/Customers-shard2.avro"))) + .build(); + + // Wait for conditions + PipelineOperator.Result result = + pipelineOperator() + .waitForCondition(createConfig(jobInfo1, Duration.ofMinutes(8)), conditionCheck); + + // Assert Conditions + assertThatResult(result).meetsConditions(); + + ConditionCheck rowsConditionCheck = + SpannerRowsCheck.builder(spannerResourceManager, CUSTOMERS_TABLE) + .setMinRows(4) + .setMaxRows(4) + .build(); + result = + pipelineOperator() + .waitForCondition(createConfig(jobInfo1, Duration.ofMinutes(8)), rowsConditionCheck); + assertThatResult(result).meetsConditions(); + + // Assert specific rows + assertCustomersTableContents(); + } + + private void assertCustomersTableContents() { + List> events = new ArrayList<>(); + + Map row = new HashMap<>(); + row.put("id", 1); + row.put("first_name", "first1"); + row.put("last_name", "last1"); + row.put("full_name", "first1 last1"); + row.put("migration_shard_id", "L1_1"); + events.add(row); + + row = new HashMap<>(); + row.put("id", 2); + row.put("first_name", "first2"); + row.put("last_name", "last2"); + row.put("full_name", "first2 last2"); + row.put("migration_shard_id", "L1_2"); + events.add(row); + + row = new HashMap<>(); + row.put("id", 1); + row.put("first_name", "first1"); + row.put("last_name", "last1"); + row.put("full_name", "first1 last1"); + row.put("migration_shard_id", "L2_1"); + events.add(row); + + row = new HashMap<>(); + row.put("id", 2); + row.put("first_name", "first2"); + row.put("last_name", "last2"); + row.put("full_name", "first2 last2"); + row.put("migration_shard_id", "L2_2"); + events.add(row); + + SpannerAsserts.assertThatStructs(spannerResourceManager.runQuery("select * from Customers")) + .hasRecordsUnorderedCaseInsensitiveColumns(events); + } + private void assertUsersTableContents() { List> events = new ArrayList<>(); diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMigrationWithoutMigrationShardIdColumnIT.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMigrationWithoutMigrationShardIdColumnIT.java index a678e990bf..edcba4b5fa 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMigrationWithoutMigrationShardIdColumnIT.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMigrationWithoutMigrationShardIdColumnIT.java @@ -100,7 +100,8 @@ public void setUp() throws IOException { { put("inputFileFormat", "avro"); } - }); + }, + null); } if (jobInfo2 == null) { jobInfo2 = @@ -115,7 +116,8 @@ public void setUp() throws IOException { { put("inputFileFormat", "avro"); } - }); + }, + null); } } } diff --git a/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerDDLIT/mysql-backfill-AllDatatypeTransformation.avro b/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerDDLIT/mysql-backfill-AllDatatypeTransformation.avro new file mode 100644 index 0000000000000000000000000000000000000000..5dd0748c64fe36ad271c91c820bdd4f250573bb7 GIT binary patch literal 4172 zcmeH}U1%It6o5BJ6Gi+f67yh{lray2+3ua$nc3N(V1faCsUgi|}GG*U(s9)XL9%CN~Cs;Nm%T5GxL~tvdV_Xe_ zY{!{ZevYSK*`Ylhk{n9QOT9ts0djuMFpZMSN>VX6gO z6UJMpvzc_=_TI5jSeolNA#KNm;d?kHT12^v4eItpEA4var02&%#_0)KTWytI)2m>s zrDaH{FSj(n3~V!=W+Be8Ud8#3^|Nsnn{{pczl>FB2hl+=AQJ2_oY#16NUf3{v5kg# zTi?6q8=o2flds!%lf`0U{xs!yy0tCaDr?c&Z5mVOo}LzFJRsBSmbn#Y7wXZ>OI2?d zx$Q>h!tIYV9d9#f+S~Ul4(9RM%zV$S)oYKwfAz%5UFQ#ZQ~U0|vHw|%$|@5zGpn1b zl9h`l$?D7!vyx0S()hgh<^0WC#>Ml|?@-&D)Ip~j9OGG?8dG?B@vMdO*sM6^T1VOU zqMJyNQ)ERJwW6XGiIU@_)M{tneE0R$PFU)I+NlTNYaPHa*Pea#( zKb(ZxjlaJC^9>Y9JY4?jJAg~Q&@F4Kq8j+M07L*7AtI4Tp2$QYDk&g=BB4Yl1~CaE zmPkZVl*GI!!-pL^At!#YM{bRr1dk`lNl5PFU+2F5rxQ-!0dq?;aOSNq24MlY^^H16 z?y==dQ2YCj#na0ul6biI(}_;_7Rf0E)hb9xP6m*$L`Fmzxk$)|ATtfwsmM@9mJ%{0 z7Jit;OlHE^4#0_I4_v+F^6+F5Tpq!ldinazSG(ZHJK@U8eQ@lldxij>IN7surh?$) z#fR2KV%ZE_?u9-juUaZjIfxN;Pj9C_G zti$!4be*`@b&Yu`!$=aIjAUN>rS|skov_daKfkjdx<68f;M1k)3-fa?A-v&FMxpl2 z;&0~`P$cp2*(>J&mV1H7qRO!EWEJOTK}vEIqaMT_W2R{sx=v}asA+|Qsw#>s=kpSF OCLv9xu-dWtDEt@kVwHUW literal 0 HcmV?d00001 diff --git a/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerDDLIT/mysql-cdc-AllDatatypeTransformation.avro b/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerDDLIT/mysql-cdc-AllDatatypeTransformation.avro new file mode 100644 index 0000000000000000000000000000000000000000..43ffc8865e96999c78b4edf01ee7ca1d01f2f49e GIT binary patch literal 3845 zcmdT`PiP!f7~g!7JxD1YN)BzIVf1FEnVFrP-Ob6AtSA%>?e=1b%bPcElBsXztuwQs zOAV}u7K4XEPgd0GQHmbagCaJ9*b+q{hk^%bD=6xrC$UAe_)TUryV>l_h9(Hz!!mE? z_x*Xl-}im*J!_sGfibe=i=zQuWG?yqv9nUxYA|U^@);GrB~3^kaig^p)_uB^IOp0&8W(41^y>m>>Zqv=QGYk*yOlH_~G>tFye8+ES4YA5K5#<> zPN$aRl1&Ucymc#=E)s76>ntkCO(_%f0QXK%w-{$3>lR1?H-+Ggc&N)3Wg4W#1?l1w zdHWUV0=Fx%M0`xuuT#OBZq%fG8{15ELb8}mnmm-IB93~92_3KHcwwps{3gw3n6oqK zw!@=ip|G^j2_Zwr1oZ`uiHt0Fu|d;W$g++rC!HS~HBKjJ<7*bZmS|$Dr7ij8HAugZmW+^Zat|!t>;LY1ZtnOux6Xa`*Ma=cV^67?#R{fL@V!2edDy(P|qH2Y5 z$ud;KqNZ(>D=ZIjmbp$6n849!69pCST{x7}Qm=peJKKW8TTh%E4D*Y%b>3R6{)yn{DM?le4gBf;7H_Aq_q-lzw8Pu>1t)sDRGiF(o zVuO-OrCctR45L`o^+G|@R8>(3F-=M>3+|2VKxCyO@&n$$Ek>+&{zr1LEc|NdN!< literal 0 HcmV?d00001 diff --git a/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerDDLIT/mysql-schema.sql b/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerDDLIT/mysql-schema.sql index c41ed6c2f0..57596606b1 100644 --- a/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerDDLIT/mysql-schema.sql +++ b/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerDDLIT/mysql-schema.sql @@ -100,4 +100,26 @@ CREATE TABLE `Authors` ( `id` int NOT NULL, `name` varchar(200), PRIMARY KEY (`id`) -) DEFAULT CHARSET=latin1; \ No newline at end of file +) DEFAULT CHARSET=latin1; + +CREATE TABLE `AllDatatypeTransformation` ( + `varchar_column` varchar(20) NOT NULL, -- To: varchar_column STRING(20) + `tinyint_column` tinyint, -- To: tinyint_column INT64 + `text_column` text, -- To: text_column STRING(MAX) + `date_column` date, -- To: date_column DATE + `int_column` int, -- To: int_column INT64 + `bigint_column` bigint, -- To: bigint_column INT64 + `float_column` float(10,2), -- To: float_column FLOAT64 + `double_column` double, -- To: double_column FLOAT64 + `decimal_column` decimal(10,2), -- To: decimal_column NUMERIC + `datetime_column` datetime, -- To: datetime_column TIMESTAMP + `timestamp_column` timestamp, -- To: timestamp_column TIMESTAMP + `time_column` time, -- To: time_column STRING(MAX) + `year_column` year, -- To: year_column STRING(MAX) + `blob_column` blob, -- To: blob_column BYTES(MAX) + `enum_column` enum('1','2','3'), -- To: enum_column STRING(MAX) + `bool_column` tinyint(1), -- To: bool_column BOOL + `binary_column` binary(20), -- To: binary_column BYTES(MAX) + `bit_column` bit(7), -- To: bit_column BYTES(MAX) + PRIMARY KEY (`varchar_column`) +); \ No newline at end of file diff --git a/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerDDLIT/spanner-schema.sql b/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerDDLIT/spanner-schema.sql index 6eb6a386e5..68df86eaea 100644 --- a/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerDDLIT/spanner-schema.sql +++ b/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerDDLIT/spanner-schema.sql @@ -94,4 +94,25 @@ CREATE TABLE Users ( CREATE TABLE Authors ( id INT64 NOT NULL, name STRING(200), -) PRIMARY KEY (id); \ No newline at end of file +) PRIMARY KEY (id); + +CREATE TABLE AllDatatypeTransformation ( + varchar_column STRING(20) NOT NULL, + tinyint_column INT64, + text_column STRING(MAX), + date_column DATE, + int_column INT64, + bigint_column INT64, + float_column FLOAT64, + double_column FLOAT64, + decimal_column NUMERIC, + datetime_column TIMESTAMP, + timestamp_column TIMESTAMP, + time_column STRING(MAX), + year_column STRING(MAX), + blob_column BYTES(MAX), + enum_column STRING(MAX), + bool_column BOOL, + binary_column BYTES(MAX), + bit_column BYTES(MAX), +) PRIMARY KEY (varchar_column); \ No newline at end of file diff --git a/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerDDLIT/statements.sql b/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerDDLIT/statements.sql index 4cd0d1c395..f53df76f6f 100644 --- a/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerDDLIT/statements.sql +++ b/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerDDLIT/statements.sql @@ -187,3 +187,79 @@ Insert into Users values(1, 'Lorem', 'Epsum', 20); Insert into Users values(2, ' ## Authors INSERT INTO Authors (id, name) VALUES (1, 'J.R.R. Tolkien'); INSERT INTO Authors (id, name) VALUES (2, 'Jane Austen'); INSERT INTO Authors (id, name) VALUES (3, 'Douglas Adams'); +## AllDatatypeTransformation +INSERT INTO `AllDatatypeTransformation` VALUES ( + 'example1', 10, 'Some text 1', '2023-05-23', + 100, 123456, 12.34, 123.456, + 12345.67, '2023-05-23 12:34:56', '2023-05-23 12:34:56', '12:34:56', + '2023', X'48656C6C6F', '1', 1, + X'0102030405060708090A0B0C0D0E0F1011121314', B'1010101' +); + +INSERT INTO `AllDatatypeTransformation` VALUES ( + 'example2', 20, 'Some text 2', '2022-12-31', + 200, 987654, 23.45, 234.567, + 23456.78, '2022-12-31 23:59:59', '2022-12-31 23:59:59', '23:59:59', + '2022', X'576F726C64', '2', 0, + X'141312111009080706050403020100FFFFEFEDED', B'0101010' +); + +INSERT INTO `AllDatatypeTransformation` VALUES ( + 'example3', 30, 'Some text 3', '2024-01-01', + 300, 112233, 34.56, 345.678, + 34567.89, '2024-01-01 00:00:00', '2024-01-01 00:00:00', '00:00:00', + '2024', X'536F6D6520626C6F62', '3', 1, + X'00112233445566778899AABBCCDDEEFF00112233', B'1110001' +); + +INSERT INTO `AllDatatypeTransformation` VALUES ( + 'example4', 40, 'Some text 4', '2021-11-11', + 400, 223344, 45.67, 456.789, + 45678.90, '2021-11-11 11:11:11', '2021-11-11 11:11:11', '11:11:11', + '2021', X'416E6F7468657220626C6F62', '1', 0, + X'FFEEDDCCBBAA99887766554433221100AABBCCDD', B'1111111' +); + +UPDATE `AllDatatypeTransformation` SET + `tinyint_column` = 25, + `text_column` = 'Updated text 2', + `date_column` = '2023-01-01', + `int_column` = 250, + `bigint_column` = 56789, + `float_column` = 25.45, + `double_column` = 345.678, + `decimal_column` = 23456.79, + `datetime_column` = '2023-01-01 12:00:00', + `timestamp_column` = '2023-01-01 12:00:00', + `time_column` = '12:00:00', + `year_column` = '2023', + `blob_column` = X'1234567890ABCDEF', + `enum_column` = '3', + `bool_column` = 1, + `binary_column` = X'1234567890ABCDEF1234567890ABCDEF12345678', + `bit_column` = B'1111111' +WHERE `varchar_column` = 'example2'; + +UPDATE `AllDatatypeTransformation` SET + `tinyint_column` = 35, + `text_column` = 'Updated text 3', + `date_column` = '2024-01-02', + `int_column` = 350, + `bigint_column` = 88000, + `float_column` = 35.67, + `double_column` = 456.789, + `decimal_column` = 34567.90, + `datetime_column` = '2024-01-02 00:00:00', + `timestamp_column` = '2024-01-02 00:00:00', + `time_column` = '01:00:00', + `year_column` = '2025', + `blob_column` = X'ABCDEF1234567890', + `enum_column` = '1', + `bool_column` = 0, + `binary_column` = X'ABCDEF1234567890ABCDEF1234567890ABCDEF12', + `bit_column` = B'0000000' +WHERE `varchar_column` = 'example3'; + +DELETE FROM AllDatatypeTransformation WHERE varchar_column = 'example4'; + + diff --git a/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/Customers-shard1.avro b/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/Customers-shard1.avro new file mode 100644 index 0000000000000000000000000000000000000000..8cf987b75ff75a4e87ebfaa9fc52932df4b7667e GIT binary patch literal 1696 zcmds%zi!h&9LIxIRTb(2Lno$NSmGv5;>5{{5FjByv|ANLKi{2`YtKG&cSee;GVuV2 zhhSk~fRTv@s2F$v2E;oc7Vhfg;v{WUBqkCmvYqem-{<=~KaG!Xczf_tihYGAG==k9 zr$|q8iUvqhEF?j7ltF6m4}!&LufOVR)xUINTC4+J!< zCtJ0BMz|8N?z@JVQyfO%^V!A zh;yNsW+HQ)yMP(D#pdD(WMi{ub|vTho%)jWOpOTTRMW+RIC(4tr;r(6mYm69$^|U+ z&fZl=7Q%HWD(BDDW0^+7IwCZIf@?IeP%0!6CY4@ZQWrX(?CNL^e;sV59-h@6h>x)M z;oYyhZ@z86KmYt`>-EmVP9OI=Nu2mGX=1+>34<>%WUzwy`a_V>>6 zJ83_isu$dk0iT#X;C^BTB}JOQ%B8SJS~bE>*a@P381cuk5Q<^J=x*?p?WK u+w`^=F^|o6_tBGs`_B)}mFMkV&=0(umSb=o0KxwRU~2}TRRUCWyX6K5tuS*jOb zKySW*Z{SlXK7t1?zJeZfX46j7cDJnH$rehIet-Wy-{0g}a&pbN4X*^>k?AqX;mgf4 zq$VXn10+bw1x6iIKyL02{HMX_VI>vMiIDnvMhU|b4UW#{lB%L6V3t5CL5ne(cI^Lx z03Iot6R99CtU{J80 zY}NJ|;YPr^?+Ow|Y^r(rhE1ka0&=6K6f_4hx!R1jMx%&cn^8s^jU1t(Vodinb8xsK zF1e(N^1^m*17_S7o0}d(G1hx#R&vhYsxL@S

i-6j>~Yl_#7t0)_Tv$(a%+jKe~2 z?Oksr9mR2LaOB@wW0ILE{|sL*TH(~;k0${&gW?V z!J|)aKK#DFH+c86{dQ|FO2V)g;n)pg61Y8xu^UGjxS{WR^Uud0@4x>_zkV~ncbDHu zJNZPuWNrfKson$TW_nOu<>l*AaC4+(H|&KyKkkQd-wV5pr(o=t-AcMCDU<-_ck4qd zyGp;L-1_Eu4?Ku>`^3wF43p4{cSDcFJ>QR#2;<1d8T9=ygX>P+SF7c;TDR=Ga?7;o sv?_Qp{=wrHhx*EKw$JJN&JDxSzX|~Ve*(~+0SIaU!Rk`{0bO1>bN~PV literal 0 HcmV?d00001 diff --git a/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/mysql-schema.sql b/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/mysql-schema.sql index 3036eef1cd..db7ff0f75d 100644 --- a/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/mysql-schema.sql +++ b/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/mysql-schema.sql @@ -12,3 +12,10 @@ CREATE TABLE `Movie` ( `actor` int, -- To: actor INT64 PRIMARY KEY (`id1`, `id2`) -- Primary keys are reordered in Spanner. Order - (id2, id1, migration_shard_id) ); + +CREATE TABLE `Customers` ( + `id` int NOT NULL, -- To: id INT64 + `first_name` varchar(200), -- To: first_name STRING(200) + `last_name` varchar(200), -- To: last_name STRING(200) + PRIMARY KEY (`id`) -- New column full_name added in spanner +); \ No newline at end of file diff --git a/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/mysql-session.json b/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/mysql-session.json index 54d07e3b16..ef1a7fd178 100644 --- a/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/mysql-session.json +++ b/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/mysql-session.json @@ -7,40 +7,137 @@ "Notes": null, "Tags": null, "SpSchema": { - "t13": { - "Name": "Movie", + "t1": { + "Name": "Customers", "ColIds": [ - "c15", - "c16", - "c17", - "c18", - "c22" + "c4", + "c5", + "c6", + "c14", + "c18" ], - "ShardIdColumn": "c22", + "ShardIdColumn": "c14", "ColDefs": { - "c15": { - "Name": "id1", + "c14": { + "Name": "migration_shard_id", + "T": { + "Name": "STRING", + "Len": 50, + "IsArray": false + }, + "NotNull": false, + "Comment": "", + "Id": "c14", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + }, + "c18": { + "Name": "full_name", + "T": { + "Name": "STRING", + "Len": 200, + "IsArray": false + }, + "NotNull": false, + "Comment": "", + "Id": "c18", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + }, + "c4": { + "Name": "id", "T": { "Name": "INT64", "Len": 0, "IsArray": false }, "NotNull": true, - "Comment": "From: id1 int(10)", - "Id": "c15" + "Comment": "From: id int(10)", + "Id": "c4", + "AutoGen": { + "Name": "", + "GenerationType": "" + } }, - "c16": { - "Name": "id2", + "c5": { + "Name": "first_name", + "T": { + "Name": "STRING", + "Len": 200, + "IsArray": false + }, + "NotNull": false, + "Comment": "From: first_name varchar(200)", + "Id": "c5", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + }, + "c6": { + "Name": "last_name", + "T": { + "Name": "STRING", + "Len": 200, + "IsArray": false + }, + "NotNull": false, + "Comment": "From: last_name varchar(200)", + "Id": "c6", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + } + }, + "PrimaryKeys": [ + { + "ColId": "c4", + "Desc": false, + "Order": 2 + }, + { + "ColId": "c14", + "Desc": false, + "Order": 1 + } + ], + "ForeignKeys": null, + "Indexes": null, + "ParentId": "", + "Comment": "Spanner schema for source table Customers", + "Id": "t1" + }, + "t2": { + "Name": "Users", + "ColIds": [ + "c11", + "c12", + "c13", + "c16" + ], + "ShardIdColumn": "c16", + "ColDefs": { + "c11": { + "Name": "id", "T": { "Name": "INT64", "Len": 0, "IsArray": false }, "NotNull": true, - "Comment": "From: id2 int(10)", - "Id": "c16" + "Comment": "From: id int(10)", + "Id": "c11", + "AutoGen": { + "Name": "", + "GenerationType": "" + } }, - "c17": { + "c12": { "Name": "name", "T": { "Name": "STRING", @@ -49,20 +146,28 @@ }, "NotNull": false, "Comment": "From: name varchar(200)", - "Id": "c17" + "Id": "c12", + "AutoGen": { + "Name": "", + "GenerationType": "" + } }, - "c18": { - "Name": "actor", + "c13": { + "Name": "age_spanner", "T": { "Name": "INT64", "Len": 0, "IsArray": false }, "NotNull": false, - "Comment": "From: actor int(10)", - "Id": "c18" + "Comment": "From: age bigint(19)", + "Id": "c13", + "AutoGen": { + "Name": "", + "GenerationType": "" + } }, - "c22": { + "c16": { "Name": "migration_shard_id", "T": { "Name": "STRING", @@ -71,12 +176,16 @@ }, "NotNull": false, "Comment": "", - "Id": "c22" + "Id": "c16", + "AutoGen": { + "Name": "", + "GenerationType": "" + } } }, "PrimaryKeys": [ { - "ColId": "c15", + "ColId": "c11", "Desc": false, "Order": 2 }, @@ -84,107 +193,138 @@ "ColId": "c16", "Desc": false, "Order": 1 - }, - { - "ColId": "c22", - "Desc": false, - "Order": 3 } ], "ForeignKeys": null, "Indexes": null, "ParentId": "", - "Comment": "Spanner schema for source table Movie", - "Id": "t13" + "Comment": "Spanner schema for source table Users", + "Id": "t2" }, - "t14": { - "Name": "Users", + "t3": { + "Name": "Movie", "ColIds": [ - "c19", - "c20", - "c21", - "c23" + "c7", + "c8", + "c9", + "c10", + "c15" ], - "ShardIdColumn": "c23", + "ShardIdColumn": "c15", "ColDefs": { - "c19": { - "Name": "id", + "c10": { + "Name": "actor", "T": { "Name": "INT64", "Len": 0, "IsArray": false }, - "NotNull": true, - "Comment": "From: id int(10)", - "Id": "c19" + "NotNull": false, + "Comment": "From: actor int(10)", + "Id": "c10", + "AutoGen": { + "Name": "", + "GenerationType": "" + } }, - "c20": { - "Name": "name", + "c15": { + "Name": "migration_shard_id", "T": { "Name": "STRING", - "Len": 200, + "Len": 50, "IsArray": false }, "NotNull": false, - "Comment": "From: name varchar(200)", - "Id": "c20" + "Comment": "", + "Id": "c15", + "AutoGen": { + "Name": "", + "GenerationType": "" + } }, - "c21": { - "Name": "age_spanner", + "c7": { + "Name": "id1", "T": { "Name": "INT64", "Len": 0, "IsArray": false }, - "NotNull": false, - "Comment": "From: age bigint(19)", - "Id": "c21" + "NotNull": true, + "Comment": "From: id1 int(10)", + "Id": "c7", + "AutoGen": { + "Name": "", + "GenerationType": "" + } }, - "c23": { - "Name": "migration_shard_id", + "c8": { + "Name": "id2", + "T": { + "Name": "INT64", + "Len": 0, + "IsArray": false + }, + "NotNull": true, + "Comment": "From: id2 int(10)", + "Id": "c8", + "AutoGen": { + "Name": "", + "GenerationType": "" + } + }, + "c9": { + "Name": "name", "T": { "Name": "STRING", - "Len": 50, + "Len": 200, "IsArray": false }, "NotNull": false, - "Comment": "", - "Id": "c23" + "Comment": "From: name varchar(200)", + "Id": "c9", + "AutoGen": { + "Name": "", + "GenerationType": "" + } } }, "PrimaryKeys": [ { - "ColId": "c19", + "ColId": "c7", "Desc": false, "Order": 2 }, { - "ColId": "c23", + "ColId": "c8", "Desc": false, "Order": 1 + }, + { + "ColId": "c15", + "Desc": false, + "Order": 3 } ], "ForeignKeys": null, "Indexes": null, "ParentId": "", - "Comment": "Spanner schema for source table Users", - "Id": "t14" + "Comment": "Spanner schema for source table Movie", + "Id": "t3" } }, "SyntheticPKeys": {}, "SrcSchema": { - "t13": { - "Name": "Movie", - "Schema": "S1L1", + "t1": { + "Name": "Customers", + "Schema": "it_test", "ColIds": [ - "c15", - "c16", - "c17", - "c18" + "c4", + "c5", + "c6" ], "ColDefs": { - "c15": { - "Name": "id1", + "c4": { + "Name": "id", "Type": { "Name": "int", "Mods": [ @@ -201,10 +341,71 @@ "ForeignKey": false, "AutoIncrement": false }, - "Id": "c15" + "Id": "c4" }, - "c16": { - "Name": "id2", + "c5": { + "Name": "first_name", + "Type": { + "Name": "varchar", + "Mods": [ + 200 + ], + "ArrayBounds": null + }, + "NotNull": false, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c5" + }, + "c6": { + "Name": "last_name", + "Type": { + "Name": "varchar", + "Mods": [ + 200 + ], + "ArrayBounds": null + }, + "NotNull": false, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c6" + } + }, + "PrimaryKeys": [ + { + "ColId": "c4", + "Desc": false, + "Order": 1 + } + ], + "ForeignKeys": null, + "Indexes": null, + "Id": "t1" + }, + "t2": { + "Name": "Users", + "Schema": "it_test", + "ColIds": [ + "c11", + "c12", + "c13" + ], + "ColDefs": { + "c11": { + "Name": "id", "Type": { "Name": "int", "Mods": [ @@ -221,9 +422,9 @@ "ForeignKey": false, "AutoIncrement": false }, - "Id": "c16" + "Id": "c11" }, - "c17": { + "c12": { "Name": "name", "Type": { "Name": "varchar", @@ -241,14 +442,14 @@ "ForeignKey": false, "AutoIncrement": false }, - "Id": "c17" + "Id": "c12" }, - "c18": { - "Name": "actor", + "c13": { + "Name": "age", "Type": { - "Name": "int", + "Name": "bigint", "Mods": [ - 10 + 19 ], "ArrayBounds": null }, @@ -261,36 +462,52 @@ "ForeignKey": false, "AutoIncrement": false }, - "Id": "c18" + "Id": "c13" } }, "PrimaryKeys": [ { - "ColId": "c15", + "ColId": "c11", "Desc": false, "Order": 1 - }, - { - "ColId": "c16", - "Desc": false, - "Order": 2 } ], "ForeignKeys": null, "Indexes": null, - "Id": "t13" + "Id": "t2" }, - "t14": { - "Name": "Users", - "Schema": "S1L1", + "t3": { + "Name": "Movie", + "Schema": "it_test", "ColIds": [ - "c19", - "c20", - "c21" + "c7", + "c8", + "c9", + "c10" ], "ColDefs": { - "c19": { - "Name": "id", + "c10": { + "Name": "actor", + "Type": { + "Name": "int", + "Mods": [ + 10 + ], + "ArrayBounds": null + }, + "NotNull": false, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c10" + }, + "c7": { + "Name": "id1", "Type": { "Name": "int", "Mods": [ @@ -307,18 +524,18 @@ "ForeignKey": false, "AutoIncrement": false }, - "Id": "c19" + "Id": "c7" }, - "c20": { - "Name": "name", + "c8": { + "Name": "id2", "Type": { - "Name": "varchar", + "Name": "int", "Mods": [ - 200 + 10 ], "ArrayBounds": null }, - "NotNull": false, + "NotNull": true, "Ignored": { "Check": false, "Identity": false, @@ -327,14 +544,14 @@ "ForeignKey": false, "AutoIncrement": false }, - "Id": "c20" + "Id": "c8" }, - "c21": { - "Name": "age", + "c9": { + "Name": "name", "Type": { - "Name": "bigint", + "Name": "varchar", "Mods": [ - 19 + 200 ], "ArrayBounds": null }, @@ -347,30 +564,80 @@ "ForeignKey": false, "AutoIncrement": false }, - "Id": "c21" + "Id": "c9" } }, "PrimaryKeys": [ { - "ColId": "c19", + "ColId": "c7", "Desc": false, "Order": 1 + }, + { + "ColId": "c8", + "Desc": false, + "Order": 2 } ], "ForeignKeys": null, "Indexes": null, - "Id": "t14" + "Id": "t3" + } + }, + "SchemaIssues": { + "t1": { + "ColumnLevelIssues": { + "c14": [ + 29 + ], + "c4": [ + 14 + ], + "c5": [], + "c6": [] + }, + "TableLevelIssues": null + }, + "t2": { + "ColumnLevelIssues": { + "c11": [ + 14 + ], + "c12": [], + "c13": [], + "c16": [ + 29 + ] + }, + "TableLevelIssues": null + }, + "t3": { + "ColumnLevelIssues": { + "c10": [ + 14 + ], + "c15": [ + 29 + ], + "c7": [ + 14 + ], + "c8": [ + 14 + ], + "c9": [] + }, + "TableLevelIssues": null } }, - "SchemaIssues": {}, "Location": {}, "TimezoneOffset": "+00:00", "SpDialect": "google_standard_sql", "UniquePKey": {}, "Rules": [ { - "Id": "r24", - "Name": "r24", + "Id": "r17", + "Name": "r17", "Type": "add_shard_id_primary_key", "ObjectType": "", "AssociatedObjects": "All Tables", @@ -383,5 +650,8 @@ } } ], - "IsSharded": true + "IsSharded": true, + "SpRegion": "", + "ResourceValidation": false, + "UI": false } \ No newline at end of file diff --git a/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/mysql-statements.sql b/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/mysql-statements.sql index 2e73dea2e4..b2f26df10e 100644 --- a/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/mysql-statements.sql +++ b/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/mysql-statements.sql @@ -31,4 +31,13 @@ Update Movie Set actor=27 where id1=1 AND id2=1; Insert into Movie values(18, 12 ## S1L2 - shard2 Insert into Movie values(18, 23, 'Mov945', 18); Insert into Movie values(26, 12, 'Mov764', 9); Insert into Movie values(17, 27, 'Mov294', 25); -Update Movie Set actor=8 where id1=26 AND id2=12; Insert into Movie values(13, 8, 'Tester828', 15); delete from Movie where id1=17 AND id2=27; \ No newline at end of file +Update Movie Set actor=8 where id1=26 AND id2=12; Insert into Movie values(13, 8, 'Tester828', 15); delete from Movie where id1=17 AND id2=27; + + +## Custom transformation test + +## S1L1 - shard1 +Insert into Customers values(1, 'first1', 'last1'); Insert into Customers values(2, 'first2', 'last2'); + +## S1L2 - shard2 +Insert into Customers values(1, 'first1', 'last1'); Insert into Customers values(2, 'first2', 'last2'); \ No newline at end of file diff --git a/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/spanner-schema.sql b/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/spanner-schema.sql index 0fcbfffc7a..a31f04ef41 100644 --- a/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/spanner-schema.sql +++ b/v2/datastream-to-spanner/src/test/resources/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/spanner-schema.sql @@ -12,3 +12,11 @@ CREATE TABLE IF NOT EXISTS Movie ( actor INT64, migration_shard_id STRING(50), ) PRIMARY KEY (id2, id1, migration_shard_id); + +CREATE TABLE IF NOT EXISTS Customers ( + id INT64 NOT NULL, + first_name STRING(200), + last_name STRING(200), + full_name STRING(200), + migration_shard_id STRING(50), +) PRIMARY KEY (migration_shard_id, id); \ No newline at end of file diff --git a/v2/spanner-custom-shard/src/main/java/com/custom/CustomTransformationWithShardForIT.java b/v2/spanner-custom-shard/src/main/java/com/custom/CustomTransformationWithShardForIT.java new file mode 100644 index 0000000000..e7b2f8bea5 --- /dev/null +++ b/v2/spanner-custom-shard/src/main/java/com/custom/CustomTransformationWithShardForIT.java @@ -0,0 +1,106 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.custom; + +import com.google.cloud.teleport.v2.spanner.exceptions.InvalidTransformationException; +import com.google.cloud.teleport.v2.spanner.utils.ISpannerMigrationTransformer; +import com.google.cloud.teleport.v2.spanner.utils.MigrationTransformationRequest; +import com.google.cloud.teleport.v2.spanner.utils.MigrationTransformationResponse; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CustomTransformationWithShardForIT implements ISpannerMigrationTransformer { + + private static final Logger LOG = LoggerFactory.getLogger(CustomShardIdFetcher.class); + + @Override + public void init(String parameters) { + LOG.info("init called with {}", parameters); + } + + @Override + public MigrationTransformationResponse toSpannerRow(MigrationTransformationRequest request) + throws InvalidTransformationException { + if (request.getTableName().equals("Customers")) { + Map row = new HashMap<>(request.getRequestRow()); + row.put("full_name", row.get("first_name") + " " + row.get("last_name")); + row.put("migration_shard_id", request.getShardId() + "_" + row.get("id")); + MigrationTransformationResponse response = new MigrationTransformationResponse(row, false); + return response; + } else if (request.getTableName().equals("AllDatatypeTransformation")) { + Map row = new HashMap<>(request.getRequestRow()); + // Filter event in case "varchar_column" = "example1" + if (row.get("varchar_column").equals("example1")) { + return new MigrationTransformationResponse(request.getRequestRow(), true); + } + // In case of update events, return request as response without any transformation + if (request.getEventType().equals("UPDATE-INSERT")) { + return new MigrationTransformationResponse(request.getRequestRow(), false); + } + // In case of backfill update the values for all the columns in all the rows except the + // filtered row. + row.put("tinyint_column", (Long) row.get("tinyint_column") + 1); + row.put("text_column", row.get("text_column") + " append"); + row.put("int_column", (Long) row.get("int_column") + 1); + row.put("bigint_column", (Long) row.get("bigint_column") + 1); + row.put("float_column", (double) row.get("float_column") + 1); + row.put("double_column", (double) row.get("double_column") + 1); + Double value = Double.parseDouble((String) row.get("decimal_column")); + row.put("decimal_column", String.valueOf(value + 1)); + row.put("time_column", (Long) row.get("time_column") + 1000); + row.put("bool_column", 1); + row.put("enum_column", "1"); + row.put("blob_column", "576f726d64"); + row.put("binary_column", "0102030405060708090A0B0C0D0E0F1011121314"); + row.put("bit_column", 13); + row.put("year_column", (Long) row.get("year_column") + 1); + try { + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); + SimpleDateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX"); + Date date = dateFormat.parse((String) row.get("date_column")); + Calendar calendar = Calendar.getInstance(); + calendar.setTime(date); + calendar.add(Calendar.DAY_OF_MONTH, 1); + row.put("date_column", dateFormat.format(calendar.getTime())); + Date dateTime = dateTimeFormat.parse((String) row.get("datetime_column")); + calendar.setTime(dateTime); + calendar.add(Calendar.SECOND, -1); + row.put("datetime_column", dateTimeFormat.format(calendar.getTime())); + dateTime = dateTimeFormat.parse((String) row.get("timestamp_column")); + calendar.setTime(dateTime); + calendar.add(Calendar.SECOND, -1); + row.put("timestamp_column", dateTimeFormat.format(calendar.getTime())); + + } catch (Exception e) { + throw new InvalidTransformationException(e); + } + + MigrationTransformationResponse response = new MigrationTransformationResponse(row, false); + return response; + } + return new MigrationTransformationResponse(request.getRequestRow(), false); + } + + @Override + public MigrationTransformationResponse toSourceRow(MigrationTransformationRequest request) { + return null; + } +} From 76f1349aec905988917d5db82371e5e20c536207 Mon Sep 17 00:00:00 2001 From: Shreya Khajanchi Date: Thu, 6 Jun 2024 13:12:07 +0530 Subject: [PATCH 2/2] exclude unit test --- pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/pom.xml b/pom.xml index b1eea19871..f5169217ce 100644 --- a/pom.xml +++ b/pom.xml @@ -362,6 +362,7 @@ **/constants/** **/CustomTransformationImplFetcher.* **/JarFileReader.* + **/CustomTransformationWithShardForIT.*