Skip to content

Commit

Permalink
Remove httpcomponents and change nacos http client.
Browse files Browse the repository at this point in the history
  • Loading branch information
sunquakes committed Oct 11, 2024
1 parent c403fb7 commit 5a2ca4d
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 89 deletions.
6 changes: 0 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring.version>6.1.11</spring.version>
<httpclient.version>4.5.14</httpclient.version>
<fastjson.version>2.0.52</fastjson.version>
<lombok.version>1.18.34</lombok.version>
<slf4j.version>2.0.13</slf4j.version>
Expand Down Expand Up @@ -56,11 +55,6 @@
<artifactId>spring-web</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
@Sharable
public abstract class JsonRpcClientHandler extends ChannelInboundHandlerAdapter {

protected Map<Channel, Promise> promiseMap = new ConcurrentHashMap<>();
protected Map<Channel, Promise<String>> promiseMap = new ConcurrentHashMap<>();

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
Expand All @@ -39,7 +39,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
protected void handleInternalError(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
ErrorResponseDto errorResponseDto = new ErrorResponseDto(null, RequestUtils.JSONRPC, new ErrorDto(ErrorEnum.INTERNAL_ERROR.getCode(), ErrorEnum.INTERNAL_ERROR.getText(), null));
Promise promise = promiseMap.get(channel);
Promise<String> promise = promiseMap.get(channel);
if (promise != null) {
promise.setSuccess(JSON.toJSONString(errorResponseDto));
promiseMap.remove(channel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.ssl.OptionalSslHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
Expand Down Expand Up @@ -54,7 +54,7 @@ public void initLoadBalancer() {
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true);
int defaultPort = protocol.equals(JsonRpcProtocol.HTTPS.name()) ? DEFAULT_HTTPS_PORT : DEFAULT_HTTP_PORT;
int defaultPort = isHttps(protocol) ? DEFAULT_HTTPS_PORT : DEFAULT_HTTP_PORT;
if (discovery != null) {
loadBalancer = new JsonRpcLoadBalancer(() -> discovery.value().get(name), defaultPort, bootstrap, poolHandler);
} else {
Expand Down Expand Up @@ -97,15 +97,19 @@ public Object handle(String method, Object[] args) throws JsonRpcException {
class Handler implements JsonRpcChannelHandler {
@Override
public void channelUpdated(Channel ch) throws SSLException {
ch.pipeline()
.addLast("codec", new HttpClientCodec())
.addLast("http-aggregator", new HttpObjectAggregator(1024 * 1024))
.addLast(jsonRpcHttpClientHandler);
protocol = protocol.toUpperCase();
if (protocol.equals(JsonRpcProtocol.HTTPS.name())) {
SocketChannel sc = (SocketChannel) ch;
if (isHttps(protocol)) {
SslContext sslContext = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
ch.pipeline().addLast(new OptionalSslHandler(sslContext));
ch.pipeline().addLast(sslContext.newHandler(ch.alloc()));
}
sc.pipeline()
.addLast("codec", new HttpClientCodec())
.addLast("http-aggregator", new HttpObjectAggregator(1024 * 1024));
sc.pipeline().addLast(jsonRpcHttpClientHandler);
}
}

private boolean isHttps(String scheme) {
return scheme.toUpperCase().equals(JsonRpcProtocol.HTTPS.name());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
import org.springframework.util.StringUtils;

import java.net.InetSocketAddress;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

/**
Expand All @@ -34,6 +34,8 @@ public class JsonRpcLoadBalancer {

private static final int MAX_RETRY_TIMES = 3;

private static final SecureRandom secureRandom = new SecureRandom();

public JsonRpcLoadBalancer(Supplier<String> url, int defaultPort, Bootstrap bootstrap, JsonRpcChannelPoolHandler poolHandler) {
this.bootstrap = bootstrap;
this.poolHandler = poolHandler;
Expand Down Expand Up @@ -68,7 +70,8 @@ public FixedChannelPool getPool() {
initPools();
return getPool();
} else {
int index = ThreadLocalRandom.current().nextInt(pools.size());
secureRandom.nextBytes(new byte[8]);
int index = secureRandom.nextInt(pools.size());
pool = pools.get(index);
}
return pool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (bytes.length > 0) {
String body = new String(bytes);
synchronized (channel) {
Promise promise = promiseMap.get(channel);
Promise<String> promise = promiseMap.get(channel);
if (promise != null) {
promise.setSuccess(body);
}
Expand Down
42 changes: 19 additions & 23 deletions src/main/java/com/sunquakes/jsonrpc4j/discovery/Nacos.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,14 @@

import com.alibaba.fastjson2.JSONObject;
import com.sunquakes.jsonrpc4j.exception.JsonRpcException;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.CharsetUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;

Expand All @@ -32,8 +29,6 @@
@Slf4j
public class Nacos implements Driver {

private static final int STATUS_CODE_SUCCESS = 200;

private static final int HEARTBEAT_INTERVAL = 5000;

private static final String EPHEMERAL_KEY = "ephemeral";
Expand All @@ -44,14 +39,15 @@ public class Nacos implements Driver {

private UriComponents url;

private CloseableHttpClient client = HttpClients.createDefault();
private NettyHttpClient client;

private String ephemeral = "true";

private List<Map.Entry<String, Service>> heartbeatList = new ArrayList<>();

@Override
public Nacos newClient(String url) {
client = new NettyHttpClient(url);
this.url = UriComponentsBuilder.fromUriString(url).build();
if (this.url.getQueryParams().containsKey(EPHEMERAL_KEY)) {
ephemeral = this.url.getQueryParams().getFirst(EPHEMERAL_KEY);
Expand All @@ -71,16 +67,16 @@ public boolean register(String name, String protocol, String hostname, int port)
.queryParam(EPHEMERAL_KEY, ephemeral)
.build();

HttpPost post = new HttpPost(fullUrl.toString());
try {
HttpResponse res = client.execute(post);
if (res.getStatusLine().getStatusCode() != STATUS_CODE_SUCCESS) {
FullHttpResponse res = client.post(fullUrl.toString());
if (res.status().equals(HttpResponseStatus.OK)) {
throw new JsonRpcException("Failed to register to nacos.");
}
if (IS_EPHEMERAL.equals(ephemeral)) {
registerHeartbeat(name, hostname, port);
}
} catch (Exception e) {
Thread.currentThread().interrupt();
log.error(e.getMessage(), e);
return false;
}
Expand All @@ -95,17 +91,17 @@ public String get(String name) {
.path("/nacos/v1/ns/instance/list")
.queryParam(SERVICE_NAME_KEY, name)
.build();

HttpGet get = new HttpGet(fullUrl.toString());
try {
HttpResponse res = client.execute(get);
if (res.getStatusLine().getStatusCode() != STATUS_CODE_SUCCESS) {
FullHttpResponse res = client.get(fullUrl.toString());
if (res.status().equals(HttpResponseStatus.OK)) {
throw new JsonRpcException("Failed to get the service list from nacos.");
}
String json = EntityUtils.toString(res.getEntity());
ByteBuf buf = res.content();
String json = buf.toString(CharsetUtil.UTF_8);
GetResp resp = JSONObject.parseObject(json, GetResp.class);
return resp.getHosts().stream().filter(item -> item.healthy).map(item -> String.format("%s:%d", item.getIp(), item.getPort())).collect(Collectors.joining(","));
} catch (Exception e) {
Thread.currentThread().interrupt();
log.error(e.getMessage(), e);
}
return name;
Expand All @@ -120,15 +116,15 @@ public String beat(String serviceName, String ip, int port) {
.queryParam("ip", ip)
.queryParam("port", port)
.queryParam(EPHEMERAL_KEY, ephemeral).build();

HttpPut put = new HttpPut(fullUrl.toString());
try {
HttpResponse res = client.execute(put);
if (res.getStatusLine().getStatusCode() != STATUS_CODE_SUCCESS) {
FullHttpResponse res = client.put(fullUrl.toString());
if (res.status().equals(HttpResponseStatus.OK)) {
throw new JsonRpcException("Failed to send heartbeat to nacos.");
}
return EntityUtils.toString(res.getEntity());
ByteBuf buf = res.content();
return buf.toString(CharsetUtil.UTF_8);
} catch (Exception e) {
Thread.currentThread().interrupt();
log.error(e.getMessage(), e);
return null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package com.sunquakes.jsonrpc4j.discovery;

import com.sunquakes.jsonrpc4j.JsonRpcProtocol;
import com.sunquakes.jsonrpc4j.exception.JsonRpcException;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
Expand All @@ -13,7 +11,6 @@
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;

Expand All @@ -26,44 +23,49 @@
* @version 3.0.0
* @since 3.0.0
**/
public class HttpClient {
public class NettyHttpClient {

private static final int DEFAULT_HTTP_PORT = 80;

private static final int DEFAULT_HTTPS_PORT = 443;

private URI uri;

HttpClient(String host) {
NettyHttpClient(String host) {
uri = URI.create(host);
}

public Object get(String path) {
public FullHttpResponse get(String path) throws ExecutionException, InterruptedException {
return request(path, HttpMethod.GET);
}

public FullHttpResponse post(String path) throws ExecutionException, InterruptedException {
return request(path, HttpMethod.POST);
}

public FullHttpResponse put(String path) throws ExecutionException, InterruptedException {
return request(path, HttpMethod.PUT);
}

private FullHttpResponse request(String path, HttpMethod method) throws InterruptedException, ExecutionException {
EventLoopGroup group = new NioEventLoopGroup();
Promise<String> promise = new DefaultPromise<>(group.next());
Promise<FullHttpResponse> promise = new DefaultPromise<>(group.next());
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new HttpClientInitializer(promise));
try {
int port = uri.getPort();
int defaultPort = isHttps(uri.getScheme()) ? DEFAULT_HTTPS_PORT : DEFAULT_HTTP_PORT;
if (port == -1) {
port = defaultPort;
}
int port = uri.getPort();
int defaultPort = isHttps(uri.getScheme()) ? DEFAULT_HTTPS_PORT : DEFAULT_HTTP_PORT;
if (port == -1) {
port = defaultPort;
}

Channel channel = bootstrap.connect(new InetSocketAddress(uri.getHost(), port)).sync().channel();
Channel channel = bootstrap.connect(new InetSocketAddress(uri.getHost(), port)).sync().channel();

FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path, Unpooled.EMPTY_BUFFER);
FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, path, Unpooled.EMPTY_BUFFER);

channel.writeAndFlush(request).sync();
return promise.sync().get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new JsonRpcException(e.getMessage());
} catch (ExecutionException e) {
throw new JsonRpcException(e.getMessage());
}
channel.writeAndFlush(request).sync();
return promise.sync().get();
}

private boolean isHttps(String scheme) {
Expand All @@ -72,9 +74,9 @@ private boolean isHttps(String scheme) {

class HttpClientInitializer extends ChannelInitializer<Channel> {

Promise<String> promise;
Promise<FullHttpResponse> promise;

HttpClientInitializer(Promise<String> promise) {
HttpClientInitializer(Promise<FullHttpResponse> promise) {
this.promise = promise;
}

Expand All @@ -96,22 +98,21 @@ protected void initChannel(Channel ch) throws Exception {

class HttpResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> {

Promise<String> promise;
Promise<FullHttpResponse> promise;

HttpResponseHandler(Promise<String> promise) {
HttpResponseHandler(Promise<FullHttpResponse> promise) {
this.promise = promise;
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) {
FullHttpResponse httpResponse = msg;
ByteBuf buf = httpResponse.content();
String body = buf.toString(CharsetUtil.UTF_8);
promise.setSuccess(body);
promise.setSuccess(httpResponse);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
promise.setFailure(cause);
ctx.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,12 @@ protected void initChannel(SocketChannel sh) throws Exception {
log.info("JsonRpc tcp server startup successfully.");
} else {
log.info("JsonRpc tcp server startup failed.");
future.cause().printStackTrace();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
});
Expand Down

This file was deleted.

Loading

0 comments on commit 5a2ca4d

Please sign in to comment.