提交 e5834b8e 编写于 作者: D dapan

feature/qnode

上级 633b3de9
......@@ -20,6 +20,73 @@
static SSchedulerMgmt schMgmt = {0};
int32_t schValidateStatus(SSchJob *pJob, int8_t oriStatus, int8_t newStatus) {
int32_t code = 0;
if (oriStatus == newStatus) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
switch (oriStatus) {
case JOB_TASK_STATUS_NULL:
if (newStatus != JOB_TASK_STATUS_EXECUTING
&& newStatus != JOB_TASK_STATUS_FAILED
&& newStatus != JOB_TASK_STATUS_NOT_START) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_NOT_START:
if (newStatus != JOB_TASK_STATUS_CANCELLED) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_EXECUTING:
if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED
&& newStatus != JOB_TASK_STATUS_FAILED
&& newStatus != JOB_TASK_STATUS_CANCELLING
&& newStatus != JOB_TASK_STATUS_CANCELLED
&& newStatus != JOB_TASK_STATUS_DROPPING) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_PARTIAL_SUCCEED:
if (newStatus != JOB_TASK_STATUS_EXECUTING
&& newStatus != JOB_TASK_STATUS_SUCCEED
&& newStatus != JOB_TASK_STATUS_CANCELLED) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_SUCCEED:
case JOB_TASK_STATUS_FAILED:
case JOB_TASK_STATUS_CANCELLING:
if (newStatus != JOB_TASK_STATUS_CANCELLED) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_CANCELLED:
case JOB_TASK_STATUS_DROPPING:
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
break;
default:
qError("invalid task status:%d", oriStatus);
return TSDB_CODE_QRY_APP_ERROR;
}
return TSDB_CODE_SUCCESS;
_return:
SCH_JOB_ELOG("invalid job status update, from %d to %d", oriStatus, newStatus);
SCH_ERR_RET(code);
}
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
for (int32_t i = 0; i < pJob->levelNum; ++i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
......@@ -365,14 +432,21 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *job) {
return TSDB_CODE_SUCCESS;
}
int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) {
job->status = JOB_TASK_STATUS_FAILED;
job->errCode = errCode;
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
int8_t status = SCH_GET_JOB_STATUS(pJob);
atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
if (schValidateStatus(pJob, status, JOB_TASK_STATUS_FAILED)) {
SCH_ERR_RET(atomic_load_32(&pJob->errCode));
}
SCH_SET_JOB_STATUS(pJob, JOB_TASK_STATUS_FAILED);
atomic_store_32(&pJob->errCode, errCode);
if (job->userFetch || ((!SCH_JOB_NEED_FETCH(&job->attr)) && job->attr.syncSchedule)) {
tsem_post(&job->rspSem);
atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);
if (pJob->userFetch || ((!SCH_JOB_NEED_FETCH(&pJob->attr)) && pJob->attr.syncSchedule)) {
tsem_post(&pJob->rspSem);
}
return TSDB_CODE_SUCCESS;
......@@ -670,13 +744,13 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t
int32_t code = 0;
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SMsgSendInfo));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSchCallbackParam *param = calloc(1, sizeof(SSchCallbackParam));
if (NULL == param) {
qError("calloc %d failed", (int32_t)sizeof(SSchCallbackParam));
qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SSchCallbackParam));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -694,11 +768,13 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t
pMsgSendInfo->fp = fp;
int64_t transporterId = 0;
SCH_ERR_JRET(asyncSendMsgToServer(transport, epSet, &transporterId, pMsgSendInfo));
return TSDB_CODE_SUCCESS;
_return:
tfree(param);
tfree(pMsgSendInfo);
......@@ -720,35 +796,31 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
uint32_t msgSize = 0;
void *msg = NULL;
int32_t code = 0;
SEpSet epSet;
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
schConvertAddrToEpSet(addr, &epSet);
switch (msgType) {
case TDMT_VND_CREATE_TABLE:
case TDMT_VND_SUBMIT: {
if (NULL == pTask->msg || pTask->msgLen <= 0) {
qError("submit msg is NULL");
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
msgSize = pTask->msgLen;
msg = pTask->msg;
break;
}
case TDMT_VND_QUERY: {
if (NULL == pTask->msg) {
qError("query msg is NULL");
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
msgSize = sizeof(SSubQueryMsg) + pTask->msgLen;
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_TASK_ELOG("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSubQueryMsg *pMsg = msg;
pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId);
pMsg->header.vgId = htonl(addr->nodeId);
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(pJob->queryId);
pMsg->taskId = htobe64(pTask->taskId);
......@@ -760,32 +832,31 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
msgSize = sizeof(SResReadyMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_TASK_ELOG("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SResReadyMsg *pMsg = msg;
pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId);
pMsg->header.vgId = htonl(addr->nodeId);
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(pJob->queryId);
pMsg->taskId = htobe64(pTask->taskId);
break;
}
case TDMT_VND_FETCH: {
if (NULL == pTask) {
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
msgSize = sizeof(SResFetchMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_TASK_ELOG("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SResFetchMsg *pMsg = msg;
pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId);
pMsg->header.vgId = htonl(addr->nodeId);
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(pJob->queryId);
pMsg->taskId = htobe64(pTask->taskId);
......@@ -795,28 +866,25 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
msgSize = sizeof(STaskDropMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_TASK_ELOG("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
STaskDropMsg *pMsg = msg;
pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId);
pMsg->header.vgId = htonl(addr->nodeId);
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(pJob->queryId);
pMsg->taskId = htobe64(pTask->taskId);
break;
}
default:
qError("unknown msg type:%d", msgType);
SCH_TASK_ELOG("unknown msg type:%d", msgType);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
break;
}
SEpSet epSet;
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
schConvertAddrToEpSet(addr, &epSet);
SCH_ERR_JRET(schAsyncSendMsg(pJob->transport, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize));
......@@ -844,7 +912,7 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_ELOG("no need to launch task cause of job status, job status:%d", status);
SCH_ERR_RET(pJob->errCode);
SCH_ERR_RET(atomic_load_32(&pJob->errCode));
}
SSubplan *plan = pTask->plan;
......@@ -860,9 +928,11 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
// NOTE: race condition: the task should be put into the hash table before send msg to server
SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) {
SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING);
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING);
}
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, plan->msgType));
......@@ -1031,7 +1101,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void
SSchJob *job = *(SSchJob **)pJob;
pRes->code = job->errCode;
pRes->code = atomic_load_32(&job->errCode);
pRes->numOfRows = job->resNumOfRows;
return TSDB_CODE_SUCCESS;
......@@ -1061,7 +1131,7 @@ int32_t scheduleFetchRows(void *pJob, void **data) {
if (job->status == JOB_TASK_STATUS_FAILED) {
job->res = NULL;
SCH_RET(job->errCode);
SCH_RET(atomic_load_32(&job->errCode));
}
if (job->status == JOB_TASK_STATUS_SUCCEED) {
......@@ -1081,7 +1151,7 @@ int32_t scheduleFetchRows(void *pJob, void **data) {
tsem_wait(&job->rspSem);
if (job->status == JOB_TASK_STATUS_FAILED) {
code = job->errCode;
code = atomic_load_32(&job->errCode);
}
if (job->res && ((SRetrieveTableRsp *)job->res)->completed) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册