未验证 提交 2fb3c491 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #14909 from taosdata/enh/schPolicy

enh: add schedule policy
...@@ -25,11 +25,6 @@ extern "C" { ...@@ -25,11 +25,6 @@ extern "C" {
extern tsem_t schdRspSem; extern tsem_t schdRspSem;
typedef struct SSchedulerCfg {
uint32_t maxJobNum;
int32_t maxNodeTableNum;
} SSchedulerCfg;
typedef struct SQueryProfileSummary { typedef struct SQueryProfileSummary {
int64_t startTs; // Object created and added into the message queue int64_t startTs; // Object created and added into the message queue
int64_t endTs; // the timestamp when the task is completed int64_t endTs; // the timestamp when the task is completed
...@@ -84,7 +79,7 @@ typedef struct SSchedulerReq { ...@@ -84,7 +79,7 @@ typedef struct SSchedulerReq {
} SSchedulerReq; } SSchedulerReq;
int32_t schedulerInit(SSchedulerCfg *cfg); int32_t schedulerInit(void);
int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJob); int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJob);
...@@ -96,6 +91,8 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub); ...@@ -96,6 +91,8 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub);
void schedulerStopQueryHb(void *pTrans); void schedulerStopQueryHb(void *pTrans);
int32_t schedulerUpdatePolicy(int32_t policy);
int32_t schedulerEnableReSchedule(bool enableResche);
/** /**
* Cancel query job * Cancel query job
......
...@@ -363,8 +363,7 @@ void taos_init_imp(void) { ...@@ -363,8 +363,7 @@ void taos_init_imp(void) {
SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100}; SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
catalogInit(&cfg); catalogInit(&cfg);
SSchedulerCfg scfg = {.maxJobNum = 100}; schedulerInit();
schedulerInit(&scfg);
tscDebug("starting to initialize TAOS driver"); tscDebug("starting to initialize TAOS driver");
taosSetCoreDump(true); taosSetCoreDump(true);
......
...@@ -834,6 +834,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { ...@@ -834,6 +834,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64, tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
pRequest->prevCode = code; pRequest->prevCode = code;
schedulerFreeJob(&pRequest->body.queryJob, 0);
doAsyncQuery(pRequest, true); doAsyncQuery(pRequest, true);
return; return;
} }
......
...@@ -131,6 +131,7 @@ void taos_close(TAOS *taos) { ...@@ -131,6 +131,7 @@ void taos_close(TAOS *taos) {
STscObj *pObj = acquireTscObj(*(int64_t *)taos); STscObj *pObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pObj) { if (NULL == pObj) {
taosMemoryFree(taos);
return; return;
} }
......
...@@ -8,9 +8,9 @@ target_include_directories( ...@@ -8,9 +8,9 @@ target_include_directories(
target_link_libraries( target_link_libraries(
command command
PRIVATE os util nodes catalog function transport qcom PRIVATE os util nodes catalog function transport qcom scheduler
) )
if(${BUILD_TEST}) if(${BUILD_TEST})
ADD_SUBDIRECTORY(test) ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST}) endif(${BUILD_TEST})
\ No newline at end of file
...@@ -77,6 +77,10 @@ extern "C" { ...@@ -77,6 +77,10 @@ extern "C" {
#define EXPLAIN_MODE_FORMAT "mode=%s" #define EXPLAIN_MODE_FORMAT "mode=%s"
#define EXPLAIN_STRING_TYPE_FORMAT "%s" #define EXPLAIN_STRING_TYPE_FORMAT "%s"
#define COMMAND_RESET_LOG "resetLog"
#define COMMAND_SCHEDULE_POLICY "schedulePolicy"
#define COMMAND_ENABLE_RESCHEDULE "enableReSchedule"
typedef struct SExplainGroup { typedef struct SExplainGroup {
int32_t nodeNum; int32_t nodeNum;
int32_t physiPlanExecNum; int32_t physiPlanExecNum;
......
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
#include "catalog.h" #include "catalog.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tglobal.h" #include "tglobal.h"
#include "commandInt.h"
#include "scheduler.h"
extern SConfig* tsCfg; extern SConfig* tsCfg;
...@@ -479,7 +481,42 @@ static int32_t execShowCreateSTable(SShowCreateTableStmt* pStmt, SRetrieveTableR ...@@ -479,7 +481,42 @@ static int32_t execShowCreateSTable(SShowCreateTableStmt* pStmt, SRetrieveTableR
return execShowCreateTable(pStmt, pRsp); return execShowCreateTable(pStmt, pRsp);
} }
static int32_t execAlterCmd(char* cmd, char* value, bool* processed) {
int32_t code = 0;
if (0 == strcasecmp(cmd, COMMAND_RESET_LOG)) {
taosResetLog();
cfgDumpCfg(tsCfg, 0, false);
} else if (0 == strcasecmp(cmd, COMMAND_SCHEDULE_POLICY)) {
code = schedulerUpdatePolicy(atoi(value));
} else if (0 == strcasecmp(cmd, COMMAND_ENABLE_RESCHEDULE)) {
code = schedulerEnableReSchedule(atoi(value));
} else {
goto _return;
}
*processed = true;
_return:
if (code) {
terrno = code;
}
return code;
}
static int32_t execAlterLocal(SAlterLocalStmt* pStmt) { static int32_t execAlterLocal(SAlterLocalStmt* pStmt) {
bool processed = false;
if (execAlterCmd(pStmt->config, pStmt->value, &processed)) {
return terrno;
}
if (processed) {
goto _return;
}
if (cfgSetItem(tsCfg, pStmt->config, pStmt->value, CFG_STYPE_ALTER_CMD)) { if (cfgSetItem(tsCfg, pStmt->config, pStmt->value, CFG_STYPE_ALTER_CMD)) {
return terrno; return terrno;
} }
...@@ -488,6 +525,8 @@ static int32_t execAlterLocal(SAlterLocalStmt* pStmt) { ...@@ -488,6 +525,8 @@ static int32_t execAlterLocal(SAlterLocalStmt* pStmt) {
return terrno; return terrno;
} }
_return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -388,6 +388,11 @@ static void destroyDataSinkNode(SDataSinkNode* pNode) { nodesDestroyNode((SNode* ...@@ -388,6 +388,11 @@ static void destroyDataSinkNode(SDataSinkNode* pNode) { nodesDestroyNode((SNode*
static void destroyExprNode(SExprNode* pExpr) { taosArrayDestroy(pExpr->pAssociation); } static void destroyExprNode(SExprNode* pExpr) { taosArrayDestroy(pExpr->pAssociation); }
static void nodesDestroyNodePointer(void* node) {
SNode* pNode = *(SNode**)node;
nodesDestroyNode(pNode);
}
void nodesDestroyNode(SNode* pNode) { void nodesDestroyNode(SNode* pNode) {
if (NULL == pNode) { if (NULL == pNode) {
return; return;
...@@ -718,6 +723,7 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -718,6 +723,7 @@ void nodesDestroyNode(SNode* pNode) {
} }
taosArrayDestroy(pQuery->pDbList); taosArrayDestroy(pQuery->pDbList);
taosArrayDestroy(pQuery->pTableList); taosArrayDestroy(pQuery->pTableList);
taosArrayDestroyEx(pQuery->pPlaceholderValues, nodesDestroyNodePointer);
break; break;
} }
case QUERY_NODE_LOGIC_PLAN_SCAN: { case QUERY_NODE_LOGIC_PLAN_SCAN: {
......
...@@ -378,6 +378,7 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt); ...@@ -378,6 +378,7 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt);
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore); int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore);
int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet); int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet);
int32_t qwAddTaskCtx(QW_FPARAMS_DEF); int32_t qwAddTaskCtx(QW_FPARAMS_DEF);
int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -24,7 +24,7 @@ extern "C" { ...@@ -24,7 +24,7 @@ extern "C" {
#include "dataSinkMgt.h" #include "dataSinkMgt.h"
int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF); int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF);
int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql); int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql);
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg);
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
#include "tmsg.h" #include "tmsg.h"
#include "tname.h" #include "tname.h"
SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false, .tmp = true}; SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false, .tmp = false};
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
if (!gQWDebug.statusEnable) { if (!gQWDebug.statusEnable) {
...@@ -147,9 +147,9 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int ...@@ -147,9 +147,9 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwDbgResponseREdirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) { int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) {
if (gQWDebug.tmp) { if (gQWDebug.tmp) {
if (TDMT_SCH_QUERY == qwMsg->msgType) { if (TDMT_SCH_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) {
SEpSet epSet = {0}; SEpSet epSet = {0};
epSet.inUse = 1; epSet.inUse = 1;
epSet.numOfEps = 3; epSet.numOfEps = 3;
...@@ -159,16 +159,15 @@ int32_t qwDbgResponseREdirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) { ...@@ -159,16 +159,15 @@ int32_t qwDbgResponseREdirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) {
epSet.eps[1].port = 7200; epSet.eps[1].port = 7200;
strcpy(epSet.eps[2].fqdn, "localhost"); strcpy(epSet.eps[2].fqdn, "localhost");
epSet.eps[2].port = 7300; epSet.eps[2].port = 7300;
ctx->phase = QW_PHASE_POST_QUERY;
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, &epSet); qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, &epSet);
gQWDebug.tmp = false;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType) { if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) {
ctx->phase = QW_PHASE_POST_QUERY; ctx->phase = QW_PHASE_POST_QUERY;
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL); qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL);
gQWDebug.tmp = false;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
......
...@@ -315,10 +315,10 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -315,10 +315,10 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
int64_t rId = msg->refId; int64_t rId = msg->refId;
int32_t eId = msg->execId; int32_t eId = msg->execId;
SQWMsg qwMsg = {.msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info}; SQWMsg qwMsg = {.msgType = pMsg->msgType, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};
QW_SCH_TASK_DLOG("prerocessQuery start, handle:%p", pMsg->info.handle); QW_SCH_TASK_DLOG("prerocessQuery start, handle:%p", pMsg->info.handle);
QW_ERR_RET(qwPrerocessQuery(QW_FPARAMS(), &qwMsg)); QW_ERR_RET(qwPreprocessQuery(QW_FPARAMS(), &qwMsg));
QW_SCH_TASK_DLOG("prerocessQuery end, handle:%p", pMsg->info.handle); QW_SCH_TASK_DLOG("prerocessQuery end, handle:%p", pMsg->info.handle);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -469,7 +469,7 @@ int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) { ...@@ -469,7 +469,7 @@ int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) {
} }
int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
int32_t code = 0; int32_t code = 0;
bool queryRsped = false; bool queryRsped = false;
SSubplan *plan = NULL; SSubplan *plan = NULL;
...@@ -488,6 +488,8 @@ int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -488,6 +488,8 @@ int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
qwDbgResponseRedirect(qwMsg, ctx);
_return: _return:
if (ctx) { if (ctx) {
......
...@@ -28,15 +28,6 @@ extern "C" { ...@@ -28,15 +28,6 @@ extern "C" {
#include "trpc.h" #include "trpc.h"
#include "command.h" #include "command.h"
#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200 // unit is TSDB_TABLE_NUM_UNIT
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000
#define SCH_MAX_TASK_TIMEOUT_USEC 60000000
#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
enum { enum {
SCH_READ = 1, SCH_READ = 1,
SCH_WRITE, SCH_WRITE,
...@@ -54,6 +45,24 @@ typedef enum { ...@@ -54,6 +45,24 @@ typedef enum {
SCH_OP_GET_STATUS, SCH_OP_GET_STATUS,
} SCH_OP_TYPE; } SCH_OP_TYPE;
typedef enum {
SCH_LOAD_SEQ = 1,
SCH_RANDOM,
SCH_ALL,
} SCH_POLICY;
#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200 // unit is TSDB_TABLE_NUM_UNIT
#define SCHEDULE_DEFAULT_POLICY SCH_LOAD_SEQ
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000
#define SCH_MAX_TASK_TIMEOUT_USEC 60000000
#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
typedef struct SSchDebug { typedef struct SSchDebug {
bool lockEnable; bool lockEnable;
bool apiEnable; bool apiEnable;
...@@ -126,6 +135,13 @@ typedef struct SSchStatusFps { ...@@ -126,6 +135,13 @@ typedef struct SSchStatusFps {
schStatusEventFp eventFp; schStatusEventFp eventFp;
} SSchStatusFps; } SSchStatusFps;
typedef struct SSchedulerCfg {
uint32_t maxJobNum;
int32_t maxNodeTableNum;
SCH_POLICY schPolicy;
bool enableReSchedule;
} SSchedulerCfg;
typedef struct SSchedulerMgmt { typedef struct SSchedulerMgmt {
uint64_t taskId; // sequential taksId uint64_t taskId; // sequential taksId
uint64_t sId; // schedulerId uint64_t sId; // schedulerId
...@@ -184,34 +200,36 @@ typedef struct SSchLevel { ...@@ -184,34 +200,36 @@ typedef struct SSchLevel {
typedef struct SSchTaskProfile { typedef struct SSchTaskProfile {
int64_t startTs; int64_t startTs;
int64_t* execTime; SArray* execTime;
int64_t waitTime; int64_t waitTime;
int64_t endTs; int64_t endTs;
} SSchTaskProfile; } SSchTaskProfile;
typedef struct SSchTask { typedef struct SSchTask {
uint64_t taskId; // task id uint64_t taskId; // task id
SRWLatch lock; // task reentrant lock SRWLatch lock; // task reentrant lock
int32_t maxExecTimes; // task may exec times int32_t maxExecTimes; // task max exec times
int32_t execId; // task current execute try index int32_t maxRetryTimes; // task max retry times
SSchLevel *level; // level int32_t retryTimes; // task retry times
SRWLatch planLock; // task update plan lock int32_t execId; // task current execute index
SSubplan *plan; // subplan SSchLevel *level; // level
char *msg; // operator tree SRWLatch planLock; // task update plan lock
int32_t msgLen; // msg length SSubplan *plan; // subplan
int8_t status; // task status char *msg; // operator tree
int32_t lastMsgType; // last sent msg type int32_t msgLen; // msg length
int64_t timeoutUsec; // taks timeout useconds before reschedule int8_t status; // task status
SQueryNodeAddr succeedAddr; // task executed success node address int32_t lastMsgType; // last sent msg type
int8_t candidateIdx; // current try condidation index int64_t timeoutUsec; // task timeout useconds before reschedule
SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr SQueryNodeAddr succeedAddr; // task executed success node address
SHashObj *execNodes; // all tried node for current task, element is SSchNodeInfo int8_t candidateIdx; // current try condidation index
SSchTaskProfile profile; // task execution profile SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr
int32_t childReady; // child task ready number SHashObj *execNodes; // all tried node for current task, element is SSchNodeInfo
SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* SSchTaskProfile profile; // task execution profile
SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* int32_t childReady; // child task ready number
void* handle; // task send handle SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask*
bool registerdHb; // registered in hb SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask*
void* handle; // task send handle
bool registerdHb; // registered in hb
} SSchTask; } SSchTask;
typedef struct SSchJobAttr { typedef struct SSchJobAttr {
...@@ -265,7 +283,7 @@ typedef struct SSchJob { ...@@ -265,7 +283,7 @@ typedef struct SSchJob {
extern SSchedulerMgmt schMgmt; extern SSchedulerMgmt schMgmt;
#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - (_task)->profile.execTime[(_task)->execId % (_task)->maxExecTimes]) > (_task)->timeoutUsec) #define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - *(int64_t*)taosArrayGet((_task)->profile.execTime, (_task)->execId)) > (_task)->timeoutUsec)
#define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children)) #define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
...@@ -299,7 +317,6 @@ extern SSchedulerMgmt schMgmt; ...@@ -299,7 +317,6 @@ extern SSchedulerMgmt schMgmt;
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) #define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH) #define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY) #define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY)
#define SCH_TASK_MAX_EXEC_TIMES(_levelIdx, _levelNum) (SCH_MAX_CANDIDATE_EP_NUM * ((_levelNum) - (_levelIdx)))
#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0) #define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) #define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
...@@ -321,8 +338,7 @@ extern SSchedulerMgmt schMgmt; ...@@ -321,8 +338,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_LOG_TASK_START_TS(_task) \ #define SCH_LOG_TASK_START_TS(_task) \
do { \ do { \
int64_t us = taosGetTimestampUs(); \ int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execId % (_task)->maxExecTimes; \ taosArrayPush((_task)->profile.execTime, &us); \
(_task)->profile.execTime[idx] = us; \
if (0 == (_task)->execId) { \ if (0 == (_task)->execId) { \
(_task)->profile.startTs = us; \ (_task)->profile.startTs = us; \
} \ } \
...@@ -331,8 +347,7 @@ extern SSchedulerMgmt schMgmt; ...@@ -331,8 +347,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_LOG_TASK_WAIT_TS(_task) \ #define SCH_LOG_TASK_WAIT_TS(_task) \
do { \ do { \
int64_t us = taosGetTimestampUs(); \ int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execId % (_task)->maxExecTimes; \ (_task)->profile.waitTime += us - *(int64_t*)taosArrayGet((_task)->profile.execTime, (_task)->execId); \
(_task)->profile.waitTime += us - (_task)->profile.execTime[idx]; \
} while (0) } while (0)
...@@ -340,7 +355,8 @@ extern SSchedulerMgmt schMgmt; ...@@ -340,7 +355,8 @@ extern SSchedulerMgmt schMgmt;
do { \ do { \
int64_t us = taosGetTimestampUs(); \ int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execId % (_task)->maxExecTimes; \ int32_t idx = (_task)->execId % (_task)->maxExecTimes; \
(_task)->profile.execTime[idx] = us - (_task)->profile.execTime[idx]; \ int64_t *startts = taosArrayGet((_task)->profile.execTime, (_task)->execId); \
*startts = us - *startts; \
(_task)->profile.endTs = us; \ (_task)->profile.endTs = us; \
} while (0) } while (0)
...@@ -471,9 +487,11 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask); ...@@ -471,9 +487,11 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask);
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list); void schDropTaskInHashList(SSchJob *pJob, SHashObj *list);
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level); int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask); int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum); int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel);
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask); int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode); void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode);
int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode);
int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode);
bool schChkCurrentOp(SSchJob *pJob, int32_t op, bool sync); bool schChkCurrentOp(SSchJob *pJob, int32_t op, bool sync);
extern SSchDebug gSCHDebug; extern SSchDebug gSCHDebug;
......
...@@ -343,7 +343,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { ...@@ -343,7 +343,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SCH_ERR_JRET(schInitTask(pJob, pTask, plan, pLevel, levelNum)); SCH_ERR_JRET(schInitTask(pJob, pTask, plan, pLevel));
SCH_ERR_JRET(schAppendJobDataSrc(pJob, pTask)); SCH_ERR_JRET(schAppendJobDataSrc(pJob, pTask));
...@@ -476,7 +476,7 @@ _return: ...@@ -476,7 +476,7 @@ _return:
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
} }
int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) { int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
schUpdateJobErrCode(pJob, errCode); schUpdateJobErrCode(pJob, errCode);
int32_t code = atomic_load_32(&pJob->errCode); int32_t code = atomic_load_32(&pJob->errCode);
...@@ -489,21 +489,29 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod ...@@ -489,21 +489,29 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod
SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR); SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
} }
// Note: no more task error processing, handled in function internal int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode) {
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) { if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
return TSDB_CODE_SCH_IGNORE_ERROR; return TSDB_CODE_SCH_IGNORE_ERROR;
} }
schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAIL, errCode); schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, &errCode);
return TSDB_CODE_SCH_IGNORE_ERROR; return TSDB_CODE_SCH_IGNORE_ERROR;
} }
// Note: no more error processing, handled in function internal
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) { int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) {
SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_DROP, errCode)); SCH_RET(schProcessOnJobFailure(pJob, errCode));
}
int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) {
if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
return TSDB_CODE_SCH_IGNORE_ERROR;
}
schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, &errCode);
return TSDB_CODE_SCH_IGNORE_ERROR;
} }
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
schPostJobRes(pJob, SCH_OP_EXEC); schPostJobRes(pJob, SCH_OP_EXEC);
...@@ -828,7 +836,7 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int ...@@ -828,7 +836,7 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int
} }
if (errCode) { if (errCode) {
schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void*)&errCode); schHandleJobFailure(pJob, errCode);
} }
SCH_JOB_DLOG("job end %s operation with code %s", schGetOpStr(type), tstrerror(errCode)); SCH_JOB_DLOG("job end %s operation with code %s", schGetOpStr(type), tstrerror(errCode));
...@@ -907,7 +915,7 @@ void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { ...@@ -907,7 +915,7 @@ void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
} }
if (errCode) { if (errCode) {
schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void*)&errCode); schHandleJobFailure(pJob, errCode);
} }
if (pJob) { if (pJob) {
......
...@@ -42,32 +42,47 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) { ...@@ -42,32 +42,47 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
taosHashCleanup(pTask->execNodes); taosHashCleanup(pTask->execNodes);
} }
taosMemoryFree(pTask->profile.execTime); taosArrayDestroy(pTask->profile.execTime);
} }
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum) { void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) {
if (SCH_IS_DATA_BIND_TASK(pTask) || (!SCH_IS_QUERY_JOB(pJob)) || (SCH_ALL != schMgmt.cfg.schPolicy)) {
pTask->maxRetryTimes = SCH_MAX_CANDIDATE_EP_NUM;
} else {
int32_t nodeNum = taosArrayGetSize(pJob->nodeList);
pTask->maxRetryTimes = TMAX(nodeNum, SCH_MAX_CANDIDATE_EP_NUM);
}
pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1);
}
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
int32_t code = 0; int32_t code = 0;
pTask->plan = pPlan; pTask->plan = pPlan;
pTask->level = pLevel; pTask->level = pLevel;
pTask->execId = -1; pTask->execId = -1;
pTask->maxExecTimes = SCH_TASK_MAX_EXEC_TIMES(pLevel->level, levelNum);
pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC; pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
pTask->taskId = schGenTaskId(); pTask->taskId = schGenTaskId();
pTask->execNodes = pTask->execNodes =
taosHashInit(SCH_MAX_CANDIDATE_EP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); taosHashInit(SCH_MAX_CANDIDATE_EP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
pTask->profile.execTime = taosMemoryCalloc(pTask->maxExecTimes, sizeof(int64_t));
schInitTaskRetryTimes(pJob, pTask, pLevel);
pTask->profile.execTime = taosArrayInit(pTask->maxExecTimes, sizeof(int64_t));
if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) { if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
SCH_TASK_DLOG("task initialized, max times %d:%d", pTask->maxRetryTimes, pTask->maxExecTimes);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
taosMemoryFreeClear(pTask->profile.execTime); taosArrayDestroy(pTask->profile.execTime);
taosHashCleanup(pTask->execNodes); taosHashCleanup(pTask->execNodes);
SCH_RET(code); SCH_RET(code);
...@@ -105,7 +120,7 @@ int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_ ...@@ -105,7 +120,7 @@ int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_
} }
if (taosHashRemove(pTask->execNodes, &execId, sizeof(execId))) { if (taosHashRemove(pTask->execNodes, &execId, sizeof(execId))) {
SCH_TASK_ELOG("fail to remove execId %d from execNodeList", execId); SCH_TASK_DLOG("execId %d already not in execNodeList", execId);
} else { } else {
SCH_TASK_DLOG("execId %d removed from execNodeList", execId); SCH_TASK_DLOG("execId %d removed from execNodeList", execId);
} }
...@@ -235,7 +250,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { ...@@ -235,7 +250,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
} }
if (pTask->level->taskFailed > 0) { if (pTask->level->taskFailed > 0) {
SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, NULL)); SCH_RET(schHandleJobFailure(pJob, pJob->errCode));
} else { } else {
SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL)); SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
} }
...@@ -285,6 +300,10 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { ...@@ -285,6 +300,10 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
} }
int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
if (!schMgmt.cfg.enableReSchedule) {
return TSDB_CODE_SUCCESS;
}
if (SCH_IS_DATA_BIND_TASK(pTask)) { if (SCH_IS_DATA_BIND_TASK(pTask)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -304,13 +323,17 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { ...@@ -304,13 +323,17 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
if ((pTask->execId + 1) >= pTask->maxExecTimes) { SCH_TASK_DLOG("task will be redirected now, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
SCH_TASK_DLOG("task no more retry since reach max try times, execId:%d", pTask->execId);
schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void *)&rspCode); if (NULL == pData) {
return TSDB_CODE_SUCCESS; pTask->retryTimes = 0;
} }
SCH_TASK_DLOG("task will be redirected now, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); if (((pTask->execId + 1) >= pTask->maxExecTimes) || ((pTask->retryTimes + 1) > pTask->maxRetryTimes)) {
SCH_TASK_DLOG("task no more retry since reach max times %d:%d, execId %d", pTask->maxRetryTimes, pTask->maxExecTimes, pTask->execId);
schHandleJobFailure(pJob, rspCode);
return TSDB_CODE_SUCCESS;
}
schDropTaskOnExecNode(pJob, pTask); schDropTaskOnExecNode(pJob, pTask);
taosHashClear(pTask->execNodes); taosHashClear(pTask->execNodes);
...@@ -493,9 +516,15 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo ...@@ -493,9 +516,15 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
} }
} }
if ((pTask->retryTimes + 1) > pTask->maxRetryTimes) {
*needRetry = false;
SCH_TASK_DLOG("task no more retry since reach max retry times, retryTimes:%d/%d", pTask->retryTimes, pTask->maxRetryTimes);
return TSDB_CODE_SUCCESS;
}
if ((pTask->execId + 1) >= pTask->maxExecTimes) { if ((pTask->execId + 1) >= pTask->maxExecTimes) {
*needRetry = false; *needRetry = false;
SCH_TASK_DLOG("task no more retry since reach max try times, execId:%d", pTask->execId); SCH_TASK_DLOG("task no more retry since reach max exec times, execId:%d/%d", pTask->execId, pTask->maxExecTimes);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -649,10 +678,31 @@ int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSe ...@@ -649,10 +678,31 @@ int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSe
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) { int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
if (++pTask->candidateIdx >= candidateNum) { if (candidateNum <= 1) {
pTask->candidateIdx = 0; goto _return;
}
switch (schMgmt.cfg.schPolicy) {
case SCH_LOAD_SEQ:
case SCH_ALL:
default:
if (++pTask->candidateIdx >= candidateNum) {
pTask->candidateIdx = 0;
}
break;
case SCH_RANDOM: {
int32_t lastIdx = pTask->candidateIdx;
while (lastIdx == pTask->candidateIdx) {
pTask->candidateIdx = taosRand() % candidateNum;
}
break;
}
} }
SCH_TASK_DLOG("switch task candiateIdx to %d", pTask->candidateIdx);
_return:
SCH_TASK_DLOG("switch task candiateIdx to %d/%d", pTask->candidateIdx, candidateNum);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -739,8 +789,9 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { ...@@ -739,8 +789,9 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1); atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
pTask->execId++; pTask->execId++;
pTask->retryTimes++;
SCH_TASK_DLOG("start to launch task's %dth exec", pTask->execId); SCH_TASK_DLOG("start to launch task, execId %d, retry %d", pTask->execId, pTask->retryTimes);
SCH_LOG_TASK_START_TS(pTask); SCH_LOG_TASK_START_TS(pTask);
......
...@@ -22,26 +22,19 @@ SSchedulerMgmt schMgmt = { ...@@ -22,26 +22,19 @@ SSchedulerMgmt schMgmt = {
.jobRef = -1, .jobRef = -1,
}; };
int32_t schedulerInit(SSchedulerCfg *cfg) { int32_t schedulerInit() {
if (schMgmt.jobRef >= 0) { if (schMgmt.jobRef >= 0) {
qError("scheduler already initialized"); qError("scheduler already initialized");
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
if (cfg) { schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
schMgmt.cfg = *cfg; schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM;
schMgmt.cfg.schPolicy = SCHEDULE_DEFAULT_POLICY;
if (schMgmt.cfg.maxJobNum == 0) { schMgmt.cfg.enableReSchedule = true;
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
}
if (schMgmt.cfg.maxNodeTableNum <= 0) {
schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM;
}
} else {
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM;
}
qDebug("schedule policy init to %d", schMgmt.cfg.schPolicy);
schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl); schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl);
if (schMgmt.jobRef < 0) { if (schMgmt.jobRef < 0) {
qError("init schduler jobRef failed, num:%u", schMgmt.cfg.maxJobNum); qError("init schduler jobRef failed, num:%u", schMgmt.cfg.maxJobNum);
...@@ -130,6 +123,26 @@ void schedulerStopQueryHb(void *pTrans) { ...@@ -130,6 +123,26 @@ void schedulerStopQueryHb(void *pTrans) {
schCleanClusterHb(pTrans); schCleanClusterHb(pTrans);
} }
int32_t schedulerUpdatePolicy(int32_t policy) {
switch (policy) {
case SCH_LOAD_SEQ:
case SCH_RANDOM:
case SCH_ALL:
schMgmt.cfg.schPolicy = policy;
qDebug("schedule policy updated to %d", schMgmt.cfg.schPolicy);
break;
default:
return TSDB_CODE_TSC_INVALID_INPUT;
}
return TSDB_CODE_SUCCESS;
}
int32_t schedulerEnableReSchedule(bool enableResche) {
schMgmt.cfg.enableReSchedule = enableResche;
return TSDB_CODE_SUCCESS;
}
void schedulerFreeJob(int64_t* jobId, int32_t errCode) { void schedulerFreeJob(int64_t* jobId, int32_t errCode) {
if (0 == *jobId) { if (0 == *jobId) {
return; return;
...@@ -141,7 +154,7 @@ void schedulerFreeJob(int64_t* jobId, int32_t errCode) { ...@@ -141,7 +154,7 @@ void schedulerFreeJob(int64_t* jobId, int32_t errCode) {
return; return;
} }
schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, (void*)&errCode); schHandleJobDrop(pJob, errCode);
schReleaseJob(*jobId); schReleaseJob(*jobId);
*jobId = 0; *jobId = 0;
......
...@@ -477,7 +477,7 @@ void* schtRunJobThread(void *aa) { ...@@ -477,7 +477,7 @@ void* schtRunJobThread(void *aa) {
schtInitLogFile(); schtInitLogFile();
int32_t code = schedulerInit(NULL); int32_t code = schedulerInit();
assert(code == 0); assert(code == 0);
...@@ -649,7 +649,7 @@ TEST(queryTest, normalCase) { ...@@ -649,7 +649,7 @@ TEST(queryTest, normalCase) {
qnodeAddr.port = 6031; qnodeAddr.port = 6031;
taosArrayPush(qnodeList, &qnodeAddr); taosArrayPush(qnodeList, &qnodeAddr);
int32_t code = schedulerInit(NULL); int32_t code = schedulerInit();
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
schtBuildQueryDag(&dag); schtBuildQueryDag(&dag);
...@@ -756,7 +756,7 @@ TEST(queryTest, readyFirstCase) { ...@@ -756,7 +756,7 @@ TEST(queryTest, readyFirstCase) {
qnodeAddr.port = 6031; qnodeAddr.port = 6031;
taosArrayPush(qnodeList, &qnodeAddr); taosArrayPush(qnodeList, &qnodeAddr);
int32_t code = schedulerInit(NULL); int32_t code = schedulerInit();
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
schtBuildQueryDag(&dag); schtBuildQueryDag(&dag);
...@@ -866,7 +866,7 @@ TEST(queryTest, flowCtrlCase) { ...@@ -866,7 +866,7 @@ TEST(queryTest, flowCtrlCase) {
qnodeAddr.port = 6031; qnodeAddr.port = 6031;
taosArrayPush(qnodeList, &qnodeAddr); taosArrayPush(qnodeList, &qnodeAddr);
int32_t code = schedulerInit(NULL); int32_t code = schedulerInit();
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
schtBuildQueryFlowCtrlDag(&dag); schtBuildQueryFlowCtrlDag(&dag);
...@@ -975,7 +975,7 @@ TEST(insertTest, normalCase) { ...@@ -975,7 +975,7 @@ TEST(insertTest, normalCase) {
qnodeAddr.port = 6031; qnodeAddr.port = 6031;
taosArrayPush(qnodeList, &qnodeAddr); taosArrayPush(qnodeList, &qnodeAddr);
int32_t code = schedulerInit(NULL); int32_t code = schedulerInit();
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
schtBuildInsertDag(&dag); schtBuildInsertDag(&dag);
......
...@@ -135,7 +135,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_API_ERROR, "Stmt API usage error" ...@@ -135,7 +135,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_API_ERROR, "Stmt API usage error"
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_TBNAME_ERROR, "Stmt table name not set") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_TBNAME_ERROR, "Stmt table name not set")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_CLAUSE_ERROR, "not supported stmt clause") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_CLAUSE_ERROR, "not supported stmt clause")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_QUERY_KILLED, "Query killed") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_QUERY_KILLED, "Query killed")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_EXEC_NODE, "No available execution node") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_EXEC_NODE, "No available execution node in current query policy configuration")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NOT_STABLE_ERROR, "Table is not a super table") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NOT_STABLE_ERROR, "Table is not a super table")
// mnode-common // mnode-common
......
...@@ -2685,6 +2685,8 @@ int main(int argc, char *argv[]) ...@@ -2685,6 +2685,8 @@ int main(int argc, char *argv[])
runAll(taos); runAll(taos);
taos_close(taos);
return 0; return 0;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册