
Commit

Merge pull request GoogleCloudPlatform#1634 from shreyakhajanchi:forward-transformation-it

PiperOrigin-RevId: 640876003
cloud-teleport committed Jun 6, 2024
2 parents b6cd530 + 76f1349 commit dc4bfa6
Showing 19 changed files with 962 additions and 129 deletions.
pom.xml: 1 addition & 0 deletions
@@ -362,6 +362,7 @@
<exclude>**/constants/**</exclude>
<exclude>**/CustomTransformationImplFetcher.*</exclude>
<exclude>**/JarFileReader.*</exclude>
<exclude>**/CustomTransformationWithShardForIT.*</exclude>
</excludes>
</configuration>
<executions>
DataStreamToSpannerDDLIT.java

@@ -19,6 +19,7 @@

import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
@@ -60,6 +61,8 @@ public class DataStreamToSpannerDDLIT extends DataStreamToSpannerITBase {
private static final String TABLE6 = "Books";
private static final String TABLE7 = "Authors";

private static final String TRANSFORMATION_TABLE = "AllDatatypeTransformation";

private static HashSet<DataStreamToSpannerDDLIT> testInstances = new HashSet<>();
private static PipelineLauncher.LaunchInfo jobInfo;

@@ -72,7 +75,7 @@
* @throws IOException
*/
@Before
public void setUp() throws IOException {
public void setUp() throws IOException, InterruptedException {
// Prevent cleaning up of dataflow job after a test method is executed.
skipBaseCleanup = true;
synchronized (DataStreamToSpannerDDLIT.class) {
@@ -81,6 +84,11 @@ public void setUp() throws IOException {
spannerResourceManager = setUpSpannerResourceManager();
pubsubResourceManager = setUpPubSubResourceManager();
createSpannerDDL(spannerResourceManager, SPANNER_DDL_RESOURCE);
createAndUploadJarToGcs("DatatypeIT");
CustomTransformation customTransformation =
CustomTransformation.builder(
"customTransformation.jar", "com.custom.CustomTransformationWithShardForIT")
.build();
jobInfo =
launchDataflowJob(
getClass().getSimpleName(),
@@ -93,7 +101,8 @@
{
put("inputFileFormat", "avro");
}
});
},
customTransformation);
}
}
}
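
For reference, the customTransformation.jar staged here is built from the spanner-custom-shard module by createAndUploadJarToGcs (defined later in this diff), and com.custom.CustomTransformationWithShardForIT is the class the template loads from that jar. A rough, self-contained sketch of the shape such a transformer can take (the method signatures and the example tweak below are assumptions for illustration, not the module's actual code):

    package com.custom;

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch only: the real class implements the templates'
    // custom-transformation interface; these method shapes are assumed.
    public class CustomTransformationWithShardForIT {

      // Called once with any user-supplied parameters before events flow.
      public void init(String customParameters) {}

      // Given a source row for a table, return the row to write to Spanner.
      public Map<String, Object> toSpannerRow(String tableName, Map<String, Object> sourceRow) {
        Map<String, Object> row = new HashMap<>(sourceRow);
        if ("AllDatatypeTransformation".equals(tableName)) {
          // Example tweak (assumed): suffix a text column.
          row.computeIfPresent("text_column", (k, v) -> v + " append");
        }
        return row;
      }
    }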
@@ -222,6 +231,59 @@ public void migrationTestWithAllDatatypeDefaultMapping() {
assertAllDatatypeColumns2TableCdcContents();
}

@Test
public void migrationTestWithAllDatatypeTransformation() {
// Construct a ChainedConditionCheck with 2 stages.
// 1. Send initial wave of events
// 2. Wait on Spanner to have events
ChainedConditionCheck conditionCheck =
ChainedConditionCheck.builder(
List.of(
uploadDataStreamFile(
jobInfo,
TRANSFORMATION_TABLE,
"backfill.avro",
"DataStreamToSpannerDDLIT/mysql-backfill-AllDatatypeTransformation.avro"),
SpannerRowsCheck.builder(spannerResourceManager, TRANSFORMATION_TABLE)
.setMinRows(3)
.setMaxRows(3)
.build()))
.build();

// Wait for conditions
PipelineOperator.Result result =
pipelineOperator()
.waitForCondition(createConfig(jobInfo, Duration.ofMinutes(8)), conditionCheck);

// Assert Conditions
assertThatResult(result).meetsConditions();

assertAllDatatypeTransformationTableBackfillContents();

conditionCheck =
ChainedConditionCheck.builder(
List.of(
uploadDataStreamFile(
jobInfo,
TRANSFORMATION_TABLE,
"cdc.avro",
"DataStreamToSpannerDDLIT/mysql-cdc-AllDatatypeTransformation.avro"),
SpannerRowsCheck.builder(spannerResourceManager, TRANSFORMATION_TABLE)
.setMinRows(2)
.setMaxRows(2)
.build()))
.build();

result =
pipelineOperator()
.waitForCondition(createConfig(jobInfo, Duration.ofMinutes(8)), conditionCheck);

// Assert Conditions
assertThatResult(result).meetsConditions();

assertAllDatatypeTransformationTableCdcContents();
}

@Test
public void migrationTestWithDatatypeSizeConversion() {
// Construct a ChainedConditionCheck with 2 stages.
@@ -544,6 +606,126 @@ private void assertAllDatatypeColumns2TableBackfillContents() {
.hasRecordsUnorderedCaseInsensitiveColumns(events);
}

private void assertAllDatatypeTransformationTableBackfillContents() {
List<Map<String, Object>> events = new ArrayList<>();

Map<String, Object> row = new HashMap<>();
row.put("varchar_column", "example2");
row.put("tinyint_column", 21);
row.put("text_column", "Some text 2 append");
row.put("date_column", "2023-01-01");
row.put("int_column", 201);
row.put("bigint_column", 987655);
row.put("float_column", 24.45);
row.put("double_column", 235.567);
row.put("decimal_column", 23457.78);
row.put("datetime_column", "2022-12-31T23:59:58Z");
row.put("timestamp_column", "2022-12-31T23:59:58Z");
row.put("time_column", "86399001000");
row.put("year_column", "2023");
row.put("blob_column", "V29ybWQ=");
row.put("enum_column", "1");
row.put("bool_column", true);
row.put("binary_column", "AQIDBAUGBwgJCgsMDQ4PEBESExQ=");
row.put("bit_column", "Ew==");
events.add(row);

row = new HashMap<>();
row.put("varchar_column", "example3");
row.put("tinyint_column", 31);
row.put("text_column", "Some text 3 append");
row.put("date_column", "2024-01-02");
row.put("int_column", 301);
row.put("bigint_column", 112234);
row.put("float_column", 35.56);
row.put("double_column", 346.678);
row.put("decimal_column", 34568.89);
row.put("datetime_column", "2023-12-31T23:59:59Z");
row.put("timestamp_column", "2023-12-31T23:59:59Z");
row.put("time_column", "1000");
row.put("year_column", "2025");
row.put("blob_column", "V29ybWQ=");
row.put("enum_column", "1");
row.put("bool_column", true);
row.put("binary_column", "AQIDBAUGBwgJCgsMDQ4PEBESExQ=");
row.put("bit_column", "Ew==");
events.add(row);

row = new HashMap<>();
row.put("varchar_column", "example4");
row.put("tinyint_column", 41);
row.put("text_column", "Some text 4 append");
row.put("date_column", "2021-11-12");
row.put("int_column", 401);
row.put("bigint_column", 223345);
row.put("float_column", 46.67);
row.put("double_column", 457.789);
row.put("decimal_column", 45679.90);
row.put("datetime_column", "2021-11-11T11:11:10Z");
row.put("timestamp_column", "2021-11-11T11:11:10Z");
row.put("time_column", "40271001000");
row.put("year_column", "2022");
row.put("blob_column", "V29ybWQ=");
row.put("enum_column", "1");
row.put("bool_column", true);
row.put("binary_column", "AQIDBAUGBwgJCgsMDQ4PEBESExQ=");
row.put("bit_column", "Ew==");
events.add(row);

SpannerAsserts.assertThatStructs(
spannerResourceManager.runQuery("select* from AllDatatypeTransformation"))
.hasRecordsUnorderedCaseInsensitiveColumns(events);
}
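
The blob_column, binary_column, and bit_column expectations above are base64 strings. Decoding one with the standard library shows the raw bytes being asserted:

    // "V29ybWQ=" is the standard base64 encoding of the ASCII bytes "Wormd".
    byte[] blob = java.util.Base64.getDecoder().decode("V29ybWQ=");
    System.out.println(new String(blob, java.nio.charset.StandardCharsets.UTF_8)); // Wormd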

private void assertAllDatatypeTransformationTableCdcContents() {
List<Map<String, Object>> events = new ArrayList<>();
Map<String, Object> row = new HashMap<>();
row.put("varchar_column", "example2");
row.put("tinyint_column", 25);
row.put("text_column", "Updated text 2");
row.put("date_column", "2023-01-01");
row.put("int_column", 250);
row.put("bigint_column", 56789);
row.put("float_column", 25.45);
row.put("double_column", 345.678);
row.put("decimal_column", 23456.79);
row.put("datetime_column", "2023-01-01T12:00:00Z");
row.put("timestamp_column", "2023-01-01T12:00:00Z");
row.put("time_column", "43200000000");
row.put("year_column", "2023");
row.put("blob_column", "EjRWeJCrze8=");
row.put("enum_column", "3");
row.put("bool_column", true);
row.put("binary_column", "EjRWeJCrze8SNFZ4kKvN7xI0Vng=");
row.put("bit_column", "ASc=");
events.add(row);

row = new HashMap<>();
row.put("varchar_column", "example3");
row.put("tinyint_column", 35);
row.put("text_column", "Updated text 3");
row.put("date_column", "2024-01-02");
row.put("int_column", 350);
row.put("bigint_column", 88000);
row.put("float_column", 35.67);
row.put("double_column", 456.789);
row.put("decimal_column", 34567.90);
row.put("datetime_column", "2024-01-02T00:00:00Z");
row.put("timestamp_column", "2024-01-02T00:00:00Z");
row.put("time_column", "3600000000");
row.put("year_column", "2025");
row.put("blob_column", "q83vEjRWeJA=");
row.put("enum_column", "1");
row.put("bool_column", false);
row.put("binary_column", "q83vEjRWeJCrze8SNFZ4kKvN7xI=");
row.put("bit_column", "AA==");
events.add(row);

SpannerAsserts.assertThatStructs(
spannerResourceManager.runQuery("select * from AllDatatypeTransformation"))
.hasRecordsUnorderedCaseInsensitiveColumns(events);
}
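
The time_column strings in these expected rows appear to be microseconds since midnight rather than formatted times. Checking the arithmetic under that assumption:

    // Assuming time_column encodes microseconds since midnight:
    long noon = 12L * 3600 * 1_000_000;    // 43200000000  -> 12:00:00
    long oneHour = 1L * 3600 * 1_000_000;  // 3600000000   -> 01:00:00
    // 86399001000 us = 86399.001 s, i.e. 23:59:59.001, just before midnight.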

private void assertAllDatatypeColumns2TableCdcContents() {
List<Map<String, Object>> events = new ArrayList<>();
Map<String, Object> row = new HashMap<>();
@@ -98,7 +98,8 @@ public void setUp() throws IOException {
{
put("inputFileFormat", "avro");
}
});
},
null);
}
}
}
DataStreamToSpannerITBase.java

@@ -17,6 +17,7 @@

import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;

import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
import com.google.common.io.Resources;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
@@ -28,11 +29,14 @@
import java.util.Map.Entry;
import org.apache.beam.it.common.PipelineLauncher.LaunchConfig;
import org.apache.beam.it.common.PipelineLauncher.LaunchInfo;
import org.apache.beam.it.common.utils.IORedirectUtil;
import org.apache.beam.it.common.utils.PipelineUtils;
import org.apache.beam.it.conditions.ConditionCheck;
import org.apache.beam.it.gcp.TemplateTestBase;
import org.apache.beam.it.gcp.pubsub.PubsubResourceManager;
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Base class for DataStreamToSpanner integration tests. It provides helper functions related to
@@ -42,6 +46,7 @@ public abstract class DataStreamToSpannerITBase extends TemplateTestBase {

// Format of avro file path in GCS - {table}/2023/12/20/06/57/{fileName}
public static final String DATA_STREAM_EVENT_FILES_PATH_FORMAT_IN_GCS = "%s/2023/12/20/06/57/%s";
private static final Logger LOG = LoggerFactory.getLogger(DataStreamToSpannerITBase.class);

public PubsubResourceManager setUpPubSubResourceManager() throws IOException {
return PubsubResourceManager.builder(testName, PROJECT, credentialsProvider).build();
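
A quick illustration of the format string defined above (the table and file names here are made up):

    // Expands to "Users/2023/12/20/06/57/backfill.avro", the dated layout
    // DataStream uses when writing event files to GCS.
    String path =
        String.format(DATA_STREAM_EVENT_FILES_PATH_FORMAT_IN_GCS, "Users", "backfill.avro");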
@@ -162,7 +167,8 @@ protected LaunchInfo launchDataflowJob(
String gcsPathPrefix,
SpannerResourceManager spannerResourceManager,
PubsubResourceManager pubsubResourceManager,
Map<String, String> jobParameters)
Map<String, String> jobParameters,
CustomTransformation customTransformation)
throws IOException {

if (sessionFileResourceName != null) {
@@ -217,6 +223,13 @@ protected LaunchInfo launchDataflowJob(
getGcsPath(gcsPathPrefix + "/transformationContext.json"));
}

if (customTransformation != null) {
params.put(
"transformationJarPath",
getGcsPath(gcsPathPrefix + "/" + customTransformation.jarPath()));
params.put("transformationClassName", customTransformation.classPath());
}

// overridden parameters
if (jobParameters != null) {
for (Entry<String, String> entry : jobParameters.entrySet()) {
@@ -236,4 +249,24 @@

return jobInfo;
}

public void createAndUploadJarToGcs(String gcsPathPrefix)
throws IOException, InterruptedException {
String[] commands = {"cd ../spanner-custom-shard", "mvn install"};

// Join the commands with && to execute them sequentially
String[] shellCommand = {"/bin/bash", "-c", String.join(" && ", commands)};

Process exec = Runtime.getRuntime().exec(shellCommand);

IORedirectUtil.redirectLinesLog(exec.getInputStream(), LOG);
IORedirectUtil.redirectLinesLog(exec.getErrorStream(), LOG);

if (exec.waitFor() != 0) {
throw new RuntimeException("Error building the spanner-custom-shard jar, check Maven logs.");
}
gcsClient.uploadArtifact(
gcsPathPrefix + "/customTransformation.jar",
"../spanner-custom-shard/target/spanner-custom-shard-1.0-SNAPSHOT.jar");
}
}
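
One note on createAndUploadJarToGcs: it goes through /bin/bash only because Runtime.exec cannot change directories. A shell-free sketch of the same step using ProcessBuilder (equivalent behavior assumed, not code from this PR):

    // Run "mvn install" with the working directory set directly;
    // inheritIO() streams Maven's output to the test process console.
    Process exec =
        new ProcessBuilder("mvn", "install")
            .directory(new java.io.File("../spanner-custom-shard"))
            .inheritIO()
            .start();
    if (exec.waitFor() != 0) {
      throw new RuntimeException("Error building the spanner-custom-shard jar, check Maven logs.");
    }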
@@ -91,7 +91,8 @@ public void setUp() throws IOException {
{
put("inputFileFormat", "avro");
}
});
},
null);
}
}
}
