Skip to content

Commit

Permalink
Merge pull request #1 from MyCATApache/master
Browse files Browse the repository at this point in the history
merge from mycat2 at 2017.4.18
  • Loading branch information
Gao-Zhiwen authored Apr 18, 2017
2 parents 2f6e64e + f15ff78 commit 8e569fe
Show file tree
Hide file tree
Showing 28 changed files with 2,043 additions and 20 deletions.
19 changes: 19 additions & 0 deletions Mycat-Core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,25 @@
<version>2.3</version>
</dependency>

<!-- https://mvnrepository.com/artifact/io.netty/netty-buffer -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>4.1.7.Final</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>21.0</version>
</dependency>

<dependency>
<groupId>io.mycat.bigmem</groupId>
<artifactId>mycat-bigmemory</artifactId>
<version>0.0.1-RELEASE</version>
</dependency>
</dependencies>


Expand Down
2 changes: 1 addition & 1 deletion Mycat-Core/src/main/java/io/mycat/MycatCore.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static void main(String[] args) throws IOException {
connector.start();
NetSystem.getInstance().setConnector(connector);
final SystemConfig sysconfig = new SystemConfig();
sysconfig.setTraceProtocol(true);
sysconfig.setTraceProtocol(false);
NetSystem.getInstance().setNetConfig(sysconfig);
MySQLBackendConnectionFactory bakcMySQLFactory=new MySQLBackendConnectionFactory();
SQLEngineCtx.INSTANCE().setBackendMySQLConFactory(bakcMySQLFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ public void handleReadEvent(MySQLBackendConnection con) throws IOException{
{
return;
}

length = MySQLConnection.getPacketLength(dataBuffer, offset);

if(length+offset>limit)
{
LOGGER.info("Not whole package :length "+length+" cur total length "+limit);
Expand All @@ -70,11 +72,13 @@ public void handleReadEvent(MySQLBackendConnection con) throws IOException{
byte packetType = dataBuffer.getByte(offset+MySQLConnection.msyql_packetHeaderSize);

int pkgStartPos=offset;
offset += length;
dataBuffer.setReadingPos(offset);



LOGGER.info("received pkg ,length "+length+" type "+packetType+" cur total length "+limit);
con.getUserCallback().handleResponse(con, dataBuffer, packetType, pkgStartPos, length);
offset += length;
dataBuffer.setReadingPos(offset);

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void handleResponse(MySQLBackendConnection source, ConDataBuffer dataBuff
packet = source.getHandshake();
if (packet == null) {
byteBuff=dataBuffer.getBytes(pkgStartPos, pkgLen);
processHandShakePacket(source, byteBuff);
processHandShakePacket(source, byteBuff);
// 发送认证数据包
source.authenticate();
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package io.mycat.backend.callback;

import io.mycat.backend.BackConnectionCallback;
import io.mycat.backend.MySQLBackendConnection;
import io.mycat.bigmem.sqlcache.BigSQLResult;
import io.mycat.bigmem.sqlcache.IDataLoader;
import io.mycat.bigmem.sqlcache.IRemoveKeyListener;
import io.mycat.front.MySQLFrontConnection;
import io.mycat.mysql.packet.MySQLPacket;
import io.mycat.net2.ConDataBuffer;
import io.mycat.net2.Connection;
import io.mycat.net2.ConnectionException;
import io.mycat.sqlcache.*;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;

import static io.mycat.mysql.MySQLConnection.RESULT_FETCH_STATUS;
import static io.mycat.mysql.MySQLConnection.RESULT_HEADER_STATUS;

/**
* SQL 结果集缓存 HintHandler
*
* @author zagnix
* @create 2017-01-17 14:11
*/

public class SQLResCacheHintHandler implements BackConnectionCallback,HintHandler {
private final BigSQLResult sqlResultCache;
private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SQLResCacheHintHandler.class);
private int filedCount = 0;
private IDataLoader<String,BigSQLResult> loader;
private IRemoveKeyListener<String,BigSQLResult> listener;
private HintSQLInfo hintSQLInfo;
public SQLResCacheHintHandler(HintSQLInfo hintSQLInfo, BigSQLResult sqlResultCache){
this.hintSQLInfo = hintSQLInfo;
this.sqlResultCache = sqlResultCache;
this.listener = new HintSQLRemoveKeyListener<>();
this.loader = new HintSQLDataLoader<>();
this.sqlResultCache.reset();
}

@Override
public boolean handle(String sql) {
return false;
}

@Override
public void connectionError(ConnectionException e, MySQLBackendConnection conn) {
LOGGER.error("Hint SQL connectionError");

}

@Override
public void connectionAcquired(MySQLBackendConnection conn) {
LOGGER.error("Hint SQL connectionAcquired");
((MySQLFrontConnection)conn.getAttachement()).executePendingTask();
}

@Override
public void handleResponse(MySQLBackendConnection conn, ConDataBuffer dataBuffer, byte packetType, int pkgStartPos, int pkgLen) throws IOException {
//LOGGER.error("Hint SQL handleResponse");
MySQLFrontConnection frontCon= (MySQLFrontConnection) conn.getAttachement();
/**
* 直接透传给前端client即可,
* 缓存框架需要在这里将收到的数据写到缓存中
*/
ByteBuffer packetBuffer = dataBuffer.getBytes(pkgStartPos,pkgLen);
frontCon.getWriteDataBuffer().putBytes(packetBuffer);
frontCon.enableWrite(false);
int status = conn.getState();
switch (status) {
case Connection.STATE_IDLE:
if (packetType == MySQLPacket.COM_QUERY) {
conn.setState(Connection.STATE_IDLE);
} else if (packetType == MySQLPacket.COM_QUIT) {
conn.setState(Connection.STATE_IDLE);
}else if (sqlResultCache != null){
/**step 1: Result Set Header Packet 列的数目*/
LOGGER.error("step 1 =====> DB status: Result Filed Count ");
byte [] headers = new byte[packetBuffer.remaining()];
packetBuffer.get(headers);
sqlResultCache.put(headers);
conn.setState(RESULT_HEADER_STATUS);
}
break;
case RESULT_HEADER_STATUS:
if (packetType == MySQLPacket.EOF_PACKET) {

LOGGER.error("step 3 : EOF Packet,marker—end of field packets");
byte [] fieldDescsEof = new byte[packetBuffer.remaining()];
packetBuffer.get(fieldDescsEof);
sqlResultCache.put(fieldDescsEof);

conn.setState(RESULT_FETCH_STATUS);
} else if (packetType == MySQLPacket.ERROR_PACKET) {
conn.setState(Connection.STATE_IDLE);
} else if (sqlResultCache != null) {
/**Step 2: Field Packets,列的描述信息*/
LOGGER.error("Step 2: Field Packets");
byte [] fieldDescs = new byte[packetBuffer.remaining()];
packetBuffer.get(fieldDescs);
sqlResultCache.put(fieldDescs);
}
break;
case RESULT_FETCH_STATUS:
if (packetType == MySQLPacket.EOF_PACKET) {
/**Step 5:EOF Packet: marker---end of row data packets*/
LOGGER.error("Step 5:EOF Packet: marker---end of row data packets");
byte [] rowDataEof = new byte[packetBuffer.remaining()];
packetBuffer.get(rowDataEof);
sqlResultCache.put(rowDataEof);
/**将sqlResultCache插入CacheService中*/
SQLResultsCacheService.getInstance().
cacheSQLResult(hintSQLInfo,sqlResultCache,loader,listener);

conn.setState(Connection.STATE_IDLE);
} else if (packetType == MySQLPacket.ERROR_PACKET) {
conn.setState(Connection.STATE_IDLE);
} else if (sqlResultCache != null){
/**Step 4: Row Data Packet{多个}: row contents 一行对应一个row packet*/
LOGGER.error("Step 4: Row Data Packet");
byte [] rowDatas = new byte[packetBuffer.remaining()];
packetBuffer.get(rowDatas);
sqlResultCache.put(rowDatas);
}
break;
default:
LOGGER.warn("Error connected status.", status);
break;
}

if (conn.getState()== Connection.STATE_IDLE)
{
LOGGER.debug("realese bakend connedtion "+conn);
conn.release();
}
}

@Override
public void connectionClose(MySQLBackendConnection conn, String reason) {
LOGGER.error("Hint SQL connectionClose");

}

@Override
public void handlerError(Exception e, MySQLBackendConnection conn) {
LOGGER.error("Hint SQL handlerError");

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;

import io.mycat.sqlcache.HintSQLInfo;
import io.mycat.sqlcache.HintSQLParser;
import io.mycat.sqlcache.SQLResultsCacheService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -97,13 +100,33 @@ private void initDb(final MySQLFrontConnection frontCon, final ByteBuffer byteBu
private void doSQLCommand(MySQLFrontConnection frontCon, ConDataBuffer dataBuffer, byte packageType,
int pkgStartPos, int pkgLen) throws IOException {
{
// 取得语句
//取得语句
ByteBuffer byteBuff = dataBuffer.getBytes(pkgStartPos, pkgLen);
MySQLMessage mm = new MySQLMessage(byteBuff);
mm.position(5);
String sql = null;
sql = mm.readString(frontCon.getCharset());
LOGGER.debug("received sql "+sql);
String sql = mm.readString(frontCon.getCharset());
/**
* parser hit sql
* 需要改写sql语句,去掉前面的注释即可
* 首先判断是否
*/
HintSQLInfo hintSQL = HintSQLParser.parserHintSQL(sql);
if (hintSQL != null && hintSQL.isCache()){
/**
* 0.判断是select语句
* 1.改写sql语句,去掉前面的注释
* 2.发送sql语句到后端执行。做好backend connection的Handler设置工作
* 3.解析sql的结果集,决定是否异步缓存,一部分直接发送给您前端。
* 4.如果当前sql的结果集已经缓存了。直接从本地缓存中拉去结果集即可
*/

if (!SQLResultsCacheService.getInstance().processHintSQL(frontCon,hintSQL,mm)){
frontCon.writeErrMessage(ErrorCode.ER_NO_DB_ERROR, "cache no implementation");
}

return;
}

// 执行查询
SQLInfo sqlInf=new SQLInfo();
int rs = ServerParse.parse(sql,sqlInf);
Expand All @@ -127,7 +150,9 @@ private void doSQLCommand(MySQLFrontConnection frontCon, ConDataBuffer dataBuffe
frontCon.writeErrMessage(ErrorCode.ER_NO_DB_ERROR, "No database selected");
return;
}

executeSelectSQL(frontCon, dataBuffer, packageType,pkgStartPos, pkgLen,sql);

// SelectHandler.handle(sql, con, rs >>> 8);
break;
case ServerParse.START:
Expand Down Expand Up @@ -182,7 +207,7 @@ private void doSQLCommand(MySQLFrontConnection frontCon, ConDataBuffer dataBuffe
}
public void passThroughSQL(MySQLFrontConnection frontCon, ConDataBuffer dataBuffer, int pkgStartPos, int pkgLen)
throws IOException {
// 直接透传(默认)
// 直接透传(默认)获取连接池
MySQLBackendConnection existCon = null;
UserSession session=frontCon.getSession();
ArrayList<MySQLBackendConnection> allBackCons = session.getBackendCons();
Expand All @@ -198,24 +223,47 @@ public void passThroughSQL(MySQLFrontConnection frontCon, ConDataBuffer dataBuff
LOGGER.error("No schema selected");
return ;
}

final DNBean dnBean = frontCon.getMycatSchema().getDefaultDN();
final String replica = dnBean.getMysqlReplica();
final SQLEngineCtx ctx = SQLEngineCtx.INSTANCE();
LOGGER.debug("select a replica: {}", replica);
final MySQLReplicatSet repSet = ctx.getMySQLReplicatSet(replica);
final MySQLDataSource datas = repSet.getCurWriteDH();

/**
* 如果该sql对应后端db,没有连接池,则创建连接池部分
*/
final MySQLBackendConnection newCon =
datas.getConnection(frontCon.getReactor(), dnBean.getDatabase(), true, null);

/**很关键的设置前端front 与 backend session*/
newCon.setAttachement(frontCon);

/**设置后端连接池结果集处理handler*/
newCon.setUserCallback(directTransCallback);

/**
* 执行sql语句
*/
frontCon.addTodoTask(() -> {
newCon.getWriteDataBuffer().putBytes(dataBuffer.getBytes(pkgStartPos, pkgLen));
/**
* 将数据写到后端连接池中
*/
newCon.getWriteDataBuffer().putBytes(dataBuffer.getBytes(pkgStartPos,pkgLen));
newCon.enableWrite(false);
/**
* 新建立的连接放到连接池中
*/
session.addBackCon(newCon);
});
} else {
/**
* 否则直接写到后端即可
*/
existCon.getWriteDataBuffer().putBytes(dataBuffer.getBytes(pkgStartPos, pkgLen));
existCon.enableWrite(false);
existCon.setUserCallback(directTransCallback);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ private void success(MySQLFrontConnection con, AuthPacket auth) throws IOExcepti
}
LOGGER.debug("charset = {}, charsetIndex = {}", charset, charsetIndex);
con.setCharset(charsetIndex, charset);

//认证成功后,修改changeCmdHandler,由CheckUserLoginResponseCallback改用
// AbstractSchemaSQLCommandHandler处理
if (!con.setFrontSchema(auth.database)) {
final String errmsg = "No Mycat Schema defined: " + auth.database;
LOGGER.debug(errmsg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ public void handleReadEvent(final MySQLFrontConnection cnxn) throws IOException{
// 解析报文类型
final byte packetType = buffer.getByte(offset + MySQLConnection.msyql_packetHeaderSize);
final int pkgStartPos = offset;
offset += length;
buffer.setReadingPos(offset);



// trace-protocol-packet
// @author little-pan
// @since 2016-09-29
Expand All @@ -100,6 +101,11 @@ public void handleReadEvent(final MySQLFrontConnection cnxn) throws IOException{
cnxn.getId(), buffer.hashCode(), pkgStartPos, length, packetType, limit, hexs);
}
cnxn.getSession().getCurCmdHandler().processCmd(cnxn, buffer, packetType, pkgStartPos, length);

offset += length;
buffer.setReadingPos(offset);


}
}

Expand Down
Loading

0 comments on commit 8e569fe

Please sign in to comment.