diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/HttpHandlerManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/HttpHandlerManager.java index a3b8787ac0..1aada72b63 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/HttpHandlerManager.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/HttpHandlerManager.java @@ -18,40 +18,14 @@ package org.apache.eventmesh.runtime.admin.controller; import org.apache.eventmesh.runtime.common.EventHttpHandler; -import org.apache.eventmesh.runtime.util.HttpResponseUtils; -import org.apache.eventmesh.runtime.util.Utils; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.InetSocketAddress; -import java.net.URI; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.FullHttpRequest; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpVersion; - -import com.sun.net.httpserver.Headers; -import com.sun.net.httpserver.HttpContext; -import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; -import com.sun.net.httpserver.HttpPrincipal; import lombok.extern.slf4j.Slf4j; @@ -84,165 +58,9 @@ public void register() { }); } - public void exec(ChannelHandlerContext ctx, HttpRequest httpRequest) { - String uriStr = httpRequest.uri(); - URI uri = URI.create(uriStr); - HttpHandler httpHandler = httpHandlerMap.get(uri.getPath()); - if (httpHandler != null) { - try { - HttpHandlerManager.AdminHttpExchange adminHttpExchange = new HttpHandlerManager.AdminHttpExchange(ctx, httpRequest); - httpHandler.handle(adminHttpExchange); - adminHttpExchange.writeAndFlash(); - return; - } catch (Exception e) { - log.error(e.getMessage(), e); - ctx.writeAndFlush(HttpResponseUtils.createInternalServerError()).addListener(ChannelFutureListener.CLOSE); - } - } else { - ctx.writeAndFlush(HttpResponseUtils.createNotFound()).addListener(ChannelFutureListener.CLOSE); - } + public Optional getHttpHandler(String path) { + return Optional.ofNullable(httpHandlerMap.get(path)); } - class AdminHttpExchange extends HttpExchange { - - - ChannelHandlerContext ctx; - Optional httpRequest; - - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - - Map responseCode = new HashMap<>(); - - Headers responseHeader = new Headers(); - - public AdminHttpExchange(ChannelHandlerContext ctx, HttpRequest httpRequest) { - this.ctx = ctx; - if (httpRequest instanceof FullHttpRequest) { - this.httpRequest = Optional.ofNullable((FullHttpRequest) httpRequest); - } - } - - @Override - public Headers getRequestHeaders() { - Headers headers = new Headers(); - httpRequest.ifPresent(e -> { - final Map headerMap = Utils.parseHttpHeader(e); - headerMap.putAll(headerMap); - }); - - return headers; - } - - @Override - public Headers getResponseHeaders() { - return responseHeader; - } - - @Override - public URI getRequestURI() { - if (httpRequest.isPresent()) { - return URI.create(httpRequest.get().uri()); - } - return null; - } - - @Override - public String getRequestMethod() { - if (httpRequest.isPresent()) { - return httpRequest.get().method().name(); - } - return null; - } - - @Override - public HttpContext getHttpContext() { - return null; - } - - @Override - public void close() { - - } - - @Override - public InputStream getRequestBody() { - if (httpRequest.isPresent()) { - ByteBuf content = httpRequest.get().content(); - byte[] bytes = new byte[content.readableBytes()]; - try { - content.readBytes(bytes); - } finally { - content.release(); - } - return new ByteArrayInputStream(bytes); - } - return new ByteArrayInputStream(new byte[0]); - } - - @Override - public OutputStream getResponseBody() { - return outputStream; - } - - @Override - public void sendResponseHeaders(int i, long l) throws IOException { - responseCode.put(i, l); - } - - @Override - public InetSocketAddress getRemoteAddress() { - return null; - } - - @Override - public int getResponseCode() { - Set> entries = responseCode.entrySet(); - Optional> first = entries.stream().findFirst(); - return first.get().getKey(); - } - - @Override - public InetSocketAddress getLocalAddress() { - return null; - } - - @Override - public String getProtocol() { - return null; - } - - @Override - public Object getAttribute(String s) { - return null; - } - - @Override - public void setAttribute(String s, Object o) { - - } - - @Override - public void setStreams(InputStream inputStream, OutputStream outputStream) { - - } - - @Override - public HttpPrincipal getPrincipal() { - return null; - } - - public void writeAndFlash() { - byte[] bytes = outputStream.toByteArray(); - Headers responseHeaders = getResponseHeaders(); - - DefaultFullHttpResponse defaultFullHttpResponse = - new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(getResponseCode()), - Unpooled.copiedBuffer(bytes)); - responseHeaders.entrySet().stream().forEach(e -> { - defaultFullHttpResponse.headers().add(e.getKey(), e.getValue()); - }); - ctx.writeAndFlush(defaultFullHttpResponse).addListener(ChannelFutureListener.CLOSE); - } - } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java index 1080446295..a1d0f360e1 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAdminServer.java @@ -19,8 +19,26 @@ import org.apache.eventmesh.runtime.admin.controller.HttpHandlerManager; import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration; +import org.apache.eventmesh.runtime.util.HttpResponseUtils; +import org.apache.eventmesh.runtime.util.Utils; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; @@ -29,10 +47,20 @@ import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; + +import com.sun.net.httpserver.Headers; +import com.sun.net.httpserver.HttpContext; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpPrincipal; import lombok.extern.slf4j.Slf4j; @@ -86,6 +114,167 @@ public void start() throws Exception { started.compareAndSet(false, true); } + public void parseHttpRequest(ChannelHandlerContext ctx, HttpRequest httpRequest) { + String uriStr = httpRequest.uri(); + URI uri = URI.create(uriStr); + Optional httpHandlerOpt = httpHandlerManager.getHttpHandler(uri.getPath()); + if (httpHandlerOpt.isPresent()) { + try { + AdminHttpExchange adminHttpExchange = new AdminHttpExchange(ctx, httpRequest); + httpHandlerOpt.get().handle(adminHttpExchange); + adminHttpExchange.writeAndFlash(); + return; + } catch (Exception e) { + log.error(e.getMessage(), e); + ctx.writeAndFlush(HttpResponseUtils.createInternalServerError()).addListener(ChannelFutureListener.CLOSE); + } + } else { + ctx.writeAndFlush(HttpResponseUtils.createNotFound()).addListener(ChannelFutureListener.CLOSE); + } + } + + class AdminHttpExchange extends HttpExchange { + + + ChannelHandlerContext ctx; + Optional httpRequest; + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + Map responseCode = new HashMap<>(); + + Headers responseHeader = new Headers(); + + public AdminHttpExchange(ChannelHandlerContext ctx, HttpRequest httpRequest) { + this.ctx = ctx; + if (httpRequest instanceof FullHttpRequest) { + this.httpRequest = Optional.ofNullable((FullHttpRequest) httpRequest); + } + } + + @Override + public Headers getRequestHeaders() { + Headers headers = new Headers(); + httpRequest.ifPresent(e -> { + final Map headerMap = Utils.parseHttpHeader(e); + headerMap.putAll(headerMap); + }); + + return headers; + } + + @Override + public Headers getResponseHeaders() { + return responseHeader; + } + + @Override + public URI getRequestURI() { + if (httpRequest.isPresent()) { + return URI.create(httpRequest.get().uri()); + } + return null; + } + + @Override + public String getRequestMethod() { + if (httpRequest.isPresent()) { + return httpRequest.get().method().name(); + } + return null; + } + + @Override + public HttpContext getHttpContext() { + return null; + } + + @Override + public void close() { + + } + + @Override + public InputStream getRequestBody() { + if (httpRequest.isPresent()) { + ByteBuf content = httpRequest.get().content(); + byte[] bytes = new byte[content.readableBytes()]; + try { + content.readBytes(bytes); + } finally { + content.release(); + } + return new ByteArrayInputStream(bytes); + } + return new ByteArrayInputStream(new byte[0]); + } + + @Override + public OutputStream getResponseBody() { + return outputStream; + } + + @Override + public void sendResponseHeaders(int i, long l) throws IOException { + responseCode.put(i, l); + } + + @Override + public InetSocketAddress getRemoteAddress() { + return null; + } + + @Override + public int getResponseCode() { + Set> entries = responseCode.entrySet(); + Optional> first = entries.stream().findFirst(); + return first.get().getKey(); + } + + @Override + public InetSocketAddress getLocalAddress() { + return null; + } + + @Override + public String getProtocol() { + return null; + } + + @Override + public Object getAttribute(String s) { + return null; + } + + @Override + public void setAttribute(String s, Object o) { + + } + + @Override + public void setStreams(InputStream inputStream, OutputStream outputStream) { + + } + + @Override + public HttpPrincipal getPrincipal() { + return null; + } + + public void writeAndFlash() { + byte[] bytes = outputStream.toByteArray(); + Headers responseHeaders = getResponseHeaders(); + + DefaultFullHttpResponse defaultFullHttpResponse = + new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(getResponseCode()), + Unpooled.copiedBuffer(bytes)); + responseHeaders.entrySet().stream().forEach(e -> { + defaultFullHttpResponse.headers().add(e.getKey(), e.getValue()); + }); + ctx.writeAndFlush(defaultFullHttpResponse).addListener(ChannelFutureListener.CLOSE); + } + } + private class AdminServerInitializer extends ChannelInitializer { HttpHandlerManager httpHandlerManager; @@ -103,26 +292,20 @@ protected void initChannel(final SocketChannel channel) { new HttpResponseEncoder(), httpConnectionHandler, new HttpObjectAggregator(Integer.MAX_VALUE), - new AdminServerHandler(httpHandlerManager)); + new AdminServerHandler()); } } private class AdminServerHandler extends ChannelInboundHandlerAdapter { - HttpHandlerManager httpHandlerManager; - - public AdminServerHandler(HttpHandlerManager httpHandlerManager) { - this.httpHandlerManager = httpHandlerManager; - } - @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (!(msg instanceof HttpRequest)) { return; } HttpRequest httpRequest = (HttpRequest) msg; - httpHandlerManager.exec(ctx, httpRequest); + parseHttpRequest(ctx, httpRequest); } }