未验证 提交 bf840551 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #21289 from taosdata/fix/liaohj_main

fix(tmq): check handle status before close sub.
...@@ -185,6 +185,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq); ...@@ -185,6 +185,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver); int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver);
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
bool tqIsHandleExecuting(STqHandle* pHandle);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -379,15 +379,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -379,15 +379,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
int32_t vgId = TD_VID(pTq->pVnode);
tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey); tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);
int32_t code = 0; int32_t code = 0;
// taosWLockLatch(&pTq->lock);
// int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
// if (code != 0) {
// tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey);
// }
// taosWUnLockLatch(&pTq->lock);
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (pHandle) { if (pHandle) {
...@@ -395,6 +390,12 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -395,6 +390,12 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
if (pHandle->pRef) { if (pHandle->pRef) {
walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId); walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
} }
while (tqIsHandleExecuting(pHandle)) {
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
taosMsleep(5);
}
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (code != 0) { if (code != 0) {
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey); tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
......
...@@ -162,9 +162,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand ...@@ -162,9 +162,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return 0; return 0;
} }
static bool isHandleExecuting(STqHandle* pHandle){ bool tqIsHandleExecuting(STqHandle* pHandle) { return 1 == atomic_load_8(&pHandle->exec); }
return 1 == atomic_load_8(&pHandle->exec);
}
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* pOffset) { SRpcMsg* pMsg, STqOffsetVal* pOffset) {
...@@ -174,15 +172,9 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -174,15 +172,9 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
SMqDataRsp dataRsp = {0}; SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
qTaskInfo_t task = pHandle->execHandle.task;
if(qTaskIsExecuting(task)){
code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
tDeleteSMqDataRsp(&dataRsp);
return code;
}
while(isHandleExecuting(pHandle)){ while(tqIsHandleExecuting(pHandle)){
tqInfo("sub is executing, pHandle:%p", pHandle); tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
taosMsleep(5); taosMsleep(5);
} }
atomic_store_8(&pHandle->exec, 1); atomic_store_8(&pHandle->exec, 1);
...@@ -211,9 +203,8 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -211,9 +203,8 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
} }
} }
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
// NOTE: this pHandle->consumerId may have been changed already. // NOTE: this pHandle->consumerId may have been changed already.
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
end: end:
{ {
...@@ -221,7 +212,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -221,7 +212,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
tFormatOffset(buf, 80, &dataRsp.rspOffset); tFormatOffset(buf, 80, &dataRsp.rspOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 " code:%d", tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 " code:%d",
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code); consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
// taosWUnLockLatch(&pTq->lock);
tDeleteSMqDataRsp(&dataRsp); tDeleteSMqDataRsp(&dataRsp);
} }
atomic_store_8(&pHandle->exec, 0); atomic_store_8(&pHandle->exec, 0);
...@@ -237,17 +227,12 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -237,17 +227,12 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
SMqMetaRsp metaRsp = {0}; SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0}; STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pRequest); tqInitTaosxRsp(&taosxRsp, pRequest);
qTaskInfo_t task = pHandle->execHandle.task;
if(qTaskIsExecuting(task)){
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
return code;
}
while(isHandleExecuting(pHandle)){ while(tqIsHandleExecuting(pHandle)){
tqInfo("sub is executing, pHandle:%p", pHandle); tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
taosMsleep(5); taosMsleep(5);
} }
atomic_store_8(&pHandle->exec, 1); atomic_store_8(&pHandle->exec, 1);
if (offset->type != TMQ_OFFSET__LOG) { if (offset->type != TMQ_OFFSET__LOG) {
...@@ -274,7 +259,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -274,7 +259,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
} }
} }
if (offset->type == TMQ_OFFSET__LOG) { if (offset->type == TMQ_OFFSET__LOG) {
verifyOffset(pHandle->pWalReader, offset); verifyOffset(pHandle->pWalReader, offset);
int64_t fetchVer = offset->version + 1; int64_t fetchVer = offset->version + 1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册