提交 c158087d 编写于 作者: H Haojun Liao

fix(tmq): kill the ongoing tsdb scans while transferring the ownership of vnode to other consumers.

上级 5c128e22
......@@ -149,7 +149,6 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
* @param handle
* @return
*/
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal);
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds);
......@@ -162,6 +161,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo);
* @return
*/
int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode);
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode);
bool qTaskIsExecuting(qTaskInfo_t qinfo);
......@@ -171,14 +171,6 @@ bool qTaskIsExecuting(qTaskInfo_t qinfo);
*/
void qDestroyTask(qTaskInfo_t tinfo);
/**
* Extract the qualified table id list, and than pass them to the TSDB driver to load the required table data blocks.
*
* @param iter the table iterator to traverse all tables belongs to a super table, or an invert index
* @return
*/
int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t tagCondLen, SArray* pTableIdList);
void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList /*,int32_t* resNum, SExplainExecInfo** pRes*/);
......
......@@ -888,7 +888,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
// kill executing task
qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
if (pTaskInfo != NULL) {
// qAsyncKillTask(pTaskInfo);
qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
}
taosWLockLatch(&pTq->lock);
......@@ -904,11 +904,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
qStreamCloseTsdbReader(pTaskInfo);
}
// reset the error code.
if (pHandle->execHandle.task != NULL) {
}
taosWUnLockLatch(&pTq->lock);
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
taosMemoryFree(req.qmsg);
......
......@@ -749,6 +749,23 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
return TSDB_CODE_SUCCESS;
}
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if (pTaskInfo == NULL) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
setTaskKilled(pTaskInfo, rspCode);
while(qTaskIsExecuting(pTaskInfo)) {
taosMsleep(10);
}
pTaskInfo->code = rspCode;
return TSDB_CODE_SUCCESS;
}
bool qTaskIsExecuting(qTaskInfo_t qinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
if (NULL == pTaskInfo) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册