From e4338377c4d6881ba59a53736c3689170e387c6d Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Sat, 22 Jan 2022 11:54:57 +0800 Subject: [PATCH] feature/qnode --- source/libs/qworker/test/qworkerTests.cpp | 130 ++++++++-------------- 1 file changed, 49 insertions(+), 81 deletions(-) diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 5720c2d47c..362156ebcd 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -40,8 +40,9 @@ namespace { #define qwtTestQueryQueueSize 1000 #define qwtTestFetchQueueSize 1000 -#define qwtTestMaxExecTaskUsec 1000000 +#define qwtTestMaxExecTaskUsec 2 +uint64_t qwtTestQueryId = 0; bool qwtTestEnableSleep = true; bool qwtTestStop = false; bool qwtTestDeadLoop = true; @@ -70,14 +71,17 @@ int32_t qwtTestSinkBlockNum = 0; int32_t qwtTestSinkMaxBlockNum = 0; bool qwtTestSinkQueryEnd = false; SRWLatch qwtTestSinkLock = 0; +int32_t qwtTestSinkLastLen = 0; +SSubQueryMsg qwtqueryMsg = {0}; SRpcMsg qwtfetchRpc = {0}; SResFetchReq qwtfetchMsg = {0}; SRpcMsg qwtreadyRpc = {0}; SResReadyReq qwtreadyMsg = {0}; SRpcMsg qwtdropRpc = {0}; STaskDropReq qwtdropMsg = {0}; +SSchTasksStatusReq qwtstatusMsg = {0}; void qwtInitLogFile() { @@ -96,18 +100,17 @@ void qwtInitLogFile() { } void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) { - SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100); - queryMsg->queryId = htobe64(1); - queryMsg->sId = htobe64(1); - queryMsg->taskId = htobe64(1); - queryMsg->contentLen = htonl(100); - queryRpc->pCont = queryMsg; + qwtqueryMsg.queryId = htobe64(atomic_add_fetch_64(&qwtTestQueryId, 1)); + qwtqueryMsg.sId = htobe64(1); + qwtqueryMsg.taskId = htobe64(1); + qwtqueryMsg.contentLen = htonl(100); + queryRpc->pCont = &qwtqueryMsg; queryRpc->contLen = sizeof(SSubQueryMsg) + 100; } void qwtBuildReadyReqMsg(SResReadyReq *readyMsg, SRpcMsg *readyRpc) { readyMsg->sId = htobe64(1); - readyMsg->queryId = htobe64(1); + readyMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId)); readyMsg->taskId = htobe64(1); readyRpc->pCont = readyMsg; readyRpc->contLen = sizeof(SResReadyReq); @@ -115,7 +118,7 @@ void qwtBuildReadyReqMsg(SResReadyReq *readyMsg, SRpcMsg *readyRpc) { void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) { fetchMsg->sId = htobe64(1); - fetchMsg->queryId = htobe64(1); + fetchMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId)); fetchMsg->taskId = htobe64(1); fetchRpc->pCont = fetchMsg; fetchRpc->contLen = sizeof(SResFetchReq); @@ -123,7 +126,7 @@ void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) { void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) { dropMsg->sId = htobe64(1); - dropMsg->queryId = htobe64(1); + dropMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId)); dropMsg->taskId = htobe64(1); dropRpc->pCont = dropMsg; dropRpc->contLen = sizeof(STaskDropReq); @@ -273,7 +276,8 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { if (endExec) { usleep(rand() % qwtTestMaxExecTaskUsec); - *pRes = (SSDataBlock*)0x1; + *pRes = (SSDataBlock*)calloc(1, sizeof(SSDataBlock)); + (*pRes)->info.rows = rand() % 1000; } else { *pRes = NULL; usleep(rand() % qwtTestMaxExecTaskUsec); @@ -297,6 +301,8 @@ int32_t qwtPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* p assert(0); } + free((void *)pInput->pData); + taosWLockLatch(&qwtTestSinkLock); qwtTestSinkBlockNum++; @@ -339,6 +345,7 @@ void qwtGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) { } else { *pLen = 0; } + qwtTestSinkLastLen = *pLen; taosWUnLockLatch(&qwtTestSinkLock); *pQueryEnd = qwtTestSinkQueryEnd; @@ -348,11 +355,23 @@ void qwtGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) { int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) { taosWLockLatch(&qwtTestSinkLock); - if (qwtTestSinkBlockNum > 0) { - qwtTestSinkBlockNum--; + if (qwtTestSinkLastLen > 0) { pOutput->numOfRows = rand() % 10 + 1; pOutput->compressed = 1; - pOutput->pData = (char *)malloc(pOutput->numOfRows); + pOutput->queryEnd = qwtTestSinkQueryEnd; + if (qwtTestSinkBlockNum == 0) { + pOutput->bufStatus = DS_BUF_EMPTY; + } else if (qwtTestSinkBlockNum <= qwtTestSinkMaxBlockNum*0.5) { + pOutput->bufStatus = DS_BUF_LOW; + } else { + pOutput->bufStatus = DS_BUF_FULL; + } + pOutput->useconds = rand() % 10 + 1; + pOutput->precision = 1; + } else if (qwtTestSinkLastLen == 0) { + pOutput->numOfRows = 0; + pOutput->compressed = 1; + pOutput->pData = NULL; pOutput->queryEnd = qwtTestSinkQueryEnd; if (qwtTestSinkBlockNum == 0) { pOutput->bufStatus = DS_BUF_EMPTY; @@ -534,7 +553,6 @@ void *queryThread(void *param) { while (!qwtTestStop) { qwtBuildQueryReqMsg(&queryRpc); qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); - free(queryRpc.pCont); if (qwtTestEnableSleep) { usleep(rand()%5); } @@ -654,8 +672,6 @@ void *clientThread(void *param) { usleep(1); } - free(queryRpc.pCont); - if (qwtTestEnableSleep) { usleep(rand()%5); } @@ -763,41 +779,11 @@ TEST(seqTest, normalCase) { SRpcMsg statusRpc = {0}; qwtInitLogFile(); - - SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100); - queryMsg->queryId = htobe64(1); - queryMsg->sId = htobe64(1); - queryMsg->taskId = htobe64(1); - queryMsg->contentLen = htonl(100); - queryRpc.pCont = queryMsg; - queryRpc.contLen = sizeof(SSubQueryMsg) + 100; - - SResReadyReq readyMsg = {0}; - readyMsg.sId = htobe64(1); - readyMsg.queryId = htobe64(1); - readyMsg.taskId = htobe64(1); - readyRpc.pCont = &readyMsg; - readyRpc.contLen = sizeof(SResReadyReq); - SResFetchReq fetchMsg = {0}; - fetchMsg.sId = htobe64(1); - fetchMsg.queryId = htobe64(1); - fetchMsg.taskId = htobe64(1); - fetchRpc.pCont = &fetchMsg; - fetchRpc.contLen = sizeof(SResFetchReq); - - STaskDropReq dropMsg = {0}; - dropMsg.sId = htobe64(1); - dropMsg.queryId = htobe64(1); - dropMsg.taskId = htobe64(1); - dropRpc.pCont = &dropMsg; - dropRpc.contLen = sizeof(STaskDropReq); - - SSchTasksStatusReq statusMsg = {0}; - statusMsg.sId = htobe64(1); - statusRpc.pCont = &statusMsg; - statusRpc.contLen = sizeof(SSchTasksStatusReq); - statusRpc.msgType = TDMT_VND_TASKS_STATUS; + qwtBuildQueryReqMsg(&queryRpc); + qwtBuildReadyReqMsg(&qwtreadyMsg, &readyRpc); + qwtBuildFetchReqMsg(&qwtfetchMsg, &fetchRpc); + qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); stubSetStringToPlan(); stubSetRpcSendResponse(); @@ -814,35 +800,35 @@ TEST(seqTest, normalCase) { code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); ASSERT_EQ(code, 0); - statusMsg.sId = htobe64(1); + qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc); code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); ASSERT_EQ(code, 0); code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); ASSERT_EQ(code, 0); - statusMsg.sId = htobe64(1); + qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc); code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); ASSERT_EQ(code, 0); code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); ASSERT_EQ(code, 0); - statusMsg.sId = htobe64(1); + qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc); code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); ASSERT_EQ(code, 0); code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc); ASSERT_EQ(code, 0); - statusMsg.sId = htobe64(1); + qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc); code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); ASSERT_EQ(code, 0); code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); ASSERT_EQ(code, 0); - statusMsg.sId = htobe64(1); + qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc); code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); ASSERT_EQ(code, 0); @@ -859,26 +845,9 @@ TEST(seqTest, cancelFirst) { qwtInitLogFile(); - SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100); - queryMsg->queryId = htobe64(1); - queryMsg->sId = htobe64(1); - queryMsg->taskId = htobe64(1); - queryMsg->contentLen = htonl(100); - queryRpc.pCont = queryMsg; - queryRpc.contLen = sizeof(SSubQueryMsg) + 100; - - STaskDropReq dropMsg = {0}; - dropMsg.sId = htobe64(1); - dropMsg.queryId = htobe64(1); - dropMsg.taskId = htobe64(1); - dropRpc.pCont = &dropMsg; - dropRpc.contLen = sizeof(STaskDropReq); - - SSchTasksStatusReq statusMsg = {0}; - statusMsg.sId = htobe64(1); - statusRpc.pCont = &statusMsg; - statusRpc.contLen = sizeof(SSchTasksStatusReq); - statusRpc.msgType = TDMT_VND_TASKS_STATUS; + qwtBuildQueryReqMsg(&queryRpc); + qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); + qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc); stubSetStringToPlan(); stubSetRpcSendResponse(); @@ -886,21 +855,21 @@ TEST(seqTest, cancelFirst) { code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); ASSERT_EQ(code, 0); - statusMsg.sId = htobe64(1); + qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc); code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); ASSERT_EQ(code, 0); code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); ASSERT_EQ(code, 0); - statusMsg.sId = htobe64(1); + qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc); code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); ASSERT_EQ(code, 0); code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); - ASSERT_EQ(code, TSDB_CODE_QRY_TASK_DROPPED); + ASSERT_TRUE(0 != code); - statusMsg.sId = htobe64(1); + qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc); code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); ASSERT_EQ(code, 0); @@ -941,7 +910,6 @@ TEST(seqTest, randCase) { printf("Query,%d\n", t++); qwtBuildQueryReqMsg(&queryRpc); code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); - free(queryRpc.pCont); } else if (r >= maxr/5 && r < maxr * 2/5) { printf("Ready,%d\n", t++); qwtBuildReadyReqMsg(&readyMsg, &readyRpc); -- GitLab