提交 cd105381 编写于 作者: wmmhello's avatar wmmhello

fix:[TD-24111]avoid exec pHandle task in multi query thread

上级 a098b53d
...@@ -102,7 +102,7 @@ typedef struct { ...@@ -102,7 +102,7 @@ typedef struct {
STqExecHandle execHandle; // exec STqExecHandle execHandle; // exec
SRpcMsg* msg; SRpcMsg* msg;
int32_t noDataPollCnt; int32_t noDataPollCnt;
int8_t sendRsp; int8_t exec;
} STqHandle; } STqHandle;
typedef struct { typedef struct {
......
...@@ -163,7 +163,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand ...@@ -163,7 +163,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
} }
static bool isHandleExecuting(STqHandle* pHandle){ static bool isHandleExecuting(STqHandle* pHandle){
return 0 == atomic_load_8(&pHandle->sendRsp); return 1 == atomic_load_8(&pHandle->exec);
} }
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
...@@ -185,7 +185,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -185,7 +185,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
tqInfo("sub is executing, pHandle:%p", pHandle); tqInfo("sub is executing, pHandle:%p", pHandle);
taosMsleep(5); taosMsleep(5);
} }
atomic_store_8(&pHandle->sendRsp, 0); atomic_store_8(&pHandle->exec, 1);
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
code = tqScanData(pTq, pHandle, &dataRsp, pOffset); code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
...@@ -203,7 +203,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -203,7 +203,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
code = tqRegisterPushHandle(pTq, pHandle, pMsg); code = tqRegisterPushHandle(pTq, pHandle, pMsg);
taosWUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
tDeleteSMqDataRsp(&dataRsp); tDeleteSMqDataRsp(&dataRsp);
atomic_store_8(&pHandle->sendRsp, 1); atomic_store_8(&pHandle->exec, 0);
return code; return code;
} }
else{ else{
...@@ -224,7 +224,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -224,7 +224,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
// taosWUnLockLatch(&pTq->lock); // taosWUnLockLatch(&pTq->lock);
tDeleteSMqDataRsp(&dataRsp); tDeleteSMqDataRsp(&dataRsp);
} }
atomic_store_8(&pHandle->sendRsp, 1); atomic_store_8(&pHandle->exec, 0);
return code; return code;
} }
...@@ -248,7 +248,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -248,7 +248,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
tqInfo("sub is executing, pHandle:%p", pHandle); tqInfo("sub is executing, pHandle:%p", pHandle);
taosMsleep(5); taosMsleep(5);
} }
atomic_store_8(&pHandle->sendRsp, 0); atomic_store_8(&pHandle->exec, 1);
if (offset->type != TMQ_OFFSET__LOG) { if (offset->type != TMQ_OFFSET__LOG) {
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
...@@ -345,7 +345,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -345,7 +345,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
} }
end: end:
atomic_store_8(&pHandle->sendRsp, 1); atomic_store_8(&pHandle->exec, 0);
tDeleteSTaosxRsp(&taosxRsp); tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead); taosMemoryFreeClear(pCkHead);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册