From 5e8b93840e22623f07e70a8ff4e028765a176d49 Mon Sep 17 00:00:00 2001 From: karsonto Date: Wed, 3 Apr 2024 15:37:35 +0800 Subject: [PATCH] use httpRequest --- .../common/protocol/http/HttpCommand.java | 3 - .../eventmesh/common/utils/JsonUtils.java | 3 +- .../admin/handler/AbstractHttpHandler.java | 33 +-------- .../runtime/boot/AbstractHTTPServer.java | 31 +------- .../runtime/boot/EventMeshAdminServer.java | 15 +--- .../runtime/util/HttpRequestUtil.java | 71 +++++++++++++++++++ 6 files changed, 80 insertions(+), 76 deletions(-) create mode 100644 eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpRequestUtil.java diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/HttpCommand.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/HttpCommand.java index b635ce9f64..f7ebd0cab8 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/HttpCommand.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/HttpCommand.java @@ -27,7 +27,6 @@ import org.apache.eventmesh.common.protocol.http.header.Header; import org.apache.eventmesh.common.utils.JsonUtils; -import java.net.URI; import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; @@ -68,8 +67,6 @@ public class HttpCommand implements ProtocolTransportObject { // Command response time public long resTime; - public URI requestURI; - public CmdType cmdType = CmdType.REQ; public HttpCommand() { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java index 3a5b15576b..71d42e3452 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java @@ -60,8 +60,7 @@ public static T mapToObject(Map map, Class beanClass) { Object obj = OBJECT_MAPPER.convertValue(map, beanClass); return (T) obj; } - - + /** * Serialize object to json string. * diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AbstractHttpHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AbstractHttpHandler.java index 44f042c4b0..67606ebe95 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AbstractHttpHandler.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/AbstractHttpHandler.java @@ -19,6 +19,7 @@ import org.apache.eventmesh.common.enums.HttpMethod; import org.apache.eventmesh.runtime.constants.EventMeshConstants; +import org.apache.eventmesh.runtime.util.HttpRequestUtil; import org.apache.eventmesh.runtime.util.HttpResponseUtils; import java.io.IOException; @@ -35,11 +36,6 @@ import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; -import io.netty.handler.codec.http.QueryStringDecoder; -import io.netty.handler.codec.http.multipart.Attribute; -import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory; -import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder; -import io.netty.handler.codec.http.multipart.InterfaceHttpData; import io.netty.util.AsciiString; import lombok.Data; @@ -47,8 +43,6 @@ @Data public abstract class AbstractHttpHandler implements org.apache.eventmesh.runtime.admin.handler.HttpHandler { - private static final DefaultHttpDataFactory DEFAULT_HTTP_DATA_FACTORY = new DefaultHttpDataFactory(false); - protected void write(ChannelHandlerContext ctx, byte[] result, AsciiString headerValue) { ctx.writeAndFlush(HttpResponseUtils.getHttpResponse(result, ctx, headerValue)).addListener(ChannelFutureListener.CLOSE); } @@ -148,30 +142,7 @@ protected void get(HttpRequest httpRequest, ChannelHandlerContext ctx) throws Ex } protected Map parseHttpRequestBody(final HttpRequest httpRequest) throws IOException { - final long bodyDecodeStart = System.currentTimeMillis(); - final Map httpRequestBody = new HashMap<>(); - - if (io.netty.handler.codec.http.HttpMethod.GET.equals(httpRequest.method())) { - new QueryStringDecoder(httpRequest.uri()) - .parameters() - .forEach((key, value) -> httpRequestBody.put(key, value.get(0))); - } else if (io.netty.handler.codec.http.HttpMethod.POST.equals(httpRequest.method())) { - decodeHttpRequestBody(httpRequest, httpRequestBody); - } - return httpRequestBody; - } - - private void decodeHttpRequestBody(HttpRequest httpRequest, Map httpRequestBody) throws IOException { - final HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(DEFAULT_HTTP_DATA_FACTORY, httpRequest); - for (final InterfaceHttpData param : decoder.getBodyHttpDatas()) { - if (InterfaceHttpData.HttpDataType.Attribute == param.getHttpDataType()) { - final Attribute data = (Attribute) param; - httpRequestBody.put(data.getName(), data.getValue()); - } - } - decoder.destroy(); + return HttpRequestUtil.parseHttpRequestBody(httpRequest, null, null); } - - } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java index fbab8e990b..f4662118be 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java @@ -33,6 +33,7 @@ import org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService; import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor; import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer; +import org.apache.eventmesh.runtime.util.HttpRequestUtil; import org.apache.eventmesh.runtime.util.RemotingHelper; import org.apache.eventmesh.runtime.util.TraceUtils; import org.apache.eventmesh.runtime.util.Utils; @@ -42,7 +43,6 @@ import org.apache.commons.lang3.StringUtils; import java.io.IOException; -import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -78,12 +78,8 @@ import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; -import io.netty.handler.codec.http.QueryStringDecoder; -import io.netty.handler.codec.http.multipart.Attribute; import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory; import io.netty.handler.codec.http.multipart.DiskAttribute; -import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder; -import io.netty.handler.codec.http.multipart.InterfaceHttpData; import io.netty.handler.ssl.SslHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; @@ -256,29 +252,8 @@ public void sendResponse(final ChannelHandlerContext ctx, final DefaultFullHttpR * @return */ protected Map parseHttpRequestBody(final HttpRequest httpRequest) throws IOException { - final long bodyDecodeStart = System.currentTimeMillis(); - final Map httpRequestBody = new HashMap<>(); - - if (HttpMethod.GET.equals(httpRequest.method())) { - new QueryStringDecoder(httpRequest.uri()) - .parameters() - .forEach((key, value) -> httpRequestBody.put(key, value.get(0))); - } else if (HttpMethod.POST.equals(httpRequest.method())) { - decodeHttpRequestBody(httpRequest, httpRequestBody); - } - metrics.getSummaryMetrics().recordDecodeTimeCost(System.currentTimeMillis() - bodyDecodeStart); - return httpRequestBody; - } - - private void decodeHttpRequestBody(HttpRequest httpRequest, Map httpRequestBody) throws IOException { - final HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(DEFAULT_HTTP_DATA_FACTORY, httpRequest); - for (final InterfaceHttpData param : decoder.getBodyHttpDatas()) { - if (InterfaceHttpData.HttpDataType.Attribute == param.getHttpDataType()) { - final Attribute data = (Attribute) param; - httpRequestBody.put(data.getName(), data.getValue()); - } - } - decoder.destroy(); + return HttpRequestUtil.parseHttpRequestBody(httpRequest, () -> System.currentTimeMillis(), + (startTime) -> metrics.getSummaryMetrics().recordDecodeTimeCost(System.currentTimeMillis() - startTime)); } @Sharable diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java index aca7caca19..f7b16baa51 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java @@ -20,14 +20,10 @@ import static org.apache.eventmesh.runtime.util.HttpResponseUtils.getHttpResponse; import org.apache.eventmesh.common.Constants; -import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.runtime.admin.handler.AdminHandlerManager; import org.apache.eventmesh.runtime.admin.handler.HttpHandler; -import org.apache.eventmesh.runtime.admin.response.Error; import org.apache.eventmesh.runtime.util.HttpResponseUtils; -import java.io.PrintWriter; -import java.io.StringWriter; import java.net.URI; import java.util.Objects; import java.util.Optional; @@ -111,15 +107,10 @@ public void parseHttpRequest(ChannelHandlerContext ctx, HttpRequest httpRequest) httpHandlerOpt.get().handle(httpRequest, ctx); return; } catch (Exception e) { - StringWriter writer = new StringWriter(); - PrintWriter printWriter = new PrintWriter(writer); - e.printStackTrace(printWriter); - printWriter.flush(); - String stackTrace = writer.toString(); - Error error = new Error(e.toString(), stackTrace); - String result = JsonUtils.toJSONString(error); + log.error("admin server channelRead error", e); ctx.writeAndFlush( - getHttpResponse(Objects.requireNonNull(result).getBytes(Constants.DEFAULT_CHARSET), ctx, HttpHeaderValues.APPLICATION_JSON, + getHttpResponse(Objects.requireNonNull(e.getMessage()).getBytes(Constants.DEFAULT_CHARSET), + ctx, HttpHeaderValues.APPLICATION_JSON, HttpResponseStatus.INTERNAL_SERVER_ERROR)).addListener(ChannelFutureListener.CLOSE); } } else { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpRequestUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpRequestUtil.java new file mode 100644 index 0000000000..ecfcd0e198 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpRequestUtil.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.util; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import javax.annotation.Nullable; + +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.QueryStringDecoder; +import io.netty.handler.codec.http.multipart.Attribute; +import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory; +import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder; +import io.netty.handler.codec.http.multipart.InterfaceHttpData; + +public class HttpRequestUtil { + + private static final DefaultHttpDataFactory DEFAULT_HTTP_DATA_FACTORY = new DefaultHttpDataFactory(false); + + public static Map parseHttpRequestBody(final HttpRequest httpRequest, @Nullable Supplier start, @Nullable Consumer end) + throws IOException { + T t = null; + if (!Objects.isNull(start)) { + t = start.get(); + } + final Map httpRequestBody = new HashMap<>(); + if (io.netty.handler.codec.http.HttpMethod.GET.equals(httpRequest.method())) { + new QueryStringDecoder(httpRequest.uri()) + .parameters() + .forEach((key, value) -> httpRequestBody.put(key, value.get(0))); + } else if (io.netty.handler.codec.http.HttpMethod.POST.equals(httpRequest.method())) { + decodeHttpRequestBody(httpRequest, httpRequestBody); + } + if (Objects.isNull(t)) { + end.accept(t); + } + return httpRequestBody; + } + + private static void decodeHttpRequestBody(HttpRequest httpRequest, Map httpRequestBody) throws IOException { + final HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(DEFAULT_HTTP_DATA_FACTORY, httpRequest); + for (final InterfaceHttpData param : decoder.getBodyHttpDatas()) { + if (InterfaceHttpData.HttpDataType.Attribute == param.getHttpDataType()) { + final Attribute data = (Attribute) param; + httpRequestBody.put(data.getName(), data.getValue()); + } + } + decoder.destroy(); + } + +}