Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add JSONUtils and replace apache httpclient. #20

Merged
merged 3 commits into from
Oct 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.sunquakes.jsonrpc4j.client;

import com.alibaba.fastjson2.JSON;
import com.sunquakes.jsonrpc4j.ErrorEnum;
import com.sunquakes.jsonrpc4j.dto.ErrorDto;
import com.sunquakes.jsonrpc4j.dto.ErrorResponseDto;
import com.sunquakes.jsonrpc4j.utils.JSONUtils;
import com.sunquakes.jsonrpc4j.utils.RequestUtils;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
Expand Down Expand Up @@ -41,7 +41,7 @@ protected void handleInternalError(ChannelHandlerContext ctx) {
ErrorResponseDto errorResponseDto = new ErrorResponseDto(null, RequestUtils.JSONRPC, new ErrorDto(ErrorEnum.INTERNAL_ERROR.getCode(), ErrorEnum.INTERNAL_ERROR.getText(), null));
Promise<String> promise = promiseMap.get(channel);
if (promise != null) {
promise.setSuccess(JSON.toJSONString(errorResponseDto));
promise.setSuccess(JSONUtils.toString(errorResponseDto));
promiseMap.remove(channel);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.sunquakes.jsonrpc4j.client;

import com.alibaba.fastjson2.JSON;
import com.sunquakes.jsonrpc4j.utils.JSONUtils;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.InvocationHandler;
Expand All @@ -26,6 +26,6 @@ public JsonRpcClientInvocationHandler(JsonRpcClientInterface jsonRpcClient, Stri
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
String methodPath = String.format("/%s/%s", service, method.getName());
return JSON.to(method.getReturnType(), jsonRpcClient.handle(methodPath, args));
return JSONUtils.toJavaObject(method.getReturnType(), jsonRpcClient.handle(methodPath, args));
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package com.sunquakes.jsonrpc4j.client;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.sunquakes.jsonrpc4j.ErrorEnum;
import com.sunquakes.jsonrpc4j.JsonRpcProtocol;
import com.sunquakes.jsonrpc4j.config.Config;
import com.sunquakes.jsonrpc4j.dto.ErrorDto;
import com.sunquakes.jsonrpc4j.dto.RequestDto;
import com.sunquakes.jsonrpc4j.dto.ResponseDto;
import com.sunquakes.jsonrpc4j.exception.JsonRpcClientException;
import com.sunquakes.jsonrpc4j.exception.JsonRpcException;
import com.sunquakes.jsonrpc4j.utils.JSONUtils;
import com.sunquakes.jsonrpc4j.utils.RequestUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
Expand Down Expand Up @@ -64,19 +64,15 @@ public void initLoadBalancer() {

@Override
public Object handle(String method, Object[] args) throws JsonRpcException {
JSONObject request = new JSONObject();
request.put("id", RequestUtils.getId());
request.put("jsonrpc", RequestUtils.JSONRPC);
request.put("method", method);
request.put("params", args);
RequestDto requestDto = new RequestDto(RequestUtils.getId(), RequestUtils.JSONRPC, method, args);
ResponseDto responseDto;
String body;
FixedChannelPool pool = loadBalancer.getPool();
try {
Channel channel = pool.acquire().get();
body = jsonRpcHttpClientHandler.send(request, channel);
body = jsonRpcHttpClientHandler.send(requestDto, channel);
pool.release(channel);
responseDto = JSONObject.parseObject(body, ResponseDto.class);
responseDto = JSONUtils.parseJavaObject(body, ResponseDto.class);
} catch (InterruptedException e) {
loadBalancer.removePool(pool);
Thread.currentThread().interrupt();
Expand All @@ -85,9 +81,9 @@ public Object handle(String method, Object[] args) throws JsonRpcException {
throw new JsonRpcClientException(e.getMessage());
}
if (responseDto.getResult() == null) {
JSONObject bodyJSON = JSON.parseObject(body);
if (bodyJSON.containsKey("error")) {
ErrorDto errorDto = JSONObject.parseObject(bodyJSON.getString("error"), ErrorDto.class);
Object error = JSONUtils.get(JSONUtils.parseJSONObject(body), "error");
if (error != null) {
ErrorDto errorDto = JSONUtils.parseJavaObject(JSONUtils.toString(error), ErrorDto.class);
throw ErrorEnum.getException(errorDto.getCode(), errorDto.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.sunquakes.jsonrpc4j.client;

import com.alibaba.fastjson2.JSONObject;
import com.sunquakes.jsonrpc4j.dto.RequestDto;
import com.sunquakes.jsonrpc4j.utils.JSONUtils;
import com.sunquakes.jsonrpc4j.utils.RequestUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
Expand All @@ -25,11 +26,11 @@
public class JsonRpcHttpClientHandler extends JsonRpcClientHandler {

@Synchronized
public synchronized String send(JSONObject data, Channel channel) throws InterruptedException, ExecutionException {
public synchronized String send(RequestDto data, Channel channel) throws InterruptedException, ExecutionException {
Promise<String> promise = new DefaultPromise<>(channel.eventLoop());
promiseMap.put(channel, promise);

String message = data.toJSONString();
String message = JSONUtils.toString(data);
FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
request.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8");
ByteBuf buffer = request.content().clear();
Expand Down
20 changes: 8 additions & 12 deletions src/main/java/com/sunquakes/jsonrpc4j/client/JsonRpcTcpClient.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.sunquakes.jsonrpc4j.client;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.sunquakes.jsonrpc4j.ErrorEnum;
import com.sunquakes.jsonrpc4j.config.Config;
import com.sunquakes.jsonrpc4j.dto.ErrorDto;
import com.sunquakes.jsonrpc4j.dto.RequestDto;
import com.sunquakes.jsonrpc4j.dto.ResponseDto;
import com.sunquakes.jsonrpc4j.exception.JsonRpcClientException;
import com.sunquakes.jsonrpc4j.exception.JsonRpcException;
import com.sunquakes.jsonrpc4j.utils.JSONUtils;
import com.sunquakes.jsonrpc4j.utils.RequestUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
Expand Down Expand Up @@ -71,19 +71,15 @@ protected void initChannel(SocketChannel ch) throws Exception {

@Override
public Object handle(String method, Object[] args) throws JsonRpcException {
JSONObject request = new JSONObject();
request.put("id", RequestUtils.getId());
request.put("jsonrpc", RequestUtils.JSONRPC);
request.put("method", method);
request.put("params", args);
RequestDto requestDto = new RequestDto(RequestUtils.getId(), RequestUtils.JSONRPC, method, args);
ResponseDto responseDto;
String body;
FixedChannelPool pool = loadBalancer.getPool();
try {
Channel channel = pool.acquire().get();
body = jsonRpcTcpClientHandler.send(request, channel);
body = jsonRpcTcpClientHandler.send(requestDto, channel);
pool.release(channel);
responseDto = JSONObject.parseObject(body, ResponseDto.class);
responseDto = JSONUtils.toJavaObject(ResponseDto.class, body);
} catch (InterruptedException e) {
loadBalancer.removePool(pool);
Thread.currentThread().interrupt();
Expand All @@ -92,9 +88,9 @@ public Object handle(String method, Object[] args) throws JsonRpcException {
throw new JsonRpcClientException(e.getMessage());
}
if (responseDto.getResult() == null) {
JSONObject bodyJSON = JSON.parseObject(body);
if (bodyJSON.containsKey("error")) {
ErrorDto errorDto = JSONObject.parseObject(bodyJSON.getString("error"), ErrorDto.class);
Object error = JSONUtils.get(JSONUtils.parseJSONObject(body), "error");
if (error != null) {
ErrorDto errorDto = JSONUtils.parseJavaObject(JSONUtils.toString(error), ErrorDto.class);
throw ErrorEnum.getException(errorDto.getCode(), errorDto.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.sunquakes.jsonrpc4j.client;

import com.alibaba.fastjson2.JSONObject;
import com.sunquakes.jsonrpc4j.dto.RequestDto;
import com.sunquakes.jsonrpc4j.utils.ByteArrayUtils;
import com.sunquakes.jsonrpc4j.utils.JSONUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
Expand Down Expand Up @@ -34,8 +35,8 @@ public JsonRpcTcpClientHandler(TcpClientOption tcpClientOption) {
}

@Synchronized
public synchronized String send(JSONObject request, Channel channel) throws InterruptedException, ExecutionException {
String message = request.toJSONString() + tcpClientOption.getPackageEof();
public synchronized String send(RequestDto request, Channel channel) throws InterruptedException, ExecutionException {
String message = JSONUtils.toString(request) + tcpClientOption.getPackageEof();
ByteBuf byteBuf = channel.alloc().buffer(tcpClientOption.getPackageMaxLength());
byteBuf.writeBytes(message.getBytes());

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.sunquakes.jsonrpc4j.discovery;
package com.sunquakes.jsonrpc4j.client;

import com.sunquakes.jsonrpc4j.JsonRpcProtocol;
import com.sunquakes.jsonrpc4j.utils.RequestUtils;
Expand Down Expand Up @@ -30,9 +30,9 @@ public class NettyHttpClient {

private static final int DEFAULT_HTTPS_PORT = 443;

private URI uri;
private final URI uri;

NettyHttpClient(String host) {
public NettyHttpClient(String host) {
uri = URI.create(host);
}

Expand Down Expand Up @@ -107,7 +107,7 @@ protected void initChannel(Channel ch) throws Exception {
}
}

class HttpResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
static class HttpResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> {

Promise<FullHttpResponse> promise;

Expand Down
84 changes: 34 additions & 50 deletions src/main/java/com/sunquakes/jsonrpc4j/discovery/Consul.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package com.sunquakes.jsonrpc4j.discovery;

import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.annotation.JSONField;
import com.sunquakes.jsonrpc4j.JsonRpcProtocol;
import com.sunquakes.jsonrpc4j.client.NettyHttpClient;
import com.sunquakes.jsonrpc4j.utils.AddressUtils;
import com.sunquakes.jsonrpc4j.utils.JSONUtils;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
Expand Down Expand Up @@ -70,7 +69,7 @@ public boolean register(String name, String protocol, String hostname, int port)
.path("/v1/agent/service/register")
.build();
try {
FullHttpResponse res = client.put(fullUrl.getPath() + "?" + fullUrl.getQuery(), JSONObject.toJSONString(service));
FullHttpResponse res = client.put(fullUrl.getPath() + "?" + fullUrl.getQuery(), JSONUtils.toString(service));
if (!res.status().equals(HttpResponseStatus.OK)) {
return false;
}
Expand All @@ -87,9 +86,9 @@ private void checkRegister(String protocol, String hostname, int port, String na
Check serviceCheck = new Check();
serviceCheck.setName(name);
if (protocol.equals(JsonRpcProtocol.TCP.name())) {
serviceCheck.setTcp(AddressUtils.getUrl(hostname, port));
serviceCheck.setTCP(AddressUtils.getUrl(hostname, port));
} else {
serviceCheck.setHttp(String.format("%s://%s:%d", protocol, hostname, port));
serviceCheck.setHTTP(String.format("%s://%s:%d", protocol, hostname, port));
}
serviceCheck.setInterval(checkInterval);
// Set the init status passing
Expand All @@ -101,7 +100,7 @@ private void checkRegister(String protocol, String hostname, int port, String na
.path("/v1/agent/check/register")
.build();
try {
FullHttpResponse res = client.put(fullUrl.getPath() + "?" + fullUrl.getQuery(), JSONObject.toJSONString(serviceCheck));
FullHttpResponse res = client.put(fullUrl.getPath() + "?" + fullUrl.getQuery(), JSONUtils.toString(serviceCheck));
if (!res.status().equals(HttpResponseStatus.OK)) {
log.error("Health check register failed.");
}
Expand All @@ -124,7 +123,7 @@ public String get(String name) {
}
ByteBuf content = res.content();
String body = content.toString(CharsetUtil.UTF_8);
List<HealthService> healthyServices = JSONArray.parseArray(body, HealthService.class);
List<HealthService> healthyServices = JSONUtils.parseList(body, HealthService.class);
return healthyServices.stream().map(item -> AddressUtils.getUrl(item.getService().getAddress(), item.getService().getPort())).collect(Collectors.joining(","));
} catch (Exception e) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -157,81 +156,66 @@ private NettyHttpClient getClient() {
@Data
@AllArgsConstructor
@NoArgsConstructor
public class HealthService {
@SuppressWarnings({"java:S116"})
static public class HealthService {

@JSONField(name = "AggregatedStatus")
private String aggregatedStatus;
private String AggregatedStatus;

@JSONField(name = "Service")
private NewService service;
private NewService Service;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
public class NewService {
@SuppressWarnings({"java:S116"})
static public class NewService {

@JSONField(name = "ID")
private String id;
private String ID;

@JSONField(name = "Service")
private String service;
private String Service;

@JSONField(name = "Port")
private Integer port;
private Integer Port;

@JSONField(name = "Address")
private String address;
private String Address;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
public class RegisterService {
@SuppressWarnings({"java:S116"})
static public class RegisterService {

@JSONField(name = "ID")
private String id;
private String ID;

@JSONField(name = "Name")
private String name;
private String Name;

@JSONField(name = "Port")
private Integer port;
private Integer Port;

@JSONField(name = "Address")
private String address;
private String Address;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Check {
@SuppressWarnings({"java:S116"})
static public class Check {

@JSONField(name = "ID")
private String id;
private String ID;

@JSONField(name = "Name")
private String name;
private String Name;

@JSONField(name = "Status")
private String status;
private String Status;

@JSONField(name = "ServiceID")
private String serviceID;
private String ServiceID;

@JSONField(name = "HTTP")
private String http;
private String HTTP;

@JSONField(name = "Method")
private String method;
private String Method;

@JSONField(name = "TCP")
private String tcp;
private String TCP;

@JSONField(name = "Interval")
private String interval;
private String Interval;

@JSONField(name = "Timeout")
private String timeout;
private String Timeout;
}
}
Loading
Loading