未验证 提交 2ba941c8 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1814 from taosdata/enhance/tqueue

use semphore instead of polling for tqeueu
...@@ -74,8 +74,10 @@ void dnodeCleanupRead() { ...@@ -74,8 +74,10 @@ void dnodeCleanupRead() {
for (int i=0; i < readPool.max; ++i) { for (int i=0; i < readPool.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i; SReadWorker *pWorker = readPool.readWorker + i;
if (pWorker->thread) if (pWorker->thread) {
pthread_cancel(pWorker->thread);
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
}
} }
taosCloseQset(readQset); taosCloseQset(readQset);
...@@ -114,12 +116,12 @@ void dnodeRead(SRpcMsg *pMsg) { ...@@ -114,12 +116,12 @@ void dnodeRead(SRpcMsg *pMsg) {
pRead->pCont = pCont; pRead->pCont = pCont;
pRead->contLen = pHead->contLen; pRead->contLen = pHead->contLen;
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
// next vnode // next vnode
leftLen -= pHead->contLen; leftLen -= pHead->contLen;
pCont -= pHead->contLen; pCont -= pHead->contLen;
queuedMsgNum++; queuedMsgNum++;
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
} }
if (queuedMsgNum == 0) { if (queuedMsgNum == 0) {
......
...@@ -71,7 +71,10 @@ void dnodeCleanupWrite() { ...@@ -71,7 +71,10 @@ void dnodeCleanupWrite() {
for (int32_t i = 0; i < wWorkerPool.max; ++i) { for (int32_t i = 0; i < wWorkerPool.max; ++i) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + i; SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
if (pWorker->thread) { if (pWorker->thread) {
pthread_cancel(pWorker->thread);
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
taosFreeQall(pWorker->qall);
taosCloseQset(pWorker->qset);
} }
} }
......
...@@ -1364,7 +1364,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -1364,7 +1364,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
} }
} }
} else { } else {
tTrace("%s %p, auth spi not matched, msg discarded", pRpc->label, pConn); tTrace("%s %p, auth spi:%d not matched with received:%d", pRpc->label, pConn, pConn->spi, pHead->spi);
code = TSDB_CODE_AUTH_FAILURE; code = TSDB_CODE_AUTH_FAILURE;
} }
......
...@@ -41,6 +41,7 @@ typedef struct _taos_qset { ...@@ -41,6 +41,7 @@ typedef struct _taos_qset {
pthread_mutex_t mutex; pthread_mutex_t mutex;
int32_t numOfQueues; int32_t numOfQueues;
int32_t numOfItems; int32_t numOfItems;
tsem_t sem;
} STaosQset; } STaosQset;
typedef struct _taos_qall { typedef struct _taos_qall {
...@@ -59,6 +60,7 @@ taos_queue taosOpenQueue() { ...@@ -59,6 +60,7 @@ taos_queue taosOpenQueue() {
} }
pthread_mutex_init(&queue->mutex, NULL); pthread_mutex_init(&queue->mutex, NULL);
return queue; return queue;
} }
...@@ -79,7 +81,7 @@ void taosCloseQueue(taos_queue param) { ...@@ -79,7 +81,7 @@ void taosCloseQueue(taos_queue param) {
} }
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
pthread_mutex_destroy(&queue->mutex);
free(queue); free(queue);
} }
...@@ -116,11 +118,12 @@ int taosWriteQitem(taos_queue param, int type, void *item) { ...@@ -116,11 +118,12 @@ int taosWriteQitem(taos_queue param, int type, void *item) {
queue->numOfItems++; queue->numOfItems++;
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
uTrace("item:%p is put into queue:%p, type:%d items:%d", item, queue, type, queue->numOfItems); uTrace("item:%p is put into queue:%p, type:%d items:%d", item, queue, type, queue->numOfItems);
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
if (queue->qset) tsem_post(&queue->qset->sem);
return 0; return 0;
} }
...@@ -217,12 +220,15 @@ taos_qset taosOpenQset() { ...@@ -217,12 +220,15 @@ taos_qset taosOpenQset() {
} }
pthread_mutex_init(&qset->mutex, NULL); pthread_mutex_init(&qset->mutex, NULL);
tsem_init(&qset->sem, 0, 0);
return qset; return qset;
} }
void taosCloseQset(taos_qset param) { void taosCloseQset(taos_qset param) {
STaosQset *qset = (STaosQset *)param; STaosQset *qset = (STaosQset *)param;
pthread_mutex_destroy(&qset->mutex);
tsem_destroy(&qset->sem);
free(qset); free(qset);
} }
...@@ -298,6 +304,8 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand ...@@ -298,6 +304,8 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand
STaosQnode *pNode = NULL; STaosQnode *pNode = NULL;
int code = 0; int code = 0;
tsem_wait(&qset->sem);
pthread_mutex_lock(&qset->mutex); pthread_mutex_lock(&qset->mutex);
for(int i=0; i<qset->numOfQueues; ++i) { for(int i=0; i<qset->numOfQueues; ++i) {
...@@ -339,6 +347,7 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) { ...@@ -339,6 +347,7 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) {
STaosQall *qall = (STaosQall *)p2; STaosQall *qall = (STaosQall *)p2;
int code = 0; int code = 0;
tsem_wait(&qset->sem);
pthread_mutex_lock(&qset->mutex); pthread_mutex_lock(&qset->mutex);
for(int i=0; i<qset->numOfQueues; ++i) { for(int i=0; i<qset->numOfQueues; ++i) {
...@@ -364,6 +373,7 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) { ...@@ -364,6 +373,7 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) {
queue->tail = NULL; queue->tail = NULL;
queue->numOfItems = 0; queue->numOfItems = 0;
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
for (int j=1; j<qall->numOfItems; ++j) tsem_wait(&qset->sem);
} }
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册