diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index c10b7fc51fd2190bdbfb913c785365aac8da895a..4048ebe3f9ef844e4905ee06f4551c316dd49220 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -17,20 +17,16 @@ #include "vnd.h" int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { - int32_t vgId = TD_VID(pTq->pVnode); - - taosWLockLatch(&pTq->lock); - if (taosHashGetSize(pTq->pPushMgr) > 0) { - SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME_PUSH}; - msg.pCont = rpcMallocCont(sizeof(SMsgHead)); - msg.contLen = sizeof(SMsgHead); - SMsgHead *pHead = msg.pCont; - pHead->vgId = vgId; - pHead->contLen = msg.contLen; - tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg); + if (taosHashGetSize(pTq->pPushMgr) <= 0) { + return 0; } - // unlock - taosWUnLockLatch(&pTq->lock); + SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME_PUSH}; + msg.pCont = rpcMallocCont(sizeof(SMsgHead)); + msg.contLen = sizeof(SMsgHead); + SMsgHead *pHead = msg.pCont; + pHead->vgId = TD_VID(pTq->pVnode); + pHead->contLen = msg.contLen; + tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg); return 0; }