allBackCons = session.getBackendCons();
@@ -198,24 +223,47 @@ public void passThroughSQL(MySQLFrontConnection frontCon, ConDataBuffer dataBuff
LOGGER.error("No schema selected");
return ;
}
+
final DNBean dnBean = frontCon.getMycatSchema().getDefaultDN();
final String replica = dnBean.getMysqlReplica();
final SQLEngineCtx ctx = SQLEngineCtx.INSTANCE();
LOGGER.debug("select a replica: {}", replica);
final MySQLReplicatSet repSet = ctx.getMySQLReplicatSet(replica);
final MySQLDataSource datas = repSet.getCurWriteDH();
+
+ /**
+ * 如果该sql对应后端db,没有连接池,则创建连接池部分
+ */
final MySQLBackendConnection newCon =
datas.getConnection(frontCon.getReactor(), dnBean.getDatabase(), true, null);
+
+ /**很关键的设置前端front 与 backend session*/
newCon.setAttachement(frontCon);
+
+ /**设置后端连接池结果集处理handler*/
newCon.setUserCallback(directTransCallback);
+
+ /**
+ * 执行sql语句
+ */
frontCon.addTodoTask(() -> {
- newCon.getWriteDataBuffer().putBytes(dataBuffer.getBytes(pkgStartPos, pkgLen));
+ /**
+ * 将数据写到后端连接池中
+ */
+ newCon.getWriteDataBuffer().putBytes(dataBuffer.getBytes(pkgStartPos,pkgLen));
newCon.enableWrite(false);
+ /**
+ * 新建立的连接放到连接池中
+ */
session.addBackCon(newCon);
});
} else {
+ /**
+ * 否则直接写到后端即可
+ */
existCon.getWriteDataBuffer().putBytes(dataBuffer.getBytes(pkgStartPos, pkgLen));
existCon.enableWrite(false);
+ existCon.setUserCallback(directTransCallback);
}
}
diff --git a/Mycat-Core/src/main/java/io/mycat/front/CheckUserLoginResponseCallback.java b/Mycat-Core/src/main/java/io/mycat/front/CheckUserLoginResponseCallback.java
index 1ec9c37..f0a0f27 100644
--- a/Mycat-Core/src/main/java/io/mycat/front/CheckUserLoginResponseCallback.java
+++ b/Mycat-Core/src/main/java/io/mycat/front/CheckUserLoginResponseCallback.java
@@ -89,6 +89,9 @@ private void success(MySQLFrontConnection con, AuthPacket auth) throws IOExcepti
}
LOGGER.debug("charset = {}, charsetIndex = {}", charset, charsetIndex);
con.setCharset(charsetIndex, charset);
+
+ //认证成功后,修改changeCmdHandler,由CheckUserLoginResponseCallback改用
+ // AbstractSchemaSQLCommandHandler处理
if (!con.setFrontSchema(auth.database)) {
final String errmsg = "No Mycat Schema defined: " + auth.database;
LOGGER.debug(errmsg);
diff --git a/Mycat-Core/src/main/java/io/mycat/front/MySQLFrontConnectionHandler.java b/Mycat-Core/src/main/java/io/mycat/front/MySQLFrontConnectionHandler.java
index 6085f79..8b4af36 100644
--- a/Mycat-Core/src/main/java/io/mycat/front/MySQLFrontConnectionHandler.java
+++ b/Mycat-Core/src/main/java/io/mycat/front/MySQLFrontConnectionHandler.java
@@ -88,8 +88,9 @@ public void handleReadEvent(final MySQLFrontConnection cnxn) throws IOException{
// 解析报文类型
final byte packetType = buffer.getByte(offset + MySQLConnection.msyql_packetHeaderSize);
final int pkgStartPos = offset;
- offset += length;
- buffer.setReadingPos(offset);
+
+
+
// trace-protocol-packet
// @author little-pan
// @since 2016-09-29
@@ -100,6 +101,11 @@ public void handleReadEvent(final MySQLFrontConnection cnxn) throws IOException{
cnxn.getId(), buffer.hashCode(), pkgStartPos, length, packetType, limit, hexs);
}
cnxn.getSession().getCurCmdHandler().processCmd(cnxn, buffer, packetType, pkgStartPos, length);
+
+ offset += length;
+ buffer.setReadingPos(offset);
+
+
}
}
diff --git a/Mycat-Core/src/main/java/io/mycat/memalloc/MyCatMemoryAllocator.java b/Mycat-Core/src/main/java/io/mycat/memalloc/MyCatMemoryAllocator.java
new file mode 100644
index 0000000..2f390f2
--- /dev/null
+++ b/Mycat-Core/src/main/java/io/mycat/memalloc/MyCatMemoryAllocator.java
@@ -0,0 +1,197 @@
+package io.mycat.memalloc;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.internal.PlatformDependent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 封装netty pooled Direct Memory 接口,为mycat提供内存池管理功能
+ *
+ * @author zagnix
+ * @create 2017-01-18 11:01
+ */
+
+public class MyCatMemoryAllocator implements ByteBufAllocator {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MyCatMemoryAllocator.class);
+
+
+ private final static MyCatMemoryAllocator INSTANCE =
+ new MyCatMemoryAllocator(Runtime.getRuntime().availableProcessors()*2);
+
+ /** netty memory pool alloctor*/
+ private final PooledByteBufAllocator alloc;
+ /**arena 的数量,一般设置cpu cores*2 */
+ private final int numberOfArenas;
+
+ /** ChunkSize 大小 = pageSize << maxOrder */
+ private final int chunkSize;
+
+ /**
+ * numberOfArenas 设置为处理器cores*2
+ * @param numberOfArenas
+ */
+ public MyCatMemoryAllocator(int numberOfArenas){
+ this.numberOfArenas = numberOfArenas;
+ if (!PlatformDependent.hasUnsafe()) {
+ LOGGER.warn("Using direct memory, but sun.misc.Unsafe not available.");
+ }
+ boolean preferDirect = true;
+
+ int pageSize = 8192;
+ int maxOrder = 11;
+
+ this.chunkSize = pageSize << maxOrder;
+
+ int numDirectArenas = numberOfArenas;
+
+ int numHeapArenas = 0;
+
+ this.alloc = new PooledByteBufAllocator(
+ preferDirect,
+ numHeapArenas,
+ numDirectArenas,
+ pageSize,
+ maxOrder,
+ 512/**tinycache*/,
+ 256/**smallcache*/,
+ 64/**normalcache*/,
+ true/**使用Thread Local Cache*/);
+ }
+
+
+ public static MyCatMemoryAllocator getINSTANCE() {
+ return INSTANCE;
+ }
+
+ /**
+ *
+ * @return alloc
+ */
+ public PooledByteBufAllocator getAlloc() {
+ return alloc;
+ }
+
+ /**
+ * Returns the number of arenas.
+ *
+ * @return Number of arenas.
+ */
+ int getNumberOfArenas() {
+ return numberOfArenas;
+ }
+
+ /**
+ * Returns the chunk size.
+ *
+ * @return Chunk size.
+ */
+ int getChunkSize() {
+ return chunkSize;
+ }
+
+
+ @Override
+ public ByteBuf buffer() {
+ return alloc.buffer();
+ }
+
+ @Override
+ public ByteBuf buffer(int initialCapacity) {
+ return alloc.buffer(initialCapacity);
+ }
+
+ @Override
+ public ByteBuf buffer(int initialCapacity, int maxCapacity) {
+ return alloc.buffer(initialCapacity, maxCapacity);
+ }
+
+ @Override
+ public ByteBuf ioBuffer() {
+ return alloc.ioBuffer();
+ }
+
+ @Override
+ public ByteBuf ioBuffer(int initialCapacity) {
+ return alloc.ioBuffer(initialCapacity);
+ }
+
+ @Override
+ public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
+ return alloc.ioBuffer(initialCapacity, maxCapacity);
+ }
+
+ @Override
+ public ByteBuf heapBuffer() {
+ throw new UnsupportedOperationException("Heap buffer");
+ }
+
+ @Override
+ public ByteBuf heapBuffer(int initialCapacity) {
+ throw new UnsupportedOperationException("Heap buffer");
+ }
+
+ @Override
+ public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
+ throw new UnsupportedOperationException("Heap buffer");
+ }
+
+ @Override
+ public ByteBuf directBuffer() {
+ return alloc.directBuffer();
+ }
+
+ @Override
+ public ByteBuf directBuffer(int initialCapacity) {
+ return alloc.directBuffer(initialCapacity);
+ }
+
+ @Override
+ public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
+ return alloc.directBuffer(initialCapacity, maxCapacity);
+ }
+
+ @Override
+ public CompositeByteBuf compositeBuffer() {
+ return alloc.compositeBuffer();
+ }
+
+ @Override
+ public CompositeByteBuf compositeBuffer(int maxNumComponents) {
+ return alloc.compositeBuffer(maxNumComponents);
+ }
+
+ @Override
+ public CompositeByteBuf compositeHeapBuffer() {
+ throw new UnsupportedOperationException("Heap buffer");
+ }
+
+ @Override
+ public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
+ throw new UnsupportedOperationException("Heap buffer");
+ }
+
+ @Override
+ public CompositeByteBuf compositeDirectBuffer() {
+ return alloc.compositeDirectBuffer();
+ }
+
+ @Override
+ public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
+ return alloc.compositeDirectBuffer(maxNumComponents);
+ }
+
+ @Override
+ public boolean isDirectBufferPooled() {
+ return alloc.isDirectBufferPooled();
+ }
+
+ @Override
+ public int calculateNewCapacity(int i, int i1) {
+ return 0;
+ }
+}
diff --git a/Mycat-Core/src/main/java/io/mycat/mysql/BufferUtil.java b/Mycat-Core/src/main/java/io/mycat/mysql/BufferUtil.java
new file mode 100644
index 0000000..3f4ceb8
--- /dev/null
+++ b/Mycat-Core/src/main/java/io/mycat/mysql/BufferUtil.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software;Designed and Developed mainly by many Chinese
+ * opensource volunteers. you can redistribute it and/or modify it under the
+ * terms of the GNU General Public License version 2 only, as published by the
+ * Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Any questions about this component can be directed to it's project Web address
+ * https://code.google.com/p/opencloudb/.
+ *
+ */
+package io.mycat.mysql;
+
+import java.nio.ByteBuffer;
+
+/**
+ * @author mycat
+ */
+public class BufferUtil {
+
+ public static final void writeUB2(ByteBuffer buffer, int i) {
+ buffer.put((byte) (i & 0xff));
+ buffer.put((byte) (i >>> 8));
+ }
+
+ public static final void writeUB3(ByteBuffer buffer, int i) {
+ buffer.put((byte) (i & 0xff));
+ buffer.put((byte) (i >>> 8));
+ buffer.put((byte) (i >>> 16));
+ }
+
+ public static final void writeInt(ByteBuffer buffer, int i) {
+ buffer.put((byte) (i & 0xff));
+ buffer.put((byte) (i >>> 8));
+ buffer.put((byte) (i >>> 16));
+ buffer.put((byte) (i >>> 24));
+ }
+
+ public static final void writeFloat(ByteBuffer buffer, float f) {
+ writeInt(buffer, Float.floatToIntBits(f));
+ }
+
+ public static final void writeUB4(ByteBuffer buffer, long l) {
+ buffer.put((byte) (l & 0xff));
+ buffer.put((byte) (l >>> 8));
+ buffer.put((byte) (l >>> 16));
+ buffer.put((byte) (l >>> 24));
+ }
+
+ public static final void writeLong(ByteBuffer buffer, long l) {
+ buffer.put((byte) (l & 0xff));
+ buffer.put((byte) (l >>> 8));
+ buffer.put((byte) (l >>> 16));
+ buffer.put((byte) (l >>> 24));
+ buffer.put((byte) (l >>> 32));
+ buffer.put((byte) (l >>> 40));
+ buffer.put((byte) (l >>> 48));
+ buffer.put((byte) (l >>> 56));
+ }
+
+ public static final void writeDouble(ByteBuffer buffer, double d) {
+ writeLong(buffer, Double.doubleToLongBits(d));
+ }
+
+ public static final void writeLength(ByteBuffer buffer, long l) {
+ if (l < 251) {
+ buffer.put((byte) l);
+ } else if (l < 0x10000L) {
+ buffer.put((byte) 252);
+ writeUB2(buffer, (int) l);
+ } else if (l < 0x1000000L) {
+ buffer.put((byte) 253);
+ writeUB3(buffer, (int) l);
+ } else {
+ buffer.put((byte) 254);
+ writeLong(buffer, l);
+ }
+ }
+
+ public static final void writeWithNull(ByteBuffer buffer, byte[] src) {
+ buffer.put(src);
+ buffer.put((byte) 0);
+ }
+
+ public static final void writeWithLength(ByteBuffer buffer, byte[] src) {
+ int length = src.length;
+ if (length < 251) {
+ buffer.put((byte) length);
+ } else if (length < 0x10000L) {
+ buffer.put((byte) 252);
+ writeUB2(buffer, length);
+ } else if (length < 0x1000000L) {
+ buffer.put((byte) 253);
+ writeUB3(buffer, length);
+ } else {
+ buffer.put((byte) 254);
+ writeLong(buffer, length);
+ }
+ buffer.put(src);
+ }
+
+ public static final void writeWithLength(ByteBuffer buffer, byte[] src, byte nullValue) {
+ if (src == null) {
+ buffer.put(nullValue);
+ } else {
+ writeWithLength(buffer, src);
+ }
+ }
+
+ public static final int getLength(long length) {
+ if (length < 251) {
+ return 1;
+ } else if (length < 0x10000L) {
+ return 3;
+ } else if (length < 0x1000000L) {
+ return 4;
+ } else {
+ return 9;
+ }
+ }
+
+ public static final int getLength(byte[] src) {
+ int length = src.length;
+ if (length < 251) {
+ return 1 + length;
+ } else if (length < 0x10000L) {
+ return 3 + length;
+ } else if (length < 0x1000000L) {
+ return 4 + length;
+ } else {
+ return 9 + length;
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/Mycat-Core/src/main/java/io/mycat/mysql/packet/CommandPacket.java b/Mycat-Core/src/main/java/io/mycat/mysql/packet/CommandPacket.java
new file mode 100644
index 0000000..4536b64
--- /dev/null
+++ b/Mycat-Core/src/main/java/io/mycat/mysql/packet/CommandPacket.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software;Designed and Developed mainly by many Chinese
+ * opensource volunteers. you can redistribute it and/or modify it under the
+ * terms of the GNU General Public License version 2 only, as published by the
+ * Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Any questions about this component can be directed to it's project Web address
+ * https://code.google.com/p/opencloudb/.
+ *
+ */
+package io.mycat.mysql.packet;
+
+
+import io.mycat.mysql.BufferUtil;
+import io.mycat.mysql.MySQLConnection;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * From client to server whenever the client wants the server to do something.
+ *
+ *
+ * Bytes Name
+ * ----- ----
+ * 1 command
+ * n arg
+ *
+ * command: The most common value is 03 COM_QUERY, because
+ * INSERT UPDATE DELETE SELECT etc. have this code.
+ * The possible values at time of writing (taken
+ * from /include/mysql_com.h for enum_server_command) are:
+ *
+ * # Name Associated client function
+ * - ---- --------------------------
+ * 0x00 COM_SLEEP (none, this is an internal thread state)
+ * 0x01 COM_QUIT mysql_close
+ * 0x02 COM_INIT_DB mysql_select_db
+ * 0x03 COM_QUERY mysql_real_query
+ * 0x04 COM_FIELD_LIST mysql_list_fields
+ * 0x05 COM_CREATE_DB mysql_create_db (deprecated)
+ * 0x06 COM_DROP_DB mysql_drop_db (deprecated)
+ * 0x07 COM_REFRESH mysql_refresh
+ * 0x08 COM_SHUTDOWN mysql_shutdown
+ * 0x09 COM_STATISTICS mysql_stat
+ * 0x0a COM_PROCESS_INFO mysql_list_processes
+ * 0x0b COM_CONNECT (none, this is an internal thread state)
+ * 0x0c COM_PROCESS_KILL mysql_kill
+ * 0x0d COM_DEBUG mysql_dump_debug_info
+ * 0x0e COM_PING mysql_ping
+ * 0x0f COM_TIME (none, this is an internal thread state)
+ * 0x10 COM_DELAYED_INSERT (none, this is an internal thread state)
+ * 0x11 COM_CHANGE_USER mysql_change_user
+ * 0x12 COM_BINLOG_DUMP sent by the slave IO thread to request a binlog
+ * 0x13 COM_TABLE_DUMP LOAD TABLE ... FROM MASTER (deprecated)
+ * 0x14 COM_CONNECT_OUT (none, this is an internal thread state)
+ * 0x15 COM_REGISTER_SLAVE sent by the slave to register with the master (optional)
+ * 0x16 COM_STMT_PREPARE mysql_stmt_prepare
+ * 0x17 COM_STMT_EXECUTE mysql_stmt_execute
+ * 0x18 COM_STMT_SEND_LONG_DATA mysql_stmt_send_long_data
+ * 0x19 COM_STMT_CLOSE mysql_stmt_close
+ * 0x1a COM_STMT_RESET mysql_stmt_reset
+ * 0x1b COM_SET_OPTION mysql_set_server_option
+ * 0x1c COM_STMT_FETCH mysql_stmt_fetch
+ *
+ * arg: The text of the command is just the way the user typed it, there is no processing
+ * by the client (except removal of the final ';').
+ * This field is not a null-terminated string; however,
+ * the size can be calculated from the packet size,
+ * and the MySQL client appends '\0' when receiving.
+ *
+ * @see http://forge.mysql.com/wiki/MySQL_Internals_ClientServer_Protocol#Command_Packet_.28Overview.29
+ *
+ *
+ * @author mycat
+ */
+public class CommandPacket extends MySQLPacket {
+
+ public byte command;
+ public byte[] arg;
+
+ public void read(ByteBuffer data) {
+ MySQLMessage mm = new MySQLMessage(data);
+ packetLength = mm.readUB3();
+ packetId = mm.read();
+ command = mm.read();
+ arg = mm.readBytes();
+ }
+
+ public ByteBuffer write(MySQLConnection c) throws IOException {
+ ByteBuffer buffer = ByteBuffer.allocateDirect(256);
+ BufferUtil.writeUB3(buffer, calcPacketSize());
+ buffer.put(packetId);
+ buffer.put(command);
+ buffer.put(arg);
+ buffer.flip();
+ return buffer;
+ // c.getChannel().write(buffer);
+ }
+
+ @Override
+ public int calcPacketSize() {
+ return 1 + arg.length;
+ }
+
+ @Override
+ protected String getPacketInfo() {
+ return "MySQL Command Packet";
+ }
+
+
+}
\ No newline at end of file
diff --git a/Mycat-Core/src/main/java/io/mycat/net2/ByteBufConDataBuffer.java b/Mycat-Core/src/main/java/io/mycat/net2/ByteBufConDataBuffer.java
new file mode 100644
index 0000000..3051e88
--- /dev/null
+++ b/Mycat-Core/src/main/java/io/mycat/net2/ByteBufConDataBuffer.java
@@ -0,0 +1,143 @@
+package io.mycat.net2;
+
+import io.mycat.memalloc.MyCatMemoryAllocator;
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+/**
+ * @author zagnix
+ * @create 2017-01-19 11:41
+ */
+
+public class ByteBufConDataBuffer implements ConDataBuffer {
+
+ private ByteBuf buffer = null;
+ private final int initCapacity;
+ private final int maxCapacity;
+
+ public ByteBufConDataBuffer(final int initCapacity,final int maxCapacity){
+ this.initCapacity = initCapacity;
+ this.maxCapacity = maxCapacity;
+ this.buffer = MyCatMemoryAllocator.getINSTANCE().directBuffer(initCapacity);
+ }
+
+ @Override
+ public int transferFrom(SocketChannel socketChanel) throws IOException {
+ /* int size=-1,len = 0;
+ int pkglen =0;
+
+ while ((size=buffer.writeBytes(socketChanel,512))>0){
+ len +=size;
+ }
+ */
+ //System.out.println("transferFrom1 read index = " + buffer.readerIndex() + "write index = " + buffer.writerIndex());
+ System.out.println("writable bytes :" + buffer.writableBytes());
+
+ int len = buffer.writeBytes(socketChanel,buffer.writableBytes());
+ //System.out.println("transferFrom2 read index = " + buffer.readerIndex() + "write index = " + buffer.writerIndex());
+ return len;
+
+ }
+
+ @Override
+ public void putBytes(ByteBuffer buf) throws IOException {
+ buffer.writeBytes(buf);
+ }
+
+ @Override
+ public void putBytes(byte[] buf) throws IOException {
+ buffer.writeBytes(buf);
+ }
+
+ @Override
+ public ByteBuffer beginWrite(int length) throws IOException {
+ System.out.println("beginWrite1 read index = " + buffer.readerIndex() + "write index = " + buffer.writerIndex());
+ ByteBuffer byteBuffer =ByteBuffer.allocateDirect(length); //buffer.internalNioBuffer(buffer.writerIndex(),length);
+ System.out.println("beginWrite2 read index = " + buffer.readerIndex() + "write index = " + buffer.writerIndex());
+ return byteBuffer;
+ }
+
+ @Override
+ public void endWrite(ByteBuffer src) throws IOException {
+ buffer.writeBytes(src);
+ }
+
+ @Override
+ public byte getByte(int index) throws IOException {
+ return buffer.getByte(index);
+ }
+
+ @Override
+ public ByteBuffer getBytes(int index, int length) throws IOException {
+ buffer.readerIndex(index);
+ buffer.writerIndex(index+length);
+ ByteBuffer byteBuffer = buffer.nioBuffer(0,length);
+ buffer.readBytes(byteBuffer);
+ return byteBuffer;
+ }
+
+ @Override
+ public int transferTo(SocketChannel socketChanel) throws IOException {
+ // System.out.println("transferTo1 read index = " + buffer.readerIndex() + "write index = " + buffer.writerIndex());
+ int len = buffer.readBytes(socketChanel,buffer.readableBytes());
+ //System.out.println("transferTo2 read index = " + buffer.readerIndex() + "write index = " + buffer.writerIndex());
+ return len;
+
+ }
+
+ @Override
+ public int writingPos() throws IOException {
+ return buffer.writerIndex();
+ }
+
+ @Override
+ public int readPos() {
+ return buffer.readerIndex();
+ }
+
+ @Override
+ public int totalSize() {
+ return maxCapacity;
+ }
+
+ @Override
+ public void setWritingPos(int writingPos) throws IOException {
+ buffer.setIndex(buffer.readerIndex(),writingPos);
+ }
+
+ @Override
+ public void setReadingPos(int readingPos) {
+ buffer.setIndex(readingPos,buffer.writerIndex());
+ }
+
+ @Override
+ public boolean isFull() throws IOException {
+ return !buffer.isWritable();
+ }
+
+ @Override
+ public void recycle() {
+ //buffer.release();
+ }
+
+ public ByteBuf getBuffer() {
+ return buffer;
+ }
+
+ public void setBuffer(ByteBuf buffer) {
+ this.buffer = buffer;
+ }
+
+ public int getInitCapacity() {
+ return initCapacity;
+ }
+
+
+ public int getMaxCapacity() {
+ return maxCapacity;
+ }
+
+}
diff --git a/Mycat-Core/src/main/java/io/mycat/net2/Connection.java b/Mycat-Core/src/main/java/io/mycat/net2/Connection.java
index f610489..2a2f7d0 100644
--- a/Mycat-Core/src/main/java/io/mycat/net2/Connection.java
+++ b/Mycat-Core/src/main/java/io/mycat/net2/Connection.java
@@ -246,13 +246,21 @@ protected void cleanup() {
// 清理资源占用
if(readDataBuffer!=null)
{
- readDataBuffer.recycle();
- readDataBuffer=null;
+ try {
+ readDataBuffer.recycle();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ readDataBuffer=null;
}
if(this.writeDataBuffer!=null)
{
- writeDataBuffer.recycle();
- writeDataBuffer=null;
+ try {
+ writeDataBuffer.recycle();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ writeDataBuffer=null;
}
@@ -275,8 +283,19 @@ public void register(Selector selector, ReactorBufferPool myBufferPool) throws I
String maprFileName=id+".rtmp";
String mapwFileName=id+".wtmp";
LOGGER.info("connection bytebuffer mapped "+maprFileName);
- this.readDataBuffer =new MappedFileConDataBuffer(maprFileName); // 2 ,3
- writeDataBuffer=new MappedFileConDataBuffer3(mapwFileName);
+
+ //TODO
+ /**使用MyCatMemoryAllocator分配Direct Buffer,再进行SocketChannel通信时候,
+ * 网络读写都会减少一次数据的拷贝,而使用FileChanel与SocketChannel数据交换时
+ * 底层最终还是生成一个临时的Direct Buffer,用临时Direct Buffer写入或者读SocketChannel中
+ * 后面考虑会使用netty中ByteBuf中的DirectBuffer进行网络IO通信。效率更高
+ * */
+ this.readDataBuffer =new MappedFileConDataBuffer(maprFileName); // 2 ,3
+ this.writeDataBuffer=new MappedFileConDataBuffer3(mapwFileName);
+ //存在bug暂不启用,以后统一是ByteBuf作为buffer进行NIO网络通信。
+ // this.readDataBuffer = new ByteBufConDataBuffer(4096,16*1024*1024);
+ // this.writeDataBuffer = new ByteBufConDataBuffer(4096,16*1024*1024);
+ //新的client进来后,处理Server发送Client的handshake init packet
this.handler.onConnected(this);
}
diff --git a/Mycat-Core/src/main/java/io/mycat/net2/MappedFileConDataBuffer3.java b/Mycat-Core/src/main/java/io/mycat/net2/MappedFileConDataBuffer3.java
index 50e29b8..aa9410b 100644
--- a/Mycat-Core/src/main/java/io/mycat/net2/MappedFileConDataBuffer3.java
+++ b/Mycat-Core/src/main/java/io/mycat/net2/MappedFileConDataBuffer3.java
@@ -31,6 +31,7 @@
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
+import io.mycat.memalloc.MyCatMemoryAllocator;
import sun.misc.Unsafe;
import sun.nio.ch.FileChannelImpl;
/**
@@ -168,7 +169,9 @@ public ByteBuffer getBytes(int index,int length) throws IOException {
}
@Override
public ByteBuffer beginWrite(int length) {
- ByteBuffer copyBuf=ByteBuffer.allocate(length);
+ //TODO
+ //ByteBuffer copyBuf=ByteBuffer.allocate(length);
+ ByteBuffer copyBuf = MyCatMemoryAllocator.getINSTANCE().directBuffer(length).nioBuffer(0,length);
return copyBuf;
}
diff --git a/Mycat-Core/src/main/java/io/mycat/sqlcache/HintHandler.java b/Mycat-Core/src/main/java/io/mycat/sqlcache/HintHandler.java
new file mode 100644
index 0000000..6f1b1f8
--- /dev/null
+++ b/Mycat-Core/src/main/java/io/mycat/sqlcache/HintHandler.java
@@ -0,0 +1,12 @@
+package io.mycat.sqlcache;
+
+/**
+ * Hint处理Handler
+ *
+ * @author zagnix
+ * @create 2017-01-17 14:07
+ */
+
+public interface HintHandler {
+ public boolean handle(String sql);
+}
diff --git a/Mycat-Core/src/main/java/io/mycat/sqlcache/HintSQLDataLoader.java b/Mycat-Core/src/main/java/io/mycat/sqlcache/HintSQLDataLoader.java
new file mode 100644
index 0000000..9779be1
--- /dev/null
+++ b/Mycat-Core/src/main/java/io/mycat/sqlcache/HintSQLDataLoader.java
@@ -0,0 +1,60 @@
+package io.mycat.sqlcache;
+
+import com.google.common.util.concurrent.*;
+import io.mycat.bigmem.sqlcache.BigSQLResult;
+import io.mycat.bigmem.sqlcache.IDataLoader;
+import io.mycat.bigmem.sqlcache.Keyer;
+import io.mycat.bigmem.console.LocatePolicy;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+
+import static com.google.common.hash.Hashing.murmur3_32;
+
+/**
+ * SQL 异步加载结果集类
+ *
+ * @author zagnix
+ * @create 2017-01-20 15:13
+ */
+
+public class HintSQLDataLoader implements IDataLoader {
+ /**
+ * 根据sql,异步从后台DB reload数据,替换旧值
+ * @param keyer
+ * @return
+ */
+ @Override
+ public V reload(Keyer keyer) {
+ ListeningExecutorService executor =
+ MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+
+ final ListenableFuture listenableFuture = executor
+ .submit(new Callable() {
+ @Override
+ public BigSQLResult call() throws Exception {
+ //TODO
+ String sql = keyer.getSql();
+ String sqlkey = keyer.getLastAccessTime() +"_"+murmur3_32().hashUnencodedChars(sql);
+ BigSQLResult sqlResultCache
+ = new BigSQLResult(LocatePolicy.Normal,sqlkey,32*1024*1024);
+
+
+ return sqlResultCache;
+ }
+ });
+
+ Futures.addCallback(listenableFuture, new FutureCallback() {
+ @Override
+ public void onSuccess(BigSQLResult bigSQLResult) {
+ //TODO
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ //TODO
+ }
+ }
+ );
+ return null;
+ }
+}
diff --git a/Mycat-Core/src/main/java/io/mycat/sqlcache/HintSQLInfo.java b/Mycat-Core/src/main/java/io/mycat/sqlcache/HintSQLInfo.java
new file mode 100644
index 0000000..f88dbe5
--- /dev/null
+++ b/Mycat-Core/src/main/java/io/mycat/sqlcache/HintSQLInfo.java
@@ -0,0 +1,88 @@
+package io.mycat.sqlcache;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Hint SQL 携带的属性KV 和 真实执行的sql
+ *
+ * @author zagnix
+ * @version 1.0
+ * @create 2017-01-16 17:32
+ */
+
+public class HintSQLInfo {
+ private String functionName;
+ private boolean isCache = false;
+ private HintHandler hintHandler;
+ private String hintSQL;
+ private String execSQL;
+
+ /**
+ * paramsKv 中的Key必须是 cache-time,auto-refresh,access-count
+ * cache-time=xxx auto-refresh=true access-count=5000
+ */
+ private Map paramsKv = new HashMap<>();
+
+ public HintSQLInfo(String hintSQL){
+ this.hintSQL = hintSQL;
+ }
+
+ public String getExecSQL() {
+ return execSQL;
+ }
+
+ public void setExecSQL(String execSQL) {
+ this.execSQL = execSQL;
+ }
+
+ public Map getParamsKv() {
+ return paramsKv;
+ }
+
+ public void setParamsKv(Map paramsKv) {
+ this.paramsKv = paramsKv;
+ }
+
+ public String getFunctionName() {
+ return functionName;
+ }
+
+ public void setFunctionName(String functionName) {
+ this.functionName = functionName;
+ }
+
+ public String getHintSQL() {
+ return hintSQL;
+ }
+
+ public void setHintSQL(String hintSQL) {
+ this.hintSQL = hintSQL;
+ }
+
+ public HintHandler getHintHandler() {
+ return hintHandler;
+ }
+
+ public void setHintHandler(HintHandler hintHandler) {
+ this.hintHandler = hintHandler;
+ }
+
+ public boolean isCache() {
+ return isCache;
+ }
+
+ public void setCache(boolean cache) {
+ isCache = cache;
+ }
+
+ @Override
+ public String toString() {
+ return "HintSQLInfo{" +
+ "functionName='" + functionName + '\'' +
+ ", hintSQL='" + hintSQL + '\'' +
+ ", execSQL='" + execSQL + '\'' +
+ ", paramsKv=" + paramsKv +
+ '}';
+ }
+}
diff --git a/Mycat-Core/src/main/java/io/mycat/sqlcache/HintSQLParser.java b/Mycat-Core/src/main/java/io/mycat/sqlcache/HintSQLParser.java
new file mode 100644
index 0000000..4a31481
--- /dev/null
+++ b/Mycat-Core/src/main/java/io/mycat/sqlcache/HintSQLParser.java
@@ -0,0 +1,79 @@
+package io.mycat.sqlcache;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+/**
+ * 主要用来解析 Hint SQL
+ * 兼容mycat 1.6版本 新的注解方式
+ *
+ * @author zagnix
+ * @create 2017-01-16 17:23
+ */
+
+public class HintSQLParser {
+ private static final Logger LOGGER = LoggerFactory.getLogger(HintSQLParser.class);
+
+ //注解格式:/*!mycat:type=value */ sql. sql为真实的执行的sql语句
+ private final static String MYCAT_HINT_START = "/*!mycat:";
+ private final static String HINT_PRO_SPLIT = " ";
+ private final static String HINT_KV_SPLIT = "=";
+
+
+ public static HintSQLInfo parserHintSQL(String sql){
+ if (sql == null)
+ return null;
+
+ HintSQLInfo hintSQLInfo = new HintSQLInfo(sql);
+ Map hintKv = new HashMap();
+ int endIndex = sql.indexOf("*/");
+
+ if (endIndex !=-1 ){
+ String hintPart = sql.substring(0,endIndex);
+ hintSQLInfo.setExecSQL(sql.substring(endIndex+2));
+ String kvsPart=hintPart.replace(MYCAT_HINT_START,"").replace("*/","");
+ StringTokenizer token = new StringTokenizer(kvsPart,HINT_PRO_SPLIT);
+
+ /**hint功能*/
+ if (token.hasMoreElements()) {
+ StringTokenizer function = new StringTokenizer(token.nextToken(), HINT_KV_SPLIT);
+ if (function.countTokens()==2){
+ String kName = function.nextToken();
+ String vValue = function.nextToken();
+ if (kName.equalsIgnoreCase("cacheable") && vValue.equals("true")){
+ hintSQLInfo.setFunctionName(kName);
+ hintSQLInfo.setCache(true);
+ }
+
+ }else {
+ LOGGER.error(sql + "注解中的KV属性不对");
+ return null;
+ }
+
+ /**KV**/
+ while (token.hasMoreElements()) {
+ StringTokenizer t = new StringTokenizer(token.nextToken(), HINT_KV_SPLIT);
+
+ if (t.countTokens() == 2) {
+ /**TODO 限制hintSQL中kv中k具体命名必须在{cacheable, cache-time,auto-refresh, access-count}*/
+ hintKv.put(t.nextToken(),t.nextToken());
+ }else { /**KV*/
+ LOGGER.error(sql + "注解中的KV属性不对");
+ return null;
+ }
+ }
+ hintSQLInfo.setParamsKv(hintKv);
+ }else { /**没有function name*/
+ LOGGER.error(sql + "注解中的功能没有名字");
+ return null;
+ }
+ }else {
+ return null;
+ }
+ return hintSQLInfo;
+ }
+}
diff --git a/Mycat-Core/src/main/java/io/mycat/sqlcache/HintSQLRemoveKeyListener.java b/Mycat-Core/src/main/java/io/mycat/sqlcache/HintSQLRemoveKeyListener.java
new file mode 100644
index 0000000..d0375a7
--- /dev/null
+++ b/Mycat-Core/src/main/java/io/mycat/sqlcache/HintSQLRemoveKeyListener.java
@@ -0,0 +1,26 @@
+package io.mycat.sqlcache;
+
+import io.mycat.bigmem.sqlcache.BigSQLResult;
+import io.mycat.bigmem.sqlcache.IRemoveKeyListener;
+
+/**
+ * HintSQL 结果集被移除时回调类
+ *
+ * @author zagnix
+ * @create 2017-01-20 15:14
+ */
+
+public class HintSQLRemoveKeyListener implements IRemoveKeyListener {
+ /**
+ * key 失效,做清理工作
+ *
+ * @param key
+ * @param value
+ */
+ @Override
+ public void removeNotify(K key, V value) {
+ if (value !=null){
+ value.removeAll();
+ }
+ }
+}
\ No newline at end of file
diff --git a/Mycat-Core/src/main/java/io/mycat/sqlcache/LocatePolicy.java b/Mycat-Core/src/main/java/io/mycat/sqlcache/LocatePolicy.java
new file mode 100644
index 0000000..d22584b
--- /dev/null
+++ b/Mycat-Core/src/main/java/io/mycat/sqlcache/LocatePolicy.java
@@ -0,0 +1,46 @@
+package io.mycat.sqlcache;
+
+/**
+ * 策略信息
+* 源文件名:LocatePolicy.java
+* 文件版本:1.0.0
+* 创建作者:Think
+* 创建日期:2016年12月27日
+* 修改作者:Think
+* 修改日期:2016年12月27日
+* 文件描述:TODO
+* 版权所有:Copyright 2016 zjhz, Inc. All Rights Reserved.
+*/
+public enum LocatePolicy {
+
+ /**
+ * 使用本地的内存进行缓存的策略
+ * @字段说明 Core
+ */
+ Core(1),
+
+ /**
+ * 使用文件进行映射的缓存的策略信息
+ * @字段说明 Normal
+ */
+ Normal(2);
+
+ /**
+ * 策略信息
+ * @字段说明 policy
+ */
+ private int policy;
+
+ public int getPolicy() {
+ return policy;
+ }
+
+ public void setPolicy(int policy) {
+ this.policy = policy;
+ }
+
+ private LocatePolicy(int policy) {
+ this.policy = policy;
+ }
+
+}
diff --git a/Mycat-Core/src/main/java/io/mycat/sqlcache/SQLResultsCacheService.java b/Mycat-Core/src/main/java/io/mycat/sqlcache/SQLResultsCacheService.java
new file mode 100644
index 0000000..09c3816
--- /dev/null
+++ b/Mycat-Core/src/main/java/io/mycat/sqlcache/SQLResultsCacheService.java
@@ -0,0 +1,287 @@
+package io.mycat.sqlcache;
+
+import io.mycat.SQLEngineCtx;
+import io.mycat.backend.MySQLBackendConnection;
+import io.mycat.backend.MySQLDataSource;
+import io.mycat.backend.MySQLReplicatSet;
+import io.mycat.backend.callback.SQLResCacheHintHandler;
+import io.mycat.beans.DNBean;
+import io.mycat.bigmem.sqlcache.*;
+import io.mycat.bigmem.console.LocatePolicy;
+import io.mycat.engine.UserSession;
+import io.mycat.front.MySQLFrontConnection;
+import io.mycat.mysql.MySQLConnection;
+import io.mycat.mysql.packet.*;
+import io.mycat.net2.Connection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import static com.google.common.hash.Hashing.murmur3_32;
+import static io.mycat.mysql.MySQLConnection.RESULT_FETCH_STATUS;
+import static io.mycat.mysql.MySQLConnection.RESULT_HEADER_STATUS;
+
+/**
+ * SQL 结果集缓存服务
+ *
+ * @author zagnix
+ * @version 1.0
+ * @create 2017-01-17 17:23
+ */
+
+public class SQLResultsCacheService {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SQLResultsCacheService.class);
+ private final CacheImp sqlResultCacheImp;
+ static class InitSQLResultsCacheService{
+ public final static SQLResultsCacheService sqlResultsCacheService = new SQLResultsCacheService();
+ }
+
+ private SQLResultsCacheService(){
+ sqlResultCacheImp = new CacheImp();
+ }
+
+ /**
+ * 将sql结果集缓存起来
+ *
+ * @param hintSQLInfo Hint SQL相关信息
+ * @param bigSQLResult sql结果集缓存
+ * @param loader 结果集load or reload 接口
+ * @param listener key被移除时,调用的接口
+ */
+ public void cacheSQLResult(HintSQLInfo hintSQLInfo,BigSQLResult bigSQLResult,IDataLoader loader,
+ IRemoveKeyListener listener){
+ /**
+ * cache-time=xxx auto-refresh=true access-count=5000
+ */
+ String key = "" + murmur3_32().hashUnencodedChars(hintSQLInfo.getExecSQL());
+
+ Keyer keyer = new Keyer();
+ keyer.setSql(hintSQLInfo.getExecSQL());
+ keyer.setKey(key);
+ keyer.setValue(bigSQLResult);
+ keyer.setCacheTTL(Integer.valueOf(hintSQLInfo.getParamsKv().get("cache-time")));
+ keyer.setAccessCount(Integer.valueOf(hintSQLInfo.getParamsKv().get("access-count")));
+ // keyer.setAutoRefresh(Boolean.valueOf(hintSQLInfo.getParamsKv().get("auto-refresh")));
+ keyer.setRemoveKeyListener(listener);
+ keyer.setiDataLoader(loader);
+ sqlResultCacheImp.put(key,bigSQLResult,keyer);
+ }
+ /**
+ * 获取sql语句,已经缓存的结果集
+ * @param sql sql 语句
+ * @return
+ */
+ public BigSQLResult getSQLResult(String sql){
+ String key = "" + murmur3_32().hashUnencodedChars(sql);
+ return sqlResultCacheImp.get(key);
+ }
+
+
+ /**
+ * 获取sql语句,已经缓存的结果集
+ *
+ * @param sql sql 语句
+ */
+ public void remove(String sql){
+
+ String key = "" + murmur3_32().hashUnencodedChars(sql);
+ sqlResultCacheImp.remove(key);
+ }
+
+
+
+
+ /**
+ * 处理Select Hint SQL 结果集函数
+ * @param frontCon
+ * @param hintSQLInfo
+ * @param mySQLMessage
+ * @return
+ * @throws IOException
+ */
+ public boolean processHintSQL(MySQLFrontConnection frontCon,HintSQLInfo hintSQLInfo, MySQLMessage mySQLMessage) throws IOException {
+
+ /**
+ * SQL Cache 结果集缓存框架实现:
+ * 1.根据解析处理的sql,查询SQLCache是否已经缓存了。缓存了,则直接从Cache中获取结果集发给client端即可
+ * 2.没有缓存的情况则将sql执行发送的后端Mysql数据库,执行并发查询,返回的结果集,分二部走
+ * 1>. 直接透传给Client
+ * 2>. 缓存到MyCat本地
+ * 3.根据Hint注解的属性,决定异步拉去后端的结果集,替换掉旧的数据集
+ */
+ BigSQLResult sqlResultCache = getSQLResult(hintSQLInfo.getExecSQL());
+
+ if (sqlResultCache != null){
+ LOGGER.error(hintSQLInfo.getExecSQL() + ":====>>>> Use Local Cache SQL Resuls");
+ sqlResultCacheDirectClient(frontCon,sqlResultCache);
+ return true;
+ }else {
+ /**从后端拉取数据进行缓存*/
+ sqlResultCache =
+ new BigSQLResult(LocatePolicy.Normal,hintSQLInfo.getExecSQL(),32*1024*1024/**TODO*/);
+ }
+
+ /**
+ * 1. 改写sql语句,去掉前面的注释
+ */
+ CommandPacket command = new CommandPacket();
+ command.packetId = 0;
+ command.command = MySQLPacket.COM_QUERY;
+
+ /**
+ * 2. 获取后端DB连接池,执行rewrite sql
+ */
+ MySQLBackendConnection existCon = null;
+ UserSession session=frontCon.getSession();
+ ArrayList allBackCons = session.getBackendCons();
+ if (!allBackCons.isEmpty()) {
+ existCon = allBackCons.get(0);
+ }
+ if (existCon == null || existCon.isClosed()) {
+ if (existCon != null) {
+ session.removeBackCon(existCon);
+ }
+ if (frontCon.getMycatSchema() == null){
+ frontCon.writeErrMessage(1450, "No schema selected");
+ return false;
+ }
+
+ final DNBean dnBean = frontCon.getMycatSchema().getDefaultDN();
+ final String replica = dnBean.getMysqlReplica();
+ final SQLEngineCtx ctx = SQLEngineCtx.INSTANCE();
+ final MySQLReplicatSet repSet = ctx.getMySQLReplicatSet(replica);
+ final MySQLDataSource datas = repSet.getCurWriteDH();
+ /**
+ * 如果该sql对应后端db,没有连接池,则创建连接池部分
+ */
+ final MySQLBackendConnection newCon =
+ datas.getConnection(frontCon.getReactor(), dnBean.getDatabase(), true, null);
+
+ /**很关键的设置前端front 与 backend session*/
+ newCon.setAttachement(frontCon);
+
+ /**设置后端连接池结果集处理handler,sqlResultCache缓存结果集类*/
+ newCon.setUserCallback(new SQLResCacheHintHandler(hintSQLInfo,sqlResultCache));
+
+ /**
+ * 执行sql语句
+ */
+ frontCon.addTodoTask(() -> {
+ /**
+ * 将数据写到后端连接池中
+ */
+ command.arg = hintSQLInfo.getExecSQL().getBytes(newCon.getCharset());
+ newCon.getWriteDataBuffer().putBytes(command.write(newCon));
+ newCon.enableWrite(false);
+ /**
+ * 新建立的连接放到连接池中
+ */
+ session.addBackCon(newCon);
+
+ });
+ } else {
+ /**
+ * 否则直接写到后端即可
+ */
+ command.arg = hintSQLInfo.getExecSQL().getBytes(existCon.getCharset());
+ existCon.getWriteDataBuffer().putBytes(command.write(existCon));
+ existCon.enableWrite(false);
+ /**设置后端连接池结果集处理handler,sqlResultCache缓存结果集类*/
+ existCon.setUserCallback(new SQLResCacheHintHandler(hintSQLInfo,sqlResultCache));
+ }
+
+ return true;
+ }
+
+ /**
+ * 处理Local Cache结果集,直接返回给Front Connection端
+ * @param frontConn
+ * @param sqlResultCache
+ */
+ private void sqlResultCacheDirectClient(MySQLFrontConnection frontConn,BigSQLResult sqlResultCache) throws IOException {
+ sqlResultCache.reset();
+ int filedCount = 0;
+ int status = Connection.STATE_IDLE;
+ while (sqlResultCache.hasNext()){
+ byte [] datas = sqlResultCache.next();
+ byte packetType = datas[MySQLConnection.msyql_packetHeaderSize];
+ ByteBuffer packetBuffer = ByteBuffer.wrap(datas);
+ frontConn.getWriteDataBuffer().putBytes(packetBuffer);
+ frontConn.enableWrite(false);
+
+ if (false) {
+ switch (status) {
+ case Connection.STATE_IDLE:
+ if (packetType == MySQLPacket.COM_QUERY) {
+ status = Connection.STATE_IDLE;
+ } else if (packetType == MySQLPacket.COM_QUIT) {
+ status = (Connection.STATE_IDLE);
+ } else if (sqlResultCache != null) {
+ /**step 1: Result Set Header Packet 列的数目*/
+ status = RESULT_HEADER_STATUS;
+ ResultSetHeaderPacket resultSetHeaderPacket = new ResultSetHeaderPacket();
+ resultSetHeaderPacket.read(packetBuffer);
+ filedCount = resultSetHeaderPacket.fieldCount;
+ LOGGER.error("step 1 =====> DB status: Result Filed Count " + filedCount);
+
+ }
+ break;
+ case RESULT_HEADER_STATUS:
+ if (packetType == MySQLPacket.EOF_PACKET) {
+ LOGGER.error("step 3 : EOF Packet,marker—end of field packets");
+ status = RESULT_FETCH_STATUS;
+ } else if (packetType == MySQLPacket.ERROR_PACKET) {
+ status = (Connection.STATE_IDLE);
+ } else if (sqlResultCache != null) {
+ /**Step 2: Field Packets,列的描述信息*/
+
+ FieldPacket fieldPacket = new FieldPacket();
+ fieldPacket.read(packetBuffer);
+ //frontConn.getChannel().write(packetBuffer);
+
+ LOGGER.error("Step 2: Field Packets");
+
+ }
+ break;
+ case RESULT_FETCH_STATUS:
+ if (packetType == MySQLPacket.EOF_PACKET) {
+ /**Step 5:EOF Packet: marker---end of row data packets*/
+ LOGGER.error("Step 5:EOF Packet: marker---end of row data packets");
+ status = Connection.STATE_IDLE;
+ break;
+ } else if (packetType == MySQLPacket.ERROR_PACKET) {
+ status = (Connection.STATE_IDLE);
+ } else if (sqlResultCache != null) {
+ /**Step 4: Row Data Packet{多个}: row contents 一行对应一个row packet*/
+
+ LOGGER.error("Step 4: Row Data Packet");
+ RowDataPacket rowDataPacket = new RowDataPacket(filedCount);
+ rowDataPacket.read(packetBuffer);
+ for (int i = 0; i < filedCount; i++) {
+ byte[] src = rowDataPacket.fieldValues.get(i);
+ if (src != null)
+ LOGGER.error("filed value: " + new String(src));
+ }
+ }
+ break;
+ default:
+ LOGGER.warn("Error connected status.", status);
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ *
+ * @return
+ */
+ public static SQLResultsCacheService getInstance()
+ {
+ return InitSQLResultsCacheService.sqlResultsCacheService;
+ }
+
+}
diff --git a/Mycat-Core/src/main/java/io/mycat/util/UnsafeMemory.java b/Mycat-Core/src/main/java/io/mycat/util/UnsafeMemory.java
new file mode 100644
index 0000000..d597df1
--- /dev/null
+++ b/Mycat-Core/src/main/java/io/mycat/util/UnsafeMemory.java
@@ -0,0 +1,52 @@
+package io.mycat.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.misc.Unsafe;
+
+import java.lang.reflect.Field;
+
+
+/**
+ * Unsafe 工具类
+ *
+ * @author zagnix
+ * @create 2016-11-18 14:17
+ */
+public final class UnsafeMemory {
+ private final static Logger logger = LoggerFactory.getLogger(UnsafeMemory.class);
+ private static final Unsafe _UNSAFE;
+ public static final int BYTE_ARRAY_OFFSET;
+ static {
+ Unsafe unsafe;
+ try {
+ Field theUnsafeField = Unsafe.class.getDeclaredField("theUnsafe");
+ theUnsafeField.setAccessible(true);
+ unsafe = (Unsafe) theUnsafeField.get(null);
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ unsafe = null;
+ }
+ _UNSAFE = unsafe;
+ if (_UNSAFE != null) {
+ BYTE_ARRAY_OFFSET = _UNSAFE.arrayBaseOffset(byte[].class);
+ } else {
+ BYTE_ARRAY_OFFSET = 0;
+ }
+ }
+ public static Unsafe getUnsafe() {
+ return _UNSAFE;
+ }
+
+ /**
+ * 将size规整化为pagesize的倍数
+ * @param size
+ * @return
+ */
+ public static long roundToOsPageSzie(long size) {
+ long pagesize = _UNSAFE.pageSize();
+ return (size + (pagesize-1)) & ~(pagesize-1);
+
+ }
+
+}
diff --git a/Mycat-Core/src/main/java/io/mycat/util/Utils.java b/Mycat-Core/src/main/java/io/mycat/util/Utils.java
new file mode 100644
index 0000000..70d4eab
--- /dev/null
+++ b/Mycat-Core/src/main/java/io/mycat/util/Utils.java
@@ -0,0 +1,74 @@
+package io.mycat.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.regex.Pattern;
+
+public class Utils {
+ private static final String illegalChars = "/" + '\u0000' + '\u0001' + "-" + '\u001F' + '\u007F' + "-" + '\u009F' + '\uD800' + "-" + '\uF8FF' + '\uFFF0'
+ + "-" + '\uFFFF';
+ private static final Pattern p = Pattern.compile("(^\\.{1,2}$)|[" + illegalChars + "]");
+
+ public static void validateFolder(String name) {
+ if (name == null || name.length() == 0) {
+ throw new IllegalArgumentException("folder name is emtpy");
+ }
+ if(name.length() > 255) {
+ throw new IllegalArgumentException("folder name is too long");
+ }
+ if (p.matcher(name).find()) {
+ throw new IllegalArgumentException("folder name [" + name + "] is illegal");
+ }
+ }
+
+ public static boolean isFilenameValid(String file) {
+ File f = new File(file);
+ try {
+ f.getCanonicalPath();
+ return true;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
+ public static void deleteDirectory(File dir) {
+ if (!dir.exists()) return;
+ File[] subs = dir.listFiles();
+ if (subs != null) {
+ for (File f : dir.listFiles()) {
+ if (f.isFile()) {
+ if(!f.delete()) {
+ throw new IllegalStateException("delete file failed: "+f);
+ }
+ } else {
+ deleteDirectory(f);
+ }
+ }
+ }
+ if(!dir.delete()) {
+ throw new IllegalStateException("delete directory failed: "+dir);
+ }
+ }
+
+ static final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+ static Random rnd = new Random();
+
+ public static String randomString(int len )
+ {
+ StringBuilder sb = new StringBuilder( len );
+ for( int i = 0; i < len; i++ )
+ sb.append( AB.charAt( rnd.nextInt(AB.length()) ) );
+ return sb.toString();
+ }
+
+
+ public static void deleteFile(File file) {
+ if (!file.exists() || !file.isFile()) {
+ return;
+ }
+ if (!file.delete()) {
+ throw new IllegalStateException("delete file failed: "+file);
+ }
+ }
+}
diff --git a/Mycat-Core/src/test/java/io/mycat/memalloc/TestMycatMemoryAlloctor.java b/Mycat-Core/src/test/java/io/mycat/memalloc/TestMycatMemoryAlloctor.java
new file mode 100644
index 0000000..acc7270
--- /dev/null
+++ b/Mycat-Core/src/test/java/io/mycat/memalloc/TestMycatMemoryAlloctor.java
@@ -0,0 +1,53 @@
+package io.mycat.memalloc;
+
+import io.netty.buffer.ByteBuf;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+
+/**
+ * @author zagnix
+ * @create 2017-01-18 11:19
+ */
+
+public class TestMycatMemoryAlloctor {
+ @Test
+ public void testMemAlloc(){
+ final MyCatMemoryAllocator memoryAllocator =
+ new MyCatMemoryAllocator(Runtime.getRuntime().availableProcessors()*2);
+
+ for (int i = 0; i <100 ; i++) {
+ ByteBuf byteBuf = memoryAllocator.directBuffer(128);
+ byteBuf.writeBytes("helll world".getBytes());
+ ByteBuffer byteBuffer = byteBuf.nioBuffer(0,128);
+ String srt = getString(byteBuffer);
+ byteBuf.release();
+ System.out.println("refCnt:" + byteBuf.refCnt());
+ // byteBuf.writeBytes(byteBuffer);
+ // System.out.println("refCnt:" + byteBuf.refCnt());
+ }
+ }
+
+ public static String getString(ByteBuffer buffer) {
+ Charset charset = null;
+ CharsetDecoder decoder = null;
+ CharBuffer charBuffer = null;
+ try {
+ charset = Charset.forName("UTF-8");
+ decoder = charset.newDecoder();
+ charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
+ return charBuffer.toString();
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ return "error";
+ }
+ }
+
+ public static ByteBuffer getByteBuffer(String str)
+ {
+ return ByteBuffer.wrap(str.getBytes());
+ }
+}
diff --git a/Mycat-Core/src/test/java/io/mycat/sqlcache/MyCatBigSqlResultsCache.java b/Mycat-Core/src/test/java/io/mycat/sqlcache/MyCatBigSqlResultsCache.java
new file mode 100644
index 0000000..439ba2e
--- /dev/null
+++ b/Mycat-Core/src/test/java/io/mycat/sqlcache/MyCatBigSqlResultsCache.java
@@ -0,0 +1,232 @@
+package io.mycat.sqlcache;
+
+import com.google.common.util.concurrent.*;
+import io.mycat.bigmem.sqlcache.*;
+import io.mycat.util.Utils;
+import io.mycat.bigmem.console.LocatePolicy;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+
+import static com.google.common.hash.Hashing.murmur3_32;
+
+/**
+ * Cache SQL 大结果集 对外接口
+ *
+ * @author zagnix
+ * @version 1.0
+ * @create 2016-12-30 10:51
+ */
+
+public class MyCatBigSqlResultsCache {
+
+ private final CacheImp sqlResultCacheImp;
+
+ private final static MyCatBigSqlResultsCache INSTANCE = new MyCatBigSqlResultsCache();
+
+ private MyCatBigSqlResultsCache(){
+ sqlResultCacheImp = new CacheImp();
+ }
+
+ /**
+ * 将sql结果集缓存起来
+ *
+ * @param sql sql语句
+ * @param bigSQLResult sql结果集缓存
+ * @param cache 缓存时间
+ * @param accesCount 在缓存时间内,被访问的次数
+ * @param loader 结果集load or reload 接口
+ * @param listener key被移除时,调用的接口
+ */
+ public void cacheSQLResult(String sql, BigSQLResult bigSQLResult, long cache, long accesCount,
+ IDataLoader loader, IRemoveKeyListener listener){
+ /**
+ * TODO
+ */
+ String key = "" + murmur3_32().hashUnencodedChars(sql);
+
+ Keyer keyer = new Keyer();
+ keyer.setSql(sql);
+ keyer.setKey(key);
+ keyer.setValue(bigSQLResult);
+ keyer.setCacheTTL(cache);
+ keyer.setAccessCount(accesCount);
+ keyer.setRemoveKeyListener(listener);
+ keyer.setiDataLoader(loader);
+ sqlResultCacheImp.put(key,bigSQLResult,keyer);
+ }
+
+
+ /**
+ * 获取sql语句,已经缓存的结果集
+ *
+ * @param sql sql 语句
+ * @return
+ */
+ public BigSQLResult getSQLResult(String sql){
+ /**
+ * TODO
+ */
+ String key = "" + murmur3_32().hashUnencodedChars(sql);
+ return sqlResultCacheImp.get(key);
+ }
+
+
+
+ /**
+ * 获取sql语句,已经缓存的结果集
+ *
+ * @param sql sql 语句
+ */
+ public void remove(String sql){
+ /**
+ * TODO
+ */
+ String key = "" + murmur3_32().hashUnencodedChars(sql);
+ sqlResultCacheImp.remove(key);
+ }
+
+
+
+ /**
+ * 对外对象实例
+ * @return
+ */
+ public static MyCatBigSqlResultsCache getInstance() {
+ return INSTANCE;
+ }
+
+
+ public static void main(String [] args){
+
+ String sql = "select * from table";
+
+ BigSQLResult sqlResultCache =
+ new BigSQLResult(LocatePolicy.Normal,sql,32*1024*1024);
+
+ long ROWS = 10000;
+
+
+ /** 模拟从后端DB拉取数据 */
+ for (int i = 0; i < ROWS ; i++) {
+ byte[] rows = Utils.randomString(1024).getBytes();
+
+ /**
+ * 使用内存映射Cache,存放SQL结果集
+ */
+ sqlResultCache.put(rows);
+ }
+
+
+ MyCatBigSqlResultsCache.getInstance().cacheSQLResult(sql,sqlResultCache,300,5000,
+ new IDataLoader() {
+
+ /**
+ * 根据sql,异步从后台DB reload数据,替换旧值
+ * @param keyer
+ * @return
+ */
+ @Override
+ public BigSQLResult reload(Keyer keyer) {
+ ListeningExecutorService executor =
+ MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+
+ final ListenableFuture listenableFuture = executor
+ .submit(new Callable() {
+ @Override
+ public BigSQLResult call() throws Exception {
+ String sql = keyer.getSql();
+ String sqlkey = keyer.getLastAccessTime() +"_"+murmur3_32().hashUnencodedChars(sql);
+ BigSQLResult sqlResultCache
+ = new BigSQLResult(LocatePolicy.Normal,sqlkey,32*1024*1024);
+
+ /**模拟从后端DB拉取数据*/
+ for (int i = 0; i < ROWS ; i++) {
+ byte[] rows = Utils.randomString(1024).getBytes();
+
+ /**
+ * 使用内存映射Cache,存放SQL结果集
+ */
+ sqlResultCache.put(rows);
+ }
+
+ return sqlResultCache;
+ }
+ });
+
+ Futures.addCallback(listenableFuture, new FutureCallback() {
+ @Override
+ public void onSuccess(BigSQLResult bigSQLResult) {
+ if(bigSQLResult!=null && bigSQLResult.hasNext()){
+
+ BigSQLResult oldSqlResult=
+ MyCatBigSqlResultsCache.getInstance().getSQLResult(keyer.getSql());
+
+ if (oldSqlResult != null){
+ oldSqlResult.removeAll();
+ }
+
+ /**替换旧的值*/
+ MyCatBigSqlResultsCache.getInstance().
+ sqlResultCacheImp.put(keyer.getKey(),bigSQLResult,keyer);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ //TODO
+ }
+ }
+ );
+
+ return null;
+ }
+
+
+ }, new IRemoveKeyListener() {
+
+ /**
+ * key 失效,做清理工作
+ *
+ * @param key
+ * @param value
+ */
+ @Override
+ public void removeNotify(String key, BigSQLResult value) {
+ if (value !=null){
+ value.removeAll();
+ }
+
+ System.out.println("key :" + key);
+ }
+ });
+
+ /**
+ * 访问Cache
+ */
+ BigSQLResult bigSQLResult =
+ MyCatBigSqlResultsCache.getInstance().getSQLResult(sql);
+
+
+ bigSQLResult.reset();
+
+ while (bigSQLResult.hasNext()){
+
+ byte[] data = bigSQLResult.next();
+ //TODO
+ System.out.println("String :" + new String(data));
+
+ }
+
+
+ MyCatBigSqlResultsCache.getInstance().remove(sql);
+
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+}
diff --git a/Mycat-Core/src/test/java/io/mycat/sqlcache/TestHintSqlParser.java b/Mycat-Core/src/test/java/io/mycat/sqlcache/TestHintSqlParser.java
new file mode 100644
index 0000000..a08195c
--- /dev/null
+++ b/Mycat-Core/src/test/java/io/mycat/sqlcache/TestHintSqlParser.java
@@ -0,0 +1,17 @@
+package io.mycat.sqlcache;
+
+import org.junit.Test;
+
+/**
+ * @author zagnix
+ * @create 2017-01-16 18:03
+ */
+
+public class TestHintSqlParser {
+ //TODO 测试case待完善
+ @Test
+ public void testHintSqlParser(){
+ String hintSql = "/*!mycat:cacheable=true cache-time=5000 auto-refresh=true access-count=5000*/select * from table";
+ System.out.println(HintSQLParser.parserHintSQL(hintSql).toString());
+ }
+}
diff --git a/Mycat-Core/src/test/java/io/mycat/sqlcache/testSQLResultCache.java b/Mycat-Core/src/test/java/io/mycat/sqlcache/testSQLResultCache.java
new file mode 100644
index 0000000..d13d6cb
--- /dev/null
+++ b/Mycat-Core/src/test/java/io/mycat/sqlcache/testSQLResultCache.java
@@ -0,0 +1,130 @@
+package io.mycat.sqlcache;
+
+
+import io.mycat.bigmem.console.LocatePolicy;
+import io.mycat.bigmem.sqlcache.BigSQLResult;
+import io.mycat.util.Utils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.google.common.hash.Hashing.murmur3_32;
+import static org.junit.Assert.*;
+
+//TODO 待完善测试用例
+public class testSQLResultCache {
+ private BigSQLResult sqlResultCache;
+
+ @Test
+ public void simpleTest() throws IOException {
+ for(int i = 1; i <= 2; i++) {
+
+ sqlResultCache = new BigSQLResult(LocatePolicy.Normal, "select * from t",16*1024*1024);
+ assertNotNull(sqlResultCache);
+
+ for(int j = 1; j <= 3; j++) {
+ assertTrue(sqlResultCache.size() == 0L);
+ assertTrue(sqlResultCache.isEmpty());
+
+ assertNull(sqlResultCache.next());
+
+ sqlResultCache.put("hello".getBytes());
+ assertTrue(sqlResultCache.size() == 1L);
+ assertTrue(sqlResultCache.hasNext());
+ assertEquals("hello", new String(sqlResultCache.next()));
+ assertNull(sqlResultCache.next());
+
+ sqlResultCache.put("world".getBytes());
+ sqlResultCache.flush();
+ assertTrue(sqlResultCache.size() == 1L);
+ assertTrue(sqlResultCache.hasNext());
+ assertEquals("world", new String(sqlResultCache.next()));
+ assertNull(sqlResultCache.next());
+ }
+ sqlResultCache.recycle();
+ }
+ }
+
+
+ @Test
+ public void testSQLResultCache(){
+ long ROWS = 10000;
+ //long ROWS = 10000*10000;
+ //long ROWS = 100000*100000;
+ Map sqlResultCacheMap = new HashMap();
+
+ String sql = "select * from table1";
+ String sqlkey = ""+murmur3_32().hashUnencodedChars(sql);
+
+ /**
+ * sql results back list
+ */
+
+ ArrayList backList = new ArrayList();
+
+
+ /**
+ * 使用内存映射Cache,存放SQL结果集
+ */
+
+ BigSQLResult sqlResultCache
+ = new BigSQLResult(LocatePolicy.Normal,sqlkey,16*1024*1024);
+
+ for (int i = 0; i < ROWS ; i++) {
+ byte[] rows = Utils.randomString(1024).getBytes();
+ backList.add(rows);
+
+ /**
+ * 使用内存映射Cache,存放SQL结果集
+ */
+ sqlResultCache.put(rows);
+ }
+ sqlResultCacheMap.put(sqlkey,sqlResultCache);
+
+
+
+ /**
+ * 验证内存映射Cache,存放SQL结果集
+ */
+ BigSQLResult sqlResCache = sqlResultCacheMap.get(sqlkey);
+
+ Assert.assertEquals(backList.size(),sqlResCache.size());
+ for (int i = 0; i