Skip to content

Commit

Permalink
[INLONG-8967][Sort] Add Mysql connector on flink 1.15 (#8980)
Browse files Browse the repository at this point in the history
  • Loading branch information
liaosunny123 authored Oct 7, 2023
1 parent 51c689e commit a3b3fc7
Show file tree
Hide file tree
Showing 17 changed files with 1,072 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,13 @@
</includes>
<fileMode>0644</fileMode>
</fileSet>
<fileSet>
<directory>../inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/target</directory>
<outputDirectory>inlong-sort/connectors</outputDirectory>
<includes>
<include>sort-connector-mysql-v1.15-${project.version}.jar</include>
</includes>
<fileMode>0644</fileMode>
</fileSet>
</fileSets>
</assembly>
6 changes: 6 additions & 0 deletions inlong-sort/sort-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-mysql-cdc-v1.15</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-mysql-cdc-v1.15</artifactId>
<version>${project.version}</version>
<destFileName>sort-connector-mysql-cdc.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-starrocks-v1.15</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.tests;

import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv;
import org.apache.inlong.sort.tests.utils.JdbcProxy;
import org.apache.inlong.sort.tests.utils.MySqlContainer;
import org.apache.inlong.sort.tests.utils.StarRocksContainer;
import org.apache.inlong.sort.tests.utils.StarRocksManager;
import org.apache.inlong.sort.tests.utils.TestUtils;

import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;

import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import static org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable;

/**
* End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar.
* Test flink sql Mysql cdc to StarRocks
*/
public class MysqlToRocksTest extends FlinkContainerTestEnv {

private static final Logger LOG = LoggerFactory.getLogger(MysqlToRocksTest.class);

private static final Path mysqlJar = TestUtils.getResource("sort-connector-mysql-cdc.jar");
private static final Path jdbcJar = TestUtils.getResource("sort-connector-starrocks.jar");
private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar");
private static final String sqlFile;

static {
try {
sqlFile =
Paths.get(MysqlToRocksTest.class.getResource("/flinkSql/mysql_test.sql").toURI()).toString();
StarRocksManager.buildStarRocksImage();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}

@ClassRule
public static StarRocksContainer STAR_ROCKS =
(StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName())
.withExposedPorts(9030, 8030, 8040)
.withNetwork(NETWORK)
.withAccessToHost(true)
.withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));

@ClassRule
public static final MySqlContainer MYSQL_CONTAINER =
(MySqlContainer) new MySqlContainer(MySqlContainer.MySqlVersion.V8_0)
.withDatabaseName("test")
.withNetwork(NETWORK)
.withNetworkAliases("mysql")
.withLogConsumer(new Slf4jLogConsumer(LOG));

@Before
public void setup() {
waitUntilJobRunning(Duration.ofSeconds(30));
initializeMysqlTable();
initializeStarRocksTable(STAR_ROCKS);
}

private void initializeMysqlTable() {
try {
Class.forName(MYSQL_CONTAINER.getDriverClassName());
Connection conn = DriverManager
.getConnection(MYSQL_CONTAINER.getJdbcUrl(), MYSQL_CONTAINER.getUsername(),
MYSQL_CONTAINER.getPassword());
Statement stat = conn.createStatement();
stat.execute(
"CREATE TABLE test_input1 (\n"
+ " id SERIAL,\n"
+ " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
+ " description VARCHAR(512),\n"
+ " PRIMARY KEY(id)\n"
+ ");");
stat.close();
conn.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@AfterClass
public static void teardown() {
if (MYSQL_CONTAINER != null) {
MYSQL_CONTAINER.stop();
}
if (STAR_ROCKS != null) {
STAR_ROCKS.stop();
}
}

/**
* Test flink sql postgresql cdc to StarRocks
*
* @throws Exception The exception may throws when execute the case
*/
@Test
public void testMysqlUpdateAndDelete() throws Exception {
submitSQLJob(sqlFile, jdbcJar, mysqlJar, mysqlJdbcJar);
waitUntilJobRunning(Duration.ofSeconds(10));

// generate input
try (Connection conn =
DriverManager.getConnection(MYSQL_CONTAINER.getJdbcUrl(), MYSQL_CONTAINER.getUsername(),
MYSQL_CONTAINER.getPassword());
Statement stat = conn.createStatement()) {
stat.execute(
"INSERT INTO test_input1 "
+ "VALUES (1,'jacket','water resistent white wind breaker');");
stat.execute(
"INSERT INTO test_input1 VALUES (2,'scooter','Big 2-wheel scooter ');");
stat.execute(
"update test_input1 set name = 'tom' where id = 2;");
stat.execute(
"delete from test_input1 where id = 1;");
} catch (SQLException e) {
LOG.error("Update table for CDC failed.", e);
throw e;
}

JdbcProxy proxy =
new JdbcProxy(STAR_ROCKS.getJdbcUrl(), STAR_ROCKS.getUsername(),
STAR_ROCKS.getPassword(),
STAR_ROCKS.getDriverClassName());
List<String> expectResult =
Arrays.asList("2,tom,Big 2-wheel scooter ");
proxy.checkResultWithTimeout(
expectResult,
"test_output1",
3,
60000L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,9 @@
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;

import java.net.URISyntaxException;
import java.nio.file.Path;
Expand All @@ -45,8 +42,12 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

import static org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.buildStarRocksImage;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable;
/**
* End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar.
* Test flink sql Postgres cdc to StarRocks
Expand All @@ -58,72 +59,42 @@ public class PostgresToStarRocksTest extends FlinkContainerTestEnv {
private static final Path postgresJar = TestUtils.getResource("sort-connector-postgres-cdc.jar");
private static final Path jdbcJar = TestUtils.getResource("sort-connector-starrocks.jar");
private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar");

private static final Logger STAR_ROCKS_LOG = LoggerFactory.getLogger(StarRocksContainer.class);

private static final String sqlFile;

// ----------------------------------------------------------------------------------------
// StarRocks Variables
// ----------------------------------------------------------------------------------------
private static final String INTER_CONTAINER_STAR_ROCKS_ALIAS = "starrocks";
private static final String NEW_STARROCKS_REPOSITORY = "inlong-starrocks";
private static final String NEW_STARROCKS_TAG = "latest";
private static final String STAR_ROCKS_IMAGE_NAME = "starrocks/allin1-ubi:3.0.4";

static {
try {
sqlFile = Paths.get(PostgresToStarRocksTest.class.getResource("/flinkSql/postgres_test.sql").toURI()).toString();
sqlFile = Paths.get(PostgresToStarRocksTest.class.getResource("/flinkSql/postgres_test.sql").toURI())
.toString();
buildStarRocksImage();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}

private static String getNewStarRocksImageName() {
return NEW_STARROCKS_REPOSITORY + ":" + NEW_STARROCKS_TAG;
}

public static void buildStarRocksImage() {
GenericContainer oldStarRocks = new GenericContainer(STAR_ROCKS_IMAGE_NAME);
Startables.deepStart(Stream.of(oldStarRocks)).join();
oldStarRocks.copyFileToContainer(MountableFile.forClasspathResource("/docker/starrocks/start_fe_be.sh"),
"/data/deploy/");
try {
oldStarRocks.execInContainer("chmod", "+x", "/data/deploy/start_fe_be.sh");
} catch (Exception e) {
e.printStackTrace();
}
oldStarRocks.getDockerClient()
.commitCmd(oldStarRocks.getContainerId())
.withRepository(NEW_STARROCKS_REPOSITORY)
.withTag(NEW_STARROCKS_TAG).exec();
oldStarRocks.stop();
}

@ClassRule
public static StarRocksContainer STAR_ROCKS = (StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName())
.withExposedPorts(9030, 8030, 8040)
.withNetwork(NETWORK)
.withAccessToHost(true)
.withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));
public static StarRocksContainer STAR_ROCKS =
(StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName())
.withExposedPorts(9030, 8030, 8040)
.withNetwork(NETWORK)
.withAccessToHost(true)
.withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));

@ClassRule
public static final PostgreSQLContainer POSTGRES_CONTAINER = (PostgreSQLContainer) new PostgreSQLContainer(
DockerImageName.parse("debezium/postgres:13").asCompatibleSubstituteFor("postgres"))
.withUsername("flinkuser")
.withPassword("flinkpw")
.withDatabaseName("test")
.withNetwork(NETWORK)
.withNetworkAliases("postgres")
.withLogConsumer(new Slf4jLogConsumer(LOG));
.withUsername("flinkuser")
.withPassword("flinkpw")
.withDatabaseName("test")
.withNetwork(NETWORK)
.withNetworkAliases("postgres")
.withLogConsumer(new Slf4jLogConsumer(LOG));

@Before
public void setup() {
waitUntilJobRunning(Duration.ofSeconds(30));
initializePostgresTable();
initializeStarRocksTable();
initializeStarRocksTable(STAR_ROCKS);
}

private void initializePostgresTable() {
Expand All @@ -149,23 +120,6 @@ private void initializePostgresTable() {
}
}

private void initializeStarRocksTable() {
try (Connection conn =
DriverManager.getConnection(STAR_ROCKS.getJdbcUrl(), STAR_ROCKS.getUsername(),
STAR_ROCKS.getPassword());
Statement stat = conn.createStatement()) {
stat.execute("CREATE TABLE IF NOT EXISTS test_output1 (\n"
+ " id INT NOT NULL,\n"
+ " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
+ " description VARCHAR(512)\n"
+ ")\n"
+ "PRIMARY KEY(id)\n"
+ "DISTRIBUTED by HASH(id) PROPERTIES (\"replication_num\" = \"1\");");
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

@AfterClass
public static void teardown() {
if (POSTGRES_CONTAINER != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public static void before() {
.withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
.withExposedPorts(JOB_MANAGER_REST_PORT, DEBUG_PORT)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
.withExposedPorts(JOB_MANAGER_REST_PORT)
.withLogConsumer(new Slf4jLogConsumer(JM_LOG));
taskManager =
new GenericContainer<>("flink:1.15.4-scala_2.12")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public MySqlContainer() {

public MySqlContainer(MySqlVersion version) {
super(DockerImageName.parse(IMAGE + ":" + version.getVersion()));
addExposedPort(MYSQL_PORT);
addFixedExposedPort(33306, 3306);
}

@Override
Expand All @@ -60,8 +60,6 @@ protected Set<Integer> getLivenessCheckPorts() {
@Override
protected void configure() {
// HERE is the difference, copy to /etc/mysql/, if copy to /etc/mysql/conf.d will be wrong
optionallyMapResourceParameterAsVolume(
MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, "/etc/mysql/", "mysql-default-conf");

if (parameters.containsKey(SETUP_SQL_PARAM_NAME)) {
optionallyMapResourceParameterAsVolume(
Expand All @@ -79,6 +77,7 @@ protected void configure() {
throw new ContainerLaunchException(
"Empty password can be used only with the root user");
}
withCommand("--default-authentication-plugin=mysql_native_password");
setStartupAttempts(3);
}

Expand All @@ -100,7 +99,8 @@ public String getJdbcUrl(String databaseName) {
+ getDatabasePort()
+ "/"
+ databaseName
+ additionalUrlParams;
+ additionalUrlParams
+ "?useSSL=false&allowPublicKeyRetrieval=true";
}

@Override
Expand Down
Loading

0 comments on commit a3b3fc7

Please sign in to comment.