未验证 提交 6cbfa0c2 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1819 from taosdata/enhance/tqueue

optimize the tqueue code
......@@ -1083,6 +1083,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerFqdn,
pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
} else {
if (pHead->code == 0) pConn->secured = 1; // for success response, set link as secured
if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16))
tTrace( "%s %p, %s is sent to %s:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d",
pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerFqdn, pConn->peerPort,
......
......@@ -94,8 +94,6 @@ void *taosAllocateQitem(int size) {
void taosFreeQitem(void *param) {
if (param == NULL) return;
uTrace("item:%p is freed", param);
char *temp = (char *)param;
temp -= sizeof(STaosQnode);
free(temp);
......@@ -144,7 +142,7 @@ int taosReadQitem(taos_queue param, int *type, void **pitem) {
queue->numOfItems--;
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
code = 1;
//uTrace("item:%p is read out from queue, items:%d", *pitem, queue->numOfItems);
uTrace("item:%p is read out from queue, items:%d", *pitem, queue->numOfItems);
}
pthread_mutex_unlock(&queue->mutex);
......@@ -309,13 +307,12 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand
pthread_mutex_lock(&qset->mutex);
for(int i=0; i<qset->numOfQueues; ++i) {
//pthread_mutex_lock(&qset->mutex);
if (qset->current == NULL)
qset->current = qset->head;
STaosQueue *queue = qset->current;
if (queue) qset->current = queue->next;
//pthread_mutex_unlock(&qset->mutex);
if (queue == NULL) break;
if (queue->head == NULL) continue;
pthread_mutex_lock(&queue->mutex);
......@@ -351,13 +348,12 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) {
pthread_mutex_lock(&qset->mutex);
for(int i=0; i<qset->numOfQueues; ++i) {
// pthread_mutex_lock(&qset->mutex);
if (qset->current == NULL)
qset->current = qset->head;
queue = qset->current;
if (queue) qset->current = queue->next;
// pthread_mutex_unlock(&qset->mutex);
if (queue == NULL) break;
if (queue->head == NULL) continue;
pthread_mutex_lock(&queue->mutex);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册