From e41a0c1e87d8b1485b84536321e2dc85e7f372b6 Mon Sep 17 00:00:00 2001 From: fengyannan Date: Sun, 13 Aug 2017 20:09:33 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=89=8D=E5=90=8E=E7=AB=AF=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E5=9F=BA=E7=A1=80=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/io/mycat/mycat2/MySQLSession.java | 6 + .../mycat/mycat2/net/DefaultSQLHandler.java | 27 ++++- .../mycat2/tasks/BackendConCreateTask.java | 2 +- .../tasks/BackendSynchronzationTask.java | 111 ++++++++++++++++++ .../mysql/{Isolations.java => Isolation.java} | 90 ++++++++------ .../io/mycat/mysql/packet/QueryPacket.java | 29 +++++ 6 files changed, 220 insertions(+), 45 deletions(-) create mode 100644 source/src/main/java/io/mycat/mycat2/tasks/BackendSynchronzationTask.java rename source/src/main/java/io/mycat/mysql/{Isolations.java => Isolation.java} (66%) create mode 100644 source/src/main/java/io/mycat/mysql/packet/QueryPacket.java diff --git a/source/src/main/java/io/mycat/mycat2/MySQLSession.java b/source/src/main/java/io/mycat/mycat2/MySQLSession.java index aa1aaeb..6e0ec1a 100644 --- a/source/src/main/java/io/mycat/mycat2/MySQLSession.java +++ b/source/src/main/java/io/mycat/mycat2/MySQLSession.java @@ -9,6 +9,7 @@ import io.mycat.mycat2.beans.MySQLPackageInf; import io.mycat.mycat2.beans.SchemaBean; import io.mycat.mysql.Capabilities; +import io.mycat.mysql.Isolation; import io.mycat.mysql.packet.HandshakePacket; import io.mycat.mysql.packet.MySQLPacket; import io.mycat.proxy.BufferOptState; @@ -46,6 +47,11 @@ public class MySQLSession extends UserProxySession { */ public SchemaBean schema; + /** + * 事务隔离级别 + */ + public Isolation isolation = Isolation.REPEATED_READ; + /** * 认证中的seed报文数据 */ diff --git a/source/src/main/java/io/mycat/mycat2/net/DefaultSQLHandler.java b/source/src/main/java/io/mycat/mycat2/net/DefaultSQLHandler.java index 160a52e..4af21ce 100644 --- a/source/src/main/java/io/mycat/mycat2/net/DefaultSQLHandler.java +++ b/source/src/main/java/io/mycat/mycat2/net/DefaultSQLHandler.java @@ -5,6 +5,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; +import io.mycat.mycat2.tasks.BackendSynchronzationTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +54,7 @@ public void onFrontRead(final MySQLSession session) throws IOException { // final MySQLDataSource datas = repSet.getCurWriteDH(); logger.info("hang cur sql for backend connection ready "); - String serverIP = "localhost"; + String serverIP = "10.211.55.5"; int serverPort = 3306; InetSocketAddress serverAddress = new InetSocketAddress(serverIP, serverPort); session.backendChannel = SocketChannel.open(); @@ -67,11 +68,8 @@ public void onFrontRead(final MySQLSession session) throws IOException { BackendConCreateTask authProcessor = new BackendConCreateTask(session, null); authProcessor.setCallback((optSession, Sender, exeSucces, retVal) -> { if (exeSucces) { - optSession.setCurProxyHandler(DefaultSQLHandler.INSTANCE); - // 交给SQLComand去处理 - if (session.curSQLCommand.procssSQL(session, false)) { - session.curSQLCommand.clearResouces(false); - } + //认证成功后开始同步会话状态至后端 + syncSessionStateToBackend(session); } else { ErrorPacket errPkg = (ErrorPacket) retVal; optSession.responseOKOrError(errPkg, true); @@ -90,6 +88,23 @@ public void onFrontRead(final MySQLSession session) throws IOException { } + private void syncSessionStateToBackend(MySQLSession mySQLSession) throws IOException{ + BackendSynchronzationTask backendSynchronzationTask = new BackendSynchronzationTask(mySQLSession); + backendSynchronzationTask.setCallback((session, sender, exeSucces, rv) -> { + if (exeSucces) { + session.setCurProxyHandler(DefaultSQLHandler.INSTANCE); + // 交给SQLComand去处理 + if (session.curSQLCommand.procssSQL(session, false)) { + session.curSQLCommand.clearResouces(false); + } + } else { + ErrorPacket errPkg = (ErrorPacket) rv; + session.responseOKOrError(errPkg, true); + } + }); + mySQLSession.setCurProxyHandler(backendSynchronzationTask); + } + public void onBackendRead(MySQLSession session) throws IOException { boolean readed = session.readSocket(false); if (readed == false) { diff --git a/source/src/main/java/io/mycat/mycat2/tasks/BackendConCreateTask.java b/source/src/main/java/io/mycat/mycat2/tasks/BackendConCreateTask.java index 3fabdd3..2784694 100644 --- a/source/src/main/java/io/mycat/mycat2/tasks/BackendConCreateTask.java +++ b/source/src/main/java/io/mycat/mycat2/tasks/BackendConCreateTask.java @@ -103,7 +103,7 @@ public void onBackendRead(MySQLSession session) throws IOException { welcomePkgReceived = true; } else { // 认证结果报文收到 - if (session.curBackendMSQLPackgInf.pkgType == 0x00) { + if (session.curBackendMSQLPackgInf.pkgType == MySQLPacket.OK_PACKET) { logger.info("backend authed suceess "); this.finished(true); } else if (session.curBackendMSQLPackgInf.pkgType == MySQLPacket.ERROR_PACKET) { diff --git a/source/src/main/java/io/mycat/mycat2/tasks/BackendSynchronzationTask.java b/source/src/main/java/io/mycat/mycat2/tasks/BackendSynchronzationTask.java new file mode 100644 index 0000000..91ca299 --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/tasks/BackendSynchronzationTask.java @@ -0,0 +1,111 @@ +package io.mycat.mycat2.tasks; + +import io.mycat.mycat2.MySQLSession; +import io.mycat.mysql.packet.ErrorPacket; +import io.mycat.mysql.packet.MySQLPacket; +import io.mycat.mysql.packet.QueryPacket; +import io.mycat.proxy.ProxyBuffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Created by ynfeng on 2017/8/13. + *

+ * 同步状态至后端数据库,包括:字符集,事务,隔离级别等 + */ +public class BackendSynchronzationTask implements BackendIOTask { + private static Logger logger = LoggerFactory.getLogger(BackendSynchronzationTask.class); + private AsynTaskCallBack callBack; + private MySQLSession session; + private static QueryPacket[] CMDS = new QueryPacket[3]; + private int processCmd = 0; + private ErrorPacket errPkg; + + static { + QueryPacket isolationSynCmd = new QueryPacket(); + isolationSynCmd.packetId = 0; + + QueryPacket charsetSynCmd = new QueryPacket(); + charsetSynCmd.packetId = 0; + + QueryPacket transactionSynCmd = new QueryPacket(); + transactionSynCmd.packetId = 0; + + CMDS[0] = isolationSynCmd; + CMDS[1] = charsetSynCmd; + CMDS[2] = transactionSynCmd; + } + + public BackendSynchronzationTask(MySQLSession session) throws IOException { + this.processCmd = 0; + this.session = session; + syncState(session); + } + + private void syncState(MySQLSession session) throws IOException { + logger.info("synchronzation state to bakcend.session=" + session.toString()); + ProxyBuffer frontBuffer = session.frontBuffer; + frontBuffer.reset(); + //TODO 字符集映射和前端事务设置还未完成,这里只用隔离级别模拟实现(其实都是SET xxx效果一样),回头补充 + switch (processCmd) { + case 1: + case 2: + case 0: + CMDS[processCmd].sql = session.isolation.getCmd(); + CMDS[processCmd].write(frontBuffer); + + frontBuffer.flip(); + session.writeToChannel(frontBuffer, session.backendChannel); + processCmd++; + break; + default: + this.finished(true); + break; + } + + } + + @Override + public void onBackendConnect(MySQLSession userSession, boolean success, String msg) throws IOException { + + } + + @Override + public void onBackendRead(MySQLSession session) throws IOException { + session.frontBuffer.reset(); + if (!session.readSocket(false) + || !session.resolveMySQLPackage(session.frontBuffer, session.curBackendMSQLPackgInf, false)) {// 没有读到数据或者报文不完整 + return; + } + if (session.curBackendMSQLPackgInf.pkgType == MySQLPacket.OK_PACKET) { + syncState(session); + } else { + //TODO 同步失败如何处理??是否应该关闭此连接?? + errPkg = new ErrorPacket(); + errPkg.read(session.frontBuffer); + logger.warn("backend state sync Error.Err No. " + errPkg.errno + "," + errPkg.message); + this.finished(false); + } + } + + private void finished(boolean success) throws IOException { + callBack.finished(session, this, success, this.errPkg); + } + + @Override + public void onBackendWrite(MySQLSession session) throws IOException { + + } + + @Override + public void onBackendSocketClosed(MySQLSession userSession, boolean normal) { + + } + + @Override + public void setCallback(AsynTaskCallBack callBack) { + this.callBack = callBack; + } +} diff --git a/source/src/main/java/io/mycat/mysql/Isolations.java b/source/src/main/java/io/mycat/mysql/Isolation.java similarity index 66% rename from source/src/main/java/io/mycat/mysql/Isolations.java rename to source/src/main/java/io/mycat/mysql/Isolation.java index 1226445..fd0b357 100644 --- a/source/src/main/java/io/mycat/mysql/Isolations.java +++ b/source/src/main/java/io/mycat/mysql/Isolation.java @@ -1,38 +1,52 @@ -/* - * Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software;Designed and Developed mainly by many Chinese - * opensource volunteers. you can redistribute it and/or modify it under the - * terms of the GNU General Public License version 2 only, as published by the - * Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Any questions about this component can be directed to it's project Web address - * https://code.google.com/p/opencloudb/. - * - */ -package io.mycat.mysql; - -/** - * 事务隔离级别定义 - * - * @author mycat - */ -public interface Isolations { - - public static final int READ_UNCOMMITTED = 1; - public static final int READ_COMMITTED = 2; - public static final int REPEATED_READ = 3; - public static final int SERIALIZABLE = 4; - -} \ No newline at end of file +/* + * Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software;Designed and Developed mainly by many Chinese + * opensource volunteers. you can redistribute it and/or modify it under the + * terms of the GNU General Public License version 2 only, as published by the + * Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Any questions about this component can be directed to it's project Web address + * https://code.google.com/p/opencloudb/. + * + */ +package io.mycat.mysql; + + +/** + * 事务隔离级别定义 + * + * @author mycat, ynfeng + */ +public enum Isolation { + READ_UNCOMMITTED("SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;"), + READ_COMMITTED("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;"), + REPEATED_READ("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;"), + SERIALIZABLE("SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE;"); + + private String cmd; + + Isolation(String cmd) { + this.cmd = cmd; + } + + public String getCmd() { + return cmd; + } + + public void setCmd(String cmd) { + this.cmd = cmd; + } +} + diff --git a/source/src/main/java/io/mycat/mysql/packet/QueryPacket.java b/source/src/main/java/io/mycat/mysql/packet/QueryPacket.java new file mode 100644 index 0000000..4919e73 --- /dev/null +++ b/source/src/main/java/io/mycat/mysql/packet/QueryPacket.java @@ -0,0 +1,29 @@ +package io.mycat.mysql.packet; + +import io.mycat.proxy.ProxyBuffer; + +/** + * Created by ynfeng on 2017/8/13. + */ +public class QueryPacket extends MySQLPacket { + public String sql; + private byte pkgType = MySQLPacket.COM_QUERY; + + @Override + public int calcPacketSize() { + return sql.length() + 1; + } + + @Override + protected String getPacketInfo() { + return "A COM_QUERY packet:" + sql; + } + + @Override + public void write(ProxyBuffer buffer) { + buffer.writeFixInt(3, calcPacketSize()); + buffer.writeByte(packetId); + buffer.writeByte(pkgType); + buffer.writeFixString(sql); + } +} From f2d48036a237936785fb8420dc854f3167eed5e4 Mon Sep 17 00:00:00 2001 From: fengyannan Date: Sun, 13 Aug 2017 20:13:02 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E6=81=A2=E5=A4=8D=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E4=B8=AD=E7=9A=84=E5=90=8E=E7=AB=AF=E6=9C=8D=E5=8A=A1=E5=99=A8?= =?UTF-8?q?ip=E5=9C=B0=E5=9D=80=E4=B8=BAlocalhost?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- source/src/main/java/io/mycat/mycat2/net/DefaultSQLHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/src/main/java/io/mycat/mycat2/net/DefaultSQLHandler.java b/source/src/main/java/io/mycat/mycat2/net/DefaultSQLHandler.java index 4af21ce..1678bdd 100644 --- a/source/src/main/java/io/mycat/mycat2/net/DefaultSQLHandler.java +++ b/source/src/main/java/io/mycat/mycat2/net/DefaultSQLHandler.java @@ -54,7 +54,7 @@ public void onFrontRead(final MySQLSession session) throws IOException { // final MySQLDataSource datas = repSet.getCurWriteDH(); logger.info("hang cur sql for backend connection ready "); - String serverIP = "10.211.55.5"; + String serverIP = "localhost"; int serverPort = 3306; InetSocketAddress serverAddress = new InetSocketAddress(serverIP, serverPort); session.backendChannel = SocketChannel.open();