提交 30556c67 编写于 作者: D dapan1121

feature/qnode

上级 896df2bd
......@@ -113,6 +113,12 @@ typedef struct SSchJob {
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
#define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st)
#define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status)
#define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != QUERY_TYPE_MODIFY)
#define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob)
......
......@@ -100,7 +100,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) {
pTask->plan = pPlan;
pTask->level = pLevel;
pTask->status = JOB_TASK_STATUS_NOT_START;
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
pTask->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
return TSDB_CODE_SUCCESS;
......@@ -236,7 +236,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("use execNode from plan as candidate addr");
SCH_TASK_DLOG("use execNode from plan as candidate addr, numOfEps:%d", pTask->plan->execNode.numOfEps);
return TSDB_CODE_SUCCESS;
}
......@@ -273,13 +273,19 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
}
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
if (0 != taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) {
qError("failed to add new task, taskId:0x%"PRIx64", reqId:0x"PRIx64", out of memory", pJob->queryId);
int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
if (0 != code) {
if (HASH_NODE_EXIST(code)) {
SCH_TASK_ELOG("task already in exec list, code:%x", code);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SCH_TASK_ELOG("taosHashPut task to exec list failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
qDebug("add one task, taskId:0x%"PRIx64", numOfTasks:%d, reqId:0x%"PRIx64, pTask->taskId, taosHashGetSize(pJob->execTasks),
pJob->queryId);
SCH_TASK_DLOG("task added to exec list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
return TSDB_CODE_SUCCESS;
}
......@@ -387,7 +393,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS;
}
pTask->status = JOB_TASK_STATUS_SUCCEED;
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
if (parentNum == 0) {
......@@ -460,10 +466,14 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
if (!needRetry) {
SCH_TASK_ELOG("task failed[%x], no more retry", errCode);
SCH_ERR_RET(schMoveTaskToFailList(pJob, pTask, &moved));
if (!moved) {
SCH_TASK_ELOG("task may already moved, status:%d", pTask->status);
}
if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) {
SCH_ERR_RET(schMoveTaskToFailList(pJob, pTask, &moved));
if (!moved) {
SCH_TASK_ELOG("task may already moved, status:%d", pTask->status);
}
}
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED);
if (SCH_TASK_NEED_WAIT_ALL(pTask)) {
SCH_LOCK(SCH_WRITE, &pTask->level->lock);
......@@ -476,11 +486,10 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
return TSDB_CODE_SUCCESS;
}
}
pJob->status = JOB_TASK_STATUS_FAILED;
SCH_ERR_RET(schProcessOnJobFailure(pJob, errCode));
return TSDB_CODE_SUCCESS;
return errCode;
}
SCH_ERR_RET(schLaunchTask(pJob, pTask));
......@@ -706,7 +715,7 @@ void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) {
}
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
uint32_t msgSize = 0;
void *msg = NULL;
int32_t code = 0;
......@@ -714,22 +723,22 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
switch (msgType) {
case TDMT_VND_CREATE_TABLE:
case TDMT_VND_SUBMIT: {
if (NULL == task->msg || task->msgLen <= 0) {
if (NULL == pTask->msg || pTask->msgLen <= 0) {
qError("submit msg is NULL");
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
msgSize = task->msgLen;
msg = task->msg;
msgSize = pTask->msgLen;
msg = pTask->msg;
break;
}
case TDMT_VND_QUERY: {
if (NULL == task->msg) {
if (NULL == pTask->msg) {
qError("query msg is NULL");
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
msgSize = sizeof(SSubQueryMsg) + task->msgLen;
msgSize = sizeof(SSubQueryMsg) + pTask->msgLen;
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
......@@ -738,12 +747,12 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
SSubQueryMsg *pMsg = msg;
pMsg->header.vgId = htonl(task->plan->execNode.nodeId);
pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId);
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
pMsg->contentLen = htonl(task->msgLen);
memcpy(pMsg->msg, task->msg, task->msgLen);
pMsg->queryId = htobe64(pJob->queryId);
pMsg->taskId = htobe64(pTask->taskId);
pMsg->contentLen = htonl(pTask->msgLen);
memcpy(pMsg->msg, pTask->msg, pTask->msgLen);
break;
}
case TDMT_VND_RES_READY: {
......@@ -756,14 +765,14 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
SResReadyMsg *pMsg = msg;
pMsg->header.vgId = htonl(task->plan->execNode.nodeId);
pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId);
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
pMsg->queryId = htobe64(pJob->queryId);
pMsg->taskId = htobe64(pTask->taskId);
break;
}
case TDMT_VND_FETCH: {
if (NULL == task) {
if (NULL == pTask) {
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
msgSize = sizeof(SResFetchMsg);
......@@ -775,10 +784,10 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
SResFetchMsg *pMsg = msg;
pMsg->header.vgId = htonl(task->plan->execNode.nodeId);
pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId);
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
pMsg->queryId = htobe64(pJob->queryId);
pMsg->taskId = htobe64(pTask->taskId);
break;
}
case TDMT_VND_DROP_TASK:{
......@@ -791,10 +800,10 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
STaskDropMsg *pMsg = msg;
pMsg->header.vgId = htonl(task->plan->execNode.nodeId);
pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId);
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
pMsg->queryId = htobe64(pJob->queryId);
pMsg->taskId = htobe64(pTask->taskId);
break;
}
default:
......@@ -804,11 +813,11 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
}
SEpSet epSet;
SQueryNodeAddr *addr = taosArrayGet(task->candidateAddrs, task->candidateIdx);
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
schConvertAddrToEpSet(addr, &epSet);
SCH_ERR_JRET(schAsyncSendMsg(job->transport, &epSet, job->queryId, task->taskId, msgType, msg, msgSize));
SCH_ERR_JRET(schAsyncSendMsg(pJob->transport, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize));
return TSDB_CODE_SUCCESS;
......@@ -818,25 +827,51 @@ _return:
SCH_RET(code);
}
static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
int8_t status = SCH_GET_JOB_STATUS(pJob);
if (pStatus) {
*pStatus = status;
}
return (status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED
|| status == JOB_TASK_STATUS_CANCELLING || status == JOB_TASK_STATUS_DROPPING);
}
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
SSubplan *plan = pTask->plan;
int8_t status = 0;
int32_t code = 0;
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_ELOG("no need to launch task cause of job status, job status:%d", status);
SCH_ERR_RET(pJob->errCode);
}
SCH_ERR_RET(qSubPlanToString(plan, &pTask->msg, &pTask->msgLen));
SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));
SSubplan *plan = pTask->plan;
if (NULL == pTask->candidateAddrs || taosArrayGetSize(pTask->candidateAddrs) <= 0) {
SCH_TASK_ELOG("no valid candidate node for task:%"PRIx64, pTask->taskId);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
if (NULL == pTask->msg) {
code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen);
if (TSDB_CODE_SUCCESS != code || NULL == pTask->msg || pTask->msgLen <= 0) {
SCH_TASK_ELOG("subplanToString error, code:%x, msg:%p, len:%d", code, pTask->msg, pTask->msgLen);
SCH_ERR_JRET(code);
}
}
SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
// NOTE: race condition: the task should be put into the hash table before send msg to server
SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, plan->msgType));
SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING);
pTask->status = JOB_TASK_STATUS_EXECUTING;
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, plan->msgType));
return TSDB_CODE_SUCCESS;
_return:
code = schProcessOnTaskFailure(pJob, pTask, code);
SCH_RET(code);
}
int32_t schLaunchJob(SSchJob *pJob) {
......@@ -936,11 +971,11 @@ int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void*
*(SSchJob **)job = pJob;
if (syncSchedule) {
SCH_JOB_DLOG("will wait for rsp now");
SCH_JOB_DLOG("will wait for rsp now, job status:%d", SCH_GET_JOB_STATUS(pJob));
tsem_wait(&pJob->rspSem);
}
SCH_JOB_DLOG("job exec done");
SCH_JOB_DLOG("job exec done, job status:%d", SCH_GET_JOB_STATUS(pJob));
return TSDB_CODE_SUCCESS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册