未验证 提交 14a1f4e3 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #18074 from taosdata/fix/TD-20300

fix: fix query thread quit issue
......@@ -94,6 +94,8 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes);
void qWorkerStopAllTasks(void *qWorkerMgmt);
void qWorkerDestroy(void **qWorkerMgmt);
int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat);
......
......@@ -385,6 +385,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR TAOS_DEF_ERROR_CODE(0, 0x072D)
#define TSDB_CODE_QRY_JSON_IN_GROUP_ERROR TAOS_DEF_ERROR_CODE(0, 0x072E)
#define TSDB_CODE_QRY_JOB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x072F)
#define TSDB_CODE_QRY_QWORKER_QUIT TAOS_DEF_ERROR_CODE(0, 0x0730)
// grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800)
......
......@@ -77,6 +77,7 @@ void vnodeBufPoolReset(SVBufPool* pPool);
// vnodeQuery.c
int32_t vnodeQueryOpen(SVnode* pVnode);
void vnodeQueryPreClose(SVnode *pVnode);
void vnodeQueryClose(SVnode* pVnode);
int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
......
......@@ -242,7 +242,10 @@ _err:
return NULL;
}
void vnodePreClose(SVnode *pVnode) { vnodeSyncPreClose(pVnode); }
void vnodePreClose(SVnode *pVnode) {
vnodeQueryPreClose(pVnode);
vnodeSyncPreClose(pVnode);
}
void vnodeClose(SVnode *pVnode) {
if (pVnode) {
......
......@@ -28,6 +28,8 @@ int vnodeQueryOpen(SVnode *pVnode) {
return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), (void **)&pVnode->pQuery, &pVnode->msgCb);
}
void vnodeQueryPreClose(SVnode *pVnode) { qWorkerStopAllTasks((void *)pVnode->pQuery); }
void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); }
int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
......
......@@ -153,6 +153,16 @@ typedef struct {
SSchemaWrapper* qsw;
} SSchemaInfo;
typedef struct {
int32_t operatorType;
int64_t refId;
} SExchangeOpStopInfo;
typedef struct {
SRWLatch lock;
SArray* pStopInfo;
} STaskStopInfo;
typedef struct SExecTaskInfo {
STaskIdInfo id;
uint32_t status;
......@@ -171,6 +181,7 @@ typedef struct SExecTaskInfo {
SSubplan* pSubplan;
struct SOperatorInfo* pRoot;
SLocalFetch localFetch;
STaskStopInfo stopInfo;
} SExecTaskInfo;
enum {
......@@ -1050,6 +1061,7 @@ int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResul
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult);
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo *pInfo);
#ifdef __cplusplus
}
......
......@@ -65,6 +65,9 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
while (1) {
tsem_wait(&pExchangeInfo->ready);
if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
for (int32_t i = 0; i < totalSources; ++i) {
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
......@@ -286,6 +289,9 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
pInfo->pDummyBlock = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
pInfo->pResultBlockList = taosArrayInit(1, POINTER_BYTES);
SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
qAppendTaskStopInfo(pTaskInfo, &stopInfo);
pInfo->seqLoadData = false;
pInfo->pTransporter = pTransporter;
......@@ -543,6 +549,10 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
pOperator->cost.openCost = taosGetTimestampUs() - startTs;
tsem_wait(&pExchangeInfo->ready);
if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
tsem_post(&pExchangeInfo->ready);
return TSDB_CODE_SUCCESS;
}
......@@ -562,6 +572,9 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
tsem_wait(&pExchangeInfo->ready);
if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
......
......@@ -659,6 +659,54 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
return pTaskInfo->code;
}
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo *pInfo) {
taosWLockLatch(&pTaskInfo->stopInfo.lock);
taosArrayPush(pTaskInfo->stopInfo.pStopInfo, pInfo);
taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
return TSDB_CODE_SUCCESS;
}
int32_t stopInfoComp(void const* lp, void const* rp) {
SExchangeOpStopInfo* key = (SExchangeOpStopInfo*)lp;
SExchangeOpStopInfo* pInfo = (SExchangeOpStopInfo*)rp;
if (key->refId < pInfo->refId) {
return -1;
} else if (key->refId > pInfo->refId) {
return 1;
}
return 0;
}
void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo *pInfo) {
taosWLockLatch(&pTaskInfo->stopInfo.lock);
int32_t idx = taosArraySearchIdx(pTaskInfo->stopInfo.pStopInfo, pInfo, stopInfoComp, TD_EQ);
if (idx >= 0) {
taosArrayRemove(pTaskInfo->stopInfo.pStopInfo, idx);
}
taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
return;
}
void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
taosWLockLatch(&pTaskInfo->stopInfo.lock);
int32_t num = taosArrayGetSize(pTaskInfo->stopInfo.pStopInfo);
for (int32_t i = 0; i < num; ++i) {
SExchangeOpStopInfo *pStop = taosArrayGet(pTaskInfo->stopInfo.pStopInfo, i);
SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pStop->refId);
if (pExchangeInfo) {
tsem_post(&pExchangeInfo->ready);
taosReleaseRef(exchangeObjRefPool, pStop->refId);
}
}
taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
......@@ -667,7 +715,11 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
}
qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
setTaskKilled(pTaskInfo);
qStopTaskOperators(pTaskInfo);
return TSDB_CODE_SUCCESS;
}
......
......@@ -2597,6 +2597,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
pTaskInfo->id.queryId = queryId;
pTaskInfo->execModel = model;
pTaskInfo->pTableInfoList = tableListCreate();
pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
char* p = taosMemoryCalloc(1, 128);
snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
......@@ -3210,6 +3211,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) {
nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
}
taosArrayDestroy(pTaskInfo->stopInfo.pStopInfo);
taosMemoryFreeClear(pTaskInfo->sql);
taosMemoryFreeClear(pTaskInfo->id.str);
taosMemoryFreeClear(pTaskInfo);
......
......@@ -246,7 +246,7 @@ typedef struct SQWorkerMgmt {
#define QW_ERR_RET(c) \
do { \
int32_t _code = c; \
int32_t _code = (c); \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
return _code; \
......@@ -254,7 +254,7 @@ typedef struct SQWorkerMgmt {
} while (0)
#define QW_RET(c) \
do { \
int32_t _code = c; \
int32_t _code = (c); \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
} \
......@@ -262,7 +262,7 @@ typedef struct SQWorkerMgmt {
} while (0)
#define QW_ERR_JRET(c) \
do { \
code = c; \
code = (c); \
if (code != TSDB_CODE_SUCCESS) { \
terrno = code; \
goto _return; \
......
......@@ -91,11 +91,53 @@ _return:
void qwDbgDumpSchInfo(SQWorker *mgmt, SQWSchStatus *sch, int32_t i) {
QW_LOCK(QW_READ, &sch->tasksLock);
QW_DLOG("the %dth scheduler status, hbBrokenTs:%" PRId64 ",taskNum:%d", i, sch->hbBrokenTs,
taosHashGetSize(sch->tasksHash));
int32_t taskNum = taosHashGetSize(sch->tasksHash);
QW_DLOG("***The %dth scheduler status, hbBrokenTs:%" PRId64 ",taskNum:%d", i, sch->hbBrokenTs, taskNum);
uint64_t qId, tId;
int32_t eId;
SQWTaskStatus *pTask = NULL;
void *pIter = taosHashIterate(sch->tasksHash, NULL);
while (pIter) {
pTask = (SQWTaskStatus *)pIter;
void *key = taosHashGetKey(pIter, NULL);
QW_GET_QTID(key, qId, tId, eId);
QW_TASK_DLOG("job refId:%" PRIx64 ", code:%x, task status:%d", pTask->refId, pTask->code, pTask->status);
pIter = taosHashIterate(sch->tasksHash, pIter);
}
QW_UNLOCK(QW_READ, &sch->tasksLock);
}
void qwDbgDumpTasksInfo(SQWorker *mgmt) {
QW_DUMP("***Total remain ctx num %d", taosHashGetSize(mgmt->ctxHash));
int32_t i = 0;
SQWTaskCtx *ctx = NULL;
uint64_t qId, tId;
int32_t eId;
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
while (pIter) {
ctx = (SQWTaskCtx *)pIter;
void *key = taosHashGetKey(pIter, NULL);
QW_GET_QTID(key, qId, tId, eId);
QW_TASK_DLOG("%p lock:%x, phase:%d, type:%d, explain:%d, needFetch:%d, localExec:%d, msgType:%d, fetchType:%d, "
"execId:%x, level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, "
"rspCode:%x, affectedRows:%" PRId64 ", taskHandle:%p, sinkHandle:%p, tbFName:%s, sver:%d, tver:%d, events:%d,%d,%d,%d,%d",
ctx, ctx->lock, ctx->phase, ctx->taskType, ctx->explain, ctx->needFetch, ctx->localExec, ctx->msgType,
ctx->fetchType, ctx->execId, ctx->level, ctx->queryGotData, ctx->queryRsped, ctx->queryEnd, ctx->queryContinue,
ctx->queryInQueue, ctx->rspCode, ctx->affectedRows, ctx->taskHandle, ctx->sinkHandle, ctx->tbInfo.tbFName,
ctx->tbInfo.sversion, ctx->tbInfo.tversion, ctx->events[QW_EVENT_CANCEL], ctx->events[QW_EVENT_READY],
ctx->events[QW_EVENT_FETCH], ctx->events[QW_EVENT_DROP], ctx->events[QW_EVENT_CQUERY]);
pIter = taosHashIterate(mgmt->ctxHash, pIter);
}
}
void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
if (!gQWDebug.dumpEnable) {
return;
......@@ -120,7 +162,7 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
QW_UNLOCK(QW_READ, &mgmt->schLock);
QW_DUMP("total remain ctx num %d", taosHashGetSize(mgmt->ctxHash));
qwDbgDumpTasksInfo(mgmt);
}
int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet) {
......
......@@ -281,9 +281,11 @@ void qwFreeTaskHandle(qTaskInfo_t *taskHandle) {
int32_t qwKillTaskHandle(SQWTaskCtx *ctx) {
int32_t code = 0;
// Note: free/kill may in RC
qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
qDebug("start to kill task");
code = qAsyncKillTask(taskHandle);
atomic_store_ptr(&ctx->taskHandle, taskHandle);
}
......
......@@ -683,6 +683,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
bool queryStop = false;
do {
ctx = NULL;
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL));
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
......@@ -1162,6 +1164,41 @@ _return:
QW_RET(code);
}
void qWorkerStopAllTasks(void *qWorkerMgmt) {
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash));
uint64_t qId, tId;
int32_t eId;
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
while (pIter) {
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
void *key = taosHashGetKey(pIter, NULL);
QW_GET_QTID(key, qId, tId, eId);
QW_LOCK(QW_WRITE, &ctx->lock);
QW_TASK_DLOG_E("start to force stop task");
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG_E("task already dropping");
QW_UNLOCK(QW_WRITE, &ctx->lock);
pIter = taosHashIterate(mgmt->ctxHash, pIter);
continue;
}
if (QW_QUERY_RUNNING(ctx)) {
qwKillTaskHandle(ctx);
}
QW_UNLOCK(QW_WRITE, &ctx->lock);
pIter = taosHashIterate(mgmt->ctxHash, pIter);
}
}
void qWorkerDestroy(void **qWorkerMgmt) {
if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
return;
......
......@@ -377,6 +377,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_IN_ERROR, "Json not support in i
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR, "Json not support in this place")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_IN_GROUP_ERROR, "Json not support in group/partition by")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JOB_NOT_EXIST, "Job not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_QWORKER_QUIT, "Vnode/Qnode is quitting")
// grant
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册