diff --git a/source/src/main/java/io/mycat/mycat2/AbstractMySQLSession.java b/source/src/main/java/io/mycat/mycat2/AbstractMySQLSession.java index 1cfcfb6..64d8ca2 100644 --- a/source/src/main/java/io/mycat/mycat2/AbstractMySQLSession.java +++ b/source/src/main/java/io/mycat/mycat2/AbstractMySQLSession.java @@ -48,8 +48,8 @@ public enum CurrPacketType { /** * 事务提交方式 */ - public AutoCommit autoCommit = AutoCommit.OFF; - + public AutoCommit autoCommit = AutoCommit.ON; + /** * 认证中的seed报文数据 */ @@ -83,6 +83,7 @@ public void setCurBufOwner(boolean curBufOwner) { */ public void responseOKOrError(MySQLPacket pkg) throws IOException { // proxyBuffer.changeOwner(true); + this.proxyBuffer.reset(); pkg.write(this.proxyBuffer); proxyBuffer.flip(); proxyBuffer.readIndex = proxyBuffer.writeIndex; diff --git a/source/src/main/java/io/mycat/mycat2/ConfigLoader.java b/source/src/main/java/io/mycat/mycat2/ConfigLoader.java index e756edb..bc40ef0 100644 --- a/source/src/main/java/io/mycat/mycat2/ConfigLoader.java +++ b/source/src/main/java/io/mycat/mycat2/ConfigLoader.java @@ -62,7 +62,8 @@ public static List loadSheamBeans(String schemaBeanuri){ NamedNodeMap map=curRuleNode.getAttributes(); String name=getAttribute(map,"name",null); - String schemaType=getAttribute(map,"nopartion","true"); + String schemaType=getAttribute(map,"type","0"); + String balanceSelectIntrans = getAttribute(map, "balanceSelectIntrans", "false"); String defaultDB=getAttribute(map,"default-db",null); String[] dnItems=defaultDB.split(":"); DNBean dnBean=new DNBean(dnItems[0].trim(),dnItems[1].trim()); @@ -76,7 +77,9 @@ public static List loadSheamBeans(String schemaBeanuri){ String tRule=getAttribute(attrs,"sharding-rule",null); TableDefBean tbBean=new TableDefBean(tName,tType,tKey,tRule); tableLst.add(tbBean);}); - SchemaBean sBean=new SchemaBean(name,dnBean,("true".equalsIgnoreCase(schemaType)),tableLst); + SchemaBean sBean=new SchemaBean(name,dnBean,schemaType, + Boolean.valueOf(balanceSelectIntrans) + ,tableLst); LOGGER.debug("schema-bean: {}", sBean); list.add(sBean); } diff --git a/source/src/main/java/io/mycat/mycat2/MyCommand.java b/source/src/main/java/io/mycat/mycat2/MyCommand.java new file mode 100644 index 0000000..224ee09 --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/MyCommand.java @@ -0,0 +1,44 @@ +package io.mycat.mycat2; + +import java.io.IOException; + +/** + * 处理 mysql 命令 + * @author Administrator + * + */ +public interface MyCommand { + + /** + * 收到后端应答 + * + * @param session + * 后端MySQLSession + * @return + * @throws IOException + */ + public boolean onBackendResponse(MySQLSession session) throws IOException; + + public boolean onBackendClosed(MySQLSession session, boolean normal) throws IOException; + + public boolean onFrontWriteFinished(MycatSession session) throws IOException; + + public boolean onBackendWriteFinished(MySQLSession session) throws IOException; + + /** + * 直接应答请求报文,如果是直接应答的,则此方法调用一次就完成了,如果是靠后端响应后才应答,则至少会调用两次, + * + * @param session + * @return 是否完成了应答 + */ + public boolean procssSQL(MycatSession session) throws IOException; + + /** + * 清理资源,只清理自己产生的资源(如创建了Buffer,以及Session中放入了某些对象) + * + * @param socketClosed + * 是否因为Session关闭而清理资源,此时应该彻底清理 + */ + public void clearResouces(boolean sessionCLosed); + +} diff --git a/source/src/main/java/io/mycat/mycat2/MySQLCommand.java b/source/src/main/java/io/mycat/mycat2/MySQLCommand.java new file mode 100644 index 0000000..b845a1c --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/MySQLCommand.java @@ -0,0 +1,12 @@ +package io.mycat.mycat2; + +/** + * 负责处理SQL命令 + * + * @author wuzhihui + * + * @param + */ +public interface MySQLCommand extends MyCommand{ + +} diff --git a/source/src/main/java/io/mycat/mycat2/MySQLSession.java b/source/src/main/java/io/mycat/mycat2/MySQLSession.java index e027e2b..93d45e7 100644 --- a/source/src/main/java/io/mycat/mycat2/MySQLSession.java +++ b/source/src/main/java/io/mycat/mycat2/MySQLSession.java @@ -17,6 +17,11 @@ */ public class MySQLSession extends AbstractMySQLSession { private String database; + + /** + * 当前缓存的 mysqlSession 所属的mysql-replica 的名称。用于快速判断当前连接是否可以被复用 + */ + private String currBackendCachedName; /** * 当前所从属的mycat sesssion */ @@ -60,8 +65,15 @@ public void setMycatSession(MycatSession mycatSession) { @Override protected void doTakeReadOwner() { - this.getMycatSession().takeOwner(SelectionKey.OP_READ); + this.getMycatSession().takeOwner(SelectionKey.OP_READ); + } + + public String getCurrBackendCachedName() { + return currBackendCachedName; + } + public void setCurrBackendCachedName(String currBackendCachedName) { + this.currBackendCachedName = currBackendCachedName; } } diff --git a/source/src/main/java/io/mycat/mycat2/MycatCore.java b/source/src/main/java/io/mycat/mycat2/MycatCore.java index ef72800..ddb4c26 100644 --- a/source/src/main/java/io/mycat/mycat2/MycatCore.java +++ b/source/src/main/java/io/mycat/mycat2/MycatCore.java @@ -100,7 +100,6 @@ public static void main(String[] args) throws IOException { ClusterNode.parseNodesInf(conf.getAllNodeInfs())); runtime.setMyCLuster(cluster); cluster.initCluster(); - } URL datasourceURL = ConfigLoader.class.getResource("/datasource.xml"); diff --git a/source/src/main/java/io/mycat/mycat2/MycatSession.java b/source/src/main/java/io/mycat/mycat2/MycatSession.java index 935337b..661380e 100644 --- a/source/src/main/java/io/mycat/mycat2/MycatSession.java +++ b/source/src/main/java/io/mycat/mycat2/MycatSession.java @@ -4,11 +4,31 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; +import java.security.InvalidParameterException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import io.mycat.mycat2.beans.DNBean; import io.mycat.mycat2.beans.MySQLDataSource; import io.mycat.mycat2.beans.SchemaBean; +import io.mycat.mycat2.cmds.strategy.AnnotateRouteCmdStrategy; +import io.mycat.mycat2.cmds.strategy.DBINMultiServerCmdStrategy; +import io.mycat.mycat2.cmds.strategy.DBInOneServerCmdStrategy; +import io.mycat.mycat2.net.DefaultMycatSessionHandler; +import io.mycat.mycat2.sqlparser.NewSQLContext; +import io.mycat.mycat2.tasks.AsynTaskCallBack; +import io.mycat.mycat2.tasks.BackendConCreateTask; +import io.mycat.mycat2.tasks.BackendSynchemaTask; +import io.mycat.mycat2.tasks.BackendSynchronzationTask; +import io.mycat.mysql.AutoCommit; import io.mycat.mysql.Capabilities; +import io.mycat.mysql.packet.ErrorPacket; import io.mycat.mysql.packet.HandshakePacket; import io.mycat.proxy.BufferPool; import io.mycat.proxy.ProxyRuntime; @@ -21,14 +41,59 @@ * */ public class MycatSession extends AbstractMySQLSession { + + private static Logger logger = LoggerFactory.getLogger(MycatSession.class); + + private MySQLSession curBackend; - private MySQLSession backend; + public MyCommand curSQLCommand; + + public NewSQLContext sqlContext = new NewSQLContext(); - public SQLCommand curSQLCommand; /** * Mycat Schema */ public SchemaBean schema; + + private Map> backendMap = new HashMap<>(); + + private static List masterSqlList = new ArrayList<>(); + + static{ + masterSqlList.add(NewSQLContext.INSERT_SQL); + masterSqlList.add(NewSQLContext.UPDATE_SQL); + masterSqlList.add(NewSQLContext.DELETE_SQL); + masterSqlList.add(NewSQLContext.REPLACE_SQL); +// masterSqlList.add(NewSQLContext.SELECT_INTO_SQL); +// masterSqlList.add(NewSQLContext.SELECT_FOR_UPDATE_SQL); + masterSqlList.add(NewSQLContext.LOAD_SQL); + masterSqlList.add(NewSQLContext.CALL_SQL); + masterSqlList.add(NewSQLContext.TRUNCATE_SQL); + +// masterSqlList.add(NewSQLContext.BEGIN_SQL); +// masterSqlList.add(NewSQLContext.START_SQL); +// masterSqlList.add(NewSQLContext.SET_AUTOCOMMIT_SQL); + } + + /** + * 获取sql 类型 + * @param session + * @return + */ + public MyCommand getMyCommand(){ + switch(schema.type){ + case DBInOneServer: + return DBInOneServerCmdStrategy.INSTANCE.getMyCommand(this); + case DBINMultiServer: + return DBINMultiServerCmdStrategy.INSTANCE.getMyCommand(this); + case AnnotateRoute: + return AnnotateRouteCmdStrategy.INSTANCE.getMyCommand(this); + case SQLParseRoute: + return AnnotateRouteCmdStrategy.INSTANCE.getMyCommand(this); + default: + throw new InvalidParameterException("schema type is invalid "); + } + } public MycatSession(BufferPool bufPool, Selector nioSelector, SocketChannel frontChannel) throws IOException { super(bufPool, nioSelector, frontChannel); @@ -94,22 +159,14 @@ public void sendAuthPackge() throws IOException { this.writeToChannel(); } - /** - * 当前操作的后端会话连接 - * - * @return - */ - public MySQLSession getBackend() { - return backend; - } - /** * 绑定后端MySQL会话 * * @param backend */ public void bindBackend(MySQLSession backend) { - this.backend = backend; + this.curBackend = backend; + putbackendMap(backend); backend.setMycatSession(this); backend.useSharedBuffer(this.proxyBuffer); backend.setCurNIOHandler(this.getCurNIOHandler()); @@ -128,16 +185,16 @@ public void takeOwner(int intestOpts) { } else { this.change2WriteOpts(); } - if (this.backend != null) { - backend.setCurBufOwner(false); - backend.clearReadWriteOpts(); + if (this.curBackend != null) { + curBackend.setCurBufOwner(false); + curBackend.clearReadWriteOpts(); } } public void chnageBothReadOpts() { this.change2ReadOpts(); - this.backend.change2ReadOpts(); + this.curBackend.change2ReadOpts(); } /** * 放弃控制权,同时设置对端MySQLSession感兴趣的事件,如SocketRead,Write,只能其一 @@ -147,11 +204,11 @@ public void chnageBothReadOpts() public void giveupOwner(int intestOpts) { this.curBufOwner = false; this.clearReadWriteOpts(); - backend.setCurBufOwner(true); + curBackend.setCurBufOwner(true); if (intestOpts == SelectionKey.OP_READ) { - backend.change2ReadOpts(); + curBackend.change2ReadOpts(); } else { - backend.change2WriteOpts(); + curBackend.change2WriteOpts(); } } @@ -170,6 +227,7 @@ public void answerFront(byte[] rawPkg) throws IOException { public void close(boolean normal, String hint) { super.close(normal, hint); + //TODO 清理前后端资源 this.curSQLCommand.clearResouces(true); } @@ -191,5 +249,185 @@ protected void doTakeReadOwner() { this.takeOwner(SelectionKey.OP_READ); } + + + private String getbackendName(){ + String backendName = null; + switch(schema.type){ + case DBInOneServer: + backendName = schema.getDefaultDN().getMysqlReplica(); + break; + case AnnotateRoute: + break; + case DBINMultiServer: + break; + case SQLParseRoute: + break; + default: + break; + } + if(backendName==null){ + throw new InvalidParameterException("the backendName must not be null"); + } + return backendName; + } + + /** + * 将后端连接放入到后端连接缓存中 + * @param mysqlSession + */ + private void putbackendMap(MySQLSession mysqlSession){ + String backendName = getbackendName(); + mysqlSession.setCurrBackendCachedName(backendName); + List list = backendMap.get(backendName); + if(list==null){ + list = new ArrayList<>(); + backendMap.putIfAbsent(backendName, list); + } + list.add(mysqlSession); + } + + /** + * 当前操作的后端会话连接 + * + * @return + */ + public void getBackend(AsynTaskCallBack callback) throws IOException { + final boolean runOnSlave; + + if((NewSQLContext.ANNOTATION_BALANCE==sqlContext.getAnnotationType() + ||(NewSQLContext.ANNOTATION_DB_TYPE==sqlContext.getAnnotationType() + &&1==sqlContext.getAnnotationValue(NewSQLContext.ANNOTATION_DB_TYPE))) + ||(AutoCommit.ON==autoCommit //非事务场景下,走从节点 + )){ // 事务场景下, 如果配置了事务内的查询也走读写分离 + + if(masterSqlList.contains(sqlContext.getSQLType())){ + runOnSlave = false; + }else{ + //走从节点 + runOnSlave = true; + } + }else{ + runOnSlave = false; + } + + String backendName = getbackendName(); + + // 如果当前backend 连接可用,直接使用。 + if(curBackend!=null&& + backendName.equals(curBackend.getCurrBackendCachedName()) + &&curBackend.isDefaultChannelRead()==runOnSlave){ + if(logger.isDebugEnabled()){ + logger.debug("Using cached backend connections for " + (runOnSlave?"read":"write")); + } + callback.finished(curBackend,null,true,null); + return; + } + + List backendList = backendMap.get(backendName); + + Optional mysqlSession=null; + if(backendList!=null){ + mysqlSession = backendList.stream().filter(f->((f.isDefaultChannelRead()==runOnSlave))).findFirst(); + } + + if(mysqlSession==null||!mysqlSession.isPresent()){ + if(logger.isDebugEnabled()){ + logger.debug("create new connection for "+(runOnSlave?"read":"write")); + } + createBackendConn(this,runOnSlave,callback); + }else{ + curBackend = mysqlSession.get(); + if(logger.isDebugEnabled()){ + logger.debug("Using cached map backend connections for "+ (runOnSlave?"read":"write")); + } + callback.finished(curBackend,null,true,null); + } + } + + + + /** + * 创建后端连接 + * @param session + * @throws IOException + */ + public void createBackendConn(MycatSession session,boolean runOnSlave,AsynTaskCallBack callback) throws IOException { + final MySQLDataSource ds = session.getDatasource(); + //TODO 从连接池获取 + BackendConCreateTask authProcessor = new BackendConCreateTask(session.bufPool, session.nioSelector, ds, + session.schema.name); + authProcessor.setCallback((optSession, Sender, exeSucces, retVal) -> { + //设置当前连接 读写分离属性 + optSession.setDefaultChannelRead(runOnSlave); + //恢复默认的Handler + session.setCurNIOHandler(DefaultMycatSessionHandler.INSTANCE); + optSession.setCurNIOHandler(DefaultMycatSessionHandler.INSTANCE); + if (exeSucces) { + session.bindBackend(optSession); + syncSessionStateToBackend(optSession,callback); + } else { + ErrorPacket errPkg = (ErrorPacket) retVal; + optSession.responseOKOrError(errPkg); + } + }); + session.setCurNIOHandler(authProcessor); + } + + /** + * 同步后端连接状态 + * @param mycatSession + * @param mysqlSession + * @param callback + * @throws IOException + */ + public void syncSessionStateToBackend(MySQLSession mysqlSession,AsynTaskCallBack callback) throws IOException { + MycatSession mycatSession = mysqlSession.getMycatSession(); + BackendSynchronzationTask backendSynchronzationTask = new BackendSynchronzationTask(mysqlSession); + backendSynchronzationTask.setCallback((optSession, sender, exeSucces, rv) -> { + //恢复默认的Handler + mycatSession.setCurNIOHandler(DefaultMycatSessionHandler.INSTANCE); + optSession.setCurNIOHandler(DefaultMycatSessionHandler.INSTANCE); + if (exeSucces) { + syncSchemaToBackend(optSession,callback); + } else { + ErrorPacket errPkg = (ErrorPacket) rv; + mycatSession.close(true, errPkg.message); + } + }); + mycatSession.setCurNIOHandler(backendSynchronzationTask); + } + + /** + * 同步 schema 到后端 + * @param mysqlSession + * @param callback + * @throws IOException + */ + public void syncSchemaToBackend(MySQLSession mysqlSession,AsynTaskCallBack callback) throws IOException{ + if(mysqlSession.getMycatSession().schema!=null + &&!mysqlSession.getMycatSession().schema.getDefaultDN().getDatabase().equals(mysqlSession.getDatabase())){ + MycatSession mycatSession = mysqlSession.getMycatSession(); + BackendSynchemaTask backendSynchemaTask = new BackendSynchemaTask(mysqlSession); + backendSynchemaTask.setCallback((optSession, sender, exeSucces, rv) -> { + //恢复默认的Handler + mycatSession.setCurNIOHandler(DefaultMycatSessionHandler.INSTANCE); + optSession.setCurNIOHandler(DefaultMycatSessionHandler.INSTANCE); + if (exeSucces) { + if(callback!=null){ + callback.finished(optSession, sender, exeSucces, rv); + } + } else { + ErrorPacket errPkg = (ErrorPacket) rv; + mycatSession.close(true, errPkg.message); + } + }); + mycatSession.setCurNIOHandler(backendSynchemaTask); + }else{ + if(callback!=null){ + callback.finished(mysqlSession, null, true, null); + } + } + } } diff --git a/source/src/main/java/io/mycat/mycat2/beans/SchemaBean.java b/source/src/main/java/io/mycat/mycat2/beans/SchemaBean.java index 1c6b262..04bb228 100644 --- a/source/src/main/java/io/mycat/mycat2/beans/SchemaBean.java +++ b/source/src/main/java/io/mycat/mycat2/beans/SchemaBean.java @@ -46,24 +46,25 @@ public enum SchemaType { public String name; public SchemaType type; private DNBean defaultDN; + + /* + * 事务内的查询是否也做读写分离 + */ + private boolean balanceSelectIntrans; /** * 是否非分片的Schema,意味著沒有任何分片表的Schema */ - private final boolean normalSchema; private List tableDefBeans; - public SchemaBean(String name, DNBean defaultDN, boolean normalSchema, List tableDefBeans) { + public SchemaBean(String name, DNBean defaultDN, String type,boolean balanceSelectIntrans, List tableDefBeans) { super(); this.name = name; this.defaultDN = defaultDN; - this.normalSchema = normalSchema; + this.type = SchemaType.values()[Integer.parseInt(type)]; + this.balanceSelectIntrans = balanceSelectIntrans; this.tableDefBeans = tableDefBeans; } - public boolean isNormalSchema() { - return normalSchema; - } - public String getName() { return name; } @@ -90,7 +91,7 @@ public List getTableDefBeans() { @Override public String toString() { - return "SchemaBean [name=" + name + ", defaultDN=" + defaultDN + ", normalSchema=" + normalSchema + return "SchemaBean [name=" + name + ", defaultDN=" + defaultDN + ", tableDefBeans=" + tableDefBeans + "]"; } diff --git a/source/src/main/java/io/mycat/mycat2/cmds/AbstractMutiDNExeCmd.java b/source/src/main/java/io/mycat/mycat2/cmds/AbstractMutiDNExeCmd.java index 662d0fe..61ca5cb 100644 --- a/source/src/main/java/io/mycat/mycat2/cmds/AbstractMutiDNExeCmd.java +++ b/source/src/main/java/io/mycat/mycat2/cmds/AbstractMutiDNExeCmd.java @@ -2,10 +2,10 @@ import java.io.IOException; + import io.mycat.mycat2.MySQLSession; import io.mycat.mycat2.MycatSession; -import io.mycat.mycat2.SQLCommand; - +import io.mycat.mycat2.MySQLCommand; /** * 多节点执行SQL的抽象MySQLCommand类。 * 需要修改SQLCommand接口, @@ -14,7 +14,7 @@ * @author wuzhihui * */ -public class AbstractMutiDNExeCmd implements SQLCommand{ +public class AbstractMutiDNExeCmd implements MySQLCommand{ @Override public boolean procssSQL(MycatSession session) throws IOException { @@ -52,6 +52,4 @@ public void clearResouces(boolean sessionCLosed) { } - - } diff --git a/source/src/main/java/io/mycat/mycat2/cmds/CmdStrategy.java b/source/src/main/java/io/mycat/mycat2/cmds/CmdStrategy.java new file mode 100644 index 0000000..c9fdfd3 --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/CmdStrategy.java @@ -0,0 +1,15 @@ +package io.mycat.mycat2.cmds; + +import io.mycat.mycat2.MyCommand; +import io.mycat.mycat2.MycatSession; + +/** + * + * @author yanjunli + * + */ +public interface CmdStrategy { + + MyCommand getMyCommand(MycatSession session); + +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/ComChangeUserCmd.java b/source/src/main/java/io/mycat/mycat2/cmds/ComChangeUserCmd.java new file mode 100644 index 0000000..fa4d680 --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/ComChangeUserCmd.java @@ -0,0 +1,53 @@ +package io.mycat.mycat2.cmds; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.mycat.mycat2.MySQLCommand; +import io.mycat.mycat2.MySQLSession; +import io.mycat.mycat2.MycatSession; + +public class ComChangeUserCmd implements MySQLCommand{ + + private static final Logger logger = LoggerFactory.getLogger(ComChangeUserCmd.class); + + public static final ComChangeUserCmd INSTANCE = new ComChangeUserCmd(); + + @Override + public boolean procssSQL(MycatSession session) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean onBackendResponse(MySQLSession session) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean onBackendClosed(MySQLSession session, boolean normal) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean onFrontWriteFinished(MycatSession session) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean onBackendWriteFinished(MySQLSession session) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public void clearResouces(boolean sessionCLosed) { + // TODO Auto-generated method stub + + } +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/ComInitDB.java b/source/src/main/java/io/mycat/mycat2/cmds/ComInitDB.java new file mode 100644 index 0000000..18c6eeb --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/ComInitDB.java @@ -0,0 +1,37 @@ +package io.mycat.mycat2.cmds; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.mycat.mycat2.MycatConfig; +import io.mycat.mycat2.MycatSession; +import io.mycat.mysql.packet.MySQLPacket; +import io.mycat.proxy.ProxyRuntime; + +public class ComInitDB extends DirectPassthrouhCmd{ + + + private static final Logger logger = LoggerFactory.getLogger(ComInitDB.class); + + public static final ComInitDB INSTANCE = new ComInitDB(); + + @Override + public boolean procssSQL(MycatSession session) throws IOException { +// byte[] curdatabases = session.proxyBuffer.getBytes(session.curMSQLPackgInf.startPos+MySQLPacket.packetHeaderSize+1, +// session.curMSQLPackgInf.pkgLength -MySQLPacket.packetHeaderSize+1); +// if(curdatabases==null){ +// +// } +// +// if(ProxyRuntime.INSTANCE.getProxyConfig() instanceof MycatConfig){ +// MycatConfig config = (MycatConfig)ProxyRuntime.INSTANCE.getProxyConfig(); +// String databasesName = new String(curdatabases); +// session.schema = config.getMycatSchema(databasesName); +// logger.debug(" current database change to {}",databasesName); +// } + + return super.procssSQL(session); + } +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/ComQuitCmd.java b/source/src/main/java/io/mycat/mycat2/cmds/ComQuitCmd.java new file mode 100644 index 0000000..3a7035e --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/ComQuitCmd.java @@ -0,0 +1,54 @@ +package io.mycat.mycat2.cmds; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.mycat.mycat2.MySQLCommand; +import io.mycat.mycat2.MySQLSession; +import io.mycat.mycat2.MycatSession; + +public class ComQuitCmd implements MySQLCommand{ + + private static final Logger logger = LoggerFactory.getLogger(ComQuitCmd.class); + + public static final ComQuitCmd INSTANCE = new ComQuitCmd(); + + @Override + public boolean procssSQL(MycatSession session) throws IOException { + session.close(true, "client closed"); + return true; + } + + @Override + public boolean onBackendResponse(MySQLSession session) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean onBackendClosed(MySQLSession session, boolean normal) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean onFrontWriteFinished(MycatSession session) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean onBackendWriteFinished(MySQLSession session) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public void clearResouces(boolean sessionCLosed) { + // TODO Auto-generated method stub + + } + +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/DirectPassthrouhCmd.java b/source/src/main/java/io/mycat/mycat2/cmds/DirectPassthrouhCmd.java index 052e364..944647e 100644 --- a/source/src/main/java/io/mycat/mycat2/cmds/DirectPassthrouhCmd.java +++ b/source/src/main/java/io/mycat/mycat2/cmds/DirectPassthrouhCmd.java @@ -8,9 +8,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import io.mycat.mycat2.MySQLSession; import io.mycat.mycat2.MycatSession; -import io.mycat.mycat2.SQLCommand; +import io.mycat.mycat2.MySQLCommand; import io.mycat.mycat2.beans.MySQLPackageInf; import io.mycat.mycat2.cmds.judge.DirectTransJudge; import io.mycat.mycat2.cmds.judge.ErrorJudge; @@ -25,7 +26,7 @@ * @author wuzhihui * */ -public class DirectPassthrouhCmd implements SQLCommand { +public class DirectPassthrouhCmd implements MySQLCommand { private static final Logger logger = LoggerFactory.getLogger(DirectPassthrouhCmd.class); @@ -45,14 +46,19 @@ public class DirectPassthrouhCmd implements SQLCommand { @Override public boolean procssSQL(MycatSession session) throws IOException { - ProxyBuffer curBuffer = session.proxyBuffer; - // 切换 buffer 读写状态 - curBuffer.flip(); - // 没有读取,直接透传时,需要指定 透传的数据 截止位置 - curBuffer.readIndex = curBuffer.writeIndex; - // 改变 owner,对端Session获取,并且感兴趣写事件 - session.giveupOwner(SelectionKey.OP_WRITE); - session.getBackend().writeToChannel(); + curfinishPackage.putAll(finishPackage); + session.getBackend((mysqlsession, sender, success,result)->{ + if(success){ + ProxyBuffer curBuffer = session.proxyBuffer; + // 切换 buffer 读写状态 + curBuffer.flip(); + // 没有读取,直接透传时,需要指定 透传的数据 截止位置 + curBuffer.readIndex = curBuffer.writeIndex; + // 改变 owner,对端Session获取,并且感兴趣写事件 + session.giveupOwner(SelectionKey.OP_WRITE); + mysqlsession.writeToChannel(); + } + }); return false; } diff --git a/source/src/main/java/io/mycat/mycat2/cmds/NotSupportCmd.java b/source/src/main/java/io/mycat/mycat2/cmds/NotSupportCmd.java new file mode 100644 index 0000000..9b9ab7b --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/NotSupportCmd.java @@ -0,0 +1,53 @@ +package io.mycat.mycat2.cmds; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.mycat.mycat2.MySQLCommand; +import io.mycat.mycat2.MySQLSession; +import io.mycat.mycat2.MycatSession; + +public class NotSupportCmd implements MySQLCommand{ + + private static final Logger logger = LoggerFactory.getLogger(NotSupportCmd.class); + + public static final NotSupportCmd INSTANCE = new NotSupportCmd(); + + @Override + public boolean procssSQL(MycatSession session) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean onBackendResponse(MySQLSession session) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean onBackendClosed(MySQLSession session, boolean normal) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean onFrontWriteFinished(MycatSession session) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean onBackendWriteFinished(MySQLSession session) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public void clearResouces(boolean sessionCLosed) { + // TODO Auto-generated method stub + + } +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/sqlCmds/SqlComBeginCmd.java b/source/src/main/java/io/mycat/mycat2/cmds/sqlCmds/SqlComBeginCmd.java new file mode 100644 index 0000000..0e6451a --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/sqlCmds/SqlComBeginCmd.java @@ -0,0 +1,28 @@ +package io.mycat.mycat2.cmds.sqlCmds; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.mycat.mycat2.MycatSession; +import io.mycat.mycat2.cmds.DirectPassthrouhCmd; +import io.mycat.mysql.AutoCommit; + +public class SqlComBeginCmd extends DirectPassthrouhCmd{ + + private static final Logger logger = LoggerFactory.getLogger(SqlComBeginCmd.class); + + public static final SqlComBeginCmd INSTANCE = new SqlComBeginCmd(); + + @Override + public boolean procssSQL(MycatSession session) throws IOException { + /* + * 开启事务, + * TODO 事务兼容性完善. + */ + session.autoCommit=AutoCommit.OFF; + super.procssSQL(session); + return false; + } +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/sqlCmds/SqlComCommitCmd.java b/source/src/main/java/io/mycat/mycat2/cmds/sqlCmds/SqlComCommitCmd.java new file mode 100644 index 0000000..61afbde --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/sqlCmds/SqlComCommitCmd.java @@ -0,0 +1,28 @@ +package io.mycat.mycat2.cmds.sqlCmds; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.mycat.mycat2.MycatSession; +import io.mycat.mycat2.cmds.DirectPassthrouhCmd; +import io.mycat.mysql.AutoCommit; + +public class SqlComCommitCmd extends DirectPassthrouhCmd{ + + private static final Logger logger = LoggerFactory.getLogger(SqlComCommitCmd.class); + + public static final SqlComCommitCmd INSTANCE = new SqlComCommitCmd(); + + @Override + public boolean procssSQL(MycatSession session) throws IOException { + super.procssSQL(session); + /* + * 提交事务 + * TODO 事务兼容性完善. + */ + session.autoCommit=AutoCommit.ON; + return false; + } +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/sqlCmds/SqlComKillCmd.java b/source/src/main/java/io/mycat/mycat2/cmds/sqlCmds/SqlComKillCmd.java new file mode 100644 index 0000000..6dc1388 --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/sqlCmds/SqlComKillCmd.java @@ -0,0 +1,54 @@ +package io.mycat.mycat2.cmds.sqlCmds; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.mycat.mycat2.MySQLCommand; +import io.mycat.mycat2.MySQLSession; +import io.mycat.mycat2.MycatSession; + +public class SqlComKillCmd implements MySQLCommand{ + + private static final Logger logger = LoggerFactory.getLogger(SqlComKillCmd.class); + + public static final SqlComKillCmd INSTANCE = new SqlComKillCmd(); + + @Override + public boolean procssSQL(MycatSession session) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean onBackendResponse(MySQLSession session) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean onBackendClosed(MySQLSession session, boolean normal) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean onFrontWriteFinished(MycatSession session) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean onBackendWriteFinished(MySQLSession session) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public void clearResouces(boolean sessionCLosed) { + // TODO Auto-generated method stub + + } + +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/sqlCmds/SqlComRollBackCmd.java b/source/src/main/java/io/mycat/mycat2/cmds/sqlCmds/SqlComRollBackCmd.java new file mode 100644 index 0000000..9cf121c --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/sqlCmds/SqlComRollBackCmd.java @@ -0,0 +1,28 @@ +package io.mycat.mycat2.cmds.sqlCmds; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.mycat.mycat2.MycatSession; +import io.mycat.mycat2.cmds.DirectPassthrouhCmd; +import io.mycat.mysql.AutoCommit; + +public class SqlComRollBackCmd extends DirectPassthrouhCmd{ + + private static final Logger logger = LoggerFactory.getLogger(SqlComRollBackCmd.class); + + public static final SqlComRollBackCmd INSTANCE = new SqlComRollBackCmd(); + + @Override + public boolean procssSQL(MycatSession session) throws IOException { + super.procssSQL(session); + /* + * 提交事务 + * TODO 事务兼容性完善. + */ + session.autoCommit=AutoCommit.ON; + return false; + } +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/sqlCmds/SqlComStartCmd.java b/source/src/main/java/io/mycat/mycat2/cmds/sqlCmds/SqlComStartCmd.java new file mode 100644 index 0000000..29d0083 --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/sqlCmds/SqlComStartCmd.java @@ -0,0 +1,36 @@ +package io.mycat.mycat2.cmds.sqlCmds; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.mycat.mycat2.MycatSession; +import io.mycat.mycat2.cmds.DirectPassthrouhCmd; +import io.mycat.mycat2.sqlparser.NewSQLContext; +import io.mycat.mysql.AutoCommit; + +public class SqlComStartCmd extends DirectPassthrouhCmd{ + + private static final Logger logger = LoggerFactory.getLogger(SqlComStartCmd.class); + + public static final SqlComStartCmd INSTANCE = new SqlComStartCmd(); + + @Override + public boolean procssSQL(MycatSession session) throws IOException { + + //TODO start transaction; start slave; 暂时还无法区分 + if(NewSQLContext.TRANSACTION_SQL==session.sqlContext.getSQLType()){ + + } + + /* + * 开启事务, + * TODO 事务兼容性完善. + */ + session.autoCommit=AutoCommit.OFF; + super.procssSQL(session); + return false; + } + +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/strategy/AbstractCmdStrategy.java b/source/src/main/java/io/mycat/mycat2/cmds/strategy/AbstractCmdStrategy.java new file mode 100644 index 0000000..6195812 --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/strategy/AbstractCmdStrategy.java @@ -0,0 +1,67 @@ +package io.mycat.mycat2.cmds.strategy; + +import java.util.HashMap; +import java.util.Map; + +import io.mycat.mycat2.MyCommand; +import io.mycat.mycat2.MySQLCommand; +import io.mycat.mycat2.MycatSession; +import io.mycat.mycat2.cmds.CmdStrategy; +import io.mycat.mycat2.cmds.DirectPassthrouhCmd; +import io.mycat.mycat2.sqlparser.NewSQLContext; +import io.mycat.mycat2.sqlparser.NewSQLParser; +import io.mycat.mysql.packet.MySQLPacket; + +public abstract class AbstractCmdStrategy implements CmdStrategy { + + /** + * 进行MySQL命令的处理的容器 + */ + protected Map MYCOMMANDMAP = new HashMap<>(); + + /** + * 进行SQL命令的处理的容器 + */ + protected Map MYSQLCOMMANDMAP = new HashMap<>(); + + public AbstractCmdStrategy(){ + initMyCmdHandler(); + initMySqlCmdHandler(); + } + + protected abstract void initMyCmdHandler(); + + protected abstract void initMySqlCmdHandler(); + + @Override + public MyCommand getMyCommand(MycatSession session) { + MyCommand command = null; + if(MySQLPacket.COM_QUERY==session.curMSQLPackgInf.pkgType){ + command = doGetMySQLCommand(session); + }else{ + command = doGetMyCommand(session); + } + return command!=null?command:DirectPassthrouhCmd.INSTANCE; + } + + /** + * 模板方法,默认的获取 my 命令处理器的方法,子类可以覆盖 + * @param session + * @return + */ + protected MyCommand doGetMyCommand(MycatSession session){ + return MYCOMMANDMAP.get(session.curMSQLPackgInf.pkgType); + } + + /** + * 模板方法,默认的获取 sql 命令处理器的方法,子类可以覆盖 + * @param session + * @return + */ + protected MyCommand doGetMySQLCommand(MycatSession session){ + NewSQLParser parser = new NewSQLParser(); + parser.parse(session.proxyBuffer.getBytes(session.curMSQLPackgInf.startPos+MySQLPacket.packetHeaderSize+1, + session.curMSQLPackgInf.pkgLength - MySQLPacket.packetHeaderSize - 1), session.sqlContext); + return MYSQLCOMMANDMAP.get(session.sqlContext.getSQLType()); + } +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/strategy/AnnotateRouteCmdStrategy.java b/source/src/main/java/io/mycat/mycat2/cmds/strategy/AnnotateRouteCmdStrategy.java new file mode 100644 index 0000000..f77c751 --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/strategy/AnnotateRouteCmdStrategy.java @@ -0,0 +1,19 @@ +package io.mycat.mycat2.cmds.strategy; + +public class AnnotateRouteCmdStrategy extends AbstractCmdStrategy { + + public static final AnnotateRouteCmdStrategy INSTANCE = new AnnotateRouteCmdStrategy(); + + @Override + protected void initMyCmdHandler() { + // TODO Auto-generated method stub + + } + + @Override + protected void initMySqlCmdHandler() { + // TODO Auto-generated method stub + + } + +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/strategy/DBINMultiServerCmdStrategy.java b/source/src/main/java/io/mycat/mycat2/cmds/strategy/DBINMultiServerCmdStrategy.java new file mode 100644 index 0000000..31a1e0d --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/strategy/DBINMultiServerCmdStrategy.java @@ -0,0 +1,19 @@ +package io.mycat.mycat2.cmds.strategy; + +public class DBINMultiServerCmdStrategy extends AbstractCmdStrategy { + + public static final DBINMultiServerCmdStrategy INSTANCE = new DBINMultiServerCmdStrategy(); + + @Override + protected void initMyCmdHandler() { + // TODO Auto-generated method stub + + } + + @Override + protected void initMySqlCmdHandler() { + // TODO Auto-generated method stub + + } + +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/strategy/DBInOneServerCmdStrategy.java b/source/src/main/java/io/mycat/mycat2/cmds/strategy/DBInOneServerCmdStrategy.java new file mode 100644 index 0000000..514067e --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/strategy/DBInOneServerCmdStrategy.java @@ -0,0 +1,66 @@ +package io.mycat.mycat2.cmds.strategy; + +import io.mycat.mycat2.cmds.ComChangeUserCmd; +import io.mycat.mycat2.cmds.ComInitDB; +import io.mycat.mycat2.cmds.ComQuitCmd; +import io.mycat.mycat2.cmds.DirectPassthrouhCmd; +import io.mycat.mycat2.cmds.NotSupportCmd; +import io.mycat.mycat2.cmds.sqlCmds.SqlComBeginCmd; +import io.mycat.mycat2.cmds.sqlCmds.SqlComCommitCmd; +import io.mycat.mycat2.cmds.sqlCmds.SqlComRollBackCmd; +import io.mycat.mycat2.cmds.sqlCmds.SqlComStartCmd; +import io.mycat.mycat2.sqlparser.NewSQLContext; +import io.mycat.mysql.packet.MySQLPacket; + +public class DBInOneServerCmdStrategy extends AbstractCmdStrategy{ + + public static final DBInOneServerCmdStrategy INSTANCE = new DBInOneServerCmdStrategy(); + + @Override + protected void initMyCmdHandler() { + MYCOMMANDMAP.put(MySQLPacket.COM_QUIT, ComQuitCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_INIT_DB, ComInitDB.INSTANCE); +// MYCOMMANDMAP.put(MySQLPacket.COM_QUERY, DirectPassthrouhCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_FIELD_LIST, DirectPassthrouhCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_CREATE_DB, NotSupportCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_DROP_DB, NotSupportCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_REFRESH, DirectPassthrouhCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_SHUTDOWN, NotSupportCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_STATISTICS, DirectPassthrouhCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_PROCESS_INFO, DirectPassthrouhCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_CONNECT, NotSupportCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_PROCESS_KILL, DirectPassthrouhCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_DEBUG, NotSupportCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_PING, DirectPassthrouhCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_TIME, NotSupportCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_DELAYED_INSERT, NotSupportCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_CHANGE_USER, ComChangeUserCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_BINLOG_DUMP, DirectPassthrouhCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_TABLE_DUMP, DirectPassthrouhCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_CONNECT_OUT, NotSupportCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_REGISTER_SLAVE, NotSupportCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_STMT_PREPARE, DirectPassthrouhCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_STMT_EXECUTE, DirectPassthrouhCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_STMT_SEND_LONG_DATA, DirectPassthrouhCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_STMT_CLOSE, DirectPassthrouhCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_STMT_RESET, DirectPassthrouhCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_SET_OPTION, DirectPassthrouhCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_STMT_FETCH, DirectPassthrouhCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_DAEMON, NotSupportCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_BINLOG_DUMP_GTID, DirectPassthrouhCmd.INSTANCE); + MYCOMMANDMAP.put(MySQLPacket.COM_RESET_CONNECTION, DirectPassthrouhCmd.INSTANCE); + } + + @Override + protected void initMySqlCmdHandler() { + MYSQLCOMMANDMAP.put(NewSQLContext.INSERT_SQL, DirectPassthrouhCmd.INSTANCE); + MYSQLCOMMANDMAP.put(NewSQLContext.UPDATE_SQL, DirectPassthrouhCmd.INSTANCE); + MYSQLCOMMANDMAP.put(NewSQLContext.COMMIT_SQL, SqlComCommitCmd.INSTANCE); + MYSQLCOMMANDMAP.put(NewSQLContext.ROLLBACK_SQL, SqlComRollBackCmd.INSTANCE); + MYSQLCOMMANDMAP.put(NewSQLContext.SELECT_SQL, DirectPassthrouhCmd.INSTANCE); + MYSQLCOMMANDMAP.put(NewSQLContext.BEGIN_SQL, SqlComBeginCmd.INSTANCE); + MYSQLCOMMANDMAP.put(NewSQLContext.START_SQL, SqlComStartCmd.INSTANCE); + MYSQLCOMMANDMAP.put(NewSQLContext.USE_SQL, SqlComStartCmd.INSTANCE); + + } +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/strategy/SQLParseRouteCmdStrategy.java b/source/src/main/java/io/mycat/mycat2/cmds/strategy/SQLParseRouteCmdStrategy.java new file mode 100644 index 0000000..e103a41 --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/strategy/SQLParseRouteCmdStrategy.java @@ -0,0 +1,19 @@ +package io.mycat.mycat2.cmds.strategy; + +public class SQLParseRouteCmdStrategy extends AbstractCmdStrategy { + + public static final SQLParseRouteCmdStrategy INSTANCE = new SQLParseRouteCmdStrategy(); + + @Override + protected void initMyCmdHandler() { + // TODO Auto-generated method stub + + } + + @Override + protected void initMySqlCmdHandler() { + // TODO Auto-generated method stub + + } + +} diff --git a/source/src/main/java/io/mycat/mycat2/net/DefaultMycatSessionHandler.java b/source/src/main/java/io/mycat/mycat2/net/DefaultMycatSessionHandler.java index fd0f275..73f137e 100644 --- a/source/src/main/java/io/mycat/mycat2/net/DefaultMycatSessionHandler.java +++ b/source/src/main/java/io/mycat/mycat2/net/DefaultMycatSessionHandler.java @@ -7,14 +7,17 @@ import org.slf4j.LoggerFactory; import io.mycat.mycat2.AbstractMySQLSession; +import io.mycat.mycat2.MyCommand; import io.mycat.mycat2.MySQLSession; import io.mycat.mycat2.MycatSession; + import io.mycat.mycat2.SQLCommand; import io.mycat.mycat2.beans.MySQLDataSource; import io.mycat.mycat2.console.SessionKeyEnum; import io.mycat.mycat2.tasks.BackendConCreateTask; import io.mycat.mycat2.tasks.BackendSynchronzationTask; import io.mycat.mysql.packet.ErrorPacket; + import io.mycat.proxy.NIOHandler; import io.mycat.proxy.ProxyBuffer; @@ -50,64 +53,24 @@ private void onFrontRead(final MycatSession session) throws IOException { if (session.curMSQLPackgInf.endPos < buffer.writeIndex) { logger.warn("front contains multi package "); } - if (session.getBackend() == null) { - // todo ,从连接池中获取连接,获取不到后创建新连接, - final MySQLDataSource ds = session.getDatasource(); - logger.info("hang cur sql for backend connection ready "); - BackendConCreateTask authProcessor = new BackendConCreateTask(session.bufPool, session.nioSelector, ds, - session.schema.name); - authProcessor.setCallback((optSession, Sender, exeSucces, retVal) -> { - // 恢复默认的Handler - session.setCurNIOHandler(INSTANCE); - if (exeSucces) { - session.bindBackend(optSession); - if (session.curSQLCommand.procssSQL(session)) { - session.curSQLCommand.clearResouces(false); - } - } else { - ErrorPacket errPkg = (ErrorPacket) retVal; - optSession.responseOKOrError(errPkg); - - } - }); - session.setCurNIOHandler(authProcessor); - return; - - } else { - - // if not synchorndiz d - // syncSessionStateToBackend() - // 如果是 SQL 则调用 sql parser 进行处理 - // SQLComandProcessInf sqlCmd = - // SQLCOMMANDMAP.get(session.curFrontMSQLPackgInf.pkgType); - + + MyCommand myCommand = session.getMyCommand(); + + if(myCommand!=null){ + session.curSQLCommand = myCommand; // 如果当前包需要处理,则交给对应方法处理,否则直接透传 if (session.curSQLCommand.procssSQL(session)) { session.curSQLCommand.clearResouces(false); } + }else{ + logger.error(" current packageTyps is not support,please fix it!!! the packageType is {} ",session.curMSQLPackgInf); } } - private void syncSessionStateToBackend(MycatSession mycatSession, MySQLSession mysqlSession) throws IOException { - BackendSynchronzationTask backendSynchronzationTask = new BackendSynchronzationTask(mysqlSession); - backendSynchronzationTask.setCallback((session, sender, exeSucces, rv) -> { - if (exeSucces) { - // 交给SQLComand去处理 - if (mycatSession.curSQLCommand.procssSQL(mycatSession)) { - mycatSession.curSQLCommand.clearResouces(false); - } - } else { - ErrorPacket errPkg = (ErrorPacket) rv; - session.responseOKOrError(errPkg); - } - }); - mycatSession.setCurNIOHandler(backendSynchronzationTask); - } - private void onBackendRead(MySQLSession session) throws IOException { // 交给SQLComand去处理 - SQLCommand curCmd = session.getMycatSession().curSQLCommand; + MyCommand curCmd = session.getMycatSession().curSQLCommand; if (curCmd.onBackendResponse(session)) { curCmd.clearResouces(false); } diff --git a/source/src/main/java/io/mycat/mycat2/sqlparser/MySqlCommand.java b/source/src/main/java/io/mycat/mycat2/sqlparser/MySqlCommand.java new file mode 100644 index 0000000..aed0abe --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/sqlparser/MySqlCommand.java @@ -0,0 +1,162 @@ +package io.mycat.mycat2.sqlparser; + +/* +@enum enum_sql_command +@brief SQL Commands + + SQL Command is resolved during SQL parsing and assigned to the Lex + object= 0; accessible from the THD. + + When a command is added here= 0; be sure it's also added in mysqld.cc + in "struct show_var_st status_vars[]= {" ... + + If the command returns a result set or is not allowed in stored + functions or triggers= 0; please also make sure that + sp_get_flags_for_command (sp_head.cc) returns proper flags for the + added public static int SQLCOM_. +*/ +public interface MySqlCommand { + + public static int SQLCOM_SELECT = 0; + public static int SQLCOM_CREATE_TABLE= 1; + public static int SQLCOM_CREATE_INDEX= 2; + public static int SQLCOM_ALTER_TABLE= 3; + public static int SQLCOM_UPDATE= 4; + public static int SQLCOM_INSERT= 5; + public static int SQLCOM_INSERT_SELECT= 6; + public static int SQLCOM_DELETE= 7; + public static int SQLCOM_TRUNCATE= 8; + public static int SQLCOM_DROP_TABLE= 9; + public static int SQLCOM_DROP_INDEX= 10; + public static int SQLCOM_SHOW_DATABASES= 11; + public static int SQLCOM_SHOW_TABLES= 12; + public static int SQLCOM_SHOW_FIELDS= 13; + public static int SQLCOM_SHOW_KEYS= 14; + public static int SQLCOM_SHOW_VARIABLES= 15; + public static int SQLCOM_SHOW_STATUS= 16; + public static int SQLCOM_SHOW_ENGINE_LOGS= 17; + public static int SQLCOM_SHOW_ENGINE_STATUS= 18; + public static int SQLCOM_SHOW_ENGINE_MUTEX= 19; + public static int SQLCOM_SHOW_PROCESSLIST= 20; + public static int SQLCOM_SHOW_MASTER_STAT= 21; + public static int SQLCOM_SHOW_SLAVE_STAT= 22; + public static int SQLCOM_SHOW_GRANTS= 23; + public static int SQLCOM_SHOW_CREATE= 24; + public static int SQLCOM_SHOW_CHARSETS= 25; + public static int SQLCOM_SHOW_COLLATIONS= 26; + public static int SQLCOM_SHOW_CREATE_DB= 27; + public static int SQLCOM_SHOW_TABLE_STATUS= 28; + public static int SQLCOM_SHOW_TRIGGERS= 29; + public static int SQLCOM_LOAD= 30; + public static int SQLCOM_SET_OPTION= 31; + public static int SQLCOM_LOCK_TABLES= 32; + public static int SQLCOM_UNLOCK_TABLES= 33; + public static int SQLCOM_GRANT= 34; + public static int SQLCOM_CHANGE_DB= 35; + public static int SQLCOM_CREATE_DB= 36; + public static int SQLCOM_DROP_DB= 37; + public static int SQLCOM_ALTER_DB= 38; + public static int SQLCOM_REPAIR= 39; + public static int SQLCOM_REPLACE= 40; + public static int SQLCOM_REPLACE_SELECT= 41; + public static int SQLCOM_CREATE_FUNCTION= 42; + public static int SQLCOM_DROP_FUNCTION= 43; + public static int SQLCOM_REVOKE= 44; + public static int SQLCOM_OPTIMIZE= 45; + public static int SQLCOM_CHECK= 46; + public static int SQLCOM_ASSIGN_TO_KEYCACHE= 47; + public static int SQLCOM_PRELOAD_KEYS= 48; + public static int SQLCOM_FLUSH= 49; + public static int SQLCOM_KILL= 50; + public static int SQLCOM_ANALYZE= 51; + public static int SQLCOM_ROLLBACK= 52; + public static int SQLCOM_ROLLBACK_TO_SAVEPOINT= 53; + public static int SQLCOM_COMMIT= 54; + public static int SQLCOM_SAVEPOINT= 55; + public static int SQLCOM_RELEASE_SAVEPOINT= 56; + public static int SQLCOM_SLAVE_START= 57; + public static int SQLCOM_SLAVE_STOP= 58; + public static int SQLCOM_START_GROUP_REPLICATION= 59; + public static int SQLCOM_STOP_GROUP_REPLICATION= 60; + public static int SQLCOM_BEGIN= 61; + public static int SQLCOM_CHANGE_MASTER= 62; + public static int SQLCOM_CHANGE_REPLICATION_FILTER= 63; + public static int SQLCOM_RENAME_TABLE= 64; + public static int SQLCOM_RESET= 65; + public static int SQLCOM_PURGE= 66; + public static int SQLCOM_PURGE_BEFORE= 67; + public static int SQLCOM_SHOW_BINLOGS= 68; + public static int SQLCOM_SHOW_OPEN_TABLES= 69; + public static int SQLCOM_HA_OPEN= 70; + public static int SQLCOM_HA_CLOSE= 71; + public static int SQLCOM_HA_READ= 72; + public static int SQLCOM_SHOW_SLAVE_HOSTS= 73; + public static int SQLCOM_DELETE_MULTI= 74; + public static int SQLCOM_UPDATE_MULTI= 75; + public static int SQLCOM_SHOW_BINLOG_EVENTS= 76; + public static int SQLCOM_DO= 77; + public static int SQLCOM_SHOW_WARNS= 78; + public static int SQLCOM_EMPTY_QUERY= 79; + public static int SQLCOM_SHOW_ERRORS= 80; + public static int SQLCOM_SHOW_STORAGE_ENGINES= 81; + public static int SQLCOM_SHOW_PRIVILEGES= 82; + public static int SQLCOM_HELP= 83; + public static int SQLCOM_CREATE_USER= 84; + public static int SQLCOM_DROP_USER= 85; + public static int SQLCOM_RENAME_USER= 86; + public static int SQLCOM_REVOKE_ALL= 87; + public static int SQLCOM_CHECKSUM= 88; + public static int SQLCOM_CREATE_PROCEDURE= 89; + public static int SQLCOM_CREATE_SPFUNCTION= 90; + public static int SQLCOM_CALL= 91; + public static int SQLCOM_DROP_PROCEDURE= 92; + public static int SQLCOM_ALTER_PROCEDURE= 93; + public static int SQLCOM_ALTER_FUNCTION= 94; + public static int SQLCOM_SHOW_CREATE_PROC= 95; + public static int SQLCOM_SHOW_CREATE_FUNC= 96; + public static int SQLCOM_SHOW_STATUS_PROC= 97; + public static int SQLCOM_SHOW_STATUS_FUNC= 98; + public static int SQLCOM_PREPARE= 99; + public static int SQLCOM_EXECUTE= 100; + public static int SQLCOM_DEALLOCATE_PREPARE= 101; + public static int SQLCOM_CREATE_VIEW= 102; + public static int SQLCOM_DROP_VIEW= 103; + public static int SQLCOM_CREATE_TRIGGER= 104; + public static int SQLCOM_DROP_TRIGGER= 105; + public static int SQLCOM_XA_START= 106; + public static int SQLCOM_XA_END= 107; + public static int SQLCOM_XA_PREPARE= 108; + public static int SQLCOM_XA_COMMIT= 109; + public static int SQLCOM_XA_ROLLBACK= 110; + public static int SQLCOM_XA_RECOVER= 111; + public static int SQLCOM_SHOW_PROC_CODE= 112; + public static int SQLCOM_SHOW_FUNC_CODE= 113; + public static int SQLCOM_ALTER_TABLESPACE= 114; + public static int SQLCOM_INSTALL_PLUGIN= 115; + public static int SQLCOM_UNINSTALL_PLUGIN= 116; + public static int SQLCOM_BINLOG_BASE64_EVENT= 117; + public static int SQLCOM_SHOW_PLUGINS= 118; + public static int SQLCOM_CREATE_SERVER= 119; + public static int SQLCOM_DROP_SERVER= 120; + public static int SQLCOM_ALTER_SERVER= 121; + public static int SQLCOM_CREATE_EVENT= 122; + public static int SQLCOM_ALTER_EVENT= 123; + public static int SQLCOM_DROP_EVENT= 124; + public static int SQLCOM_SHOW_CREATE_EVENT= 125; + public static int SQLCOM_SHOW_EVENTS= 126; + public static int SQLCOM_SHOW_CREATE_TRIGGER= 127; + public static int SQLCOM_ALTER_DB_UPGRADE= 128; + public static int SQLCOM_SHOW_PROFILE= 129; + public static int SQLCOM_SHOW_PROFILES= 130; + public static int SQLCOM_SIGNAL= 131; + public static int SQLCOM_RESIGNAL= 132; + public static int SQLCOM_SHOW_RELAYLOG_EVENTS= 133; + public static int SQLCOM_GET_DIAGNOSTICS= 134; + public static int SQLCOM_ALTER_USER= 135; + public static int SQLCOM_EXPLAIN_OTHER= 136; + public static int SQLCOM_SHOW_CREATE_USER= 137; + public static int SQLCOM_SHUTDOWN= 138; + public static int SQLCOM_ALTER_INSTANCE= 139; + /* This should be the last !!! */ + public static int SQLCOM_END = 140; +} diff --git a/source/src/main/java/io/mycat/mycat2/sqlparser/NewSQLContext.java b/source/src/main/java/io/mycat/mycat2/sqlparser/NewSQLContext.java index b3ef6e9..db76df6 100644 --- a/source/src/main/java/io/mycat/mycat2/sqlparser/NewSQLContext.java +++ b/source/src/main/java/io/mycat/mycat2/sqlparser/NewSQLContext.java @@ -1,9 +1,10 @@ package io.mycat.mycat2.sqlparser; -import io.mycat.mycat2.sqlparser.SQLParseUtils.HashArray; import java.util.Arrays; +import io.mycat.mycat2.sqlparser.SQLParseUtils.HashArray; + /** * Created by Fanfan on 2017/3/21. */ @@ -53,6 +54,8 @@ public class NewSQLContext { public static final byte SET_AUTOCOMMIT_SQL = 34; public static final byte COMMIT_SQL = 35; // public static final byte COMMIT_SQL = 17; + public static final byte SELECT_INTO_SQL = 36; + public static final byte SELECT_FOR_UPDATE_SQL = 37; //ANNOTATION TYPE public static final byte ANNOTATION_BALANCE = 1; @@ -192,7 +195,7 @@ public void setSQLFinished(int curHashPos) { int idx = curSQLIdx<<2; curSQLIdx++; sqlInfoArray[idx++] = (short)preHashArrayPos; - sqlInfoArray[idx++] = (short)((hashArrayRealSQLOffset<<5) | sqlType); + sqlInfoArray[idx++] = (short)((hashArrayRealSQLOffset<<6) | sqlType); sqlInfoArray[idx++] = (short)sqlSize; sqlInfoArray[idx] = (short)((preTableResultPos<<8) | curSQLTblCount); curSQLTblCount = 0; @@ -221,7 +224,7 @@ public int getSQLTblCount(int sqlIdx) { public long getSqlHash() { return this.sqlHash; } public void setSQLType(byte sqlType) { - if (this.sqlType == 0) + if (this.sqlType == 0 || this.sqlType == SELECT_SQL) this.sqlType = sqlType; } @@ -229,14 +232,18 @@ public void setSQLIdx(int sqlIdx) { curSQLIdx = sqlIdx; } - public byte getSQLType() { return (byte)(this.sqlInfoArray[1] & 0x1F); } - public byte getSQLType(int sqlIdx) { return (byte)(this.sqlInfoArray[(sqlIdx<<2)+1] & 0x1F); } + public byte getSQLType() { + byte type = (byte)(this.sqlInfoArray[1] & 0x3F); + return type==0?this.sqlType:type; + } + public byte getSQLType(int sqlIdx) { return (byte)(this.sqlInfoArray[(sqlIdx<<2)+1] & 0x3F); } + public byte getCurSQLType() { return this.sqlType; } public void setRealSQLOffset(int hashArrayPos) { hashArrayRealSQLOffset = hashArrayPos - preHashArrayPos; } public int getRealSQLOffset(int sqlIdx) { - int hashArrayOffset = sqlInfoArray[(sqlIdx<<2)+1] >>> 5; + int hashArrayOffset = sqlInfoArray[(sqlIdx<<2)+1] >>> 6; return hashArray.getPos(hashArrayOffset); } public int getRealSQLSize(int sqlIdx) { diff --git a/source/src/main/java/io/mycat/mycat2/sqlparser/NewSQLParser.java b/source/src/main/java/io/mycat/mycat2/sqlparser/NewSQLParser.java index 66089d2..7066dd4 100644 --- a/source/src/main/java/io/mycat/mycat2/sqlparser/NewSQLParser.java +++ b/source/src/main/java/io/mycat/mycat2/sqlparser/NewSQLParser.java @@ -1,13 +1,11 @@ package io.mycat.mycat2.sqlparser; -import io.mycat.mycat2.sqlparser.SQLParseUtils.HashArray; -import io.mycat.mycat2.sqlparser.SQLParseUtils.Tokenizer; -import io.mycat.mycat2.sqlparser.IntTokenHash; - -import java.nio.charset.StandardCharsets; import java.util.stream.IntStream; +import io.mycat.mycat2.sqlparser.SQLParseUtils.HashArray; +import io.mycat.mycat2.sqlparser.SQLParseUtils.Tokenizer; + /** * Created by Kaiz on 2017/2/6. * @@ -96,6 +94,11 @@ boolean isAlias(int pos, int type) { //需要优化成数组判断 return false; else return true; + case IntTokenHash.FROM: + if (hashArray.getHash(pos) == TokenHash.FROM) + return false; + else + return true; default: return true; } @@ -370,6 +373,10 @@ public void firstParse(NewSQLContext context) { } break; case IntTokenHash.INTO: + byte type = context.getSQLType(); + if (context.getCurSQLType() == NewSQLContext.SELECT_SQL) { + context.setSQLType(NewSQLContext.SELECT_INTO_SQL); + } if (hashArray.getHash(pos) == TokenHash.INTO) { pos = pickTableNames(++pos, arrayCount, context); } @@ -537,6 +544,13 @@ public void firstParse(NewSQLContext context) { case IntTokenHash.SQL_DELIMETER: context.setSQLFinished(++pos); break; + case IntTokenHash.FOR: + int next = pos+1; + if (context.getCurSQLType() == NewSQLContext.SELECT_SQL) { + if (hashArray.getIntHash(next) == IntTokenHash.UPDATE && hashArray.getHash(next) == TokenHash.UPDATE) { + context.setSQLType(NewSQLContext.SELECT_FOR_UPDATE_SQL); + } + } default: pos++; break; @@ -583,8 +597,8 @@ public static void main(String[] args) { // byte[] src = "SELECT * FROM table LIMIT 95,-1".getBytes(StandardCharsets.UTF_8); // byte[] src = "/*balance*/select * from tbl_A where id=1;".getBytes(StandardCharsets.UTF_8); // byte[] src = "/*!MyCAT:DB_Type=Master*/select * from tbl_A where id=1;".getBytes(StandardCharsets.UTF_8); - byte[] src = "insert tbl_A(id, val) values(1, 2);\ninsert tbl_B(id, val) values(2, 2);\nSELECT id, val FROM tbl_S where id=19;\n".getBytes(StandardCharsets.UTF_8); - +// byte[] src = "insert tbl_A(id, val) values(1, 2);\ninsert tbl_B(id, val) values(2, 2);\nSELECT id, val FROM tbl_S where id=19;\n".getBytes(StandardCharsets.UTF_8); + byte[] src = "select * into tbl_B from tbl_A;".getBytes(); // long min = 0; // for (int i = 0; i < 50; i++) { // System.out.print("Loop " + i + " : "); diff --git a/source/src/main/java/io/mycat/mycat2/tasks/BackendSynchemaTask.java b/source/src/main/java/io/mycat/mycat2/tasks/BackendSynchemaTask.java new file mode 100644 index 0000000..8bc9a79 --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/tasks/BackendSynchemaTask.java @@ -0,0 +1,53 @@ +package io.mycat.mycat2.tasks; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.mycat.mycat2.MySQLSession; +import io.mycat.mysql.packet.CommandPacket; +import io.mycat.mysql.packet.ErrorPacket; +import io.mycat.mysql.packet.MySQLPacket; + +public class BackendSynchemaTask extends AbstractBackendIOTask { + + private static Logger logger = LoggerFactory.getLogger(BackendSynchemaTask.class); + + public BackendSynchemaTask(MySQLSession session) throws IOException{ + super(session,true); + session.proxyBuffer.reset(); + CommandPacket packet = new CommandPacket(); + packet.packetId = 0; + packet.command = MySQLPacket.COM_INIT_DB; + packet.arg = session.getMycatSession().schema.getDefaultDN().getDatabase().getBytes(); + packet.write(session.proxyBuffer); + session.proxyBuffer.flip(); + session.proxyBuffer.readIndex = session.proxyBuffer.writeIndex; + session.writeToChannel(); + } + + @Override + public void onSocketRead(MySQLSession session) throws IOException { + session.proxyBuffer.reset(); + if (!session.readFromChannel()) {// 没有读到数据或者报文不完整 + return; + } + + switch (session.resolveMySQLPackage(session.proxyBuffer, session.curMSQLPackgInf, true)) { + case Full: + if(session.curMSQLPackgInf.pkgType == MySQLPacket.OK_PACKET){ + this.finished(true); + }else if(session.curMSQLPackgInf.pkgType == MySQLPacket.ERROR_PACKET){ + errPkg = new ErrorPacket(); + errPkg.read(session.proxyBuffer); + logger.warn("backend state sync Error.Err No. " + errPkg.errno + "," + errPkg.message); + this.finished(false); + } + break; + default: + return; + } + } + +} diff --git a/source/src/main/java/io/mycat/mycat2/tasks/BackendSynchronzationTask.java b/source/src/main/java/io/mycat/mycat2/tasks/BackendSynchronzationTask.java index dc7a316..b80cb32 100644 --- a/source/src/main/java/io/mycat/mycat2/tasks/BackendSynchronzationTask.java +++ b/source/src/main/java/io/mycat/mycat2/tasks/BackendSynchronzationTask.java @@ -5,8 +5,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.mycat.mycat2.AbstractMySQLSession.CurrPacketType; import io.mycat.mycat2.MySQLSession; +import io.mycat.mysql.packet.CommandPacket; import io.mycat.mysql.packet.ErrorPacket; import io.mycat.mysql.packet.MySQLPacket; import io.mycat.mysql.packet.QueryPacket; @@ -19,6 +19,8 @@ */ public class BackendSynchronzationTask extends AbstractBackendIOTask { private static Logger logger = LoggerFactory.getLogger(BackendSynchronzationTask.class); + + private int syncCmdNum = 0; public BackendSynchronzationTask(MySQLSession session) throws IOException { super(session,true); @@ -33,6 +35,7 @@ private void syncState(MySQLSession session) throws IOException { QueryPacket queryPacket = new QueryPacket(); queryPacket.packetId = 0; queryPacket.sql = session.isolation.getCmd() + session.autoCommit.getCmd() + session.isolation.getCmd(); + syncCmdNum = 3; queryPacket.write(proxyBuf); proxyBuf.flip(); proxyBuf.readIndex = proxyBuf.writeIndex; @@ -42,11 +45,25 @@ private void syncState(MySQLSession session) throws IOException { @Override public void onSocketRead(MySQLSession session) throws IOException { session.proxyBuffer.reset(); - if (!session.readFromChannel() - || CurrPacketType.Full != session.resolveMySQLPackage(session.proxyBuffer, session.curMSQLPackgInf, false)) {// 没有读到数据或者报文不完整 + if (!session.readFromChannel()) {// 没有读到数据或者报文不完整 return; } - if (session.curMSQLPackgInf.pkgType == MySQLPacket.OK_PACKET) { + boolean isAllOK = true; + while (syncCmdNum >0) { + switch (session.resolveMySQLPackage(session.proxyBuffer, session.curMSQLPackgInf, true)) { + case Full: + if(session.curMSQLPackgInf.pkgType == MySQLPacket.ERROR_PACKET){ + isAllOK = false; + syncCmdNum = 0; + } + break; + default: + return; + } + syncCmdNum --; + } + + if (isAllOK) { this.finished(true); } else { errPkg = new ErrorPacket(); @@ -55,7 +72,4 @@ public void onSocketRead(MySQLSession session) throws IOException { this.finished(false); } } - - - } diff --git a/source/src/main/java/io/mycat/mysql/packet/CommandPacket.java b/source/src/main/java/io/mycat/mysql/packet/CommandPacket.java new file mode 100644 index 0000000..0191c0f --- /dev/null +++ b/source/src/main/java/io/mycat/mysql/packet/CommandPacket.java @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software;Designed and Developed mainly by many Chinese + * opensource volunteers. you can redistribute it and/or modify it under the + * terms of the GNU General Public License version 2 only, as published by the + * Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Any questions about this component can be directed to it's project Web address + * https://code.google.com/p/opencloudb/. + * + */ +package io.mycat.mysql.packet; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import io.mycat.proxy.ProxyBuffer; + +/** + * From client to server whenever the client wants the server to do something. + * + *
+ * Bytes         Name
+ * -----         ----
+ * 1             command
+ * n             arg
+ * 
+ * command:      The most common value is 03 COM_QUERY, because
+ *               INSERT UPDATE DELETE SELECT etc. have this code.
+ *               The possible values at time of writing (taken
+ *               from /include/mysql_com.h for enum_server_command) are:
+ * 
+ *               #      Name                Associated client function
+ *               -      ----                --------------------------
+ *               0x00   COM_SLEEP           (none, this is an internal thread state)
+ *               0x01   COM_QUIT            mysql_close
+ *               0x02   COM_INIT_DB         mysql_select_db 
+ *               0x03   COM_QUERY           mysql_real_query
+ *               0x04   COM_FIELD_LIST      mysql_list_fields
+ *               0x05   COM_CREATE_DB       mysql_create_db (deprecated)
+ *               0x06   COM_DROP_DB         mysql_drop_db (deprecated)
+ *               0x07   COM_REFRESH         mysql_refresh
+ *               0x08   COM_SHUTDOWN        mysql_shutdown
+ *               0x09   COM_STATISTICS      mysql_stat
+ *               0x0a   COM_PROCESS_INFO    mysql_list_processes
+ *               0x0b   COM_CONNECT         (none, this is an internal thread state)
+ *               0x0c   COM_PROCESS_KILL    mysql_kill
+ *               0x0d   COM_DEBUG           mysql_dump_debug_info
+ *               0x0e   COM_PING            mysql_ping
+ *               0x0f   COM_TIME            (none, this is an internal thread state)
+ *               0x10   COM_DELAYED_INSERT  (none, this is an internal thread state)
+ *               0x11   COM_CHANGE_USER     mysql_change_user
+ *               0x12   COM_BINLOG_DUMP     sent by the slave IO thread to request a binlog
+ *               0x13   COM_TABLE_DUMP      LOAD TABLE ... FROM MASTER (deprecated)
+ *               0x14   COM_CONNECT_OUT     (none, this is an internal thread state)
+ *               0x15   COM_REGISTER_SLAVE  sent by the slave to register with the master (optional)
+ *               0x16   COM_STMT_PREPARE    mysql_stmt_prepare
+ *               0x17   COM_STMT_EXECUTE    mysql_stmt_execute
+ *               0x18   COM_STMT_SEND_LONG_DATA mysql_stmt_send_long_data
+ *               0x19   COM_STMT_CLOSE      mysql_stmt_close
+ *               0x1a   COM_STMT_RESET      mysql_stmt_reset
+ *               0x1b   COM_SET_OPTION      mysql_set_server_option
+ *               0x1c   COM_STMT_FETCH      mysql_stmt_fetch
+ * 
+ * arg:          The text of the command is just the way the user typed it, there is no processing
+ *               by the client (except removal of the final ';').
+ *               This field is not a null-terminated string; however,
+ *               the size can be calculated from the packet size,
+ *               and the MySQL client appends '\0' when receiving.
+ *               
+ * @see http://forge.mysql.com/wiki/MySQL_Internals_ClientServer_Protocol#Command_Packet_.28Overview.29
+ * 
+ * + * @author mycat + */ +public class CommandPacket extends MySQLPacket { + + public byte command; + public byte[] arg; + + @Override + public void write(ProxyBuffer buffer) { + this.write(buffer,calcPacketSize()); + } + + private void write(ProxyBuffer buffer,int pkgSize) { + buffer.writeFixInt(3,calcPacketSize()); + buffer.writeByte(packetId); + buffer.writeByte(command); + buffer.writeBytes(arg); + } + + @Override + public int calcPacketSize() { + return 1 + arg.length; + } + + @Override + protected String getPacketInfo() { + return "MySQL Command Packet"; + } +} \ No newline at end of file diff --git a/source/src/main/java/io/mycat/proxy/AbstractSession.java b/source/src/main/java/io/mycat/proxy/AbstractSession.java index c74e09b..3546b08 100644 --- a/source/src/main/java/io/mycat/proxy/AbstractSession.java +++ b/source/src/main/java/io/mycat/proxy/AbstractSession.java @@ -335,4 +335,12 @@ public void writeFinished() throws IOException { this.getCurNIOHandler().onWriteFinished(this); } + + public boolean isDefaultChannelRead() { + return defaultChannelRead; + } + + public void setDefaultChannelRead(boolean defaultChannelRead) { + this.defaultChannelRead = defaultChannelRead; + } } diff --git a/source/src/main/resources/schema.xml b/source/src/main/resources/schema.xml index e162ca2..0d5f3cb 100644 --- a/source/src/main/resources/schema.xml +++ b/source/src/main/resources/schema.xml @@ -1,9 +1,9 @@ - + - +