Skip to content

Commit

Permalink
use httpRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
karsonto committed Apr 3, 2024
1 parent d383da0 commit 57d0c04
Show file tree
Hide file tree
Showing 27 changed files with 147 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import lombok.Data;

@Deprecated
@Data
public class HttpCommand implements ProtocolTransportObject {

Expand Down Expand Up @@ -69,14 +70,6 @@ public class HttpCommand implements ProtocolTransportObject {

public URI requestURI;

public URI getRequestURI() {
return requestURI;
}

public void setRequestURI(URI requestURI) {
this.requestURI = requestURI;
}

public CmdType cmdType = CmdType.REQ;

public HttpCommand() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package org.apache.eventmesh.runtime.admin.handler;

import org.apache.eventmesh.common.enums.HttpMethod;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.util.HttpResponseUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -31,16 +31,24 @@
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.util.AsciiString;

import lombok.Data;

@Data
public abstract class AbstractHttpHandler implements org.apache.eventmesh.runtime.admin.handler.HttpHandler {

private static final DefaultHttpDataFactory DEFAULT_HTTP_DATA_FACTORY = new DefaultHttpDataFactory(false);

protected void write(ChannelHandlerContext ctx, byte[] result, AsciiString headerValue) {
ctx.writeAndFlush(HttpResponseUtils.getHttpResponse(result, ctx, headerValue)).addListener(ChannelFutureListener.CLOSE);
}
Expand Down Expand Up @@ -111,32 +119,57 @@ protected Map<String, String> queryToMap(String query) {
}

@Override
public void handle(HttpCommand httpCommand, ChannelHandlerContext ctx) throws Exception {
switch (HttpMethod.valueOf(httpCommand.getHttpMethod())) {
public void handle(HttpRequest httpRequest, ChannelHandlerContext ctx) throws Exception {
switch (HttpMethod.valueOf(httpRequest.method().name())) {
case OPTIONS:
preflight(ctx);
break;
case GET:
get(httpCommand, ctx);
get(httpRequest, ctx);
break;
case POST:
post(httpCommand, ctx);
post(httpRequest, ctx);
break;
case DELETE:
delete(httpCommand, ctx);
delete(httpRequest, ctx);
break;
default: // do nothing
break;
}
}

protected void post(HttpCommand httpCommand, ChannelHandlerContext ctx) throws Exception{
protected void post(HttpRequest httpRequest, ChannelHandlerContext ctx) throws Exception{
}

protected void delete(HttpRequest httpRequest, ChannelHandlerContext ctx) throws Exception{
}

protected void delete(HttpCommand httpCommand, ChannelHandlerContext ctx) throws Exception{
protected void get(HttpRequest httpRequest, ChannelHandlerContext ctx) throws Exception{
}

protected void get(HttpCommand httpCommand, ChannelHandlerContext ctx) throws Exception{
protected Map<String, Object> parseHttpRequestBody(final HttpRequest httpRequest) throws IOException {
final long bodyDecodeStart = System.currentTimeMillis();
final Map<String, Object> httpRequestBody = new HashMap<>();

if (io.netty.handler.codec.http.HttpMethod.GET.equals(httpRequest.method())) {
new QueryStringDecoder(httpRequest.uri())
.parameters()
.forEach((key, value) -> httpRequestBody.put(key, value.get(0)));
} else if (io.netty.handler.codec.http.HttpMethod.POST.equals(httpRequest.method())) {
decodeHttpRequestBody(httpRequest, httpRequestBody);
}
return httpRequestBody;
}

private void decodeHttpRequestBody(HttpRequest httpRequest, Map<String, Object> httpRequestBody) throws IOException {
final HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(DEFAULT_HTTP_DATA_FACTORY, httpRequest);
for (final InterfaceHttpData param : decoder.getBodyHttpDatas()) {
if (InterfaceHttpData.HttpDataType.Attribute == param.getHttpDataType()) {
final Attribute data = (Attribute) param;
httpRequestBody.put(data.getName(), data.getValue());
}
}
decoder.destroy();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.eventmesh.runtime.admin.handler;

import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.runtime.admin.response.GetConfigurationResponse;
import org.apache.eventmesh.runtime.common.EventHttpHandler;
Expand All @@ -33,6 +32,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;

Expand Down Expand Up @@ -73,7 +73,7 @@ public ConfigurationHandler(
}

@Override
protected void get(HttpCommand httpCommand, ChannelHandlerContext ctx) throws Exception {
protected void get(HttpRequest httpRequest, ChannelHandlerContext ctx) throws Exception {
HttpHeaders responseHeaders = new DefaultHttpHeaders();
responseHeaders.add(EventMeshConstants.CONTENT_TYPE, EventMeshConstants.APPLICATION_JSON);
responseHeaders.add(EventMeshConstants.HANDLER_ORIGIN, "*");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@
package org.apache.eventmesh.runtime.admin.handler;

import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.runtime.common.EventHttpHandler;
import org.apache.eventmesh.webhook.api.WebHookConfig;
import org.apache.eventmesh.webhook.api.WebHookConfigOperation;

import java.util.Map;
import java.util.Objects;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpRequest;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -43,8 +43,7 @@
* {@linkplain org.apache.eventmesh.webhook.admin.FileWebHookConfigOperation
* #deleteWebHookConfig FileWebHookConfigOperation} method as implementation to delete the WebHook configuration file;
* <p>
* When {@code eventMesh.webHook.operationMode=nacos}, It calls the
* {@linkplain org.apache.eventmesh.webhook.admin.NacosWebHookConfigOperation
* When {@code eventMesh.webHook.operationMode=nacos}, It calls the {@linkplain org.apache.eventmesh.webhook.admin.NacosWebHookConfigOperation
* #deleteWebHookConfig NacosWebHookConfigOperation} method as implementation to delete the WebHook configuration from Nacos.
* <p>
* The {@linkplain org.apache.eventmesh.webhook.receive.storage.HookConfigOperationManager#deleteWebHookConfig HookConfigOperationManager} , another
Expand All @@ -70,11 +69,11 @@ public DeleteWebHookConfigHandler(WebHookConfigOperation operation) {
}

@Override
public void handle(HttpCommand httpCommand, ChannelHandlerContext ctx) throws Exception {
Body body = httpCommand.getBody();
public void handle(HttpRequest httpRequest, ChannelHandlerContext ctx) throws Exception {
Map<String, Object> body = parseHttpRequestBody(httpRequest);
Objects.requireNonNull(body, "body can not be null");
// Resolve to WebHookConfig
WebHookConfig webHookConfig = JsonUtils.mapToObject(body.toMap(), WebHookConfig.class);
WebHookConfig webHookConfig = JsonUtils.mapToObject(body, WebHookConfig.class);
// Delete the existing WebHookConfig
Integer code = operation.deleteWebHookConfig(webHookConfig); // operating result
String result = 1 == code ? "deleteWebHookConfig Succeed!" : "deleteWebHookConfig Failed!";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

package org.apache.eventmesh.runtime.admin.handler;

import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.runtime.common.EventHttpHandler;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.plugin.MQAdminWrapper;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -35,6 +35,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -74,10 +75,10 @@ public EventHandler(
}

@Override
protected void get(HttpCommand httpCommand, ChannelHandlerContext ctx) throws Exception {
protected void get(HttpRequest httpRequest, ChannelHandlerContext ctx) throws Exception {
HttpHeaders responseHeaders = new DefaultHttpHeaders();
responseHeaders.add(EventMeshConstants.HANDLER_ORIGIN, "*");
String queryString = httpCommand.getRequestURI().getQuery();
String queryString = URI.create(httpRequest.uri()).getQuery();
if (queryString == null || "".equals(queryString)) {
write401(ctx);
return;
Expand All @@ -99,10 +100,10 @@ protected void get(HttpCommand httpCommand, ChannelHandlerContext ctx) throws Ex
}

@Override
protected void post(HttpCommand httpCommand, ChannelHandlerContext ctx) throws Exception {
protected void post(HttpRequest httpRequest, ChannelHandlerContext ctx) throws Exception {
HttpHeaders responseHeaders = new DefaultHttpHeaders();
responseHeaders.add(EventMeshConstants.HANDLER_ORIGIN, "*");
String request = JsonUtils.toJSONString(httpCommand.getBody().toMap());
String request = JsonUtils.toJSONString(parseHttpRequestBody(httpRequest));
byte[] rawRequest = request.getBytes(StandardCharsets.UTF_8);
CloudEvent event = Objects.requireNonNull(EventFormatProvider
.getInstance()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package org.apache.eventmesh.runtime.admin.handler;

import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.runtime.admin.request.DeleteGrpcClientRequest;
import org.apache.eventmesh.runtime.admin.response.GetClientResponse;
Expand All @@ -39,6 +37,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;

Expand Down Expand Up @@ -71,10 +70,10 @@ public GrpcClientHandler(
}

@Override
protected void delete(HttpCommand httpCommand, ChannelHandlerContext ctx) throws Exception {
Body body = httpCommand.getBody();
protected void delete(HttpRequest httpRequest, ChannelHandlerContext ctx) throws Exception {
Map<String, Object> body = parseHttpRequestBody(httpRequest);
Objects.requireNonNull(body, "body can not be null");
DeleteGrpcClientRequest deleteGrpcClientRequest = JsonUtils.mapToObject(body.toMap(), DeleteGrpcClientRequest.class);
DeleteGrpcClientRequest deleteGrpcClientRequest = JsonUtils.mapToObject(body, DeleteGrpcClientRequest.class);
String url = Objects.requireNonNull(deleteGrpcClientRequest).getUrl();
ConsumerManager consumerManager = eventMeshGrpcServer.getConsumerManager();
Map<String, List<ConsumerGroupClient>> clientTable = consumerManager.getClientTable();
Expand All @@ -93,7 +92,7 @@ protected void delete(HttpCommand httpCommand, ChannelHandlerContext ctx) throws
}

@Override
protected void get(HttpCommand httpCommand, ChannelHandlerContext ctx) throws Exception {
protected void get(HttpRequest httpRequest, ChannelHandlerContext ctx) throws Exception {
HttpHeaders responseHeaders = new DefaultHttpHeaders();
responseHeaders.add(EventMeshConstants.CONTENT_TYPE, EventMeshConstants.APPLICATION_JSON);
responseHeaders.add(EventMeshConstants.HANDLER_ORIGIN, "*");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package org.apache.eventmesh.runtime.admin.handler;

import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.runtime.admin.request.DeleteHTTPClientRequest;
import org.apache.eventmesh.runtime.admin.response.GetClientResponse;
Expand All @@ -31,12 +29,14 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;

Expand Down Expand Up @@ -68,10 +68,10 @@ public HTTPClientHandler(
this.eventMeshHTTPServer = eventMeshHTTPServer;
}

protected void delete(HttpCommand httpCommand, ChannelHandlerContext ctx) throws Exception {
Body body = httpCommand.getBody();
protected void delete(HttpRequest httpRequest, ChannelHandlerContext ctx) throws Exception {
Map<String, Object> body = parseHttpRequestBody(httpRequest);
if (!Objects.isNull(body)) {
DeleteHTTPClientRequest deleteHTTPClientRequest = JsonUtils.mapToObject(body.toMap(), DeleteHTTPClientRequest.class);
DeleteHTTPClientRequest deleteHTTPClientRequest = JsonUtils.mapToObject(body, DeleteHTTPClientRequest.class);
String url = Objects.requireNonNull(deleteHTTPClientRequest).getUrl();

for (List<Client> clientList : eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().values()) {
Expand All @@ -91,7 +91,7 @@ protected void delete(HttpCommand httpCommand, ChannelHandlerContext ctx) throws
*
* @throws Exception if an I/O error occurs while handling the request
*/
protected void get(HttpCommand httpCommand, ChannelHandlerContext ctx) throws Exception {
protected void get(HttpRequest httpRequest, ChannelHandlerContext ctx) throws Exception {
HttpHeaders responseHeaders = new DefaultHttpHeaders();
// Set the response headers
responseHeaders.add(EventMeshConstants.CONTENT_TYPE, EventMeshConstants.APPLICATION_JSON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@

package org.apache.eventmesh.runtime.admin.handler;

import org.apache.eventmesh.common.protocol.http.HttpCommand;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpRequest;

/**
* Admin HttpHandler
*/
public interface HttpHandler {

void handle(HttpCommand httpCommand, ChannelHandlerContext ctx) throws Exception;
void handle(HttpRequest httpRequest, ChannelHandlerContext ctx) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@
package org.apache.eventmesh.runtime.admin.handler;

import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.runtime.common.EventHttpHandler;
import org.apache.eventmesh.webhook.api.WebHookConfig;
import org.apache.eventmesh.webhook.api.WebHookConfigOperation;

import java.util.Map;
import java.util.Objects;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpRequest;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -78,10 +78,10 @@ public InsertWebHookConfigHandler(WebHookConfigOperation operation) {
*/

@Override
public void handle(HttpCommand httpCommand, ChannelHandlerContext ctx) throws Exception {
Body body = httpCommand.getBody();
public void handle(HttpRequest httpRequest, ChannelHandlerContext ctx) throws Exception {
Map<String, Object> body = parseHttpRequestBody(httpRequest);
Objects.requireNonNull(body, "body can not be null");
WebHookConfig webHookConfig = JsonUtils.mapToObject(body.toMap(), WebHookConfig.class);
WebHookConfig webHookConfig = JsonUtils.mapToObject(body, WebHookConfig.class);
// Add the WebHookConfig if no existing duplicate configuration is found
Integer code = operation.insertWebHookConfig(webHookConfig); // operating result
String result = 1 == code ? "insertWebHookConfig Succeed!" : "insertWebHookConfig Failed!";
Expand Down
Loading

0 comments on commit 57d0c04

Please sign in to comment.