Skip to content

Commit

Permalink
Merge pull request MyCATApache#6 from MyCATApache/master
Browse files Browse the repository at this point in the history
从主干合并
  • Loading branch information
yanjunli authored Aug 29, 2017
2 parents 21be699 + 323812c commit 2b8a707
Show file tree
Hide file tree
Showing 24 changed files with 1,904 additions and 667 deletions.
57 changes: 52 additions & 5 deletions source/src/main/java/io/mycat/mycat2/AbstractMySQLSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public enum CurrPacketType {

public AbstractMySQLSession(BufferPool bufferPool, Selector selector, SocketChannel channel) throws IOException {
this(bufferPool, selector, channel, SelectionKey.OP_READ);


}

Expand Down Expand Up @@ -93,7 +92,8 @@ public void responseOKOrError(MySQLPacket pkg) throws IOException {

/**
* 解析MySQL报文,解析的结果存储在curMSQLPackgInf中,如果解析到完整的报文,就返回TRUE
* 如果解析的过程中同时要移动ProxyBuffer的readState位置,即标记为读过,后继调用开始解析下一个报文,则需要参数markReaded=true
* 如果解析的过程中同时要移动ProxyBuffer的readState位置,即标记为读过,后继调用开始解析下一个报文,则需要参数markReaded
* =true
*
* @param proxyBuf
* @return
Expand Down Expand Up @@ -143,7 +143,56 @@ public CurrPacketType resolveMySQLPackage(ProxyBuffer proxyBuf, MySQLPackageInf
// 解包获取包的数据长度
int pkgLength = ParseUtil.getPacketLength(buffer, offset);
// 解析报文类型
final byte packetType = buffer.get(offset + ParseUtil.msyql_packetHeaderSize);
// final byte packetType = buffer.get(offset +
// ParseUtil.msyql_packetHeaderSize);

// 解析报文类型
int packetType = -1;

// 在包长度小于7时,作为resultSet的首包
if (pkgLength <= 7) {
int index = offset + ParseUtil.msyql_packetHeaderSize;

long len = proxyBuf.getInt(index, 1) & 0xff;
// 如果长度小于251,则取默认的长度
if (len < 251) {
packetType = (int) len;
} else if (len == 0xfc) {
// 进行验证是否位数足够,作为短包处理
if (!ParseUtil.validateResultHeader(offset, limit, 2)) {
// 收到短半包
logger.debug("not read a whole packet ,session {},offset {} ,limit {}", getSessionId(), offset,
limit);
return CurrPacketType.ShortHalfPacket;
}
packetType = (int) proxyBuf.getInt(index + 1, 2);
} else if (len == 0xfd) {

// 进行验证是否位数足够,作为短包处理
if (!ParseUtil.validateResultHeader(offset, limit, 3)) {
// 收到短半包
logger.debug("not read a whole packet ,session {},offset {} ,limit {}", getSessionId(), offset,
limit);
return CurrPacketType.ShortHalfPacket;
}

packetType = (int) proxyBuf.getInt(index + 1, 3);
} else {
// 进行验证是否位数足够,作为短包处理
if (!ParseUtil.validateResultHeader(offset, limit, 8)) {
// 收到短半包
logger.debug("not read a whole packet ,session {},offset {} ,limit {}", getSessionId(), offset,
limit);
return CurrPacketType.ShortHalfPacket;
}

packetType = (int) proxyBuf.getInt(index + 1, 8);
}
} else {
// 解析报文类型
packetType = buffer.get(offset + ParseUtil.msyql_packetHeaderSize);
}

// 包的类型
curPackInf.pkgType = packetType;
// 设置包的长度
Expand Down Expand Up @@ -179,6 +228,4 @@ public CurrPacketType resolveMySQLPackage(ProxyBuffer proxyBuf, MySQLPackageInf
}
}



}
12 changes: 8 additions & 4 deletions source/src/main/java/io/mycat/mycat2/MySQLSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

import io.mycat.mycat2.cmds.pkgread.PkgFirstReader;
import io.mycat.mycat2.cmds.pkgread.PkgProcess;
import io.mycat.proxy.BufferPool;

/**
Expand All @@ -25,6 +27,11 @@ public class MySQLSession extends AbstractMySQLSession {
*/
private MycatSession mycatSession;

/**
* 当前结束检查处理的状态,默认为首包检查读取
*/
public PkgProcess currPkgProc = PkgFirstReader.INSTANCE;

public MySQLSession(BufferPool bufferPool, Selector selector, SocketChannel channel) throws IOException {
super(bufferPool, selector, channel, SelectionKey.OP_CONNECT);
}
Expand Down Expand Up @@ -58,8 +65,7 @@ public void setMycatSession(MycatSession mycatSession) {

@Override
protected void doTakeReadOwner() {
this.getMycatSession().takeOwner(SelectionKey.OP_READ);

this.getMycatSession().takeOwner(SelectionKey.OP_READ);
}

public String getCurrBackendCachedName() {
Expand All @@ -70,6 +76,4 @@ public void setCurrBackendCachedName(String currBackendCachedName) {
this.currBackendCachedName = currBackendCachedName;
}



}
95 changes: 64 additions & 31 deletions source/src/main/java/io/mycat/mycat2/beans/MySQLDataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,21 @@
package io.mycat.mycat2.beans;

import java.io.IOException;

import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;

import io.mycat.mycat2.MySQLSession;
import io.mycat.mycat2.tasks.BackendCharsetReadTask;
import io.mycat.mycat2.tasks.BackendConCreateTask;
import io.mycat.proxy.BufferPool;
import io.mycat.proxy.ProxyReactorThread;
import io.mycat.proxy.ProxyRuntime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,28 +50,54 @@
public class MySQLDataSource {
public static final Logger LOGGER = LoggerFactory.getLogger(MySQLDataSource.class);
private final String name;
private final int size;
private final AtomicInteger activeSize;
private final MySQLBean mysqlBean;
private final ConMap conMap = new ConMap();
private long heartbeatRecoveryTime;
private boolean slaveNode;

/** collationIndex 和 charsetName 的映射 */
public final Map<Integer, String> INDEX_TO_CHARSET = new HashMap<>();
/** charsetName 到 默认collationIndex 的映射 */
public final Map<String, Integer> CHARSET_TO_INDEX = new HashMap<>();

private TransferQueue<MySQLSession> sessionQueue = new LinkedTransferQueue<>();

public MySQLSession getSession() {
MySQLSession session = sessionQueue.poll();
if (session != null) {
return session;
}

//todo 新建连接
return null;
}

public MySQLDataSource(MySQLBean config, boolean islaveNode) {
this.size = config.getMaxCon();
this.activeSize = new AtomicInteger(0);
this.mysqlBean = config;
this.name = config.getHostName();
this.slaveNode = islaveNode;

}

// public MySQLBackendConnection createNewConnection(String reactor, String
// schema, MySQLFrontConnection mySQLFrontConnection, BackConnectionCallback
// userCallback) throws IOException {
// MySQLBackendConnection con = factory.make(this, reactor, schema,
// mySQLFrontConnection, userCallback);
// this.conMap.getSchemaConQueue(schema).getAutoCommitCons().add(con);
// return con;
// }
public void createMySQLSession(BufferPool bufferPool, Selector selector) {
try {
BackendConCreateTask authProcessor = new BackendConCreateTask(bufferPool, selector, this, null);
authProcessor.setCallback((optSession, sender, exeSucces, retVal) -> {
if (exeSucces) {
int curSize = activeSize.incrementAndGet();
if (curSize == 1) {
BackendCharsetReadTask backendCharsetReadTask = new BackendCharsetReadTask(optSession, this);
optSession.setCurNIOHandler(backendCharsetReadTask);
backendCharsetReadTask.readCharset();
}
sessionQueue.add(optSession);
}
});
} catch (IOException e) {
LOGGER.warn("error to create mysqlSession for datasource: {}", this.name);
}
}

public boolean isSlaveNode() {
return slaveNode;
Expand All @@ -67,8 +107,8 @@ public void setSlaveNode(boolean slaveNode) {
this.slaveNode = slaveNode;
}

public int getSize() {
return size;
public AtomicInteger getActiveSize() {
return activeSize;
}

public String getName() {
Expand All @@ -78,22 +118,15 @@ public String getName() {
public boolean initSource() {
int initSize = this.mysqlBean.getMinCon();
LOGGER.info("init backend myqsl source ,create connections total " + initSize + " for " + mysqlBean);
// Set<String> reactos =
// SQLEngineCtx.INSTANCE().getReactorMap().keySet();
// Iterator<String> itor = reactos.iterator();
// for (int i = 0; i < initSize; i++) {
// try {
// String actorName = null;
// if (!itor.hasNext()) {
// itor = reactos.iterator();
// }
// actorName = itor.next();
// this.createNewConnectionOnReactor(actorName,
// this.mysqlBean.getDefaultSchema(), null, null);
// } catch (Exception e) {
// LOGGER.warn(" init connection error.", e);
// }
// }

ProxyRuntime runtime = ProxyRuntime.INSTANCE;
ProxyReactorThread[] reactorThreads = runtime.getReactorThreads();
int reactorSize = runtime.getNioReactorThreads();
for (int i = 0; i < initSize; i++) {
ProxyReactorThread reactorThread = reactorThreads[i % reactorSize];
reactorThread.addNIOJob(() -> createMySQLSession(reactorThread.getBufPool(), reactorThread.getSelector()));
}

LOGGER.info("init source finished");
return true;
}
Expand Down Expand Up @@ -156,7 +189,7 @@ public MySQLBean getConfig() {
@Override
public String toString() {
final StringBuilder sbuf = new StringBuilder("MySQLDataSource[").append("name=").append(name).append(',')
.append("size=").append(size).append(',').append("heartbeatRecoveryTime=").append(heartbeatRecoveryTime)
.append("activeSize=").append(activeSize).append(',').append("heartbeatRecoveryTime=").append(heartbeatRecoveryTime)
.append(',').append("slaveNode=").append(slaveNode).append(',').append("mysqlBean=").append(mysqlBean)
.append(']');
return (sbuf.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*
*/
public class MySQLPackageInf {
public byte pkgType;
public int pkgType;
public boolean crossBuffer;
public int startPos;
public int endPos;
Expand Down
Loading

0 comments on commit 2b8a707

Please sign in to comment.