From aaa588a62f1fd031c00807192c2bc282a200ae3f Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 9 Aug 2022 14:10:14 +0800 Subject: [PATCH] fix: fix stmt parser crash issue --- source/libs/catalog/src/ctgDbg.c | 2 +- source/libs/parser/src/parInsert.c | 7 ++++ source/libs/scheduler/inc/schInt.h | 9 ++++- source/libs/scheduler/src/schFlowCtrl.c | 4 +- source/libs/scheduler/src/schTask.c | 53 ++++++++++++++++++++----- source/util/src/terror.c | 2 +- 6 files changed, 63 insertions(+), 14 deletions(-) diff --git a/source/libs/catalog/src/ctgDbg.c b/source/libs/catalog/src/ctgDbg.c index 8333cb28c0..bd3402dc39 100644 --- a/source/libs/catalog/src/ctgDbg.c +++ b/source/libs/catalog/src/ctgDbg.c @@ -19,7 +19,7 @@ #include "catalogInt.h" extern SCatalogMgmt gCtgMgmt; -SCtgDebug gCTGDebug = {.lockEnable = true}; +SCtgDebug gCTGDebug = {0}; void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) { ASSERT(*(int32_t*)param == 1); diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 85f73f0663..fa86cfb5b5 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1537,6 +1537,13 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache if (pContext->pStmtCb && *pQuery) { (*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj, &context.pTableBlockHashObj); + if (NULL == context.pVgroupsHashObj) { + context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + } + if (NULL == context.pTableBlockHashObj) { + context.pTableBlockHashObj = + taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + } } else { context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); context.pTableBlockHashObj = diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 02e878f4f8..1b3d75f33b 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -61,6 +61,8 @@ typedef enum { #define SCH_MAX_TASK_TIMEOUT_USEC 60000000 #define SCH_DEFAULT_MAX_RETRY_NUM 6 +#define SCH_ASYNC_LAUNCH_TASK 0 + typedef struct SSchDebug { bool lockEnable; bool apiEnable; @@ -281,6 +283,11 @@ typedef struct SSchJob { SQueryProfileSummary summary; } SSchJob; +typedef struct SSchTaskCtx { + SSchJob *pJob; + SSchTask *pTask; +} SSchTaskCtx; + extern SSchedulerMgmt schMgmt; #define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - *(int64_t*)taosArrayGet((_task)->profile.execTime, (_task)->execId)) > (_task)->timeoutUsec) @@ -428,7 +435,7 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel); int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask); int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough); int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask); -int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask); +int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask); int32_t schLaunchFetchTask(SSchJob *pJob); int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode); int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction); diff --git a/source/libs/scheduler/src/schFlowCtrl.c b/source/libs/scheduler/src/schFlowCtrl.c index 6b34a394b6..c5c2bfb2bb 100644 --- a/source/libs/scheduler/src/schFlowCtrl.c +++ b/source/libs/scheduler/src/schFlowCtrl.c @@ -54,7 +54,7 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) { sum += pTask->plan->execNodeStat.tableNum; } - if (sum < schMgmt.cfg.maxNodeTableNum) { + if (schMgmt.cfg.maxNodeTableNum <= 0 || sum < schMgmt.cfg.maxNodeTableNum) { SCH_JOB_DLOG("job no need flow ctrl, totalTableNum:%d", sum); return TSDB_CODE_SUCCESS; } @@ -230,7 +230,7 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) { SCH_TASK_DLOG("task to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum); - SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask)); + SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask)); remainNum -= pTask->plan->execNodeStat.tableNum; if (remainNum <= 0) { diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index cabca0dc0c..729dbf7c1f 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -819,7 +819,10 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) { return TSDB_CODE_SUCCESS; } -int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { +int32_t schLaunchTaskImpl(void *param) { + SSchTaskCtx *pCtx = (SSchTaskCtx *)param; + SSchJob *pJob = pCtx->pJob; + SSchTask *pTask = pCtx->pTask; int8_t status = 0; int32_t code = 0; @@ -834,12 +837,12 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { if (schJobNeedToStop(pJob, &status)) { SCH_TASK_DLOG("no need to launch task cause of job status %s", jobTaskStatusStr(status)); - SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR); + SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR); } // NOTE: race condition: the task should be put into the hash table before send msg to server if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) { - SCH_ERR_RET(schPushTaskToExecList(pJob, pTask)); + SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask)); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC); } @@ -850,19 +853,51 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { if (TSDB_CODE_SUCCESS != code) { SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, pTask->msgLen); - SCH_ERR_RET(code); + SCH_ERR_JRET(code); } else { SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg); } } - SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask)); + SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask)); if (SCH_IS_QUERY_JOB(pJob)) { - SCH_ERR_RET(schEnsureHbConnection(pJob, pTask)); + SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask)); } - SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType)); + SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType)); + +_return: + + taosMemoryFree(param); + +#if SCH_ASYNC_LAUNCH_TASK + if (code) { + code = schProcessOnTaskFailure(pJob, pTask, code); + } + if (code) { + code = schHandleJobFailure(pJob, code); + } +#endif + + SCH_RET(code); +} + +int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { + + SSchTaskCtx *param = taosMemoryCalloc(1, sizeof(SSchTaskCtx)); + if (NULL == param) { + SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + param->pJob = pJob; + param->pTask = pTask; + +#if SCH_ASYNC_LAUNCH_TASK + taosAsyncExec(schLaunchTaskImpl, param, NULL); +#else + SCH_ERR_RET(schLaunchTaskImpl(param)); +#endif return TSDB_CODE_SUCCESS; } @@ -878,10 +913,10 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough)); if (enough) { - SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask)); + SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask)); } } else { - SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask)); + SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask)); } return TSDB_CODE_SUCCESS; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 3c31c893d1..2be1c9f744 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -47,7 +47,7 @@ STaosError errors[] = { // rpc TAOS_DEFINE_ERROR(TSDB_CODE_RPC_AUTH_FAILURE, "Authentication failure") -TAOS_DEFINE_ERROR(TSDB_CODE_RPC_REDIRECT, "Redirect") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_REDIRECT, "Database not ready, need retry") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_UNAVAIL, "Unable to establish connection") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "Port already in use") -- GitLab