diff --git a/source/src/main/java/io/mycat/mycat2/AbstractMySQLSession.java b/source/src/main/java/io/mycat/mycat2/AbstractMySQLSession.java index f9efb46..64d8ca2 100644 --- a/source/src/main/java/io/mycat/mycat2/AbstractMySQLSession.java +++ b/source/src/main/java/io/mycat/mycat2/AbstractMySQLSession.java @@ -61,7 +61,6 @@ public enum CurrPacketType { public AbstractMySQLSession(BufferPool bufferPool, Selector selector, SocketChannel channel) throws IOException { this(bufferPool, selector, channel, SelectionKey.OP_READ); - } @@ -93,7 +92,8 @@ public void responseOKOrError(MySQLPacket pkg) throws IOException { /** * 解析MySQL报文,解析的结果存储在curMSQLPackgInf中,如果解析到完整的报文,就返回TRUE - * 如果解析的过程中同时要移动ProxyBuffer的readState位置,即标记为读过,后继调用开始解析下一个报文,则需要参数markReaded=true + * 如果解析的过程中同时要移动ProxyBuffer的readState位置,即标记为读过,后继调用开始解析下一个报文,则需要参数markReaded + * =true * * @param proxyBuf * @return @@ -143,7 +143,56 @@ public CurrPacketType resolveMySQLPackage(ProxyBuffer proxyBuf, MySQLPackageInf // 解包获取包的数据长度 int pkgLength = ParseUtil.getPacketLength(buffer, offset); // 解析报文类型 - final byte packetType = buffer.get(offset + ParseUtil.msyql_packetHeaderSize); + // final byte packetType = buffer.get(offset + + // ParseUtil.msyql_packetHeaderSize); + + // 解析报文类型 + int packetType = -1; + + // 在包长度小于7时,作为resultSet的首包 + if (pkgLength <= 7) { + int index = offset + ParseUtil.msyql_packetHeaderSize; + + long len = proxyBuf.getInt(index, 1) & 0xff; + // 如果长度小于251,则取默认的长度 + if (len < 251) { + packetType = (int) len; + } else if (len == 0xfc) { + // 进行验证是否位数足够,作为短包处理 + if (!ParseUtil.validateResultHeader(offset, limit, 2)) { + // 收到短半包 + logger.debug("not read a whole packet ,session {},offset {} ,limit {}", getSessionId(), offset, + limit); + return CurrPacketType.ShortHalfPacket; + } + packetType = (int) proxyBuf.getInt(index + 1, 2); + } else if (len == 0xfd) { + + // 进行验证是否位数足够,作为短包处理 + if (!ParseUtil.validateResultHeader(offset, limit, 3)) { + // 收到短半包 + logger.debug("not read a whole packet ,session {},offset {} ,limit {}", getSessionId(), offset, + limit); + return CurrPacketType.ShortHalfPacket; + } + + packetType = (int) proxyBuf.getInt(index + 1, 3); + } else { + // 进行验证是否位数足够,作为短包处理 + if (!ParseUtil.validateResultHeader(offset, limit, 8)) { + // 收到短半包 + logger.debug("not read a whole packet ,session {},offset {} ,limit {}", getSessionId(), offset, + limit); + return CurrPacketType.ShortHalfPacket; + } + + packetType = (int) proxyBuf.getInt(index + 1, 8); + } + } else { + // 解析报文类型 + packetType = buffer.get(offset + ParseUtil.msyql_packetHeaderSize); + } + // 包的类型 curPackInf.pkgType = packetType; // 设置包的长度 @@ -179,6 +228,4 @@ public CurrPacketType resolveMySQLPackage(ProxyBuffer proxyBuf, MySQLPackageInf } } - - } diff --git a/source/src/main/java/io/mycat/mycat2/MySQLSession.java b/source/src/main/java/io/mycat/mycat2/MySQLSession.java index f98ae35..93d45e7 100644 --- a/source/src/main/java/io/mycat/mycat2/MySQLSession.java +++ b/source/src/main/java/io/mycat/mycat2/MySQLSession.java @@ -5,6 +5,8 @@ import java.nio.channels.Selector; import java.nio.channels.SocketChannel; +import io.mycat.mycat2.cmds.pkgread.PkgFirstReader; +import io.mycat.mycat2.cmds.pkgread.PkgProcess; import io.mycat.proxy.BufferPool; /** @@ -25,6 +27,11 @@ public class MySQLSession extends AbstractMySQLSession { */ private MycatSession mycatSession; + /** + * 当前结束检查处理的状态,默认为首包检查读取 + */ + public PkgProcess currPkgProc = PkgFirstReader.INSTANCE; + public MySQLSession(BufferPool bufferPool, Selector selector, SocketChannel channel) throws IOException { super(bufferPool, selector, channel, SelectionKey.OP_CONNECT); } @@ -58,8 +65,7 @@ public void setMycatSession(MycatSession mycatSession) { @Override protected void doTakeReadOwner() { - this.getMycatSession().takeOwner(SelectionKey.OP_READ); - + this.getMycatSession().takeOwner(SelectionKey.OP_READ); } public String getCurrBackendCachedName() { @@ -70,6 +76,4 @@ public void setCurrBackendCachedName(String currBackendCachedName) { this.currBackendCachedName = currBackendCachedName; } - - } diff --git a/source/src/main/java/io/mycat/mycat2/beans/MySQLDataSource.java b/source/src/main/java/io/mycat/mycat2/beans/MySQLDataSource.java index 6424f16..167e5ea 100644 --- a/source/src/main/java/io/mycat/mycat2/beans/MySQLDataSource.java +++ b/source/src/main/java/io/mycat/mycat2/beans/MySQLDataSource.java @@ -24,7 +24,21 @@ package io.mycat.mycat2.beans; import java.io.IOException; - +import java.nio.channels.Selector; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.TransferQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import io.mycat.mycat2.MySQLSession; +import io.mycat.mycat2.tasks.BackendCharsetReadTask; +import io.mycat.mycat2.tasks.BackendConCreateTask; +import io.mycat.proxy.BufferPool; +import io.mycat.proxy.ProxyReactorThread; +import io.mycat.proxy.ProxyRuntime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,28 +50,54 @@ public class MySQLDataSource { public static final Logger LOGGER = LoggerFactory.getLogger(MySQLDataSource.class); private final String name; - private final int size; + private final AtomicInteger activeSize; private final MySQLBean mysqlBean; private final ConMap conMap = new ConMap(); private long heartbeatRecoveryTime; private boolean slaveNode; + /** collationIndex 和 charsetName 的映射 */ + public final Map INDEX_TO_CHARSET = new HashMap<>(); + /** charsetName 到 默认collationIndex 的映射 */ + public final Map CHARSET_TO_INDEX = new HashMap<>(); + + private TransferQueue sessionQueue = new LinkedTransferQueue<>(); + + public MySQLSession getSession() { + MySQLSession session = sessionQueue.poll(); + if (session != null) { + return session; + } + + //todo 新建连接 + return null; + } + public MySQLDataSource(MySQLBean config, boolean islaveNode) { - this.size = config.getMaxCon(); + this.activeSize = new AtomicInteger(0); this.mysqlBean = config; this.name = config.getHostName(); this.slaveNode = islaveNode; - } - // public MySQLBackendConnection createNewConnection(String reactor, String - // schema, MySQLFrontConnection mySQLFrontConnection, BackConnectionCallback - // userCallback) throws IOException { - // MySQLBackendConnection con = factory.make(this, reactor, schema, - // mySQLFrontConnection, userCallback); - // this.conMap.getSchemaConQueue(schema).getAutoCommitCons().add(con); - // return con; - // } + public void createMySQLSession(BufferPool bufferPool, Selector selector) { + try { + BackendConCreateTask authProcessor = new BackendConCreateTask(bufferPool, selector, this, null); + authProcessor.setCallback((optSession, sender, exeSucces, retVal) -> { + if (exeSucces) { + int curSize = activeSize.incrementAndGet(); + if (curSize == 1) { + BackendCharsetReadTask backendCharsetReadTask = new BackendCharsetReadTask(optSession, this); + optSession.setCurNIOHandler(backendCharsetReadTask); + backendCharsetReadTask.readCharset(); + } + sessionQueue.add(optSession); + } + }); + } catch (IOException e) { + LOGGER.warn("error to create mysqlSession for datasource: {}", this.name); + } + } public boolean isSlaveNode() { return slaveNode; @@ -67,8 +107,8 @@ public void setSlaveNode(boolean slaveNode) { this.slaveNode = slaveNode; } - public int getSize() { - return size; + public AtomicInteger getActiveSize() { + return activeSize; } public String getName() { @@ -78,22 +118,15 @@ public String getName() { public boolean initSource() { int initSize = this.mysqlBean.getMinCon(); LOGGER.info("init backend myqsl source ,create connections total " + initSize + " for " + mysqlBean); - // Set reactos = - // SQLEngineCtx.INSTANCE().getReactorMap().keySet(); - // Iterator itor = reactos.iterator(); - // for (int i = 0; i < initSize; i++) { - // try { - // String actorName = null; - // if (!itor.hasNext()) { - // itor = reactos.iterator(); - // } - // actorName = itor.next(); - // this.createNewConnectionOnReactor(actorName, - // this.mysqlBean.getDefaultSchema(), null, null); - // } catch (Exception e) { - // LOGGER.warn(" init connection error.", e); - // } - // } + + ProxyRuntime runtime = ProxyRuntime.INSTANCE; + ProxyReactorThread[] reactorThreads = runtime.getReactorThreads(); + int reactorSize = runtime.getNioReactorThreads(); + for (int i = 0; i < initSize; i++) { + ProxyReactorThread reactorThread = reactorThreads[i % reactorSize]; + reactorThread.addNIOJob(() -> createMySQLSession(reactorThread.getBufPool(), reactorThread.getSelector())); + } + LOGGER.info("init source finished"); return true; } @@ -156,7 +189,7 @@ public MySQLBean getConfig() { @Override public String toString() { final StringBuilder sbuf = new StringBuilder("MySQLDataSource[").append("name=").append(name).append(',') - .append("size=").append(size).append(',').append("heartbeatRecoveryTime=").append(heartbeatRecoveryTime) + .append("activeSize=").append(activeSize).append(',').append("heartbeatRecoveryTime=").append(heartbeatRecoveryTime) .append(',').append("slaveNode=").append(slaveNode).append(',').append("mysqlBean=").append(mysqlBean) .append(']'); return (sbuf.toString()); diff --git a/source/src/main/java/io/mycat/mycat2/beans/MySQLPackageInf.java b/source/src/main/java/io/mycat/mycat2/beans/MySQLPackageInf.java index 8695686..8c7e70d 100644 --- a/source/src/main/java/io/mycat/mycat2/beans/MySQLPackageInf.java +++ b/source/src/main/java/io/mycat/mycat2/beans/MySQLPackageInf.java @@ -5,7 +5,7 @@ * */ public class MySQLPackageInf { -public byte pkgType; +public int pkgType; public boolean crossBuffer; public int startPos; public int endPos; diff --git a/source/src/main/java/io/mycat/mycat2/cmds/DirectPassthrouhCmd.java b/source/src/main/java/io/mycat/mycat2/cmds/DirectPassthrouhCmd.java index 6761911..944647e 100644 --- a/source/src/main/java/io/mycat/mycat2/cmds/DirectPassthrouhCmd.java +++ b/source/src/main/java/io/mycat/mycat2/cmds/DirectPassthrouhCmd.java @@ -13,6 +13,10 @@ import io.mycat.mycat2.MycatSession; import io.mycat.mycat2.MySQLCommand; import io.mycat.mycat2.beans.MySQLPackageInf; +import io.mycat.mycat2.cmds.judge.DirectTransJudge; +import io.mycat.mycat2.cmds.judge.ErrorJudge; +import io.mycat.mycat2.cmds.judge.OkJudge; +import io.mycat.mycat2.console.SessionKeyEnum; import io.mycat.mysql.packet.MySQLPacket; import io.mycat.proxy.ProxyBuffer; @@ -28,17 +32,17 @@ public class DirectPassthrouhCmd implements MySQLCommand { public static final DirectPassthrouhCmd INSTANCE = new DirectPassthrouhCmd(); - // ********** 临时处理,等待与KK 代码合并 - private static final Map finishPackage = new HashMap<>(); - - private Map curfinishPackage = new HashMap<>(); + /** + * 指定需要处理的包类型信息 + */ + private static final Map JUDGEMAP = new HashMap<>(); static { - finishPackage.put(MySQLPacket.OK_PACKET, 1); - finishPackage.put(MySQLPacket.ERROR_PACKET, 1); - finishPackage.put(MySQLPacket.EOF_PACKET, 2); + // 用来进行ok包的处理理 + JUDGEMAP.put((int) MySQLPacket.OK_PACKET, OkJudge.INSTANCE); + // 用来进行error包的处理 + JUDGEMAP.put((int) MySQLPacket.ERROR_PACKET, ErrorJudge.INSTANCE); } - // ********** 临时处理,等待与KK 代码合并 @Override public boolean procssSQL(MycatSession session) throws IOException { @@ -58,17 +62,6 @@ public boolean procssSQL(MycatSession session) throws IOException { return false; } - private boolean isfinishPackage(MySQLPackageInf curMSQLPackgInf) throws IOException { - switch (curMSQLPackgInf.pkgType) { - case MySQLPacket.OK_PACKET: - case MySQLPacket.ERROR_PACKET: - case MySQLPacket.EOF_PACKET: - return true; - default: - return false; - } - } - @Override public void clearResouces(boolean sessionCLosed) { // TODO Auto-generated method stub @@ -77,75 +70,39 @@ public void clearResouces(boolean sessionCLosed) { @Override public boolean onBackendResponse(MySQLSession session) throws IOException { - logger.info("received backend mysql data "); + + // 首先进行一次报文的读取操作 if (!session.readFromChannel()) { return false; } - ProxyBuffer curBuffer = session.proxyBuffer; - MySQLPackageInf curMSQLPackgInf = session.curMSQLPackgInf; - boolean isallfinish = false; - boolean isContinue = true; - while (isContinue) { - switch (session.resolveMySQLPackage(curBuffer, curMSQLPackgInf, true)) { - case Full: - Integer count = curfinishPackage.get(curMSQLPackgInf.pkgType); - if (count != null) { - if (--count == 0) { - isallfinish = true; - curfinishPackage.clear(); - } - curfinishPackage.put(curMSQLPackgInf.pkgType, count); - } - if (curBuffer.readIndex == curBuffer.writeIndex) { - isContinue = false; - } else { - isContinue = true; - } - break; - case LongHalfPacket: - if (curMSQLPackgInf.crossBuffer) { - // 发生过透传的半包,往往包的长度超过了buffer 的长度. - logger.debug(" readed crossBuffer LongHalfPacket ,curMSQLPackgInf is {}", curMSQLPackgInf); - } else if (!isfinishPackage(curMSQLPackgInf)) { - // 不需要整包解析的长半包透传. result set .这种半包直接透传 - curMSQLPackgInf.crossBuffer = true; - curBuffer.readIndex = curMSQLPackgInf.endPos; - curMSQLPackgInf.remainsBytes = curMSQLPackgInf.pkgLength - - (curMSQLPackgInf.endPos - curMSQLPackgInf.startPos); - logger.debug(" readed LongHalfPacket ,curMSQLPackgInf is {}", curMSQLPackgInf); - } else { - // 读取到了EOF/OK/ERROR 类型长半包 是需要保证是整包的. - logger.debug(" readed finished LongHalfPacket ,curMSQLPackgInf is {}", curMSQLPackgInf); - } - isContinue = false; - break; - case ShortHalfPacket: - logger.debug(" readed ShortHalfPacket ,curMSQLPackgInf is {}", curMSQLPackgInf); - isContinue = false; - break; - } - } + // 进行报文处理的流程化 + boolean nextReadFlag = false; + do { + // 进行报文的处理流程 + nextReadFlag = session.currPkgProc.procssPkg(session); + } while (nextReadFlag); - // 切换buffer 读写状态 - curBuffer.flip(); - MycatSession mycatSession = session.getMycatSession(); - // 直接透传报文 - mycatSession.takeOwner(SelectionKey.OP_WRITE); - mycatSession.writeToChannel(); - /** - * 当前命令处理是否全部结束,全部结束时需要清理资源 - */ return false; } @Override public boolean onFrontWriteFinished(MycatSession session) throws IOException { // 判断是否结果集传输完成,决定命令是否结束,切换到前端读取数据 - // todo + // 检查当前已经结束,进行切换 logger.warn("not well implemented ,please fix it "); - session.proxyBuffer.flip(); - session.chnageBothReadOpts(); + + // 检查如果存在传输的标识,说明后传数据向前传传输未完成,注册后端的读取事件 + if (session.getSessionAttrMap().containsKey(SessionKeyEnum.SESSION_KEY_TRANSFER_OVER_FLAG.getKey())) { + session.proxyBuffer.flip(); + session.giveupOwner(SelectionKey.OP_READ); + } + // 当传输标识不存在,则说已经结束,则切换到前端的读取 + else { + session.proxyBuffer.flip(); + // session.chnageBothReadOpts(); + session.takeOwner(SelectionKey.OP_READ); + } return false; } @@ -153,8 +110,9 @@ public boolean onFrontWriteFinished(MycatSession session) throws IOException { @Override public boolean onBackendWriteFinished(MySQLSession session) throws IOException { // 绝大部分情况下,前端把数据写完后端发送出去后,就等待后端返回数据了, - // 此时Buffer改为读状态 + // 向后端写入完成数据后,则从后端读取数据 session.proxyBuffer.flip(); + // 由于单工模式,在向后端写入完成后,需要从后端进行数据读取 session.change2ReadOpts(); return false; diff --git a/source/src/main/java/io/mycat/mycat2/cmds/LoadDataCommand.java b/source/src/main/java/io/mycat/mycat2/cmds/LoadDataCommand.java new file mode 100644 index 0000000..a8d922e --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/LoadDataCommand.java @@ -0,0 +1,189 @@ +package io.mycat.mycat2.cmds; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.mycat.mycat2.MySQLSession; +import io.mycat.mycat2.MycatSession; +import io.mycat.mycat2.SQLCommand; +import io.mycat.mycat2.console.SessionKeyEnum; +import io.mycat.proxy.ProxyBuffer; + +/** + * 进行load data的命令处理 + * + * @author wuzhihui + * + */ +public class LoadDataCommand implements SQLCommand { + + private static final Logger logger = LoggerFactory.getLogger(LoadDataCommand.class); + + /** + * 透传的实例对象 + */ + public static final LoadDataCommand INSTANCE = new LoadDataCommand(); + + /** + * loaddata传送结束标识长度 + */ + private static final int FLAGLENGTH = 4; + + /** + * 结束flag标识 + */ + private byte[] overFlag = new byte[FLAGLENGTH]; + + @Override + public boolean procssSQL(MycatSession session) throws IOException { + + // 进行传输,并检查返回结果检查 ,当传输完成,就将切换为正常的透传 + if (transLoadData(session)) { + session.curSQLCommand = DirectPassthrouhCmd.INSTANCE; + // 当load data的包完成后,则又重新打开包完整性检查 + session.getSessionAttrMap().remove(SessionKeyEnum.SESSION_PKG_READ_FLAG.getKey()); + } + + return false; + } + + /** + * 进行load data的数据传输操作 + * + * @param session + * 会话标识 + * @param backresReceived + * 是否为后端报文标识 + * @return true 当前传输结束 + * @throws IOException + */ + public boolean transLoadData(MycatSession session) throws IOException { + + ProxyBuffer curBuffer = session.proxyBuffer; + + // 进行结束符的读取 + this.readOverByte(curBuffer); + + // 切换buffer 读写状态 + curBuffer.flip(); + // 检查当前是否结束 + if (checkOver()) { + + // 没有读取,直接透传时,需要指定 透传的数据 截止位置 + curBuffer.readIndex = curBuffer.writeIndex; + + MySQLSession mycatSession = session.getBackend(); + + // 读取结束后 改变 owner,对端Session获取,并且感兴趣写事件 + session.giveupOwner(SelectionKey.OP_READ); + mycatSession.writeToChannel(); + // 完成后,需要将buffer切换为写入事件,读取后端的数据 + curBuffer.flip(); + + return true; + } else { + // 没有读取,直接透传时,需要指定 透传的数据 截止位置 + curBuffer.readIndex = curBuffer.writeIndex; + + // 将控制权交给后端 + session.giveupOwner(SelectionKey.OP_READ); + MySQLSession mysqlSession = session.getBackend(); + mysqlSession.writeToChannel(); + + // 然后又将后端的事件改变为前端的 + session.takeOwner(SelectionKey.OP_READ); + + // 完成后,需要将buffer切换为写入事件,读取前端的数据 + curBuffer.flip(); + + return false; + } + + } + + /** + * 进行结束符的读取 + * + * @param curBuffer + * buffer数组信息 + */ + private void readOverByte(ProxyBuffer curBuffer) { + // 获取当前buffer的最后 + ByteBuffer buffer = curBuffer.getBuffer(); + + // 如果数据的长度超过了,结束符的长度,可直接提取结束符 + if (buffer.position() >= FLAGLENGTH) { + int opts = curBuffer.writeIndex; + buffer.position(opts - FLAGLENGTH); + buffer.get(overFlag, 0, FLAGLENGTH); + buffer.position(opts); + } + // 如果小于结束符,说明需要进行两个byte数组的合并 + else { + int opts = curBuffer.writeIndex; + // 计算放入的位置 + int moveSize = FLAGLENGTH - opts; + int index = 0; + // 进行数组的移动,以让出空间进行放入新的数据 + for (int i = FLAGLENGTH - moveSize; i < FLAGLENGTH; i++) { + overFlag[index] = overFlag[i]; + index++; + } + // 读取数据 + buffer.position(0); + buffer.get(overFlag, moveSize, opts); + buffer.position(opts); + } + + } + + /** + * 进行结束符的检查, + * + * 数据的结束符为0,0,0,包序,即可以验证读取到3个连续0,即为结束 + * + * @return + */ + private boolean checkOver() { + for (int i = 0; i < overFlag.length - 1; i++) { + if (overFlag[i] != 0) { + return false; + } + } + return true; + } + + @Override + public void clearResouces(boolean sessionCLosed) { + + } + + @Override + public boolean onBackendResponse(MySQLSession session) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean onBackendClosed(MySQLSession session, boolean normal) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean onFrontWriteFinished(MycatSession session) throws IOException { + return false; + } + + @Override + public boolean onBackendWriteFinished(MySQLSession session) throws IOException { + // TODO Auto-generated method stub + return false; + } + +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/judge/DirectTransJudge.java b/source/src/main/java/io/mycat/mycat2/cmds/judge/DirectTransJudge.java new file mode 100644 index 0000000..53ac96e --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/judge/DirectTransJudge.java @@ -0,0 +1,20 @@ +package io.mycat.mycat2.cmds.judge; + +import io.mycat.mycat2.MySQLSession; + +/** + * 接口传输判断接口 + * @since 2017年8月18日 下午11:41:46 + * @version 0.0.1 + * @author liujun + */ +public interface DirectTransJudge { + + /** + * 当满足判断的类型后,进行的判断执行 + * @param session 后端会话对象信息 + * @return 是否需要进行一步的读取 true 需要进一步的读取 false不需要,一般查询的eof有可能会进一步的读取,其他都不需要 + */ + public boolean judge(MySQLSession session); + +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/judge/EofJudge.java b/source/src/main/java/io/mycat/mycat2/cmds/judge/EofJudge.java new file mode 100644 index 0000000..8d0b2f9 --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/judge/EofJudge.java @@ -0,0 +1,64 @@ +package io.mycat.mycat2.cmds.judge; + +import io.mycat.mycat2.MySQLSession; +import io.mycat.mycat2.console.SessionKeyEnum; +import io.mycat.mysql.packet.EOFPacket; +import io.mycat.proxy.ProxyBuffer; + +/** + * 进行query包的透传结束判断处理 + * + * @since 2017年8月19日 上午12:08:27 + * @version 0.0.1 + * @author liujun + */ +public class EofJudge implements DirectTransJudge { + + /** + * eof包判断实例 + */ + public static final EofJudge INSTANCE = new EofJudge(); + + @Override + public boolean judge(MySQLSession session) { + + ProxyBuffer curBuffer = session.proxyBuffer; + // 进行当前 + curBuffer.readIndex = session.curMSQLPackgInf.startPos; + // 进行当前ok包的读取 + EOFPacket eofPkg = new EOFPacket(); + eofPkg.read(curBuffer); + + boolean multQuery = ServerStatusEnum.StatusCheck(eofPkg.status, ServerStatusEnum.MULT_QUERY); + boolean multResult = ServerStatusEnum.StatusCheck(eofPkg.status, ServerStatusEnum.MORE_RESULTS); + + if (multQuery || multResult) { + // 标识当前处于使用中 + session.getSessionAttrMap().put(SessionKeyEnum.SESSION_KEY_CONN_IDLE_FLAG.getKey(), false); + return true; + } + + // 事务状态的检查 + boolean trans = ServerStatusEnum.StatusCheck(eofPkg.status, ServerStatusEnum.IN_TRANSACTION); + + // 检查当前是否需要需要进行下一次的数据读取 + + // 如果当前事务状态被设置,连接标识为不能结束 + if (trans) { + // 标识当前处于使用中 + session.getSessionAttrMap().put(SessionKeyEnum.SESSION_KEY_CONN_IDLE_FLAG.getKey(), false); + // 标识当前处于事物中 + session.getSessionAttrMap().put(SessionKeyEnum.SESSION_KEY_TRANSACTION_FLAG.getKey(), true); + } + // 当连接使用完毕,则标识为可以结束 + else { + // 标识当前处于闲置中, + session.getSessionAttrMap().put(SessionKeyEnum.SESSION_KEY_CONN_IDLE_FLAG.getKey(), true); + // 当发现完毕后,将标识移除 + session.getSessionAttrMap().remove(SessionKeyEnum.SESSION_KEY_TRANSACTION_FLAG.getKey()); + } + + return false; + } + +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/judge/ErrorJudge.java b/source/src/main/java/io/mycat/mycat2/cmds/judge/ErrorJudge.java new file mode 100644 index 0000000..8e00b31 --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/judge/ErrorJudge.java @@ -0,0 +1,32 @@ +package io.mycat.mycat2.cmds.judge; + +import io.mycat.mycat2.MySQLSession; +import io.mycat.mycat2.console.SessionKeyEnum; + +/** + * 进行error包的透传结束判断处理 + * + * @since 2017年8月19日 上午12:08:27 + * @version 0.0.1 + * @author liujun + */ +public class ErrorJudge implements DirectTransJudge { + + /** + * error包判断实例 + */ + public static final ErrorJudge INSTANCE = new ErrorJudge(); + + @Override + public boolean judge(MySQLSession session) { + // 进行当前 + // 首先检查是否处于事务中,如果非事务中,将结识连接结束 + if (!session.getSessionAttrMap().containsKey(SessionKeyEnum.SESSION_KEY_TRANSACTION_FLAG.getKey())) { + // 标识当前处于闲置中, + session.getSessionAttrMap().put(SessionKeyEnum.SESSION_KEY_CONN_IDLE_FLAG.getKey(), true); + } + + return false; + } + +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/judge/OkJudge.java b/source/src/main/java/io/mycat/mycat2/cmds/judge/OkJudge.java new file mode 100644 index 0000000..640d56d --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/judge/OkJudge.java @@ -0,0 +1,63 @@ +package io.mycat.mycat2.cmds.judge; + +import io.mycat.mycat2.MySQLSession; +import io.mycat.mycat2.console.SessionKeyEnum; +import io.mycat.mycat2.net.DefaultMycatSessionHandler; +import io.mycat.mysql.packet.OKPacket; +import io.mycat.proxy.ProxyBuffer; + +/** + * 进行ok包的透传结束判断处理 + * + * @since 2017年8月19日 上午12:08:27 + * @version 0.0.1 + * @author liujun + */ +public class OkJudge implements DirectTransJudge { + + /** + * ok包判断实例 + */ + public static final OkJudge INSTANCE = new OkJudge(); + + @Override + public boolean judge(MySQLSession session) { + ProxyBuffer curBuffer = session.proxyBuffer; + // 进行当前 + curBuffer.readIndex = session.curMSQLPackgInf.startPos; + // 进行当前ok包的读取 + OKPacket okpkg = new OKPacket(); + okpkg.read(curBuffer); + + boolean multQuery = ServerStatusEnum.StatusCheck(okpkg.serverStatus, ServerStatusEnum.MULT_QUERY); + boolean multResult = ServerStatusEnum.StatusCheck(okpkg.serverStatus, ServerStatusEnum.MORE_RESULTS); + + if (multQuery || multResult) { + // 标识当前处于使用中,不能结束 + session.getSessionAttrMap().put(SessionKeyEnum.SESSION_KEY_CONN_IDLE_FLAG.getKey(), false); + return true; + } + + // 事务状态的检查 + boolean trans = ServerStatusEnum.StatusCheck(okpkg.serverStatus, ServerStatusEnum.IN_TRANSACTION); + + // 如果当前事务状态被设置,连接标识为不能结束 + if (trans) { + // 标识当前处于使用中,不能结束, + session.getSessionAttrMap().put(SessionKeyEnum.SESSION_KEY_CONN_IDLE_FLAG.getKey(), false); + // 如果发现事务标识,则标识当前处于会话中 + session.getSessionAttrMap().put(SessionKeyEnum.SESSION_KEY_TRANSACTION_FLAG.getKey(), true); + } + // 当连接使用完毕,则标识为可以结束 + else { + // 标识当前处于闲置中, + session.getSessionAttrMap().put(SessionKeyEnum.SESSION_KEY_CONN_IDLE_FLAG.getKey(), true); + // 当发现完毕后,将标识移除 + session.getSessionAttrMap().remove(SessionKeyEnum.SESSION_KEY_TRANSACTION_FLAG.getKey()); + } + + return false; + + } + +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/judge/ServerStatusEnum.java b/source/src/main/java/io/mycat/mycat2/cmds/judge/ServerStatusEnum.java new file mode 100644 index 0000000..4f95e94 --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/judge/ServerStatusEnum.java @@ -0,0 +1,120 @@ +package io.mycat.mycat2.cmds.judge; + +/** + * 服务器状态的枚举信息 + * + * @since 2017年5月14日 下午3:25:00 + * @version 0.0.1 + * @author liujun + */ +public enum ServerStatusEnum { + + /** + * 检查in transation是否设置 + */ + IN_TRANSACTION(0), + + /** + * 是否设置了自动提交 + */ + AUTO_COMMIT(1), + + /** + * 是否返回更多的结果 + */ + MORE_RESULTS(2), + + /** + * 多个结果集 + */ + MULT_QUERY(3), + + /** + * 设置bad index used + */ + BAD_INDEX_USED(4), + + /** + * 索引 + */ + NO_INDEX_Used(5), + + /** + * 参数 + */ + CURSOR_EXISTS(6), + + /** + * 进行检查 + */ + LAST_ROW_SENT(7), + + /** + * 数据库删除检查 + */ + DATABASE_DROPPED(8), + + /** + * 空间检查 + */ + NO_BACKSLASH_ESCAPES(9), + + /** + * 会话检查 + */ + SESSION_STATE_CHECK(10), + + /** + * 检查 + */ + QUERY_WAS_SLOW(11), + + /** + * 参数 + */ + PS_OUT_PARAMS(12),; + + /** + * 状态位信息 + */ + private int statusBit; + + private ServerStatusEnum(int statusBit) { + this.statusBit = statusBit; + } + + public int getStatusBit() { + return statusBit; + } + + public void setStatusBit(int statusBit) { + this.statusBit = statusBit; + } + + /** + * 进行状态的检查 + * + * @param value + * 状态值 + * @param status + * 比较的状态枚举 + * @return true 状态中有设置,否则为未设置 + */ + public static boolean StatusCheck(int value, ServerStatusEnum status) { + int tempVal = 1 << status.getStatusBit(); + if ((value & tempVal) == tempVal) { + return true; + } + return false; + } + + public static void main(String[] args) { + boolean result = ServerStatusEnum.StatusCheck(0x0003, ServerStatusEnum.IN_TRANSACTION); + System.out.println(result); + boolean resultauto = ServerStatusEnum.StatusCheck(0x0003, ServerStatusEnum.AUTO_COMMIT); + System.out.println(resultauto); + boolean resulresult = ServerStatusEnum.StatusCheck(0x0003, ServerStatusEnum.MORE_RESULTS); + System.out.println(resulresult); + } + +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/pkgread/PkgFirstReader.java b/source/src/main/java/io/mycat/mycat2/cmds/pkgread/PkgFirstReader.java new file mode 100644 index 0000000..3304fd7 --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/pkgread/PkgFirstReader.java @@ -0,0 +1,129 @@ +package io.mycat.mycat2.cmds.pkgread; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.util.HashMap; +import java.util.Map; + +import io.mycat.mycat2.AbstractMySQLSession.CurrPacketType; +import io.mycat.mycat2.MySQLSession; +import io.mycat.mycat2.MycatSession; +import io.mycat.mycat2.beans.MySQLPackageInf; +import io.mycat.mycat2.cmds.LoadDataCommand; +import io.mycat.mycat2.cmds.judge.DirectTransJudge; +import io.mycat.mycat2.cmds.judge.ErrorJudge; +import io.mycat.mycat2.cmds.judge.OkJudge; +import io.mycat.mycat2.console.SessionKeyEnum; +import io.mycat.mysql.packet.MySQLPacket; +import io.mycat.proxy.ProxyBuffer; + +/** + * + * 进行首包的读取 + * + * @since 2017年8月23日 下午11:09:49 + * @version 0.0.1 + * @author liujun + */ +public class PkgFirstReader implements PkgProcess { + + /** + * 首包处理的实例对象 + */ + public static final PkgFirstReader INSTANCE = new PkgFirstReader(); + + /** + * 查询包标识的开始 + */ + private static final int QUERY_PKG_START = 0x01; + + /** + * 指定需要处理的包类型信息 + */ + private static final Map JUDGEMAP = new HashMap<>(); + + static { + // 用来进行ok包的处理理 + JUDGEMAP.put((int) MySQLPacket.OK_PACKET, OkJudge.INSTANCE); + // 用来进行error包的处理 + JUDGEMAP.put((int) MySQLPacket.ERROR_PACKET, ErrorJudge.INSTANCE); + } + + @Override + public boolean procssPkg(MySQLSession session) throws IOException { + + MySQLPackageInf curMSQLPackgInf = session.curMSQLPackgInf; + + ProxyBuffer curBuffer = session.proxyBuffer; + + // 进行首次的报文解析 + CurrPacketType pkgTypeEnum = session.resolveMySQLPackage(curBuffer, curMSQLPackgInf, true); + + // 首包,必须为全包进行解析,否则再读取一次,进行操作 + if (null != pkgTypeEnum && CurrPacketType.Full == pkgTypeEnum) { + + int pkgType = curMSQLPackgInf.pkgType; + + // 如果当前为查询包,则切换到查询的逻辑命令处理 + if (QUERY_PKG_START <= pkgType) { + + // 当前确认查询包,则切换至查询的读取操作 + session.currPkgProc = PkgResultSetReader.INSTANCE; + return true; + } + // 如果当前为特殊的load data包,则直接进行切换至load data的逻辑处理 + else if (session.curMSQLPackgInf.pkgType == MySQLPacket.LOAD_DATA_PACKET) { + session.getMycatSession().curSQLCommand = LoadDataCommand.INSTANCE; + // 将前端的包检查关闭 + session.getMycatSession().getSessionAttrMap().put(SessionKeyEnum.SESSION_PKG_READ_FLAG.getKey(), true); + + // 切换buffer 读状态 + curBuffer.flip(); + MycatSession mycatSession = session.getMycatSession(); + // 直接透传报文 + mycatSession.takeOwner(SelectionKey.OP_READ); + mycatSession.writeToChannel(); + + // 完成后,需要将buffer切换为写入事件,读取前端的数据 + curBuffer.flip(); + } + // 如果为ok和error则切换到error的包判断 + else { + DirectTransJudge judge = JUDGEMAP.get(session.curMSQLPackgInf.pkgType); + // 当检查到为需要检查的包,则进行检查 + if (null != judge) { + //当检查到完毕后,直接结束 + session.getMycatSession().getSessionAttrMap().remove(SessionKeyEnum.SESSION_KEY_TRANSFER_OVER_FLAG.getKey()); + + judge.judge(session); + } + + // 切换buffer 读写状态 + curBuffer.flip(); + MycatSession mycatSession = session.getMycatSession(); + //当知道操作完成后,前段的注册感兴趣事件为读取 + mycatSession.takeOwner(SelectionKey.OP_READ); + mycatSession.writeToChannel(); + } + } + //对于首包非完整的,透传已经检查完毕 + else { + // 切换buffer 读写状态 + curBuffer.flip(); + MycatSession mycatSession = session.getMycatSession(); + // 直接透传报文 + mycatSession.takeOwner(SelectionKey.OP_WRITE); + mycatSession.writeToChannel(); + + // 标识当前传输未结束 + mycatSession.getSessionAttrMap().put(SessionKeyEnum.SESSION_KEY_TRANSFER_OVER_FLAG.getKey(), true); + } + + /** + * 当前命令处理是否全部结束,全部结束时需要清理资源 + */ + return false; + + } + +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/pkgread/PkgProcess.java b/source/src/main/java/io/mycat/mycat2/cmds/pkgread/PkgProcess.java new file mode 100644 index 0000000..3ae7c0c --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/pkgread/PkgProcess.java @@ -0,0 +1,26 @@ +package io.mycat.mycat2.cmds.pkgread; + +import java.io.IOException; + +import io.mycat.mycat2.MySQLSession; + +/** + * 用来进行包操作的接口 + * + * @since 2017年8月23日 下午11:08:26 + * @version 0.0.1 + * @author liujun + */ +public interface PkgProcess { + + /** + * 进行包操作的接口 + * + * @param session + * 后端会话信息 + * @return true 继续处理,false退出处理 + * @throws IOException + */ + public boolean procssPkg(MySQLSession session) throws IOException; + +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/pkgread/PkgResultSetReader.java b/source/src/main/java/io/mycat/mycat2/cmds/pkgread/PkgResultSetReader.java new file mode 100644 index 0000000..caeb2fd --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/pkgread/PkgResultSetReader.java @@ -0,0 +1,148 @@ +package io.mycat.mycat2.cmds.pkgread; + +import java.io.IOException; +import java.nio.channels.SelectionKey; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.mycat.mycat2.MySQLSession; +import io.mycat.mycat2.MycatSession; +import io.mycat.mycat2.beans.MySQLPackageInf; +import io.mycat.mycat2.cmds.judge.EofJudge; +import io.mycat.mycat2.console.SessionKeyEnum; +import io.mycat.mysql.packet.MySQLPacket; +import io.mycat.proxy.ProxyBuffer; + +/** + * + * 仅进行查询结果的命令处理 + * + * @author liujun + * @version 1.0.0 + * @since 2017年8月22日 下午4:13:07 + */ +public class PkgResultSetReader implements PkgProcess { + + private static final Logger logger = LoggerFactory.getLogger(PkgResultSetReader.class); + + public static final PkgResultSetReader INSTANCE = new PkgResultSetReader(); + + /** + * 后端报文处理 + * + * @param session + * @return + * @throws IOException + */ + public boolean procssPkg(MySQLSession session) throws IOException { + + MySQLPackageInf curMSQLPackgInf = session.curMSQLPackgInf; + + ProxyBuffer curBuffer = session.proxyBuffer; + + boolean isContinue = true; + boolean isFinish = false; + + while (isContinue) { + // 进行报文的读取操作 + switch (session.resolveMySQLPackage(curBuffer, curMSQLPackgInf, true)) { + // 如果当前为整包 + case Full: + // 检查当前是否为eof包,并且为整包 ,解析eof包 + if (session.curMSQLPackgInf.pkgType == MySQLPacket.EOF_PACKET) { + // 首先检查当前列标识结果 + if (!session.getSessionAttrMap().containsKey(SessionKeyEnum.SESSION_KEY_COLUMN_OVER.getKey())) { + session.getSessionAttrMap().put(SessionKeyEnum.SESSION_KEY_COLUMN_OVER.getKey(), true); + } + // 如果当前列列结束,则进行结束标识验证 + else { + // 进行标识重置 + session.getSessionAttrMap().remove(SessionKeyEnum.SESSION_KEY_COLUMN_OVER.getKey()); + isFinish = true; + + // 如果当前的eof包大于1说明已经为eof结束包,切换到解析器进行解析 + boolean gotoRead = EofJudge.INSTANCE.judge(session); + + // 当一个完整的查询检查结束后,切换至首包的检查 + session.currPkgProc = PkgFirstReader.INSTANCE; + + // 检查是否需要读取下一个包 + if (gotoRead) { + // 并且为直接返回 + return true; + } + } + } + + if (curBuffer.readIndex == curBuffer.writeIndex) { + isContinue = false; + } else { + isContinue = true; + } + break; + + case LongHalfPacket: + if (curMSQLPackgInf.crossBuffer) { + // 发生过透传的半包,往往包的长度超过了buffer 的长度. + logger.debug(" readed crossBuffer LongHalfPacket ,curMSQLPackgInf is {}", curMSQLPackgInf); + } else if (!isfinishPackage(curMSQLPackgInf)) { + // 不需要整包解析的长半包透传. result set .这种半包直接透传 + curMSQLPackgInf.crossBuffer = true; + curBuffer.readIndex = curMSQLPackgInf.endPos; + curMSQLPackgInf.remainsBytes = curMSQLPackgInf.pkgLength + - (curMSQLPackgInf.endPos - curMSQLPackgInf.startPos); + logger.debug(" readed LongHalfPacket ,curMSQLPackgInf is {}", curMSQLPackgInf); + logger.debug(" curBuffer {}", curBuffer); + } else { + // 读取到了EOF/OK/ERROR 类型长半包 是需要保证是整包的. + logger.debug(" readed finished LongHalfPacket ,curMSQLPackgInf is {}", curMSQLPackgInf); + // TODO 保证整包的机制 + } + isContinue = false; + break; + case ShortHalfPacket: + logger.debug(" readed ShortHalfPacket ,curMSQLPackgInf is {}", curMSQLPackgInf); + isContinue = false; + break; + } + } + + // 标识当前传输未结束 + + // 切换buffer 读写状态 + curBuffer.flip(); + MycatSession mycatSession = session.getMycatSession(); + // 直接透传报文 + mycatSession.takeOwner(SelectionKey.OP_WRITE); + + if (!isFinish) { + // 标识当前传输未结束 + mycatSession.getSessionAttrMap().put(SessionKeyEnum.SESSION_KEY_TRANSFER_OVER_FLAG.getKey(), true); + } + + mycatSession.writeToChannel(); + + /** + * 当前命令处理是否全部结束,全部结束时需要清理资源 + */ + return false; + } + + /** + * 进行当前完成包验证 + * + * @param curMSQLPackgInf + * @return + * @throws IOException + */ + private boolean isfinishPackage(MySQLPackageInf curMSQLPackgInf) throws IOException { + switch (curMSQLPackgInf.pkgType) { + case MySQLPacket.EOF_PACKET: + return true; + default: + return false; + } + } + +} diff --git a/source/src/main/java/io/mycat/mycat2/console/SessionKeyEnum.java b/source/src/main/java/io/mycat/mycat2/console/SessionKeyEnum.java new file mode 100644 index 0000000..c1f7b31 --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/console/SessionKeyEnum.java @@ -0,0 +1,54 @@ +package io.mycat.mycat2.console; + +/** + * sesssion会话中的参数值信息 + * + * @since 2017年8月24日 下午11:39:43 + * @version 0.0.1 + * @author liujun + */ +public enum SessionKeyEnum { + + /** + * session会话中的是否需要读取验证的标识 + */ + SESSION_PKG_READ_FLAG("session_pkg_read_flag"), + + /** + * 用来标识session会话中列结束标识 + */ + SESSION_KEY_COLUMN_OVER("session_key_colum_over"), + + /** + * 用来标识session会话中事务 + */ + SESSION_KEY_TRANSACTION_FLAG("session_key_transaction_flag"), + + + /** + * 标识当前连接的闲置状态标识 ,true,闲置,false,未闲置,即在使用中 + */ + SESSION_KEY_CONN_IDLE_FLAG("session_key_conn_idle_flag"), + + /** + * 标识当前后端数据透传是否结束的标识,存在此标识,标识未结束,否则即为结束 + */ + SESSION_KEY_TRANSFER_OVER_FLAG("session_key_transfer_over_flag") + + ; + + private String key; + + private SessionKeyEnum(String key) { + this.key = key; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + +} diff --git a/source/src/main/java/io/mycat/mycat2/net/DefaultMycatSessionHandler.java b/source/src/main/java/io/mycat/mycat2/net/DefaultMycatSessionHandler.java index 4e4261b..73f137e 100644 --- a/source/src/main/java/io/mycat/mycat2/net/DefaultMycatSessionHandler.java +++ b/source/src/main/java/io/mycat/mycat2/net/DefaultMycatSessionHandler.java @@ -10,6 +10,14 @@ import io.mycat.mycat2.MyCommand; import io.mycat.mycat2.MySQLSession; import io.mycat.mycat2.MycatSession; + +import io.mycat.mycat2.SQLCommand; +import io.mycat.mycat2.beans.MySQLDataSource; +import io.mycat.mycat2.console.SessionKeyEnum; +import io.mycat.mycat2.tasks.BackendConCreateTask; +import io.mycat.mycat2.tasks.BackendSynchronzationTask; +import io.mycat.mysql.packet.ErrorPacket; + import io.mycat.proxy.NIOHandler; import io.mycat.proxy.ProxyBuffer; @@ -34,16 +42,18 @@ public void onSocketRead(final AbstractMySQLSession session) throws IOException private void onFrontRead(final MycatSession session) throws IOException { boolean readed = session.readFromChannel(); ProxyBuffer buffer = session.getProxyBuffer(); - if (readed == false || - // 没有读到完整报文 - MySQLSession.CurrPacketType.Full != session.resolveMySQLPackage(buffer, session.curMSQLPackgInf, - false)) { + // 在load data的情况下,SESSION_PKG_READ_FLAG会被打开,以不让进行包的完整性检查 + if (!session.getSessionAttrMap().containsKey(SessionKeyEnum.SESSION_PKG_READ_FLAG.getKey()) + && (readed == false || + // 没有读到完整报文 + MySQLSession.CurrPacketType.Full != session.resolveMySQLPackage(buffer, session.curMSQLPackgInf, + false))) { return; } if (session.curMSQLPackgInf.endPos < buffer.writeIndex) { logger.warn("front contains multi package "); } - + MyCommand myCommand = session.getMyCommand(); if(myCommand!=null){ diff --git a/source/src/main/java/io/mycat/mycat2/tasks/BackendCharsetReadTask.java b/source/src/main/java/io/mycat/mycat2/tasks/BackendCharsetReadTask.java new file mode 100644 index 0000000..56e1d9d --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/tasks/BackendCharsetReadTask.java @@ -0,0 +1,117 @@ +package io.mycat.mycat2.tasks; + +import io.mycat.mycat2.MySQLSession; +import io.mycat.mycat2.beans.MySQLDataSource; +import io.mycat.mycat2.beans.MySQLPackageInf; +import io.mycat.mysql.packet.MySQLPacket; +import io.mycat.mysql.packet.QueryPacket; +import io.mycat.proxy.ProxyBuffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + + +/** + * Created by ynfeng on 2017/8/28. + *

+ * 读取msyql字符集映射 + *

+ *

+ * 字符集对应关系查询:
+ *
+ * SHOW COLLATION
+ * +--------------------------+----------+-----+---------+----------+---------+
+ * | Collation                | Charset  | Id  | Default | Compiled | Sortlen |
+ * +--------------------------+----------+-----+---------+----------+---------+
+ * | big5_chinese_ci          | big5     |   1 | Yes     | Yes      |       1 |
+ * | big5_bin                 | big5     |  84 |         | Yes      |       1 |
+ * | dec8_swedish_ci          | dec8     |   3 | Yes     | Yes      |       1 |
+ * | dec8_bin                 | dec8     |  69 |         | Yes      |       1 |
+ * 
+ *

+ * 简单使用示例 + *

+ *    BackendCharsetReadTask backendCharsetReadTask = new BackendCharsetReadTask(optSession);
+ *    optSession.setCurNIOHandler(backendCharsetReadTask);
+ *    backendCharsetReadTask.readCharset();
+ * 
+ */ +public class BackendCharsetReadTask extends BackendIOTaskWithResultSet { + private static Logger logger = LoggerFactory.getLogger(BackendCharsetReadTask.class); + private static final String SQL = "SHOW COLLATION;"; + private MySQLSession mySQLSession; + private int fieldCount; + private MySQLDataSource ds; + + public BackendCharsetReadTask(MySQLSession mySQLSession, MySQLDataSource ds) { + this.mySQLSession = mySQLSession; + this.ds = ds; + } + + public void readCharset() throws IOException { + ProxyBuffer proxyBuf = mySQLSession.proxyBuffer; + proxyBuf.reset(); + QueryPacket queryPacket = new QueryPacket(); + queryPacket.packetId = 0; + queryPacket.sql = SQL; + queryPacket.write(proxyBuf); + proxyBuf.flip(); + proxyBuf.readIndex = proxyBuf.writeIndex; + this.mySQLSession.writeToChannel(); + } + + @Override + void onRsColCount(MySQLSession session) { + ProxyBuffer proxyBuffer = session.proxyBuffer; + MySQLPackageInf curMQLPackgInf = session.curMSQLPackgInf; + //读取有多少列 + fieldCount = (int) proxyBuffer.getLenencInt(curMQLPackgInf.startPos + MySQLPacket.packetHeaderSize); + } + + @Override + void onRsColDef(MySQLSession session) { + //并不关心列定义 + } + + @Override + void onRsRow(MySQLSession session) { + ProxyBuffer proxyBuffer = session.proxyBuffer; + MySQLPackageInf curMQLPackgInf = session.curMSQLPackgInf; + int rowDataIndex = curMQLPackgInf.startPos + MySQLPacket.packetHeaderSize; + + String collation = null; + String charset = null; + int id = 0; + + //读取每行的各列数据 + for (int i = 0; i < fieldCount; i++) { + int lenc = (int) proxyBuffer.getLenencInt(rowDataIndex); + rowDataIndex += proxyBuffer.getLenencLength(lenc); + String text = proxyBuffer.getFixString(rowDataIndex, lenc); + rowDataIndex += lenc; + + if (i == 0) { + collation = text; + } else if (i == 1) { + charset = text; + } else if (i == 2) { + id = Integer.parseInt(text); + } else { + ds.INDEX_TO_CHARSET.put(id, charset); + Integer index = ds.CHARSET_TO_INDEX.get(charset); + if (index == null || index > id) { + ds.CHARSET_TO_INDEX.put(charset, id); + } + + break; + } + } + } + + @Override + void onRsFinish(MySQLSession session) { + //结果集完成 + logger.debug("session[{}] load charset finish",session); + } +} diff --git a/source/src/main/java/io/mycat/mycat2/tasks/BackendIOTaskWithResultSet.java b/source/src/main/java/io/mycat/mycat2/tasks/BackendIOTaskWithResultSet.java new file mode 100644 index 0000000..df36b88 --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/tasks/BackendIOTaskWithResultSet.java @@ -0,0 +1,79 @@ +package io.mycat.mycat2.tasks; + +import io.mycat.mycat2.AbstractMySQLSession; +import io.mycat.mycat2.beans.MySQLPackageInf; +import io.mycat.mysql.packet.MySQLPacket; + +import java.io.IOException; + +/** + * task处理结果集的模板类 + *

+ * Created by ynfeng on 2017/8/28. + */ +public abstract class BackendIOTaskWithResultSet extends AbstractBackendIOTask { + protected ResultSetState curRSState = ResultSetState.RS_STATUS_COL_COUNT; + + @Override + public void onSocketRead(T session) throws IOException { + if (session.readFromChannel()) { + for (; ; ) { + AbstractMySQLSession.CurrPacketType currPacketType = session.resolveMySQLPackage(session.proxyBuffer, session.curMSQLPackgInf, true); + //因为是解析所以只处理整包 + if (currPacketType == AbstractMySQLSession.CurrPacketType.Full) { + MySQLPackageInf curMQLPackgInf = session.curMSQLPackgInf; + switch (curRSState) { + case RS_STATUS_COL_COUNT: + onRsColCount(session); + curRSState = ResultSetState.RS_STATUS_COL_DEF; + break; + case RS_STATUS_COL_DEF: + if (curMQLPackgInf.pkgType == MySQLPacket.EOF_PACKET) { + curRSState = ResultSetState.RS_STATUS_ROW; + } else { + onRsColDef(session); + } + break; + case RS_STATUS_ROW: + if (curMQLPackgInf.pkgType == MySQLPacket.EOF_PACKET) { + curRSState = ResultSetState.RS_STATUS_FINISH; + onRsFinish(session); + } else { + onRsRow(session); + } + break; + } + } else { + break; + } + } + } + } + + abstract void onRsColCount(T session); + + abstract void onRsColDef(T session); + + abstract void onRsRow(T session); + + abstract void onRsFinish(T session); + + public enum ResultSetState { + /** + * 结果集第一个包 + */ + RS_STATUS_COL_COUNT, + /** + * 结果集列定义 + */ + RS_STATUS_COL_DEF, + /** + * 结果集行数据 + */ + RS_STATUS_ROW, + /** + * 结果集完成 + */ + RS_STATUS_FINISH; + } +} diff --git a/source/src/main/java/io/mycat/mysql/packet/EOFPacket.java b/source/src/main/java/io/mycat/mysql/packet/EOFPacket.java index 91b003c..456e0d3 100644 --- a/source/src/main/java/io/mycat/mysql/packet/EOFPacket.java +++ b/source/src/main/java/io/mycat/mysql/packet/EOFPacket.java @@ -43,28 +43,34 @@ * @author mycat */ public class EOFPacket extends MySQLPacket { - public byte pkgType = MySQLPacket.EOF_PACKET; - public int warningCount; - public int status = 2; + public byte pkgType = MySQLPacket.EOF_PACKET; + public int warningCount; + public int status = 2; + public void write(ProxyBuffer buffer) { + buffer.writeFixInt(3, calcPacketSize()); + buffer.writeByte(packetId); + buffer.writeLenencInt(pkgType); + buffer.writeFixInt(2, warningCount); + buffer.writeFixInt(2, status); + } + public void read(ProxyBuffer buffer) { + packetLength = (int) buffer.readFixInt(3); + packetId = buffer.readByte(); + pkgType = (byte) buffer.readByte(); + warningCount = (int) buffer.readFixInt(2); + status = (int) buffer.readFixInt(2); + } - public void write(ProxyBuffer buffer) { - buffer.writeFixInt(3,calcPacketSize()); - buffer.writeByte(packetId); - buffer.writeLenencInt(pkgType); - buffer.writeFixInt(2, warningCount); - buffer.writeFixInt(2, status); - } + @Override + public int calcPacketSize() { + return 5;// 1+2+2; + } - @Override - public int calcPacketSize() { - return 5;// 1+2+2; - } - - @Override - protected String getPacketInfo() { - return "MySQL EOF Packet"; - } + @Override + protected String getPacketInfo() { + return "MySQL EOF Packet"; + } } diff --git a/source/src/main/java/io/mycat/mysql/packet/MySQLPacket.java b/source/src/main/java/io/mycat/mysql/packet/MySQLPacket.java index 591c3d9..afa3fec 100644 --- a/source/src/main/java/io/mycat/mysql/packet/MySQLPacket.java +++ b/source/src/main/java/io/mycat/mysql/packet/MySQLPacket.java @@ -31,211 +31,215 @@ * */ public abstract class MySQLPacket { - - public static int packetHeaderSize = 4; - - // 后端报文类型 - public static final byte REQUEST_FILE_FIELD_COUNT = (byte) 251; - public static final byte OK_PACKET = 0; - public static final byte ERROR_PACKET = (byte) 0xFF; - public static final byte EOF_PACKET = (byte) 0xFE; - public static final byte FIELD_EOF_PACKET = (byte) 0xFE; - public static final byte ROW_EOF_PACKET = (byte) 0xFE; - public static final byte AUTH_PACKET = 1; - public static final byte QUIT_PACKET = 2; - - // 前端报文类型 - /** - * none, this is an internal thread state - */ - public static final byte COM_SLEEP = 0; - - /** - * mysql_close - */ - public static final byte COM_QUIT = 1; - - public static final int COM_QUIT_PACKET_LENGTH = 1; - - /** - * mysql_select_db - */ - public static final byte COM_INIT_DB = 2; - - /** - * mysql_real_query - */ - public static final byte COM_QUERY = 3; - - /** - * mysql_list_fields - */ - public static final byte COM_FIELD_LIST = 4; - - /** - * mysql_create_db (deprecated) - */ - public static final byte COM_CREATE_DB = 5; - - /** - * mysql_drop_db (deprecated) - */ - public static final byte COM_DROP_DB = 6; - - /** - * mysql_refresh - */ - public static final byte COM_REFRESH = 7; - - /** - * mysql_shutdown - */ - public static final byte COM_SHUTDOWN = 8; - - /** - * mysql_stat - */ - public static final byte COM_STATISTICS = 9; - - /** - * mysql_list_processes - */ - public static final byte COM_PROCESS_INFO = 10; - - /** - * none, this is an internal thread state - */ - public static final byte COM_CONNECT = 11; - - /** - * mysql_kill - */ - public static final byte COM_PROCESS_KILL = 12; - - /** - * mysql_dump_debug_info - */ - public static final byte COM_DEBUG = 13; - - /** - * mysql_ping - */ - public static final byte COM_PING = 14; - - /** - * none, this is an internal thread state - */ - public static final byte COM_TIME = 15; - - /** - * none, this is an internal thread state - */ - public static final byte COM_DELAYED_INSERT = 16; - - /** - * mysql_change_user - */ - public static final byte COM_CHANGE_USER = 17; - - /** - * used by slave server mysqlbinlog - */ - public static final byte COM_BINLOG_DUMP = 18; - - /** - * used by slave server to get master table - */ - public static final byte COM_TABLE_DUMP = 19; - - /** - * used by slave to log connection to master - */ - public static final byte COM_CONNECT_OUT = 20; - - /** - * used by slave to register to master - */ - public static final byte COM_REGISTER_SLAVE = 21; - - /** - * mysql_stmt_prepare - */ - public static final byte COM_STMT_PREPARE = 22; - - /** - * mysql_stmt_execute - */ - public static final byte COM_STMT_EXECUTE = 23; - - /** - * mysql_stmt_send_long_data - */ - public static final byte COM_STMT_SEND_LONG_DATA = 24; - - /** - * mysql_stmt_close - */ - public static final byte COM_STMT_CLOSE = 25; - - /** - * mysql_stmt_reset - */ - public static final byte COM_STMT_RESET = 26; - - /** - * mysql_set_server_option - */ - public static final byte COM_SET_OPTION = 27; - - /** - * mysql_stmt_fetch - */ - public static final byte COM_STMT_FETCH = 28; - - /** - * mysql_stmt_fetch - */ - public static final byte COM_DAEMON = 29; - - /** - * mysql_stmt_fetch - */ - public static final byte COM_BINLOG_DUMP_GTID = 30; - - /** - * mysql_stmt_fetch - */ - public static final byte COM_RESET_CONNECTION = 31; - - /** - * Mycat heartbeat - */ - public static final byte COM_HEARTBEAT = 64; - - - public int packetLength; - public byte packetId; - - /** - * 计算数据包大小,不包含包头长度。 - */ - public abstract int calcPacketSize(); - - /** - * 取得数据包信息 - */ - protected abstract String getPacketInfo(); - - @Override - public String toString() { - return new StringBuilder().append(getPacketInfo()).append("{length=") - .append(packetLength).append(",id=").append(packetId) - .append('}').toString(); - } - - /** - * 写入到Buffer里(为了发送) - * @param buffer - */ - public abstract void write(ProxyBuffer buffer); + + public static int packetHeaderSize = 4; + + // 后端报文类型 + public static final byte REQUEST_FILE_FIELD_COUNT = (byte) 251; + public static final byte OK_PACKET = 0; + public static final byte ERROR_PACKET = (byte) 0xFF; + public static final byte EOF_PACKET = (byte) 0xFE; + public static final byte FIELD_EOF_PACKET = (byte) 0xFE; + public static final byte ROW_EOF_PACKET = (byte) 0xFE; + public static final byte AUTH_PACKET = 1; + public static final byte QUIT_PACKET = 2; + + /** + * 当前为load data的响应包 + */ + public static final byte LOAD_DATA_PACKET = (byte) 0xfb; + + // 前端报文类型 + /** + * none, this is an internal thread state + */ + public static final byte COM_SLEEP = 0; + + /** + * mysql_close + */ + public static final byte COM_QUIT = 1; + + public static final int COM_QUIT_PACKET_LENGTH = 1; + + /** + * mysql_select_db + */ + public static final byte COM_INIT_DB = 2; + + /** + * mysql_real_query + */ + public static final byte COM_QUERY = 3; + + /** + * mysql_list_fields + */ + public static final byte COM_FIELD_LIST = 4; + + /** + * mysql_create_db (deprecated) + */ + public static final byte COM_CREATE_DB = 5; + + /** + * mysql_drop_db (deprecated) + */ + public static final byte COM_DROP_DB = 6; + + /** + * mysql_refresh + */ + public static final byte COM_REFRESH = 7; + + /** + * mysql_shutdown + */ + public static final byte COM_SHUTDOWN = 8; + + /** + * mysql_stat + */ + public static final byte COM_STATISTICS = 9; + + /** + * mysql_list_processes + */ + public static final byte COM_PROCESS_INFO = 10; + + /** + * none, this is an internal thread state + */ + public static final byte COM_CONNECT = 11; + + /** + * mysql_kill + */ + public static final byte COM_PROCESS_KILL = 12; + + /** + * mysql_dump_debug_info + */ + public static final byte COM_DEBUG = 13; + + /** + * mysql_ping + */ + public static final byte COM_PING = 14; + + /** + * none, this is an internal thread state + */ + public static final byte COM_TIME = 15; + + /** + * none, this is an internal thread state + */ + public static final byte COM_DELAYED_INSERT = 16; + + /** + * mysql_change_user + */ + public static final byte COM_CHANGE_USER = 17; + + /** + * used by slave server mysqlbinlog + */ + public static final byte COM_BINLOG_DUMP = 18; + + /** + * used by slave server to get master table + */ + public static final byte COM_TABLE_DUMP = 19; + + /** + * used by slave to log connection to master + */ + public static final byte COM_CONNECT_OUT = 20; + + /** + * used by slave to register to master + */ + public static final byte COM_REGISTER_SLAVE = 21; + + /** + * mysql_stmt_prepare + */ + public static final byte COM_STMT_PREPARE = 22; + + /** + * mysql_stmt_execute + */ + public static final byte COM_STMT_EXECUTE = 23; + + /** + * mysql_stmt_send_long_data + */ + public static final byte COM_STMT_SEND_LONG_DATA = 24; + + /** + * mysql_stmt_close + */ + public static final byte COM_STMT_CLOSE = 25; + + /** + * mysql_stmt_reset + */ + public static final byte COM_STMT_RESET = 26; + + /** + * mysql_set_server_option + */ + public static final byte COM_SET_OPTION = 27; + + /** + * mysql_stmt_fetch + */ + public static final byte COM_STMT_FETCH = 28; + + /** + * mysql_stmt_fetch + */ + public static final byte COM_DAEMON = 29; + + /** + * mysql_stmt_fetch + */ + public static final byte COM_BINLOG_DUMP_GTID = 30; + + /** + * mysql_stmt_fetch + */ + public static final byte COM_RESET_CONNECTION = 31; + + /** + * Mycat heartbeat + */ + public static final byte COM_HEARTBEAT = 64; + + public int packetLength; + public byte packetId; + + /** + * 计算数据包大小,不包含包头长度。 + */ + public abstract int calcPacketSize(); + + /** + * 取得数据包信息 + */ + protected abstract String getPacketInfo(); + + @Override + public String toString() { + return new StringBuilder().append(getPacketInfo()).append("{length=").append(packetLength).append(",id=") + .append(packetId).append('}').toString(); + } + + /** + * 写入到Buffer里(为了发送) + * + * @param buffer + */ + public abstract void write(ProxyBuffer buffer); } diff --git a/source/src/main/java/io/mycat/mysql/packet/OKPacket.java b/source/src/main/java/io/mycat/mysql/packet/OKPacket.java new file mode 100644 index 0000000..6b27f74 --- /dev/null +++ b/source/src/main/java/io/mycat/mysql/packet/OKPacket.java @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software;Designed and Developed mainly by many Chinese + * opensource volunteers. you can redistribute it and/or modify it under the + * terms of the GNU General Public License version 2 only, as published by the + * Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Any questions about this component can be directed to it's project Web address + * https://code.google.com/p/opencloudb/. + * + */ +package io.mycat.mysql.packet; + +import io.mycat.proxy.ProxyBuffer; + +/** + * From Server To Client, at the end of a series of Field Packets, and at the + * end of a series of Data Packets.With prepared statements, EOF Packet can also + * end parameter information, which we'll describe later. + * + *

+ * Bytes                 Name
+ * -----                 ----
+ * 1                     field_count, always = 0xfe
+ * 2                     warning_count
+ * 2                     Status Flags
+ * 
+ * @see http://forge.mysql.com/wiki/MySQL_Internals_ClientServer_Protocol#EOF_Packet
+ * 
+ * + * @author mycat + */ +public class OKPacket extends MySQLPacket { + public byte pkgType = MySQLPacket.OK_PACKET; + + public static final byte FIELD_COUNT = 0x00; + public static final byte[] OK = new byte[] { 7, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0 }; + + public byte fieldCount = FIELD_COUNT; + public long affectedRows; + public long insertId; + public int serverStatus; + public int warningCount; + public byte[] message; + + public void write(ProxyBuffer buffer) { + buffer.writeFixInt(3, calcPacketSize()); + buffer.writeByte(packetId); + buffer.writeLenencInt(fieldCount); + buffer.writeLenencInt(affectedRows); + buffer.writeLenencInt(insertId); + buffer.writeFixInt(2, serverStatus); + buffer.writeFixInt(2, warningCount); + if (message != null) { + buffer.writeLenencString(new String(message)); + } + } + + public void read(ProxyBuffer buffer) { + packetLength = (int) buffer.readFixInt(3); + packetId = buffer.readByte(); + fieldCount = buffer.readByte(); + affectedRows = buffer.readLenencInt(); + insertId = buffer.readLenencInt(); + serverStatus = (int) buffer.readFixInt(2); + warningCount = (int) buffer.readFixInt(2); + if (buffer.readIndex < buffer.writeIndex) { + + int msgLength = buffer.writeIndex - buffer.readIndex; + this.message = buffer.getBytes(buffer.writeIndex, msgLength); + buffer.readIndex += msgLength; + } + } + + @Override + public int calcPacketSize() { + return 5;// 1+2+2; + } + + @Override + protected String getPacketInfo() { + return "MySQL EOF Packet"; + } + +} diff --git a/source/src/main/java/io/mycat/proxy/ProxyBuffer.java b/source/src/main/java/io/mycat/proxy/ProxyBuffer.java index 366716f..2f74ab1 100644 --- a/source/src/main/java/io/mycat/proxy/ProxyBuffer.java +++ b/source/src/main/java/io/mycat/proxy/ProxyBuffer.java @@ -8,7 +8,7 @@ /** * 可重用的Buffer,连续读或者写,当空间不够时Compact擦除之前用过的空间, 处于写状态或者读状态之一,不能同时读写, * 只有数据被操作完成(读完或者写完)后State才能被改变(flip方法或手工切换状态),同时可能要改变Owner,chanageOwn - * + * * 需要外部 关心的状态为 writeIndex 写入buffer 开始位置 readIndex 读取开始位置 inReading 当前buffer 读写状态 * frontUsing owner 不需要外部关心的状态为 readMark 向channel 中写入数据时的开始位置, 该状态由 * writeToChannel 自动维护,不需要外部显式指定 preUsing 上一个owner 仅在 write==0 @@ -19,10 +19,10 @@ * 不变. 5. 从 buffer 向 channel 写入数据时,写入 readMark--readIndex 之间的数据. 6. 写完成后 flip * 切换读写状态。同时 如果 readIndex > buffer.capacity() * 2 / 3 进行一次压缩 7. 从 channel * 向buffer 写入数据时,如果 writeIndex > buffer.capacity() * 1 / 3 进行一次压缩 - * + * * 二、没有读取数据,向buffer中写入数据后 直接 write 到 channel的场景 1. 在写入到 channel 时 ,需要显式 指定 * readIndex = writeIndex; 2. 其他步骤 同 (透传、只前端读写、只后端读写场景)场景 - * + * * @author yanjunli * */ @@ -82,20 +82,20 @@ public boolean isInWriting() { /** * 写数据到Socket中时,是否写完了预先指定的Buffer内容 - * + * * @return */ public boolean writeFinished() { - + //buffer.limit(proxyBuffer.readIndex); //buffer.position(proxyBuffer.readMark); return readIndex==readMark; } - + /** * 需要谨慎使用,调用者需要清除当前Buffer所处的状态!! - * + * * @return ByteBuffer */ public ByteBuffer getBuffer() { @@ -115,7 +115,7 @@ public void reset() { /** * 只能用在读状态下,跳过指定的N个字符 - * + * * @param step */ public void skip(int step) { @@ -157,6 +157,10 @@ public long readFixInt(int length) { return val; } + public long getFixInt(int index,int length){ + return getInt(index, length); + } + public long readLenencInt() { int index = readIndex; long len = getInt(index, 1) & 0xff; @@ -408,7 +412,7 @@ public byte[] getLenencBytes(int index) { * 值 * @return 长度 */ - private int getLenencLength(int lenenc) { + public int getLenencLength(int lenenc) { if (lenenc < 251) { return 1; } else if (lenenc >= 251 && lenenc < (1 << 16)) { diff --git a/source/src/main/java/io/mycat/proxy/ProxyReactorThread.java b/source/src/main/java/io/mycat/proxy/ProxyReactorThread.java index 2eca4bc..99c6a6a 100644 --- a/source/src/main/java/io/mycat/proxy/ProxyReactorThread.java +++ b/source/src/main/java/io/mycat/proxy/ProxyReactorThread.java @@ -27,6 +27,14 @@ public class ProxyReactorThread extends Thread { protected ConcurrentLinkedQueue pendingJobs = new ConcurrentLinkedQueue(); protected ArrayList allSessions = new ArrayList(); + public Selector getSelector() { + return selector; + } + + public BufferPool getBufPool() { + return bufPool; + } + @SuppressWarnings("unchecked") public ProxyReactorThread(BufferPool bufPool) throws IOException { this.bufPool = bufPool; diff --git a/source/src/main/java/io/mycat/util/ParseUtil.java b/source/src/main/java/io/mycat/util/ParseUtil.java index 5cb0d7b..0adacfe 100644 --- a/source/src/main/java/io/mycat/util/ParseUtil.java +++ b/source/src/main/java/io/mycat/util/ParseUtil.java @@ -57,331 +57,356 @@ public static final int getPacketLength(ByteBuffer buffer, int offset) throws IO return length + msyql_packetHeaderSize; } - - public static boolean isEOF(char c) { - return (c == ' ' || c == '\t' || c == '\n' || c == '\r' || c == ';'); - } + /** + * 进行resultSet的包验证 + * + * @param offset + * 偏移量 + * @param position + * 当前的buffer的位置 + * @param bitLength + * 提取的位数 + * @return + */ + public static final boolean validateResultHeader(final long offset, final long position, final int bitLength) { + return offset + msyql_packetHeaderSize + mysql_packetTypeSize + bitLength <= position; + } - public static long getSQLId(String stmt) { - int offset = stmt.indexOf('='); - if (offset != -1 && stmt.length() > ++offset) { - String id = stmt.substring(offset).trim(); - try { - return Long.parseLong(id); - } catch (NumberFormatException e) { - } - } - return 0L; - } + public static boolean isEOF(char c) { + return (c == ' ' || c == '\t' || c == '\n' || c == '\r' || c == ';'); + } - /** - * 'abc' - * - * @param offset stmt.charAt(offset) == first ' - */ - private static String parseString(String stmt, int offset) { - StringBuilder sb = new StringBuilder(); - loop: for (++offset; offset < stmt.length(); ++offset) { - char c = stmt.charAt(offset); - if (c == '\\') { - switch (c = stmt.charAt(++offset)) { - case '0': - sb.append('\0'); - break; - case 'b': - sb.append('\b'); - break; - case 'n': - sb.append('\n'); - break; - case 'r': - sb.append('\r'); - break; - case 't': - sb.append('\t'); - break; - case 'Z': - sb.append((char) 26); - break; - default: - sb.append(c); - } - } else if (c == '\'') { - if (offset + 1 < stmt.length() && stmt.charAt(offset + 1) == '\'') { - ++offset; - sb.append('\''); - } else { - break loop; - } - } else { - sb.append(c); - } - } - return sb.toString(); - } + public static long getSQLId(String stmt) { + int offset = stmt.indexOf('='); + if (offset != -1 && stmt.length() > ++offset) { + String id = stmt.substring(offset).trim(); + try { + return Long.parseLong(id); + } catch (NumberFormatException e) { + } + } + return 0L; + } - /** - * "abc" - * - * @param offset stmt.charAt(offset) == first " - */ - private static String parseString2(String stmt, int offset) { - StringBuilder sb = new StringBuilder(); - loop: for (++offset; offset < stmt.length(); ++offset) { - char c = stmt.charAt(offset); - if (c == '\\') { - switch (c = stmt.charAt(++offset)) { - case '0': - sb.append('\0'); - break; - case 'b': - sb.append('\b'); - break; - case 'n': - sb.append('\n'); - break; - case 'r': - sb.append('\r'); - break; - case 't': - sb.append('\t'); - break; - case 'Z': - sb.append((char) 26); - break; - default: - sb.append(c); - } - } else if (c == '"') { - if (offset + 1 < stmt.length() && stmt.charAt(offset + 1) == '"') { - ++offset; - sb.append('"'); - } else { - break loop; - } - } else { - sb.append(c); - } - } - return sb.toString(); - } + /** + * 'abc' + * + * @param offset + * stmt.charAt(offset) == first ' + */ + private static String parseString(String stmt, int offset) { + StringBuilder sb = new StringBuilder(); + loop: for (++offset; offset < stmt.length(); ++offset) { + char c = stmt.charAt(offset); + if (c == '\\') { + switch (c = stmt.charAt(++offset)) { + case '0': + sb.append('\0'); + break; + case 'b': + sb.append('\b'); + break; + case 'n': + sb.append('\n'); + break; + case 'r': + sb.append('\r'); + break; + case 't': + sb.append('\t'); + break; + case 'Z': + sb.append((char) 26); + break; + default: + sb.append(c); + } + } else if (c == '\'') { + if (offset + 1 < stmt.length() && stmt.charAt(offset + 1) == '\'') { + ++offset; + sb.append('\''); + } else { + break loop; + } + } else { + sb.append(c); + } + } + return sb.toString(); + } + + /** + * "abc" + * + * @param offset + * stmt.charAt(offset) == first " + */ + private static String parseString2(String stmt, int offset) { + StringBuilder sb = new StringBuilder(); + loop: for (++offset; offset < stmt.length(); ++offset) { + char c = stmt.charAt(offset); + if (c == '\\') { + switch (c = stmt.charAt(++offset)) { + case '0': + sb.append('\0'); + break; + case 'b': + sb.append('\b'); + break; + case 'n': + sb.append('\n'); + break; + case 'r': + sb.append('\r'); + break; + case 't': + sb.append('\t'); + break; + case 'Z': + sb.append((char) 26); + break; + default: + sb.append(c); + } + } else if (c == '"') { + if (offset + 1 < stmt.length() && stmt.charAt(offset + 1) == '"') { + ++offset; + sb.append('"'); + } else { + break loop; + } + } else { + sb.append(c); + } + } + return sb.toString(); + } + + /** + * AS `abc` + * + * @param offset + * stmt.charAt(offset) == first ` + */ + private static String parseIdentifierEscape(String stmt, int offset) { + StringBuilder sb = new StringBuilder(); + loop: for (++offset; offset < stmt.length(); ++offset) { + char c = stmt.charAt(offset); + if (c == '`') { + if (offset + 1 < stmt.length() && stmt.charAt(offset + 1) == '`') { + ++offset; + sb.append('`'); + } else { + break loop; + } + } else { + sb.append(c); + } + } + return sb.toString(); + } - /** - * AS `abc` - * - * @param offset stmt.charAt(offset) == first ` - */ - private static String parseIdentifierEscape(String stmt, int offset) { - StringBuilder sb = new StringBuilder(); - loop: for (++offset; offset < stmt.length(); ++offset) { - char c = stmt.charAt(offset); - if (c == '`') { - if (offset + 1 < stmt.length() && stmt.charAt(offset + 1) == '`') { - ++offset; - sb.append('`'); - } else { - break loop; - } - } else { - sb.append(c); - } - } - return sb.toString(); - } + /** + * @param aliasIndex + * for AS id, index of 'i' + */ + public static String parseAlias(String stmt, final int aliasIndex) { + if (aliasIndex < 0 || aliasIndex >= stmt.length()) { + return null; + } + switch (stmt.charAt(aliasIndex)) { + case '\'': + return parseString(stmt, aliasIndex); + case '"': + return parseString2(stmt, aliasIndex); + case '`': + return parseIdentifierEscape(stmt, aliasIndex); + default: + int offset = aliasIndex; + for (; offset < stmt.length() && CharTypes.isIdentifierChar(stmt.charAt(offset)); ++offset) + ; + return stmt.substring(aliasIndex, offset); + } + } - /** - * @param aliasIndex for AS id, index of 'i' - */ - public static String parseAlias(String stmt, final int aliasIndex) { - if (aliasIndex < 0 || aliasIndex >= stmt.length()) { - return null; - } - switch (stmt.charAt(aliasIndex)) { - case '\'': - return parseString(stmt, aliasIndex); - case '"': - return parseString2(stmt, aliasIndex); - case '`': - return parseIdentifierEscape(stmt, aliasIndex); - default: - int offset = aliasIndex; - for (; offset < stmt.length() && CharTypes.isIdentifierChar(stmt.charAt(offset)); ++offset); - return stmt.substring(aliasIndex, offset); - } - } + /** + * 注解保留,注释 + * + * @param stmt + * @param offset + * @return + */ + public static int comment(String stmt, int offset) { + int len = stmt.length(); + int n = offset; + switch (stmt.charAt(n)) { + case '/': + if (len > ++n && stmt.charAt(n++) == '*' && len > n + 1) { + // 对两种注解放过:/*!mycat: 和 /*#mycat: + if (stmt.charAt(n) == '!') { + break; + } else if (stmt.charAt(n) == '#') { + if (len > n + 5 && stmt.charAt(n + 1) == 'm' && stmt.charAt(n + 2) == 'y' + && stmt.charAt(n + 3) == 'c' && stmt.charAt(n + 4) == 'a' && stmt.charAt(n + 5) == 't') { + break; - /** - * 注解保留,注释 - * @param stmt - * @param offset - * @return - */ - public static int comment(String stmt, int offset) { - int len = stmt.length(); - int n = offset; - switch (stmt.charAt(n)) { - case '/': - if (len > ++n && stmt.charAt(n++) == '*' && len > n + 1) { - //对两种注解放过:/*!mycat: 和 /*#mycat: - if(stmt.charAt(n) == '!') { - break; - } else if (stmt.charAt(n) == '#') { - if(len > n + 5 && stmt.charAt(n + 1) == 'm' - && stmt.charAt(n + 2) == 'y' - && stmt.charAt(n + 3) == 'c' - && stmt.charAt(n + 4) == 'a' - && stmt.charAt(n + 5) == 't') { - break; - - } - } - for (int i = n; i < len; ++i) { - if (stmt.charAt(i) == '*') { - int m = i + 1; - if (len > m && stmt.charAt(m) == '/') return m; - } - } - } - break; - case '#': - for (int i = n + 1; i < len; ++i) { - if (stmt.charAt(i) == '\n') return i; - } - break; - } - return offset; - } + } + } + for (int i = n; i < len; ++i) { + if (stmt.charAt(i) == '*') { + int m = i + 1; + if (len > m && stmt.charAt(m) == '/') + return m; + } + } + } + break; + case '#': + for (int i = n + 1; i < len; ++i) { + if (stmt.charAt(i) == '\n') + return i; + } + break; + } + return offset; + } - public static boolean currentCharIsSep(String stmt, int offset) { - if (stmt.length() > offset) { - switch (stmt.charAt(offset)) { - case ' ': - case '\t': - case '\r': - case '\n': - return true; - default: - return false; - } - } - return true; - } + public static boolean currentCharIsSep(String stmt, int offset) { + if (stmt.length() > offset) { + switch (stmt.charAt(offset)) { + case ' ': + case '\t': + case '\r': + case '\n': + return true; + default: + return false; + } + } + return true; + } - /***** - * 检查下一个字符是否为分隔符,并把偏移量加1 - */ - public static boolean nextCharIsSep(String stmt, int offset) { - return currentCharIsSep(stmt, ++offset); - } + /***** + * 检查下一个字符是否为分隔符,并把偏移量加1 + */ + public static boolean nextCharIsSep(String stmt, int offset) { + return currentCharIsSep(stmt, ++offset); + } - /***** - * 检查下一个字符串是否为期望的字符串,并把偏移量移到从offset开始计算,expectValue之后的位置 - * - * @param stmt 被解析的sql - * @param offset 被解析的sql的当前位置 - * @param nextExpectedString 在stmt中准备查找的字符串 - * @param checkSepChar 当找到expectValue值时,是否检查其后面字符为分隔符号 - * @return 如果包含指定的字符串,则移动相应的偏移量,否则返回值=offset - */ - public static int nextStringIsExpectedWithIgnoreSepChar(String stmt, - int offset, - String nextExpectedString, - boolean checkSepChar) { - if (nextExpectedString == null || nextExpectedString.length() < 1) return offset; - int i = offset; - int index = 0; - char expectedChar; - char actualChar; - boolean isSep; - for (; i < stmt.length() && index < nextExpectedString.length(); ++i) { - if (index == 0) { - isSep = currentCharIsSep(stmt, i); - if (isSep) { - continue; - } - } - actualChar = stmt.charAt(i); - expectedChar = nextExpectedString.charAt(index++); - if (actualChar != expectedChar) { - return offset; - } - } - if (index == nextExpectedString.length()) { - boolean ok = true; - if (checkSepChar) { - ok = nextCharIsSep(stmt, i); - } - if (ok) return i; - } - return offset; - } + /***** + * 检查下一个字符串是否为期望的字符串,并把偏移量移到从offset开始计算,expectValue之后的位置 + * + * @param stmt + * 被解析的sql + * @param offset + * 被解析的sql的当前位置 + * @param nextExpectedString + * 在stmt中准备查找的字符串 + * @param checkSepChar + * 当找到expectValue值时,是否检查其后面字符为分隔符号 + * @return 如果包含指定的字符串,则移动相应的偏移量,否则返回值=offset + */ + public static int nextStringIsExpectedWithIgnoreSepChar(String stmt, int offset, String nextExpectedString, + boolean checkSepChar) { + if (nextExpectedString == null || nextExpectedString.length() < 1) + return offset; + int i = offset; + int index = 0; + char expectedChar; + char actualChar; + boolean isSep; + for (; i < stmt.length() && index < nextExpectedString.length(); ++i) { + if (index == 0) { + isSep = currentCharIsSep(stmt, i); + if (isSep) { + continue; + } + } + actualChar = stmt.charAt(i); + expectedChar = nextExpectedString.charAt(index++); + if (actualChar != expectedChar) { + return offset; + } + } + if (index == nextExpectedString.length()) { + boolean ok = true; + if (checkSepChar) { + ok = nextCharIsSep(stmt, i); + } + if (ok) + return i; + } + return offset; + } - private static final String JSON = "json"; - private static final String EQ = "="; + private static final String JSON = "json"; + private static final String EQ = "="; - //private static final String WHERE = "where"; - //private static final String SET = "set"; + // private static final String WHERE = "where"; + // private static final String SET = "set"; - /********** - * 检查下一个字符串是否json= * - * - * @param stmt 被解析的sql - * @param offset 被解析的sql的当前位置 - * @return 如果包含指定的字符串,则移动相应的偏移量,否则返回值=offset - */ - public static int nextStringIsJsonEq(String stmt, int offset) { - int i = offset; + /********** + * 检查下一个字符串是否json= * + * + * @param stmt + * 被解析的sql + * @param offset + * 被解析的sql的当前位置 + * @return 如果包含指定的字符串,则移动相应的偏移量,否则返回值=offset + */ + public static int nextStringIsJsonEq(String stmt, int offset) { + int i = offset; - // / drds 之后的符号 - if (!currentCharIsSep(stmt, ++i)) { - return offset; - } + // / drds 之后的符号 + if (!currentCharIsSep(stmt, ++i)) { + return offset; + } - // json 串 - int k = nextStringIsExpectedWithIgnoreSepChar(stmt, i, JSON, false); - if (k <= i) { - return offset; - } - i = k; + // json 串 + int k = nextStringIsExpectedWithIgnoreSepChar(stmt, i, JSON, false); + if (k <= i) { + return offset; + } + i = k; - // 等于符号 - k = nextStringIsExpectedWithIgnoreSepChar(stmt, i, EQ, false); - if (k <= i) { - return offset; - } - return i; - } + // 等于符号 + k = nextStringIsExpectedWithIgnoreSepChar(stmt, i, EQ, false); + if (k <= i) { + return offset; + } + return i; + } - public static int move(String stmt, int offset, int length) { - int i = offset; - for (; i < stmt.length(); ++i) { - switch (stmt.charAt(i)) { - case ' ': - case '\t': - case '\r': - case '\n': - continue; - case '/': - case '#': - i = comment(stmt, i); - continue; - default: - return i + length; - } - } - return i; - } + public static int move(String stmt, int offset, int length) { + int i = offset; + for (; i < stmt.length(); ++i) { + switch (stmt.charAt(i)) { + case ' ': + case '\t': + case '\r': + case '\n': + continue; + case '/': + case '#': + i = comment(stmt, i); + continue; + default: + return i + length; + } + } + return i; + } - public static boolean compare(String s, int offset, char[] keyword) { - if (s.length() >= offset + keyword.length) { - for (int i = 0; i < keyword.length; ++i, ++offset) { - if (Character.toUpperCase(s.charAt(offset)) != keyword[i]) { - return false; - } - } - return true; - } - return false; - } + public static boolean compare(String s, int offset, char[] keyword) { + if (s.length() >= offset + keyword.length) { + for (int i = 0; i < keyword.length; ++i, ++offset) { + if (Character.toUpperCase(s.charAt(offset)) != keyword[i]) { + return false; + } + } + return true; + } + return false; + } } \ No newline at end of file