提交 b2fd1ff8 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/submit_req

...@@ -152,7 +152,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo); ...@@ -152,7 +152,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo);
* @param tinfo qhandle * @param tinfo qhandle
* @return * @return
*/ */
int32_t qAsyncKillTask(qTaskInfo_t tinfo); int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode);
/** /**
* destroy query info structure * destroy query info structure
......
...@@ -260,7 +260,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t ...@@ -260,7 +260,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \ (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code)) NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
#define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR) #define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR || (_code) == TSDB_CODE_VND_STOPPED)
#define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR) #define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR)
#define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) (false) // used later #define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) (false) // used later
......
...@@ -343,6 +343,7 @@ int32_t* taosGetErrno(); ...@@ -343,6 +343,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0526) #define TSDB_CODE_VND_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0526)
#define TSDB_CODE_VND_COL_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x0527) #define TSDB_CODE_VND_COL_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x0527)
#define TSDB_CODE_VND_NO_AVAIL_BUFPOOL TAOS_DEF_ERROR_CODE(0, 0x0528) #define TSDB_CODE_VND_NO_AVAIL_BUFPOOL TAOS_DEF_ERROR_CODE(0, 0x0528)
#define TSDB_CODE_VND_STOPPED TAOS_DEF_ERROR_CODE(0, 0x0529)
// tsdb // tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)
......
...@@ -241,7 +241,8 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { rpcRe ...@@ -241,7 +241,8 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { rpcRe
static bool rpcRfp(int32_t code, tmsg_t msgType) { static bool rpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) { code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK ||
code == TSDB_CODE_VND_STOPPED) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
msgType == TDMT_SCH_MERGE_FETCH) { msgType == TDMT_SCH_MERGE_FETCH) {
return false; return false;
......
...@@ -781,7 +781,7 @@ void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, ...@@ -781,7 +781,7 @@ void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order,
int32_t checkForQueryBuf(size_t numOfTables); int32_t checkForQueryBuf(size_t numOfTables);
bool isTaskKilled(SExecTaskInfo* pTaskInfo); bool isTaskKilled(SExecTaskInfo* pTaskInfo);
void setTaskKilled(SExecTaskInfo* pTaskInfo); void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
void doDestroyTask(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
......
...@@ -70,7 +70,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ...@@ -70,7 +70,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
tsem_wait(&pExchangeInfo->ready); tsem_wait(&pExchangeInfo->ready);
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pTaskInfo->env, pTaskInfo->code);
} }
for (int32_t i = 0; i < totalSources; ++i) { for (int32_t i = 0; i < totalSources; ++i) {
...@@ -573,7 +573,7 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { ...@@ -573,7 +573,7 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
tsem_wait(&pExchangeInfo->ready); tsem_wait(&pExchangeInfo->ready);
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pTaskInfo->env, pTaskInfo->code);
} }
tsem_post(&pExchangeInfo->ready); tsem_post(&pExchangeInfo->ready);
...@@ -621,7 +621,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -621,7 +621,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current); doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
tsem_wait(&pExchangeInfo->ready); tsem_wait(&pExchangeInfo->ready);
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pTaskInfo->env, pTaskInfo->code);
} }
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
......
...@@ -688,7 +688,7 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) { ...@@ -688,7 +688,7 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
taosWUnLockLatch(&pTaskInfo->stopInfo.lock); taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
} }
int32_t qAsyncKillTask(qTaskInfo_t qinfo) { int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
if (pTaskInfo == NULL) { if (pTaskInfo == NULL) {
...@@ -697,7 +697,7 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) { ...@@ -697,7 +697,7 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
qDebug("%s execTask async killed", GET_TASKID(pTaskInfo)); qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
setTaskKilled(pTaskInfo); setTaskKilled(pTaskInfo, rspCode);
qStopTaskOperators(pTaskInfo); qStopTaskOperators(pTaskInfo);
......
...@@ -610,21 +610,10 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB ...@@ -610,21 +610,10 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB
} }
bool isTaskKilled(SExecTaskInfo* pTaskInfo) { bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
// query has been executed more than tsShellActivityTimer, and the retrieve has not arrived return (0 != pTaskInfo->code) ? true : false;
// abort current query execution.
if (pTaskInfo->owner != 0 &&
((taosGetTimestampSec() - pTaskInfo->cost.start / 1000) > 10 * getMaximumIdleDurationSec())
/*(!needBuildResAfterQueryComplete(pTaskInfo))*/) {
assert(pTaskInfo->cost.start != 0);
// qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64
// ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec());
// return true;
}
return false;
} }
void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED; } void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { pTaskInfo->code = rspCode; }
///////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) { STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
......
...@@ -629,7 +629,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { ...@@ -629,7 +629,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
while (tsdbNextDataBlock(pTableScanInfo->base.dataReader)) { while (tsdbNextDataBlock(pTableScanInfo->base.dataReader)) {
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
} }
// process this data block based on the probabilities // process this data block based on the probabilities
...@@ -2032,7 +2032,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { ...@@ -2032,7 +2032,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) { if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) {
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pTaskInfo->env, pTaskInfo->code);
} }
int32_t rows = 0; int32_t rows = 0;
...@@ -2529,7 +2529,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { ...@@ -2529,7 +2529,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
STsdbReader* reader = pInfo->base.dataReader; STsdbReader* reader = pInfo->base.dataReader;
while (tsdbNextDataBlock(reader)) { while (tsdbNextDataBlock(reader)) {
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
} }
// process this data block based on the probabilities // process this data block based on the probabilities
......
...@@ -363,7 +363,7 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); ...@@ -363,7 +363,7 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx); void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx);
int32_t qwKillTaskHandle(SQWTaskCtx *ctx); int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode);
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status); int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status);
int32_t qwDropTask(QW_FPARAMS_DEF); int32_t qwDropTask(QW_FPARAMS_DEF);
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx); void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx);
......
...@@ -279,14 +279,14 @@ void qwFreeTaskHandle(qTaskInfo_t *taskHandle) { ...@@ -279,14 +279,14 @@ void qwFreeTaskHandle(qTaskInfo_t *taskHandle) {
} }
} }
int32_t qwKillTaskHandle(SQWTaskCtx *ctx) { int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
// Note: free/kill may in RC // Note: free/kill may in RC
qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle); qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) { if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
qDebug("start to kill task"); qDebug("start to kill task");
code = qAsyncKillTask(taskHandle); code = qAsyncKillTask(taskHandle, rspCode);
atomic_store_ptr(&ctx->taskHandle, taskHandle); atomic_store_ptr(&ctx->taskHandle, taskHandle);
} }
......
...@@ -411,7 +411,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -411,7 +411,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
// qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code); // qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
// QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(ctx->rspCode);
} }
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC)); QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC));
...@@ -420,7 +420,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -420,7 +420,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
case QW_PHASE_PRE_FETCH: { case QW_PHASE_PRE_FETCH: {
if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase)); QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(ctx->rspCode);
} }
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
...@@ -442,7 +442,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -442,7 +442,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
case QW_PHASE_PRE_CQUERY: { case QW_PHASE_PRE_CQUERY: {
if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(ctx->rspCode);
} }
if (ctx->rspCode) { if (ctx->rspCode) {
...@@ -456,7 +456,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -456,7 +456,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
// qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code); // qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
// QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(ctx->rspCode);
} }
break; break;
...@@ -502,7 +502,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp ...@@ -502,7 +502,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(ctx->rspCode);
} }
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
...@@ -515,7 +515,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp ...@@ -515,7 +515,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
// QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(ctx->rspCode);
} }
if (ctx->rspCode) { if (ctx->rspCode) {
...@@ -861,7 +861,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -861,7 +861,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
} }
if (QW_QUERY_RUNNING(ctx)) { if (QW_QUERY_RUNNING(ctx)) {
QW_ERR_JRET(qwKillTaskHandle(ctx)); QW_ERR_JRET(qwKillTaskHandle(ctx, TSDB_CODE_TSC_QUERY_CANCELLED));
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP);
} else { } else {
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
...@@ -869,6 +869,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -869,6 +869,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
} }
if (!dropped) { if (!dropped) {
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_TSC_QUERY_CANCELLED);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
} }
...@@ -1195,8 +1196,9 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) { ...@@ -1195,8 +1196,9 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
} }
if (QW_QUERY_RUNNING(ctx)) { if (QW_QUERY_RUNNING(ctx)) {
qwKillTaskHandle(ctx); qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
} else if (!QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { } else if (!QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
} }
......
...@@ -302,7 +302,7 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) { ...@@ -302,7 +302,7 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) {
return 0; return 0;
} }
int32_t qwtKillTask(qTaskInfo_t qinfo) { return 0; } int32_t qwtKillTask(qTaskInfo_t qinfo, int32_t rspCode) { return 0; }
void qwtDestroyTask(qTaskInfo_t qHandle) {} void qwtDestroyTask(qTaskInfo_t qHandle) {}
......
...@@ -391,6 +391,8 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) ...@@ -391,6 +391,8 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet)
int64_t leftTime = tsMaxRetryWaitTime - lastTime; int64_t leftTime = tsMaxRetryWaitTime - lastTime;
pTask->delayExecMs = leftTime < pCtx->periodMs ? leftTime : pCtx->periodMs; pTask->delayExecMs = leftTime < pCtx->periodMs ? leftTime : pCtx->periodMs;
pCtx->roundTimes = 0;
goto _return; goto _return;
} }
...@@ -407,7 +409,7 @@ _return: ...@@ -407,7 +409,7 @@ _return:
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCH_TASK_DLOG("task will be redirected now, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode));
if (NULL == pData) { if (NULL == pData) {
pTask->retryTimes = 0; pTask->retryTimes = 0;
...@@ -430,15 +432,15 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 ...@@ -430,15 +432,15 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
if (SCH_IS_DATA_BIND_TASK(pTask)) { if (SCH_IS_DATA_BIND_TASK(pTask)) {
if (pData && pData->pEpSet) { if (pData && pData->pEpSet) {
SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet)); SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
} else if (SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(rspCode)) { } else if (SYNC_SELF_LEADER_REDIRECT_ERROR(rspCode)) {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SCH_SWITCH_EPSET(addr); SEp *pEp = &addr->epSet.eps[addr->epSet.inUse];
SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps); SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d, code:%s", addr->nodeId, addr->epSet.inUse,
addr->epSet.numOfEps, pEp->fqdn, pEp->port, tstrerror(rspCode));
} else { } else {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SEp *pEp = &addr->epSet.eps[addr->epSet.inUse]; SCH_SWITCH_EPSET(addr);
SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d", addr->nodeId, addr->epSet.inUse, SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
addr->epSet.numOfEps, pEp->fqdn, pEp->port);
} }
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
......
...@@ -1519,7 +1519,7 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -1519,7 +1519,7 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
transFreeMsg(pResp->pCont); transFreeMsg(pResp->pCont);
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
} else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR || } else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR ||
code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_RPC_REDIRECT) { code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_VND_STOPPED) {
tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen); tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen);
noDelay = cliResetEpset(pCtx, pResp, true); noDelay = cliResetEpset(pCtx, pResp, true);
transFreeMsg(pResp->pCont); transFreeMsg(pResp->pCont);
......
...@@ -321,6 +321,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already ...@@ -321,6 +321,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_NOT_EXISTS, "Table column not exists") TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_NOT_EXISTS, "Table column not exists")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_SUBSCRIBED, "Table column is subscribed") TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_SUBSCRIBED, "Table column is subscribed")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_AVAIL_BUFPOOL, "No availabe buffer pool") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_AVAIL_BUFPOOL, "No availabe buffer pool")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_STOPPED, "Vnode stopped")
// tsdb // tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册