From a2856bb2b41b716f3c209cba8aa1b645410fcf39 Mon Sep 17 00:00:00 2001 From: dapan Date: Mon, 21 Mar 2022 08:32:22 +0800 Subject: [PATCH] feature/scheduler --- source/libs/qworker/test/qworkerTests.cpp | 18 +-- source/libs/scheduler/test/schedulerTests.cpp | 110 ++++++++++++++++++ 2 files changed, 119 insertions(+), 9 deletions(-) diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index cc4233bd47..bed476198a 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -884,8 +884,8 @@ TEST(seqTest, normalCase) { code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); ASSERT_EQ(code, 0); - code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); - ASSERT_EQ(code, 0); + //code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); + //ASSERT_EQ(code, 0); code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc); ASSERT_EQ(code, 0); @@ -976,12 +976,12 @@ TEST(seqTest, randCase) { qwtBuildQueryReqMsg(&queryRpc); code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); } else if (r >= maxr/5 && r < maxr * 2/5) { - printf("Ready,%d\n", t++); - qwtBuildReadyReqMsg(&readyMsg, &readyRpc); - code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); - if (qwtTestEnableSleep) { - taosUsleep(1); - } + //printf("Ready,%d\n", t++); + //qwtBuildReadyReqMsg(&readyMsg, &readyRpc); + //code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); + //if (qwtTestEnableSleep) { + // taosUsleep(1); + //} } else if (r >= maxr * 2/5 && r < maxr* 3/5) { printf("Fetch,%d\n", t++); qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc); @@ -1042,7 +1042,7 @@ TEST(seqTest, multithreadRand) { pthread_t t1,t2,t3,t4,t5,t6; pthread_create(&(t1), &thattr, queryThread, mgmt); - pthread_create(&(t2), &thattr, readyThread, NULL); + //pthread_create(&(t2), &thattr, readyThread, NULL); pthread_create(&(t3), &thattr, fetchThread, NULL); pthread_create(&(t4), &thattr, dropThread, NULL); pthread_create(&(t5), &thattr, statusThread, NULL); diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 376ed1e2bc..1ecb9347f3 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -713,6 +713,116 @@ TEST(queryTest, normalCase) { schedulerDestroy(); } +TEST(queryTest, readyFirstCase) { + void *mockPointer = (void *)0x1; + char *clusterId = "cluster1"; + char *dbname = "1.db1"; + char *tablename = "table1"; + SVgroupInfo vgInfo = {0}; + int64_t job = 0; + SQueryPlan dag; + + memset(&dag, 0, sizeof(dag)); + + SArray *qnodeList = taosArrayInit(1, sizeof(SEp)); + + SEp qnodeAddr = {0}; + strcpy(qnodeAddr.fqdn, "qnode0.ep"); + qnodeAddr.port = 6031; + taosArrayPush(qnodeList, &qnodeAddr); + + int32_t code = schedulerInit(NULL); + ASSERT_EQ(code, 0); + + schtBuildQueryDag(&dag); + + schtSetPlanToString(); + schtSetExecNode(); + schtSetAsyncSendMsgToServer(); + + code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job); + ASSERT_EQ(code, 0); + + + SSchJob *pJob = schAcquireJob(job); + + pIter = taosHashIterate(pJob->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SResReadyRsp rsp = {0}; + code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); + printf("code:%d", code); + ASSERT_EQ(code, 0); + pIter = taosHashIterate(pJob->execTasks, pIter); + } + + void *pIter = taosHashIterate(pJob->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SQueryTableRsp rsp = {0}; + code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); + + ASSERT_EQ(code, 0); + pIter = taosHashIterate(pJob->execTasks, pIter); + } + + pIter = taosHashIterate(pJob->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SResReadyRsp rsp = {0}; + code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); + ASSERT_EQ(code, 0); + + pIter = taosHashIterate(pJob->execTasks, pIter); + } + + pIter = taosHashIterate(pJob->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SQueryTableRsp rsp = {0}; + code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); + + ASSERT_EQ(code, 0); + pIter = taosHashIterate(pJob->execTasks, pIter); + } + + + + pthread_attr_t thattr; + pthread_attr_init(&thattr); + + pthread_t thread1; + pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, &job); + + void *data = NULL; + code = schedulerFetchRows(job, &data); + ASSERT_EQ(code, 0); + + SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data; + ASSERT_EQ(pRsp->completed, 1); + ASSERT_EQ(pRsp->numOfRows, 10); + tfree(data); + + data = NULL; + code = schedulerFetchRows(job, &data); + ASSERT_EQ(code, 0); + ASSERT_TRUE(data == NULL); + + schReleaseJob(job); + + schedulerFreeJob(job); + + schtFreeQueryDag(&dag); + + schedulerDestroy(); +} + + + TEST(queryTest, flowCtrlCase) { void *mockPointer = (void *)0x1; char *clusterId = "cluster1"; -- GitLab