提交 9b4bda4f 编写于 作者: D dapan

feature/scheduler

上级 6830d866
...@@ -114,18 +114,18 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m ...@@ -114,18 +114,18 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
case TDMT_VND_FETCH_RSP: case TDMT_VND_FETCH_RSP:
case TDMT_VND_DROP_TASK: case TDMT_VND_DROP_TASK:
if (lastMsgType != (msgType - 1)) { if (lastMsgType != (msgType - 1)) {
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%d, rspType:%d", lastMsgType, msgType); SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
} }
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING && SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_PARTIAL_SUCCEED) { if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING && SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%d", SCH_GET_TASK_STATUS(pTask), msgType); SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%s", SCH_GET_TASK_STATUS(pTask), TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
} }
break; break;
default: default:
SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%d", msgType, SCH_GET_TASK_STATUS(pTask)); SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%d", TMSG_INFO(msgType), SCH_GET_TASK_STATUS(pTask));
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
......
...@@ -113,6 +113,9 @@ void schtBuildQueryDag(SQueryPlan *dag) { ...@@ -113,6 +113,9 @@ void schtBuildQueryDag(SQueryPlan *dag) {
mergePlan->pNode = (SPhysiNode*)calloc(1, sizeof(SPhysiNode)); mergePlan->pNode = (SPhysiNode*)calloc(1, sizeof(SPhysiNode));
mergePlan->msgType = TDMT_VND_QUERY; mergePlan->msgType = TDMT_VND_QUERY;
merge->pNodeList = nodesMakeList();
scan->pNodeList = nodesMakeList();
nodesListAppend(merge->pNodeList, (SNode*)mergePlan); nodesListAppend(merge->pNodeList, (SNode*)mergePlan);
nodesListAppend(scan->pNodeList, (SNode*)scanPlan); nodesListAppend(scan->pNodeList, (SNode*)scanPlan);
...@@ -170,6 +173,8 @@ void schtBuildInsertDag(SQueryPlan *dag) { ...@@ -170,6 +173,8 @@ void schtBuildInsertDag(SQueryPlan *dag) {
insertPlan[1].pDataSink = (SDataSinkNode*)calloc(1, sizeof(SDataSinkNode)); insertPlan[1].pDataSink = (SDataSinkNode*)calloc(1, sizeof(SDataSinkNode));
insertPlan[1].msgType = TDMT_VND_SUBMIT; insertPlan[1].msgType = TDMT_VND_SUBMIT;
inserta->pNodeList = nodesMakeList();
nodesListAppend(inserta->pNodeList, (SNode*)insertPlan); nodesListAppend(inserta->pNodeList, (SNode*)insertPlan);
insertPlan += 1; insertPlan += 1;
nodesListAppend(inserta->pNodeList, (SNode*)insertPlan); nodesListAppend(inserta->pNodeList, (SNode*)insertPlan);
...@@ -537,8 +542,6 @@ TEST(queryTest, normalCase) { ...@@ -537,8 +542,6 @@ TEST(queryTest, normalCase) {
int64_t job = 0; int64_t job = 0;
SQueryPlan dag; SQueryPlan dag;
schtInitLogFile();
SArray *qnodeList = taosArrayInit(1, sizeof(SEp)); SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
SEp qnodeAddr = {0}; SEp qnodeAddr = {0};
...@@ -675,7 +678,8 @@ TEST(queryTest, flowCtrlCase) { ...@@ -675,7 +678,8 @@ TEST(queryTest, flowCtrlCase) {
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter); taosHashCancelIterate(pJob->execTasks, pIter);
pIter = NULL;
} }
pIter = taosHashIterate(pJob->execTasks, NULL); pIter = taosHashIterate(pJob->execTasks, NULL);
...@@ -686,7 +690,8 @@ TEST(queryTest, flowCtrlCase) { ...@@ -686,7 +690,8 @@ TEST(queryTest, flowCtrlCase) {
code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
printf("code:%d", code); printf("code:%d", code);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter); taosHashCancelIterate(pJob->execTasks, pIter);
pIter = NULL;
} }
pIter = taosHashIterate(pJob->execTasks, NULL); pIter = taosHashIterate(pJob->execTasks, NULL);
...@@ -697,7 +702,8 @@ TEST(queryTest, flowCtrlCase) { ...@@ -697,7 +702,8 @@ TEST(queryTest, flowCtrlCase) {
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter); taosHashCancelIterate(pJob->execTasks, pIter);
pIter = NULL;
} }
pIter = taosHashIterate(pJob->execTasks, NULL); pIter = taosHashIterate(pJob->execTasks, NULL);
...@@ -708,7 +714,8 @@ TEST(queryTest, flowCtrlCase) { ...@@ -708,7 +714,8 @@ TEST(queryTest, flowCtrlCase) {
code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter); taosHashCancelIterate(pJob->execTasks, pIter);
pIter = NULL;
} }
pthread_attr_t thattr; pthread_attr_t thattr;
...@@ -750,8 +757,6 @@ TEST(insertTest, normalCase) { ...@@ -750,8 +757,6 @@ TEST(insertTest, normalCase) {
SQueryPlan dag; SQueryPlan dag;
uint64_t numOfRows = 0; uint64_t numOfRows = 0;
schtInitLogFile();
SArray *qnodeList = taosArrayInit(1, sizeof(SEp)); SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
SEp qnodeAddr = {0}; SEp qnodeAddr = {0};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册