提交 ed612088 编写于 作者: wmmhello's avatar wmmhello

fix:lost data because tsdbreader or task is killed

上级 d8387b95
...@@ -90,7 +90,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 ...@@ -90,7 +90,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
*/ */
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
void qSetTaskCode(qTaskInfo_t tinfo, int32_t code); //void qSetTaskCode(qTaskInfo_t tinfo, int32_t code);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
......
...@@ -370,7 +370,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -370,7 +370,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
bool exec = tqIsHandleExec(pHandle); bool exec = tqIsHandleExec(pHandle);
if(!exec) { if(!exec) {
tqSetHandleExec(pHandle); tqSetHandleExec(pHandle);
qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS); // qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS);
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, req.subKey, pHandle); tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, req.subKey, pHandle);
taosWUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
break; break;
...@@ -572,16 +572,16 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -572,16 +572,16 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
// atomic_add_fetch_32(&pHandle->epoch, 1); // atomic_add_fetch_32(&pHandle->epoch, 1);
// kill executing task // kill executing task
if(tqIsHandleExec(pHandle)) { // if(tqIsHandleExec(pHandle)) {
qTaskInfo_t pTaskInfo = pHandle->execHandle.task; // qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
if (pTaskInfo != NULL) { // if (pTaskInfo != NULL) {
qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); // qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
} // }
// if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { // if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
// qStreamCloseTsdbReader(pTaskInfo); // qStreamCloseTsdbReader(pTaskInfo);
// } // }
} // }
// remove if it has been register in the push manager, and return one empty block to consumer // remove if it has been register in the push manager, and return one empty block to consumer
tqUnregisterPushHandle(pTq, pHandle); tqUnregisterPushHandle(pTq, pHandle);
taosWUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
......
...@@ -180,10 +180,10 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) { ...@@ -180,10 +180,10 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
doSetTaskId(pTaskInfo->pRoot); doSetTaskId(pTaskInfo->pRoot);
} }
void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) { //void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) {
SExecTaskInfo* pTaskInfo = tinfo; // SExecTaskInfo* pTaskInfo = tinfo;
pTaskInfo->code = code; // pTaskInfo->code = code;
} //}
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
if (tinfo == NULL) { if (tinfo == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册