diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 85415fbe34f01c6bfce726a005eed7f760b7ebe2..dece28de6b3fe135bf6a570fdfc1d927c81843f4 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -185,6 +185,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); +bool tqIsHandleExecuting(STqHandle* pHandle); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 311d637be8c0e6d2936ab167f10571adfc448643..f3da17553b7e2c5a23d72bc822575acdcb7894be 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -379,15 +379,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; + int32_t vgId = TD_VID(pTq->pVnode); tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey); int32_t code = 0; -// taosWLockLatch(&pTq->lock); -// int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); -// if (code != 0) { -// tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey); -// } -// taosWUnLockLatch(&pTq->lock); STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (pHandle) { @@ -395,6 +390,12 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg if (pHandle->pRef) { walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId); } + + while (tqIsHandleExecuting(pHandle)) { + tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey); + taosMsleep(5); + } + code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (code != 0) { tqError("cannot process tq delete req %s, since no such handle", pReq->subKey); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 32819a5924f5a6eba66903d6c9c82ba24be2a9e6..c5bf4268a77b5ad5ab68b25798b0889ec4b78e92 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -162,9 +162,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand return 0; } -static bool isHandleExecuting(STqHandle* pHandle){ - return 1 == atomic_load_8(&pHandle->exec); -} +bool tqIsHandleExecuting(STqHandle* pHandle) { return 1 == atomic_load_8(&pHandle->exec); } static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* pOffset) { @@ -174,15 +172,9 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); - qTaskInfo_t task = pHandle->execHandle.task; - if(qTaskIsExecuting(task)){ - code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); - tDeleteSMqDataRsp(&dataRsp); - return code; - } - while(isHandleExecuting(pHandle)){ - tqInfo("sub is executing, pHandle:%p", pHandle); + while(tqIsHandleExecuting(pHandle)){ + tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey); taosMsleep(5); } atomic_store_8(&pHandle->exec, 1); @@ -211,9 +203,8 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, } } - - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP); // NOTE: this pHandle->consumerId may have been changed already. + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP); end: { @@ -221,7 +212,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, tFormatOffset(buf, 80, &dataRsp.rspOffset); tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 " code:%d", consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code); -// taosWUnLockLatch(&pTq->lock); tDeleteSMqDataRsp(&dataRsp); } atomic_store_8(&pHandle->exec, 0); @@ -237,17 +227,12 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, SMqMetaRsp metaRsp = {0}; STaosxRsp taosxRsp = {0}; tqInitTaosxRsp(&taosxRsp, pRequest); - qTaskInfo_t task = pHandle->execHandle.task; - if(qTaskIsExecuting(task)){ - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); - tDeleteSTaosxRsp(&taosxRsp); - return code; - } - while(isHandleExecuting(pHandle)){ - tqInfo("sub is executing, pHandle:%p", pHandle); + while(tqIsHandleExecuting(pHandle)){ + tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey); taosMsleep(5); } + atomic_store_8(&pHandle->exec, 1); if (offset->type != TMQ_OFFSET__LOG) { @@ -274,7 +259,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } } - if (offset->type == TMQ_OFFSET__LOG) { verifyOffset(pHandle->pWalReader, offset); int64_t fetchVer = offset->version + 1;