提交 aaa588a6 编写于 作者: D dapan1121

fix: fix stmt parser crash issue

上级 2e953bc7
......@@ -19,7 +19,7 @@
#include "catalogInt.h"
extern SCatalogMgmt gCtgMgmt;
SCtgDebug gCTGDebug = {.lockEnable = true};
SCtgDebug gCTGDebug = {0};
void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
ASSERT(*(int32_t*)param == 1);
......
......@@ -1537,6 +1537,13 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache
if (pContext->pStmtCb && *pQuery) {
(*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj,
&context.pTableBlockHashObj);
if (NULL == context.pVgroupsHashObj) {
context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
}
if (NULL == context.pTableBlockHashObj) {
context.pTableBlockHashObj =
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
}
} else {
context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
context.pTableBlockHashObj =
......
......@@ -61,6 +61,8 @@ typedef enum {
#define SCH_MAX_TASK_TIMEOUT_USEC 60000000
#define SCH_DEFAULT_MAX_RETRY_NUM 6
#define SCH_ASYNC_LAUNCH_TASK 0
typedef struct SSchDebug {
bool lockEnable;
bool apiEnable;
......@@ -281,6 +283,11 @@ typedef struct SSchJob {
SQueryProfileSummary summary;
} SSchJob;
typedef struct SSchTaskCtx {
SSchJob *pJob;
SSchTask *pTask;
} SSchTaskCtx;
extern SSchedulerMgmt schMgmt;
#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - *(int64_t*)taosArrayGet((_task)->profile.execTime, (_task)->execId)) > (_task)->timeoutUsec)
......@@ -428,7 +435,7 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);
int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough);
int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask);
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask);
int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask);
int32_t schLaunchFetchTask(SSchJob *pJob);
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction);
......
......@@ -54,7 +54,7 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
sum += pTask->plan->execNodeStat.tableNum;
}
if (sum < schMgmt.cfg.maxNodeTableNum) {
if (schMgmt.cfg.maxNodeTableNum <= 0 || sum < schMgmt.cfg.maxNodeTableNum) {
SCH_JOB_DLOG("job no need flow ctrl, totalTableNum:%d", sum);
return TSDB_CODE_SUCCESS;
}
......@@ -230,7 +230,7 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
SCH_TASK_DLOG("task to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask));
SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
remainNum -= pTask->plan->execNodeStat.tableNum;
if (remainNum <= 0) {
......
......@@ -819,7 +819,10 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
return TSDB_CODE_SUCCESS;
}
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
int32_t schLaunchTaskImpl(void *param) {
SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
SSchJob *pJob = pCtx->pJob;
SSchTask *pTask = pCtx->pTask;
int8_t status = 0;
int32_t code = 0;
......@@ -834,12 +837,12 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_DLOG("no need to launch task cause of job status %s", jobTaskStatusStr(status));
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
}
// NOTE: race condition: the task should be put into the hash table before send msg to server
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
}
......@@ -850,19 +853,51 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
if (TSDB_CODE_SUCCESS != code) {
SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
pTask->msgLen);
SCH_ERR_RET(code);
SCH_ERR_JRET(code);
} else {
SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg);
}
}
SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));
SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
if (SCH_IS_QUERY_JOB(pJob)) {
SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask));
}
SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));
_return:
taosMemoryFree(param);
#if SCH_ASYNC_LAUNCH_TASK
if (code) {
code = schProcessOnTaskFailure(pJob, pTask, code);
}
if (code) {
code = schHandleJobFailure(pJob, code);
}
#endif
SCH_RET(code);
}
int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
SSchTaskCtx *param = taosMemoryCalloc(1, sizeof(SSchTaskCtx));
if (NULL == param) {
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
param->pJob = pJob;
param->pTask = pTask;
#if SCH_ASYNC_LAUNCH_TASK
taosAsyncExec(schLaunchTaskImpl, param, NULL);
#else
SCH_ERR_RET(schLaunchTaskImpl(param));
#endif
return TSDB_CODE_SUCCESS;
}
......@@ -878,10 +913,10 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough));
if (enough) {
SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask));
SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
}
} else {
SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask));
SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
}
return TSDB_CODE_SUCCESS;
......
......@@ -47,7 +47,7 @@ STaosError errors[] = {
// rpc
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_AUTH_FAILURE, "Authentication failure")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_REDIRECT, "Redirect")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_REDIRECT, "Database not ready, need retry")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_UNAVAIL, "Unable to establish connection")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "Port already in use")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册