Skip to content

Commit

Permalink
resolve
Browse files Browse the repository at this point in the history
  • Loading branch information
karsonto committed Feb 5, 2024
1 parent 5746137 commit 7ebdf35
Show file tree
Hide file tree
Showing 26 changed files with 81 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,6 @@ private void injectHttpRequestHeader(final ChannelHandlerContext ctx, final Http

private void processHttpCommandRequest(final ChannelHandlerContext ctx, final AsyncContext<HttpCommand> asyncContext) {
final HttpCommand request = asyncContext.getRequest();

final HttpRequestProcessor choosed = httpRequestProcessorTable.get(request.getRequestCode());
Runnable runnable = () -> {
try {
Expand Down Expand Up @@ -427,6 +426,7 @@ private void processHttpCommandRequest(final ChannelHandlerContext ctx, final As
if (!asyncContext.isComplete()) {
return;
}

log.debug("{}", asyncContext.getResponse());
metrics.getSummaryMetrics()
.recordHTTPReqResTimeCost(System.currentTimeMillis() - request.getReqTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@

import lombok.extern.slf4j.Slf4j;


/**
* Add multiple managers to the underlying server
*/
Expand All @@ -83,22 +84,16 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {

private final Acl acl;
private final EventBus eventBus = new EventBus();

private final transient HTTPClientPool httpClientPool = new HTTPClientPool(10);
private ConsumerManager consumerManager;
private ProducerManager producerManager;
private SubscriptionManager subscriptionManager;

private FilterEngine filterEngine;

private TransformerEngine transformerEngine;

private HttpRetryer httpRetryer;

private transient RateLimiter msgRateLimiter;
private transient RateLimiter batchRateLimiter;

private final transient HTTPClientPool httpClientPool = new HTTPClientPool(10);

public EventMeshHTTPServer(final EventMeshServer eventMeshServer, final EventMeshHTTPConfiguration eventMeshHttpConfiguration) {

super(eventMeshHttpConfiguration.getHttpServerPort(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;

import java.util.concurrent.Executor;

import io.netty.channel.ChannelHandlerContext;

import lombok.RequiredArgsConstructor;


@RequiredArgsConstructor
public class AdminMetricsProcessor extends AbstractHttpRequestProcessor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
Expand All @@ -27,6 +26,8 @@
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.util.RemotingHelper;

import java.util.concurrent.Executor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -57,6 +58,8 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>

@Override
public Executor executor() {
return (Runnable runnable)-> {runnable.run();};
return (Runnable runnable) -> {
runnable.run();
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
Expand Down Expand Up @@ -53,6 +52,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
Expand Down Expand Up @@ -49,6 +48,7 @@
import org.apache.commons.lang3.StringUtils;

import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
Expand All @@ -37,6 +36,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
Expand All @@ -39,6 +38,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.enums.ConnectionType;
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
Expand All @@ -43,6 +42,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.client.HeartbeatRequestBody;
import org.apache.eventmesh.common.protocol.http.body.client.HeartbeatResponseBody;
Expand Down Expand Up @@ -46,6 +45,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

import io.netty.channel.ChannelHandlerContext;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;

import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import java.util.concurrent.Executor;

/**
* http processor
Expand All @@ -31,8 +32,9 @@ public interface HttpProcessor {
HttpResponse handler(HttpRequest httpRequest);

/**
*
* @return {@link Executor}
*/
default Executor executor() {return null;}
default Executor executor() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
Expand All @@ -41,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;

import io.netty.channel.Channel;
import io.netty.handler.codec.http.HttpRequest;
Expand Down Expand Up @@ -186,7 +186,7 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final

@Override
public String[] paths() {
return new String[]{RequestURI.SUBSCRIBE_LOCAL.getRequestURI()};
return new String[] {RequestURI.SUBSCRIBE_LOCAL.getRequestURI()};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
Expand Down Expand Up @@ -49,6 +48,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpRequest;
Expand Down Expand Up @@ -194,6 +194,7 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final
} catch (Exception e) {
log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms"
+ "|topic={}|url={}", System.currentTimeMillis() - startTime, JsonUtils.toJSONString(unSubTopicList), unSubscribeUrl, e);

handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR, responseHeaderMap,
responseBodyMap, null);
}
Expand All @@ -213,8 +214,10 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final
eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().keySet()
.removeIf(s -> StringUtils.equals(consumerGroup, s));
} catch (Exception e) {

log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms"
+ "|topic={}|url={}", System.currentTimeMillis() - startTime, JsonUtils.toJSONString(unSubTopicList), unSubscribeUrl, e);

handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR, responseHeaderMap,
responseBodyMap, null);
}
Expand All @@ -227,7 +230,7 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final

@Override
public String[] paths() {
return new String[]{RequestURI.UNSUBSCRIBE_LOCAL.getRequestURI()};
return new String[] {RequestURI.UNSUBSCRIBE_LOCAL.getRequestURI()};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
Expand All @@ -34,6 +33,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
Expand All @@ -44,6 +43,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -218,7 +218,7 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest

@Override
public String[] paths() {
return new String[]{RequestURI.SUBSCRIBE_REMOTE.getRequestURI()};
return new String[] {RequestURI.SUBSCRIBE_REMOTE.getRequestURI()};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
Expand All @@ -44,6 +43,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

import org.slf4j.Logger;
Expand Down Expand Up @@ -185,7 +185,7 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest

@Override
public String[] paths() {
return new String[]{RequestURI.UNSUBSCRIBE_REMOTE.getRequestURI()};
return new String[] {RequestURI.UNSUBSCRIBE_REMOTE.getRequestURI()};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
Expand Down Expand Up @@ -48,6 +47,7 @@
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.concurrent.Executor;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.AclException;
Expand Down Expand Up @@ -53,6 +52,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import io.cloudevents.CloudEvent;
Expand Down Expand Up @@ -309,7 +309,7 @@ public void onException(final OnExceptionContext context) {

@Override
public String[] paths() {
return new String[]{RequestURI.PUBLISH.getRequestURI()};
return new String[] {RequestURI.PUBLISH.getRequestURI()};
}

@Override
Expand Down
Loading

0 comments on commit 7ebdf35

Please sign in to comment.