From c403fb7936913da679fc4c4a0a89a8e38b48a05f Mon Sep 17 00:00:00 2001 From: Shing Rui Date: Thu, 10 Oct 2024 22:58:20 +0800 Subject: [PATCH 1/6] Add discovery http client. --- .../jsonrpc4j/discovery/HttpClient.java | 118 ++++++++++++++++++ .../jsonrpc4j/discovery/HttpClientTest.java | 14 +++ 2 files changed, 132 insertions(+) create mode 100644 src/main/java/com/sunquakes/jsonrpc4j/discovery/HttpClient.java create mode 100644 src/test/java/com/sunquakes/jsonrpc4j/discovery/HttpClientTest.java diff --git a/src/main/java/com/sunquakes/jsonrpc4j/discovery/HttpClient.java b/src/main/java/com/sunquakes/jsonrpc4j/discovery/HttpClient.java new file mode 100644 index 0000000..9dfd3cf --- /dev/null +++ b/src/main/java/com/sunquakes/jsonrpc4j/discovery/HttpClient.java @@ -0,0 +1,118 @@ +package com.sunquakes.jsonrpc4j.discovery; + +import com.sunquakes.jsonrpc4j.JsonRpcProtocol; +import com.sunquakes.jsonrpc4j.exception.JsonRpcException; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.*; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.util.CharsetUtil; +import io.netty.util.concurrent.DefaultPromise; +import io.netty.util.concurrent.Promise; + +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.concurrent.ExecutionException; + +/** + * @author Shing Rui sunquakes@outlook.com + * @version 3.0.0 + * @since 3.0.0 + **/ +public class HttpClient { + + private static final int DEFAULT_HTTP_PORT = 80; + + private static final int DEFAULT_HTTPS_PORT = 443; + + private URI uri; + + HttpClient(String host) { + uri = URI.create(host); + } + + public Object get(String path) { + EventLoopGroup group = new NioEventLoopGroup(); + Promise promise = new DefaultPromise<>(group.next()); + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(group) + .channel(NioSocketChannel.class) + .handler(new HttpClientInitializer(promise)); + try { + int port = uri.getPort(); + int defaultPort = isHttps(uri.getScheme()) ? DEFAULT_HTTPS_PORT : DEFAULT_HTTP_PORT; + if (port == -1) { + port = defaultPort; + } + + Channel channel = bootstrap.connect(new InetSocketAddress(uri.getHost(), port)).sync().channel(); + + FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path, Unpooled.EMPTY_BUFFER); + + channel.writeAndFlush(request).sync(); + return promise.sync().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new JsonRpcException(e.getMessage()); + } catch (ExecutionException e) { + throw new JsonRpcException(e.getMessage()); + } + } + + private boolean isHttps(String scheme) { + return scheme.toUpperCase().equals(JsonRpcProtocol.HTTPS.name()); + } + + class HttpClientInitializer extends ChannelInitializer { + + Promise promise; + + HttpClientInitializer(Promise promise) { + this.promise = promise; + } + + @Override + protected void initChannel(Channel ch) throws Exception { + SocketChannel sc = (SocketChannel) ch; + sc.config().setKeepAlive(true); + sc.config().setTcpNoDelay(true); + if (isHttps(uri.getScheme())) { + SslContext sslContext = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build(); + ch.pipeline().addLast(sslContext.newHandler(ch.alloc())); + } + sc.pipeline() + .addLast("codec", new HttpClientCodec()) + .addLast("http-aggregator", new HttpObjectAggregator(1024 * 1024)); + sc.pipeline().addLast(new HttpResponseHandler(promise)); + } + } + + class HttpResponseHandler extends SimpleChannelInboundHandler { + + Promise promise; + + HttpResponseHandler(Promise promise) { + this.promise = promise; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) { + FullHttpResponse httpResponse = msg; + ByteBuf buf = httpResponse.content(); + String body = buf.toString(CharsetUtil.UTF_8); + promise.setSuccess(body); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + ctx.close(); + } + } +} \ No newline at end of file diff --git a/src/test/java/com/sunquakes/jsonrpc4j/discovery/HttpClientTest.java b/src/test/java/com/sunquakes/jsonrpc4j/discovery/HttpClientTest.java new file mode 100644 index 0000000..9046543 --- /dev/null +++ b/src/test/java/com/sunquakes/jsonrpc4j/discovery/HttpClientTest.java @@ -0,0 +1,14 @@ +package com.sunquakes.jsonrpc4j.discovery; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class HttpClientTest { + + @Test + void testGet() { + HttpClient httpClient = new HttpClient("https://www.github.com"); + assertNotNull(httpClient.get("/")); + } +} From 5a2ca4d6218e39a4bc54cf3dcc52d60498d14e5c Mon Sep 17 00:00:00 2001 From: Shing Rui Date: Fri, 11 Oct 2024 22:12:15 +0800 Subject: [PATCH 2/6] Remove httpcomponents and change nacos http client. --- pom.xml | 6 -- .../client/JsonRpcClientHandler.java | 4 +- .../jsonrpc4j/client/JsonRpcHttpClient.java | 22 ++++--- .../jsonrpc4j/client/JsonRpcLoadBalancer.java | 7 ++- .../client/JsonRpcTcpClientHandler.java | 2 +- .../sunquakes/jsonrpc4j/discovery/Nacos.java | 42 ++++++------- .../{HttpClient.java => NettyHttpClient.java} | 61 ++++++++++--------- .../jsonrpc4j/server/JsonRpcTcpServer.java | 2 - .../jsonrpc4j/discovery/HttpClientTest.java | 14 ----- .../discovery/NettyHttpClientTest.java | 23 +++++++ 10 files changed, 94 insertions(+), 89 deletions(-) rename src/main/java/com/sunquakes/jsonrpc4j/discovery/{HttpClient.java => NettyHttpClient.java} (64%) delete mode 100644 src/test/java/com/sunquakes/jsonrpc4j/discovery/HttpClientTest.java create mode 100644 src/test/java/com/sunquakes/jsonrpc4j/discovery/NettyHttpClientTest.java diff --git a/pom.xml b/pom.xml index e425a7a..869a583 100644 --- a/pom.xml +++ b/pom.xml @@ -17,7 +17,6 @@ 17 UTF-8 6.1.11 - 4.5.14 2.0.52 1.18.34 2.0.13 @@ -56,11 +55,6 @@ spring-web ${spring.version} - - org.apache.httpcomponents - httpclient - ${httpclient.version} - com.alibaba.fastjson2 fastjson2 diff --git a/src/main/java/com/sunquakes/jsonrpc4j/client/JsonRpcClientHandler.java b/src/main/java/com/sunquakes/jsonrpc4j/client/JsonRpcClientHandler.java index ff2ab16..c33f54e 100644 --- a/src/main/java/com/sunquakes/jsonrpc4j/client/JsonRpcClientHandler.java +++ b/src/main/java/com/sunquakes/jsonrpc4j/client/JsonRpcClientHandler.java @@ -24,7 +24,7 @@ @Sharable public abstract class JsonRpcClientHandler extends ChannelInboundHandlerAdapter { - protected Map promiseMap = new ConcurrentHashMap<>(); + protected Map> promiseMap = new ConcurrentHashMap<>(); @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { @@ -39,7 +39,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E protected void handleInternalError(ChannelHandlerContext ctx) { Channel channel = ctx.channel(); ErrorResponseDto errorResponseDto = new ErrorResponseDto(null, RequestUtils.JSONRPC, new ErrorDto(ErrorEnum.INTERNAL_ERROR.getCode(), ErrorEnum.INTERNAL_ERROR.getText(), null)); - Promise promise = promiseMap.get(channel); + Promise promise = promiseMap.get(channel); if (promise != null) { promise.setSuccess(JSON.toJSONString(errorResponseDto)); promiseMap.remove(channel); diff --git a/src/main/java/com/sunquakes/jsonrpc4j/client/JsonRpcHttpClient.java b/src/main/java/com/sunquakes/jsonrpc4j/client/JsonRpcHttpClient.java index 1f30a9b..1ba4cd1 100644 --- a/src/main/java/com/sunquakes/jsonrpc4j/client/JsonRpcHttpClient.java +++ b/src/main/java/com/sunquakes/jsonrpc4j/client/JsonRpcHttpClient.java @@ -16,10 +16,10 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.pool.FixedChannelPool; +import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpObjectAggregator; -import io.netty.handler.ssl.OptionalSslHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; @@ -54,7 +54,7 @@ public void initLoadBalancer() { bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true); - int defaultPort = protocol.equals(JsonRpcProtocol.HTTPS.name()) ? DEFAULT_HTTPS_PORT : DEFAULT_HTTP_PORT; + int defaultPort = isHttps(protocol) ? DEFAULT_HTTPS_PORT : DEFAULT_HTTP_PORT; if (discovery != null) { loadBalancer = new JsonRpcLoadBalancer(() -> discovery.value().get(name), defaultPort, bootstrap, poolHandler); } else { @@ -97,15 +97,19 @@ public Object handle(String method, Object[] args) throws JsonRpcException { class Handler implements JsonRpcChannelHandler { @Override public void channelUpdated(Channel ch) throws SSLException { - ch.pipeline() - .addLast("codec", new HttpClientCodec()) - .addLast("http-aggregator", new HttpObjectAggregator(1024 * 1024)) - .addLast(jsonRpcHttpClientHandler); - protocol = protocol.toUpperCase(); - if (protocol.equals(JsonRpcProtocol.HTTPS.name())) { + SocketChannel sc = (SocketChannel) ch; + if (isHttps(protocol)) { SslContext sslContext = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build(); - ch.pipeline().addLast(new OptionalSslHandler(sslContext)); + ch.pipeline().addLast(sslContext.newHandler(ch.alloc())); } + sc.pipeline() + .addLast("codec", new HttpClientCodec()) + .addLast("http-aggregator", new HttpObjectAggregator(1024 * 1024)); + sc.pipeline().addLast(jsonRpcHttpClientHandler); } } + + private boolean isHttps(String scheme) { + return scheme.toUpperCase().equals(JsonRpcProtocol.HTTPS.name()); + } } \ No newline at end of file diff --git a/src/main/java/com/sunquakes/jsonrpc4j/client/JsonRpcLoadBalancer.java b/src/main/java/com/sunquakes/jsonrpc4j/client/JsonRpcLoadBalancer.java index ab31a01..132e694 100644 --- a/src/main/java/com/sunquakes/jsonrpc4j/client/JsonRpcLoadBalancer.java +++ b/src/main/java/com/sunquakes/jsonrpc4j/client/JsonRpcLoadBalancer.java @@ -7,10 +7,10 @@ import org.springframework.util.StringUtils; import java.net.InetSocketAddress; +import java.security.SecureRandom; import java.util.Arrays; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ThreadLocalRandom; import java.util.function.Supplier; /** @@ -34,6 +34,8 @@ public class JsonRpcLoadBalancer { private static final int MAX_RETRY_TIMES = 3; + private static final SecureRandom secureRandom = new SecureRandom(); + public JsonRpcLoadBalancer(Supplier url, int defaultPort, Bootstrap bootstrap, JsonRpcChannelPoolHandler poolHandler) { this.bootstrap = bootstrap; this.poolHandler = poolHandler; @@ -68,7 +70,8 @@ public FixedChannelPool getPool() { initPools(); return getPool(); } else { - int index = ThreadLocalRandom.current().nextInt(pools.size()); + secureRandom.nextBytes(new byte[8]); + int index = secureRandom.nextInt(pools.size()); pool = pools.get(index); } return pool; diff --git a/src/main/java/com/sunquakes/jsonrpc4j/client/JsonRpcTcpClientHandler.java b/src/main/java/com/sunquakes/jsonrpc4j/client/JsonRpcTcpClientHandler.java index 245ea65..5e125c2 100644 --- a/src/main/java/com/sunquakes/jsonrpc4j/client/JsonRpcTcpClientHandler.java +++ b/src/main/java/com/sunquakes/jsonrpc4j/client/JsonRpcTcpClientHandler.java @@ -84,7 +84,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception if (bytes.length > 0) { String body = new String(bytes); synchronized (channel) { - Promise promise = promiseMap.get(channel); + Promise promise = promiseMap.get(channel); if (promise != null) { promise.setSuccess(body); } diff --git a/src/main/java/com/sunquakes/jsonrpc4j/discovery/Nacos.java b/src/main/java/com/sunquakes/jsonrpc4j/discovery/Nacos.java index 69175b8..b853fa6 100644 --- a/src/main/java/com/sunquakes/jsonrpc4j/discovery/Nacos.java +++ b/src/main/java/com/sunquakes/jsonrpc4j/discovery/Nacos.java @@ -2,17 +2,14 @@ import com.alibaba.fastjson2.JSONObject; import com.sunquakes.jsonrpc4j.exception.JsonRpcException; +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.util.CharsetUtil; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.http.HttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.client.methods.HttpPut; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; import org.springframework.web.util.UriComponents; import org.springframework.web.util.UriComponentsBuilder; @@ -32,8 +29,6 @@ @Slf4j public class Nacos implements Driver { - private static final int STATUS_CODE_SUCCESS = 200; - private static final int HEARTBEAT_INTERVAL = 5000; private static final String EPHEMERAL_KEY = "ephemeral"; @@ -44,7 +39,7 @@ public class Nacos implements Driver { private UriComponents url; - private CloseableHttpClient client = HttpClients.createDefault(); + private NettyHttpClient client; private String ephemeral = "true"; @@ -52,6 +47,7 @@ public class Nacos implements Driver { @Override public Nacos newClient(String url) { + client = new NettyHttpClient(url); this.url = UriComponentsBuilder.fromUriString(url).build(); if (this.url.getQueryParams().containsKey(EPHEMERAL_KEY)) { ephemeral = this.url.getQueryParams().getFirst(EPHEMERAL_KEY); @@ -71,16 +67,16 @@ public boolean register(String name, String protocol, String hostname, int port) .queryParam(EPHEMERAL_KEY, ephemeral) .build(); - HttpPost post = new HttpPost(fullUrl.toString()); try { - HttpResponse res = client.execute(post); - if (res.getStatusLine().getStatusCode() != STATUS_CODE_SUCCESS) { + FullHttpResponse res = client.post(fullUrl.toString()); + if (res.status().equals(HttpResponseStatus.OK)) { throw new JsonRpcException("Failed to register to nacos."); } if (IS_EPHEMERAL.equals(ephemeral)) { registerHeartbeat(name, hostname, port); } } catch (Exception e) { + Thread.currentThread().interrupt(); log.error(e.getMessage(), e); return false; } @@ -95,17 +91,17 @@ public String get(String name) { .path("/nacos/v1/ns/instance/list") .queryParam(SERVICE_NAME_KEY, name) .build(); - - HttpGet get = new HttpGet(fullUrl.toString()); try { - HttpResponse res = client.execute(get); - if (res.getStatusLine().getStatusCode() != STATUS_CODE_SUCCESS) { + FullHttpResponse res = client.get(fullUrl.toString()); + if (res.status().equals(HttpResponseStatus.OK)) { throw new JsonRpcException("Failed to get the service list from nacos."); } - String json = EntityUtils.toString(res.getEntity()); + ByteBuf buf = res.content(); + String json = buf.toString(CharsetUtil.UTF_8); GetResp resp = JSONObject.parseObject(json, GetResp.class); return resp.getHosts().stream().filter(item -> item.healthy).map(item -> String.format("%s:%d", item.getIp(), item.getPort())).collect(Collectors.joining(",")); } catch (Exception e) { + Thread.currentThread().interrupt(); log.error(e.getMessage(), e); } return name; @@ -120,15 +116,15 @@ public String beat(String serviceName, String ip, int port) { .queryParam("ip", ip) .queryParam("port", port) .queryParam(EPHEMERAL_KEY, ephemeral).build(); - - HttpPut put = new HttpPut(fullUrl.toString()); try { - HttpResponse res = client.execute(put); - if (res.getStatusLine().getStatusCode() != STATUS_CODE_SUCCESS) { + FullHttpResponse res = client.put(fullUrl.toString()); + if (res.status().equals(HttpResponseStatus.OK)) { throw new JsonRpcException("Failed to send heartbeat to nacos."); } - return EntityUtils.toString(res.getEntity()); + ByteBuf buf = res.content(); + return buf.toString(CharsetUtil.UTF_8); } catch (Exception e) { + Thread.currentThread().interrupt(); log.error(e.getMessage(), e); return null; } diff --git a/src/main/java/com/sunquakes/jsonrpc4j/discovery/HttpClient.java b/src/main/java/com/sunquakes/jsonrpc4j/discovery/NettyHttpClient.java similarity index 64% rename from src/main/java/com/sunquakes/jsonrpc4j/discovery/HttpClient.java rename to src/main/java/com/sunquakes/jsonrpc4j/discovery/NettyHttpClient.java index 9dfd3cf..e5aca70 100644 --- a/src/main/java/com/sunquakes/jsonrpc4j/discovery/HttpClient.java +++ b/src/main/java/com/sunquakes/jsonrpc4j/discovery/NettyHttpClient.java @@ -1,9 +1,7 @@ package com.sunquakes.jsonrpc4j.discovery; import com.sunquakes.jsonrpc4j.JsonRpcProtocol; -import com.sunquakes.jsonrpc4j.exception.JsonRpcException; import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; @@ -13,7 +11,6 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; -import io.netty.util.CharsetUtil; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Promise; @@ -26,7 +23,7 @@ * @version 3.0.0 * @since 3.0.0 **/ -public class HttpClient { +public class NettyHttpClient { private static final int DEFAULT_HTTP_PORT = 80; @@ -34,36 +31,41 @@ public class HttpClient { private URI uri; - HttpClient(String host) { + NettyHttpClient(String host) { uri = URI.create(host); } - public Object get(String path) { + public FullHttpResponse get(String path) throws ExecutionException, InterruptedException { + return request(path, HttpMethod.GET); + } + + public FullHttpResponse post(String path) throws ExecutionException, InterruptedException { + return request(path, HttpMethod.POST); + } + + public FullHttpResponse put(String path) throws ExecutionException, InterruptedException { + return request(path, HttpMethod.PUT); + } + + private FullHttpResponse request(String path, HttpMethod method) throws InterruptedException, ExecutionException { EventLoopGroup group = new NioEventLoopGroup(); - Promise promise = new DefaultPromise<>(group.next()); + Promise promise = new DefaultPromise<>(group.next()); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new HttpClientInitializer(promise)); - try { - int port = uri.getPort(); - int defaultPort = isHttps(uri.getScheme()) ? DEFAULT_HTTPS_PORT : DEFAULT_HTTP_PORT; - if (port == -1) { - port = defaultPort; - } + int port = uri.getPort(); + int defaultPort = isHttps(uri.getScheme()) ? DEFAULT_HTTPS_PORT : DEFAULT_HTTP_PORT; + if (port == -1) { + port = defaultPort; + } - Channel channel = bootstrap.connect(new InetSocketAddress(uri.getHost(), port)).sync().channel(); + Channel channel = bootstrap.connect(new InetSocketAddress(uri.getHost(), port)).sync().channel(); - FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path, Unpooled.EMPTY_BUFFER); + FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, path, Unpooled.EMPTY_BUFFER); - channel.writeAndFlush(request).sync(); - return promise.sync().get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new JsonRpcException(e.getMessage()); - } catch (ExecutionException e) { - throw new JsonRpcException(e.getMessage()); - } + channel.writeAndFlush(request).sync(); + return promise.sync().get(); } private boolean isHttps(String scheme) { @@ -72,9 +74,9 @@ private boolean isHttps(String scheme) { class HttpClientInitializer extends ChannelInitializer { - Promise promise; + Promise promise; - HttpClientInitializer(Promise promise) { + HttpClientInitializer(Promise promise) { this.promise = promise; } @@ -96,22 +98,21 @@ protected void initChannel(Channel ch) throws Exception { class HttpResponseHandler extends SimpleChannelInboundHandler { - Promise promise; + Promise promise; - HttpResponseHandler(Promise promise) { + HttpResponseHandler(Promise promise) { this.promise = promise; } @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) { FullHttpResponse httpResponse = msg; - ByteBuf buf = httpResponse.content(); - String body = buf.toString(CharsetUtil.UTF_8); - promise.setSuccess(body); + promise.setSuccess(httpResponse); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + promise.setFailure(cause); ctx.close(); } } diff --git a/src/main/java/com/sunquakes/jsonrpc4j/server/JsonRpcTcpServer.java b/src/main/java/com/sunquakes/jsonrpc4j/server/JsonRpcTcpServer.java index abd872a..69062a4 100644 --- a/src/main/java/com/sunquakes/jsonrpc4j/server/JsonRpcTcpServer.java +++ b/src/main/java/com/sunquakes/jsonrpc4j/server/JsonRpcTcpServer.java @@ -86,14 +86,12 @@ protected void initChannel(SocketChannel sh) throws Exception { log.info("JsonRpc tcp server startup successfully."); } else { log.info("JsonRpc tcp server startup failed."); - future.cause().printStackTrace(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } future.channel().closeFuture().sync(); } catch (InterruptedException e) { - e.printStackTrace(); Thread.currentThread().interrupt(); } }); diff --git a/src/test/java/com/sunquakes/jsonrpc4j/discovery/HttpClientTest.java b/src/test/java/com/sunquakes/jsonrpc4j/discovery/HttpClientTest.java deleted file mode 100644 index 9046543..0000000 --- a/src/test/java/com/sunquakes/jsonrpc4j/discovery/HttpClientTest.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.sunquakes.jsonrpc4j.discovery; - -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertNotNull; - -class HttpClientTest { - - @Test - void testGet() { - HttpClient httpClient = new HttpClient("https://www.github.com"); - assertNotNull(httpClient.get("/")); - } -} diff --git a/src/test/java/com/sunquakes/jsonrpc4j/discovery/NettyHttpClientTest.java b/src/test/java/com/sunquakes/jsonrpc4j/discovery/NettyHttpClientTest.java new file mode 100644 index 0000000..e2072dc --- /dev/null +++ b/src/test/java/com/sunquakes/jsonrpc4j/discovery/NettyHttpClientTest.java @@ -0,0 +1,23 @@ +package com.sunquakes.jsonrpc4j.discovery; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ExecutionException; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class NettyHttpClientTest { + + @Test + void testGet() { + NettyHttpClient httpClient = new NettyHttpClient("https://www.github.com"); + try { + System.out.println(httpClient.get("/")); + assertNotNull(httpClient.get("/")); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } +} From da81ab81b65c1d7209b56a98ab5188236b38113d Mon Sep 17 00:00:00 2001 From: Shing Rui Date: Sat, 12 Oct 2024 21:52:51 +0800 Subject: [PATCH 3/6] Test tcp server. --- .../server/JsonRpcTcpServerTest.java | 78 ++++--------------- 1 file changed, 16 insertions(+), 62 deletions(-) diff --git a/src/test/java/com/sunquakes/jsonrpc4j/server/JsonRpcTcpServerTest.java b/src/test/java/com/sunquakes/jsonrpc4j/server/JsonRpcTcpServerTest.java index 49a0918..3105cac 100644 --- a/src/test/java/com/sunquakes/jsonrpc4j/server/JsonRpcTcpServerTest.java +++ b/src/test/java/com/sunquakes/jsonrpc4j/server/JsonRpcTcpServerTest.java @@ -2,6 +2,7 @@ import com.alibaba.fastjson2.JSONObject; import com.sunquakes.jsonrpc4j.dto.ResponseDto; +import com.sunquakes.jsonrpc4j.utils.ByteArrayUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.factory.annotation.Value; @@ -47,76 +48,29 @@ void testHandle() throws IOException { bw = new BufferedWriter(new OutputStreamWriter(os)); bw.write((request + packageEof)); bw.flush(); - StringBuffer sb = new StringBuffer(); byte[] buffer = new byte[packageMaxLength]; - int bufferLength = buffer.length; int len; InputStream is = s.getInputStream(); - String init = ""; - int packageEofLength = packageEof.length(); + byte[] packageEofBytes = packageEof.getBytes(); + byte[] bytes = new byte[0]; - int i = sb.indexOf(packageEof); - if (i != -1) { - sb.substring(0, i); - if (i + packageEofLength < sb.length()) { - init = sb.substring(i + packageEofLength); - } else { - init = ""; - } - } else { - while ((len = is.read(buffer)) != -1) { - if (bufferLength == len) { - sb.append(new String(buffer)); - } else { - byte[] end = Arrays.copyOfRange(buffer, 0, len); - sb.append(new String(end)); - } - i = sb.indexOf(packageEof); - if (i != -1) { - sb.substring(0, i); - if (i + packageEofLength < sb.length()) { - init = sb.substring(i + packageEofLength); - } else { - init = ""; - } - break; - } - } - } - ResponseDto responseDto = JSONObject.parseObject(sb.toString(), ResponseDto.class); - assertEquals(3, responseDto.getResult()); + while ((len = is.read(buffer)) != -1) { + int mergeLength = bytes.length + len; + byte[] mergedArray = new byte[mergeLength]; + System.arraycopy(bytes, 0, mergedArray, 0, bytes.length); + System.arraycopy(buffer, 0, mergedArray, bytes.length, mergeLength); + bytes = mergedArray; - sb = new StringBuffer(init); - - i = sb.indexOf(packageEof); - if (i != -1) { - sb.substring(0, i); - if (i + packageEofLength < sb.length()) { - init = sb.substring(i + packageEofLength); - } else { - init = ""; - } - } else { - while ((len = is.read(buffer)) != -1) { - if (bufferLength == len) { - sb.append(new String(buffer)); - } else { - byte[] end = Arrays.copyOfRange(buffer, 0, len); - sb.append(new String(end)); - } - i = sb.indexOf(packageEof); - if (i != -1) { - sb.substring(0, i); - if (i + packageEofLength < sb.length()) { - init = sb.substring(i + packageEofLength); - } else { - init = ""; - } - break; - } + int i = ByteArrayUtils.strstr(bytes, packageEofBytes, 0); + if (i != -1) { + bytes = Arrays.copyOfRange(bytes, 0, i); + break; } } + String sb = new String(bytes); + ResponseDto responseDto = JSONObject.parseObject(sb, ResponseDto.class); + assertEquals(3, responseDto.getResult()); responseDto = JSONObject.parseObject(sb.toString(), ResponseDto.class); assertEquals(3, responseDto.getResult()); From 4629b11651fb8b59f2e50061116dd8a4c087a4c8 Mon Sep 17 00:00:00 2001 From: Shing Rui Date: Mon, 14 Oct 2024 21:52:11 +0800 Subject: [PATCH 4/6] Change consul httpclient to netty client. --- pom.xml | 6 - .../sunquakes/jsonrpc4j/discovery/Consul.java | 199 +++++++++++++----- .../sunquakes/jsonrpc4j/discovery/Nacos.java | 4 +- .../jsonrpc4j/discovery/NettyHttpClient.java | 20 +- .../discovery/NettyHttpClientTest.java | 44 +++- .../jsonrpc4j/spring/boot/ConsulTest.java | 83 ++++++++ .../resources/application-consul.properties | 12 ++ 7 files changed, 293 insertions(+), 75 deletions(-) create mode 100644 src/test/java/com/sunquakes/jsonrpc4j/spring/boot/ConsulTest.java create mode 100644 src/test/resources/application-consul.properties diff --git a/pom.xml b/pom.xml index 869a583..57f8c84 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,6 @@ 2.0.13 3.3.2 4.1.112.Final - 1.4.5 5.10.3 5.2.0 1.3.2 @@ -81,11 +80,6 @@ netty-all ${netty.version} - - com.ecwid.consul - consul-api - ${consul.version} - org.junit.jupiter junit-jupiter-api diff --git a/src/main/java/com/sunquakes/jsonrpc4j/discovery/Consul.java b/src/main/java/com/sunquakes/jsonrpc4j/discovery/Consul.java index decfa93..dec7bda 100644 --- a/src/main/java/com/sunquakes/jsonrpc4j/discovery/Consul.java +++ b/src/main/java/com/sunquakes/jsonrpc4j/discovery/Consul.java @@ -1,19 +1,22 @@ package com.sunquakes.jsonrpc4j.discovery; -import com.ecwid.consul.v1.ConsulClient; -import com.ecwid.consul.v1.QueryParams; -import com.ecwid.consul.v1.Response; -import com.ecwid.consul.v1.agent.model.NewService; -import com.ecwid.consul.v1.health.HealthServicesRequest; -import com.ecwid.consul.v1.health.model.HealthService; +import com.alibaba.fastjson2.JSONObject; +import com.alibaba.fastjson2.annotation.JSONField; import com.sunquakes.jsonrpc4j.JsonRpcProtocol; import com.sunquakes.jsonrpc4j.utils.AddressUtils; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.util.CharsetUtil; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.util.MultiValueMap; import org.springframework.util.StringUtils; import org.springframework.web.util.UriComponents; import org.springframework.web.util.UriComponentsBuilder; +import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -27,9 +30,7 @@ public class Consul implements Driver { private UriComponents url; - private ConsulClient client; - - private String token; // Request token. + private NettyHttpClient client; private boolean check; // Whether to enable health check. @@ -39,8 +40,6 @@ public class Consul implements Driver { private static final String CHECK_FIELD = "check"; - private static final String TOKEN_FIELD = "token"; - private static final String CHECK_INTERVAL_FIELD = "checkInterval"; private static final String INSTANCE_ID_FIELD = "instanceId"; @@ -54,73 +53,92 @@ public Consul newClient(String url) { @Override public boolean register(String name, String protocol, String hostname, int port) { - NewService newService = new NewService(); String id; if (instanceId != null) { id = String.format("%s-%s:%d", name, instanceId, port); } else { id = AddressUtils.getUrl(name, port); } - newService.setId(id); - newService.setName(name); - newService.setPort(port); - if (hostname != null) { - newService.setAddress(hostname); - } if (check) { - protocol = protocol.toUpperCase(); - NewService.Check serviceCheck = new NewService.Check(); - if (protocol.equals(JsonRpcProtocol.TCP.name())) { - serviceCheck.setTcp(AddressUtils.getUrl(hostname, port)); - } else { - serviceCheck.setHttp(String.format("%s://%s:%d", protocol, hostname, port)); - } - serviceCheck.setInterval(checkInterval); - // Set the init status passing - serviceCheck.setStatus("passing"); - // Set the http method - serviceCheck.setMethod("GET"); - newService.setCheck(serviceCheck); + checkRegister(protocol, hostname, port); } + RegisterService service = new RegisterService(id, name, port, hostname); + UriComponentsBuilder builder = UriComponentsBuilder.newInstance(); + UriComponents fullUrl = builder.uriComponents(this.url) + .path("/v1/agent/service/register") + .build(); try { - if (token != null) { - client.agentServiceRegister(newService, token); - } else { - client.agentServiceRegister(newService); + FullHttpResponse res = client.put(fullUrl.toUriString(), JSONObject.toJSONString(service)); + if (!res.status().equals(HttpResponseStatus.OK)) { + return false; } - } catch (RuntimeException e) { + } catch (Exception e) { + Thread.currentThread().interrupt(); log.error(e.getMessage(), e); return false; } return true; } + private void checkRegister(String protocol, String hostname, int port) { + protocol = protocol.toUpperCase(); + Check serviceCheck = new Check(); + if (protocol.equals(JsonRpcProtocol.TCP.name())) { + serviceCheck.setTcp(AddressUtils.getUrl(hostname, port)); + } else { + serviceCheck.setHttp(String.format("%s://%s:%d", protocol, hostname, port)); + } + serviceCheck.setInterval(checkInterval); + // Set the init status passing + serviceCheck.setStatus("passing"); + // Set the http method + serviceCheck.setMethod("GET"); + UriComponentsBuilder builder = UriComponentsBuilder.newInstance(); + UriComponents fullUrl = builder.uriComponents(this.url) + .path("/v1/agent/check/register") + .build(); + try { + FullHttpResponse res = client.put(fullUrl.toUriString(), JSONObject.toJSONString(serviceCheck)); + if (!res.status().equals(HttpResponseStatus.OK)) { + log.error("Health check register failed."); + } + } catch (Exception e) { + Thread.currentThread().interrupt(); + log.error(e.getMessage(), e); + } + } + @Override public String get(String name) { - HealthServicesRequest.Builder builder = HealthServicesRequest.newBuilder() - .setPassing(true) - .setQueryParams(QueryParams.DEFAULT); - if (token != null) { - builder.setToken(token); + UriComponentsBuilder builder = UriComponentsBuilder.newInstance(); + UriComponents fullUrl = builder.uriComponents(this.url) + .path("/v1/agent/health/service/name/" + name) + .build(); + try { + FullHttpResponse res = client.get(fullUrl.toUriString()); + if (!res.status().equals(HttpResponseStatus.OK)) { + return ""; + } + String body = res.content().toString(CharsetUtil.UTF_8); + List healthyServices = JSONObject.parseObject(body, ArrayList.class); + return healthyServices.stream().map(item -> AddressUtils.getUrl(item.getService().getAddress(), item.getService().getPort())).collect(Collectors.joining(",")); + } catch (Exception e) { + Thread.currentThread().interrupt(); + log.error(e.getMessage(), e); + return ""; } - HealthServicesRequest request = builder.build(); - Response> healthyServices = client.getHealthServices(name, request); - return healthyServices.getValue().stream().map(item -> AddressUtils.getUrl(item.getService().getAddress(), item.getService().getPort())).collect(Collectors.joining(",")); } - private ConsulClient getClient() { + private NettyHttpClient getClient() { if (url.getPort() == -1) { - client = new ConsulClient(String.format("%s://%s", url.getScheme(), url.getHost())); + client = new NettyHttpClient(String.format("%s://%s", url.getScheme(), url.getHost())); } else { - client = new ConsulClient(String.format("%s://%s", url.getScheme(), url.getHost()), url.getPort()); + client = new NettyHttpClient(String.format("%s://%s:%d", url.getScheme(), url.getHost(), url.getPort())); } // Deserialize url parameters. MultiValueMap queryParams = url.getQueryParams(); - if (queryParams.containsKey(TOKEN_FIELD) && StringUtils.hasLength(queryParams.getFirst(TOKEN_FIELD))) { - token = queryParams.getFirst(TOKEN_FIELD); - } if (queryParams.containsKey(CHECK_FIELD)) { check = Boolean.parseBoolean(queryParams.getFirst(CHECK_FIELD)); } @@ -132,4 +150,85 @@ private ConsulClient getClient() { } return client; } + + @Data + @AllArgsConstructor + @NoArgsConstructor + public class HealthService { + + @JSONField(name = "AggregatedStatus") + private String aggregatedStatus; + + @JSONField(name = "Service") + private NewService service; + } + + @Data + @AllArgsConstructor + @NoArgsConstructor + public class NewService { + + @JSONField(name = "ID") + private String id; + + @JSONField(name = "Service") + private String service; + + @JSONField(name = "Port") + private Integer port; + + @JSONField(name = "Address") + private String address; + } + + @Data + @AllArgsConstructor + @NoArgsConstructor + public class RegisterService { + + @JSONField(name = "ID") + private String id; + + @JSONField(name = "Name") + private String name; + + @JSONField(name = "Port") + private Integer port; + + @JSONField(name = "Address") + private String address; + } + + @Data + @AllArgsConstructor + @NoArgsConstructor + public class Check { + + @JSONField(name = "ID") + private String id; + + @JSONField(name = "Name") + private String name; + + @JSONField(name = "Status") + private String status; + + @JSONField(name = "ServiceID") + private String serviceID; + + @JSONField(name = "HTTP") + private String http; + + @JSONField(name = "Method") + private String method; + + @JSONField(name = "TCP") + private String tcp; + + @JSONField(name = "Interval") + private String interval; + + @JSONField(name = "Timeout") + private String timeout; + } } diff --git a/src/main/java/com/sunquakes/jsonrpc4j/discovery/Nacos.java b/src/main/java/com/sunquakes/jsonrpc4j/discovery/Nacos.java index b853fa6..77115f3 100644 --- a/src/main/java/com/sunquakes/jsonrpc4j/discovery/Nacos.java +++ b/src/main/java/com/sunquakes/jsonrpc4j/discovery/Nacos.java @@ -68,7 +68,7 @@ public boolean register(String name, String protocol, String hostname, int port) .build(); try { - FullHttpResponse res = client.post(fullUrl.toString()); + FullHttpResponse res = client.post(fullUrl.toString(), null); if (res.status().equals(HttpResponseStatus.OK)) { throw new JsonRpcException("Failed to register to nacos."); } @@ -117,7 +117,7 @@ public String beat(String serviceName, String ip, int port) { .queryParam("port", port) .queryParam(EPHEMERAL_KEY, ephemeral).build(); try { - FullHttpResponse res = client.put(fullUrl.toString()); + FullHttpResponse res = client.put(fullUrl.toString(), null); if (res.status().equals(HttpResponseStatus.OK)) { throw new JsonRpcException("Failed to send heartbeat to nacos."); } diff --git a/src/main/java/com/sunquakes/jsonrpc4j/discovery/NettyHttpClient.java b/src/main/java/com/sunquakes/jsonrpc4j/discovery/NettyHttpClient.java index e5aca70..fba9758 100644 --- a/src/main/java/com/sunquakes/jsonrpc4j/discovery/NettyHttpClient.java +++ b/src/main/java/com/sunquakes/jsonrpc4j/discovery/NettyHttpClient.java @@ -2,6 +2,7 @@ import com.sunquakes.jsonrpc4j.JsonRpcProtocol; import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; @@ -36,18 +37,18 @@ public class NettyHttpClient { } public FullHttpResponse get(String path) throws ExecutionException, InterruptedException { - return request(path, HttpMethod.GET); + return request(path, HttpMethod.GET, null); } - public FullHttpResponse post(String path) throws ExecutionException, InterruptedException { - return request(path, HttpMethod.POST); + public FullHttpResponse post(String path, String body) throws ExecutionException, InterruptedException { + return request(path, HttpMethod.POST, body); } - public FullHttpResponse put(String path) throws ExecutionException, InterruptedException { - return request(path, HttpMethod.PUT); + public FullHttpResponse put(String path, String body) throws ExecutionException, InterruptedException { + return request(path, HttpMethod.PUT, body); } - private FullHttpResponse request(String path, HttpMethod method) throws InterruptedException, ExecutionException { + private FullHttpResponse request(String path, HttpMethod method, String body) throws InterruptedException, ExecutionException { EventLoopGroup group = new NioEventLoopGroup(); Promise promise = new DefaultPromise<>(group.next()); Bootstrap bootstrap = new Bootstrap(); @@ -63,6 +64,13 @@ private FullHttpResponse request(String path, HttpMethod method) throws Interrup Channel channel = bootstrap.connect(new InetSocketAddress(uri.getHost(), port)).sync().channel(); FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, path, Unpooled.EMPTY_BUFFER); + if (body != null) { + ByteBuf buffer = request.content().clear(); + buffer.writerIndex(); + buffer.writeBytes(body.getBytes()); + buffer.writerIndex(); + buffer.readableBytes(); + } channel.writeAndFlush(request).sync(); return promise.sync().get(); diff --git a/src/test/java/com/sunquakes/jsonrpc4j/discovery/NettyHttpClientTest.java b/src/test/java/com/sunquakes/jsonrpc4j/discovery/NettyHttpClientTest.java index e2072dc..28c8c2e 100644 --- a/src/test/java/com/sunquakes/jsonrpc4j/discovery/NettyHttpClientTest.java +++ b/src/test/java/com/sunquakes/jsonrpc4j/discovery/NettyHttpClientTest.java @@ -1,23 +1,45 @@ package com.sunquakes.jsonrpc4j.discovery; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.util.CharsetUtil; import org.junit.jupiter.api.Test; +import java.nio.charset.StandardCharsets; import java.util.concurrent.ExecutionException; -import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; class NettyHttpClientTest { @Test - void testGet() { - NettyHttpClient httpClient = new NettyHttpClient("https://www.github.com"); - try { - System.out.println(httpClient.get("/")); - assertNotNull(httpClient.get("/")); - } catch (ExecutionException e) { - throw new RuntimeException(e); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + void testGet() throws ExecutionException, InterruptedException { + NettyHttpClient nettyHttpClient = mock(NettyHttpClient.class); + FullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer("Hello world.", StandardCharsets.UTF_8)); + when(nettyHttpClient.get(anyString())).thenReturn(fullHttpResponse); + assertEquals("Hello world.", nettyHttpClient.get("/").content().toString(CharsetUtil.UTF_8)); + } + + @Test + void testPost() throws ExecutionException, InterruptedException { + NettyHttpClient nettyHttpClient = mock(NettyHttpClient.class); + FullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer("Hello world.", StandardCharsets.UTF_8)); + when(nettyHttpClient.post(anyString(), any())).thenReturn(fullHttpResponse); + assertEquals("Hello world.", nettyHttpClient.post("/", null).content().toString(CharsetUtil.UTF_8)); + } + + @Test + void testPut() throws ExecutionException, InterruptedException { + NettyHttpClient nettyHttpClient = mock(NettyHttpClient.class); + FullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer("Hello world.", StandardCharsets.UTF_8)); + when(nettyHttpClient.put(anyString(), any())).thenReturn(fullHttpResponse); + assertEquals("Hello world.", nettyHttpClient.put("/", null).content().toString(CharsetUtil.UTF_8)); } } diff --git a/src/test/java/com/sunquakes/jsonrpc4j/spring/boot/ConsulTest.java b/src/test/java/com/sunquakes/jsonrpc4j/spring/boot/ConsulTest.java new file mode 100644 index 0000000..54a9d3e --- /dev/null +++ b/src/test/java/com/sunquakes/jsonrpc4j/spring/boot/ConsulTest.java @@ -0,0 +1,83 @@ +package com.sunquakes.jsonrpc4j.spring.boot; + +import com.sunquakes.jsonrpc4j.discovery.Consul; +import com.sunquakes.jsonrpc4j.spring.JsonRpcServiceDiscovery; +import com.sunquakes.jsonrpc4j.spring.boot.dto.ArgsDto; +import com.sunquakes.jsonrpc4j.spring.boot.dto.ResultDto; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.MockedStatic; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import javax.annotation.Resource; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.*; + +@ExtendWith(SpringExtension.class) +@SpringBootTest(classes = JsonRpcApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@ActiveProfiles("consul") +class ConsulTest { + + @Value("${jsonrpc.server.protocol}") + private String protocol; + + @Value("${jsonrpc.server.port}") + private int port; + + private static MockedStatic mockStatic; + + public ConsulTest() { + Consul consul = mock(Consul.class); + when(consul.get(anyString())).thenReturn("localhost:" + 3208); + JsonRpcServiceDiscovery instanse = mock(JsonRpcServiceDiscovery.class); + when(instanse.getDriver()).thenReturn(consul); + mockStatic = mockStatic(JsonRpcServiceDiscovery.class); + mockStatic.when(() -> JsonRpcServiceDiscovery.newInstance(anyString(), anyString())).thenReturn(instanse); + } + + @AfterEach + public void releaseMocks() { + mockStatic.close(); + } + + @Resource + private IJsonRpcClient jsonRpcClient; + + @Test + void testGetConfiguration() { + assertEquals("tcp", protocol); + assertEquals(3208, port); + } + + @Test + void testRequest() { + // test request + { + assertEquals(3, jsonRpcClient.add(1, 2)); + assertEquals(7, jsonRpcClient.add(3, 4)); + assertEquals(7, jsonRpcClient.add(5, 2)); + + assertEquals(1, jsonRpcClient.sub(3, 2)); + assertEquals(2, jsonRpcClient.sub(7, 5)); + + ArgsDto args = new ArgsDto(); + args.setA(8); + args.setB(9); + ResultDto result = jsonRpcClient.add2(args); + assertEquals(17, result.getC()); + + ArgsDto innerArgs = new ArgsDto(); + innerArgs.setA(10); + innerArgs.setB(11); + args.setArgs(innerArgs); + result = jsonRpcClient.add3(args); + assertEquals(21, result.getResult().getC()); + } + } +} diff --git a/src/test/resources/application-consul.properties b/src/test/resources/application-consul.properties new file mode 100644 index 0000000..f96d32a --- /dev/null +++ b/src/test/resources/application-consul.properties @@ -0,0 +1,12 @@ +jsonrpc.server.protocol=tcp +jsonrpc.server.port=3208 + +jsonrpc.server.package-eof=\r\n +jsonrpc.server.package-max-length=2097152 + +# jsonrpc.discovery.hostname=192.168.39.1 +jsonrpc.discovery.url=http://127.0.0.1:8500 +jsonrpc.discovery.driver-name=com.sunquakes.jsonrpc4j.discovery.Consul + +jsonrpc.client.package-eof=\r\n +jsonrpc.client.package-max-length=2097152 \ No newline at end of file From 5b9a1607d66a8df19bb49a18343684f63a933ca6 Mon Sep 17 00:00:00 2001 From: Shing Rui Date: Tue, 15 Oct 2024 21:13:06 +0800 Subject: [PATCH 5/6] Fixed can not register service. --- .../com/sunquakes/jsonrpc4j/discovery/Consul.java | 8 +++++--- .../jsonrpc4j/discovery/NettyHttpClient.java | 12 ++++++++---- src/test/resources/application-consul.properties | 2 +- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/sunquakes/jsonrpc4j/discovery/Consul.java b/src/main/java/com/sunquakes/jsonrpc4j/discovery/Consul.java index dec7bda..0e0cc0c 100644 --- a/src/main/java/com/sunquakes/jsonrpc4j/discovery/Consul.java +++ b/src/main/java/com/sunquakes/jsonrpc4j/discovery/Consul.java @@ -1,9 +1,11 @@ package com.sunquakes.jsonrpc4j.discovery; +import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.annotation.JSONField; import com.sunquakes.jsonrpc4j.JsonRpcProtocol; import com.sunquakes.jsonrpc4j.utils.AddressUtils; +import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.util.CharsetUtil; @@ -16,7 +18,6 @@ import org.springframework.web.util.UriComponents; import org.springframework.web.util.UriComponentsBuilder; -import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -120,8 +121,9 @@ public String get(String name) { if (!res.status().equals(HttpResponseStatus.OK)) { return ""; } - String body = res.content().toString(CharsetUtil.UTF_8); - List healthyServices = JSONObject.parseObject(body, ArrayList.class); + ByteBuf content = res.content(); + String body = content.toString(CharsetUtil.UTF_8); + List healthyServices = JSONArray.parseArray(body, HealthService.class); return healthyServices.stream().map(item -> AddressUtils.getUrl(item.getService().getAddress(), item.getService().getPort())).collect(Collectors.joining(",")); } catch (Exception e) { Thread.currentThread().interrupt(); diff --git a/src/main/java/com/sunquakes/jsonrpc4j/discovery/NettyHttpClient.java b/src/main/java/com/sunquakes/jsonrpc4j/discovery/NettyHttpClient.java index fba9758..da9bd74 100644 --- a/src/main/java/com/sunquakes/jsonrpc4j/discovery/NettyHttpClient.java +++ b/src/main/java/com/sunquakes/jsonrpc4j/discovery/NettyHttpClient.java @@ -1,9 +1,9 @@ package com.sunquakes.jsonrpc4j.discovery; import com.sunquakes.jsonrpc4j.JsonRpcProtocol; +import com.sunquakes.jsonrpc4j.utils.RequestUtils; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; @@ -63,14 +63,17 @@ private FullHttpResponse request(String path, HttpMethod method, String body) th Channel channel = bootstrap.connect(new InetSocketAddress(uri.getHost(), port)).sync().channel(); - FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, path, Unpooled.EMPTY_BUFFER); + FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, path); + request.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8"); if (body != null) { ByteBuf buffer = request.content().clear(); buffer.writerIndex(); buffer.writeBytes(body.getBytes()); buffer.writerIndex(); buffer.readableBytes(); + request.headers().add(HttpHeaderNames.CONTENT_LENGTH, buffer.readableBytes()); } + request.headers().add(HttpHeaderNames.HOST, RequestUtils.getLocalIp()); channel.writeAndFlush(request).sync(); return promise.sync().get(); @@ -114,8 +117,9 @@ class HttpResponseHandler extends SimpleChannelInboundHandler @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) { - FullHttpResponse httpResponse = msg; - promise.setSuccess(httpResponse); + ByteBuf buf = msg.content(); + FullHttpResponse res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(msg.status().code()), buf.copy()); + promise.setSuccess(res); } @Override diff --git a/src/test/resources/application-consul.properties b/src/test/resources/application-consul.properties index f96d32a..894c7a6 100644 --- a/src/test/resources/application-consul.properties +++ b/src/test/resources/application-consul.properties @@ -5,7 +5,7 @@ jsonrpc.server.package-eof=\r\n jsonrpc.server.package-max-length=2097152 # jsonrpc.discovery.hostname=192.168.39.1 -jsonrpc.discovery.url=http://127.0.0.1:8500 +jsonrpc.discovery.url=http://127.0.0.1:8500?instanceId=2&check=true&checkInterval=5s jsonrpc.discovery.driver-name=com.sunquakes.jsonrpc4j.discovery.Consul jsonrpc.client.package-eof=\r\n From 081143c5621e756d197060c84cd321c504190f0c Mon Sep 17 00:00:00 2001 From: Shing Rui Date: Wed, 16 Oct 2024 22:43:49 +0800 Subject: [PATCH 6/6] Fixed can not register service. --- .../com/sunquakes/jsonrpc4j/discovery/Consul.java | 11 ++++++----- .../com/sunquakes/jsonrpc4j/discovery/Nacos.java | 12 ++++++------ 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/sunquakes/jsonrpc4j/discovery/Consul.java b/src/main/java/com/sunquakes/jsonrpc4j/discovery/Consul.java index 0e0cc0c..f8e12c3 100644 --- a/src/main/java/com/sunquakes/jsonrpc4j/discovery/Consul.java +++ b/src/main/java/com/sunquakes/jsonrpc4j/discovery/Consul.java @@ -61,7 +61,7 @@ public boolean register(String name, String protocol, String hostname, int port) id = AddressUtils.getUrl(name, port); } if (check) { - checkRegister(protocol, hostname, port); + checkRegister(protocol, hostname, port, name); } RegisterService service = new RegisterService(id, name, port, hostname); @@ -70,7 +70,7 @@ public boolean register(String name, String protocol, String hostname, int port) .path("/v1/agent/service/register") .build(); try { - FullHttpResponse res = client.put(fullUrl.toUriString(), JSONObject.toJSONString(service)); + FullHttpResponse res = client.put(fullUrl.getPath() + "?" + fullUrl.getQuery(), JSONObject.toJSONString(service)); if (!res.status().equals(HttpResponseStatus.OK)) { return false; } @@ -82,9 +82,10 @@ public boolean register(String name, String protocol, String hostname, int port) return true; } - private void checkRegister(String protocol, String hostname, int port) { + private void checkRegister(String protocol, String hostname, int port, String name) { protocol = protocol.toUpperCase(); Check serviceCheck = new Check(); + serviceCheck.setName(name); if (protocol.equals(JsonRpcProtocol.TCP.name())) { serviceCheck.setTcp(AddressUtils.getUrl(hostname, port)); } else { @@ -100,7 +101,7 @@ private void checkRegister(String protocol, String hostname, int port) { .path("/v1/agent/check/register") .build(); try { - FullHttpResponse res = client.put(fullUrl.toUriString(), JSONObject.toJSONString(serviceCheck)); + FullHttpResponse res = client.put(fullUrl.getPath() + "?" + fullUrl.getQuery(), JSONObject.toJSONString(serviceCheck)); if (!res.status().equals(HttpResponseStatus.OK)) { log.error("Health check register failed."); } @@ -117,7 +118,7 @@ public String get(String name) { .path("/v1/agent/health/service/name/" + name) .build(); try { - FullHttpResponse res = client.get(fullUrl.toUriString()); + FullHttpResponse res = client.get(fullUrl.getPath() + "?" + fullUrl.getQuery()); if (!res.status().equals(HttpResponseStatus.OK)) { return ""; } diff --git a/src/main/java/com/sunquakes/jsonrpc4j/discovery/Nacos.java b/src/main/java/com/sunquakes/jsonrpc4j/discovery/Nacos.java index 77115f3..c0e3749 100644 --- a/src/main/java/com/sunquakes/jsonrpc4j/discovery/Nacos.java +++ b/src/main/java/com/sunquakes/jsonrpc4j/discovery/Nacos.java @@ -68,8 +68,8 @@ public boolean register(String name, String protocol, String hostname, int port) .build(); try { - FullHttpResponse res = client.post(fullUrl.toString(), null); - if (res.status().equals(HttpResponseStatus.OK)) { + FullHttpResponse res = client.post(fullUrl.getPath() + "?" + fullUrl.getQuery(), null); + if (!res.status().equals(HttpResponseStatus.OK)) { throw new JsonRpcException("Failed to register to nacos."); } if (IS_EPHEMERAL.equals(ephemeral)) { @@ -92,8 +92,8 @@ public String get(String name) { .queryParam(SERVICE_NAME_KEY, name) .build(); try { - FullHttpResponse res = client.get(fullUrl.toString()); - if (res.status().equals(HttpResponseStatus.OK)) { + FullHttpResponse res = client.get(fullUrl.getPath() + "?" + fullUrl.getQuery()); + if (!res.status().equals(HttpResponseStatus.OK)) { throw new JsonRpcException("Failed to get the service list from nacos."); } ByteBuf buf = res.content(); @@ -117,8 +117,8 @@ public String beat(String serviceName, String ip, int port) { .queryParam("port", port) .queryParam(EPHEMERAL_KEY, ephemeral).build(); try { - FullHttpResponse res = client.put(fullUrl.toString(), null); - if (res.status().equals(HttpResponseStatus.OK)) { + FullHttpResponse res = client.put(fullUrl.getPath() + "?" + fullUrl.getQuery(), null); + if (!res.status().equals(HttpResponseStatus.OK)) { throw new JsonRpcException("Failed to send heartbeat to nacos."); } ByteBuf buf = res.content();