Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A trace component is used to trace mycat execution deeply(窥探mycat异步执行的神器). #1335

Open
wants to merge 10 commits into
base: 1.5
Choose a base branch
from
30 changes: 30 additions & 0 deletions Changelog.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
Change log

@version Mycat-1.5.1

@since 2017-01-15
@author little-pan

2017-01-20
--------------------------------------------------
+ Supports single SQL comment statments such as:
# update company set description=description where id=1;
--
-- select 1;
--
/***/
/**/
/*select 1; */
* some description issues.

2017-01-15
--------------------------------------------------
* ServerConnection {get|set}Session2() changed to {} {get|set}Session().
+ Trace buffer allocate & recycle.
+ Trace Frontend connection & MySQL backend connection, includes create, take,
release and close.
+ Trace tx execution such as start, commit and rollback.
* fixbug one rollback multi-response and rollback not over(response handler handed over
RollbackReleaseHandler is incorrect) issues in RollbackNodeHandler, and remove the class
RollbackNodeHandler for success or fail(connection broken only) always correct after
calling backend rollback() in RollbackReleaseHandler.
2 changes: 1 addition & 1 deletion src/main/java/demo/catlets/BatchInsertSequence.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void processSQL(String sql, EngineCtx ctx) {
return;
}

sc.getSession2().execute(rrs, sqltype);//将路由好的数据执行入库
sc.getSession().execute(rrs, sqltype);//将路由好的数据执行入库

} catch (Exception e) {
LOGGER.error("BatchInsertSequence.processSQL(String sql, EngineCtx ctx)",e);
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/org/opencloudb/MycatConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
package org.opencloudb;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.NetworkChannel;
import java.util.ArrayList;
Expand Down Expand Up @@ -114,7 +116,13 @@ public void setSocketParams(AbstractConnection con, boolean isFrontChannel)
con.setPacketHeaderSize(system.getPacketHeaderSize());
con.setIdleTimeout(system.getIdleTimeout());
con.setCharset(system.getCharset());

// set local-port
// @since 2017-01-12 little-pan
final SocketAddress addr = channel.getLocalAddress();
if(addr instanceof InetSocketAddress){
final InetSocketAddress inetAddr = (InetSocketAddress)addr;
con.setLocalPort(inetAddr.getPort());
}
}

public Map<String, UserConfig> getUsers() {
Expand Down
14 changes: 11 additions & 3 deletions src/main/java/org/opencloudb/MycatServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.nio.channels.AsynchronousChannelGroup;
import java.util.Map;
import java.util.Properties;
Expand All @@ -38,12 +39,12 @@

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import org.apache.log4j.Logger;
import org.opencloudb.backend.PhysicalDBPool;
import org.opencloudb.buffer.BufferPool;
import org.opencloudb.cache.CacheService;
import org.opencloudb.classloader.DynaClassLoader;
import org.opencloudb.config.ZkConfig;
import org.opencloudb.config.model.SystemConfig;
import org.opencloudb.interceptor.SQLInterceptor;
import org.opencloudb.manager.ManagerConnectionFactory;
Expand Down Expand Up @@ -183,8 +184,15 @@ public MycatConfig getConfig() {
}

public void beforeStart() {
String home = SystemConfig.getHomePath();
Log4jInitializer.configureAndWatch(home + "/conf/log4j.xml", LOG_WATCH_DELAY);
// always load log4j.xml from classpath for development & deployment, otherwise can
//lead to can't find the ${MYCAT_HOME}/conf/log4j.xml config file in common development!
// @since 2017-01-13 pzp
final URL config = SystemConfig.class.getClassLoader().getResource("log4j.xml");
if(config == null){
System.err.println("log4j.xml not in classpath");
return;
}
Log4jInitializer.configureAndWatch(config.getFile(), LOG_WATCH_DELAY);

//ZkConfig.instance().initZk();
}
Expand Down
15 changes: 13 additions & 2 deletions src/main/java/org/opencloudb/MycatStartup.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,28 @@
*/
public final class MycatStartup {
private static final String dateFormat = "yyyy-MM-dd HH:mm:ss";

static{
// init home path which no specified on bootstrap such as common development,
//before log4j first start, otherwise can lead to create another mycat.log file
//in the "/logs" directory!
// @since 2017-01-13 pzp
SystemConfig.getHomePath();
}

public static void main(String[] args) {
//是否启用zk配置,/myid.properties中的loadZk属性决定,默认不启用,从本地xml文件中读取配置
ZkConfig.instance().initZk();

try {
String home = SystemConfig.getHomePath();
final String home = SystemConfig.getHomePath();
if (home == null) {
System.out.println(SystemConfig.SYS_HOME + " is not set.");
System.out.println(SystemConfig.SYS_HOME + " is not set.");
System.exit(-1);
}
// Very important for log4j.xml
// @since 2017-01-13 little-pan
System.out.println("MyCAT Server home path " + home);
// init
MycatServer server = MycatServer.getInstance();
server.beforeStart();
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/opencloudb/backend/ConQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.opencloudb.trace.Tracer;

public class ConQueue {
private final ConcurrentLinkedQueue<BackendConnection> autoCommitCons = new ConcurrentLinkedQueue<BackendConnection>();
private final ConcurrentLinkedQueue<BackendConnection> manCommitCons = new ConcurrentLinkedQueue<BackendConnection>();
Expand All @@ -22,8 +24,10 @@ public BackendConnection takeIdleCon(boolean autoCommit) {
con = f2.poll();
}
if (con == null || con.isClosedOrQuit()) {
Tracer.trace(con, "no idle cnxn");
return null;
} else {
Tracer.trace(con, "take idle cnxn: %s", con);
return con;
}

Expand Down
13 changes: 7 additions & 6 deletions src/main/java/org/opencloudb/backend/PhysicalDBPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opencloudb.heartbeat.DBHeartbeat;
import org.opencloudb.mysql.nio.handler.GetConnectionHandler;
import org.opencloudb.mysql.nio.handler.ResponseHandler;
import org.opencloudb.trace.Tracer;

public class PhysicalDBPool {

Expand Down Expand Up @@ -291,7 +292,7 @@ private String getMessage(int index, String info) {
private boolean initSource(int index, PhysicalDatasource ds) {
int initSize = ds.getConfig().getMinCon();

LOGGER.info("init backend myqsl source ,create connections total " + initSize + " for " + ds.getName() + " index :" + index);
LOGGER.info("init backend mysql source ,create connections total " + initSize + " for " + ds.getName() + " index :" + index);

CopyOnWriteArrayList<BackendConnection> list = new CopyOnWriteArrayList<BackendConnection>();
GetConnectionHandler getConHandler = new GetConnectionHandler(list, initSize);
Expand Down Expand Up @@ -324,18 +325,18 @@ private boolean initSource(int index, PhysicalDatasource ds) {
}

public void doHeartbeat() {


if (writeSources == null || writeSources.length == 0) {
Tracer.trace(hostName, "no writable dataSource: "
+ "initSuccess = %s, activedIndex = %d, banlance = %s, writeType = %s",
initSuccess, activedIndex, banlance, writeType);
return;
}

for (PhysicalDatasource source : this.allDs) {

for (final PhysicalDatasource source : this.allDs) {
if (source != null) {
source.doHeartbeat();
} else {
StringBuilder s = new StringBuilder();
final StringBuilder s = new StringBuilder();
s.append(Alarms.DEFAULT).append(hostName).append(" current dataSource is null!");
LOGGER.error(s.toString());
}
Expand Down
32 changes: 24 additions & 8 deletions src/main/java/org/opencloudb/backend/PhysicalDatasource.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opencloudb.mysql.nio.handler.DelegateResponseHandler;
import org.opencloudb.mysql.nio.handler.NewConnectionRespHandler;
import org.opencloudb.mysql.nio.handler.ResponseHandler;
import org.opencloudb.trace.Tracer;
import org.opencloudb.util.TimeUtil;

public abstract class PhysicalDatasource {
Expand Down Expand Up @@ -198,7 +199,8 @@ public boolean isSalveOrRead(){
}

public void heatBeatCheck(long timeout, long conHeartBeatPeriod) {
int ildeCloseCount = hostConfig.getMinCon() * 3;
// not used so comment it.
//int ildeCloseCount = hostConfig.getMinCon() * 3;
int maxConsInOneCheck = 5;
LinkedList<BackendConnection> heartBeatCons = new LinkedList<BackendConnection>();

Expand Down Expand Up @@ -304,7 +306,11 @@ public void stopHeartbeat() {

public void doHeartbeat() {
// 未到预定恢复时间,不执行心跳检测。
if (TimeUtil.currentTimeMillis() < heartbeatRecoveryTime) {
final long current = TimeUtil.currentTimeMillis(),
recovery = heartbeatRecoveryTime;
if (current < recovery) {
Tracer.trace(name,
"heartbeat skip: current(%d) < recoveryTime(%d)", current, recovery);
return;
}
if (!heartbeat.isStop()) {
Expand All @@ -313,7 +319,10 @@ public void doHeartbeat() {
} catch (Exception e) {
LOGGER.error(name + " heartbeat error.", e);
}
return;
}

Tracer.trace(name, "heartbeat skip: heartbeat stopped, current = %d", current);
}

private BackendConnection takeCon(BackendConnection conn,
Expand All @@ -330,6 +339,8 @@ private BackendConnection takeCon(BackendConnection conn,
conn.setAttachment(attachment);
conn.setLastTime(System.currentTimeMillis()); //每次取连接的时候,更新下lasttime,防止在前端连接检查的时候,关闭连接,导致sql执行失败
handler.connectionAcquired(conn);

Tracer.trace(conn, "take cnxn ok: %s", conn);
return conn;
}

Expand All @@ -356,6 +367,8 @@ public void connectionAcquired(BackendConnection conn) {
}
}
});

Tracer.trace(name, "submit cnxn creation: schema = %s, response-handler = %s", schema, handler);
}

public void getConnection(String schema,boolean autocommit, final ResponseHandler handler,
Expand All @@ -365,18 +378,16 @@ public void getConnection(String schema,boolean autocommit, final ResponseHandle
takeCon(con, handler, attachment, schema);
return;
} else {
int activeCons = this.getActiveCount();//当前最大活动连接
if(activeCons+1>size){//下一个连接大于最大连接数
final int activeCons = this.getActiveCount();//当前最大活动连接
if(activeCons + 1 > size){//下一个连接大于最大连接数
LOGGER.error("the max activeConnnections size can not be max than maxconnections");
throw new IOException("the max activeConnnections size can not be max than maxconnections");
}else{ // create connection
LOGGER.info("not ilde connection in pool,create new connection for " + this.name
LOGGER.info("no ilde connection in pool, create new connection for " + this.name
+ " of schema "+schema);
createNewConnection(handler, attachment, schema);
}

}

}

private void returnCon(BackendConnection c) {
Expand All @@ -391,11 +402,14 @@ private void returnCon(BackendConnection c) {
} else {
ok = queue.getManCommitCons().offer(c);
}

if (!ok) {

LOGGER.warn("can't return to pool ,so close con " + c);
c.close("can't return to pool ");
return;
}

Tracer.trace(c, "release ok: cnxn = %s", c);
}

public void releaseChannel(BackendConnection c) {
Expand All @@ -407,6 +421,8 @@ public void releaseChannel(BackendConnection c) {
}

public void connectionClosed(BackendConnection conn) {
Tracer.trace(conn);

ConQueue queue = this.conMap.getSchemaConQueue(conn.getSchema());
if (queue != null) {
queue.removeCon(conn);
Expand Down
25 changes: 17 additions & 8 deletions src/main/java/org/opencloudb/buffer/BufferPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.log4j.Logger;
import org.opencloudb.trace.Tracer;

/**
* @author mycat
Expand Down Expand Up @@ -111,13 +112,16 @@ public ByteBuffer allocate() {
LOGGER.warn("Direct buffer OOM occurs: so allocate from heap", oom);
node = this.createTempBuffer(chunkSize);
}
return node;
}
Tracer.trace(node);
return node;
}

private boolean checkValidBuffer(ByteBuffer buffer) {
// 拒绝回收null和容量大于chunkSize的缓存
if (buffer == null || !buffer.isDirect()) {
Tracer.trace(buffer, "no recycle need");
return false;
} else if (buffer.capacity() > chunkSize) {
LOGGER.warn("cant' recycle a buffer large than my pool chunksize "
Expand All @@ -138,17 +142,18 @@ public void recycle(ByteBuffer buffer) {
BufferQueue localQueue = localBufferPool.get();
if (localQueue.snapshotSize() < threadLocalCount) {
localQueue.put(buffer);
} else {
// recyle 3/4 thread local buffer
items.addAll(localQueue.removeItems(threadLocalCount * 3 / 4));
items.offer(buffer);
sharedOptsCount++;
return;
}
// recyle 3/4 thread local buffer
items.addAll(localQueue.removeItems(threadLocalCount * 3 / 4));
items.offer(buffer);
sharedOptsCount++;
} else {
sharedOptsCount++;
items.offer(buffer);
}


Tracer.trace(buffer);
}

public int getAvgBufSize() {
Expand All @@ -172,12 +177,16 @@ public boolean testIfDuplicate(ByteBuffer buffer) {
}

private ByteBuffer createTempBuffer(int size) {
return ByteBuffer.allocate(size);
final ByteBuffer buffer = ByteBuffer.allocate(size);
Tracer.trace(buffer);
return buffer;
}

private ByteBuffer createDirectBuffer(int size) {
// for performance
return ByteBuffer.allocateDirect(size);
final ByteBuffer buffer = ByteBuffer.allocateDirect(size);
Tracer.trace(buffer);
return buffer;
}

public ByteBuffer allocate(int size) {
Expand Down
Loading