Skip to content

Commit

Permalink
add mongodb->starrocks end-to-end test
Browse files Browse the repository at this point in the history
  • Loading branch information
Curry-CPU committed Oct 9, 2023
1 parent 0780030 commit 588c0de
Show file tree
Hide file tree
Showing 4 changed files with 285 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@
<artifactId>postgresql</artifactId>
<version>${testcontainers.version}</version>
</dependency>
<!-- Testcontainers MongoDB module: spins up a throwaway MongoDB container for the end-to-end tests. -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mongodb</artifactId>
<version>${testcontainers.version}</version>
</dependency>
<!-- Synchronous MongoDB Java driver used by the tests to seed/mutate test collections; test scope only. -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>4.10.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
Expand Down Expand Up @@ -179,6 +190,14 @@
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
<!-- Copies the MongoDB CDC connector uber jar into target/dependencies so the test
     can submit it to the Flink cluster alongside the SQL job (see sort-connector-mongodb-cdc.jar
     lookup in the test class). -->
<artifactItem>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-mongodb-cdc-v1.15</artifactId>
<version>${project.version}</version>
<destFileName>sort-connector-mongodb-cdc.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-mysql-cdc-v1.15</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.tests;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv;
import org.apache.inlong.sort.tests.utils.JdbcProxy;
import org.apache.inlong.sort.tests.utils.StarRocksContainer;
import org.apache.inlong.sort.tests.utils.TestUtils;

import org.bson.Document;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;

import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

import static com.mongodb.client.model.Filters.eq;
import static com.mongodb.client.model.Updates.*;

/**
 * End-to-end tests for the sort-connector-mongodb-cdc-v1.15 uber jar.
 *
 * <p>Runs a Flink SQL job that reads change events (insert/update/delete) from a MongoDB
 * container via the MongoDB CDC connector and writes them into a StarRocks container,
 * then verifies the sink table contents over JDBC.
 */
public class MongodbToStarRocksTest extends FlinkContainerTestEnv {

    private static final Logger LOG = LoggerFactory.getLogger(MongodbToStarRocksTest.class);

    // Connector/driver jars submitted to the Flink cluster together with the SQL script.
    private static final Path mongodbJar = TestUtils.getResource("sort-connector-mongodb-cdc.jar");
    private static final Path jdbcJar = TestUtils.getResource("sort-connector-starrocks.jar");
    private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar");

    private static final Logger STAR_ROCKS_LOG = LoggerFactory.getLogger(StarRocksContainer.class);

    // Absolute path of the Flink SQL script executed by this test.
    private static final String sqlFile;

    // ----------------------------------------------------------------------------------------
    // StarRocks Variables
    // ----------------------------------------------------------------------------------------
    private static final String INTER_CONTAINER_STAR_ROCKS_ALIAS = "starrocks";
    private static final String NEW_STARROCKS_REPOSITORY = "inlong-starrocks";
    private static final String NEW_STARROCKS_TAG = "latest";
    private static final String STAR_ROCKS_IMAGE_NAME = "starrocks/allin1-ubi:3.0.4";

    static {
        try {
            // Bug fix: resolve the SQL script against THIS class, not PostgresToStarRocksTest
            // (copy-paste leftover from the Postgres test).
            sqlFile = Paths.get(MongodbToStarRocksTest.class.getResource("/flinkSql/mongodb_test.sql").toURI())
                    .toString();
            buildStarRocksImage();
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }

    private static String getNewStarRocksImageName() {
        return NEW_STARROCKS_REPOSITORY + ":" + NEW_STARROCKS_TAG;
    }

    /**
     * Builds a customized local StarRocks image by starting the upstream image, copying the
     * combined FE/BE start script into it, and committing the container as
     * {@code inlong-starrocks:latest}.
     */
    public static void buildStarRocksImage() {
        GenericContainer<?> oldStarRocks = new GenericContainer<>(STAR_ROCKS_IMAGE_NAME);
        Startables.deepStart(Stream.of(oldStarRocks)).join();
        oldStarRocks.copyFileToContainer(MountableFile.forClasspathResource("/docker/starrocks/start_fe_be.sh"),
                "/data/deploy/");
        try {
            oldStarRocks.execInContainer("chmod", "+x", "/data/deploy/start_fe_be.sh");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            LOG.error("Interrupted while marking start_fe_be.sh as executable", e);
        } catch (Exception e) {
            // Best-effort, matching the original behavior: log (instead of printStackTrace)
            // so the failure is visible in the test logs, but keep going.
            LOG.error("Failed to mark start_fe_be.sh as executable", e);
        }
        oldStarRocks.getDockerClient()
                .commitCmd(oldStarRocks.getContainerId())
                .withRepository(NEW_STARROCKS_REPOSITORY)
                .withTag(NEW_STARROCKS_TAG).exec();
        oldStarRocks.stop();
    }

    @ClassRule
    public static StarRocksContainer STAR_ROCKS =
            (StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName())
                    .withExposedPorts(9030, 8030, 8040)
                    .withNetwork(NETWORK)
                    .withAccessToHost(true)
                    .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
                    .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));

    @ClassRule
    public static MongoDBContainer MONGODB_CONTAINER = new MongoDBContainer(
            DockerImageName.parse("mongo:4.0.10").asCompatibleSubstituteFor("mongo"))
                    .withNetwork(NETWORK)
                    .withNetworkAliases("mongo")
                    .withLogConsumer(new Slf4jLogConsumer(LOG));

    @Before
    public void setup() {
        waitUntilJobRunning(Duration.ofSeconds(30));
        initializeStarRocksTable();
    }

    /** Creates the StarRocks sink table the Flink job writes into (idempotent). */
    private void initializeStarRocksTable() {
        try (Connection conn =
                DriverManager.getConnection(STAR_ROCKS.getJdbcUrl(), STAR_ROCKS.getUsername(),
                        STAR_ROCKS.getPassword());
                Statement stat = conn.createStatement()) {
            stat.execute("CREATE TABLE IF NOT EXISTS test_output1 (\n"
                    + "       _id INT NOT NULL,\n"
                    + "       name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
                    + "       description VARCHAR(512)\n"
                    + ")\n"
                    + "PRIMARY KEY(_id)\n"
                    + "DISTRIBUTED by HASH(_id) PROPERTIES (\"replication_num\" = \"1\");");
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @AfterClass
    public static void teardown() {
        if (MONGODB_CONTAINER != null) {
            MONGODB_CONTAINER.stop();
        }
        if (STAR_ROCKS != null) {
            STAR_ROCKS.stop();
        }
    }

    /**
     * Test flink sql MongoDB cdc to StarRocks.
     *
     * <p>Inserts two documents, updates one and deletes the other, then asserts that only
     * the updated row is present in the StarRocks sink table.
     *
     * @throws Exception The exception may throws when execute the case
     */
    @Test
    public void testMongodbUpdateAndDelete() throws Exception {
        submitSQLJob(sqlFile, jdbcJar, mongodbJar, mysqlJdbcJar);
        waitUntilJobRunning(Duration.ofSeconds(10));

        // Generate input. Close the client when done — the original leaked the connection.
        try (MongoClient mongoClient = MongoClients.create(MONGODB_CONTAINER.getConnectionString())) {
            MongoDatabase database = mongoClient.getDatabase("test");
            MongoCollection<Document> collection = database.getCollection("test_input1");

            Document document1 = new Document("_id", 1)
                    .append("name", "jacket")
                    .append("description", "water resistent white wind breaker");
            Document document2 = new Document("_id", 2)
                    .append("name", "scooter")
                    .append("description", "Big 2-wheel scooter ");
            List<Document> documents = new ArrayList<>();
            documents.add(document1);
            documents.add(document2);
            collection.insertMany(documents);
            collection.updateOne(eq("_id", 2), combine(set("name", "tom")));
            collection.deleteOne(eq("_id", 1));
        }

        JdbcProxy proxy =
                new JdbcProxy(STAR_ROCKS.getJdbcUrl(), STAR_ROCKS.getUsername(),
                        STAR_ROCKS.getPassword(),
                        STAR_ROCKS.getDriverClassName());
        // After update+delete, only the updated document with _id=2 should remain.
        List<String> expectResult =
                Arrays.asList("2,tom,Big 2-wheel scooter ");
        proxy.checkResultWithTimeout(
                expectResult,
                "test_output1",
                3,
                60000L);
    }
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- Setup script for the MongoDB CDC end-to-end test.
--
-- Creates the 'flinkuser' account (password 'flinkpw') that the Flink MongoDB CDC
-- connector logs in with. The user needs read access to the captured database for
-- the snapshot phase, plus cluster-wide read access so the connector can tail the
-- oplog / open change streams.
--
use admin;
db.createUser({
user: "flinkuser",
pwd: "flinkpw",
roles: [
{ role: "read", db: "inventory" },
{ role: "readAnyDatabase", db: "inventory" }
]
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
SET 'execution.checkpointing.interval' = '3s';

-- MongoDB CDC source table.
-- Bug fix: Flink SQL only supports NOT ENFORCED primary key constraints; a bare
-- column-level "primary key" fails DDL validation. The MongoDB CDC connector also
-- requires the primary key to be declared on _id.
CREATE TABLE test_input1 (
    _id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc-inlong',
    'hosts' = 'mongo:27017',
    'database' = 'test',
    'collection' = 'test_input1',
    'connection.options' = 'connectTimeoutMS=30000&maxIdleTimeMS=20000'
);

-- StarRocks sink table; the primary key lets the sink apply upserts/deletes.
CREATE TABLE test_output1 (
    _id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector' = 'starrocks-inlong',
    'jdbc-url' = 'jdbc:mysql://starrocks:9030',
    'load-url' = 'starrocks:8030',
    'database-name' = 'test',
    'table-name' = 'test_output1',
    'username' = 'inlong',
    'password' = 'inlong',
    'sink.properties.format' = 'json',
    'sink.properties.strip_outer_array' = 'true',
    'sink.buffer-flush.interval-ms' = '1000'
);

INSERT INTO test_output1 SELECT * FROM test_input1;

0 comments on commit 588c0de

Please sign in to comment.