Skip to content

Commit

Permalink
Merge pull request MyCATApache#6 from ynfeng/state_sync
Browse files Browse the repository at this point in the history
前端状态同步到后端基础完成
  • Loading branch information
apachemycat authored Aug 14, 2017
2 parents 701347e + f2d4803 commit cd809e4
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 44 deletions.
6 changes: 6 additions & 0 deletions source/src/main/java/io/mycat/mycat2/MySQLSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.mycat.mycat2.beans.MySQLPackageInf;
import io.mycat.mycat2.beans.SchemaBean;
import io.mycat.mysql.Capabilities;
import io.mycat.mysql.Isolation;
import io.mycat.mysql.packet.HandshakePacket;
import io.mycat.mysql.packet.MySQLPacket;
import io.mycat.proxy.BufferOptState;
Expand Down Expand Up @@ -46,6 +47,11 @@ public class MySQLSession extends UserProxySession {
*/
public SchemaBean schema;

/**
* 事务隔离级别
*/
public Isolation isolation = Isolation.REPEATED_READ;

/**
* 认证中的seed报文数据
*/
Expand Down
25 changes: 20 additions & 5 deletions source/src/main/java/io/mycat/mycat2/net/DefaultSQLHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

import io.mycat.mycat2.tasks.BackendSynchronzationTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -67,11 +68,8 @@ public void onFrontRead(final MySQLSession session) throws IOException {
BackendConCreateTask authProcessor = new BackendConCreateTask(session, null);
authProcessor.setCallback((optSession, Sender, exeSucces, retVal) -> {
if (exeSucces) {
optSession.setCurProxyHandler(DefaultSQLHandler.INSTANCE);
// 交给SQLComand去处理
if (session.curSQLCommand.procssSQL(session, false)) {
session.curSQLCommand.clearResouces(false);
}
//认证成功后开始同步会话状态至后端
syncSessionStateToBackend(session);
} else {
ErrorPacket errPkg = (ErrorPacket) retVal;
optSession.responseOKOrError(errPkg, true);
Expand All @@ -90,6 +88,23 @@ public void onFrontRead(final MySQLSession session) throws IOException {

}

private void syncSessionStateToBackend(MySQLSession mySQLSession) throws IOException{
BackendSynchronzationTask backendSynchronzationTask = new BackendSynchronzationTask(mySQLSession);
backendSynchronzationTask.setCallback((session, sender, exeSucces, rv) -> {
if (exeSucces) {
session.setCurProxyHandler(DefaultSQLHandler.INSTANCE);
// 交给SQLComand去处理
if (session.curSQLCommand.procssSQL(session, false)) {
session.curSQLCommand.clearResouces(false);
}
} else {
ErrorPacket errPkg = (ErrorPacket) rv;
session.responseOKOrError(errPkg, true);
}
});
mySQLSession.setCurProxyHandler(backendSynchronzationTask);
}

public void onBackendRead(MySQLSession session) throws IOException {
boolean readed = session.readSocket(false);
if (readed == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void onBackendRead(MySQLSession session) throws IOException {
welcomePkgReceived = true;
} else {
// 认证结果报文收到
if (session.curBackendMSQLPackgInf.pkgType == 0x00) {
if (session.curBackendMSQLPackgInf.pkgType == MySQLPacket.OK_PACKET) {
logger.info("backend authed suceess ");
this.finished(true);
} else if (session.curBackendMSQLPackgInf.pkgType == MySQLPacket.ERROR_PACKET) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package io.mycat.mycat2.tasks;

import io.mycat.mycat2.MySQLSession;
import io.mycat.mysql.packet.ErrorPacket;
import io.mycat.mysql.packet.MySQLPacket;
import io.mycat.mysql.packet.QueryPacket;
import io.mycat.proxy.ProxyBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* Created by ynfeng on 2017/8/13.
* <p>
* 同步状态至后端数据库,包括:字符集,事务,隔离级别等
*/
public class BackendSynchronzationTask implements BackendIOTask<MySQLSession> {
private static Logger logger = LoggerFactory.getLogger(BackendSynchronzationTask.class);
private AsynTaskCallBack callBack;
private MySQLSession session;
private static QueryPacket[] CMDS = new QueryPacket[3];
private int processCmd = 0;
private ErrorPacket errPkg;

static {
QueryPacket isolationSynCmd = new QueryPacket();
isolationSynCmd.packetId = 0;

QueryPacket charsetSynCmd = new QueryPacket();
charsetSynCmd.packetId = 0;

QueryPacket transactionSynCmd = new QueryPacket();
transactionSynCmd.packetId = 0;

CMDS[0] = isolationSynCmd;
CMDS[1] = charsetSynCmd;
CMDS[2] = transactionSynCmd;
}

public BackendSynchronzationTask(MySQLSession session) throws IOException {
this.processCmd = 0;
this.session = session;
syncState(session);
}

private void syncState(MySQLSession session) throws IOException {
logger.info("synchronzation state to bakcend.session=" + session.toString());
ProxyBuffer frontBuffer = session.frontBuffer;
frontBuffer.reset();
//TODO 字符集映射和前端事务设置还未完成,这里只用隔离级别模拟实现(其实都是SET xxx效果一样),回头补充
switch (processCmd) {
case 1:
case 2:
case 0:
CMDS[processCmd].sql = session.isolation.getCmd();
CMDS[processCmd].write(frontBuffer);

frontBuffer.flip();
session.writeToChannel(frontBuffer, session.backendChannel);
processCmd++;
break;
default:
this.finished(true);
break;
}

}

@Override
public void onBackendConnect(MySQLSession userSession, boolean success, String msg) throws IOException {

}

@Override
public void onBackendRead(MySQLSession session) throws IOException {
session.frontBuffer.reset();
if (!session.readSocket(false)
|| !session.resolveMySQLPackage(session.frontBuffer, session.curBackendMSQLPackgInf, false)) {// 没有读到数据或者报文不完整
return;
}
if (session.curBackendMSQLPackgInf.pkgType == MySQLPacket.OK_PACKET) {
syncState(session);
} else {
//TODO 同步失败如何处理??是否应该关闭此连接??
errPkg = new ErrorPacket();
errPkg.read(session.frontBuffer);
logger.warn("backend state sync Error.Err No. " + errPkg.errno + "," + errPkg.message);
this.finished(false);
}
}

private void finished(boolean success) throws IOException {
callBack.finished(session, this, success, this.errPkg);
}

@Override
public void onBackendWrite(MySQLSession session) throws IOException {

}

@Override
public void onBackendSocketClosed(MySQLSession userSession, boolean normal) {

}

@Override
public void setCallback(AsynTaskCallBack callBack) {
this.callBack = callBack;
}
}
Original file line number Diff line number Diff line change
@@ -1,38 +1,52 @@
/*
* Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software;Designed and Developed mainly by many Chinese
* opensource volunteers. you can redistribute it and/or modify it under the
* terms of the GNU General Public License version 2 only, as published by the
* Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Any questions about this component can be directed to it's project Web address
* https://code.google.com/p/opencloudb/.
*
*/
package io.mycat.mysql;

/**
* 事务隔离级别定义
*
* @author mycat
*/
public interface Isolations {

public static final int READ_UNCOMMITTED = 1;
public static final int READ_COMMITTED = 2;
public static final int REPEATED_READ = 3;
public static final int SERIALIZABLE = 4;

}
/*
* Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software;Designed and Developed mainly by many Chinese
* opensource volunteers. you can redistribute it and/or modify it under the
* terms of the GNU General Public License version 2 only, as published by the
* Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Any questions about this component can be directed to it's project Web address
* https://code.google.com/p/opencloudb/.
*
*/
package io.mycat.mysql;


/**
* 事务隔离级别定义
*
* @author mycat, ynfeng
*/
public enum Isolation {
READ_UNCOMMITTED("SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;"),
READ_COMMITTED("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;"),
REPEATED_READ("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;"),
SERIALIZABLE("SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE;");

private String cmd;

Isolation(String cmd) {
this.cmd = cmd;
}

public String getCmd() {
return cmd;
}

public void setCmd(String cmd) {
this.cmd = cmd;
}
}

29 changes: 29 additions & 0 deletions source/src/main/java/io/mycat/mysql/packet/QueryPacket.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.mycat.mysql.packet;

import io.mycat.proxy.ProxyBuffer;

/**
* Created by ynfeng on 2017/8/13.
*/
public class QueryPacket extends MySQLPacket {
public String sql;
private byte pkgType = MySQLPacket.COM_QUERY;

@Override
public int calcPacketSize() {
return sql.length() + 1;
}

@Override
protected String getPacketInfo() {
return "A COM_QUERY packet:" + sql;
}

@Override
public void write(ProxyBuffer buffer) {
buffer.writeFixInt(3, calcPacketSize());
buffer.writeByte(packetId);
buffer.writeByte(pkgType);
buffer.writeFixString(sql);
}
}

0 comments on commit cd809e4

Please sign in to comment.