提交 cb0d52d1 编写于 作者: D dapan1121

feature/scheduler

上级 440b0b86
......@@ -165,6 +165,7 @@ const SSchema* tGetTbnameColumnSchema();
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta);
char *jobTaskStatusStr(int32_t status);
SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name);
......@@ -181,9 +182,9 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_TABLE_RECREATED)
#define NEED_CLIENT_HANDLE_ERROR(_code) (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
#define NEED_SCHEDULER_RETRY_ERROR(_code) ((_code) == TSDB_CODE_RPC_REDIRECT)
#define NEED_SCHEDULER_RETRY_ERROR(_code) ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define REQUEST_MAX_RETRY_NUM 3
#define REQUEST_MAX_TRY_TIMES 5
#define qFatal(...) \
do { \
......
......@@ -319,7 +319,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
int32_t retryNum = 0;
int32_t code = 0;
while (retryNum++ < REQUEST_MAX_RETRY_NUM) {
while (retryNum++ < REQUEST_MAX_TRY_TIMES) {
pRequest = execQueryImpl(pTscObj, sql, sqlLen);
if (TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
break;
......
......@@ -162,6 +162,32 @@ int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransp
return TSDB_CODE_SUCCESS;
}
char *jobTaskStatusStr(int32_t status) {
switch (status) {
case JOB_TASK_STATUS_NULL:
return "NULL";
case JOB_TASK_STATUS_NOT_START:
return "NOT_START";
case JOB_TASK_STATUS_EXECUTING:
return "EXECUTING";
case JOB_TASK_STATUS_PARTIAL_SUCCEED:
return "PARTIAL_SUCCEED";
case JOB_TASK_STATUS_SUCCEED:
return "SUCCEED";
case JOB_TASK_STATUS_FAILED:
return "FAILED";
case JOB_TASK_STATUS_CANCELLING:
return "CANCELLING";
case JOB_TASK_STATUS_CANCELLED:
return "CANCELLED";
case JOB_TASK_STATUS_DROPPING:
return "DROPPING";
default:
break;
}
return "UNKNOWN";
}
SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name) {
SSchema s = {0};
......
......@@ -145,7 +145,6 @@ typedef struct SQWorkerMgmt {
void *timer;
tmr_h hbTimer;
SRWLatch schLock;
//SRWLatch ctxLock;
SHashObj *schHash; //key: schedulerId, value: SQWSchStatus
SHashObj *ctxHash; //key: queryId+taskId, value: SQWTaskCtx
void *nodeObj;
......
......@@ -11,6 +11,27 @@
SQWDebug gQWDebug = {0};
char *qwPhaseStr(int32_t phase) {
switch (phase) {
case QW_PHASE_PRE_QUERY:
return "PRE_QUERY";
case QW_PHASE_POST_QUERY:
return "POST_QUERY";
case QW_PHASE_PRE_FETCH:
return "PRE_FETCH";
case QW_PHASE_POST_FETCH:
return "POST_FETCH";
case QW_PHASE_PRE_CQUERY:
return "PRE_CQUERY";
case QW_PHASE_POST_CQUERY:
return "POST_CQUERY";
default:
break;
}
return "UNKNOWN";
}
int32_t qwValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus) {
int32_t code = 0;
......@@ -72,7 +93,7 @@ int32_t qwValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus) {
break;
default:
QW_TASK_ELOG("invalid task status:%d", oriStatus);
QW_TASK_ELOG("invalid task origStatus:%s", jobTaskStatusStr(oriStatus));
return TSDB_CODE_QRY_APP_ERROR;
}
......@@ -80,7 +101,7 @@ int32_t qwValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus) {
_return:
QW_TASK_ELOG("invalid task status update from %d to %d", oriStatus, newStatus);
QW_TASK_ELOG("invalid task status update from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
QW_RET(code);
}
......@@ -97,7 +118,7 @@ int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) {
continue;
}
QW_TASK_DLOG("task status updated from %d to %d", origStatus, status);
QW_TASK_DLOG("task status updated from %s to %s", jobTaskStatusStr(origStatus), jobTaskStatusStr(status));
break;
}
......@@ -206,16 +227,18 @@ int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, i
if (rwType && task) {
QW_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task));
} else {
QW_TASK_ELOG("task status already exist, id:%s", id);
QW_TASK_ELOG("task status already exist, newStatus:%s", jobTaskStatusStr(status));
QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST);
}
} else {
QW_TASK_ELOG("taosHashPut to tasksHash failed, code:%x", code);
QW_TASK_ELOG("taosHashPut to tasksHash failed, error:%x - %s", code, tstrerror(code));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
QW_UNLOCK(QW_WRITE, &sch->tasksLock);
QW_TASK_DLOG("task status added, newStatus:%s", jobTaskStatusStr(status));
if (rwType && task) {
QW_ERR_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task));
}
......@@ -252,10 +275,8 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
//QW_LOCK(rwType, &mgmt->ctxLock);
*ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id));
if (NULL == (*ctx)) {
//QW_UNLOCK(rwType, &mgmt->ctxLock);
QW_TASK_DLOG_E("task ctx not exist, may be dropped");
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
}
......@@ -276,32 +297,28 @@ int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
return TSDB_CODE_SUCCESS;
}
int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, int32_t status, SQWTaskCtx **ctx) {
int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, SQWTaskCtx **ctx) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
SQWTaskCtx nctx = {0};
//QW_LOCK(QW_WRITE, &mgmt->ctxLock);
int32_t code = taosHashPut(mgmt->ctxHash, id, sizeof(id), &nctx, sizeof(SQWTaskCtx));
if (0 != code) {
//QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
if (HASH_NODE_EXIST(code)) {
if (acquire && ctx) {
QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
} else if (ctx) {
QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
} else {
QW_TASK_ELOG("task ctx already exist, id:%s", id);
QW_TASK_ELOG_E("task ctx already exist");
QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST);
}
} else {
QW_TASK_ELOG("taosHashPut to ctxHash failed, code:%x", code);
QW_TASK_ELOG("taosHashPut to ctxHash failed, error:%x", code);
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
//QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
if (acquire && ctx) {
QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
......@@ -313,17 +330,15 @@ int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, int32_t status, SQWTaskCt
}
int32_t qwAddTaskCtx(QW_FPARAMS_DEF) {
QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), false, 0, NULL));
QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), false, NULL));
}
int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
return qwAddTaskCtxImpl(QW_FPARAMS(), true, 0, ctx);
return qwAddTaskCtxImpl(QW_FPARAMS(), true, ctx);
}
int32_t qwAddGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
return qwAddTaskCtxImpl(QW_FPARAMS(), false, 0, ctx);
return qwAddTaskCtxImpl(QW_FPARAMS(), false, ctx);
}
......@@ -638,7 +653,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
}
if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
QW_SCH_TASK_DLOG("task all fetched, status:%d", JOB_TASK_STATUS_SUCCEED);
QW_TASK_DLOG_E("task all data fetched, done");
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED));
}
......@@ -653,9 +668,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
void *dropConnection = NULL;
void *cancelConnection = NULL;
QW_SCH_TASK_DLOG("start to handle event at phase %d", phase);
output->needStop = false;
QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
if (QW_PHASE_PRE_QUERY == phase) {
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
......@@ -671,7 +684,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
atomic_store_8(&ctx->phase, phase);
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL) || QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_ELOG("task already cancelled/dropped at wrong phase, phase:%d", phase);
QW_TASK_ELOG("task already cancelled/dropped at wrong phase %s", qwPhaseStr(phase));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR;
......@@ -705,7 +718,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
}
if (ctx->rspCode) {
QW_TASK_ELOG("task already failed at wrong phase, code:%x, phase:%d", ctx->rspCode, phase);
QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode));
output->needStop = true;
output->rspCode = ctx->rspCode;
QW_ERR_JRET(output->rspCode);
......@@ -718,46 +731,46 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
}
case QW_PHASE_PRE_FETCH: {
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropped, phase:%d", phase);
QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
}
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task already cancelled, phase:%d", phase);
QW_TASK_WLOG("task already cancelled, phase:%s", qwPhaseStr(phase));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_ELOG("drop event at wrong phase, phase:%d", phase);
QW_TASK_ELOG("drop event at wrong phase %s", qwPhaseStr(phase));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_ELOG("cancel event at wrong phase, phase:%d", phase);
QW_TASK_ELOG("cancel event at wrong phase %s", qwPhaseStr(phase));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
if (ctx->rspCode) {
QW_TASK_ELOG("task already failed, code:%x, phase:%d", ctx->rspCode, phase);
QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode));
output->needStop = true;
output->rspCode = ctx->rspCode;
QW_ERR_JRET(output->rspCode);
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
QW_TASK_WLOG("last fetch not finished, phase:%d", phase);
QW_TASK_WLOG("last fetch not finished, phase:%s", qwPhaseStr(phase));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_DUPLICATTED_OPERATION;
QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
}
if (!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_READY)) {
QW_TASK_ELOG("query rsp are not ready, phase:%d", phase);
QW_TASK_ELOG("query rsp are not ready, phase:%s", qwPhaseStr(phase));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_MSG_ERROR;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
......@@ -768,14 +781,14 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
atomic_store_8(&ctx->phase, phase);
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task already cancelled, phase:%d", phase);
QW_TASK_WLOG("task already cancelled, phase:%s", qwPhaseStr(phase));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropped, phase:%d", phase);
QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
......@@ -810,7 +823,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
if (ctx->rspCode) {
QW_TASK_ELOG("task already failed, code:%x, phase:%d", ctx->rspCode, phase);
QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode));
output->needStop = true;
output->rspCode = ctx->rspCode;
QW_ERR_JRET(output->rspCode);
......@@ -842,15 +855,15 @@ _return:
if (dropConnection) {
qwBuildAndSendDropRsp(dropConnection, output->rspCode);
QW_TASK_DLOG("drop msg rsped, code:%x", output->rspCode);
QW_TASK_DLOG("drop msg rsped, code:%x - %s", output->rspCode, tstrerror(output->rspCode));
}
if (cancelConnection) {
qwBuildAndSendCancelRsp(cancelConnection, output->rspCode);
QW_TASK_DLOG("cancel msg rsped, code:%x", output->rspCode);
QW_TASK_DLOG("cancel msg rsped, code:%x - %s", output->rspCode, tstrerror(output->rspCode));
}
QW_SCH_TASK_DLOG("end to handle event at phase %d", phase);
QW_TASK_DLOG("end to handle event at phase %s", qwPhaseStr(phase));
QW_RET(code);
}
......@@ -865,7 +878,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
void *dropConnection = NULL;
void *cancelConnection = NULL;
QW_SCH_TASK_DLOG("start to handle event at phase %d", phase);
QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
output->needStop = false;
......@@ -875,14 +888,14 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
locked = true;
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropped, phase:%d", phase);
QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
}
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task already cancelled, phase:%d", phase);
QW_TASK_WLOG("task already cancelled, phase:%s", qwPhaseStr(phase));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
......@@ -931,7 +944,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
}
if (ctx->rspCode) {
QW_TASK_ELOG("task failed, code:%x, phase:%d", ctx->rspCode, phase);
QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode));
output->needStop = true;
output->rspCode = ctx->rspCode;
QW_ERR_JRET(output->rspCode);
......@@ -968,20 +981,20 @@ _return:
if (readyConnection) {
qwBuildAndSendReadyRsp(readyConnection, output->rspCode);
QW_TASK_DLOG("ready msg rsped, code:%x", output->rspCode);
QW_TASK_DLOG("ready msg rsped, code:%x - %s", output->rspCode, tstrerror(output->rspCode));
}
if (dropConnection) {
qwBuildAndSendDropRsp(dropConnection, output->rspCode);
QW_TASK_DLOG("drop msg rsped, code:%x", output->rspCode);
QW_TASK_DLOG("drop msg rsped, code:%x - %s", output->rspCode, tstrerror(output->rspCode));
}
if (cancelConnection) {
qwBuildAndSendCancelRsp(cancelConnection, output->rspCode);
QW_TASK_DLOG("cancel msg rsped, code:%x", output->rspCode);
QW_TASK_DLOG("cancel msg rsped, code:%x - %s", output->rspCode, tstrerror(output->rspCode));
}
QW_SCH_TASK_DLOG("end to handle event at phase %d", phase);
QW_TASK_DLOG("end to handle event at phase %s", qwPhaseStr(phase));
QW_RET(code);
}
......
......@@ -300,7 +300,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_SCH_TASK_DLOG("processQuery start, node:%p, sql:%s", node, sql);
tfree(sql);
QW_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType));
QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType));
QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
......
......@@ -28,8 +28,7 @@ extern "C" {
#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 20 // unit is TSDB_TABLE_NUM_UNIT
#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200 // unit is TSDB_TABLE_NUM_UNIT
#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
......@@ -113,6 +112,7 @@ typedef struct SSchTask {
int32_t msgLen; // msg length
int8_t status; // task status
int32_t lastMsgType; // last sent msg type
int32_t tryTimes; // task already tried times
SQueryNodeAddr succeedAddr; // task executed success node address
int8_t candidateIdx; // current try condidation index
SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr
......@@ -176,9 +176,12 @@ extern SSchedulerMgmt schMgmt;
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
#define SCH_GET_TASK_STATUS_STR(task) jobTaskStatusStr(SCH_GET_TASK_STATUS(task))
#define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st)
#define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status)
#define SCH_GET_JOB_STATUS_STR(job) jobTaskStatusStr(SCH_GET_JOB_STATUS(job))
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
......@@ -193,6 +196,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
#define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse])
#define SCH_SWITCH_EPSET(_addr) ((_addr)->epSet.inUse = ((_addr)->epSet.inUse + 1) % (_addr)->epSet.numOfEps)
#define SCH_TASK_NUM_OF_EPS(_addr) ((_addr)->epSet.numOfEps)
#define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
......
......@@ -105,7 +105,7 @@ static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
int32_t lastMsgType = SCH_GET_TASK_LASTMSG_TYPE(pTask);
int32_t taskStatus = SCH_GET_TASK_STATUS(pTask);
switch (msgType) {
case TDMT_VND_CREATE_TABLE_RSP:
case TDMT_VND_SUBMIT_RSP:
......@@ -118,14 +118,14 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING && SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%s", SCH_GET_TASK_STATUS(pTask), TMSG_INFO(msgType));
if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
break;
default:
SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%d", TMSG_INFO(msgType), SCH_GET_TASK_STATUS(pTask));
SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%s", TMSG_INFO(msgType), jobTaskStatusStr(taskStatus));
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
......@@ -193,7 +193,7 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
break;
default:
SCH_JOB_ELOG("invalid job status:%d", oriStatus);
SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus));
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
......@@ -201,7 +201,7 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
continue;
}
SCH_JOB_DLOG("job status updated from %d to %d", oriStatus, newStatus);
SCH_JOB_DLOG("job status updated from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
break;
}
......@@ -210,7 +210,7 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
_return:
SCH_JOB_ELOG("invalid job status update, from %d to %d", oriStatus, newStatus);
SCH_JOB_ELOG("invalid job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
SCH_ERR_RET(code);
}
......@@ -503,7 +503,7 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask));
SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
} else {
SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
}
......@@ -513,7 +513,7 @@ int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
if (HASH_NODE_EXIST(code)) {
*moved = true;
SCH_TASK_ELOG("task already in succTask list, status:%d", SCH_GET_TASK_STATUS(pTask));
SCH_TASK_ELOG("task already in succTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
......@@ -532,7 +532,7 @@ int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
*moved = false;
if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask));
SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
}
int32_t code = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
......@@ -540,7 +540,7 @@ int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
if (HASH_NODE_EXIST(code)) {
*moved = true;
SCH_TASK_WLOG("task already in failTask list, status:%d", SCH_GET_TASK_STATUS(pTask));
SCH_TASK_WLOG("task already in failTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
......@@ -558,7 +558,7 @@ int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask));
SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
}
int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
......@@ -566,7 +566,7 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
if (HASH_NODE_EXIST(code)) {
*moved = true;
SCH_TASK_ELOG("task already in execTask list, status:%d", SCH_GET_TASK_STATUS(pTask));
SCH_TASK_ELOG("task already in execTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
......@@ -583,26 +583,48 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
// TODO set retry or not based on task type/errCode/retry times/job status/available eps...
*needRetry = false;
int8_t status = 0;
++pTask->tryTimes;
if (schJobNeedToStop(pJob, &status)) {
*needRetry = false;
SCH_TASK_DLOG("task no more retry cause of job status, job status:%s", jobTaskStatusStr(status));
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_SUCCESS;
if (pTask->tryTimes >= REQUEST_MAX_TRY_TIMES) {
*needRetry = false;
SCH_TASK_DLOG("task no more retry since reach max try times, tryTimes:%d", pTask->tryTimes);
return TSDB_CODE_SUCCESS;
}
if (!NEED_SCHEDULER_RETRY_ERROR(errCode)) {
*needRetry = false;
SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode));
return TSDB_CODE_SUCCESS;
}
//TODO CHECK epList/condidateList
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) {
if (SCH_IS_DATA_SRC_TASK(pTask)) {
if (pTask->tryTimes >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
*needRetry = false;
SCH_TASK_DLOG("task no more retry since all ep tried, tryTimes:%d, epNum:%d", pTask->tryTimes, SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode));
return TSDB_CODE_SUCCESS;
}
} else {
int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
if ((pTask->candidateIdx + 1) >= candidateNum) {
*needRetry = false;
SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d", pTask->candidateIdx, candidateNum);
return TSDB_CODE_SUCCESS;
}
++pTask->candidateIdx;
}
*needRetry = true;
SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->tryTimes, errCode, tstrerror(errCode));
return TSDB_CODE_SUCCESS;
}
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
......@@ -613,7 +635,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
}
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) {
if (SCH_IS_DATA_SRC_TASK(pTask)) {
SCH_SWITCH_EPSET(&pTask->plan->execNode);
} else {
++pTask->candidateIdx;
......@@ -762,7 +784,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_DLOG("task failed not processed cause of job status, job status:%d", status);
SCH_TASK_DLOG("task failed not processed cause of job status, job status:%s", jobTaskStatusStr(status));
SCH_RET(atomic_load_32(&pJob->errCode));
}
......@@ -781,7 +803,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) {
SCH_ERR_JRET(schMoveTaskToFailList(pJob, pTask, &moved));
} else {
SCH_TASK_ELOG("task not in executing list, status:%d", SCH_GET_TASK_STATUS(pTask));
SCH_TASK_ELOG("task not in executing list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
......@@ -816,7 +838,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
bool moved = false;
int32_t code = 0;
SCH_TASK_DLOG("taskOnSuccess, status:%d", SCH_GET_TASK_STATUS(pTask));
SCH_TASK_DLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
SCH_ERR_JRET(schMoveTaskToSuccList(pJob, pTask, &moved));
......@@ -925,7 +947,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_ELOG("rsp not processed cause of job status, job status:%d", status);
SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status), rspCode);
SCH_RET(atomic_load_32(&pJob->errCode));
}
......@@ -1034,7 +1056,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
break;
}
default:
SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%d", msgType, SCH_GET_TASK_STATUS(pTask));
SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%s", msgType, SCH_GET_TASK_STATUS_STR(pTask));
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
......@@ -1414,7 +1436,7 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_DLOG("no need to launch task cause of job status, job status:%d", status);
SCH_TASK_DLOG("no need to launch task cause of job status, job status:%s", jobTaskStatusStr(status));
SCH_RET(atomic_load_32(&pJob->errCode));
}
......@@ -1496,14 +1518,14 @@ int32_t schLaunchJob(SSchJob *pJob) {
void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) {
if (NULL == pTask->execAddrs) {
SCH_TASK_DLOG("no exec address, status:%d", SCH_GET_TASK_STATUS(pTask));
SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
return;
}
int32_t size = (int32_t)taosArrayGetSize(pTask->execAddrs);
if (size <= 0) {
SCH_TASK_DLOG("task has no exec address, no need to drop it, status:%d", SCH_GET_TASK_STATUS(pTask));
SCH_TASK_DLOG("task has no exec address, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
return;
}
......@@ -1652,11 +1674,11 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan* pD
*job = pJob->refId;
if (syncSchedule) {
SCH_JOB_DLOG("will wait for rsp now, job status:%d", SCH_GET_JOB_STATUS(pJob));
SCH_JOB_DLOG("will wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
tsem_wait(&pJob->rspSem);
}
SCH_JOB_DLOG("job exec done, job status:%d", SCH_GET_JOB_STATUS(pJob));
SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
schReleaseJob(pJob->refId);
......@@ -1874,13 +1896,13 @@ int32_t schedulerFetchRows(int64_t job, void** pData) {
int8_t status = SCH_GET_JOB_STATUS(pJob);
if (status == JOB_TASK_STATUS_DROPPING) {
SCH_JOB_ELOG("job is dropping, status:%d", status);
SCH_JOB_ELOG("job is dropping, status:%s", jobTaskStatusStr(status));
schReleaseJob(job);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (!SCH_JOB_NEED_FETCH(pJob)) {
SCH_JOB_ELOG("no need to fetch data, status:%d", SCH_GET_JOB_STATUS(pJob));
SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
schReleaseJob(job);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
......@@ -1892,10 +1914,10 @@ int32_t schedulerFetchRows(int64_t job, void** pData) {
}
if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
SCH_JOB_ELOG("job failed or dropping, status:%d", status);
SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
} else if (status == JOB_TASK_STATUS_SUCCEED) {
SCH_JOB_DLOG("job already succeed, status:%d", status);
SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status));
goto _return;
} else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_ERR_JRET(schFetchFromRemote(pJob));
......@@ -1906,7 +1928,7 @@ int32_t schedulerFetchRows(int64_t job, void** pData) {
status = SCH_GET_JOB_STATUS(pJob);
if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
SCH_JOB_ELOG("job failed or dropping, status:%d", status);
SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册