diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index aca2729731c87a8aeae69db7a49d284d5a528f27..bccfe2b9e1bc9ae8f80683fba219d80cdc6dbc50 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -325,25 +325,25 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } // 2. check re-balance status - taosRLockLatch(&pTq->lock); +// taosRLockLatch(&pTq->lock); if (pHandle->consumerId != consumerId) { tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; - taosRUnLockLatch(&pTq->lock); +// taosRUnLockLatch(&pTq->lock); return -1; } - taosRUnLockLatch(&pTq->lock); +// taosRUnLockLatch(&pTq->lock); // 3. update the epoch value - taosWLockLatch(&pTq->lock); +// taosWLockLatch(&pTq->lock); int32_t savedEpoch = pHandle->epoch; if (savedEpoch < reqEpoch) { tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch); pHandle->epoch = reqEpoch; } - taosWUnLockLatch(&pTq->lock); +// taosWUnLockLatch(&pTq->lock); char buf[80]; tFormatOffset(buf, 80, &reqOffset); @@ -358,12 +358,12 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey); - taosWLockLatch(&pTq->lock); - int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); - if (code != 0) { - tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey); - } - taosWUnLockLatch(&pTq->lock); +// taosWLockLatch(&pTq->lock); +// int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); +// if (code != 0) { +// tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey); +// } +// taosWUnLockLatch(&pTq->lock); STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (pHandle) { diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 81dd8abc3eef2f0105d0e1d04050aa9399c599b1..ab1be15271cdaf87af15131e7d54931ec8afe02b 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -254,7 +254,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); // lock - taosWLockLatch(&pTq->lock); +// taosWLockLatch(&pTq->lock); qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); int code = tqScanData(pTq, pHandle, &dataRsp, pOffset); @@ -263,12 +263,12 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, } // till now, all data has been transferred to consumer, new data needs to push client once arrived. - if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && - dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { - //code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); - taosWUnLockLatch(&pTq->lock); - return code; - } +// if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && +// dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { +// //code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); +// taosWUnLockLatch(&pTq->lock); +// return code; +// } code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP); @@ -281,7 +281,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, tFormatOffset(buf, 80, &dataRsp.rspOffset); tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 " code:%d", consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code); - taosWUnLockLatch(&pTq->lock); +// taosWUnLockLatch(&pTq->lock); tDeleteSMqDataRsp(&dataRsp); } return code;