Skip to content

Commit

Permalink
Merge pull request #20 from sunquakes/development
Browse files Browse the repository at this point in the history
Add JSONUtils and replace apache httpclient.
  • Loading branch information
sunquakes authored Oct 19, 2024
2 parents 673c939 + c95671c commit 19e0f88
Show file tree
Hide file tree
Showing 21 changed files with 380 additions and 220 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.sunquakes.jsonrpc4j.client;

import com.alibaba.fastjson2.JSON;
import com.sunquakes.jsonrpc4j.ErrorEnum;
import com.sunquakes.jsonrpc4j.dto.ErrorDto;
import com.sunquakes.jsonrpc4j.dto.ErrorResponseDto;
import com.sunquakes.jsonrpc4j.utils.JSONUtils;
import com.sunquakes.jsonrpc4j.utils.RequestUtils;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
Expand Down Expand Up @@ -41,7 +41,7 @@ protected void handleInternalError(ChannelHandlerContext ctx) {
ErrorResponseDto errorResponseDto = new ErrorResponseDto(null, RequestUtils.JSONRPC, new ErrorDto(ErrorEnum.INTERNAL_ERROR.getCode(), ErrorEnum.INTERNAL_ERROR.getText(), null));
Promise<String> promise = promiseMap.get(channel);
if (promise != null) {
promise.setSuccess(JSON.toJSONString(errorResponseDto));
promise.setSuccess(JSONUtils.toString(errorResponseDto));
promiseMap.remove(channel);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.sunquakes.jsonrpc4j.client;

import com.alibaba.fastjson2.JSON;
import com.sunquakes.jsonrpc4j.utils.JSONUtils;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.InvocationHandler;
Expand All @@ -26,6 +26,6 @@ public JsonRpcClientInvocationHandler(JsonRpcClientInterface jsonRpcClient, Stri
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
String methodPath = String.format("/%s/%s", service, method.getName());
return JSON.to(method.getReturnType(), jsonRpcClient.handle(methodPath, args));
return JSONUtils.toJavaObject(method.getReturnType(), jsonRpcClient.handle(methodPath, args));
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package com.sunquakes.jsonrpc4j.client;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.sunquakes.jsonrpc4j.ErrorEnum;
import com.sunquakes.jsonrpc4j.JsonRpcProtocol;
import com.sunquakes.jsonrpc4j.config.Config;
import com.sunquakes.jsonrpc4j.dto.ErrorDto;
import com.sunquakes.jsonrpc4j.dto.RequestDto;
import com.sunquakes.jsonrpc4j.dto.ResponseDto;
import com.sunquakes.jsonrpc4j.exception.JsonRpcClientException;
import com.sunquakes.jsonrpc4j.exception.JsonRpcException;
import com.sunquakes.jsonrpc4j.utils.JSONUtils;
import com.sunquakes.jsonrpc4j.utils.RequestUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
Expand Down Expand Up @@ -64,19 +64,15 @@ public void initLoadBalancer() {

@Override
public Object handle(String method, Object[] args) throws JsonRpcException {
JSONObject request = new JSONObject();
request.put("id", RequestUtils.getId());
request.put("jsonrpc", RequestUtils.JSONRPC);
request.put("method", method);
request.put("params", args);
RequestDto requestDto = new RequestDto(RequestUtils.getId(), RequestUtils.JSONRPC, method, args);
ResponseDto responseDto;
String body;
FixedChannelPool pool = loadBalancer.getPool();
try {
Channel channel = pool.acquire().get();
body = jsonRpcHttpClientHandler.send(request, channel);
body = jsonRpcHttpClientHandler.send(requestDto, channel);
pool.release(channel);
responseDto = JSONObject.parseObject(body, ResponseDto.class);
responseDto = JSONUtils.parseJavaObject(body, ResponseDto.class);
} catch (InterruptedException e) {
loadBalancer.removePool(pool);
Thread.currentThread().interrupt();
Expand All @@ -85,9 +81,9 @@ public Object handle(String method, Object[] args) throws JsonRpcException {
throw new JsonRpcClientException(e.getMessage());
}
if (responseDto.getResult() == null) {
JSONObject bodyJSON = JSON.parseObject(body);
if (bodyJSON.containsKey("error")) {
ErrorDto errorDto = JSONObject.parseObject(bodyJSON.getString("error"), ErrorDto.class);
Object error = JSONUtils.get(JSONUtils.parseJSONObject(body), "error");
if (error != null) {
ErrorDto errorDto = JSONUtils.parseJavaObject(JSONUtils.toString(error), ErrorDto.class);
throw ErrorEnum.getException(errorDto.getCode(), errorDto.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.sunquakes.jsonrpc4j.client;

import com.alibaba.fastjson2.JSONObject;
import com.sunquakes.jsonrpc4j.dto.RequestDto;
import com.sunquakes.jsonrpc4j.utils.JSONUtils;
import com.sunquakes.jsonrpc4j.utils.RequestUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
Expand All @@ -25,11 +26,11 @@
public class JsonRpcHttpClientHandler extends JsonRpcClientHandler {

@Synchronized
public synchronized String send(JSONObject data, Channel channel) throws InterruptedException, ExecutionException {
public synchronized String send(RequestDto data, Channel channel) throws InterruptedException, ExecutionException {
Promise<String> promise = new DefaultPromise<>(channel.eventLoop());
promiseMap.put(channel, promise);

String message = data.toJSONString();
String message = JSONUtils.toString(data);
FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
request.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8");
ByteBuf buffer = request.content().clear();
Expand Down
20 changes: 8 additions & 12 deletions src/main/java/com/sunquakes/jsonrpc4j/client/JsonRpcTcpClient.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.sunquakes.jsonrpc4j.client;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.sunquakes.jsonrpc4j.ErrorEnum;
import com.sunquakes.jsonrpc4j.config.Config;
import com.sunquakes.jsonrpc4j.dto.ErrorDto;
import com.sunquakes.jsonrpc4j.dto.RequestDto;
import com.sunquakes.jsonrpc4j.dto.ResponseDto;
import com.sunquakes.jsonrpc4j.exception.JsonRpcClientException;
import com.sunquakes.jsonrpc4j.exception.JsonRpcException;
import com.sunquakes.jsonrpc4j.utils.JSONUtils;
import com.sunquakes.jsonrpc4j.utils.RequestUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
Expand Down Expand Up @@ -71,19 +71,15 @@ protected void initChannel(SocketChannel ch) throws Exception {

@Override
public Object handle(String method, Object[] args) throws JsonRpcException {
JSONObject request = new JSONObject();
request.put("id", RequestUtils.getId());
request.put("jsonrpc", RequestUtils.JSONRPC);
request.put("method", method);
request.put("params", args);
RequestDto requestDto = new RequestDto(RequestUtils.getId(), RequestUtils.JSONRPC, method, args);
ResponseDto responseDto;
String body;
FixedChannelPool pool = loadBalancer.getPool();
try {
Channel channel = pool.acquire().get();
body = jsonRpcTcpClientHandler.send(request, channel);
body = jsonRpcTcpClientHandler.send(requestDto, channel);
pool.release(channel);
responseDto = JSONObject.parseObject(body, ResponseDto.class);
responseDto = JSONUtils.toJavaObject(ResponseDto.class, body);
} catch (InterruptedException e) {
loadBalancer.removePool(pool);
Thread.currentThread().interrupt();
Expand All @@ -92,9 +88,9 @@ public Object handle(String method, Object[] args) throws JsonRpcException {
throw new JsonRpcClientException(e.getMessage());
}
if (responseDto.getResult() == null) {
JSONObject bodyJSON = JSON.parseObject(body);
if (bodyJSON.containsKey("error")) {
ErrorDto errorDto = JSONObject.parseObject(bodyJSON.getString("error"), ErrorDto.class);
Object error = JSONUtils.get(JSONUtils.parseJSONObject(body), "error");
if (error != null) {
ErrorDto errorDto = JSONUtils.parseJavaObject(JSONUtils.toString(error), ErrorDto.class);
throw ErrorEnum.getException(errorDto.getCode(), errorDto.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.sunquakes.jsonrpc4j.client;

import com.alibaba.fastjson2.JSONObject;
import com.sunquakes.jsonrpc4j.dto.RequestDto;
import com.sunquakes.jsonrpc4j.utils.ByteArrayUtils;
import com.sunquakes.jsonrpc4j.utils.JSONUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
Expand Down Expand Up @@ -34,8 +35,8 @@ public JsonRpcTcpClientHandler(TcpClientOption tcpClientOption) {
}

@Synchronized
public synchronized String send(JSONObject request, Channel channel) throws InterruptedException, ExecutionException {
String message = request.toJSONString() + tcpClientOption.getPackageEof();
public synchronized String send(RequestDto request, Channel channel) throws InterruptedException, ExecutionException {
String message = JSONUtils.toString(request) + tcpClientOption.getPackageEof();
ByteBuf byteBuf = channel.alloc().buffer(tcpClientOption.getPackageMaxLength());
byteBuf.writeBytes(message.getBytes());

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.sunquakes.jsonrpc4j.discovery;
package com.sunquakes.jsonrpc4j.client;

import com.sunquakes.jsonrpc4j.JsonRpcProtocol;
import com.sunquakes.jsonrpc4j.utils.RequestUtils;
Expand Down Expand Up @@ -30,9 +30,9 @@ public class NettyHttpClient {

private static final int DEFAULT_HTTPS_PORT = 443;

private URI uri;
private final URI uri;

NettyHttpClient(String host) {
public NettyHttpClient(String host) {
uri = URI.create(host);
}

Expand Down Expand Up @@ -107,7 +107,7 @@ protected void initChannel(Channel ch) throws Exception {
}
}

class HttpResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
static class HttpResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> {

Promise<FullHttpResponse> promise;

Expand Down
84 changes: 34 additions & 50 deletions src/main/java/com/sunquakes/jsonrpc4j/discovery/Consul.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package com.sunquakes.jsonrpc4j.discovery;

import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.annotation.JSONField;
import com.sunquakes.jsonrpc4j.JsonRpcProtocol;
import com.sunquakes.jsonrpc4j.client.NettyHttpClient;
import com.sunquakes.jsonrpc4j.utils.AddressUtils;
import com.sunquakes.jsonrpc4j.utils.JSONUtils;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
Expand Down Expand Up @@ -70,7 +69,7 @@ public boolean register(String name, String protocol, String hostname, int port)
.path("/v1/agent/service/register")
.build();
try {
FullHttpResponse res = client.put(fullUrl.getPath() + "?" + fullUrl.getQuery(), JSONObject.toJSONString(service));
FullHttpResponse res = client.put(fullUrl.getPath() + "?" + fullUrl.getQuery(), JSONUtils.toString(service));
if (!res.status().equals(HttpResponseStatus.OK)) {
return false;
}
Expand All @@ -87,9 +86,9 @@ private void checkRegister(String protocol, String hostname, int port, String na
Check serviceCheck = new Check();
serviceCheck.setName(name);
if (protocol.equals(JsonRpcProtocol.TCP.name())) {
serviceCheck.setTcp(AddressUtils.getUrl(hostname, port));
serviceCheck.setTCP(AddressUtils.getUrl(hostname, port));
} else {
serviceCheck.setHttp(String.format("%s://%s:%d", protocol, hostname, port));
serviceCheck.setHTTP(String.format("%s://%s:%d", protocol, hostname, port));
}
serviceCheck.setInterval(checkInterval);
// Set the init status passing
Expand All @@ -101,7 +100,7 @@ private void checkRegister(String protocol, String hostname, int port, String na
.path("/v1/agent/check/register")
.build();
try {
FullHttpResponse res = client.put(fullUrl.getPath() + "?" + fullUrl.getQuery(), JSONObject.toJSONString(serviceCheck));
FullHttpResponse res = client.put(fullUrl.getPath() + "?" + fullUrl.getQuery(), JSONUtils.toString(serviceCheck));
if (!res.status().equals(HttpResponseStatus.OK)) {
log.error("Health check register failed.");
}
Expand All @@ -124,7 +123,7 @@ public String get(String name) {
}
ByteBuf content = res.content();
String body = content.toString(CharsetUtil.UTF_8);
List<HealthService> healthyServices = JSONArray.parseArray(body, HealthService.class);
List<HealthService> healthyServices = JSONUtils.parseList(body, HealthService.class);
return healthyServices.stream().map(item -> AddressUtils.getUrl(item.getService().getAddress(), item.getService().getPort())).collect(Collectors.joining(","));
} catch (Exception e) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -157,81 +156,66 @@ private NettyHttpClient getClient() {
@Data
@AllArgsConstructor
@NoArgsConstructor
public class HealthService {
@SuppressWarnings({"java:S116"})
static public class HealthService {

@JSONField(name = "AggregatedStatus")
private String aggregatedStatus;
private String AggregatedStatus;

@JSONField(name = "Service")
private NewService service;
private NewService Service;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
public class NewService {
@SuppressWarnings({"java:S116"})
static public class NewService {

@JSONField(name = "ID")
private String id;
private String ID;

@JSONField(name = "Service")
private String service;
private String Service;

@JSONField(name = "Port")
private Integer port;
private Integer Port;

@JSONField(name = "Address")
private String address;
private String Address;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
public class RegisterService {
@SuppressWarnings({"java:S116"})
static public class RegisterService {

@JSONField(name = "ID")
private String id;
private String ID;

@JSONField(name = "Name")
private String name;
private String Name;

@JSONField(name = "Port")
private Integer port;
private Integer Port;

@JSONField(name = "Address")
private String address;
private String Address;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Check {
@SuppressWarnings({"java:S116"})
static public class Check {

@JSONField(name = "ID")
private String id;
private String ID;

@JSONField(name = "Name")
private String name;
private String Name;

@JSONField(name = "Status")
private String status;
private String Status;

@JSONField(name = "ServiceID")
private String serviceID;
private String ServiceID;

@JSONField(name = "HTTP")
private String http;
private String HTTP;

@JSONField(name = "Method")
private String method;
private String Method;

@JSONField(name = "TCP")
private String tcp;
private String TCP;

@JSONField(name = "Interval")
private String interval;
private String Interval;

@JSONField(name = "Timeout")
private String timeout;
private String Timeout;
}
}
Loading

0 comments on commit 19e0f88

Please sign in to comment.