Skip to content

Commit

Permalink
Change consul httpclient to netty client.
Browse files Browse the repository at this point in the history
  • Loading branch information
sunquakes committed Oct 14, 2024
1 parent da81ab8 commit 4629b11
Show file tree
Hide file tree
Showing 7 changed files with 293 additions and 75 deletions.
6 changes: 0 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
<slf4j.version>2.0.13</slf4j.version>
<spring-boot.version>3.3.2</spring-boot.version>
<netty.version>4.1.112.Final</netty.version>
<consul.version>1.4.5</consul.version>
<jupiter.version>5.10.3</jupiter.version>
<mockito.version>5.2.0</mockito.version>
<javax.version>1.3.2</javax.version>
Expand Down Expand Up @@ -81,11 +80,6 @@
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>com.ecwid.consul</groupId>
<artifactId>consul-api</artifactId>
<version>${consul.version}</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
Expand Down
199 changes: 149 additions & 50 deletions src/main/java/com/sunquakes/jsonrpc4j/discovery/Consul.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
package com.sunquakes.jsonrpc4j.discovery;

import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.agent.model.NewService;
import com.ecwid.consul.v1.health.HealthServicesRequest;
import com.ecwid.consul.v1.health.model.HealthService;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.annotation.JSONField;
import com.sunquakes.jsonrpc4j.JsonRpcProtocol;
import com.sunquakes.jsonrpc4j.utils.AddressUtils;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.CharsetUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

Expand All @@ -27,9 +30,7 @@ public class Consul implements Driver {

private UriComponents url;

private ConsulClient client;

private String token; // Request token.
private NettyHttpClient client;

private boolean check; // Whether to enable health check.

Expand All @@ -39,8 +40,6 @@ public class Consul implements Driver {

private static final String CHECK_FIELD = "check";

private static final String TOKEN_FIELD = "token";

private static final String CHECK_INTERVAL_FIELD = "checkInterval";

private static final String INSTANCE_ID_FIELD = "instanceId";
Expand All @@ -54,73 +53,92 @@ public Consul newClient(String url) {

@Override
public boolean register(String name, String protocol, String hostname, int port) {
NewService newService = new NewService();
String id;
if (instanceId != null) {
id = String.format("%s-%s:%d", name, instanceId, port);
} else {
id = AddressUtils.getUrl(name, port);
}
newService.setId(id);
newService.setName(name);
newService.setPort(port);
if (hostname != null) {
newService.setAddress(hostname);
}
if (check) {
protocol = protocol.toUpperCase();
NewService.Check serviceCheck = new NewService.Check();
if (protocol.equals(JsonRpcProtocol.TCP.name())) {
serviceCheck.setTcp(AddressUtils.getUrl(hostname, port));
} else {
serviceCheck.setHttp(String.format("%s://%s:%d", protocol, hostname, port));
}
serviceCheck.setInterval(checkInterval);
// Set the init status passing
serviceCheck.setStatus("passing");
// Set the http method
serviceCheck.setMethod("GET");
newService.setCheck(serviceCheck);
checkRegister(protocol, hostname, port);
}

RegisterService service = new RegisterService(id, name, port, hostname);
UriComponentsBuilder builder = UriComponentsBuilder.newInstance();
UriComponents fullUrl = builder.uriComponents(this.url)
.path("/v1/agent/service/register")
.build();
try {
if (token != null) {
client.agentServiceRegister(newService, token);
} else {
client.agentServiceRegister(newService);
FullHttpResponse res = client.put(fullUrl.toUriString(), JSONObject.toJSONString(service));
if (!res.status().equals(HttpResponseStatus.OK)) {
return false;
}
} catch (RuntimeException e) {
} catch (Exception e) {
Thread.currentThread().interrupt();
log.error(e.getMessage(), e);
return false;
}
return true;
}

private void checkRegister(String protocol, String hostname, int port) {
protocol = protocol.toUpperCase();
Check serviceCheck = new Check();
if (protocol.equals(JsonRpcProtocol.TCP.name())) {
serviceCheck.setTcp(AddressUtils.getUrl(hostname, port));
} else {
serviceCheck.setHttp(String.format("%s://%s:%d", protocol, hostname, port));
}
serviceCheck.setInterval(checkInterval);
// Set the init status passing
serviceCheck.setStatus("passing");
// Set the http method
serviceCheck.setMethod("GET");
UriComponentsBuilder builder = UriComponentsBuilder.newInstance();
UriComponents fullUrl = builder.uriComponents(this.url)
.path("/v1/agent/check/register")
.build();
try {
FullHttpResponse res = client.put(fullUrl.toUriString(), JSONObject.toJSONString(serviceCheck));
if (!res.status().equals(HttpResponseStatus.OK)) {
log.error("Health check register failed.");
}
} catch (Exception e) {
Thread.currentThread().interrupt();
log.error(e.getMessage(), e);
}
}

@Override
public String get(String name) {
HealthServicesRequest.Builder builder = HealthServicesRequest.newBuilder()
.setPassing(true)
.setQueryParams(QueryParams.DEFAULT);
if (token != null) {
builder.setToken(token);
UriComponentsBuilder builder = UriComponentsBuilder.newInstance();
UriComponents fullUrl = builder.uriComponents(this.url)
.path("/v1/agent/health/service/name/" + name)
.build();
try {
FullHttpResponse res = client.get(fullUrl.toUriString());
if (!res.status().equals(HttpResponseStatus.OK)) {
return "";
}
String body = res.content().toString(CharsetUtil.UTF_8);
List<HealthService> healthyServices = JSONObject.parseObject(body, ArrayList.class);
return healthyServices.stream().map(item -> AddressUtils.getUrl(item.getService().getAddress(), item.getService().getPort())).collect(Collectors.joining(","));
} catch (Exception e) {
Thread.currentThread().interrupt();
log.error(e.getMessage(), e);
return "";
}
HealthServicesRequest request = builder.build();
Response<List<HealthService>> healthyServices = client.getHealthServices(name, request);
return healthyServices.getValue().stream().map(item -> AddressUtils.getUrl(item.getService().getAddress(), item.getService().getPort())).collect(Collectors.joining(","));
}

private ConsulClient getClient() {
private NettyHttpClient getClient() {
if (url.getPort() == -1) {
client = new ConsulClient(String.format("%s://%s", url.getScheme(), url.getHost()));
client = new NettyHttpClient(String.format("%s://%s", url.getScheme(), url.getHost()));
} else {
client = new ConsulClient(String.format("%s://%s", url.getScheme(), url.getHost()), url.getPort());
client = new NettyHttpClient(String.format("%s://%s:%d", url.getScheme(), url.getHost(), url.getPort()));
}

// Deserialize url parameters.
MultiValueMap<String, String> queryParams = url.getQueryParams();
if (queryParams.containsKey(TOKEN_FIELD) && StringUtils.hasLength(queryParams.getFirst(TOKEN_FIELD))) {
token = queryParams.getFirst(TOKEN_FIELD);
}
if (queryParams.containsKey(CHECK_FIELD)) {
check = Boolean.parseBoolean(queryParams.getFirst(CHECK_FIELD));
}
Expand All @@ -132,4 +150,85 @@ private ConsulClient getClient() {
}
return client;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
public class HealthService {

@JSONField(name = "AggregatedStatus")
private String aggregatedStatus;

@JSONField(name = "Service")
private NewService service;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
public class NewService {

@JSONField(name = "ID")
private String id;

@JSONField(name = "Service")
private String service;

@JSONField(name = "Port")
private Integer port;

@JSONField(name = "Address")
private String address;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
public class RegisterService {

@JSONField(name = "ID")
private String id;

@JSONField(name = "Name")
private String name;

@JSONField(name = "Port")
private Integer port;

@JSONField(name = "Address")
private String address;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Check {

@JSONField(name = "ID")
private String id;

@JSONField(name = "Name")
private String name;

@JSONField(name = "Status")
private String status;

@JSONField(name = "ServiceID")
private String serviceID;

@JSONField(name = "HTTP")
private String http;

@JSONField(name = "Method")
private String method;

@JSONField(name = "TCP")
private String tcp;

@JSONField(name = "Interval")
private String interval;

@JSONField(name = "Timeout")
private String timeout;
}
}
4 changes: 2 additions & 2 deletions src/main/java/com/sunquakes/jsonrpc4j/discovery/Nacos.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public boolean register(String name, String protocol, String hostname, int port)
.build();

try {
FullHttpResponse res = client.post(fullUrl.toString());
FullHttpResponse res = client.post(fullUrl.toString(), null);
if (res.status().equals(HttpResponseStatus.OK)) {
throw new JsonRpcException("Failed to register to nacos.");
}
Expand Down Expand Up @@ -117,7 +117,7 @@ public String beat(String serviceName, String ip, int port) {
.queryParam("port", port)
.queryParam(EPHEMERAL_KEY, ephemeral).build();
try {
FullHttpResponse res = client.put(fullUrl.toString());
FullHttpResponse res = client.put(fullUrl.toString(), null);
if (res.status().equals(HttpResponseStatus.OK)) {
throw new JsonRpcException("Failed to send heartbeat to nacos.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.sunquakes.jsonrpc4j.JsonRpcProtocol;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
Expand Down Expand Up @@ -36,18 +37,18 @@ public class NettyHttpClient {
}

public FullHttpResponse get(String path) throws ExecutionException, InterruptedException {
return request(path, HttpMethod.GET);
return request(path, HttpMethod.GET, null);
}

public FullHttpResponse post(String path) throws ExecutionException, InterruptedException {
return request(path, HttpMethod.POST);
public FullHttpResponse post(String path, String body) throws ExecutionException, InterruptedException {
return request(path, HttpMethod.POST, body);
}

public FullHttpResponse put(String path) throws ExecutionException, InterruptedException {
return request(path, HttpMethod.PUT);
public FullHttpResponse put(String path, String body) throws ExecutionException, InterruptedException {
return request(path, HttpMethod.PUT, body);
}

private FullHttpResponse request(String path, HttpMethod method) throws InterruptedException, ExecutionException {
private FullHttpResponse request(String path, HttpMethod method, String body) throws InterruptedException, ExecutionException {
EventLoopGroup group = new NioEventLoopGroup();
Promise<FullHttpResponse> promise = new DefaultPromise<>(group.next());
Bootstrap bootstrap = new Bootstrap();
Expand All @@ -63,6 +64,13 @@ private FullHttpResponse request(String path, HttpMethod method) throws Interrup
Channel channel = bootstrap.connect(new InetSocketAddress(uri.getHost(), port)).sync().channel();

FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, path, Unpooled.EMPTY_BUFFER);
if (body != null) {
ByteBuf buffer = request.content().clear();
buffer.writerIndex();
buffer.writeBytes(body.getBytes());
buffer.writerIndex();
buffer.readableBytes();
}

channel.writeAndFlush(request).sync();
return promise.sync().get();
Expand Down
Loading

0 comments on commit 4629b11

Please sign in to comment.