Skip to content

Commit

Permalink
Add GraalVM Reachability Metadata and corresponding nativeTest for Co…
Browse files Browse the repository at this point in the history
…nsul integration
  • Loading branch information
linghengqian committed Jan 2, 2024
1 parent 592464b commit fda710a
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import com.google.common.base.Strings;
import lombok.Getter;
import org.apache.http.HttpStatus;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
Expand Down Expand Up @@ -68,7 +70,7 @@ public final class ConsulRepository implements ClusterPersistRepository {
@Override
public void init(final ClusterPersistRepositoryConfiguration config) {
consulProps = new ConsulProperties(config.getProps());
ConsulRawClient rawClient = createConsulRawClient(config.getServerLists());
ConsulRawClient rawClient = createConsulRawClient(config.getServerLists(), consulProps.getValue(ConsulPropertyKey.BLOCK_QUERY_TIME_TO_SECONDS));
consulClient = new ShardingSphereConsulClient(rawClient);
distributedLockHolder = new DistributedLockHolder(getType(), consulClient, consulProps);
watchKeyMap = new HashMap<>(6, 1F);
Expand Down Expand Up @@ -137,9 +139,10 @@ public void persistEphemeral(final String key, final String value) {
}

@SuppressWarnings("HttpUrlsUsage")
private ConsulRawClient createConsulRawClient(final String serverLists) {
private ConsulRawClient createConsulRawClient(final String serverLists, final long blockQueryTimeToSeconds) {
CloseableHttpClient httpClient = HttpClientBuilder.create().setConnectionTimeToLive(blockQueryTimeToSeconds, TimeUnit.SECONDS).build();
if (Strings.isNullOrEmpty(serverLists)) {
return new ConsulRawClient();
return new ConsulRawClient(httpClient);
}
URL serverUrl;
try {
Expand All @@ -148,9 +151,9 @@ private ConsulRawClient createConsulRawClient(final String serverLists) {
throw new RuntimeException(e);
}
if (-1 == serverUrl.getPort()) {
return new ConsulRawClient(serverUrl.getHost());
return new ConsulRawClient(serverUrl.getHost(), httpClient);
}
return new ConsulRawClient(serverUrl.getHost(), serverUrl.getPort());
return new ConsulRawClient(serverUrl.getHost(), serverUrl.getPort(), httpClient);
}

private NewSession createNewSession(final String key) {
Expand All @@ -177,13 +180,11 @@ private void watchChildKeyChangeEvent(final String key, final DataChangedEventLi
AtomicBoolean running = new AtomicBoolean(true);
long currentIndex = 0;
while (running.get()) {
Response<List<GetValue>> response = consulClient.getKVValues(key, new QueryParams(consulProps.getValue(ConsulPropertyKey.BLOCK_QUERY_TIME_TO_SECONDS), currentIndex));
QueryParams queryParams = QueryParams.Builder.builder().setIndex(currentIndex).build();
Response<List<GetValue>> response = consulClient.getKVValues(key, queryParams);
List<GetValue> value = response.getValue();
if (null == value) {
continue;
}
Long index = response.getConsulIndex();
if (null != index && 0 == currentIndex) {
if (null != index && 0 == currentIndex && null != value) {
currentIndex = index;
if (!watchKeyMap.containsKey(key)) {
watchKeyMap.put(key, new HashSet<>());
Expand All @@ -194,7 +195,7 @@ private void watchChildKeyChangeEvent(final String key, final DataChangedEventLi
}
continue;
}
if (null != index && index > currentIndex) {
if (null != index && index > currentIndex && null != value) {
currentIndex = index;
Collection<String> newKeys = new HashSet<>(value.size(), 1F);
Collection<String> watchKeys = watchKeyMap.get(key);
Expand All @@ -215,7 +216,7 @@ private void watchChildKeyChangeEvent(final String key, final DataChangedEventLi
}
}
watchKeyMap.put(key, newKeys);
} else if (null != index && index < currentIndex) {
} else if (null != index && index < currentIndex && null != value) {
currentIndex = 0;
}
}
Expand Down
6 changes: 6 additions & 0 deletions test/native/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-cluster-mode-repository-consul</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.test.natived.jdbc.commons.testcontainers;

import com.github.dockerjava.api.command.InspectContainerResponse;
import com.github.dockerjava.api.model.Capability;
import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.HostConfig;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

@SuppressWarnings({"FieldMayBeFinal", "FieldCanBeLocal", "resource", "DataFlowIssue", "unused"})
public class ShardingSphereConsulContainer extends GenericContainer<ShardingSphereConsulContainer> {

private static final DockerImageName DEFAULT_OLD_IMAGE_NAME = DockerImageName.parse("consul");

private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("hashicorp/consul");

private static final int CONSUL_HTTP_PORT = 8500;

private static final int CONSUL_GRPC_PORT = 8502;

private List<String> initCommands = new ArrayList<>();

private String[] startConsulCmd = new String[]{"agent", "-dev", "-client", "0.0.0.0"};

/**
* Manually specify the Port for ShardingSphere's nativeTest.
* @param dockerImageName docker image name
*/
public ShardingSphereConsulContainer(final DockerImageName dockerImageName) {
super(dockerImageName);
dockerImageName.assertCompatibleWith(DEFAULT_OLD_IMAGE_NAME, DEFAULT_IMAGE_NAME);
setWaitStrategy(Wait.forHttp("/v1/status/leader").forPort(CONSUL_HTTP_PORT).forStatusCode(200));
withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withCapAdd(Capability.IPC_LOCK);
cmd.withHostConfig(new HostConfig().withPortBindings(new PortBinding(Ports.Binding.bindPort(62391), new ExposedPort(CONSUL_HTTP_PORT))));
});
withEnv("CONSUL_ADDR", "http://0.0.0.0:" + CONSUL_HTTP_PORT);
withCommand(startConsulCmd);
}

@Override
protected void containerIsStarted(final InspectContainerResponse containerInfo) {
if (!initCommands.isEmpty()) {
String commands = initCommands.stream().map(command -> "consul " + command).collect(Collectors.joining(" && "));
try {
ExecResult execResult = this.execInContainer("/bin/sh", "-c", commands);
if (0 != execResult.getExitCode()) {
logger().error(
"Failed to execute these init commands {}. Exit code {}. Stdout {}. Stderr {}",
initCommands,
execResult.getExitCode(),
execResult.getStdout(),
execResult.getStderr());
}
} catch (IOException | InterruptedException e) {
logger().error(
"Failed to execute these init commands {}. Exception message: {}",
initCommands,
e.getMessage());
}
}
}

/**
* work with Consul Command.
* @param commands The commands to send to the consul cli
* @return this
*/
public ShardingSphereConsulContainer withConsulCommand(final String... commands) {
initCommands.addAll(Arrays.asList(commands));
return self();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.test.natived.jdbc.mode.cluster;

import com.ecwid.consul.transport.HttpResponse;
import com.ecwid.consul.v1.ConsulRawClient;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.shardingsphere.driver.api.yaml.YamlShardingSphereDataSourceFactory;
import org.apache.shardingsphere.test.natived.jdbc.commons.FileTestUtils;
import org.apache.shardingsphere.test.natived.jdbc.commons.TestShardingService;
import org.apache.shardingsphere.test.natived.jdbc.commons.testcontainers.ShardingSphereConsulContainer;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledInNativeImage;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

import javax.sql.DataSource;
import java.io.IOException;
import java.sql.SQLException;
import java.time.Duration;

public class ConsulTest {

private static final int CONSUL_HOST_HTTP_PORT = 62391;

private TestShardingService testShardingService;

@Test
@EnabledInNativeImage
void assertShardingInLocalTransactions() throws SQLException, IOException {
try (
GenericContainer<?> consulContainer = new ShardingSphereConsulContainer(DockerImageName.parse("hashicorp/consul:1.10.12"))) {
consulContainer.start();
beforeAll();
DataSource dataSource = YamlShardingSphereDataSourceFactory.createDataSource(FileTestUtils.readFromFileURLString("test-native/yaml/mode/cluster/consul.yaml"));
testShardingService = new TestShardingService(dataSource);
initEnvironment();
Awaitility.await().atMost(Duration.ofSeconds(30L)).ignoreExceptions().until(() -> {
dataSource.getConnection().close();
return true;
});
testShardingService.processSuccess();
testShardingService.cleanEnvironment();
}
}

private void initEnvironment() throws SQLException {
testShardingService.getOrderRepository().createTableIfNotExistsInMySQL();
testShardingService.getOrderItemRepository().createTableIfNotExistsInMySQL();
testShardingService.getAddressRepository().createTableIfNotExists();
testShardingService.getOrderRepository().truncateTable();
testShardingService.getOrderItemRepository().truncateTable();
testShardingService.getAddressRepository().truncateTable();
}

private void beforeAll() {
Awaitility.await().atMost(Duration.ofSeconds(30L)).ignoreExceptions().until(this::verifyConsulAgentRunning);
}

private boolean verifyConsulAgentRunning() {
boolean flag = false;
HttpResponse httpResponse = new ConsulRawClient("http://localhost", CONSUL_HOST_HTTP_PORT).makeGetRequest("/v1/status/leader");
if (HttpStatus.SC_OK == httpResponse.getStatusCode()) {
flag = true;
}
return flag;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
}, {
"condition":{"typeReachable":"org.apache.shardingsphere.test.natived.jdbc.mode.cluster.ZookeeperTest"},
"pattern":"\\Qtest-native/yaml/mode/cluster/zookeeper.yaml\\E"
}, {
"condition":{"typeReachable":"org.apache.shardingsphere.test.natived.jdbc.mode.cluster.ConsulTest"},
"pattern":"\\Qtest-native/yaml/mode/cluster/consul.yaml\\E"
}]},
"bundles":[]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

mode:
type: Cluster
repository:
type: Consul
props:
namespace: governance-consul-data-source
server-lists: localhost:62391

dataSources:
ds_0:
dataSourceClassName: com.zaxxer.hikari.HikariDataSource
driverClassName: org.h2.Driver
jdbcUrl: jdbc:h2:mem:cluster_consul_ds_0;MODE=MYSQL;IGNORECASE=TRUE;CASE_INSENSITIVE_IDENTIFIERS=TRUE
username: root
password: 123456
ds_1:
dataSourceClassName: com.zaxxer.hikari.HikariDataSource
driverClassName: org.h2.Driver
jdbcUrl: jdbc:h2:mem:cluster_consul_ds_1;MODE=MYSQL;IGNORECASE=TRUE;CASE_INSENSITIVE_IDENTIFIERS=TRUE
username: root
password: 123456
ds_2:
dataSourceClassName: com.zaxxer.hikari.HikariDataSource
driverClassName: org.h2.Driver
jdbcUrl: jdbc:h2:mem:cluster_consul_ds_2;MODE=MYSQL;IGNORECASE=TRUE;CASE_INSENSITIVE_IDENTIFIERS=TRUE
username: root
password: 123456

rules:
- !SHARDING
tables:
t_order:
actualDataNodes:
keyGenerateStrategy:
column: order_id
keyGeneratorName: snowflake
t_order_item:
actualDataNodes:
keyGenerateStrategy:
column: order_item_id
keyGeneratorName: snowflake
defaultDatabaseStrategy:
standard:
shardingColumn: user_id
shardingAlgorithmName: inline
shardingAlgorithms:
inline:
type: CLASS_BASED
props:
strategy: STANDARD
algorithmClassName: org.apache.shardingsphere.test.natived.jdbc.commons.algorithm.ClassBasedInlineShardingAlgorithmFixture
keyGenerators:
snowflake:
type: SNOWFLAKE
auditors:
sharding_key_required_auditor:
type: DML_SHARDING_CONDITIONS

- !BROADCAST
tables:
- t_address

props:
sql-show: false
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ mode:
repository:
type: ZooKeeper
props:
namespace: governance
namespace: governance-zookeeper-data-source
server-lists: localhost:62372

dataSources:
Expand Down

0 comments on commit fda710a

Please sign in to comment.