diff --git a/cpp/servant/libservant/ServantHandle.cpp b/cpp/servant/libservant/ServantHandle.cpp index 2129b0747..fc0d3ea34 100644 --- a/cpp/servant/libservant/ServantHandle.cpp +++ b/cpp/servant/libservant/ServantHandle.cpp @@ -108,31 +108,24 @@ void ServantHandle::run() void ServantHandle::handleRequest() { bool bYield = false; - - __sync_fetch_and_add(&(_handleGroup->handleCount), 1); - __sync_fetch_and_add(&(_handleGroup->runningCount), 1); - - struct timespec ts; - while (!getEpollServer()->isTerminate()) { bool bServerReqEmpty = false; - uint64_t inow = TNOWMS + 3000; - ts.tv_sec = inow / 1000; - ts.tv_nsec = inow % 1000 * 1000 * 1000; - if (_handleGroup->recvCount <= 0) { - __sync_fetch_and_sub(&(_handleGroup->runningCount), 1); - if(_coroSched->getResponseCoroSize() > 0) - { - bServerReqEmpty = true; - } - else + TC_ThreadLock::Lock lock(_handleGroup->monitor); + + if (allAdapterIsEmpty() && allFilterIsEmpty()) { - sem_timedwait(&(_handleGroup->sem), &ts); + if(_coroSched->getResponseCoroSize() > 0) + { + bServerReqEmpty = true; + } + else + { + _handleGroup->monitor.timedWait(3000); + } } - __sync_fetch_and_add(&(_handleGroup->runningCount), 1); } //上报心跳 diff --git a/cpp/util/include/util/tc_epoll_server.h b/cpp/util/include/util/tc_epoll_server.h index d1d142c40..8cd973509 100644 --- a/cpp/util/include/util/tc_epoll_server.h +++ b/cpp/util/include/util/tc_epoll_server.h @@ -24,7 +24,6 @@ #include #include #include -#include #include "util/tc_epoller.h" #include "util/tc_thread.h" #include "util/tc_clientsocket.h" @@ -37,7 +36,6 @@ #include "util/tc_fifo.h" #include "util/tc_buffer.h" #include "util/tc_buffer_pool.h" -#include "util/tc_lockfree_queue.h" using namespace std; @@ -153,16 +151,8 @@ class TC_EpollServer : public TC_ThreadLock, public TC_HandleBase */ struct HandleGroup : public TC_HandleBase { - HandleGroup():runningCount(0), handleCount(0), recvCount(0) - { - sem_init(&sem,0,0); - } - volatile int runningCount; - volatile int handleCount; - volatile int recvCount; - sem_t sem; - string name; + TC_ThreadLock monitor; vector handles; map adapters; }; @@ -789,7 +779,7 @@ class TC_EpollServer : public TC_ThreadLock, public TC_HandleBase /** * 接收的数据队列 */ - LockFreeQueue _rBufQueue; + recv_queue _rbuffer; /** * 队列最大容量 @@ -1557,7 +1547,7 @@ class TC_EpollServer : public TC_ThreadLock, public TC_HandleBase /** * 发送队列 */ - LockFreeQueue _sBufQueue; + send_queue _sbuffer; /** * BindAdapter是否有udp监听 diff --git a/cpp/util/include/util/tc_lockfree_queue.h b/cpp/util/include/util/tc_lockfree_queue.h deleted file mode 100644 index 90099edcf..000000000 --- a/cpp/util/include/util/tc_lockfree_queue.h +++ /dev/null @@ -1,211 +0,0 @@ -/** - * Tencent is pleased to support the open source community by making Tars available. - * - * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved. - * - * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * https://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software distributed - * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -#ifndef __LOCKFREEQUEUE_H__ -#define __LOCKFREEQUEUE_H__ - -#include - -template -class LockFreeQueue -{ -public: - enum - { - RT_OK = 0, //ɹ - RT_FULL = 1, // - RT_EMPTY = 2, //û - RT_ERR = -1, // - }; - - enum - { - FLAG_NULL = 0, - FLAG_SETTOOK = 1, - FLAG_SETED = 2, - FLAG_GETTOOK = 3, - }; - - LockFreeQueue():_bInit(false), _dataCount(0) - { - } - - /** - * @brief ʼ - * - * @param sizeԪظ - * - */ - int init(const uint32_t size) - { - if (_bInit) - return RT_OK; - - uint32_t totalSize = sizeof(Head) + (size + 1) * sizeof(Entry); - - _head = (Head*)new char[totalSize]; - memset(_head, 0 , totalSize); - - _head->head = _head->tail = 0; - - _head->size = size; - - _bInit = true; - - return RT_OK; - } - - /** - * @brief - * - * @param valvalue - * @param bBlockǷ - * - */ - int enqueue(const item& val, bool bBlock) - { - if (!_bInit) - return RT_ERR; - - bool bsuccess = false; - - uint64_t old_head, old_tail; - do - { - old_head = _head->head; - old_tail = _head->tail; - __sync_synchronize(); - - if(old_head >= old_tail) - { - if (_head->size > (old_head - old_tail)) - { - bsuccess = __sync_bool_compare_and_swap(&_head->head, - old_head, old_head + 1); - } - else - { - if (bBlock) - continue; - else - return RT_FULL; - } - } - } while (false == bsuccess); - - uint64_t pos = old_head % _head->size; - - do - { - bsuccess = __sync_bool_compare_and_swap(&(((Entry*)&(_head[1]))[pos].flag), FLAG_NULL, FLAG_SETTOOK); - }while(false == bsuccess); - - ((Entry*)&(_head[1]))[pos].data = val; - __sync_synchronize(); - - ((Entry*)&(_head[1]))[pos].flag = FLAG_SETED; - - return RT_OK; - } - - /** - * @brief ȡ - * - * @param valvalue - * @param bBlockǷ - * - */ - int dequeue(item& val, bool bBlock) - { - if (!_bInit) - return RT_ERR; - - bool bsuccess = false; - - uint64_t old_head, old_tail; - do - { - old_head = _head->head; - old_tail = _head->tail; - __sync_synchronize(); - - if (old_tail < old_head) - { - bsuccess = __sync_bool_compare_and_swap(&_head->tail, - old_tail, old_tail + 1); - } - else - { - if (bBlock) - continue; - else - return RT_EMPTY; - } - } while (false == bsuccess); - - uint64_t pos = old_tail % _head->size; - - do - { - bsuccess = __sync_bool_compare_and_swap(&(((Entry*)&(_head[1]))[pos].flag), FLAG_SETED, FLAG_GETTOOK); - }while(false == bsuccess); - - val = ((Entry*)&(_head[1]))[pos].data; - __sync_synchronize(); - - ((Entry*)&(_head[1]))[pos].flag = FLAG_NULL; - - return RT_OK; - } - - unsigned int size() const - { - uint64_t old_head, old_tail; - old_head = _head->head; - old_tail = _head->tail; - if(old_head > old_tail) - return old_head - old_tail; - else - return 0; - } - -private: - - struct Entry - { - volatile uint8_t flag; - item data; - }; - - struct Head - { - volatile uint64_t head; - - volatile uint64_t tail; - - uint32_t size; - } *_head; - - //Ƿѳʼ - bool _bInit; - - volatile unsigned int _dataCount; - -}; - - - -#endif diff --git a/cpp/util/src/tc_epoll_server.cpp b/cpp/util/src/tc_epoll_server.cpp index 3c55865aa..40dbd759c 100644 --- a/cpp/util/src/tc_epoll_server.cpp +++ b/cpp/util/src/tc_epoll_server.cpp @@ -134,10 +134,10 @@ TC_EpollServer::HandleGroupPtr& TC_EpollServer::Handle::getHandleGroup() void TC_EpollServer::Handle::notifyFilter() { - if (_handleGroup->runningCount <= _handleGroup->handleCount) - { - sem_post(&_handleGroup->sem); - } + TC_ThreadLock::Lock lock(_handleGroup->monitor); + + //如何做到不唤醒所有handle呢? + _handleGroup->monitor.notifyAll(); } void TC_EpollServer::Handle::setWaitTime(uint32_t iWaitTime) @@ -151,21 +151,15 @@ void TC_EpollServer::Handle::handleImp() { startHandle(); - __sync_fetch_and_add(&(_handleGroup->handleCount), 1); - __sync_fetch_and_add(&(_handleGroup->runningCount), 1); - - struct timespec ts; - while (!getEpollServer()->isTerminate()) { - uint64_t inow = TNOWMS + _iWaitTime; - ts.tv_sec = inow / 1000; - ts.tv_nsec = inow % 1000 * 1000 * 1000; - if (_handleGroup->recvCount <= 0) { - __sync_fetch_and_sub(&(_handleGroup->runningCount), 1); - sem_timedwait(&(_handleGroup->sem), &ts); - __sync_fetch_and_add(&(_handleGroup->runningCount), 1); + TC_ThreadLock::Lock lock(_handleGroup->monitor); + + if (allAdapterIsEmpty() && allFilterIsEmpty()) + { + _handleGroup->monitor.timedWait(_iWaitTime); + } } //上报心跳 @@ -228,7 +222,7 @@ void TC_EpollServer::Handle::handleImp() } } catch (exception &ex) - { + { if(recv) { close(recv->uid, recv->fd); @@ -277,7 +271,6 @@ TC_EpollServer::BindAdapter::BindAdapter(TC_EpollServer *pEpollServer) , _protocolName("tars") , _iBackPacketBuffLimit(0) { - _rBufQueue.init(100000); } TC_EpollServer::BindAdapter::~BindAdapter() @@ -334,59 +327,39 @@ bool TC_EpollServer::BindAdapter::isIpAllow(const string& ip) const void TC_EpollServer::BindAdapter::insertRecvQueue(const recv_queue::queue_type &vtRecvData, bool bPushBack) { - recv_queue::queue_type::const_iterator it = vtRecvData.begin(); - recv_queue::queue_type::const_iterator itEnd = vtRecvData.end(); - for(;it != itEnd;it++) { - if (*it == NULL) - { - _pEpollServer->error("[TC_EpollServer::insertRecvQueue] NULL item!"); - continue; - } - - int ret = _rBufQueue.enqueue(*it, false); - if (ret == LockFreeQueue::RT_OK) + if (bPushBack) { - __sync_fetch_and_add(&(_handleGroup->recvCount), 1); + _rbuffer.push_back(vtRecvData); } else { - _pEpollServer->error("[TC_EpollServer::BindAdapter::insertRecvQueue] queue full!"); - delete *it; + _rbuffer.push_front(vtRecvData); } } - if(_handleGroup->runningCount < _handleGroup->handleCount) - { - sem_post(&(_handleGroup->sem)); - } + TC_ThreadLock::Lock lock(_handleGroup->monitor); + + _handleGroup->monitor.notify(); } bool TC_EpollServer::BindAdapter::waitForRecvQueue(tagRecvData* &recv, uint32_t iWaitTime) { bool bRet = false; - int ret = _rBufQueue.dequeue(recv, false); - if (ret == LockFreeQueue::RT_OK) - { - bRet = true; - __sync_fetch_and_sub(&(_handleGroup->recvCount), 1); - } - else - bRet = false; + bRet = _rbuffer.pop_front(recv, iWaitTime); - if (bRet) + if(!bRet) { - if (!recv) - _pEpollServer->error("[TC_EpollServer::waitForRecvQueue] null recv!"); + return bRet; } - return recv != NULL; + return bRet; } size_t TC_EpollServer::BindAdapter::getRecvBufferSize() const { - return _rBufQueue.size(); + return _rbuffer.size(); } TC_EpollServer* TC_EpollServer::BindAdapter::getEpollServer() @@ -454,7 +427,7 @@ void TC_EpollServer::BindAdapter::setQueueCapacity(int n) int TC_EpollServer::BindAdapter::isOverloadorDiscard() { - int iRecvBufferSize = _rBufQueue.size(); + int iRecvBufferSize = _rbuffer.size(); if(iRecvBufferSize <= (_iQueueCapacity / 2))//未过载 { @@ -1501,8 +1474,6 @@ TC_EpollServer::NetThread::NetThread(TC_EpollServer *epollServer) _shutdown.createSocket(); _notify.createSocket(); - - _sBufQueue.init(100000); } TC_EpollServer::NetThread::~NetThread() @@ -1703,6 +1674,9 @@ void TC_EpollServer::NetThread::terminate() { _bTerminate = true; + //通知队列醒过来 + _sbuffer.notifyT(); + //通知epoll响应, 关闭连接 _epoller.mod(_shutdown.getfd(), H64(ET_CLOSE), EPOLLOUT); } @@ -1920,11 +1894,10 @@ void TC_EpollServer::NetThread::close(uint32_t uid) send->cmd = 'c'; - if(_sBufQueue.enqueue(send, false) == LockFreeQueue::RT_OK) - //通知epoll响应, 关闭连接 - _epoller.mod(_notify.getfd(), H64(ET_NOTIFY), EPOLLOUT); - else - delete send; + _sbuffer.push_back(send); + + //通知epoll响应, 关闭连接 + _epoller.mod(_notify.getfd(), H64(ET_NOTIFY), EPOLLOUT); } void TC_EpollServer::NetThread::send(uint32_t uid, const string &s, const string &ip, uint16_t port) @@ -1946,23 +1919,29 @@ void TC_EpollServer::NetThread::send(uint32_t uid, const string &s, const string send->port = port; - if(_sBufQueue.enqueue(send, false) == LockFreeQueue::RT_OK) - //通知epoll响应, 关闭连接 - _epoller.mod(_notify.getfd(), H64(ET_NOTIFY), EPOLLOUT); - else - delete send; + _sbuffer.push_back(send); + + //通知epoll响应, 有数据要发送 + _epoller.mod(_notify.getfd(), H64(ET_NOTIFY), EPOLLOUT); } void TC_EpollServer::NetThread::processPipe() { - tagSendData* sendp = NULL; - while(_sBufQueue.dequeue(sendp, false) == LockFreeQueue::RT_OK) + send_queue::queue_type deSendData; + + _sbuffer.swap(deSendData); + + send_queue::queue_type::iterator it = deSendData.begin(); + + send_queue::queue_type::iterator itEnd = deSendData.end(); + + while(it != itEnd) { - switch(sendp->cmd) + switch((*it)->cmd) { case 'c': { - Connection *cPtr = getConnectionPtr(sendp->uid); + Connection *cPtr = getConnectionPtr((*it)->uid); if(cPtr) { @@ -1975,21 +1954,21 @@ void TC_EpollServer::NetThread::processPipe() } case 's': { - Connection *cPtr = getConnectionPtr(sendp->uid); + Connection *cPtr = getConnectionPtr((*it)->uid); if(cPtr) { #if TARS_SSL if (cPtr->getBindAdapter()->getEndpoint().isSSL() && cPtr->_openssl->IsHandshaked()) { - std::string out = cPtr->_openssl->Write(sendp->buffer.data(), sendp->buffer.size()); + std::string out = cPtr->_openssl->Write((*it)->buffer.data(), (*it)->buffer.size()); if (cPtr->_openssl->HasError()) break; // should not happen - sendp->buffer = out; + (*it)->buffer = out; } #endif - int ret = sendBuffer(cPtr, sendp->buffer, sendp->ip, sendp->port); + int ret = sendBuffer(cPtr, (*it)->buffer, (*it)->ip, (*it)->port); if(ret < 0) { @@ -2001,7 +1980,8 @@ void TC_EpollServer::NetThread::processPipe() default: assert(false); } - delete sendp; + delete (*it); + ++it; } } @@ -2118,7 +2098,7 @@ void TC_EpollServer::NetThread::run() } size_t TC_EpollServer::NetThread::getSendRspSize() { - return _sBufQueue.size(); + return _sbuffer.size(); } ////////////////////////////////////////////////////////////// TC_EpollServer::TC_EpollServer(unsigned int iNetThreadNum) @@ -2273,8 +2253,9 @@ void TC_EpollServer::stopThread() for (auto& kv : _handleGroups) { { - for (int i = 0; i < kv.second->handleCount; ++i) - sem_post(&kv.second->sem); + TC_ThreadLock::Lock lock(kv.second->monitor); + + kv.second->monitor.notifyAll(); } auto& hds = kv.second->handles; for (auto& handle : hds)