diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java index 020959f0b3e..3fcbe2d302b 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java @@ -121,25 +121,22 @@ public InlongStream init() { InlongStreamInfo streamInfo = streamContext.getStreamInfo(); StreamPipeline streamPipeline = inlongStream.createPipeline(); streamInfo.setTempView(GsonUtil.toJson(streamPipeline)); - String streamIndex = managerClient.createStreamInfo(streamInfo); - streamInfo.setId(Double.valueOf(streamIndex).intValue()); + Double streamIndex = managerClient.createStreamInfo(streamInfo); + streamInfo.setId(streamIndex.intValue()); //Create source and update index List sourceRequests = Lists.newArrayList(streamContext.getSourceRequests().values()); for (SourceRequest sourceRequest : sourceRequests) { - String sourceIndex = managerClient.createSource(sourceRequest); - sourceRequest.setId(Double.valueOf(sourceIndex).intValue()); + sourceRequest.setId(managerClient.createSource(sourceRequest).intValue()); } //Create sink and update index List sinkRequests = Lists.newArrayList(streamContext.getSinkRequests().values()); for (SinkRequest sinkRequest : sinkRequests) { - String sinkIndex = managerClient.createSink(sinkRequest); - sinkRequest.setId(Double.valueOf(sinkIndex).intValue()); + sinkRequest.setId(managerClient.createSink(sinkRequest).intValue()); } //Create transform and update index List transformRequests = Lists.newArrayList(streamContext.getTransformRequests().values()); for (TransformRequest transformRequest : transformRequests) { - String transformIndex = managerClient.createTransform(transformRequest); - transformRequest.setId(Double.valueOf(transformIndex).intValue()); + transformRequest.setId(managerClient.createTransform(transformRequest).intValue()); } return inlongStream; } @@ -200,8 +197,7 @@ private void initOrUpdateTransform() { continue; } TransformRequest transformRequest = requestEntry.getValue(); - String index = managerClient.createTransform(transformRequest); - transformRequest.setId(Double.valueOf(index).intValue()); + transformRequest.setId(managerClient.createTransform(transformRequest).intValue()); } } @@ -238,8 +234,7 @@ private void initOrUpdateSource() { continue; } SourceRequest sourceRequest = requestEntry.getValue(); - String index = managerClient.createSource(sourceRequest); - sourceRequest.setId(Double.valueOf(index).intValue()); + sourceRequest.setId(managerClient.createSource(sourceRequest).intValue()); } } @@ -276,8 +271,7 @@ private void initOrUpdateSink() { continue; } SinkRequest sinkRequest = requestEntry.getValue(); - String index = managerClient.createSink(sinkRequest); - sinkRequest.setId(Double.valueOf(index).intValue()); + sinkRequest.setId(managerClient.createSink(sinkRequest).intValue()); } } } diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java index 749feb46877..04a5031289e 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java @@ -105,26 +105,27 @@ public Pair isGroupExists(InlongGroupRequest group } public boolean isGroupExists(String inlongGroupId) { - if (StringUtils.isEmpty(inlongGroupId)) { - throw new IllegalArgumentException("InlongGroupId should not be empty"); - } + AssertUtil.notEmpty(inlongGroupId, "InlongGroupId should not be empty"); + String path = HTTP_PATH + "/group/exist/" + inlongGroupId; final String url = formatUrl(path); - Request request = new Request.Builder().get() + Request request = new Request.Builder() + .get() .url(url) .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { assert response.body() != null; String body = response.body().string(); assertHttpSuccess(response, body, path); - Response responseBody = InlongParser.parseResponse(body); - if (responseBody.getErrMsg() != null) { + + Response responseBody = InlongParser.parseResponse(Boolean.class, body); + if (!responseBody.isSuccess()) { throw new RuntimeException(responseBody.getErrMsg()); } - return Boolean.parseBoolean(responseBody.getData().toString()); + + return responseBody.getData(); } catch (Exception e) { throw new RuntimeException(String.format("Inlong group check exists failed: %s", e.getMessage()), e); } @@ -136,13 +137,13 @@ public InlongGroupResponse getGroupInfo(String inlongGroupId) { } String path = HTTP_PATH + "/group/get/" + inlongGroupId; final String url = formatUrl(path); - Request request = new Request.Builder().get() + Request request = new Request.Builder() + .get() .url(url) .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { assert response.body() != null; String body = response.body().string(); assertHttpSuccess(response, body, path); @@ -165,23 +166,24 @@ public PageInfo listGroups(String keyword, int status, if (pageNum <= 0) { pageNum = 1; } + JSONObject groupQuery = new JSONObject(); groupQuery.put("keyword", keyword); groupQuery.put("status", status); groupQuery.put("pageNum", pageNum); groupQuery.put("pageSize", pageSize); String operationData = GsonUtil.toJson(groupQuery); + RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), operationData); String path = HTTP_PATH + "/group/list"; final String url = formatUrl(path); - Request request = new Request.Builder().get() + Request request = new Request.Builder() .url(url) - .method("POST", requestBody) + .post(requestBody) .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { assert response.body() != null; String body = response.body().string(); assertHttpSuccess(response, body, path); @@ -198,6 +200,7 @@ public PageInfo listGroups(String keyword, int status, } catch (Exception e) { throw new RuntimeException(String.format("Inlong group get failed: %s", e.getMessage()), e); } + } /** @@ -211,18 +214,19 @@ public Response> listGroups(InlongGroupPageReq RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), requestParams); String path = HTTP_PATH + "/group/list"; final String url = formatUrl(path); - Request request = new Request.Builder().get() + Request request = new Request.Builder() .url(url) - .method("POST", requestBody) + .post(requestBody) .build(); Call call = httpClient.newCall(request); - okhttp3.Response response = call.execute(); - assert response.body() != null; - String body = response.body().string(); - assertHttpSuccess(response, body, path); - return JsonUtils.parse(body, - new TypeReference>>() { - }); + try (okhttp3.Response response = call.execute()) { + assert response.body() != null; + String body = response.body().string(); + assertHttpSuccess(response, body, path); + return JsonUtils.parse(body, + new TypeReference>>() { + }); + } } /** @@ -235,19 +239,18 @@ public String createGroup(InlongGroupRequest groupInfo) { final String url = formatUrl(path); Request request = new Request.Builder() .url(url) - .method("POST", bizBody) + .post(bizBody) .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { assert response.body() != null; String body = response.body().string(); assertHttpSuccess(response, body, path); - Response responseBody = InlongParser.parseResponse(body); + Response responseBody = InlongParser.parseResponse(String.class, body); AssertUtil.isTrue(responseBody.getErrMsg() == null, String.format("Inlong request failed: %s", responseBody.getErrMsg())); - return responseBody.getData().toString(); + return responseBody.getData(); } catch (Exception e) { throw new RuntimeException(String.format("inlong group save failed: %s", e.getMessage()), e); } @@ -265,42 +268,40 @@ public Pair updateGroup(InlongGroupRequest groupRequest) { final String url = formatUrl(path); Request request = new Request.Builder() .url(url) - .method("POST", groupBody) + .post(groupBody) .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { assert response.body() != null; String body = response.body().string(); assertHttpSuccess(response, body, path); - Response responseBody = InlongParser.parseResponse(body); - return Pair.of(responseBody.getData().toString(), responseBody.getErrMsg()); + Response responseBody = InlongParser.parseResponse(String.class, body); + return Pair.of(responseBody.getData(), responseBody.getErrMsg()); } catch (Exception e) { throw new RuntimeException(String.format("Inlong group update failed: %s", e.getMessage()), e); } } - public String createStreamInfo(InlongStreamInfo streamInfo) { + public Double createStreamInfo(InlongStreamInfo streamInfo) { String path = HTTP_PATH + "/stream/save"; final String stream = GsonUtil.toJson(streamInfo); final RequestBody streamBody = RequestBody.create(MediaType.parse("application/json"), stream); final String url = formatUrl(path); Request request = new Request.Builder() .url(url) - .method("POST", streamBody) + .post(streamBody) .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { assert response.body() != null; String body = response.body().string(); assertHttpSuccess(response, body, path); - Response responseBody = InlongParser.parseResponse(body); + Response responseBody = InlongParser.parseResponse(Double.class, body); AssertUtil.isTrue(responseBody.getErrMsg() == null, String.format("Inlong request failed: %s", responseBody.getErrMsg())); - return responseBody.getData().toString(); + return responseBody.getData(); } catch (Exception e) { throw new RuntimeException(String.format("Inlong stream save failed: %s", e.getMessage()), e); } @@ -313,22 +314,23 @@ public Boolean isStreamExists(InlongStreamInfo streamInfo) { AssertUtil.notEmpty(streamId, "InlongStreamId should not be empty"); String path = HTTP_PATH + "/stream/exist/" + groupId + "/" + streamId; final String url = formatUrl(path); - Request request = new Request.Builder().get() + Request request = new Request.Builder() + .get() .url(url) .build(); - Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = httpClient.newCall(request).execute()) { assert response.body() != null; String body = response.body().string(); assertHttpSuccess(response, body, path); - Response responseBody = InlongParser.parseResponse(body); - if (responseBody.getErrMsg() != null) { + Response responseBody = InlongParser.parseResponse(Boolean.class, body); + if (!responseBody.isSuccess()) { throw new RuntimeException(responseBody.getErrMsg()); } - return Boolean.parseBoolean(responseBody.getData().toString()); + + return responseBody.getData(); } catch (Exception e) { + log.error("Inlong stream check exists failed", e); throw new RuntimeException(String.format("Inlong stream check exists failed: %s", e.getMessage()), e); } } @@ -341,18 +343,18 @@ public Pair updateStreamInfo(InlongStreamInfo streamInfo) { final String stream = GsonUtil.toJson(streamInfo); RequestBody bizBody = RequestBody.create(MediaType.parse("application/json"), stream); Request request = new Request.Builder() - .method("POST", bizBody) + .post(bizBody) .url(url) .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { + assert response.body() != null; String body = response.body().string(); assertHttpSuccess(response, body, path); - Response responseBody = InlongParser.parseResponse(body); + Response responseBody = InlongParser.parseResponse(Boolean.class, body); if (responseBody.getData() != null) { - return Pair.of(Boolean.valueOf(responseBody.getData().toString()), responseBody.getErrMsg()); + return Pair.of(responseBody.getData(), responseBody.getErrMsg()); } else { return Pair.of(false, responseBody.getErrMsg()); } @@ -370,8 +372,8 @@ public InlongStreamInfo getStreamInfo(InlongStreamInfo streamInfo) { .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { + assert response.body() != null; String body = response.body().string(); assertHttpSuccess(response, body, path); Response responseBody = InlongParser.parseResponse(body); @@ -402,8 +404,7 @@ public List listStreamInfo(String inlongGroupId) { .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { assert response.body() != null; String body = response.body().string(); assertHttpSuccess(response, body, path); @@ -416,26 +417,25 @@ public List listStreamInfo(String inlongGroupId) { } } - public String createSource(SourceRequest sourceRequest) { + public Double createSource(SourceRequest sourceRequest) { String path = HTTP_PATH + "/source/save"; final String source = GsonUtil.toJson(sourceRequest); final RequestBody sourceBody = RequestBody.create(MediaType.parse("application/json"), source); final String url = formatUrl(path); Request request = new Request.Builder() .url(url) - .method("POST", sourceBody) + .post(sourceBody) .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { assert response.body() != null; String body = response.body().string(); AssertUtil.isTrue(response.isSuccessful(), String.format("Inlong request failed: %s", body)); - Response responseBody = InlongParser.parseResponse(body); + Response responseBody = InlongParser.parseResponse(Double.class, body); AssertUtil.isTrue(responseBody.getErrMsg() == null, String.format("Inlong request failed: %s", responseBody.getErrMsg())); - return responseBody.getData().toString(); + return responseBody.getData(); } catch (Exception e) { throw new RuntimeException(String.format("Inlong source save failed: %s", e.getMessage()), e); } @@ -457,8 +457,7 @@ public List listSources(String groupId, String streamId, Str .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { String body = response.body().string(); assertHttpSuccess(response, body, path); Response responseBody = InlongParser.parseResponse(body); @@ -478,18 +477,17 @@ public Pair updateSource(SourceRequest sourceRequest) { final String storage = GsonUtil.toJson(sourceRequest); final RequestBody storageBody = RequestBody.create(MediaType.parse("application/json"), storage); Request request = new Request.Builder() - .method("POST", storageBody) + .post(storageBody) .url(url) .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { String body = response.body().string(); assertHttpSuccess(response, body, path); - Response responseBody = InlongParser.parseResponse(body); + Response responseBody = InlongParser.parseResponse(Boolean.class, body); if (responseBody.getData() != null) { - return Pair.of(Boolean.valueOf(responseBody.getData().toString()), responseBody.getErrMsg()); + return Pair.of(responseBody.getData(), responseBody.getErrMsg()); } else { return Pair.of(false, responseBody.getErrMsg()); } @@ -505,45 +503,43 @@ public boolean deleteSource(int id) { RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), ""); Request request = new Request.Builder() .url(url) - .method("DELETE", requestBody) + .delete(requestBody) .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { assert response.body() != null; String body = response.body().string(); assertHttpSuccess(response, body, path); - Response responseBody = InlongParser.parseResponse(body); + Response responseBody = InlongParser.parseResponse(Boolean.class, body); AssertUtil.isTrue(responseBody.getErrMsg() == null, String.format("Inlong request failed: %s", responseBody.getErrMsg())); - return Boolean.parseBoolean(responseBody.getData().toString()); + return responseBody.getData(); } catch (Exception e) { throw new RuntimeException( String.format("Inlong source delete failed: %s", e.getMessage()), e); } } - public String createTransform(TransformRequest transformRequest) { + public Double createTransform(TransformRequest transformRequest) { String path = HTTP_PATH + "/transform/save"; final String sink = GsonUtil.toJson(transformRequest); final RequestBody transformBody = RequestBody.create(MediaType.parse("application/json"), sink); final String url = formatUrl(path); Request request = new Request.Builder() .url(url) - .method("POST", transformBody) + .post(transformBody) .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { assert response.body() != null; String body = response.body().string(); assertHttpSuccess(response, body, path); - Response responseBody = InlongParser.parseResponse(body); + Response responseBody = InlongParser.parseResponse(Double.class, body); AssertUtil.isTrue(responseBody.getErrMsg() == null, String.format("Inlong request failed: %s", responseBody.getErrMsg())); - return responseBody.getData().toString(); + return responseBody.getData(); } catch (Exception e) { throw new RuntimeException(String.format("Inlong transform save failed: %s", e.getMessage()), e); } @@ -582,13 +578,12 @@ public Pair updateTransform(TransformRequest transformRequest) .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { String body = response.body().string(); assertHttpSuccess(response, body, path); - Response responseBody = InlongParser.parseResponse(body); + Response responseBody = InlongParser.parseResponse(Boolean.class, body); if (responseBody.getData() != null) { - return Pair.of(Boolean.valueOf(responseBody.getData().toString()), responseBody.getErrMsg()); + return Pair.of(responseBody.getData(), responseBody.getErrMsg()); } else { return Pair.of(false, responseBody.getErrMsg()); } @@ -610,45 +605,43 @@ public boolean deleteTransform(TransformRequest transformRequest) { RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), ""); Request request = new Request.Builder() .url(url) - .method("DELETE", requestBody) + .delete(requestBody) .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { assert response.body() != null; String body = response.body().string(); assertHttpSuccess(response, body, path); - Response responseBody = InlongParser.parseResponse(body); + Response responseBody = InlongParser.parseResponse(Boolean.class, body); AssertUtil.isTrue(responseBody.getErrMsg() == null, String.format("Inlong request failed: %s", responseBody.getErrMsg())); - return Boolean.parseBoolean(responseBody.getData().toString()); + return responseBody.getData(); } catch (Exception e) { throw new RuntimeException( String.format("Inlong transform delete failed: %s", e.getMessage()), e); } } - public String createSink(SinkRequest sinkRequest) { + public Double createSink(SinkRequest sinkRequest) { String path = HTTP_PATH + "/sink/save"; final String sink = GsonUtil.toJson(sinkRequest); final RequestBody sinkBody = RequestBody.create(MediaType.parse("application/json"), sink); final String url = formatUrl(path); Request request = new Request.Builder() .url(url) - .method("POST", sinkBody) + .post(sinkBody) .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { assert response.body() != null; String body = response.body().string(); assertHttpSuccess(response, body, path); - Response responseBody = InlongParser.parseResponse(body); + Response responseBody = InlongParser.parseResponse(Double.class, body); AssertUtil.isTrue(responseBody.getErrMsg() == null, String.format("Inlong request failed: %s", responseBody.getErrMsg())); - return responseBody.getData().toString(); + return responseBody.getData(); } catch (Exception e) { throw new RuntimeException(String.format("Inlong sink save failed: %s", e.getMessage()), e); } @@ -661,19 +654,18 @@ public boolean deleteSink(int id) { RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), ""); Request request = new Request.Builder() .url(url) - .method("DELETE", requestBody) + .delete(requestBody) .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { assert response.body() != null; String body = response.body().string(); assertHttpSuccess(response, body, path); - Response responseBody = InlongParser.parseResponse(body); + Response responseBody = InlongParser.parseResponse(Boolean.class, body); AssertUtil.isTrue(responseBody.getErrMsg() == null, String.format("Inlong request failed: %s", responseBody.getErrMsg())); - return Boolean.parseBoolean(responseBody.getData().toString()); + return responseBody.getData(); } catch (Exception e) { throw new RuntimeException( String.format("Inlong sink delete failed: %s", e.getMessage()), e); @@ -696,8 +688,7 @@ public List listSinks(String groupId, String streamId, String .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { String body = response.body().string(); assertHttpSuccess(response, body, path); Response responseBody = InlongParser.parseResponse(body); @@ -717,18 +708,17 @@ public Pair updateSink(SinkRequest sinkRequest) { final String storage = GsonUtil.toJson(sinkRequest); final RequestBody storageBody = RequestBody.create(MediaType.parse("application/json"), storage); Request request = new Request.Builder() - .method("POST", storageBody) + .post(storageBody) .url(url) .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { String body = response.body().string(); assertHttpSuccess(response, body, path); - Response responseBody = InlongParser.parseResponse(body); + Response responseBody = InlongParser.parseResponse(Boolean.class, body); if (responseBody.getData() != null) { - return Pair.of(Boolean.valueOf(responseBody.getData().toString()), responseBody.getErrMsg()); + return Pair.of(responseBody.getData(), responseBody.getErrMsg()); } else { return Pair.of(false, responseBody.getErrMsg()); } @@ -745,19 +735,18 @@ public WorkflowResult initInlongGroup(InlongGroupRequest groupInfo) { RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), ""); Request request = new Request.Builder() .url(url) - .method("POST", requestBody) + .post(requestBody) .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { assert response.body() != null; String body = response.body().string(); assertHttpSuccess(response, body, path); - Response responseBody = InlongParser.parseResponse(body); + Response responseBody = InlongParser.parseResponse(WorkflowResult.class, body); AssertUtil.isTrue(responseBody.getErrMsg() == null, String.format("Inlong request failed: %s", responseBody.getErrMsg())); - return InlongParser.parseWorkflowResult(responseBody); + return responseBody.getData(); } catch (Exception e) { throw new RuntimeException(String.format("Inlong group init failed: %s", e.getMessage()), e); @@ -770,30 +759,32 @@ public WorkflowResult startInlongGroup(int taskId, JSONObject workflowTaskOperation = new JSONObject(); workflowTaskOperation.put("transferTo", Lists.newArrayList()); workflowTaskOperation.put("remark", "approved by system"); + JSONObject inlongGroupApproveForm = new JSONObject(); inlongGroupApproveForm.put("groupApproveInfo", initMsg.getKey()); inlongGroupApproveForm.put("streamApproveInfoList", initMsg.getValue()); inlongGroupApproveForm.put("formName", "InlongGroupApproveForm"); workflowTaskOperation.put("form", inlongGroupApproveForm); + String operationData = GsonUtil.toJson(workflowTaskOperation); + final String path = HTTP_PATH + "/workflow/approve/" + taskId; final String url = formatUrl(path); RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), operationData); Request request = new Request.Builder() .url(url) - .method("POST", requestBody) + .post(requestBody) .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { assert response.body() != null; String body = response.body().string(); assertHttpSuccess(response, body, path); - Response responseBody = InlongParser.parseResponse(body); + Response responseBody = InlongParser.parseResponse(WorkflowResult.class, body); AssertUtil.isTrue(responseBody.getErrMsg() == null, String.format("Inlong request failed: %s", responseBody.getErrMsg())); - return InlongParser.parseWorkflowResult(responseBody); + return responseBody.getData(); } catch (Exception e) { throw new RuntimeException(String.format("Inlong group start failed: %s", e.getMessage()), e); @@ -826,12 +817,11 @@ public boolean operateInlongGroup(String groupId, InlongGroupStatus status, bool RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), ""); Request request = new Request.Builder() .url(url) - .method("POST", requestBody) + .post(requestBody) .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { assert response.body() != null; String body = response.body().string(); assertHttpSuccess(response, body, path); @@ -859,19 +849,18 @@ public boolean deleteInlongGroup(String groupId, boolean async) { RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), ""); Request request = new Request.Builder() .url(url) - .method("DELETE", requestBody) + .delete(requestBody) .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { assert response.body() != null; String body = response.body().string(); assertHttpSuccess(response, body, path); - Response responseBody = InlongParser.parseResponse(body); + Response responseBody = InlongParser.parseResponse(Boolean.class, body); AssertUtil.isTrue(responseBody.getErrMsg() == null, String.format("Inlong request failed: %s", responseBody.getErrMsg())); - return Boolean.parseBoolean(responseBody.getData().toString()); + return responseBody.getData(); } catch (Exception e) { throw new RuntimeException( String.format("Inlong group delete failed: %s", e.getMessage()), e); @@ -891,8 +880,7 @@ public List getInlongGroupError(String inlongGroupId) { .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { assert response.body() != null; String body = response.body().string(); assertHttpSuccess(response, body, path); @@ -919,8 +907,7 @@ public List getStreamLogs(String inlongGroupI .build(); Call call = httpClient.newCall(request); - try { - okhttp3.Response response = call.execute(); + try (okhttp3.Response response = call.execute()) { assert response.body() != null; String body = response.body().string(); assertHttpSuccess(response, body, path); diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java index a161d1da999..eb547dacc03 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java @@ -22,6 +22,7 @@ import com.google.common.reflect.TypeToken; import com.google.gson.JsonArray; import com.google.gson.JsonObject; +import lombok.experimental.UtilityClass; import org.apache.commons.lang3.tuple.Pair; import org.apache.inlong.manager.common.beans.Response; import org.apache.inlong.manager.common.enums.MQType; @@ -54,13 +55,13 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.common.pojo.transform.TransformResponse; import org.apache.inlong.manager.common.pojo.workflow.EventLogView; -import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult; import java.util.List; /** * Parser for Inlong entity */ +@UtilityClass public class InlongParser { public static final String GROUP_INFO = "groupInfo"; @@ -72,14 +73,15 @@ public class InlongParser { public static final String SOURCE_TYPE = "sourceType"; public static Response parseResponse(String responseBody) { - Response response = GsonUtil.fromJson(responseBody, Response.class); - return response; + return GsonUtil.fromJson(responseBody, Response.class); } - public static WorkflowResult parseWorkflowResult(Response response) { - Object data = response.getData(); - String resultData = GsonUtil.toJson(data); - return GsonUtil.fromJson(resultData, WorkflowResult.class); + public static Response parseResponse(Class responseType, String responseBody) { + AssertUtil.notNull(responseType, "responseType must not be null"); + return GsonUtil.fromJson( + responseBody, + com.google.gson.reflect.TypeToken.getParameterized(Response.class, responseType).getType() + ); } public static InlongGroupResponse parseGroupInfo(Response response) {