Skip to content

Commit

Permalink
Merge pull request MyCATApache#19 from MyCATApache/master
Browse files Browse the repository at this point in the history
合拼
  • Loading branch information
junwen12221 authored Oct 10, 2017
2 parents be1982a + 19701b6 commit e5b04d3
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 107 deletions.
28 changes: 18 additions & 10 deletions source/src/main/java/io/mycat/mycat2/beans/MySQLRepBean.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,19 @@ private boolean checkIndex(int newIndex){
}

public int getNextIndex(){
MySQLMetaBean metaBean = metaBeans.stream().skip(writeIndex + 1).findFirst().orElse(null);
MySQLMetaBean metaBean = metaBeans.stream()
.skip(writeIndex + 1)
.filter(f->f.getHeartbeat().getStatus()==DBHeartbeat.OK_STATUS)
.findFirst()
.orElse(null);
if (metaBean!=null){
return metaBeans.indexOf(metaBean);
} else {
metaBean = metaBeans.stream().limit(writeIndex).findFirst().orElse(null);
metaBean = metaBeans.stream()
.limit(writeIndex)
.filter(f->f.getHeartbeat().getStatus()==DBHeartbeat.OK_STATUS)
.findFirst()
.orElse(null);
if(metaBean!=null){
return metaBeans.indexOf(metaBean);
}
Expand All @@ -128,20 +136,20 @@ public int getNextIndex(){
public CheckResult switchDataSourcecheck(int newIndex){
String errmsg = null;
CheckResult result = new CheckResult(true);

if(RepTypeEnum.SINGLE_NODE.equals(getReplicaBean().getRepType())){
errmsg = " repl type is "+RepTypeEnum.SINGLE_NODE.name() + ", switchDatasource is not supported";
logger.warn(errmsg);
if (replicaBean.getSwitchType() == ReplicaBean.RepSwitchTypeEnum.NOT_SWITCH) {
errmsg = "not switch datasource ,for switchType is "+ReplicaBean.RepSwitchTypeEnum.NOT_SWITCH+",repl name is "+ replicaBean.getName();
result.setSuccess(false);
result.setMsg(errmsg);
}else if(RepTypeEnum.SINGLE_NODE.equals(getReplicaBean().getRepType())){
errmsg = " repl ["+replicaBean.getName()+"] type is "+RepTypeEnum.SINGLE_NODE.name() + ", switchDatasource is not supported";
result.setSuccess(false);
result.setMsg(errmsg);
}else if(!checkIndex(newIndex)){
errmsg = "not switch datasource ,writeIndex out of range. writeIndex is " + newIndex;
logger.warn(errmsg);
errmsg = "not switch datasource ,there is no datasource available in the "+replicaBean.getName()+" Replication group. ";
result.setSuccess(false);
result.setMsg(errmsg);
}else if(newIndex==writeIndex){
errmsg = "not switch datasource ,writeIndex == newIndex .newIndex is " + newIndex;
logger.warn(errmsg);
errmsg = "not switch datasource ,writeIndex == newIndex .newIndex is " + newIndex +" in the "+replicaBean.getName()+" Replication group. ";
result.setSuccess(false);
result.setMsg(errmsg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ public class HeartbeatBean {
* 默认空闲超时时间
*/
private long idleTimeout = 30 * 60 * 1000L;
private long processorCheckPeriod = 1 * 1000L;;
private long minSwitchtimeInterval = 30 * 60 * 1000L;;
private long processorCheckPeriod = 1 * 1000L;
private long minSwitchtimeInterval = 30 * 60 * 1000L;

public int getTimerExecutor() {
return timerExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,6 @@ private void setError(MySQLDetector detector, String msg) {
} else {
MycatConfig conf = ProxyRuntime.INSTANCE.getConfig();
if (source.isSlaveNode()) {
logger.error(msg);

// 集群模式下通知从节点更新状态
ClusterConfig clusterConfig = conf.getConfig(ConfigEnum.CLUSTER);
if (clusterConfig.getCluster().isEnable()) {
Expand All @@ -245,7 +243,7 @@ private void setError(MySQLDetector detector, String msg) {
}
} else {
// 写节点 尝试多次次失败后, 需要通知集群
logger.warn("heartbeat to backend session error, notify the cluster if needed");
logger.warn("heartbeat to backend session error, notify the cluster");

HeartbeatConfig heartbeatConfig = conf.getConfig(ConfigEnum.HEARTBEAT);
long curTime = System.currentTimeMillis();
Expand Down Expand Up @@ -280,65 +278,4 @@ private void setTimeout(MySQLDetector detector) {
this.isChecking.set(false);
status = DBHeartbeat.TIMEOUT_STATUS;
}

// /**
// * switch data source
// */
// private void switchSourceIfNeed(String reason) {
// int switchType = source.getHostConfig().getSwitchType();
// if (switchType == DataHostConfig.NOT_SWITCH_DS) {
// if (LOGGER.isDebugEnabled()) {
// LOGGER.debug("not switch datasource ,for switchType is "
// + DataHostConfig.NOT_SWITCH_DS);
// return;
// }
// return;
// }
// PhysicalDBPool pool = this.source.getDbPool();
// int curDatasourceHB = pool.getSource().getHeartbeat().getStatus();
// // read node can't switch ,only write node can switch
// if (pool.getWriteType() == PhysicalDBPool.WRITE_ONLYONE_NODE
// && !source.isReadNode()
// && curDatasourceHB != DBHeartbeat.OK_STATUS
// && pool.getSources().length > 1) {
// synchronized (pool) {
// // try to see if need switch datasource
// curDatasourceHB = pool.getSource().getHeartbeat().getStatus();
// if (curDatasourceHB != DBHeartbeat.INIT_STATUS && curDatasourceHB != DBHeartbeat.OK_STATUS) {
// int curIndex = pool.getActivedIndex();
// int nextId = pool.next(curIndex);
// PhysicalDatasource[] allWriteNodes = pool.getSources();
// while (true) {
// if (nextId == curIndex) {
// break;
// }
// PhysicalDatasource theSource = allWriteNodes[nextId];
// DBHeartbeat theSourceHB = theSource.getHeartbeat();
// int theSourceHBStatus = theSourceHB.getStatus();
// if (theSourceHBStatus == DBHeartbeat.OK_STATUS) {
// if (switchType == DataHostConfig.SYN_STATUS_SWITCH_DS) {
// if (Integer.valueOf(0).equals( theSourceHB.getSlaveBehindMaster())) {
// LOGGER.info("try to switch datasource ,slave is synchronized to master " + theSource.getConfig());
// pool.switchSource(nextId, true, reason);
// break;
// } else {
// LOGGER.warn("ignored datasource ,slave is not synchronized to master , slave behind master :"
// + theSourceHB.getSlaveBehindMaster()
// + " " + theSource.getConfig());
// }
// } else {
// // normal switch
// LOGGER.info("try to switch datasource ,not checked slave synchronize status " + theSource.getConfig());
// pool.switchSource(nextId, true, reason);
// break;
// }
//
// }
// nextId = pool.next(nextId);
// }
//
// }
// }
// }
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ public void setActionName(String actionName){
abstract public MySQLCommand getMySQLCommand();

/**
* 默认的重复检查, 命令链会根据该方法,进行去重复操作。
* 如果 需要有多个实例,可以返回不同的值。
* 默认的重复检查, 命令链会根据该方法,进行去重复操作。
* 如果 需要有多个实例,可以返回不同的值。
* @return
*/
public long currentKey() {
// 结果集缓存. 在责任链中 只允许出现一次,这里返回相同的值
// 结果集缓存. 在责任链中 只允许出现一次,这里返回相同的值
return this.getClass().getSimpleName().hashCode();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ public void readCharset() throws IOException {
queryPacket.write(proxyBuf);
proxyBuf.flip();
proxyBuf.readIndex = proxyBuf.writeIndex;
this.mySQLSession.writeToChannel();
try {
this.mySQLSession.writeToChannel();
} catch (IOException e) {
onRsFinish(this.mySQLSession,false,e.getMessage());
}
}

@Override
Expand Down Expand Up @@ -111,12 +115,19 @@ void onRsRow(MySQLSession session) {
}

@Override
void onRsFinish(MySQLSession session,boolean success) throws IOException {
if(callBack!=null){
callBack.finished(session, null, success, null);
void onRsFinish(MySQLSession session,boolean success,String msg) throws IOException {
if(success){
if(callBack!=null){
callBack.finished(session, null, success, null);
}
//结果集完成
logger.debug("session[{}] load charset finish",session);
}else{
if(ResultSetState.RS_STATUS_READ_ERROR == curRSState||
ResultSetState.RS_STATUS_WRITE_ERROR == curRSState){
session.close(false, msg);
}
}
//结果集完成
logger.debug("session[{}] load charset finish",session);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void onSocketRead(MySQLSession session) throws IOException {
} else {
// 认证结果报文收到
if (session.curMSQLPackgInf.pkgType == MySQLPacket.OK_PACKET) {
logger.info("backend authed suceess ");
logger.debug("backend authed suceess ");
this.finished(true);
} else if (session.curMSQLPackgInf.pkgType == MySQLPacket.ERROR_PACKET) {
errPkg = new ErrorPacket();
Expand All @@ -115,7 +115,7 @@ public void onSocketRead(MySQLSession session) throws IOException {
public void onConnect(SelectionKey theKey, MySQLSession userSession, boolean success, String msg)
throws IOException {
String logInfo = success ? " backend connect success " : "backend connect failed " + msg;
logger.info(logInfo + " channel " + userSession.channel);
logger.debug("{} sessionId = {}, {}:{}",logInfo, userSession.getSessionId(),userSession.getMySQLMetaBean().getDsMetaBean().getIp(), userSession.getMySQLMetaBean().getDsMetaBean().getPort());
if (success) {
InetSocketAddress serverRemoteAddr = (InetSocketAddress) userSession.channel.getRemoteAddress();
InetSocketAddress serverLocalAddr = (InetSocketAddress) userSession.channel.getLocalAddress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
import java.util.List;
import java.util.Map;

import io.mycat.mycat2.beans.conf.ReplicaBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.mycat.mycat2.MySQLSession;
import io.mycat.mycat2.beans.MySQLMetaBean;
import io.mycat.mycat2.beans.MySQLPackageInf;
import io.mycat.mycat2.beans.MySQLRepBean;
import io.mycat.mycat2.beans.conf.ReplicaBean;
import io.mycat.mycat2.beans.heartbeat.DBHeartbeat;
import io.mycat.mycat2.beans.heartbeat.MySQLDetector;
import io.mycat.mycat2.beans.heartbeat.MySQLHeartbeat;
Expand Down Expand Up @@ -126,7 +126,7 @@ void onRsRow(MySQLSession session) {
}

@Override
void onRsFinish(MySQLSession session,boolean success) {
void onRsFinish(MySQLSession session,boolean success,String msg) {
if(success){
//归还连接
MycatReactorThread reactor = (MycatReactorThread)Thread.currentThread();
Expand All @@ -148,11 +148,13 @@ void onRsFinish(MySQLSession session,boolean success) {
break;
}
detector.setLasstReveivedQryTime(System.currentTimeMillis());
// detector.getHeartbeat().getRecorder().set((detector.getLasstReveivedQryTime() - detector.getLastSendQryTime()));
}else{
logger.error("found MySQL master/slave Replication err !!! {}:{}" , metaBean.getDsMetaBean().getIp(),metaBean.getDsMetaBean().getPort());
detector.getHeartbeat().setDbSynStatus(DBHeartbeat.DB_SYN_ERROR);
detector.getHeartbeat().setResult(DBHeartbeat.ERROR_STATUS, detector, null);
if(ResultSetState.RS_STATUS_READ_ERROR == curRSState||
ResultSetState.RS_STATUS_WRITE_ERROR == curRSState){
detector.getHeartbeat().setDbSynStatus(DBHeartbeat.DB_SYN_ERROR);
detector.getHeartbeat().setResult(DBHeartbeat.ERROR_STATUS, detector, null);
}
session.close(false, msg);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,9 @@ public void onSocketRead(T session) throws IOException {
if (!session.readFromChannel()){
return;
}
}catch(ClosedChannelException e){
e.printStackTrace();
session.close(false, e.getMessage());
return;
}catch (IOException e) {
e.printStackTrace();
onRsFinish(session,false);
}catch(IOException e){
curRSState = ResultSetState.RS_STATUS_READ_ERROR;
onRsFinish(session,false,e.getMessage());
return;
}

Expand All @@ -58,7 +54,7 @@ public void onSocketRead(T session) throws IOException {
case RS_STATUS_ROW:
if (curMQLPackgInf.pkgType == MySQLPacket.EOF_PACKET) {
curRSState = ResultSetState.RS_STATUS_FINISH;
onRsFinish(session,true);
onRsFinish(session,true,null);
} else {
onRsRow(session);
}
Expand All @@ -76,7 +72,7 @@ public void onSocketRead(T session) throws IOException {

abstract void onRsRow(T session);

abstract void onRsFinish(T session,boolean success) throws IOException;
abstract void onRsFinish(T session,boolean success,String msg) throws IOException;

public enum ResultSetState {
/**
Expand All @@ -94,6 +90,16 @@ public enum ResultSetState {
/**
* 结果集完成
*/
RS_STATUS_FINISH;
RS_STATUS_FINISH,

/**
* 结果集网络读取错误
*/
RS_STATUS_READ_ERROR,

/**
* 结果集网络写入错误
*/
RS_STATUS_WRITE_ERROR;
}
}
5 changes: 2 additions & 3 deletions source/src/main/java/io/mycat/proxy/AbstractSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public SocketChannel channel() {
}

public String sessionInfo() {
return " [" + this.addr + ']';
return " [ sessionId = "+ sessionId+" ," + this.addr + ']';
}

public boolean isChannelOpen() {
Expand All @@ -282,7 +282,6 @@ public boolean isClosed() {
public void close(boolean normal, String hint) {
if (!this.isClosed()) {
this.closed = true;
logger.info("close session " + this.sessionInfo() + " for reason " + hint);
closeSocket(channel, normal, hint);
if (!referedBuffer) {
this.bufPool.recycleBuf(proxyBuffer.getBuffer());
Expand Down Expand Up @@ -313,7 +312,7 @@ protected void closeSocket(SocketChannel channel, boolean normal, String msg) {
if (channel == null) {
return;
}
String logInf = (normal) ? " normal close " : "abnormal close " + channel;
String logInf = (normal) ? " normal close " : "abnormal close " ;
logger.info(logInf + sessionInfo() + " reason:" + msg);
try {
channel.close();
Expand Down
25 changes: 24 additions & 1 deletion 需求
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
格式,需求编号,需求内容,需求目的,提出人的QQ


Mycat 2.0 HBT技术解决 多表关联案例
1.Mycat 2.0 HBT技术解决 多表关联案例

select a.id,a.price,b.username where a.id in (111,222,333,444) and b.id=a.id

Expand All @@ -28,3 +28,26 @@ routeList.add(engine.createRouteCondition(Engine.AND,Engine.IN,"aa,bb,cc"));
routeList.add(engine.createRouteCondition(Engine.OR,Engine.GREATE,"aa,bb,cc"));
String[] partions=engine.calcSQLRoute("tableA",routeList);
上述了路由信息可能需要人工指定,放入 SqlMeta, new(sql,"a",routeList)





2.Mycat Exchanger

负责把实现JDBC方式连接Oracle,SQL Server,以及Redis。MongoDB等后端存储的,模拟成标准的MySQL Server,对接Mycat。这样 Mycat就统一支持各种数据库,并且核心还稳定简单。






3.管理命令

增加管理命令,mycat fetch config,返回三列?的表格,文件名称,修改时间,内容(文本),为了Web管理界面调用,展现配置文件内容

以及对应的 mycat update config xxx.conf <文件内容> ,即上传和更新生效某个配置文件,返回一行的Table,optcode ?错误码,以及说明。。。

包括返回集群状态的表格,哪几个节点,谁是Leader。以及是否有Proxy,Proxy的状态,节点上下线,这些都以后要命令行实现,同时给Web使用

所有管理命令都可以被Web调用

0 comments on commit e5b04d3

Please sign in to comment.