Skip to content

Commit

Permalink
Mycat 2.0集群和Leader选举初步版本完成,还有BUg和未完善功能需要进行,期待帅才
Browse files Browse the repository at this point in the history
  • Loading branch information
Leader us committed Aug 13, 2017
1 parent 5fc08b7 commit 701347e
Show file tree
Hide file tree
Showing 26 changed files with 680 additions and 80 deletions.
19 changes: 17 additions & 2 deletions source/src/main/java/io/mycat/mycat2/MySQLStudySessionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.mycat.mycat2.net.MySQLProcalDebugHandler;
import io.mycat.proxy.BufferPool;
import io.mycat.proxy.Session;
import io.mycat.proxy.SessionManager;

/**
Expand All @@ -21,10 +24,11 @@
*/
public class MySQLStudySessionManager implements SessionManager<MySQLSession> {
protected static Logger logger = LoggerFactory.getLogger(MySQLStudySessionManager.class);
private ArrayList<MySQLSession> allSessions = new ArrayList<MySQLSession>();

@Override
public MySQLSession createSession(BufferPool bufPool, Selector nioSelector, SocketChannel frontChannel,boolean isAcceptCon)
throws IOException {
public MySQLSession createSession(BufferPool bufPool, Selector nioSelector, SocketChannel frontChannel,
boolean isAcceptCon) throws IOException {

logger.info("MySQL client connected ." + frontChannel);

Expand All @@ -41,8 +45,19 @@ public MySQLSession createSession(BufferPool bufPool, Selector nioSelector, Sock
session.backendChannel.connect(serverAddress);
SelectionKey selectKey = session.backendChannel.register(session.nioSelector, SelectionKey.OP_CONNECT, session);
session.backendKey = selectKey;
session.setSessionManager(this);
allSessions.add(session);
logger.info("Connecting to server " + serverIP + ":" + serverPort);
return session;
}

@Override
public Collection<MySQLSession> getAllSessions() {
return this.allSessions;
}

public void removeSession(Session session) {
this.allSessions.remove(session);

}
}
5 changes: 4 additions & 1 deletion source/src/main/java/io/mycat/mycat2/MycatCore.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
package io.mycat.mycat2;

import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.List;

Expand Down Expand Up @@ -59,7 +60,9 @@ public static void main(String[] args) throws IOException {
NameableExecutor businessExecutor = ExecutorUtil.create("BusinessExecutor", 10);
// 定时器Executor,用来执行定时任务
NamebleScheduledExecutor timerExecutor = ExecutorUtil.createSheduledExecute("Timer", 5);
MycatConfig conf = MycatConfig.loadFromProperties(ConfigLoader.class.getResourceAsStream("/mycat.conf"));
InputStream instream=ClassLoader.getSystemResourceAsStream("mycat.conf");
instream=(instream==null)?ConfigLoader.class.getResourceAsStream("/mycat.conf"):instream;
MycatConfig conf = MycatConfig.loadFromProperties(instream);
ProxyRuntime runtime = ProxyRuntime.INSTANCE;
runtime.setProxyConfig(conf);
// runtime.setNioProxyHandler(new DefaultMySQLProxyHandler());
Expand Down
17 changes: 17 additions & 0 deletions source/src/main/java/io/mycat/mycat2/MycatSessionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.mycat.mycat2.net.DefaultSQLHandler;
import io.mycat.mycat2.net.MySQLClientAuthHandler;
import io.mycat.proxy.BufferPool;
import io.mycat.proxy.Session;
import io.mycat.proxy.SessionManager;

/**
Expand All @@ -20,6 +23,7 @@
*/
public class MycatSessionManager implements SessionManager<MySQLSession> {
protected static Logger logger = LoggerFactory.getLogger(MycatSessionManager.class);
private ArrayList<MySQLSession> allSessions = new ArrayList<MySQLSession>();

@Override
public MySQLSession createSession(BufferPool bufPool, Selector nioSelector, SocketChannel frontChannel,
Expand All @@ -34,7 +38,20 @@ public MySQLSession createSession(BufferPool bufPool, Selector nioSelector, Sock
session.setCurProxyHandler(MySQLClientAuthHandler.INSTANCE);
// 向MySQL Client发送认证报文
session.sendAuthPackge();
session.setSessionManager(this);
allSessions.add(session);
return session;
}

@Override
public Collection<MySQLSession> getAllSessions() {
return this.allSessions;
}


public void removeSession(Session session) {
this.allSessions.remove(session);

}

}
18 changes: 15 additions & 3 deletions source/src/main/java/io/mycat/proxy/AbstractSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
*
*/
public class AbstractSession implements Session {

protected static Logger logger = LoggerFactory.getLogger(AbstractSession.class);
private int sessionId;
private SessionManager<? extends Session> sessionManager;
public BufferPool bufPool;
public Selector nioSelector;
// 前端连接
Expand Down Expand Up @@ -76,15 +78,13 @@ public void close(String message) {
logger.info("close session " + this.sessionInfo() + " for reason " + message);
closeSocket(frontChannel);
bufPool.recycleBuf(frontBuffer.getBuffer());

this.sessionManager.removeSession(this);
} else {
logger.warn("session already closed " + this.sessionInfo());
}

}



/**
* 从SocketChannel中读取数据并写入到内部Buffer中,writeState里记录了写入的位置指针
* 第一次调用之前需要确保Buffer状态为Write状态,并指定要写入的位置,
Expand Down Expand Up @@ -201,4 +201,16 @@ public int getSessionId() {
return sessionId;
}



public void setSessionManager(SessionManager<? extends Session> sessionManager) {
this.sessionManager = sessionManager;
}

@SuppressWarnings("unchecked")
@Override
public <T extends Session> SessionManager<T> getMySessionManager() {
return (SessionManager<T>) this.sessionManager;
}

}
19 changes: 19 additions & 0 deletions source/src/main/java/io/mycat/proxy/ConnectIOHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.mycat.proxy;

import java.io.IOException;
/**
* 处理 程序中主动发起连接(Connect)的IO连接事件
* @author wuzhihui
*
* @param <T>
*/
public interface ConnectIOHandler<T extends Session> {
/**
* 处理Connect事件
* @param userSession
* @param success 是否连接成功
* @param msg
* @throws IOException
*/
void onConnect(T userSession, boolean success, String msg) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -16,6 +18,7 @@
*/
public class DefaultTCPProxySessionManager implements SessionManager<UserProxySession>{
protected static Logger logger = LoggerFactory.getLogger(DefaultTCPProxySessionManager.class);
private ArrayList<UserProxySession> allSessions=new ArrayList<UserProxySession>();
@Override
public UserProxySession createSession(BufferPool bufPool, Selector nioSelector, SocketChannel frontChannel,boolean isAcceptedCon) throws IOException {

Expand All @@ -33,9 +36,17 @@ public UserProxySession createSession(BufferPool bufPool, Selector nioSelector,
SelectionKey selectKey = session.backendChannel.register(session.nioSelector, SelectionKey.OP_CONNECT, session);
session.backendKey = selectKey;
logger.info("Connecting to backend server " + serverIP + ":" + serverPort);
session.setSessionManager(this);
allSessions.add(session);
return session;
}


@Override
public Collection<UserProxySession> getAllSessions() {
return this.allSessions;
}
public void removeSession(Session session) {
this.allSessions.remove(session);

}
}
34 changes: 26 additions & 8 deletions source/src/main/java/io/mycat/proxy/NIOAcceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*
*/
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
Expand All @@ -30,7 +31,7 @@ public NIOAcceptor(BufferPool bufferPool) throws IOException {
selector = Selector.open();
}

@SuppressWarnings("rawtypes")
@SuppressWarnings({ "rawtypes", "unchecked" })
public void run() {
int nioIndex = 0;

Expand All @@ -50,9 +51,11 @@ public void run() {
}

while (true) {
Set<SelectionKey> keys = null;
SelectionKey curKey=null;
try {
selector.select(1000);
final Set<SelectionKey> keys = selector.selectedKeys();
keys = selector.selectedKeys();
if (keys.isEmpty()) {
continue;
}
Expand All @@ -61,6 +64,7 @@ public void run() {
if (!key.isValid()) {
continue;
}
curKey=key;
int readdyOps = key.readyOps();
if ((readdyOps & SelectionKey.OP_ACCEPT) != 0) {

Expand All @@ -70,7 +74,7 @@ public void run() {
logger.info("new Client connected: " + socketChannel);
boolean clusterServer = (boolean) key.attachment();
if (clusterServer) {
Session session = env.getAdminSessionManager().createSession(this.bufferPool, selector,
AdminSession session = env.getAdminSessionManager().createSession(this.bufferPool, selector,
socketChannel, true);
} else {
// 找到一个可用的NIO Reactor Thread,交付托管
Expand All @@ -83,10 +87,20 @@ public void run() {
}
} else if ((readdyOps & SelectionKey.OP_CONNECT) != 0) {
// only from cluster server socket
SocketChannel socketChannel = (SocketChannel) key.channel();
if (socketChannel.finishConnect()) {
Session session = env.getAdminSessionManager().createSession(this.bufferPool, selector,
socketChannel, false);
SocketChannel curChannel = (SocketChannel) key.channel();
String clusterNodeId = (String) key.attachment();
AdminSession session = env.getAdminSessionManager().createSession(this.bufferPool, selector,
curChannel, false);
session.setNodeId(clusterNodeId);
ConnectIOHandler<AdminSession> connectIOHandler = (ConnectIOHandler<AdminSession>) env
.getAdminSessionIOHandler();
try {
if (curChannel.finishConnect()) {
connectIOHandler.onConnect(session, true, null);
}

} catch (ConnectException ex) {
connectIOHandler.onConnect(session, false, ex.getMessage());
}

} else if ((readdyOps & SelectionKey.OP_READ) != 0) {
Expand All @@ -97,9 +111,13 @@ public void run() {
env.getAdminSessionIOHandler().onFrontWrite((AdminSession) key.attachment());
}
}
keys.clear();
} catch (IOException e) {
curKey.cancel();
logger.warn("caugh error ", e);
} finally {
if (keys != null) {
keys.clear();
}
}
}

Expand Down
10 changes: 10 additions & 0 deletions source/src/main/java/io/mycat/proxy/ProxyConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public class ProxyConfig {
// 逗号分隔的所有集群节点的ID:IP:Port信息,如
// leader-1:127.0.0.1:9066,leader-2:127.0.0.1:9068,leader-3:127.0.0.1:9069
private String allNodeInfs;
// 当前节点所用的配置文件的版本(节点自身的基础配置信息,如绑定的IP地址,端口以及其他只针对自身节点的配置变动不影响配置文件版本,设计的时候,分开两种配置文件)
private String myConfigFileVersion="1.0";

public static MycatConfig loadFromProperties(InputStream in) throws IOException {
MycatConfig conf = new MycatConfig();
Expand Down Expand Up @@ -97,4 +99,12 @@ public void setClusterPort(int clusterPort) {
this.clusterPort = clusterPort;
}

public String getMyConfigFileVersion() {
return myConfigFileVersion;
}

public void setMyConfigFileVersion(String myConfigFileVersion) {
this.myConfigFileVersion = myConfigFileVersion;
}

}
8 changes: 3 additions & 5 deletions source/src/main/java/io/mycat/proxy/ProxyRuntime.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@

import io.mycat.proxy.man.AdminCommandResovler;
import io.mycat.proxy.man.AdminSession;
import io.mycat.proxy.man.DefaultAdminSessionHandler;
import io.mycat.proxy.man.DefaultAdminSessionManager;
import io.mycat.proxy.man.MyCluster;

@SuppressWarnings("rawtypes")
Expand All @@ -28,7 +26,7 @@ public class ProxyRuntime {
private ProxyReactorThread[] reactorThreads;
private SessionManager sessionManager;
// 用于管理端口的Session会话管理
private SessionManager adminSessionManager;
private SessionManager<AdminSession> adminSessionManager;
// 用于管理端口的Session IO Handler
private FrontIOHandler<AdminSession> adminSessionIOHandler;
private AdminCommandResovler adminCmdResolver;
Expand Down Expand Up @@ -165,11 +163,11 @@ public void setTraceProtocol(boolean traceProtocol) {
this.traceProtocol = traceProtocol;
}

public SessionManager getAdminSessionManager() {
public SessionManager<AdminSession> getAdminSessionManager() {
return adminSessionManager;
}

public void setAdminSessionManager(SessionManager adminSessionManager) {
public void setAdminSessionManager(SessionManager<AdminSession> adminSessionManager) {
this.adminSessionManager = adminSessionManager;
}

Expand Down
1 change: 1 addition & 0 deletions source/src/main/java/io/mycat/proxy/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@
*/
public interface Session {
boolean isClosed();
public <T extends Session> SessionManager<T> getMySessionManager();
}
5 changes: 5 additions & 0 deletions source/src/main/java/io/mycat/proxy/SessionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Collection;

/**
* 用来处理新的连接请求并创建Session
Expand All @@ -23,5 +24,9 @@ public interface SessionManager<T extends Session> {
*/
public T createSession(BufferPool bufPool, Selector nioSelector, SocketChannel channel,boolean isAcceptedCon) throws IOException;

public Collection<T> getAllSessions();

public void removeSession(Session session);


}
2 changes: 1 addition & 1 deletion source/src/main/java/io/mycat/proxy/man/AdminCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@
*/
public interface AdminCommand {

void handlerPkg(AdminSession session) throws IOException;
void handlerPkg(AdminSession session,byte cmdType) throws IOException;
}
Loading

0 comments on commit 701347e

Please sign in to comment.