Skip to content

Commit

Permalink
Merge pull request MyCATApache#27 from yanjunli/master
Browse files Browse the repository at this point in the history
AdminSession buffer 由 ProtocolBuffer 改为 ProxyBuffer
  • Loading branch information
apachemycat authored Aug 23, 2017
2 parents df2b0b8 + 4d1bc9c commit d960b1d
Show file tree
Hide file tree
Showing 17 changed files with 60 additions and 58 deletions.
Binary file added doc/images/proxybuffer_init.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/images/read_to_buffer1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/images/read_to_channel1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/images/read_to_channel2.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/images/readbuffer1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/images/readbuffer2.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 3 additions & 3 deletions source/src/main/java/io/mycat/proxy/NIOAcceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,15 @@ protected void processConnectKey(ReactorEnv reactorEnv, SelectionKey curKey) thr
if (curChannel.finishConnect()) {
AdminSession session = adminSessionMan.createSession(curKey.attachment(), this.bufPool, selector,
curChannel, false);
ConnectIOHandler<AdminSession> connectIOHandler = (ConnectIOHandler<AdminSession>) session
NIOHandler<AdminSession> connectIOHandler = (NIOHandler<AdminSession>) session
.getCurNIOHandler();
connectIOHandler.onConnect(curKey, session, true, null);
}

} catch (ConnectException ex) {
logger.warn("connect failed " + curChannel + " reason:" + ex);
if (adminSessionMan.getDefaultSessionHandler() instanceof ConnectIOHandler) {
ConnectIOHandler<AdminSession> connectIOHandler = (ConnectIOHandler<AdminSession>) adminSessionMan
if (adminSessionMan.getDefaultSessionHandler() instanceof NIOHandler) {
NIOHandler<AdminSession> connectIOHandler = (NIOHandler<AdminSession>) adminSessionMan
.getDefaultSessionHandler();
connectIOHandler.onConnect(curKey, null, false, null);

Expand Down
42 changes: 20 additions & 22 deletions source/src/main/java/io/mycat/proxy/man/AdminSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.mycat.proxy.AbstractSession;
import io.mycat.proxy.BufferPool;
import io.mycat.proxy.NIOHandler;
import io.mycat.proxy.ProxyBuffer;
import io.mycat.proxy.ProxyReactorThread;
import io.mycat.proxy.ProxyRuntime;
import io.mycat.proxy.Session;
Expand Down Expand Up @@ -43,8 +44,8 @@ public class AdminSession implements Session {
private String nodeId;
public AdminCommand curAdminCommand;
// 全双工模式,读写用两个不同的Buffer,不会相互切换
public ProtocolBuffer readingBuffer;
public ProtocolBuffer writingBuffer;
public ProxyBuffer readingBuffer;
public ProxyBuffer writingBuffer;
public PackageInf curAdminPkgInf = new PackageInf();

public AdminSession(BufferPool bufferPool, Selector selector, SocketChannel channel) throws IOException {
Expand All @@ -56,8 +57,8 @@ public AdminSession(BufferPool bufferPool, Selector selector, SocketChannel chan
SelectionKey socketKey = channel.register(nioSelector, SelectionKey.OP_READ, this);
this.channelKey = socketKey;
this.sessionId = ProxyRuntime.INSTANCE.genSessionId();
this.readingBuffer = new ProtocolBuffer(bufferPool.allocByteBuffer());
this.writingBuffer = new ProtocolBuffer(bufferPool.allocByteBuffer());
this.readingBuffer = new ProxyBuffer(bufferPool.allocByteBuffer());
this.writingBuffer = new ProxyBuffer(bufferPool.allocByteBuffer());

}

Expand All @@ -80,7 +81,7 @@ public void answerClientNow(ManagePacket packet) throws IOException {
public void modifySelectKey() throws ClosedChannelException {
if (channelKey != null && channelKey.isValid()) {
int clientOps = SelectionKey.OP_READ;
if (writingBuffer.optLimit == writingBuffer.optMark) {
if (writingBuffer.readMark == writingBuffer.readIndex) {
this.writingBuffer.reset();
clientOps &= ~SelectionKey.OP_WRITE;
} else {
Expand All @@ -93,18 +94,15 @@ public void modifySelectKey() throws ClosedChannelException {
public void writeChannel() throws IOException {
// 尝试压缩,移除之前写过的内容
ByteBuffer buffer = writingBuffer.getBuffer();
if (writingBuffer.optMark > buffer.capacity() * 2 / 3) {
buffer.limit(writingBuffer.optLimit);
buffer.position(writingBuffer.optMark);
buffer.compact();
writingBuffer.optMark = 0;
writingBuffer.optLimit = buffer.position();
if (writingBuffer.readIndex > buffer.capacity() * 2 / 3) {
writingBuffer.compact();
}else{
buffer.limit(writingBuffer.readIndex);
buffer.position(writingBuffer.readMark);
}
buffer.limit(writingBuffer.optLimit);
buffer.position(writingBuffer.optMark);
int writed = this.channel.write(buffer);
if (writed > 0) {
writingBuffer.optMark = buffer.position();
writingBuffer.readMark += writed;
}
modifySelectKey();
}
Expand All @@ -117,8 +115,8 @@ public void writeChannel() throws IOException {
*/
public byte receivedPacket() throws IOException {
ByteBuffer buffer = this.readingBuffer.getBuffer();
int offset = readingBuffer.optMark;
int limit = readingBuffer.optLimit;
int offset = readingBuffer.readIndex;
int limit = readingBuffer.writeIndex;
if (limit == offset) {
return -1;
}
Expand Down Expand Up @@ -157,13 +155,13 @@ public boolean readSocket() throws IOException {

// 尝试压缩,移除之前读过的内容
ByteBuffer buffer = readingBuffer.getBuffer();
if (readingBuffer.optMark > buffer.capacity() * 1 / 3) {
buffer.limit(readingBuffer.optLimit);
buffer.position(readingBuffer.optMark);
if (readingBuffer.readIndex > buffer.capacity() * 1 / 3) {
buffer.limit(readingBuffer.writeIndex);
buffer.position(readingBuffer.readIndex);
buffer.compact();
readingBuffer.optMark = 0;
readingBuffer.readIndex = 0;
} else {
buffer.position(readingBuffer.optLimit);
buffer.position(readingBuffer.writeIndex);
}
int readed = channel.read(buffer);
logger.debug(" readed {} total bytes ", readed);
Expand All @@ -174,7 +172,7 @@ public boolean readSocket() throws IOException {

logger.warn("readed zero bytes ,Maybe a bug ,please fix it !!!!");
}
readingBuffer.optLimit = buffer.position();
readingBuffer.writeIndex = buffer.position();
return readed > 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import java.nio.ByteBuffer;

import io.mycat.proxy.ProxyBuffer;

/**
* 持续写入的同时也不断发送数据给Socket的Buffer,需要两个状态
* @author wuzhihui
*
*/
public class ConinueWritingBuffer extends ProtocolBuffer{
public class ConinueWritingBuffer extends ProxyBuffer{
public int readOpt;
public int readLimit;
public ConinueWritingBuffer(ByteBuffer buffer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@ public void onSocketRead(final AdminSession session) throws IOException {
if (readed == false) {
return;
}
int bufferLimit = session.readingBuffer.optLimit;
// int bufferLimit = session.readingBuffer.writeIndex;
byte pkgType = -1;
while ((pkgType = session.receivedPacket()) != -1) {
session.readingBuffer.optLimit = session.curAdminPkgInf.startPos;
// session.readingBuffer.writeIndex = session.curAdminPkgInf.startPos;
if (pkgType == ManagePacket.PKG_FAILED || pkgType == ManagePacket.PKG_SUCCESS) {
session.curAdminCommand.handlerPkg(session, pkgType);
} else {
session.curAdminCommand = ProxyRuntime.INSTANCE.getAdminCmdResolver().resolveCommand(pkgType);
session.curAdminCommand.handlerPkg(session, pkgType);
}
// 下一个报文解析
session.readingBuffer.optMark = session.readingBuffer.optLimit;
// session.readingBuffer.readIndex = session.curAdminPkgInf.startPos+session.curAdminPkgInf.length;
}

session.readingBuffer.optLimit = bufferLimit;
// session.readingBuffer.writeIndex = bufferLimit;
}

/**
Expand Down
22 changes: 12 additions & 10 deletions source/src/main/java/io/mycat/proxy/man/ManagePacket.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;

import io.mycat.proxy.ProxyBuffer;

/**
* 管理报文,3个字节的包头 ,其中前两个字节为包长度,第3个字节为包类型,最后为报文内容。
*
Expand Down Expand Up @@ -66,35 +68,35 @@ public void setPkgLength(int pkgLength) {
this.pkgLength = pkgLength;
}

public void resolve(ProtocolBuffer buffer) {
public void resolve(ProxyBuffer buffer) {
buffer.skip(3);
this.resolveBody(buffer);
}

public abstract void resolveBody(ProtocolBuffer buffer);
public abstract void resolveBody(ProxyBuffer buffer);

/**
* 报文内容写入到Buffer中(等待发送)
*
* @param buffer
*/
public void writeTo(ProtocolBuffer buffer) {
int beginPos = buffer.optLimit;
buffer.optLimit=2;
public void writeTo(ProxyBuffer buffer) {
int beginPos = buffer.writeIndex;
buffer.writeIndex=2;
buffer.writeByte(this.pkgType);
this.writeBody(buffer);
// total length
int lastPos = buffer.optLimit;
buffer.optLimit = beginPos;
int lastPos = buffer.writeIndex;
buffer.writeIndex = beginPos;
buffer.writeFixInt(2, lastPos - packetHeaderSize);
buffer.optLimit = lastPos;

buffer.writeIndex = lastPos;
buffer.readIndex = buffer.writeIndex;
}

/**
* 需要保证内容不超过buffer的容量
*
* @param buffer
*/
public abstract void writeBody(ProtocolBuffer buffer);
public abstract void writeBody(ProxyBuffer buffer);
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.mycat.proxy.man.packet;

import io.mycat.proxy.ProxyBuffer;
import io.mycat.proxy.man.ManagePacket;
import io.mycat.proxy.man.ProtocolBuffer;

/**
* 执行失败的Packet报文
Expand Down Expand Up @@ -42,14 +42,14 @@ public void setErrMsg(String errMsg) {
}

@Override
public void resolveBody(ProtocolBuffer buffer) {
public void resolveBody(ProxyBuffer buffer) {
this.errorCode = (int) buffer.readFixInt(4);
this.errMsg = buffer.readNULString();

}

@Override
public void writeBody(ProtocolBuffer buffer) {
public void writeBody(ProxyBuffer buffer) {
buffer.writeFixInt(4, errorCode);
buffer.writeNULString(errMsg);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.mycat.proxy.man.packet;

import io.mycat.proxy.ProxyBuffer;
import io.mycat.proxy.man.ManagePacket;
import io.mycat.proxy.man.ProtocolBuffer;

/**
* Leader同意加入集群后,节点发送确认报文,完成加入过程
Expand All @@ -28,13 +28,13 @@ public String[] getMyJoinedNodeIds() {
}

@Override
public void resolveBody(ProtocolBuffer buffer) {
public void resolveBody(ProxyBuffer buffer) {
this.myConnectedNodes = buffer.readNULString();

}

@Override
public void writeBody(ProtocolBuffer buffer) {
public void writeBody(ProxyBuffer buffer) {
buffer.writeNULString(myConnectedNodes);

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.mycat.proxy.man.packet;

import io.mycat.proxy.ProxyBuffer;
import io.mycat.proxy.man.ManagePacket;
import io.mycat.proxy.man.ProtocolBuffer;

/**
* 应答Node加入集群的申请
Expand Down Expand Up @@ -36,15 +36,15 @@ public String[] getMyJoinedNodeIds() {
}

@Override
public void resolveBody(ProtocolBuffer buffer) {
public void resolveBody(ProxyBuffer buffer) {
this.myConnectedNodes = buffer.readNULString();
this.configFileVersion = buffer.readNULString();
this.joinState = buffer.readByte();

}

@Override
public void writeBody(ProtocolBuffer buffer) {
public void writeBody(ProxyBuffer buffer) {
buffer.writeNULString(myConnectedNodes);
buffer.writeNULString(this.configFileVersion);
buffer.writeByte(joinState);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.mycat.proxy.man.packet;

import io.mycat.proxy.ProxyBuffer;
import io.mycat.proxy.man.ManagePacket;
import io.mycat.proxy.man.ProtocolBuffer;

/**
* 向Leader申请加入集群的报文
Expand All @@ -28,13 +28,13 @@ public String[] getMyJoinedNodeIds() {
}

@Override
public void resolveBody(ProtocolBuffer buffer) {
public void resolveBody(ProxyBuffer buffer) {
this.myConnectedNodes = buffer.readNULString();

}

@Override
public void writeBody(ProtocolBuffer buffer) {
public void writeBody(ProxyBuffer buffer) {
buffer.writeNULString(myConnectedNodes);

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package io.mycat.proxy.man.packet;

import io.mycat.proxy.ProxyBuffer;
import io.mycat.proxy.man.ManagePacket;
import io.mycat.proxy.man.MyCluster;
import io.mycat.proxy.man.MyCluster.ClusterState;
import io.mycat.proxy.man.ProtocolBuffer;

/**
* 节点信息的报文,用于向对方表明自己的身份信息以及自己所处的集群状态
Expand Down Expand Up @@ -34,7 +34,7 @@ public NodeRegInfoPacket() {
}

@Override
public void resolveBody(ProtocolBuffer buffer) {
public void resolveBody(ProxyBuffer buffer) {
nodeId = buffer.readNULString();
this.clusterState=ClusterState.getState(buffer.readByte());
this.lastClusterStateTime=buffer.readFixInt(8);
Expand All @@ -46,7 +46,7 @@ public void resolveBody(ProtocolBuffer buffer) {
}

@Override
public void writeBody(ProtocolBuffer buffer) {
public void writeBody(ProxyBuffer buffer) {
buffer.writeNULString(nodeId);
buffer.writeByte(clusterState.getSateCode());
buffer.writeFixInt(8, this.lastClusterStateTime);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.mycat.proxy.man.packet;

import io.mycat.proxy.ProxyBuffer;
import io.mycat.proxy.man.ManagePacket;
import io.mycat.proxy.man.ProtocolBuffer;

/**
* 执行成功的Packet报文
Expand All @@ -24,13 +24,13 @@ public SuccessPacket() {
private String tips;

@Override
public void resolveBody(ProtocolBuffer buffer) {
public void resolveBody(ProxyBuffer buffer) {
this.tips = buffer.readNULString();

}

@Override
public void writeBody(ProtocolBuffer buffer) {
public void writeBody(ProxyBuffer buffer) {
buffer.writeNULString(tips);

}
Expand Down

0 comments on commit d960b1d

Please sign in to comment.