From b4d546fb71744560dc7fa2b8650e62bd971bbab4 Mon Sep 17 00:00:00 2001 From: gaulzhw Date: Sat, 30 Sep 2017 18:00:10 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E9=9C=80=E6=B1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- "\351\234\200\346\261\202" | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git "a/\351\234\200\346\261\202" "b/\351\234\200\346\261\202" index 7fe2318..942b5ea 100644 --- "a/\351\234\200\346\261\202" +++ "b/\351\234\200\346\261\202" @@ -2,7 +2,7 @@ 格式,需求编号,需求内容,需求目的,提出人的QQ -Mycat 2.0 HBT技术解决 多表关联案例 +1.Mycat 2.0 HBT技术解决 多表关联案例 select a.id,a.price,b.username where a.id in (111,222,333,444) and b.id=a.id @@ -28,3 +28,26 @@ routeList.add(engine.createRouteCondition(Engine.AND,Engine.IN,"aa,bb,cc")); routeList.add(engine.createRouteCondition(Engine.OR,Engine.GREATE,"aa,bb,cc")); String[] partions=engine.calcSQLRoute("tableA",routeList); 上述了路由信息可能需要人工指定,放入 SqlMeta, new(sql,"a",routeList) + + + + + +2.Mycat Exchanger + +负责把实现JDBC方式连接Oracle,SQL Server,以及Redis。MongoDB等后端存储的,模拟成标准的MySQL Server,对接Mycat。这样 Mycat就统一支持各种数据库,并且核心还稳定简单。 + + + + + + +3.管理命令 + +增加管理命令,mycat fetch config,返回三列?的表格,文件名称,修改时间,内容(文本),为了Web管理界面调用,展现配置文件内容 + +以及对应的 mycat update config xxx.conf <文件内容> ,即上传和更新生效某个配置文件,返回一行的Table,optcode ?错误码,以及说明。。。 + +包括返回集群状态的表格,哪几个节点,谁是Leader。以及是否有Proxy,Proxy的状态,节点上下线,这些都以后要命令行实现,同时给Web使用 + +所有管理命令都可以被Web调用 \ No newline at end of file From 6ae89d00571d191121bb0c6112f5f7ca13df4c57 Mon Sep 17 00:00:00 2001 From: gaulzhw Date: Sun, 1 Oct 2017 11:15:15 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E5=88=A0=E9=99=A4=E5=A4=9A=E4=BD=99?= =?UTF-8?q?=E7=9A=84;?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/io/mycat/mycat2/beans/conf/HeartbeatBean.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/src/main/java/io/mycat/mycat2/beans/conf/HeartbeatBean.java b/source/src/main/java/io/mycat/mycat2/beans/conf/HeartbeatBean.java index df075b5..36f6d7b 100644 --- a/source/src/main/java/io/mycat/mycat2/beans/conf/HeartbeatBean.java +++ b/source/src/main/java/io/mycat/mycat2/beans/conf/HeartbeatBean.java @@ -20,8 +20,8 @@ public class HeartbeatBean { * 默认空闲超时时间 */ private long idleTimeout = 30 * 60 * 1000L; - private long processorCheckPeriod = 1 * 1000L;; - private long minSwitchtimeInterval = 30 * 60 * 1000L;; + private long processorCheckPeriod = 1 * 1000L; + private long minSwitchtimeInterval = 30 * 60 * 1000L; public int getTimerExecutor() { return timerExecutor; From 5d7cf79ffddfb64ab8c6bba6c5ed0f4b6e09afea Mon Sep 17 00:00:00 2001 From: yanjunli Date: Sun, 1 Oct 2017 21:53:15 +0800 Subject: [PATCH 3/3] =?UTF-8?q?1.=20=E4=BF=AE=E5=A4=8D=20mysql=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E5=99=A8=E6=8C=82=E6=8E=89=E7=9A=84=E6=83=85=E5=86=B5?= =?UTF-8?q?=E4=B8=8B,=20mycat=20=E7=8B=82=E5=88=B7=20=20=E4=B8=BB=E5=8A=A8?= =?UTF-8?q?=E5=85=B3=E9=97=AD=E8=BF=9E=E6=8E=A5=E7=9A=84=E9=94=99=E8=AF=AF?= =?UTF-8?q?=E9=97=AE=E9=A2=98=202.=20=E8=BF=9B=E8=A1=8C=E4=B8=BB=E4=BB=8E?= =?UTF-8?q?=E5=88=87=E6=8D=A2=E6=97=B6=EF=BC=8C=20=E8=B7=B3=E8=BF=87=20?= =?UTF-8?q?=E5=BF=83=E8=B7=B3=E4=B8=8D=E6=AD=A3=E5=B8=B8=E7=9A=84=E4=BB=8E?= =?UTF-8?q?=E8=8A=82=E7=82=B9=E3=80=82=203.=20=E4=B8=BB=E4=BB=8E=E5=88=87?= =?UTF-8?q?=E6=8D=A2=E6=A3=80=E6=9F=A5=E6=97=B6=EF=BC=8C=20=E5=A6=82?= =?UTF-8?q?=E6=9E=9C=20=E5=A4=8D=E5=88=B6=E7=BB=84=20=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E4=B8=BA=20SINGLE=5FNODE=20=E7=B1=BB=E5=9E=8B=EF=BC=8C?= =?UTF-8?q?=E4=B9=9F=E4=B8=8D=E8=BF=9B=E8=A1=8C=E5=88=87=E6=8D=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../io/mycat/mycat2/beans/MySQLRepBean.java | 28 +++++--- .../beans/heartbeat/MySQLHeartbeat.java | 65 +------------------ .../mycat2/sqlannotations/SQLAnnotation.java | 6 +- .../mycat2/tasks/BackendCharsetReadTask.java | 23 +++++-- .../mycat2/tasks/BackendConCreateTask.java | 4 +- .../mycat2/tasks/BackendHeartbeatTask.java | 14 ++-- .../tasks/BackendIOTaskWithResultSet.java | 26 +++++--- .../java/io/mycat/proxy/AbstractSession.java | 5 +- 8 files changed, 67 insertions(+), 104 deletions(-) diff --git a/source/src/main/java/io/mycat/mycat2/beans/MySQLRepBean.java b/source/src/main/java/io/mycat/mycat2/beans/MySQLRepBean.java index 3ebe613..6639f74 100644 --- a/source/src/main/java/io/mycat/mycat2/beans/MySQLRepBean.java +++ b/source/src/main/java/io/mycat/mycat2/beans/MySQLRepBean.java @@ -108,11 +108,19 @@ private boolean checkIndex(int newIndex){ } public int getNextIndex(){ - MySQLMetaBean metaBean = metaBeans.stream().skip(writeIndex + 1).findFirst().orElse(null); + MySQLMetaBean metaBean = metaBeans.stream() + .skip(writeIndex + 1) + .filter(f->f.getHeartbeat().getStatus()==DBHeartbeat.OK_STATUS) + .findFirst() + .orElse(null); if (metaBean!=null){ return metaBeans.indexOf(metaBean); } else { - metaBean = metaBeans.stream().limit(writeIndex).findFirst().orElse(null); + metaBean = metaBeans.stream() + .limit(writeIndex) + .filter(f->f.getHeartbeat().getStatus()==DBHeartbeat.OK_STATUS) + .findFirst() + .orElse(null); if(metaBean!=null){ return metaBeans.indexOf(metaBean); } @@ -128,20 +136,20 @@ public int getNextIndex(){ public CheckResult switchDataSourcecheck(int newIndex){ String errmsg = null; CheckResult result = new CheckResult(true); - - if(RepTypeEnum.SINGLE_NODE.equals(getReplicaBean().getRepType())){ - errmsg = " repl type is "+RepTypeEnum.SINGLE_NODE.name() + ", switchDatasource is not supported"; - logger.warn(errmsg); + if (replicaBean.getSwitchType() == ReplicaBean.RepSwitchTypeEnum.NOT_SWITCH) { + errmsg = "not switch datasource ,for switchType is "+ReplicaBean.RepSwitchTypeEnum.NOT_SWITCH+",repl name is "+ replicaBean.getName(); + result.setSuccess(false); + result.setMsg(errmsg); + }else if(RepTypeEnum.SINGLE_NODE.equals(getReplicaBean().getRepType())){ + errmsg = " repl ["+replicaBean.getName()+"] type is "+RepTypeEnum.SINGLE_NODE.name() + ", switchDatasource is not supported"; result.setSuccess(false); result.setMsg(errmsg); }else if(!checkIndex(newIndex)){ - errmsg = "not switch datasource ,writeIndex out of range. writeIndex is " + newIndex; - logger.warn(errmsg); + errmsg = "not switch datasource ,there is no datasource available in the "+replicaBean.getName()+" Replication group. "; result.setSuccess(false); result.setMsg(errmsg); }else if(newIndex==writeIndex){ - errmsg = "not switch datasource ,writeIndex == newIndex .newIndex is " + newIndex; - logger.warn(errmsg); + errmsg = "not switch datasource ,writeIndex == newIndex .newIndex is " + newIndex +" in the "+replicaBean.getName()+" Replication group. "; result.setSuccess(false); result.setMsg(errmsg); } diff --git a/source/src/main/java/io/mycat/mycat2/beans/heartbeat/MySQLHeartbeat.java b/source/src/main/java/io/mycat/mycat2/beans/heartbeat/MySQLHeartbeat.java index e803af8..c839b03 100644 --- a/source/src/main/java/io/mycat/mycat2/beans/heartbeat/MySQLHeartbeat.java +++ b/source/src/main/java/io/mycat/mycat2/beans/heartbeat/MySQLHeartbeat.java @@ -233,8 +233,6 @@ private void setError(MySQLDetector detector, String msg) { } else { MycatConfig conf = ProxyRuntime.INSTANCE.getConfig(); if (source.isSlaveNode()) { - logger.error(msg); - // 集群模式下通知从节点更新状态 ClusterConfig clusterConfig = conf.getConfig(ConfigEnum.CLUSTER); if (clusterConfig.getCluster().isEnable()) { @@ -245,7 +243,7 @@ private void setError(MySQLDetector detector, String msg) { } } else { // 写节点 尝试多次次失败后, 需要通知集群 - logger.warn("heartbeat to backend session error, notify the cluster if needed"); + logger.warn("heartbeat to backend session error, notify the cluster"); HeartbeatConfig heartbeatConfig = conf.getConfig(ConfigEnum.HEARTBEAT); long curTime = System.currentTimeMillis(); @@ -280,65 +278,4 @@ private void setTimeout(MySQLDetector detector) { this.isChecking.set(false); status = DBHeartbeat.TIMEOUT_STATUS; } - -// /** -// * switch data source -// */ -// private void switchSourceIfNeed(String reason) { -// int switchType = source.getHostConfig().getSwitchType(); -// if (switchType == DataHostConfig.NOT_SWITCH_DS) { -// if (LOGGER.isDebugEnabled()) { -// LOGGER.debug("not switch datasource ,for switchType is " -// + DataHostConfig.NOT_SWITCH_DS); -// return; -// } -// return; -// } -// PhysicalDBPool pool = this.source.getDbPool(); -// int curDatasourceHB = pool.getSource().getHeartbeat().getStatus(); -// // read node can't switch ,only write node can switch -// if (pool.getWriteType() == PhysicalDBPool.WRITE_ONLYONE_NODE -// && !source.isReadNode() -// && curDatasourceHB != DBHeartbeat.OK_STATUS -// && pool.getSources().length > 1) { -// synchronized (pool) { -// // try to see if need switch datasource -// curDatasourceHB = pool.getSource().getHeartbeat().getStatus(); -// if (curDatasourceHB != DBHeartbeat.INIT_STATUS && curDatasourceHB != DBHeartbeat.OK_STATUS) { -// int curIndex = pool.getActivedIndex(); -// int nextId = pool.next(curIndex); -// PhysicalDatasource[] allWriteNodes = pool.getSources(); -// while (true) { -// if (nextId == curIndex) { -// break; -// } -// PhysicalDatasource theSource = allWriteNodes[nextId]; -// DBHeartbeat theSourceHB = theSource.getHeartbeat(); -// int theSourceHBStatus = theSourceHB.getStatus(); -// if (theSourceHBStatus == DBHeartbeat.OK_STATUS) { -// if (switchType == DataHostConfig.SYN_STATUS_SWITCH_DS) { -// if (Integer.valueOf(0).equals( theSourceHB.getSlaveBehindMaster())) { -// LOGGER.info("try to switch datasource ,slave is synchronized to master " + theSource.getConfig()); -// pool.switchSource(nextId, true, reason); -// break; -// } else { -// LOGGER.warn("ignored datasource ,slave is not synchronized to master , slave behind master :" -// + theSourceHB.getSlaveBehindMaster() -// + " " + theSource.getConfig()); -// } -// } else { -// // normal switch -// LOGGER.info("try to switch datasource ,not checked slave synchronize status " + theSource.getConfig()); -// pool.switchSource(nextId, true, reason); -// break; -// } -// -// } -// nextId = pool.next(nextId); -// } -// -// } -// } -// } -// } } diff --git a/source/src/main/java/io/mycat/mycat2/sqlannotations/SQLAnnotation.java b/source/src/main/java/io/mycat/mycat2/sqlannotations/SQLAnnotation.java index 8dec705..bfaaa52 100644 --- a/source/src/main/java/io/mycat/mycat2/sqlannotations/SQLAnnotation.java +++ b/source/src/main/java/io/mycat/mycat2/sqlannotations/SQLAnnotation.java @@ -25,12 +25,12 @@ public void setActionName(String actionName){ abstract public MySQLCommand getMySQLCommand(); /** - * 默认的重复检查, 命令链会根据该方法,进行去重复操作。 - * 如果 需要有多个实例,可以返回不同的值。 + * Ĭϵظ, ݸ÷ȥظ + * Ҫжʵ,Էزֵͬ * @return */ public long currentKey() { - // 结果集缓存. 在责任链中 只允许出现一次,这里返回相同的值 + // . ֻһΣﷵֵͬ return this.getClass().getSimpleName().hashCode(); } diff --git a/source/src/main/java/io/mycat/mycat2/tasks/BackendCharsetReadTask.java b/source/src/main/java/io/mycat/mycat2/tasks/BackendCharsetReadTask.java index b2b4eaa..1e67db5 100644 --- a/source/src/main/java/io/mycat/mycat2/tasks/BackendCharsetReadTask.java +++ b/source/src/main/java/io/mycat/mycat2/tasks/BackendCharsetReadTask.java @@ -59,7 +59,11 @@ public void readCharset() throws IOException { queryPacket.write(proxyBuf); proxyBuf.flip(); proxyBuf.readIndex = proxyBuf.writeIndex; - this.mySQLSession.writeToChannel(); + try { + this.mySQLSession.writeToChannel(); + } catch (IOException e) { + onRsFinish(this.mySQLSession,false,e.getMessage()); + } } @Override @@ -111,12 +115,19 @@ void onRsRow(MySQLSession session) { } @Override - void onRsFinish(MySQLSession session,boolean success) throws IOException { - if(callBack!=null){ - callBack.finished(session, null, success, null); + void onRsFinish(MySQLSession session,boolean success,String msg) throws IOException { + if(success){ + if(callBack!=null){ + callBack.finished(session, null, success, null); + } + //结果集完成 + logger.debug("session[{}] load charset finish",session); + }else{ + if(ResultSetState.RS_STATUS_READ_ERROR == curRSState|| + ResultSetState.RS_STATUS_WRITE_ERROR == curRSState){ + session.close(false, msg); + } } - //结果集完成 - logger.debug("session[{}] load charset finish",session); } } diff --git a/source/src/main/java/io/mycat/mycat2/tasks/BackendConCreateTask.java b/source/src/main/java/io/mycat/mycat2/tasks/BackendConCreateTask.java index 324396d..9be8403 100644 --- a/source/src/main/java/io/mycat/mycat2/tasks/BackendConCreateTask.java +++ b/source/src/main/java/io/mycat/mycat2/tasks/BackendConCreateTask.java @@ -98,7 +98,7 @@ public void onSocketRead(MySQLSession session) throws IOException { } else { // 认证结果报文收到 if (session.curMSQLPackgInf.pkgType == MySQLPacket.OK_PACKET) { - logger.info("backend authed suceess "); + logger.debug("backend authed suceess "); this.finished(true); } else if (session.curMSQLPackgInf.pkgType == MySQLPacket.ERROR_PACKET) { errPkg = new ErrorPacket(); @@ -115,7 +115,7 @@ public void onSocketRead(MySQLSession session) throws IOException { public void onConnect(SelectionKey theKey, MySQLSession userSession, boolean success, String msg) throws IOException { String logInfo = success ? " backend connect success " : "backend connect failed " + msg; - logger.info(logInfo + " channel " + userSession.channel); + logger.debug("{} sessionId = {}, {}:{}",logInfo, userSession.getSessionId(),userSession.getMySQLMetaBean().getDsMetaBean().getIp(), userSession.getMySQLMetaBean().getDsMetaBean().getPort()); if (success) { InetSocketAddress serverRemoteAddr = (InetSocketAddress) userSession.channel.getRemoteAddress(); InetSocketAddress serverLocalAddr = (InetSocketAddress) userSession.channel.getLocalAddress(); diff --git a/source/src/main/java/io/mycat/mycat2/tasks/BackendHeartbeatTask.java b/source/src/main/java/io/mycat/mycat2/tasks/BackendHeartbeatTask.java index 715f1e4..1591ccf 100644 --- a/source/src/main/java/io/mycat/mycat2/tasks/BackendHeartbeatTask.java +++ b/source/src/main/java/io/mycat/mycat2/tasks/BackendHeartbeatTask.java @@ -7,7 +7,6 @@ import java.util.List; import java.util.Map; -import io.mycat.mycat2.beans.conf.ReplicaBean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,6 +14,7 @@ import io.mycat.mycat2.beans.MySQLMetaBean; import io.mycat.mycat2.beans.MySQLPackageInf; import io.mycat.mycat2.beans.MySQLRepBean; +import io.mycat.mycat2.beans.conf.ReplicaBean; import io.mycat.mycat2.beans.heartbeat.DBHeartbeat; import io.mycat.mycat2.beans.heartbeat.MySQLDetector; import io.mycat.mycat2.beans.heartbeat.MySQLHeartbeat; @@ -126,7 +126,7 @@ void onRsRow(MySQLSession session) { } @Override - void onRsFinish(MySQLSession session,boolean success) { + void onRsFinish(MySQLSession session,boolean success,String msg) { if(success){ //归还连接 MycatReactorThread reactor = (MycatReactorThread)Thread.currentThread(); @@ -148,11 +148,13 @@ void onRsFinish(MySQLSession session,boolean success) { break; } detector.setLasstReveivedQryTime(System.currentTimeMillis()); -// detector.getHeartbeat().getRecorder().set((detector.getLasstReveivedQryTime() - detector.getLastSendQryTime())); }else{ - logger.error("found MySQL master/slave Replication err !!! {}:{}" , metaBean.getDsMetaBean().getIp(),metaBean.getDsMetaBean().getPort()); - detector.getHeartbeat().setDbSynStatus(DBHeartbeat.DB_SYN_ERROR); - detector.getHeartbeat().setResult(DBHeartbeat.ERROR_STATUS, detector, null); + if(ResultSetState.RS_STATUS_READ_ERROR == curRSState|| + ResultSetState.RS_STATUS_WRITE_ERROR == curRSState){ + detector.getHeartbeat().setDbSynStatus(DBHeartbeat.DB_SYN_ERROR); + detector.getHeartbeat().setResult(DBHeartbeat.ERROR_STATUS, detector, null); + } + session.close(false, msg); } } diff --git a/source/src/main/java/io/mycat/mycat2/tasks/BackendIOTaskWithResultSet.java b/source/src/main/java/io/mycat/mycat2/tasks/BackendIOTaskWithResultSet.java index 757865b..169d26d 100644 --- a/source/src/main/java/io/mycat/mycat2/tasks/BackendIOTaskWithResultSet.java +++ b/source/src/main/java/io/mycat/mycat2/tasks/BackendIOTaskWithResultSet.java @@ -28,13 +28,9 @@ public void onSocketRead(T session) throws IOException { if (!session.readFromChannel()){ return; } - }catch(ClosedChannelException e){ - e.printStackTrace(); - session.close(false, e.getMessage()); - return; - }catch (IOException e) { - e.printStackTrace(); - onRsFinish(session,false); + }catch(IOException e){ + curRSState = ResultSetState.RS_STATUS_READ_ERROR; + onRsFinish(session,false,e.getMessage()); return; } @@ -58,7 +54,7 @@ public void onSocketRead(T session) throws IOException { case RS_STATUS_ROW: if (curMQLPackgInf.pkgType == MySQLPacket.EOF_PACKET) { curRSState = ResultSetState.RS_STATUS_FINISH; - onRsFinish(session,true); + onRsFinish(session,true,null); } else { onRsRow(session); } @@ -76,7 +72,7 @@ public void onSocketRead(T session) throws IOException { abstract void onRsRow(T session); - abstract void onRsFinish(T session,boolean success) throws IOException; + abstract void onRsFinish(T session,boolean success,String msg) throws IOException; public enum ResultSetState { /** @@ -94,6 +90,16 @@ public enum ResultSetState { /** * 结果集完成 */ - RS_STATUS_FINISH; + RS_STATUS_FINISH, + + /** + * 结果集网络读取错误 + */ + RS_STATUS_READ_ERROR, + + /** + * 结果集网络写入错误 + */ + RS_STATUS_WRITE_ERROR; } } diff --git a/source/src/main/java/io/mycat/proxy/AbstractSession.java b/source/src/main/java/io/mycat/proxy/AbstractSession.java index e312cc4..458bf05 100644 --- a/source/src/main/java/io/mycat/proxy/AbstractSession.java +++ b/source/src/main/java/io/mycat/proxy/AbstractSession.java @@ -263,7 +263,7 @@ public SocketChannel channel() { } public String sessionInfo() { - return " [" + this.addr + ']'; + return " [ sessionId = "+ sessionId+" ," + this.addr + ']'; } public boolean isChannelOpen() { @@ -282,7 +282,6 @@ public boolean isClosed() { public void close(boolean normal, String hint) { if (!this.isClosed()) { this.closed = true; - logger.info("close session " + this.sessionInfo() + " for reason " + hint); closeSocket(channel, normal, hint); if (!referedBuffer) { this.bufPool.recycleBuf(proxyBuffer.getBuffer()); @@ -313,7 +312,7 @@ protected void closeSocket(SocketChannel channel, boolean normal, String msg) { if (channel == null) { return; } - String logInf = (normal) ? " normal close " : "abnormal close " + channel; + String logInf = (normal) ? " normal close " : "abnormal close " ; logger.info(logInf + sessionInfo() + " reason:" + msg); try { channel.close();