提交 8ee170be 编写于 作者: D dapan1121

enh: add retry for vnode closed case

上级 cc36796f
......@@ -152,7 +152,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo);
* @param tinfo qhandle
* @return
*/
int32_t qAsyncKillTask(qTaskInfo_t tinfo);
int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode);
/**
* destroy query info structure
......
......@@ -323,6 +323,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0526)
#define TSDB_CODE_VND_COL_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x0527)
#define TSDB_CODE_VND_NO_AVAIL_BUFPOOL TAOS_DEF_ERROR_CODE(0, 0x0528)
#define TSDB_CODE_VND_STOPPED TAOS_DEF_ERROR_CODE(0, 0x0529)
// tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)
......
......@@ -796,7 +796,7 @@ void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order,
int32_t checkForQueryBuf(size_t numOfTables);
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
void setTaskKilled(SExecTaskInfo* pTaskInfo);
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
void doDestroyTask(SExecTaskInfo* pTaskInfo);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
......
......@@ -688,7 +688,7 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
if (pTaskInfo == NULL) {
......@@ -697,7 +697,7 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
setTaskKilled(pTaskInfo);
setTaskKilled(pTaskInfo, rspCode);
qStopTaskOperators(pTaskInfo);
......
......@@ -611,21 +611,10 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB
}
bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
// query has been executed more than tsShellActivityTimer, and the retrieve has not arrived
// abort current query execution.
if (pTaskInfo->owner != 0 &&
((taosGetTimestampSec() - pTaskInfo->cost.start / 1000) > 10 * getMaximumIdleDurationSec())
/*(!needBuildResAfterQueryComplete(pTaskInfo))*/) {
assert(pTaskInfo->cost.start != 0);
// qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64
// ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec());
// return true;
}
return false;
return (0 != pTaskInfo->code) ? true : false;
}
void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED; }
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { pTaskInfo->code = rspCode; }
/////////////////////////////////////////////////////////////////////////////////////////////
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
......
......@@ -363,7 +363,7 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx);
int32_t qwKillTaskHandle(SQWTaskCtx *ctx);
int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode);
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status);
int32_t qwDropTask(QW_FPARAMS_DEF);
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx);
......
......@@ -279,14 +279,14 @@ void qwFreeTaskHandle(qTaskInfo_t *taskHandle) {
}
}
int32_t qwKillTaskHandle(SQWTaskCtx *ctx) {
int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) {
int32_t code = 0;
// Note: free/kill may in RC
qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
qDebug("start to kill task");
code = qAsyncKillTask(taskHandle);
code = qAsyncKillTask(taskHandle, rspCode);
atomic_store_ptr(&ctx->taskHandle, taskHandle);
}
......
......@@ -411,7 +411,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
// qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
// QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
QW_ERR_JRET(ctx->rspCode);
}
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC));
......@@ -420,7 +420,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
case QW_PHASE_PRE_FETCH: {
if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
QW_ERR_JRET(ctx->rspCode);
}
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
......@@ -442,7 +442,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
case QW_PHASE_PRE_CQUERY: {
if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
QW_ERR_JRET(ctx->rspCode);
}
if (ctx->rspCode) {
......@@ -456,7 +456,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
// qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
// QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
QW_ERR_JRET(ctx->rspCode);
}
break;
......@@ -502,7 +502,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
QW_ERR_JRET(ctx->rspCode);
}
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
......@@ -515,7 +515,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
// QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
QW_ERR_JRET(ctx->rspCode);
}
if (ctx->rspCode) {
......@@ -861,7 +861,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
}
if (QW_QUERY_RUNNING(ctx)) {
QW_ERR_JRET(qwKillTaskHandle(ctx));
QW_ERR_JRET(qwKillTaskHandle(ctx, TSDB_CODE_TSC_QUERY_CANCELLED));
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP);
} else {
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
......@@ -869,6 +869,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
}
if (!dropped) {
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_TSC_QUERY_CANCELLED);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
}
......@@ -1195,8 +1196,9 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
}
if (QW_QUERY_RUNNING(ctx)) {
qwKillTaskHandle(ctx);
qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
} else if (!QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
}
......
......@@ -302,7 +302,7 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) {
return 0;
}
int32_t qwtKillTask(qTaskInfo_t qinfo) { return 0; }
int32_t qwtKillTask(qTaskInfo_t qinfo, int32_t rspCode) { return 0; }
void qwtDestroyTask(qTaskInfo_t qHandle) {}
......
......@@ -315,6 +315,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_NOT_EXISTS, "Table column not exists")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_SUBSCRIBED, "Table column is subscribed")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_AVAIL_BUFPOOL, "No availabe buffer pool")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_STOPPED, "Vnode stopped")
// tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册