Skip to content

Commit

Permalink
Merge pull request #960 from lrhkobe/trace_improve_2
Browse files Browse the repository at this point in the history
[ISSUE #956] add trace for tcp protocol in eventmesh-runtime
close #956
  • Loading branch information
xwm1992 authored Jun 24, 2022
2 parents 21264de + db3bd54 commit 2cace5f
Show file tree
Hide file tree
Showing 12 changed files with 851 additions and 217 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.eventmesh.runtime.connector.ConnectorResource;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.registry.Registry;
import org.apache.eventmesh.runtime.trace.Trace;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -50,6 +51,8 @@ public class EventMeshServer {

private Registry registry;

private static Trace trace;

private ConnectorResource connectorResource;

private ServiceState serviceState;
Expand All @@ -62,6 +65,7 @@ public EventMeshServer(EventMeshHTTPConfiguration eventMeshHttpConfiguration,
this.eventMeshGrpcConfiguration = eventMeshGrpcConfiguration;
this.acl = new Acl();
this.registry = new Registry();
this.trace = new Trace(eventMeshHttpConfiguration.eventMeshServerTraceEnable);
this.connectorResource = new ConnectorResource();

ConfigurationContextUtil.putIfAbsent(ConfigurationContextUtil.TCP, eventMeshTcpConfiguration);
Expand All @@ -70,7 +74,6 @@ public EventMeshServer(EventMeshHTTPConfiguration eventMeshHttpConfiguration,
}

public void init() throws Exception {

if (eventMeshHttpConfiguration != null && eventMeshHttpConfiguration.eventMeshServerSecurityEnable) {
acl.init(eventMeshHttpConfiguration.eventMeshSecurityPluginType);
}
Expand All @@ -90,6 +93,10 @@ public void init() throws Exception {
registry.init(eventMeshHttpConfiguration.eventMeshRegistryPluginType);
}

if (eventMeshHttpConfiguration != null && eventMeshHttpConfiguration.eventMeshServerTraceEnable) {
trace.init(eventMeshHttpConfiguration.eventMeshTracePluginType);
}

connectorResource.init(eventMeshHttpConfiguration.eventMeshConnectorPluginType);

// server init
Expand Down Expand Up @@ -167,6 +174,11 @@ public void shutdown() throws Exception {
acl.shutdown();
}

if (eventMeshHttpConfiguration != null && eventMeshHttpConfiguration.eventMeshServerTraceEnable) {
trace.shutdown();
}


ConfigurationContextUtil.clear();
serviceState = ServiceState.STOPED;
logger.info("server state:{}", serviceState);
Expand All @@ -184,6 +196,10 @@ public EventMeshTCPServer getEventMeshTCPServer() {
return eventMeshTCPServer;
}

public static Trace getTrace() {
return trace;
}

public ServiceState getServiceState() {
return serviceState;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ public class EventMeshConstants {
public static final String RSP_SEND_EVENTMESH_IP = "rspsendeventmeship";
public static final String RSP_RECEIVE_EVENTMESH_IP = "rspreceiveeventmeship";

public static final String RSP_SYS = "rsp0sys";
public static final String RSP_IP = "rsp0ip";
public static final String RSP_IDC = "rsp0idc";
public static final String RSP_GROUP = "rsp0group";
public static final String RSP_URL = "rsp0url";

public static final String REQ_SYS = "req0sys";
public static final String REQ_IP = "req0ip";
public static final String REQ_IDC = "req0idc";
public static final String REQ_GROUP = "req0group";

//default TTL 4 hours
public static final Integer DEFAULT_MSG_TTL_MILLS = 14400000;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.eventmesh.common.protocol.tcp.OPStatus;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.SessionState;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.GoodbyeTask;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.HeartBeatTask;
Expand All @@ -33,13 +35,18 @@
import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.RecommendTask;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.SubscribeTask;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.UnSubscribeTask;
import org.apache.eventmesh.runtime.trace.TraceUtils;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.opentelemetry.api.trace.Span;

public class EventMeshTcpMessageDispatcher extends SimpleChannelInboundHandler<Package> {

Expand All @@ -55,11 +62,27 @@ public EventMeshTcpMessageDispatcher(EventMeshTCPServer eventMeshTCPServer) {
protected void channelRead0(ChannelHandlerContext ctx, Package pkg) throws Exception {
long startTime = System.currentTimeMillis();
validateMsg(pkg);
eventMeshTCPServer.getEventMeshTcpMonitor().getTcpSummaryMetrics().getClient2eventMeshMsgNum().incrementAndGet();
Command cmd = null;

eventMeshTCPServer.getEventMeshTcpMonitor().getTcpSummaryMetrics()
.getClient2eventMeshMsgNum().incrementAndGet();

Command cmd = pkg.getHeader().getCmd();
try {
Runnable task;
cmd = pkg.getHeader().getCmd();

if (isNeedTrace(cmd)) {
pkg.getHeader().getProperties()
.put(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, startTime);
pkg.getHeader().getProperties().put(EventMeshConstants.REQ_SEND_EVENTMESH_IP,
eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerIp);
Session session = eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx);

pkg.getHeader().getProperties().put(EventMeshConstants.REQ_SYS, session.getClient().getSubsystem());
pkg.getHeader().getProperties().put(EventMeshConstants.REQ_IP, session.getClient().getHost());
pkg.getHeader().getProperties().put(EventMeshConstants.REQ_IDC, session.getClient().getIdc());
pkg.getHeader().getProperties().put(EventMeshConstants.REQ_GROUP, session.getClient().getGroup());
}

if (cmd.equals(Command.RECOMMEND_REQUEST)) {
messageLogger.info("pkg|c2eventMesh|cmd={}|pkg={}", cmd, pkg);
task = new RecommendTask(pkg, ctx, startTime, eventMeshTCPServer);
Expand All @@ -80,22 +103,43 @@ protected void channelRead0(ChannelHandlerContext ctx, Package pkg) throws Excep

logMessageFlow(ctx, pkg, cmd);

if (eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getSessionState() == SessionState.CLOSED) {
throw new Exception("this eventMesh tcp session will be closed, may be reboot or version change!");
if (eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx)
.getSessionState() == SessionState.CLOSED) {
throw new Exception(
"this eventMesh tcp session will be closed, may be reboot or version change!");
}

dispatch(ctx, pkg, startTime, cmd);
} catch (Exception e) {
logger.error("exception occurred while pkg|cmd={}|pkg={}", cmd, pkg, e);

if (isNeedTrace(cmd)) {
Span span = TraceUtils.prepareServerSpan(pkg.getHeader().getProperties(),
EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, startTime,
TimeUnit.MILLISECONDS, false);
TraceUtils.finishSpanWithException(span, pkg.getHeader().getProperties(),
"exception occurred while dispatch pkg", e);
}

writeToClient(cmd, pkg, ctx, e);
}
}

private boolean isNeedTrace(Command cmd) {
if (eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerTraceEnable
&& cmd != null && (Command.REQUEST_TO_SERVER == cmd
|| Command.ASYNC_MESSAGE_TO_SERVER == cmd
|| Command.BROADCAST_MESSAGE_TO_SERVER == cmd)) {
return true;
}
return false;
}

private void writeToClient(Command cmd, Package pkg, ChannelHandlerContext ctx, Exception e) {
try {
Package res = new Package();
res.setHeader(new Header(getReplyCommand(cmd), OPStatus.FAIL.getCode(), e.toString(), pkg.getHeader()
.getSeq()));
res.setHeader(new Header(getReplyCommand(cmd), OPStatus.FAIL.getCode(), e.toString(),
pkg.getHeader().getSeq()));
ctx.writeAndFlush(res);
} catch (Exception ex) {
logger.warn("writeToClient failed", ex);
Expand Down Expand Up @@ -131,11 +175,12 @@ private Command getReplyCommand(Command cmd) {

private void logMessageFlow(ChannelHandlerContext ctx, Package pkg, Command cmd) {
if (pkg.getBody() instanceof EventMeshMessage) {
messageLogger.info("pkg|c2eventMesh|cmd={}|Msg={}|user={}", cmd, EventMeshUtil.printMqMessage((EventMeshMessage) pkg
.getBody()), eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getClient());
messageLogger.info("pkg|c2eventMesh|cmd={}|Msg={}|user={}", cmd,
EventMeshUtil.printMqMessage((EventMeshMessage) pkg.getBody()),
eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getClient());
} else {
messageLogger.info("pkg|c2eventMesh|cmd={}|pkg={}|user={}", cmd, pkg,
eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getClient());
eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getClient());
}
}

Expand All @@ -153,8 +198,8 @@ private void validateMsg(Package pkg) throws Exception {
}
}

private void dispatch(ChannelHandlerContext ctx, Package pkg, long startTime, Command cmd) throws
Exception {
private void dispatch(ChannelHandlerContext ctx, Package pkg, long startTime, Command cmd)
throws Exception {
Runnable task;
switch (cmd) {
case HEARTBEAT_REQUEST:
Expand Down
Loading

0 comments on commit 2cace5f

Please sign in to comment.