diff --git a/Changelog.txt b/Changelog.txt new file mode 100644 index 000000000..734b6711e --- /dev/null +++ b/Changelog.txt @@ -0,0 +1,30 @@ +Change log + +@version Mycat-1.5.1 + +@since 2017-01-15 +@author little-pan + +2017-01-20 +-------------------------------------------------- ++ Supports single SQL comment statments such as: +# update company set description=description where id=1; +-- +-- select 1; +-- +/***/ +/**/ +/*select 1; */ +* some description issues. + +2017-01-15 +-------------------------------------------------- +* ServerConnection {get|set}Session2() changed to {} {get|set}Session(). ++ Trace buffer allocate & recycle. ++ Trace Frontend connection & MySQL backend connection, includes create, take, + release and close. ++ Trace tx execution such as start, commit and rollback. +* fixbug one rollback multi-response and rollback not over(response handler handed over + RollbackReleaseHandler is incorrect) issues in RollbackNodeHandler, and remove the class + RollbackNodeHandler for success or fail(connection broken only) always correct after + calling backend rollback() in RollbackReleaseHandler. diff --git a/src/main/java/demo/catlets/BatchInsertSequence.java b/src/main/java/demo/catlets/BatchInsertSequence.java index a82eb487e..a9e4059c7 100644 --- a/src/main/java/demo/catlets/BatchInsertSequence.java +++ b/src/main/java/demo/catlets/BatchInsertSequence.java @@ -59,7 +59,7 @@ public void processSQL(String sql, EngineCtx ctx) { return; } - sc.getSession2().execute(rrs, sqltype);//将路由好的数据执行入库 + sc.getSession().execute(rrs, sqltype);//将路由好的数据执行入库 } catch (Exception e) { LOGGER.error("BatchInsertSequence.processSQL(String sql, EngineCtx ctx)",e); diff --git a/src/main/java/org/opencloudb/MycatConfig.java b/src/main/java/org/opencloudb/MycatConfig.java index 0620d67a3..8090066ff 100644 --- a/src/main/java/org/opencloudb/MycatConfig.java +++ b/src/main/java/org/opencloudb/MycatConfig.java @@ -24,6 +24,8 @@ package org.opencloudb; import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.net.StandardSocketOptions; import java.nio.channels.NetworkChannel; import java.util.ArrayList; @@ -114,7 +116,13 @@ public void setSocketParams(AbstractConnection con, boolean isFrontChannel) con.setPacketHeaderSize(system.getPacketHeaderSize()); con.setIdleTimeout(system.getIdleTimeout()); con.setCharset(system.getCharset()); - + // set local-port + // @since 2017-01-12 little-pan + final SocketAddress addr = channel.getLocalAddress(); + if(addr instanceof InetSocketAddress){ + final InetSocketAddress inetAddr = (InetSocketAddress)addr; + con.setLocalPort(inetAddr.getPort()); + } } public Map getUsers() { diff --git a/src/main/java/org/opencloudb/MycatServer.java b/src/main/java/org/opencloudb/MycatServer.java index ff099d502..d25377c06 100644 --- a/src/main/java/org/opencloudb/MycatServer.java +++ b/src/main/java/org/opencloudb/MycatServer.java @@ -27,6 +27,7 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.net.URL; import java.nio.channels.AsynchronousChannelGroup; import java.util.Map; import java.util.Properties; @@ -38,12 +39,12 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; + import org.apache.log4j.Logger; import org.opencloudb.backend.PhysicalDBPool; import org.opencloudb.buffer.BufferPool; import org.opencloudb.cache.CacheService; import org.opencloudb.classloader.DynaClassLoader; -import org.opencloudb.config.ZkConfig; import org.opencloudb.config.model.SystemConfig; import org.opencloudb.interceptor.SQLInterceptor; import org.opencloudb.manager.ManagerConnectionFactory; @@ -183,8 +184,15 @@ public MycatConfig getConfig() { } public void beforeStart() { - String home = SystemConfig.getHomePath(); - Log4jInitializer.configureAndWatch(home + "/conf/log4j.xml", LOG_WATCH_DELAY); + // always load log4j.xml from classpath for development & deployment, otherwise can + //lead to can't find the ${MYCAT_HOME}/conf/log4j.xml config file in common development! + // @since 2017-01-13 pzp + final URL config = SystemConfig.class.getClassLoader().getResource("log4j.xml"); + if(config == null){ + System.err.println("log4j.xml not in classpath"); + return; + } + Log4jInitializer.configureAndWatch(config.getFile(), LOG_WATCH_DELAY); //ZkConfig.instance().initZk(); } diff --git a/src/main/java/org/opencloudb/MycatStartup.java b/src/main/java/org/opencloudb/MycatStartup.java index 9e17e638c..06f45bc5a 100644 --- a/src/main/java/org/opencloudb/MycatStartup.java +++ b/src/main/java/org/opencloudb/MycatStartup.java @@ -35,17 +35,28 @@ */ public final class MycatStartup { private static final String dateFormat = "yyyy-MM-dd HH:mm:ss"; + + static{ + // init home path which no specified on bootstrap such as common development, + //before log4j first start, otherwise can lead to create another mycat.log file + //in the "/logs" directory! + // @since 2017-01-13 pzp + SystemConfig.getHomePath(); + } public static void main(String[] args) { //是否启用zk配置,/myid.properties中的loadZk属性决定,默认不启用,从本地xml文件中读取配置 ZkConfig.instance().initZk(); try { - String home = SystemConfig.getHomePath(); + final String home = SystemConfig.getHomePath(); if (home == null) { - System.out.println(SystemConfig.SYS_HOME + " is not set."); + System.out.println(SystemConfig.SYS_HOME + " is not set."); System.exit(-1); } + // Very important for log4j.xml + // @since 2017-01-13 little-pan + System.out.println("MyCAT Server home path " + home); // init MycatServer server = MycatServer.getInstance(); server.beforeStart(); diff --git a/src/main/java/org/opencloudb/backend/ConQueue.java b/src/main/java/org/opencloudb/backend/ConQueue.java index 6ac21b45b..bbff1b05a 100644 --- a/src/main/java/org/opencloudb/backend/ConQueue.java +++ b/src/main/java/org/opencloudb/backend/ConQueue.java @@ -3,6 +3,8 @@ import java.util.ArrayList; import java.util.concurrent.ConcurrentLinkedQueue; +import org.opencloudb.trace.Tracer; + public class ConQueue { private final ConcurrentLinkedQueue autoCommitCons = new ConcurrentLinkedQueue(); private final ConcurrentLinkedQueue manCommitCons = new ConcurrentLinkedQueue(); @@ -22,8 +24,10 @@ public BackendConnection takeIdleCon(boolean autoCommit) { con = f2.poll(); } if (con == null || con.isClosedOrQuit()) { + Tracer.trace(con, "no idle cnxn"); return null; } else { + Tracer.trace(con, "take idle cnxn: %s", con); return con; } diff --git a/src/main/java/org/opencloudb/backend/PhysicalDBPool.java b/src/main/java/org/opencloudb/backend/PhysicalDBPool.java index 31ba5d66f..1017a0030 100644 --- a/src/main/java/org/opencloudb/backend/PhysicalDBPool.java +++ b/src/main/java/org/opencloudb/backend/PhysicalDBPool.java @@ -39,6 +39,7 @@ import org.opencloudb.heartbeat.DBHeartbeat; import org.opencloudb.mysql.nio.handler.GetConnectionHandler; import org.opencloudb.mysql.nio.handler.ResponseHandler; +import org.opencloudb.trace.Tracer; public class PhysicalDBPool { @@ -291,7 +292,7 @@ private String getMessage(int index, String info) { private boolean initSource(int index, PhysicalDatasource ds) { int initSize = ds.getConfig().getMinCon(); - LOGGER.info("init backend myqsl source ,create connections total " + initSize + " for " + ds.getName() + " index :" + index); + LOGGER.info("init backend mysql source ,create connections total " + initSize + " for " + ds.getName() + " index :" + index); CopyOnWriteArrayList list = new CopyOnWriteArrayList(); GetConnectionHandler getConHandler = new GetConnectionHandler(list, initSize); @@ -324,18 +325,18 @@ private boolean initSource(int index, PhysicalDatasource ds) { } public void doHeartbeat() { - - if (writeSources == null || writeSources.length == 0) { + Tracer.trace(hostName, "no writable dataSource: " + + "initSuccess = %s, activedIndex = %d, banlance = %s, writeType = %s", + initSuccess, activedIndex, banlance, writeType); return; } - for (PhysicalDatasource source : this.allDs) { - + for (final PhysicalDatasource source : this.allDs) { if (source != null) { source.doHeartbeat(); } else { - StringBuilder s = new StringBuilder(); + final StringBuilder s = new StringBuilder(); s.append(Alarms.DEFAULT).append(hostName).append(" current dataSource is null!"); LOGGER.error(s.toString()); } diff --git a/src/main/java/org/opencloudb/backend/PhysicalDatasource.java b/src/main/java/org/opencloudb/backend/PhysicalDatasource.java index 2310cac73..53a98b7d2 100644 --- a/src/main/java/org/opencloudb/backend/PhysicalDatasource.java +++ b/src/main/java/org/opencloudb/backend/PhysicalDatasource.java @@ -41,6 +41,7 @@ import org.opencloudb.mysql.nio.handler.DelegateResponseHandler; import org.opencloudb.mysql.nio.handler.NewConnectionRespHandler; import org.opencloudb.mysql.nio.handler.ResponseHandler; +import org.opencloudb.trace.Tracer; import org.opencloudb.util.TimeUtil; public abstract class PhysicalDatasource { @@ -198,7 +199,8 @@ public boolean isSalveOrRead(){ } public void heatBeatCheck(long timeout, long conHeartBeatPeriod) { - int ildeCloseCount = hostConfig.getMinCon() * 3; + // not used so comment it. + //int ildeCloseCount = hostConfig.getMinCon() * 3; int maxConsInOneCheck = 5; LinkedList heartBeatCons = new LinkedList(); @@ -304,7 +306,11 @@ public void stopHeartbeat() { public void doHeartbeat() { // 未到预定恢复时间,不执行心跳检测。 - if (TimeUtil.currentTimeMillis() < heartbeatRecoveryTime) { + final long current = TimeUtil.currentTimeMillis(), + recovery = heartbeatRecoveryTime; + if (current < recovery) { + Tracer.trace(name, + "heartbeat skip: current(%d) < recoveryTime(%d)", current, recovery); return; } if (!heartbeat.isStop()) { @@ -313,7 +319,10 @@ public void doHeartbeat() { } catch (Exception e) { LOGGER.error(name + " heartbeat error.", e); } + return; } + + Tracer.trace(name, "heartbeat skip: heartbeat stopped, current = %d", current); } private BackendConnection takeCon(BackendConnection conn, @@ -330,6 +339,8 @@ private BackendConnection takeCon(BackendConnection conn, conn.setAttachment(attachment); conn.setLastTime(System.currentTimeMillis()); //每次取连接的时候,更新下lasttime,防止在前端连接检查的时候,关闭连接,导致sql执行失败 handler.connectionAcquired(conn); + + Tracer.trace(conn, "take cnxn ok: %s", conn); return conn; } @@ -356,6 +367,8 @@ public void connectionAcquired(BackendConnection conn) { } } }); + + Tracer.trace(name, "submit cnxn creation: schema = %s, response-handler = %s", schema, handler); } public void getConnection(String schema,boolean autocommit, final ResponseHandler handler, @@ -365,18 +378,16 @@ public void getConnection(String schema,boolean autocommit, final ResponseHandle takeCon(con, handler, attachment, schema); return; } else { - int activeCons = this.getActiveCount();//当前最大活动连接 - if(activeCons+1>size){//下一个连接大于最大连接数 + final int activeCons = this.getActiveCount();//当前最大活动连接 + if(activeCons + 1 > size){//下一个连接大于最大连接数 LOGGER.error("the max activeConnnections size can not be max than maxconnections"); throw new IOException("the max activeConnnections size can not be max than maxconnections"); }else{ // create connection - LOGGER.info("not ilde connection in pool,create new connection for " + this.name + LOGGER.info("no ilde connection in pool, create new connection for " + this.name + " of schema "+schema); createNewConnection(handler, attachment, schema); } - } - } private void returnCon(BackendConnection c) { @@ -391,11 +402,14 @@ private void returnCon(BackendConnection c) { } else { ok = queue.getManCommitCons().offer(c); } + if (!ok) { - LOGGER.warn("can't return to pool ,so close con " + c); c.close("can't return to pool "); + return; } + + Tracer.trace(c, "release ok: cnxn = %s", c); } public void releaseChannel(BackendConnection c) { @@ -407,6 +421,8 @@ public void releaseChannel(BackendConnection c) { } public void connectionClosed(BackendConnection conn) { + Tracer.trace(conn); + ConQueue queue = this.conMap.getSchemaConQueue(conn.getSchema()); if (queue != null) { queue.removeCon(conn); diff --git a/src/main/java/org/opencloudb/buffer/BufferPool.java b/src/main/java/org/opencloudb/buffer/BufferPool.java index 16dd3dd58..32e437a12 100644 --- a/src/main/java/org/opencloudb/buffer/BufferPool.java +++ b/src/main/java/org/opencloudb/buffer/BufferPool.java @@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.log4j.Logger; +import org.opencloudb.trace.Tracer; /** * @author mycat @@ -111,13 +112,16 @@ public ByteBuffer allocate() { LOGGER.warn("Direct buffer OOM occurs: so allocate from heap", oom); node = this.createTempBuffer(chunkSize); } + return node; } + Tracer.trace(node); return node; } private boolean checkValidBuffer(ByteBuffer buffer) { // 拒绝回收null和容量大于chunkSize的缓存 if (buffer == null || !buffer.isDirect()) { + Tracer.trace(buffer, "no recycle need"); return false; } else if (buffer.capacity() > chunkSize) { LOGGER.warn("cant' recycle a buffer large than my pool chunksize " @@ -138,17 +142,18 @@ public void recycle(ByteBuffer buffer) { BufferQueue localQueue = localBufferPool.get(); if (localQueue.snapshotSize() < threadLocalCount) { localQueue.put(buffer); - } else { - // recyle 3/4 thread local buffer - items.addAll(localQueue.removeItems(threadLocalCount * 3 / 4)); - items.offer(buffer); - sharedOptsCount++; + return; } + // recyle 3/4 thread local buffer + items.addAll(localQueue.removeItems(threadLocalCount * 3 / 4)); + items.offer(buffer); + sharedOptsCount++; } else { sharedOptsCount++; items.offer(buffer); } - + + Tracer.trace(buffer); } public int getAvgBufSize() { @@ -172,12 +177,16 @@ public boolean testIfDuplicate(ByteBuffer buffer) { } private ByteBuffer createTempBuffer(int size) { - return ByteBuffer.allocate(size); + final ByteBuffer buffer = ByteBuffer.allocate(size); + Tracer.trace(buffer); + return buffer; } private ByteBuffer createDirectBuffer(int size) { // for performance - return ByteBuffer.allocateDirect(size); + final ByteBuffer buffer = ByteBuffer.allocateDirect(size); + Tracer.trace(buffer); + return buffer; } public ByteBuffer allocate(int size) { diff --git a/src/main/java/org/opencloudb/buffer/BufferQueue.java b/src/main/java/org/opencloudb/buffer/BufferQueue.java index 2963e558e..f678ff03f 100644 --- a/src/main/java/org/opencloudb/buffer/BufferQueue.java +++ b/src/main/java/org/opencloudb/buffer/BufferQueue.java @@ -30,6 +30,8 @@ import java.util.LinkedList; import java.util.List; +import org.opencloudb.trace.Tracer; + /** * @author mycat */ @@ -70,6 +72,8 @@ public Collection removeItems(long count) { * @throws InterruptedException */ public void put(ByteBuffer buffer) { + Tracer.trace(buffer); + this.items.offer(buffer); if (items.size() > total) { throw new java.lang.RuntimeException( @@ -80,7 +84,9 @@ public void put(ByteBuffer buffer) { } public ByteBuffer poll() { - return items.poll(); + final ByteBuffer buffer = items.poll(); + Tracer.trace(buffer); + return buffer; } public boolean isEmpty() { diff --git a/src/main/java/org/opencloudb/cache/CacheService.java b/src/main/java/org/opencloudb/cache/CacheService.java index 93612229b..3be8fec9c 100644 --- a/src/main/java/org/opencloudb/cache/CacheService.java +++ b/src/main/java/org/opencloudb/cache/CacheService.java @@ -123,7 +123,7 @@ private void createLayeredPool(String cacheName, String type, int size, int expireSeconds) { checkExists(cacheName); logger.info("create layer cache pool " + cacheName + " of type " + type - + " ,default cache size " + size + " ,default expire seconds" + + " ,default cache size " + size + " ,default expire seconds " + expireSeconds); DefaultLayedCachePool layerdPool = new DefaultLayedCachePool(cacheName, this.getCacheFact(type), size, expireSeconds); diff --git a/src/main/java/org/opencloudb/config/Versions.java b/src/main/java/org/opencloudb/config/Versions.java index 790673b17..4901dfdff 100644 --- a/src/main/java/org/opencloudb/config/Versions.java +++ b/src/main/java/org/opencloudb/config/Versions.java @@ -32,7 +32,7 @@ public abstract class Versions { public static final byte PROTOCOL_VERSION = 10; /**服务器版本**/ - public static byte[] SERVER_VERSION = "5.5.8-mycat-1.5.1-RELEASE-20160525102622".getBytes(); + public static byte[] SERVER_VERSION = "5.5.8-mycat-1.5.1u01-RELEASE-20170207".getBytes(); public static void setServerVersion(String version) { byte[] mysqlVersionPart = version.getBytes(); diff --git a/src/main/java/org/opencloudb/config/loader/xml/XMLSchemaLoader.java b/src/main/java/org/opencloudb/config/loader/xml/XMLSchemaLoader.java index 3ac54c72c..aef861ecd 100644 --- a/src/main/java/org/opencloudb/config/loader/xml/XMLSchemaLoader.java +++ b/src/main/java/org/opencloudb/config/loader/xml/XMLSchemaLoader.java @@ -606,7 +606,7 @@ private void loadDataHosts(Element root) { String name = element.getAttribute("name"); if (dataHosts.containsKey(name)) { - throw new ConfigException("dataHost name " + name + "duplicated!"); + throw new ConfigException("dataHost name " + name + " duplicated!"); } int maxCon = Integer.valueOf(element.getAttribute("maxCon")); diff --git a/src/main/java/org/opencloudb/heartbeat/MySQLDetector.java b/src/main/java/org/opencloudb/heartbeat/MySQLDetector.java index fd43024f2..d5d2f8f9a 100644 --- a/src/main/java/org/opencloudb/heartbeat/MySQLDetector.java +++ b/src/main/java/org/opencloudb/heartbeat/MySQLDetector.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; -import org.opencloudb.backend.PhysicalDBPool; import org.opencloudb.backend.PhysicalDatasource; import org.opencloudb.config.model.DataHostConfig; import org.opencloudb.mysql.nio.MySQLDataSource; @@ -117,7 +116,7 @@ public boolean isQuit() { @Override public void onResult(SQLQueryResult> result) { if (result.isSuccess()) { - int balance = heartbeat.getSource().getDbPool().getBalance(); + //int balance = heartbeat.getSource().getDbPool().getBalance(); PhysicalDatasource source = heartbeat.getSource(); int switchType = source.getHostConfig().getSwitchType(); Map resultResult = result.getResult(); @@ -146,7 +145,7 @@ public void onResult(SQLQueryResult> result) { else if( switchType==DataHostConfig.CLUSTER_STATUS_SWITCH_DS && source.getHostConfig().isShowClusterSql()) { - String Variable_name = resultResult!=null? resultResult.get("Variable_name"):null; + //String Variable_name = resultResult!=null? resultResult.get("Variable_name"):null; String wsrep_cluster_status = resultResult!=null? resultResult.get("wsrep_cluster_status"):null;//Primary String wsrep_connected = resultResult!=null?resultResult.get("wsrep_connected"):null;//ON String wsrep_ready = resultResult!=null?resultResult.get("wsrep_ready"):null;//ON diff --git a/src/main/java/org/opencloudb/jdbc/ShowVariables.java b/src/main/java/org/opencloudb/jdbc/ShowVariables.java index abffedd9a..d577cb36d 100644 --- a/src/main/java/org/opencloudb/jdbc/ShowVariables.java +++ b/src/main/java/org/opencloudb/jdbc/ShowVariables.java @@ -186,12 +186,12 @@ private static RowDataPacket getRow(String name, String value, String charset) { public static void execute(ServerConnection sc, String orgin, BackendConnection jdbcConnection) { execute(sc, orgin); - NonBlockingSession session = sc.getSession2(); + NonBlockingSession session = sc.getSession(); session.releaseConnectionIfSafe(jdbcConnection, LOGGER.isDebugEnabled(), false); } public static void justReturnValue(ServerConnection sc, String orgin, BackendConnection jdbcConnection) { justReturnValue(sc, orgin); - NonBlockingSession session = sc.getSession2(); + NonBlockingSession session = sc.getSession(); session.releaseConnectionIfSafe(jdbcConnection, LOGGER.isDebugEnabled(), false); } } \ No newline at end of file diff --git a/src/main/java/org/opencloudb/mysql/nio/MySQLConnection.java b/src/main/java/org/opencloudb/mysql/nio/MySQLConnection.java index 8f2bce46e..b04a0fdc9 100644 --- a/src/main/java/org/opencloudb/mysql/nio/MySQLConnection.java +++ b/src/main/java/org/opencloudb/mysql/nio/MySQLConnection.java @@ -36,6 +36,7 @@ import org.opencloudb.route.RouteResultsetNode; import org.opencloudb.server.ServerConnection; import org.opencloudb.server.parser.ServerParse; +import org.opencloudb.trace.Tracer; import org.opencloudb.util.TimeUtil; import java.io.UnsupportedEncodingException; @@ -247,6 +248,8 @@ public String getPassword() { } public void authenticate() { + Tracer.trace(this); + AuthPacket packet = new AuthPacket(); packet.packetId = 1; packet.clientFlags = clientFlags; @@ -285,9 +288,11 @@ protected void sendQueryCmd(String query) { try { packet.arg = query.getBytes(charset); } catch (UnsupportedEncodingException e) { + Tracer.trace(this, "charset error: charset = %s, cnxn = %s", charset, this); throw new RuntimeException(e); } lastTime = TimeUtil.currentTimeMillis(); + Tracer.trace(this, "query: %s", query); packet.write(this); } @@ -350,8 +355,10 @@ public boolean synAndExecuted(MySQLConnection conn) { conn.metaDataSyned = true; return false; } else if (remains < 0) { + Tracer.trace(conn); return true; } + Tracer.trace(conn); return false; } @@ -372,6 +379,7 @@ private void updateConnectionInfo(MySQLConnection conn) if (autocommit != null) { conn.autocommit = autocommit; } + Tracer.trace(conn); } } @@ -383,6 +391,7 @@ private void updateConnectionInfo(MySQLConnection conn) public boolean syncAndExcute() { StatusSync sync = this.statusSync; if (sync == null) { + Tracer.trace(this); return true; } else { boolean executed = sync.synAndExecuted(this); @@ -399,7 +408,7 @@ public void execute(RouteResultsetNode rrn, ServerConnection sc, if (!modifiedSQLExecuted && rrn.isModifySQL()) { modifiedSQLExecuted = true; } - String xaTXID = sc.getSession2().getXaTXID(); + String xaTXID = sc.getSession().getXaTXID(); synAndDoExecute(xaTXID, rrn, sc.getCharsetIndex(), sc.getTxIsolation(), autocommit); } @@ -527,7 +536,9 @@ public void close(String reason) { this.respHandler.connectionClose(this, reason); respHandler = null; } + return; } + Tracer.trace(this, "reason: %s, cnxn: %s", reason, this); } public void commit() { @@ -658,7 +669,8 @@ public String toString() { + ", user=" + user + ", schema=" + schema + ", old shema=" + oldSchema + ", borrowed=" + borrowed + ", fromSlaveDB=" + fromSlaveDB - + ", threadId=" + threadId + ", charset=" + charset + + ", threadId=" + threadId + ", charset=" + charset + + ", charsetIndex=" + charsetIndex + ", txIsolation=" + txIsolation + ", autocommit=" + autocommit + ", attachment=" + attachment + ", respHandler=" + respHandler + ", host=" + host + ", port=" + port + ", statusSync=" diff --git a/src/main/java/org/opencloudb/mysql/nio/MySQLConnectionAuthenticator.java b/src/main/java/org/opencloudb/mysql/nio/MySQLConnectionAuthenticator.java index 222a8a96b..03948d477 100644 --- a/src/main/java/org/opencloudb/mysql/nio/MySQLConnectionAuthenticator.java +++ b/src/main/java/org/opencloudb/mysql/nio/MySQLConnectionAuthenticator.java @@ -36,6 +36,7 @@ import org.opencloudb.net.mysql.HandshakePacket; import org.opencloudb.net.mysql.OkPacket; import org.opencloudb.net.mysql.Reply323Packet; +import org.opencloudb.trace.Tracer; /** * MySQL 验证处理器 @@ -99,6 +100,7 @@ public void handle(byte[] data) { String errMsg = new String(err.message); LOGGER.warn("can't connect to mysql server ,errmsg:"+errMsg+" "+source); //source.close(errMsg); + Tracer.trace(source, "auth error: cnxn = %s", source); throw new ConnectionException(err.errno, errMsg); //如果是EOFPacket,则为MySQL 4.1版本,是MySQL323加密 case EOFPacket.FIELD_COUNT: @@ -112,16 +114,17 @@ public void handle(byte[] data) { source.authenticate(); break; } else { + Tracer.trace(source); throw new RuntimeException("Unknown Packet!"); } } - } catch (RuntimeException e) { if (listener != null) { listener.connectionError(e, source); return; } + Tracer.trace(source); throw e; } } @@ -134,16 +137,20 @@ private void processHandShakePacket(byte[] data) { source.setThreadId(packet.threadId); // 设置字符集编码 - int charsetIndex = (packet.serverCharsetIndex & 0xff); - String charset = CharsetUtil.getCharset(charsetIndex); + final int charsetIndex = (packet.serverCharsetIndex & 0xff); + final String charset = CharsetUtil.getCharset(charsetIndex); if (charset != null) { source.setCharset(charset); } else { + Tracer.trace(source); throw new RuntimeException("Unknown charsetIndex:" + charsetIndex); } + Tracer.trace(source, "after process: charset-idx = %d, charset = %s, cnxn = %s", + charsetIndex, charset, source); } private void auth323(byte packetId) { + Tracer.trace(source); // 发送323响应认证数据包 Reply323Packet r323 = new Reply323Packet(); r323.packetId = ++packetId; diff --git a/src/main/java/org/opencloudb/mysql/nio/MySQLConnectionFactory.java b/src/main/java/org/opencloudb/mysql/nio/MySQLConnectionFactory.java index 9fe284a8e..80ace0a65 100644 --- a/src/main/java/org/opencloudb/mysql/nio/MySQLConnectionFactory.java +++ b/src/main/java/org/opencloudb/mysql/nio/MySQLConnectionFactory.java @@ -72,6 +72,7 @@ public MySQLConnection make(MySQLDataSource pool, ResponseHandler handler, .postConnect(c); } + return c; } diff --git a/src/main/java/org/opencloudb/mysql/nio/MySQLConnectionHandler.java b/src/main/java/org/opencloudb/mysql/nio/MySQLConnectionHandler.java index aace9acd4..56ec50976 100644 --- a/src/main/java/org/opencloudb/mysql/nio/MySQLConnectionHandler.java +++ b/src/main/java/org/opencloudb/mysql/nio/MySQLConnectionHandler.java @@ -32,6 +32,7 @@ import org.opencloudb.net.mysql.ErrorPacket; import org.opencloudb.net.mysql.OkPacket; import org.opencloudb.net.mysql.RequestFilePacket; +import org.opencloudb.trace.Tracer; import java.util.ArrayList; import java.util.List; @@ -149,6 +150,7 @@ protected void handleData(byte[] data) { } break; default: + Tracer.trace(source, "status: %s, cnxn: %s", resultStatus, source); throw new RuntimeException("unknown status!"); } } diff --git a/src/main/java/org/opencloudb/mysql/nio/MySQLDataSource.java b/src/main/java/org/opencloudb/mysql/nio/MySQLDataSource.java index 7564f03f0..316b667f6 100644 --- a/src/main/java/org/opencloudb/mysql/nio/MySQLDataSource.java +++ b/src/main/java/org/opencloudb/mysql/nio/MySQLDataSource.java @@ -55,9 +55,5 @@ public void createNewConnection(ResponseHandler handler,String schema) throws IO public DBHeartbeat createHeartBeat() { return new MySQLHeartbeat(this); } - - - - -} \ No newline at end of file +} diff --git a/src/main/java/org/opencloudb/mysql/nio/handler/CommitNodeHandler.java b/src/main/java/org/opencloudb/mysql/nio/handler/CommitNodeHandler.java index 6dcbd2ea3..fb7f029b1 100644 --- a/src/main/java/org/opencloudb/mysql/nio/handler/CommitNodeHandler.java +++ b/src/main/java/org/opencloudb/mysql/nio/handler/CommitNodeHandler.java @@ -101,9 +101,13 @@ public void okResponse(byte[] ok, BackendConnection conn) { } } } + session.clearResources(false); - ServerConnection source = session.getSource(); + final ServerConnection source = session.getSource(); source.write(ok); + // Tag tx done successfully. + // @since 2017-01-16 little-pan + source.onTxDone("commit"); } @Override @@ -113,7 +117,6 @@ public void errorResponse(byte[] err, BackendConnection conn) { String errInfo = new String(errPkg.message); session.getSource().setTxInterrupt(errInfo); errPkg.write(session.getSource()); - } @Override diff --git a/src/main/java/org/opencloudb/mysql/nio/handler/GetConnectionHandler.java b/src/main/java/org/opencloudb/mysql/nio/handler/GetConnectionHandler.java index b7aac0f2d..3bdbfb736 100644 --- a/src/main/java/org/opencloudb/mysql/nio/handler/GetConnectionHandler.java +++ b/src/main/java/org/opencloudb/mysql/nio/handler/GetConnectionHandler.java @@ -29,6 +29,7 @@ import org.apache.log4j.Logger; import org.opencloudb.backend.BackendConnection; +import org.opencloudb.trace.Tracer; /** * wuzh @@ -83,7 +84,7 @@ public void errorResponse(byte[] err, BackendConnection conn) { @Override public void okResponse(byte[] ok, BackendConnection conn) { logger.info("received ok resp: " + conn + " " + new String(ok)); - + Tracer.trace(conn); } @Override @@ -109,7 +110,7 @@ public void writeQueueAvailable() { @Override public void connectionClose(BackendConnection conn, String reason) { - + Tracer.trace(conn, "reason: %s, cnxn: %s", reason, conn); } } \ No newline at end of file diff --git a/src/main/java/org/opencloudb/mysql/nio/handler/MultiNodeCoordinator.java b/src/main/java/org/opencloudb/mysql/nio/handler/MultiNodeCoordinator.java index d306f41e9..61c4ef4ad 100644 --- a/src/main/java/org/opencloudb/mysql/nio/handler/MultiNodeCoordinator.java +++ b/src/main/java/org/opencloudb/mysql/nio/handler/MultiNodeCoordinator.java @@ -80,15 +80,10 @@ public void errorResponse(byte[] err, BackendConnection conn) { if (this.cmdHandler.releaseConOnErr()) { session.releaseConnection(conn); } else { - - - - session.releaseConnectionIfSafe(conn, LOGGER.isDebugEnabled(), - false); + session.releaseConnectionIfSafe(conn, LOGGER.isDebugEnabled(), false); } if (this.finished()) { - cmdHandler.errorResponse(session, err, this.nodeCount, - this.faileCount.get()); + cmdHandler.errorResponse(session, err, this.nodeCount, this.faileCount.get()); if (cmdHandler.isAutoClearSessionCons()) { session.clearResources(session.getSource().isTxInterrupted()); } @@ -101,15 +96,16 @@ public void okResponse(byte[] ok, BackendConnection conn) { if (this.cmdHandler.relaseConOnOK()) { session.releaseConnection(conn); } else { - session.releaseConnectionIfSafe(conn, LOGGER.isDebugEnabled(), - false); + session.releaseConnectionIfSafe(conn, LOGGER.isDebugEnabled(), false); } if (this.finished()) { cmdHandler.okResponse(session, ok); if (cmdHandler.isAutoClearSessionCons()) { session.clearResources(false); } - + // Tag tx done successfully. + // @since 2017-01-16 little-pan + session.getSource().onTxDone("commit"); } } diff --git a/src/main/java/org/opencloudb/mysql/nio/handler/MultiNodeHandler.java b/src/main/java/org/opencloudb/mysql/nio/handler/MultiNodeHandler.java index 17f683447..d32407fa3 100644 --- a/src/main/java/org/opencloudb/mysql/nio/handler/MultiNodeHandler.java +++ b/src/main/java/org/opencloudb/mysql/nio/handler/MultiNodeHandler.java @@ -131,8 +131,8 @@ public void errorResponse(byte[] data, BackendConnection conn) { err.read(data); String errmsg = new String(err.message); this.setFail(errmsg); - LOGGER.warn("error response from " + conn + " err " + errmsg + " code:" - + err.errno); + LOGGER.warn("error response from " + conn + " err " + errmsg + + " code: " + err.errno); this.tryErrorFinished(this.decrementCountBy(1)); } @@ -142,7 +142,6 @@ public boolean clearIfSessionClosed(NonBlockingSession session) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("session closed ,clear resources " + session); } - session.clearResources(true); this.clearResources(); return true; @@ -202,12 +201,14 @@ protected void tryErrorFinished(boolean allEnd) { } if (session.getSource().isAutocommit()) { session.closeAndClearResources(error); + // Tag tx done. + // @since 2017-01-16 little-pan + session.getSource().onTxDone("rollback"); } else { session.getSource().setTxInterrupt(this.error); // clear resouces clearResources(); } - } } @@ -218,13 +219,14 @@ public void connectionClose(BackendConnection conn, String reason) { lock.lock(); try { finished = (this.nodeCount == 0); - + // dec should be here for nodeCount sync reason. + // @since 2017-01-16 little-pan + if (finished == false) { + finished = this.decrementCountBy(1); + } } finally { lock.unlock(); } - if (finished == false) { - finished = this.decrementCountBy(1); - } if (error == null) { error = "back connection closed "; } @@ -232,5 +234,6 @@ public void connectionClose(BackendConnection conn, String reason) { } public void clearResources() { + } } \ No newline at end of file diff --git a/src/main/java/org/opencloudb/mysql/nio/handler/ResponseHandler.java b/src/main/java/org/opencloudb/mysql/nio/handler/ResponseHandler.java index 6469446a5..616622d66 100644 --- a/src/main/java/org/opencloudb/mysql/nio/handler/ResponseHandler.java +++ b/src/main/java/org/opencloudb/mysql/nio/handler/ResponseHandler.java @@ -28,7 +28,6 @@ import org.opencloudb.backend.BackendConnection; /** - * @author mycat * @author mycat */ public interface ResponseHandler { diff --git a/src/main/java/org/opencloudb/mysql/nio/handler/RollbackNodeHandler.java b/src/main/java/org/opencloudb/mysql/nio/handler/RollbackNodeHandler.java deleted file mode 100644 index 4a72cc24d..000000000 --- a/src/main/java/org/opencloudb/mysql/nio/handler/RollbackNodeHandler.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software;Designed and Developed mainly by many Chinese - * opensource volunteers. you can redistribute it and/or modify it under the - * terms of the GNU General Public License version 2 only, as published by the - * Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Any questions about this component can be directed to it's project Web address - * https://code.google.com/p/opencloudb/. - * - */ -package org.opencloudb.mysql.nio.handler; - -import java.util.List; - -import org.apache.log4j.Logger; -import org.opencloudb.backend.BackendConnection; -import org.opencloudb.config.ErrorCode; -import org.opencloudb.route.RouteResultsetNode; -import org.opencloudb.server.NonBlockingSession; - -/** - * @author mycat - */ -public class RollbackNodeHandler extends MultiNodeHandler { - private static final Logger LOGGER = Logger - .getLogger(RollbackNodeHandler.class); - - public RollbackNodeHandler(NonBlockingSession session) { - super(session); - } - - public void rollback() { - final int initCount = session.getTargetCount(); - lock.lock(); - try { - reset(initCount); - } finally { - lock.unlock(); - } - if (session.closed()) { - decrementCountToZero(); - return; - } - - // 执行 - int started = 0; - for (final RouteResultsetNode node : session.getTargetKeys()) { - if (node == null) { - try { - LOGGER.error("null is contained in RoutResultsetNodes, source = " - + session.getSource()); - } catch (Exception e) { - } - continue; - } - final BackendConnection conn = session.getTarget(node); - if (conn != null) { - boolean isClosed=conn.isClosedOrQuit(); - if(isClosed) - { - session.getSource().writeErrMessage(ErrorCode.ER_UNKNOWN_ERROR, - "receive rollback,but find backend con is closed or quit"); - LOGGER.error( conn+"receive rollback,but fond backend con is closed or quit"); - } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("rollback job run for " + conn); - } - if (clearIfSessionClosed(session)) { - return; - } - conn.setResponseHandler(RollbackNodeHandler.this); - conn.rollback(); - - ++started; - } - } - - if (started < initCount && decrementCountBy(initCount - started)) { - /** - * assumption: only caused by front-end connection close.
- * Otherwise, packet must be returned to front-end - */ - session.clearResources(true); - } - } - - @Override - public void okResponse(byte[] ok, BackendConnection conn) { - if (decrementCountBy(1)) { - // clear all resources - session.clearResources(false); - if (this.isFail() || session.closed()) { - tryErrorFinished(true); - } else { - session.getSource().write(ok); - } - } - } - - @Override - public void rowEofResponse(byte[] eof, BackendConnection conn) { - LOGGER.error(new StringBuilder().append("unexpected packet for ") - .append(conn).append(" bound by ").append(session.getSource()) - .append(": field's eof").toString()); - } - - @Override - public void connectionAcquired(BackendConnection conn) { - LOGGER.error("unexpected invocation: connectionAcquired from rollback"); - } - - @Override - public void fieldEofResponse(byte[] header, List fields, - byte[] eof, BackendConnection conn) { - LOGGER.error(new StringBuilder().append("unexpected packet for ") - .append(conn).append(" bound by ").append(session.getSource()) - .append(": field's eof").toString()); - } - - @Override - public void rowResponse(byte[] row, BackendConnection conn) { - LOGGER.error(new StringBuilder().append("unexpected packet for ") - .append(conn).append(" bound by ").append(session.getSource()) - .append(": field's eof").toString()); - } - - @Override - public void writeQueueAvailable() { - - } - -} diff --git a/src/main/java/org/opencloudb/mysql/nio/handler/SingleNodeHandler.java b/src/main/java/org/opencloudb/mysql/nio/handler/SingleNodeHandler.java index 71417df6a..371a34d87 100644 --- a/src/main/java/org/opencloudb/mysql/nio/handler/SingleNodeHandler.java +++ b/src/main/java/org/opencloudb/mysql/nio/handler/SingleNodeHandler.java @@ -146,9 +146,9 @@ private void recycleResources() { public void execute() throws Exception { //从这里开始计算处理时间 startTime=System.currentTimeMillis(); - ServerConnection sc = session.getSource(); + final ServerConnection sc = session.getSource(); this.isRunning = true; - this.packetId = 0; + this.packetId = 0; final BackendConnection conn = session.getTarget(node); //之前是否获取过Connection并且Connection有效 if (session.tryExistsCon(conn, node)) { @@ -200,15 +200,25 @@ private void executeException(BackendConnection c, Exception e) { @Override public void connectionError(Throwable e, BackendConnection conn) { - endRunning(); ErrorPacket err = new ErrorPacket(); err.packetId = ++packetId; - err.errno = ErrorCode.ER_NEW_ABORTING_CONNECTION; - err.message = StringUtil.encode(e.getMessage(), session.getSource() - .getCharset()); - ServerConnection source = session.getSource(); + err.errno = ErrorCode.ER_NEW_ABORTING_CONNECTION; + err.message = + StringUtil.encode(e.getMessage(), session.getSource().getCharset()); + final ServerConnection source = session.getSource(); source.write(err.write(allocBuffer(), source, true)); + + maybeTxDone(source); + } + + /** Tag tx maybe done. + * @since 2017-01-16 little-pan + */ + private final void maybeTxDone(final ServerConnection source){ + if(source.isAutocommit()){ + source.onTxDone("autocommit"); + } } @Override @@ -237,6 +247,8 @@ private void backConnectionErr(ErrorPacket errPkg, BackendConnection conn) { source.setTxInterrupt(errmgs); errPkg.write(source); recycleResources(); + + maybeTxDone(source); } @@ -247,7 +259,8 @@ public void okResponse(byte[] data, BackendConnection conn) { ServerConnection source = session.getSource(); OkPacket ok = new OkPacket(); ok.read(data); - boolean isCanClose2Client =(!rrs.isCallStatement()) ||(rrs.isCallStatement() &&!rrs.getProcedure().isResultSimpleValue()); + final boolean isCanClose2Client = (!rrs.isCallStatement()) + || (rrs.isCallStatement() &&!rrs.getProcedure().isResultSimpleValue()); if (rrs.isLoadData()) { byte lastPackId = source.getLoadDataInfileHandler() .getLastPackId(); @@ -257,22 +270,23 @@ public void okResponse(byte[] data, BackendConnection conn) { { ok.packetId = ++packetId;// OK_PACKET } - - + if(isCanClose2Client) { - session.releaseConnectionIfSafe(conn, LOGGER.isDebugEnabled(), - false); - endRunning(); + session.releaseConnectionIfSafe(conn, LOGGER.isDebugEnabled(), false); + endRunning(); } ok.serverStatus = source.isAutocommit() ? 2 : 1; recycleResources(); - if(isCanClose2Client) - { source.setLastInsertId(ok.insertId); + if(isCanClose2Client) { + source.setLastInsertId(ok.insertId); ok.write(source); + + maybeTxDone(source); } + //TODO: add by zhuam //查询结果派发 QueryResult queryResult = new QueryResult(session.getSource().getUser(), session.getSource().getHost(), @@ -285,9 +299,8 @@ public void okResponse(byte[] data, BackendConnection conn) { @Override public void rowEofResponse(byte[] eof, BackendConnection conn) { - ServerConnection source = session.getSource(); - conn.recordSql(source.getHost(), source.getSchema(), - node.getStatement()); + final ServerConnection source = session.getSource(); + conn.recordSql(source.getHost(), source.getSchema(), node.getStatement()); // 判断是调用存储过程的话不能在这里释放链接 if (!rrs.isCallStatement()||(rrs.isCallStatement()&&rrs.getProcedure().isResultSimpleValue())) { @@ -299,7 +312,8 @@ public void rowEofResponse(byte[] eof, BackendConnection conn) { eof[3] = ++packetId; buffer = source.writeToBuffer(eof, allocBuffer()); source.write(buffer); - + + maybeTxDone(source); } /** @@ -399,8 +413,9 @@ public void requestDataResponse(byte[] data, BackendConnection conn) { @Override public String toString() { - return "SingleNodeHandler [node=" + node + ", packetId=" + packetId - + "]"; + return "SingleNodeHandler [node=" + node + + ", packetId=" + packetId + + ", source=" + session.getSource() + "]"; } } diff --git a/src/main/java/org/opencloudb/net/AbstractConnection.java b/src/main/java/org/opencloudb/net/AbstractConnection.java index f6dedbb85..2da5631da 100644 --- a/src/main/java/org/opencloudb/net/AbstractConnection.java +++ b/src/main/java/org/opencloudb/net/AbstractConnection.java @@ -34,6 +34,7 @@ import com.google.common.base.Strings; import org.apache.log4j.Logger; import org.opencloudb.mysql.CharsetUtil; +import org.opencloudb.trace.Tracer; import org.opencloudb.util.CompressUtil; import org.opencloudb.util.TimeUtil; @@ -95,7 +96,6 @@ public String getCharset() { } public boolean setCharset(String charset) { - //修复PHP字符集设置错误, 如: set names 'utf8' if ( charset != null ) { charset = charset.replace("'", ""); @@ -107,13 +107,12 @@ public boolean setCharset(String charset) { this.charsetIndex = ci; return true; } else { + Tracer.traceCnxn(this, + "no charset index: charset = %s, cnxn = %s", charset, this); return false; } } - - - public boolean isSupportCompress() { return isSupportCompress; @@ -278,8 +277,9 @@ public void doNextWriteCheck() throws IOException { this.socketWR.doNextWriteCheck(); } - public void onReadData(int got) throws IOException { + public void onReadData(final int got) throws IOException { if (isClosed.get()) { + Tracer.traceCnxn(this, "connection closed: recv-bytes = %d, cnxn = %s", got, this); return; } ByteBuffer buffer = this.readBuffer; @@ -542,7 +542,8 @@ private void closeSocket() { if (closed == false) { LOGGER.warn("close socket of connnection failed " + this); } - + + Tracer.traceCnxn(this); } } diff --git a/src/main/java/org/opencloudb/net/NIOConnector.java b/src/main/java/org/opencloudb/net/NIOConnector.java index d6c39a8b6..ed9b432b4 100644 --- a/src/main/java/org/opencloudb/net/NIOConnector.java +++ b/src/main/java/org/opencloudb/net/NIOConnector.java @@ -34,6 +34,7 @@ import org.apache.log4j.Logger; import org.opencloudb.MycatServer; +import org.opencloudb.trace.Tracer; /** * @author mycat @@ -62,6 +63,8 @@ public long getConnectCount() { } public void postConnect(AbstractConnection c) { + Tracer.traceCnxn(c, "offer for connect: cnxn = %s", c); + connectQueue.offer(c); selector.wakeup(); } @@ -83,6 +86,10 @@ public void run() { finishConnect(key, att); } else { key.cancel(); + + final AbstractConnection c = (AbstractConnection)att; + Tracer.traceCnxn(c, + "channel canceled: invalid or not cnxntable, cnxn = %s", c); } } } finally { diff --git a/src/main/java/org/opencloudb/net/NIOProcessor.java b/src/main/java/org/opencloudb/net/NIOProcessor.java index d36c52a48..08f126bf1 100644 --- a/src/main/java/org/opencloudb/net/NIOProcessor.java +++ b/src/main/java/org/opencloudb/net/NIOProcessor.java @@ -35,6 +35,7 @@ import org.opencloudb.backend.BackendConnection; import org.opencloudb.buffer.BufferPool; import org.opencloudb.statistic.CommandCount; +import org.opencloudb.trace.Tracer; import org.opencloudb.util.NameableExecutor; import org.opencloudb.util.TimeUtil; @@ -222,7 +223,8 @@ public void removeConnection(AbstractConnection con) { this.frontends.remove(con.getId()); this.frontendsLength.decrementAndGet(); } - + + Tracer.traceCnxn(con); } //jdbc连接用这个释放 public void removeConnection(BackendConnection con){ diff --git a/src/main/java/org/opencloudb/net/NIOReactor.java b/src/main/java/org/opencloudb/net/NIOReactor.java index da7aae6e6..6b4544db2 100644 --- a/src/main/java/org/opencloudb/net/NIOReactor.java +++ b/src/main/java/org/opencloudb/net/NIOReactor.java @@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.log4j.Logger; +import org.opencloudb.trace.Tracer; /** * 网络事件反应器 @@ -57,6 +58,8 @@ final void startup() { } final void postRegister(AbstractConnection c) { + Tracer.traceCnxn(c, "offer for rw: cnxn = %s", c); + reactorR.registerQueue.offer(c); reactorR.selector.wakeup(); } diff --git a/src/main/java/org/opencloudb/net/NIOSocketWR.java b/src/main/java/org/opencloudb/net/NIOSocketWR.java index 013b24cfd..5704439f7 100644 --- a/src/main/java/org/opencloudb/net/NIOSocketWR.java +++ b/src/main/java/org/opencloudb/net/NIOSocketWR.java @@ -7,6 +7,7 @@ import java.nio.channels.SocketChannel; import java.util.concurrent.atomic.AtomicBoolean; +import org.opencloudb.trace.Tracer; import org.opencloudb.util.TimeUtil; public class NIOSocketWR extends SocketWR { @@ -35,6 +36,7 @@ public void register(Selector selector) throws IOException { public void doNextWriteCheck() { if (!writing.compareAndSet(false, true)) { + Tracer.traceCnxn(con, "channel writing: return simply, cnxn = %s", con); return; } @@ -80,6 +82,7 @@ private boolean write0() throws IOException { if (buffer.hasRemaining()) { con.writeAttempts++; + Tracer.traceCnxn(con, "channel busy: hand over, cnxn = %s", con); return false; } else { con.writeBuffer = null; @@ -108,11 +111,13 @@ private boolean write0() throws IOException { if (buffer.hasRemaining()) { con.writeBuffer = buffer; con.writeAttempts++; + Tracer.traceCnxn(con, "channel busy: hand over, cnxn = %s", con); return false; } else { con.recycle(buffer); } } + Tracer.traceCnxn(con, "write ok: queue drained, cnxn = %s", con); return true; } @@ -120,6 +125,8 @@ private void disableWrite() { try { SelectionKey key = this.processKey; key.interestOps(key.interestOps() & OP_NOT_WRITE); + + Tracer.traceCnxn(con); } catch (Exception e) { AbstractConnection.LOGGER.warn("can't disable write " + e + " con " + con); @@ -128,6 +135,8 @@ private void disableWrite() { } private void enableWrite(boolean wakeup) { + Tracer.traceCnxn(con, "wakeup reactor? %s", wakeup); + boolean needWakeup = false; try { SelectionKey key = this.processKey; @@ -135,7 +144,6 @@ private void enableWrite(boolean wakeup) { needWakeup = true; } catch (Exception e) { AbstractConnection.LOGGER.warn("can't enable write " + e); - } if (needWakeup && wakeup) { processKey.selector().wakeup(); diff --git a/src/main/java/org/opencloudb/net/factory/FrontendConnectionFactory.java b/src/main/java/org/opencloudb/net/factory/FrontendConnectionFactory.java index 6dd4f35e3..a1d8c66eb 100644 --- a/src/main/java/org/opencloudb/net/factory/FrontendConnectionFactory.java +++ b/src/main/java/org/opencloudb/net/factory/FrontendConnectionFactory.java @@ -43,9 +43,8 @@ public FrontendConnection make(NetworkChannel channel) throws IOException { FrontendConnection c = getConnection(channel); MycatServer.getInstance().getConfig().setSocketParams(c, true); + return c; } - - } \ No newline at end of file diff --git a/src/main/java/org/opencloudb/net/handler/FrontendAuthenticator.java b/src/main/java/org/opencloudb/net/handler/FrontendAuthenticator.java index 9768350cf..7b62b44a8 100644 --- a/src/main/java/org/opencloudb/net/handler/FrontendAuthenticator.java +++ b/src/main/java/org/opencloudb/net/handler/FrontendAuthenticator.java @@ -38,6 +38,7 @@ import org.opencloudb.net.mysql.AuthPacket; import org.opencloudb.net.mysql.MySQLPacket; import org.opencloudb.net.mysql.QuitPacket; +import org.opencloudb.trace.Tracer; /** * 前端认证处理器 @@ -57,6 +58,8 @@ public FrontendAuthenticator(FrontendConnection source) { @Override public void handle(byte[] data) { + Tracer.trace(source); + // check quit packet if (data.length == QuitPacket.QUIT.length && data[4] == MySQLPacket.COM_QUIT) { source.close("quit packet"); diff --git a/src/main/java/org/opencloudb/net/mysql/CommandPacket.java b/src/main/java/org/opencloudb/net/mysql/CommandPacket.java index c7274142b..b99ce1fc3 100644 --- a/src/main/java/org/opencloudb/net/mysql/CommandPacket.java +++ b/src/main/java/org/opencloudb/net/mysql/CommandPacket.java @@ -24,7 +24,6 @@ package org.opencloudb.net.mysql; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; diff --git a/src/main/java/org/opencloudb/parser/util/ParseUtil.java b/src/main/java/org/opencloudb/parser/util/ParseUtil.java index 182391fec..656e63200 100644 --- a/src/main/java/org/opencloudb/parser/util/ParseUtil.java +++ b/src/main/java/org/opencloudb/parser/util/ParseUtil.java @@ -197,8 +197,8 @@ public static String parseAlias(String stmt, final int aliasIndex) { * @param offset * @return */ - public static int comment(String stmt, int offset) { - int len = stmt.length(); + public static int comment(final String stmt, final int offset) { + final int len = stmt.length(); int n = offset; switch (stmt.charAt(n)) { case '/': @@ -207,18 +207,46 @@ public static int comment(String stmt, int offset) { if (stmt.charAt(i) == '*') { int m = i + 1; if (len > m && stmt.charAt(m) == '/') { - return m; + // fixbug>> + // the rest whitespace? eg. [\r]\n + // @since 2017-01-21 little-pan + for(;len > ++m && stmt.charAt(m) <= '\u0020';); + return (--m); } } } } break; + case '-': + boolean line = false; + if(++n < len && stmt.charAt(n) == '-'){ + if(++n < len && stmt.charAt(n) <= '\u0020'){ + // skip to '#' handler + line = true; + } + if(n == len){ + // only "--" + return (--n); + } + } + if(line == false){ + return offset; + } case '#': - for (int i = n + 1; i < len; ++i) { + int i; + for (i = n + 1; i < len; ++i) { if (stmt.charAt(i) == '\n') { return i; } } + // fixbug>> + // case 1. the last char is '\r' in mysqlslap Ver 1.0 Distrib 5.5.28, for Win32 (x86) + // case 2. the last char is not '\r' or '\n' in SQL comment + // @since 2017-01-21 little-pan + if(i == len){ + return (--i); + } + //< backConnections = ssesion.getTargetMap() .values(); int cncount = backConnections.size(); diff --git a/src/main/java/org/opencloudb/route/handler/HintCatletHandler.java b/src/main/java/org/opencloudb/route/handler/HintCatletHandler.java index 9f8c78ad0..8728c57d6 100644 --- a/src/main/java/org/opencloudb/route/handler/HintCatletHandler.java +++ b/src/main/java/org/opencloudb/route/handler/HintCatletHandler.java @@ -51,7 +51,7 @@ public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, Catlet catlet = (Catlet) MycatServer.getInstance() .getCatletClassLoader().getInstanceofClass(cateletClass); catlet.route(sysConfig, schema, sqlType, realSQL,charset, sc, cachePool); - catlet.processSQL(realSQL, new EngineCtx(sc.getSession2())); + catlet.processSQL(realSQL, new EngineCtx(sc.getSession())); } catch (Exception e) { LOGGER.warn("catlet error "+e); throw new SQLNonTransientException(e); diff --git a/src/main/java/org/opencloudb/route/util/RouterUtil.java b/src/main/java/org/opencloudb/route/util/RouterUtil.java index 08b8535a3..4ef90f581 100644 --- a/src/main/java/org/opencloudb/route/util/RouterUtil.java +++ b/src/main/java/org/opencloudb/route/util/RouterUtil.java @@ -460,7 +460,7 @@ public static boolean processWithMycatSeq(SchemaConfig schema, int sqlType, } public static void processSQL(ServerConnection sc,SchemaConfig schema,String sql,int sqlType){ - MycatServer.getInstance().getSequnceProcessor().addNewSql(new SessionSQLPair(sc.getSession2(), schema, sql, sqlType)); + MycatServer.getInstance().getSequnceProcessor().addNewSql(new SessionSQLPair(sc.getSession(), schema, sql, sqlType)); } public static boolean processInsert(SchemaConfig schema, int sqlType, @@ -1213,7 +1213,7 @@ public static boolean processERChildTable(final SchemaConfig schema, final Strin } if(processedInsert==false){ rrs.setFinishedRoute(true); - sc.getSession2().execute(rrs, ServerParse.INSERT); + sc.getSession().execute(rrs, ServerParse.INSERT); } return true; } @@ -1239,7 +1239,7 @@ public String call() throws Exception { public void onSuccess(String result) { if (Strings.isNullOrEmpty(result)) { StringBuilder s = new StringBuilder(); - LOGGER.warn(s.append(sc.getSession2()).append(origSQL).toString() + + LOGGER.warn(s.append(sc.getSession()).append(origSQL).toString() + " err:" + "can't find (root) parent sharding node for sql:" + origSQL); sc.writeErrMessage(ErrorCode.ER_PARSE_ERROR, "can't find (root) parent sharding node for sql:" + origSQL); return; @@ -1261,7 +1261,7 @@ public void onSuccess(String result) { } if(processedInsert==false){ RouteResultset executeRrs = RouterUtil.routeToSingleNode(rrs, result, origSQL); - sc.getSession2().execute(executeRrs, ServerParse.INSERT); + sc.getSession().execute(executeRrs, ServerParse.INSERT); } } @@ -1269,7 +1269,7 @@ public void onSuccess(String result) { @Override public void onFailure(Throwable t) { StringBuilder s = new StringBuilder(); - LOGGER.warn(s.append(sc.getSession2()).append(origSQL).toString() + + LOGGER.warn(s.append(sc.getSession()).append(origSQL).toString() + " err:" + t.getMessage()); sc.writeErrMessage(ErrorCode.ER_PARSE_ERROR, t.getMessage() + " " + s.toString()); } diff --git a/src/main/java/org/opencloudb/server/NonBlockingSession.java b/src/main/java/org/opencloudb/server/NonBlockingSession.java index 6a1986c2c..9e670ae04 100644 --- a/src/main/java/org/opencloudb/server/NonBlockingSession.java +++ b/src/main/java/org/opencloudb/server/NonBlockingSession.java @@ -38,12 +38,11 @@ import org.opencloudb.backend.BackendConnection; import org.opencloudb.backend.PhysicalDBNode; import org.opencloudb.config.ErrorCode; -import org.opencloudb.config.model.SystemConfig; import org.opencloudb.mysql.nio.handler.CommitNodeHandler; import org.opencloudb.mysql.nio.handler.KillConnectionHandler; import org.opencloudb.mysql.nio.handler.MultiNodeCoordinator; import org.opencloudb.mysql.nio.handler.MultiNodeQueryHandler; -import org.opencloudb.mysql.nio.handler.RollbackNodeHandler; +//import org.opencloudb.mysql.nio.handler.RollbackNodeHandler; import org.opencloudb.mysql.nio.handler.RollbackReleaseHandler; import org.opencloudb.mysql.nio.handler.SingleNodeHandler; import org.opencloudb.net.FrontendConnection; @@ -51,6 +50,7 @@ import org.opencloudb.route.RouteResultset; import org.opencloudb.route.RouteResultsetNode; import org.opencloudb.sqlcmd.SQLCmdConstant; +import org.opencloudb.trace.Tracer; /** * @author mycat @@ -65,7 +65,7 @@ public class NonBlockingSession implements Session { // life-cycle: each sql execution private volatile SingleNodeHandler singleNodeHandler; private volatile MultiNodeQueryHandler multiNodeHandler; - private volatile RollbackNodeHandler rollbackHandler; + //private volatile RollbackNodeHandler rollbackHandler; private final MultiNodeCoordinator multiNodeCoordinator; private final CommitNodeHandler commitHandler; private volatile String xaTXID; @@ -105,7 +105,7 @@ public BackendConnection removeTarget(RouteResultsetNode key) { } @Override - public void execute(RouteResultset rrs, int type) { + public void execute(final RouteResultset rrs, final int type) { // clear prev execute resources clearHandlesResources(); if (LOGGER.isDebugEnabled()) { @@ -122,6 +122,13 @@ public void execute(RouteResultset rrs, int type) { + source.getSchema()); return; } + + // Tag tx start if not. + // @since 2017-01-16 little-pan + if(target.size() == 0 && source.getTxLocalId() == 0){ + source.onTxStart(rrs.getStatement()); + } + if (nodes.length == 1) { singleNodeHandler = new SingleNodeHandler(rrs, this); try { @@ -131,13 +138,9 @@ public void execute(RouteResultset rrs, int type) { source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString()); } } else { - boolean autocommit = source.isAutocommit(); - SystemConfig sysConfig = MycatServer.getInstance().getConfig() - .getSystem(); - int mutiNodeLimitType = sysConfig.getMutiNodeLimitType(); - multiNodeHandler = new MultiNodeQueryHandler(type, rrs, autocommit, - this); - + final boolean autocommit = source.isAutocommit(); + multiNodeHandler = + new MultiNodeQueryHandler(type, rrs, autocommit, this); try { multiNodeHandler.execute(); } catch (Exception e) { @@ -153,13 +156,15 @@ public void commit() { ByteBuffer buffer = source.allocate(); buffer = source.writeToBuffer(OkPacket.OK, buffer); source.write(buffer); + // Tag tx done. + if(source.getTxLocalId() != 0){ + source.onTxDone("commit"); + } return; } else if (initCount == 1) { BackendConnection con = target.elements().nextElement(); commitHandler.commit(con); - } else { - if (LOGGER.isDebugEnabled()) { LOGGER.debug("multi node commit to send ,total " + initCount); } @@ -177,10 +182,28 @@ public void rollback() { ByteBuffer buffer = source.allocate(); buffer = source.writeToBuffer(OkPacket.OK, buffer); source.write(buffer); + // Tag tx done. + if(source.getTxLocalId() != 0){ + source.onTxDone("rollback"); + } return; } - rollbackHandler = new RollbackNodeHandler(this); - rollbackHandler.rollback(); + //rollbackHandler = new RollbackNodeHandler(this); + //rollbackHandler.rollback(); + // + // We should always clear resource here and response directly, because success or + //fail(connection broken only) always be correct after calling backend rollback(). + // @since 2017-01-16 little-pan + // + // clear resources & rollback + clearResources(true); + // do-response + final ByteBuffer buffer = + source.writeToBuffer(OkPacket.OK, source.allocate()); + source.write(buffer); + // Tag tx done successfully. + // @since 2017-01-16 little-pan + source.onTxDone("rollback"); } @Override @@ -220,10 +243,8 @@ public void releaseConnectionIfSafe(BackendConnection conn, boolean debug, } } - public void releaseConnection(RouteResultsetNode rrn, boolean debug, - final boolean needRollback) { - - BackendConnection c = target.remove(rrn); + public void releaseConnection(RouteResultsetNode rrn, boolean debug, final boolean needRollback) { + final BackendConnection c = target.remove(rrn); if (c != null) { if (debug) { LOGGER.debug("release connection " + c); @@ -234,15 +255,14 @@ public void releaseConnection(RouteResultsetNode rrn, boolean debug, if (!c.isClosedOrQuit()) { if (c.isAutocommit()) { c.release(); - } else - // if (needRollback) - { + } else if (needRollback) { + // Should used to rollback(), so that should uncomment the code block. + // @since 2017-01-16 little-pan c.setResponseHandler(new RollbackReleaseHandler()); c.rollback(); - } - // else { -// c.release(); -// } + } else { + c.release(); + } } } } @@ -274,15 +294,14 @@ public void releaseConnection(BackendConnection con) { /** * @return previous bound connection */ - public BackendConnection bindConnection(RouteResultsetNode key, - BackendConnection conn) { - // System.out.println("bind connection "+conn+ - // " to key "+key.getName()+" on sesion "+this); + public BackendConnection bindConnection(final RouteResultsetNode key, + final BackendConnection conn) { + Tracer.trace(conn, "binding: route-node = %s, src-cnxn = %s, back-cnxn = %s", + key, source, conn); return target.put(key, conn); } - public boolean tryExistsCon(final BackendConnection conn, - RouteResultsetNode node) { + public boolean tryExistsCon(final BackendConnection conn, RouteResultsetNode node) { if (conn == null) { return false; @@ -300,7 +319,7 @@ public boolean tryExistsCon(final BackendConnection conn, } else { // slavedb connection and can't use anymore ,release it if (LOGGER.isDebugEnabled()) { - LOGGER.debug("release slave connection,can't be used in trasaction " + LOGGER.debug("release slave connection, can't be used in trasaction " + conn + " for " + node); } releaseConnection(node, LOGGER.isDebugEnabled(), false); diff --git a/src/main/java/org/opencloudb/server/ServerConnection.java b/src/main/java/org/opencloudb/server/ServerConnection.java index 54845bf80..66018dc71 100644 --- a/src/main/java/org/opencloudb/server/ServerConnection.java +++ b/src/main/java/org/opencloudb/server/ServerConnection.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.nio.channels.NetworkChannel; -import java.util.Map; import org.apache.log4j.Logger; import org.opencloudb.MycatServer; @@ -38,6 +37,7 @@ import org.opencloudb.server.response.Heartbeat; import org.opencloudb.server.response.Ping; import org.opencloudb.server.util.SchemaUtil; +import org.opencloudb.trace.Tracer; import org.opencloudb.util.TimeUtil; /** @@ -49,6 +49,11 @@ public class ServerConnection extends FrontendConnection { private static final long AUTH_TIMEOUT = 15 * 1000L; private volatile int txIsolation; + // local tx id: used to trace tx execution, + // 0 - tx not start, other - tx pending. + // @since 2017-01-15 little-pan + private volatile int txLocalId, txLastLocalId; + private volatile boolean autocommit; private volatile boolean txInterrupted; private volatile String txInterrputMsg = ""; @@ -110,12 +115,21 @@ public boolean isTxInterrupted() { return txInterrupted; } - public NonBlockingSession getSession2() { + + /** + *

+ * getSession2() -> getSession(). + *

+ * + * @since 2017-01-15 little-pan + * @return server connection session + */ + public NonBlockingSession getSession() { return session; } - public void setSession2(NonBlockingSession session2) { - this.session = session2; + public void setSession(NonBlockingSession session) { + this.session = session; } @Override @@ -289,12 +303,43 @@ public void close(String reason) { getLoadDataInfileHandler().clear(); } } + + public int getTxLocalId(){ + return txLocalId; + } + + public int getTxLastLocalId(){ + return txLastLocalId; + } + + public ServerConnection onTxStart(final String stmt){ + final int txid = txLocalId; + if(txid != 0){ + LOGGER.warn("tx ended abnormally!"); + } + txLocalId = txLastLocalId + 1; + Tracer.traceTx(this, "tx start: stmt = %s, cnxn = %s", stmt, this); + return this; + } + + public ServerConnection onTxDone(final String reason){ + final int txid= txLocalId; + if(txid == 0){ + // case: multi-call. + return this; + } + Tracer.traceTx(this, "tx done: reason = %s, cnxn = %s", reason, this); + txLastLocalId = txid; + txLocalId = 0; + return this; + } @Override public String toString() { return "ServerConnection [id=" + id + ", schema=" + schema + ", host=" + host + ", user=" + user + ",txIsolation=" + txIsolation - + ", autocommit=" + autocommit + ", schema=" + schema + "]"; + + ", autocommit=" + autocommit + + ", charset=" + charset + ", charsetIndex=" + charsetIndex +"]"; } } \ No newline at end of file diff --git a/src/main/java/org/opencloudb/server/ServerConnectionFactory.java b/src/main/java/org/opencloudb/server/ServerConnectionFactory.java index 108f7ce73..c1fe8b77c 100644 --- a/src/main/java/org/opencloudb/server/ServerConnectionFactory.java +++ b/src/main/java/org/opencloudb/server/ServerConnectionFactory.java @@ -32,6 +32,7 @@ import org.opencloudb.net.FrontendConnection; import org.opencloudb.net.factory.FrontendConnectionFactory; import org.opencloudb.server.handler.ServerLoadDataInfileHandler; +import org.opencloudb.trace.Tracer; /** * @author mycat @@ -49,7 +50,9 @@ protected FrontendConnection getConnection(NetworkChannel channel) throws IOExce // c.setPrepareHandler(new ServerPrepareHandler(c)); c.setTxIsolation(sys.getTxIsolation()); //创建绑定唯一Session - c.setSession2(new NonBlockingSession(c)); + c.setSession(new NonBlockingSession(c)); + + Tracer.trace(c, "created"); return c; } diff --git a/src/main/java/org/opencloudb/server/handler/BeginHandler.java b/src/main/java/org/opencloudb/server/handler/BeginHandler.java index 0a01fc9f0..826a9b36e 100644 --- a/src/main/java/org/opencloudb/server/handler/BeginHandler.java +++ b/src/main/java/org/opencloudb/server/handler/BeginHandler.java @@ -23,7 +23,6 @@ */ package org.opencloudb.server.handler; -import org.opencloudb.config.ErrorCode; import org.opencloudb.server.ServerConnection; /** @@ -39,7 +38,7 @@ public static void handle(String stmt, ServerConnection c) { c.write(c.writeToBuffer(AC_OFF, c.allocate())); }else { - c.getSession2().commit() ; + c.getSession().commit() ; } } diff --git a/src/main/java/org/opencloudb/server/handler/Explain2Handler.java b/src/main/java/org/opencloudb/server/handler/Explain2Handler.java index dde39e43c..c631bb8e5 100644 --- a/src/main/java/org/opencloudb/server/handler/Explain2Handler.java +++ b/src/main/java/org/opencloudb/server/handler/Explain2Handler.java @@ -75,7 +75,7 @@ public static void handle(String stmt, ServerConnection c, int offset) { RouteResultset rrs = new RouteResultset(sql, ServerParse.SELECT); EMPTY_ARRAY[0] = node; rrs.setNodes(EMPTY_ARRAY); - SingleNodeHandler singleNodeHandler = new SingleNodeHandler(rrs, c.getSession2()); + SingleNodeHandler singleNodeHandler = new SingleNodeHandler(rrs, c.getSession()); singleNodeHandler.execute(); } catch (Exception e) { logger.error(e.getMessage(), e.getCause()); diff --git a/src/main/java/org/opencloudb/server/handler/ServerLoadDataInfileHandler.java b/src/main/java/org/opencloudb/server/handler/ServerLoadDataInfileHandler.java index ec3477d03..d926633eb 100644 --- a/src/main/java/org/opencloudb/server/handler/ServerLoadDataInfileHandler.java +++ b/src/main/java/org/opencloudb/server/handler/ServerLoadDataInfileHandler.java @@ -221,7 +221,7 @@ public void start(String sql) { flushDataToFile(); isStartLoadData = false; - serverConnection.getSession2().execute(rrs, ServerParse.LOAD_DATA_INFILE_SQL); + serverConnection.getSession().execute(rrs, ServerParse.LOAD_DATA_INFILE_SQL); } } @@ -657,13 +657,10 @@ public void end(byte packID) if (rrs != null) { flushDataToFile(); - serverConnection.getSession2().execute(rrs, ServerParse.LOAD_DATA_INFILE_SQL); + serverConnection.getSession().execute(rrs, ServerParse.LOAD_DATA_INFILE_SQL); } - // sendOk(++packID); - - } @@ -811,7 +808,7 @@ private static void deleteFile(String dirPath) File file = fileList[i]; if (file.isFile()&&file.exists()) { - boolean delete = file.delete(); + //boolean delete = file.delete(); } else if (file.isDirectory()) { deleteFile(file.getAbsolutePath()); diff --git a/src/main/java/org/opencloudb/server/handler/SetHandler.java b/src/main/java/org/opencloudb/server/handler/SetHandler.java index 4706545ca..894a40bbb 100644 --- a/src/main/java/org/opencloudb/server/handler/SetHandler.java +++ b/src/main/java/org/opencloudb/server/handler/SetHandler.java @@ -48,7 +48,6 @@ /** * SET 语句处理 * - * @author mycat * @author zhuam */ public final class SetHandler { @@ -82,7 +81,7 @@ public static void handle(String stmt, ServerConnection c, int offset) { "set xa cmd on can't used in autocommit connection "); return; } - c.getSession2().setXATXEnabled(true); + c.getSession().setXATXEnabled(true); c.write(c.writeToBuffer(OkPacket.OK, c.allocate())); break; } diff --git a/src/main/java/org/opencloudb/server/handler/StartHandler.java b/src/main/java/org/opencloudb/server/handler/StartHandler.java index 1fd070d09..aa23ae84c 100644 --- a/src/main/java/org/opencloudb/server/handler/StartHandler.java +++ b/src/main/java/org/opencloudb/server/handler/StartHandler.java @@ -23,7 +23,6 @@ */ package org.opencloudb.server.handler; -import org.opencloudb.config.ErrorCode; import org.opencloudb.server.ServerConnection; import org.opencloudb.server.parser.ServerParse; import org.opencloudb.server.parser.ServerParseStart; @@ -43,7 +42,7 @@ public static void handle(String stmt, ServerConnection c, int offset) { c.write(c.writeToBuffer(AC_OFF, c.allocate())); }else { - c.getSession2().commit() ; + c.getSession().commit() ; } break; default: diff --git a/src/main/java/org/opencloudb/server/parser/ServerParse.java b/src/main/java/org/opencloudb/server/parser/ServerParse.java index 3a44a98aa..8f37406ad 100644 --- a/src/main/java/org/opencloudb/server/parser/ServerParse.java +++ b/src/main/java/org/opencloudb/server/parser/ServerParse.java @@ -79,6 +79,9 @@ public static int parse(String stmt) { && stmt.charAt(lenth - 1) == '/') { return MYSQL_CMD_COMMENT; } + case '-': + // + "-- comment" + // @since 2017-01-21 little-pan case '#': i = ParseUtil.comment(stmt, i); if (i + 1 == lenth) { diff --git a/src/main/java/org/opencloudb/sqlengine/OneRawSQLQueryResultHandler.java b/src/main/java/org/opencloudb/sqlengine/OneRawSQLQueryResultHandler.java index 918ae7af9..502435840 100644 --- a/src/main/java/org/opencloudb/sqlengine/OneRawSQLQueryResultHandler.java +++ b/src/main/java/org/opencloudb/sqlengine/OneRawSQLQueryResultHandler.java @@ -77,9 +77,9 @@ public boolean onRowData(String dataNode, byte[] rowData) { @Override public void finished(String dataNode, boolean failed) { - SQLQueryResult> queryRestl=new SQLQueryResult>(this.result,!failed); + SQLQueryResult> queryRestl = + new SQLQueryResult>(this.result,!failed); this.callback.onResult(queryRestl); - } } diff --git a/src/main/java/org/opencloudb/sqlengine/SQLJob.java b/src/main/java/org/opencloudb/sqlengine/SQLJob.java index 8f208bc30..915bb2ecf 100644 --- a/src/main/java/org/opencloudb/sqlengine/SQLJob.java +++ b/src/main/java/org/opencloudb/sqlengine/SQLJob.java @@ -12,6 +12,7 @@ import org.opencloudb.net.mysql.ErrorPacket; import org.opencloudb.route.RouteResultsetNode; import org.opencloudb.server.parser.ServerParse; +import org.opencloudb.trace.Tracer; /** * asyn execute in EngineCtx or standalone (EngineCtx=null) @@ -104,13 +105,14 @@ private void doFinished(boolean failed) { if (ctx != null) { ctx.onJobFinished(this); } + Tracer.trace(connection, "finished = %s, failed = %s, cnxn = %s", + finished, failed, connection); } @Override public void connectionError(Throwable e, BackendConnection conn) { LOGGER.info("can't get connection for sql :" + sql); doFinished(true); - } @Override @@ -121,7 +123,6 @@ public void errorResponse(byte[] err, BackendConnection conn) { + " from of sql :" + sql + " at con:" + conn); conn.release(); doFinished(true); - } @Override diff --git a/src/main/java/org/opencloudb/trace/Tracer.java b/src/main/java/org/opencloudb/trace/Tracer.java new file mode 100644 index 000000000..9b4fd086e --- /dev/null +++ b/src/main/java/org/opencloudb/trace/Tracer.java @@ -0,0 +1,326 @@ +/* + * Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software;Designed and Developed mainly by many Chinese + * opensource volunteers. you can redistribute it and/or modify it under the + * terms of the GNU General Public License version 2 only, as published by the + * Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Any questions about this component can be directed to it's project Web address + * https://code.google.com/p/opencloudb/. + * + */ + +package org.opencloudb.trace; + +import java.lang.reflect.Method; +import java.nio.ByteBuffer; + +import org.apache.log4j.Logger; +import org.opencloudb.backend.BackendConnection; +import org.opencloudb.net.AbstractConnection; +import org.opencloudb.net.FrontendConnection; +import org.opencloudb.server.ServerConnection; + +/** + *

+ * A component is used to trace mycat execution deeply, which includes basic trace + *and frontend-or-backend connection trace, for debug intention and our high qualified + *mycat server. + *

+ * + *

+ * The tracer is disabled by default, and you can enable it by specifying + *the system property {@code -Dmycat.trace.enabled=true} and setting log4j + *debug level. + *

+ * + * @author little-pan + * @since 2017-01-11 + */ +public final class Tracer { + final static Logger LOGGER = Logger.getLogger(Tracer.class); + + final static boolean trace, traceStack, useBuffer, + buffer, fronend, backend, + tx; + final static int maxBuffer, initBuffer, stackMaxDeep; + final static String lineSep, stackPrompt, stackIndent; + + final static Method address; + + + private final static ThreadLocal localBuffer = new ThreadLocal(){ + @Override + protected StringBuilder initialValue(){ + return (new StringBuilder(initBuffer)); + } + }; + + static{ + trace = Boolean.parseBoolean(System.getProperty("mycat.trace.enabled", "false")); + buffer = Boolean.parseBoolean(System.getProperty("mycat.trace.buffer", "false")); + fronend = Boolean.parseBoolean(System.getProperty("mycat.trace.fronend", "false")); + backend = Boolean.parseBoolean(System.getProperty("mycat.trace.backend", "false")); + tx = Boolean.parseBoolean(System.getProperty("mycat.trace.tx", "true")); + if(trace && buffer){ + final ByteBuffer dbuf = ByteBuffer.allocateDirect(1); + Method addr = null; + try{ + addr = dbuf.getClass().getDeclaredMethod("address"); + addr.setAccessible(true); + }catch(final Exception e){ + addr = null; + } + address = addr; + }else{ + address = null; + } + + traceStack = Boolean.parseBoolean(System.getProperty("mycat.trace.stack", "true")); + stackPrompt = System.getProperty("mycat.trace.stackPrompt", ">"); + stackIndent = System.getProperty("mycat.trace.stackIndent", " "); + stackMaxDeep= Integer.parseInt(System.getProperty("mycat.trace.stackMaxDeep", "20")); + + useBuffer = Boolean.parseBoolean(System.getProperty("mycat.trace.useBuffer", "true")); + maxBuffer = Integer.parseInt(System.getProperty("mycat.trace.maxBuffer", "4096")); + initBuffer= Integer.parseInt(System.getProperty("mycat.trace.initBuffer", "512")); + + lineSep = System.getProperty("line.separator"); + } + + private Tracer(){ + // noop + } + + public final static void trace(final String format, final Object ...args){ + if(trace && LOGGER.isDebugEnabled()){ + trace0(2, null, format, args); + } + } + + public final static void trace(final String tag, final String format, final Object ...args){ + if(trace && LOGGER.isDebugEnabled()){ + trace0(2, tag, format, args); + } + } + + public final static void trace(final ByteBuffer buffer){ + if(trace && Tracer.buffer && LOGGER.isDebugEnabled()){ + trace0(2, bufferTag(buffer), + "pos = %x, rem = %x, lim = %x", + buffer.position(), buffer.remaining(), buffer.limit()); + } + } + + public final static void trace(final ByteBuffer buffer, final String format, final Object ...args){ + if(trace && Tracer.buffer && LOGGER.isDebugEnabled()){ + trace0(2, bufferTag(buffer), format, args); + } + } + + private final static String bufferTag(final ByteBuffer buffer){ + if(buffer == null){ + return ("buffer#null"); + } + // ByteBuffer hash-code related to content, so that it same when + //the buffer block size same. We can get the buffer address or array + //hash code as buffer hash code for tracing. + // @since 2017-01-14 little-pan + final String heap; + long hash = 0L; + if(buffer.isDirect()){ + heap = "oheap"; + try{ + if(address != null){ + hash = (Long)address.invoke(buffer); + } + }catch(final Exception e){ + // ignore + } + }else{ + heap = "heap"; + final byte[] array = buffer.array(); + hash = array.hashCode(); + } + return (String.format("%s#%x-%x", heap, hash, buffer.capacity())); + } + + public final static void traceTx(final ServerConnection connection) { + if(trace && Tracer.tx && LOGGER.isDebugEnabled()){ + trace0(2, txTag(connection), connection + ""); + } + } + + public final static void traceTx(final ServerConnection connection, final String format, final Object ...args) { + if(trace && Tracer.tx && LOGGER.isDebugEnabled()){ + trace0(2, txTag(connection), format, args); + } + } + + private final static String txTag(final ServerConnection connection) { + if(connection == null){ + return ("tx#null"); + } + return (String.format("tx#%x-%x", connection.hashCode(), connection.getTxLocalId())); + } + + public final static void traceCnxn(final AbstractConnection connection){ + if(trace && LOGGER.isDebugEnabled()){ + if( (connection instanceof BackendConnection && Tracer.backend == false) + || + (connection instanceof FrontendConnection && Tracer.fronend == false) + ){ + return; + } + trace0(2, connectTag(connection), connection+""); + } + } + + public final static void traceCnxn(final AbstractConnection connection, final String format, final Object ...args){ + if(trace && LOGGER.isDebugEnabled()){ + if( (connection instanceof BackendConnection && Tracer.backend == false) + || + (connection instanceof FrontendConnection && Tracer.fronend == false) + ){ + return; + } + trace0(2, connectTag(connection), format, args); + } + } + + private final static String connectTag(final AbstractConnection connection){ + final String tag; + if(connection instanceof BackendConnection){ + tag = backendTag((BackendConnection)connection); + }else if(connection instanceof FrontendConnection){ + tag = fronendTag((FrontendConnection)connection); + }else if(connection == null){ + tag = "connect#null"; + }else{ + tag = String.format("connect#%x-%d", + connection.hashCode(), connection.getId()); + } + return tag; + } + + public final static void trace(final FrontendConnection fronend){ + if(trace && Tracer.fronend && LOGGER.isDebugEnabled()){ + trace0(2, fronendTag(fronend), fronend+""); + } + } + + public final static void trace(final FrontendConnection fronend, final String format, final Object ...args){ + if(trace && Tracer.fronend && LOGGER.isDebugEnabled()){ + trace0(2, fronendTag(fronend), format, args); + } + } + + private final static String fronendTag(final FrontendConnection fronend){ + if(fronend == null){ + return "fronend#null"; + } + return (String.format("fronend#%x-%d", fronend.hashCode(), fronend.getId())); + } + + public final static void trace(final BackendConnection backend){ + if(trace && Tracer.backend && LOGGER.isDebugEnabled()){ + trace0(2, backendTag(backend), backend+""); + } + } + + public final static void trace(final BackendConnection backend, final String format, final Object ...args){ + if(trace && Tracer.backend && LOGGER.isDebugEnabled()){ + trace0(2, backendTag(backend), format, args); + } + } + + private final static String backendTag(final BackendConnection backend){ + if(backend == null){ + return "backend#null"; + } + return (String.format("backend#%x-%d", backend.hashCode(), backend.getId())); + } + + final static void trace0(final int traces, + final String tag, final String format, final Object ...args){ + final StringBuilder buf = acquireBuffer(); + try{ + // tag: optional + if(tag != null){ + buf.append('[').append(tag).append(']').append(' '); + } + buf.append((args == null) ? format: (String.format(format, args))); + if(traceStack){ + final Thread curThread = Thread.currentThread(); + final String thrName = curThread.getName(); + final StackTraceElement[] stack = curThread.getStackTrace(); + final int size = stack.length, maxDeep = stackMaxDeep; + final int base = traces + 1/* ignored: trace(), getStackTrace() */; + for(int i = size, k = 0; i > base; ++k){ + buf.append(lineSep); + if(tag != null){ + buf.append('[').append(tag).append(']'); + } + buf.append('[').append(thrName).append(']'); + if(k == maxDeep){ + buf.append('<').append(".........").append(i-base).append(" levels omitted"); + break; + } + final StackTraceElement e = stack[--i]; + final int w = (size - (i+1))<<1; + for(int j = 0; j < w; ++j){ + buf.append(stackIndent); + } + buf.append(stackPrompt).append(' ').append(e); + } + } + LOGGER.debug(buf); + }finally{ + releaseBuffer(buf); + } + } + + private final static StringBuilder acquireBuffer(){ + if(useBuffer){ + return (localBuffer.get()); + } + return (new StringBuilder()); + } + + private final static void releaseBuffer(final StringBuilder buf){ + buf.setLength(0); + if(buf.capacity() > maxBuffer){ + localBuffer.remove(); + } + } + + private final static void testDeeper(){ + trace("Tracer", "test"); + trace("Tracer", "test: s0 = %d, s1 = %d", 1, 2); + trace("Tracer", "test: cur= %s", new java.util.Date()); + trace((String)null, "test"); + trace("test: JAN = %.2f, FEB = %.2f", 2017.01, 2017.02); + trace("test"); + } + + private final static void test(){ + testDeeper(); + } + + public static void main(String args[]){ + test(); + } + +} diff --git a/src/main/resources/log4j.xml b/src/main/resources/log4j.xml index 88aa64161..ab60be4e1 100644 --- a/src/main/resources/log4j.xml +++ b/src/main/resources/log4j.xml @@ -15,7 +15,7 @@ - limitations under the License. --> - + @@ -34,9 +34,9 @@ - - - + + +