Skip to content

Commit

Permalink
Add GraalVM Reachability Metadata and corresponding nativeTest for Co…
Browse files Browse the repository at this point in the history
…nsul integration
  • Loading branch information
linghengqian committed Jan 15, 2024
1 parent 453aab3 commit df6a8e2
Show file tree
Hide file tree
Showing 14 changed files with 476 additions and 473 deletions.
33 changes: 9 additions & 24 deletions mode/type/cluster/repository/provider/consul/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,9 @@
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.ecwid.consul</groupId>
<artifactId>consul-api</artifactId>
<version>${consul.api.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
<groupId>com.orbitz.consul</groupId>
<artifactId>consul-client</artifactId>
<version>${consul-client.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
Expand All @@ -54,25 +43,21 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.orbitz.consul</groupId>
<artifactId>consul-client</artifactId>
</dependency>

<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-test-util</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.ecwid.consul</groupId>
<artifactId>consul-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,15 @@

package org.apache.shardingsphere.mode.repository.cluster.consul;

import com.ecwid.consul.transport.HttpResponse;
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.ConsulRawClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.kv.model.GetValue;
import com.ecwid.consul.v1.kv.model.PutParams;
import com.ecwid.consul.v1.session.model.NewSession;
import com.ecwid.consul.v1.session.model.Session;
import com.google.common.base.Strings;
import com.orbitz.consul.Consul;
import com.orbitz.consul.cache.KVCache;
import com.orbitz.consul.model.kv.Value;
import com.orbitz.consul.model.session.ImmutableSession;
import com.orbitz.consul.model.session.Session;
import com.orbitz.consul.model.session.SessionCreatedResponse;
import com.orbitz.consul.option.ImmutablePutOptions;
import lombok.Getter;
import org.apache.http.HttpStatus;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
Expand All @@ -39,212 +36,177 @@

import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/**
* Registry repository of Consul.
* Before JDK 18 implemented in JEP 400, the return value of `{@link java.nio.charset.Charset}.defaultCharset()` on the
* Windows platform was usually not `{@link java.nio.charset.StandardCharsets}.UTF_8`.
* This explains the series of settings this class has on CharSet.
*/
public final class ConsulRepository implements ClusterPersistRepository {

private static final ScheduledThreadPoolExecutor SESSION_FLUSH_EXECUTOR = new ScheduledThreadPoolExecutor(2);

private ShardingSphereConsulClient consulClient;
private Consul consulClient;

private ConsulProperties consulProps;

@Getter
private DistributedLockHolder distributedLockHolder;

private Map<String, Collection<String>> watchKeyMap;
private final Map<String, KVCache> caches = new ConcurrentHashMap<>();

@Override
public void init(final ClusterPersistRepositoryConfiguration config) {
consulProps = new ConsulProperties(config.getProps());
ConsulRawClient rawClient = createConsulRawClient(config.getServerLists());
consulClient = new ShardingSphereConsulClient(rawClient);
consulClient = createConsulClient(config.getServerLists(), consulProps.getValue(ConsulPropertyKey.BLOCK_QUERY_TIME_TO_SECONDS));
distributedLockHolder = new DistributedLockHolder(getType(), consulClient, consulProps);
watchKeyMap = new HashMap<>(6, 1F);
}

@Override
public String getDirectly(final String key) {
Response<GetValue> response = consulClient.getKVValue(key);
if (null == response) {
return null;
/**
* Set ReadTimeoutMillis to avoid `java.lang.IllegalArgumentException: Cache watchInterval=10sec >= networkClientReadTimeout=10000ms. It can cause issues`.
*
* @param serverLists serverUrl.
* @param blockQueryTimeToSeconds blockQueryTimeToSeconds for Mode Config.
* @return Consul client.
* @throws RuntimeException MalformedURLException.
*/
@SuppressWarnings("HttpUrlsUsage")
private Consul createConsulClient(final String serverLists, final long blockQueryTimeToSeconds) {
Consul.Builder builder = Consul.builder().withReadTimeoutMillis(Duration.ofSeconds(blockQueryTimeToSeconds).toMillis());
if (Strings.isNullOrEmpty(serverLists)) {
return builder.build();
}
GetValue value = response.getValue();
return null == value ? null : value.getValue();
URL serverUrl;
try {
serverUrl = new URL(!serverLists.startsWith("https://") && !serverLists.startsWith("http://") ? "http://" + serverLists : serverLists);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
return builder.withUrl(serverUrl).build();
}

@Override
public List<String> getChildrenKeys(final String key) {
Response<List<String>> response = consulClient.getKVKeysOnly(key);
if (null == response) {
try {
return consulClient.keyValueClient()
.getValues(key)
.stream()
.map(Value::getKey)
.sorted(Comparator.reverseOrder())
.collect(Collectors.toList());
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
return Collections.emptyList();
}
List<String> value = response.getValue();
return null == value ? Collections.emptyList() : value;
}

@Override
public boolean isExisted(final String key) {
return null != consulClient.getKVValue(key).getValue();
public void persist(final String key, final String value) {
consulClient.keyValueClient().putValue(key, value, StandardCharsets.UTF_8);
}

@Override
public void persist(final String key, final String value) {
consulClient.setKVValue(key, value);
public void update(final String key, final String value) {
consulClient.keyValueClient().putValue(key, value, StandardCharsets.UTF_8);
}

@Override
public void update(final String key, final String value) {
consulClient.setKVValue(key, value);
public String getDirectly(final String key) {
try {
return consulClient.keyValueClient().getValueAsString(key, StandardCharsets.UTF_8).orElse(null);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
return null;
}
}

@Override
public void delete(final String key) {
consulClient.deleteKVValue(key);
public boolean isExisted(final String key) {
try {
return consulClient.keyValueClient().getValueAsString(key, StandardCharsets.UTF_8).isPresent();
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
return false;
}
}

/**
* {@link ConsulRawClient} is a wrapper of blocking HTTP client and does not have a close method.
* Using such a Client does not necessarily conform to the implementation of the relevant SPI. ShardingSphere needs to
* consider solutions similar to <a href="https://github.com/spring-cloud/spring-cloud-consul/issues/475">spring-cloud/spring-cloud-consul#475</a>.
* Persist Ephemeral by flushing session by update TTL.
*
* @see ConsulRawClient
* @param key key of data
* @param value value of data
*/
@Override
public void close() {
}

@Override
public void persistEphemeral(final String key, final String value) {
Response<String> response = consulClient.sessionCreate(createNewSession(key), QueryParams.DEFAULT);
String sessionId = response.getValue();
PutParams putParams = new PutParams();
putParams.setAcquireSession(sessionId);
consulClient.setKVValue(key, value, putParams);
generatorFlushSessionTtlTask(consulClient, sessionId);
verifyConsulAgentRunning();
}

@SuppressWarnings("HttpUrlsUsage")
private ConsulRawClient createConsulRawClient(final String serverLists) {
if (Strings.isNullOrEmpty(serverLists)) {
return new ConsulRawClient();
if (isExisted(key)) {
consulClient.keyValueClient().deleteKeys(key);
}
URL serverUrl;
try {
serverUrl = new URL(!serverLists.startsWith("https://") && !serverLists.startsWith("http://") ? "http://" + serverLists : serverLists);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
if (-1 == serverUrl.getPort()) {
return new ConsulRawClient(serverUrl.getHost());
}
return new ConsulRawClient(serverUrl.getHost(), serverUrl.getPort());
}

private NewSession createNewSession(final String key) {
NewSession result = new NewSession();
result.setName(key);
result.setBehavior(Session.Behavior.DELETE);
result.setTtl(consulProps.getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS));
return result;
persistExclusiveEphemeral(key, value);
}

@Override
public void persistExclusiveEphemeral(final String key, final String value) {
persistEphemeral(key, value);
Session deleteSession = ImmutableSession.builder()
.name(key)
.behavior("delete")
.ttl((String) consulProps.getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS))
.build();
SessionCreatedResponse response = consulClient.sessionClient().createSession(deleteSession);
String sessionId = response.getId();
consulClient.keyValueClient().putValue(key, value, 0L, ImmutablePutOptions.builder().acquire(sessionId).build());
SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> consulClient.sessionClient().renewSession(sessionId), 1L, 10L, TimeUnit.SECONDS);
}

@Override
public void watch(final String key, final DataChangedEventListener listener) {
Thread watchThread = new Thread(() -> watchChildKeyChangeEvent(key, listener));
watchThread.setDaemon(true);
watchThread.start();
}

private void watchChildKeyChangeEvent(final String key, final DataChangedEventListener listener) {
AtomicBoolean running = new AtomicBoolean(true);
long currentIndex = 0;
while (running.get()) {
Response<List<GetValue>> response = consulClient.getKVValues(key, new QueryParams(consulProps.getValue(ConsulPropertyKey.BLOCK_QUERY_TIME_TO_SECONDS), currentIndex));
List<GetValue> value = response.getValue();
if (null == value) {
continue;
}
Long index = response.getConsulIndex();
if (null != index && 0 == currentIndex) {
currentIndex = index;
if (!watchKeyMap.containsKey(key)) {
watchKeyMap.put(key, new HashSet<>());
}
Collection<String> watchKeys = watchKeyMap.get(key);
for (GetValue each : value) {
watchKeys.add(each.getKey());
}
continue;
}
if (null != index && index > currentIndex) {
currentIndex = index;
Collection<String> newKeys = new HashSet<>(value.size(), 1F);
Collection<String> watchKeys = watchKeyMap.get(key);
for (GetValue each : value) {
newKeys.add(each.getKey());
if (!watchKeys.contains(each.getKey())) {
watchKeys.add(each.getKey());
fireDataChangeEvent(each, listener, DataChangedEvent.Type.ADDED);
} else if (watchKeys.contains(each.getKey()) && each.getModifyIndex() >= currentIndex) {
fireDataChangeEvent(each, listener, DataChangedEvent.Type.UPDATED);
}
}
for (String each : watchKeys) {
if (!newKeys.contains(each)) {
GetValue getValue = new GetValue();
getValue.setKey(each);
fireDataChangeEvent(getValue, listener, DataChangedEvent.Type.DELETED);
}
}
watchKeyMap.put(key, newKeys);
} else if (null != index && index < currentIndex) {
currentIndex = 0;
}
public void delete(final String key) {
if (isExisted(key)) {
consulClient.keyValueClient().deleteKeys(key);
}
}

private void fireDataChangeEvent(final GetValue getValue, final DataChangedEventListener listener, final DataChangedEvent.Type type) {
listener.onChange(new DataChangedEvent(getValue.getKey(), getValue.getValue(), type));
}

/**
* Flush session by update TTL.
* Consul doesn't tell clients what key changed when performing a watch. we best bet is to do a comparison with the previous set of values.
* This is a bit troublesome in ShardingSphere context implementation.
*
* @param consulClient consul client
* @param sessionId session id
* @param key key of data
* @param listener data changed event listener
*/
public void generatorFlushSessionTtlTask(final ConsulClient consulClient, final String sessionId) {
SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> consulClient.renewSession(sessionId, QueryParams.DEFAULT), 1L, 10L, TimeUnit.SECONDS);
@Override
public void watch(final String key, final DataChangedEventListener listener) {
KVCache cache = caches.get(key);
if (null == cache) {
cache = KVCache.newCache(consulClient.keyValueClient(), key);
caches.put(key, cache);
}
cache.addListener(newValues -> {
Optional<Value> newValue = newValues.values().stream().filter(value -> value.getKey().equals(key)).findAny();
newValue.ifPresent(value -> {
Optional<String> decodedValue = newValue.get().getValueAsString();
decodedValue.ifPresent(v -> listener.onChange(new DataChangedEvent(key, v, DataChangedEvent.Type.UPDATED)));
});
});
cache.start();
}

/**
* See <a href="https://developer.hashicorp.com/consul/api-docs/v1.17.x/status">Status HTTP API</a> .
*
* @throws RuntimeException Unable to connect to Consul Agent.
*/
private void verifyConsulAgentRunning() {
HttpResponse httpResponse = consulClient.getRawClient().makeGetRequest("/v1/status/leader");
if (HttpStatus.SC_OK != httpResponse.getStatusCode()) {
throw new RuntimeException("Unable to connect to Consul Agent and StatusCode is " + httpResponse.getStatusCode() + ".");
}
@Override
public void close() {
caches.values().forEach(KVCache::close);
consulClient.destroy();
}

@Override
Expand Down
Loading

0 comments on commit df6a8e2

Please sign in to comment.