Skip to content

Commit

Permalink
use httpRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
karsonto committed Apr 3, 2024
1 parent 57d0c04 commit 5e8b938
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.eventmesh.common.protocol.http.header.Header;
import org.apache.eventmesh.common.utils.JsonUtils;

import java.net.URI;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -68,8 +67,6 @@ public class HttpCommand implements ProtocolTransportObject {
// Command response time
public long resTime;

public URI requestURI;

public CmdType cmdType = CmdType.REQ;

public HttpCommand() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ public static <T> T mapToObject(Map<String, Object> map, Class<T> beanClass) {
Object obj = OBJECT_MAPPER.convertValue(map, beanClass);
return (T) obj;
}



/**
* Serialize object to json string.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.eventmesh.common.enums.HttpMethod;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.util.HttpRequestUtil;
import org.apache.eventmesh.runtime.util.HttpResponseUtils;

import java.io.IOException;
Expand All @@ -35,20 +36,13 @@
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.util.AsciiString;

import lombok.Data;

@Data
public abstract class AbstractHttpHandler implements org.apache.eventmesh.runtime.admin.handler.HttpHandler {

private static final DefaultHttpDataFactory DEFAULT_HTTP_DATA_FACTORY = new DefaultHttpDataFactory(false);

protected void write(ChannelHandlerContext ctx, byte[] result, AsciiString headerValue) {
ctx.writeAndFlush(HttpResponseUtils.getHttpResponse(result, ctx, headerValue)).addListener(ChannelFutureListener.CLOSE);
}
Expand Down Expand Up @@ -148,30 +142,7 @@ protected void get(HttpRequest httpRequest, ChannelHandlerContext ctx) throws Ex
}

protected Map<String, Object> parseHttpRequestBody(final HttpRequest httpRequest) throws IOException {
final long bodyDecodeStart = System.currentTimeMillis();
final Map<String, Object> httpRequestBody = new HashMap<>();

if (io.netty.handler.codec.http.HttpMethod.GET.equals(httpRequest.method())) {
new QueryStringDecoder(httpRequest.uri())
.parameters()
.forEach((key, value) -> httpRequestBody.put(key, value.get(0)));
} else if (io.netty.handler.codec.http.HttpMethod.POST.equals(httpRequest.method())) {
decodeHttpRequestBody(httpRequest, httpRequestBody);
}
return httpRequestBody;
}

private void decodeHttpRequestBody(HttpRequest httpRequest, Map<String, Object> httpRequestBody) throws IOException {
final HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(DEFAULT_HTTP_DATA_FACTORY, httpRequest);
for (final InterfaceHttpData param : decoder.getBodyHttpDatas()) {
if (InterfaceHttpData.HttpDataType.Attribute == param.getHttpDataType()) {
final Attribute data = (Attribute) param;
httpRequestBody.put(data.getName(), data.getValue());
}
}
decoder.destroy();
return HttpRequestUtil.parseHttpRequestBody(httpRequest, null, null);
}


}

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
import org.apache.eventmesh.runtime.util.HttpRequestUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.eventmesh.runtime.util.TraceUtils;
import org.apache.eventmesh.runtime.util.Utils;
Expand All @@ -42,7 +43,6 @@
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -78,12 +78,8 @@
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import io.netty.handler.codec.http.multipart.DiskAttribute;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
Expand Down Expand Up @@ -256,29 +252,8 @@ public void sendResponse(final ChannelHandlerContext ctx, final DefaultFullHttpR
* @return
*/
protected Map<String, Object> parseHttpRequestBody(final HttpRequest httpRequest) throws IOException {
final long bodyDecodeStart = System.currentTimeMillis();
final Map<String, Object> httpRequestBody = new HashMap<>();

if (HttpMethod.GET.equals(httpRequest.method())) {
new QueryStringDecoder(httpRequest.uri())
.parameters()
.forEach((key, value) -> httpRequestBody.put(key, value.get(0)));
} else if (HttpMethod.POST.equals(httpRequest.method())) {
decodeHttpRequestBody(httpRequest, httpRequestBody);
}
metrics.getSummaryMetrics().recordDecodeTimeCost(System.currentTimeMillis() - bodyDecodeStart);
return httpRequestBody;
}

private void decodeHttpRequestBody(HttpRequest httpRequest, Map<String, Object> httpRequestBody) throws IOException {
final HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(DEFAULT_HTTP_DATA_FACTORY, httpRequest);
for (final InterfaceHttpData param : decoder.getBodyHttpDatas()) {
if (InterfaceHttpData.HttpDataType.Attribute == param.getHttpDataType()) {
final Attribute data = (Attribute) param;
httpRequestBody.put(data.getName(), data.getValue());
}
}
decoder.destroy();
return HttpRequestUtil.parseHttpRequestBody(httpRequest, () -> System.currentTimeMillis(),
(startTime) -> metrics.getSummaryMetrics().recordDecodeTimeCost(System.currentTimeMillis() - startTime));
}

@Sharable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,10 @@
import static org.apache.eventmesh.runtime.util.HttpResponseUtils.getHttpResponse;

import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.runtime.admin.handler.AdminHandlerManager;
import org.apache.eventmesh.runtime.admin.handler.HttpHandler;
import org.apache.eventmesh.runtime.admin.response.Error;
import org.apache.eventmesh.runtime.util.HttpResponseUtils;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.URI;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -111,15 +107,10 @@ public void parseHttpRequest(ChannelHandlerContext ctx, HttpRequest httpRequest)
httpHandlerOpt.get().handle(httpRequest, ctx);
return;
} catch (Exception e) {
StringWriter writer = new StringWriter();
PrintWriter printWriter = new PrintWriter(writer);
e.printStackTrace(printWriter);
printWriter.flush();
String stackTrace = writer.toString();
Error error = new Error(e.toString(), stackTrace);
String result = JsonUtils.toJSONString(error);
log.error("admin server channelRead error", e);
ctx.writeAndFlush(
getHttpResponse(Objects.requireNonNull(result).getBytes(Constants.DEFAULT_CHARSET), ctx, HttpHeaderValues.APPLICATION_JSON,
getHttpResponse(Objects.requireNonNull(e.getMessage()).getBytes(Constants.DEFAULT_CHARSET),
ctx, HttpHeaderValues.APPLICATION_JSON,
HttpResponseStatus.INTERNAL_SERVER_ERROR)).addListener(ChannelFutureListener.CLOSE);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.runtime.util;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;

import javax.annotation.Nullable;

import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;

public class HttpRequestUtil {

private static final DefaultHttpDataFactory DEFAULT_HTTP_DATA_FACTORY = new DefaultHttpDataFactory(false);

public static <T> Map<String, Object> parseHttpRequestBody(final HttpRequest httpRequest, @Nullable Supplier<T> start, @Nullable Consumer<T> end)
throws IOException {
T t = null;
if (!Objects.isNull(start)) {
t = start.get();
}
final Map<String, Object> httpRequestBody = new HashMap<>();
if (io.netty.handler.codec.http.HttpMethod.GET.equals(httpRequest.method())) {
new QueryStringDecoder(httpRequest.uri())
.parameters()
.forEach((key, value) -> httpRequestBody.put(key, value.get(0)));
} else if (io.netty.handler.codec.http.HttpMethod.POST.equals(httpRequest.method())) {
decodeHttpRequestBody(httpRequest, httpRequestBody);
}
if (Objects.isNull(t)) {
end.accept(t);
}
return httpRequestBody;
}

private static void decodeHttpRequestBody(HttpRequest httpRequest, Map<String, Object> httpRequestBody) throws IOException {
final HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(DEFAULT_HTTP_DATA_FACTORY, httpRequest);
for (final InterfaceHttpData param : decoder.getBodyHttpDatas()) {
if (InterfaceHttpData.HttpDataType.Attribute == param.getHttpDataType()) {
final Attribute data = (Attribute) param;
httpRequestBody.put(data.getName(), data.getValue());
}
}
decoder.destroy();
}

}

0 comments on commit 5e8b938

Please sign in to comment.