diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index b2223f9d642ceac3f1e0ec0c3fab6a4a7908d6c8..85415fbe34f01c6bfce726a005eed7f760b7ebe2 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -102,7 +102,7 @@ typedef struct { STqExecHandle execHandle; // exec SRpcMsg* msg; int32_t noDataPollCnt; - int8_t sendRsp; + int8_t exec; } STqHandle; typedef struct { diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 128f83bf53d545bedaf5eb01cfd7d1a7e8bc80a4..32819a5924f5a6eba66903d6c9c82ba24be2a9e6 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -163,7 +163,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand } static bool isHandleExecuting(STqHandle* pHandle){ - return 0 == atomic_load_8(&pHandle->sendRsp); + return 1 == atomic_load_8(&pHandle->exec); } static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, @@ -185,7 +185,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, tqInfo("sub is executing, pHandle:%p", pHandle); taosMsleep(5); } - atomic_store_8(&pHandle->sendRsp, 0); + atomic_store_8(&pHandle->exec, 1); qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); code = tqScanData(pTq, pHandle, &dataRsp, pOffset); @@ -203,7 +203,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, code = tqRegisterPushHandle(pTq, pHandle, pMsg); taosWUnLockLatch(&pTq->lock); tDeleteSMqDataRsp(&dataRsp); - atomic_store_8(&pHandle->sendRsp, 1); + atomic_store_8(&pHandle->exec, 0); return code; } else{ @@ -224,7 +224,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, // taosWUnLockLatch(&pTq->lock); tDeleteSMqDataRsp(&dataRsp); } - atomic_store_8(&pHandle->sendRsp, 1); + atomic_store_8(&pHandle->exec, 0); return code; } @@ -248,7 +248,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, tqInfo("sub is executing, pHandle:%p", pHandle); taosMsleep(5); } - atomic_store_8(&pHandle->sendRsp, 0); + atomic_store_8(&pHandle->exec, 1); if (offset->type != TMQ_OFFSET__LOG) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { @@ -345,7 +345,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } end: - atomic_store_8(&pHandle->sendRsp, 1); + atomic_store_8(&pHandle->exec, 0); tDeleteSTaosxRsp(&taosxRsp); taosMemoryFreeClear(pCkHead);