提交 e4338377 编写于 作者: D dapan1121

feature/qnode

上级 79ce13e8
...@@ -40,8 +40,9 @@ namespace { ...@@ -40,8 +40,9 @@ namespace {
#define qwtTestQueryQueueSize 1000 #define qwtTestQueryQueueSize 1000
#define qwtTestFetchQueueSize 1000 #define qwtTestFetchQueueSize 1000
#define qwtTestMaxExecTaskUsec 1000000 #define qwtTestMaxExecTaskUsec 2
uint64_t qwtTestQueryId = 0;
bool qwtTestEnableSleep = true; bool qwtTestEnableSleep = true;
bool qwtTestStop = false; bool qwtTestStop = false;
bool qwtTestDeadLoop = true; bool qwtTestDeadLoop = true;
...@@ -70,14 +71,17 @@ int32_t qwtTestSinkBlockNum = 0; ...@@ -70,14 +71,17 @@ int32_t qwtTestSinkBlockNum = 0;
int32_t qwtTestSinkMaxBlockNum = 0; int32_t qwtTestSinkMaxBlockNum = 0;
bool qwtTestSinkQueryEnd = false; bool qwtTestSinkQueryEnd = false;
SRWLatch qwtTestSinkLock = 0; SRWLatch qwtTestSinkLock = 0;
int32_t qwtTestSinkLastLen = 0;
SSubQueryMsg qwtqueryMsg = {0};
SRpcMsg qwtfetchRpc = {0}; SRpcMsg qwtfetchRpc = {0};
SResFetchReq qwtfetchMsg = {0}; SResFetchReq qwtfetchMsg = {0};
SRpcMsg qwtreadyRpc = {0}; SRpcMsg qwtreadyRpc = {0};
SResReadyReq qwtreadyMsg = {0}; SResReadyReq qwtreadyMsg = {0};
SRpcMsg qwtdropRpc = {0}; SRpcMsg qwtdropRpc = {0};
STaskDropReq qwtdropMsg = {0}; STaskDropReq qwtdropMsg = {0};
SSchTasksStatusReq qwtstatusMsg = {0};
void qwtInitLogFile() { void qwtInitLogFile() {
...@@ -96,18 +100,17 @@ void qwtInitLogFile() { ...@@ -96,18 +100,17 @@ void qwtInitLogFile() {
} }
void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) { void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) {
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100); qwtqueryMsg.queryId = htobe64(atomic_add_fetch_64(&qwtTestQueryId, 1));
queryMsg->queryId = htobe64(1); qwtqueryMsg.sId = htobe64(1);
queryMsg->sId = htobe64(1); qwtqueryMsg.taskId = htobe64(1);
queryMsg->taskId = htobe64(1); qwtqueryMsg.contentLen = htonl(100);
queryMsg->contentLen = htonl(100); queryRpc->pCont = &qwtqueryMsg;
queryRpc->pCont = queryMsg;
queryRpc->contLen = sizeof(SSubQueryMsg) + 100; queryRpc->contLen = sizeof(SSubQueryMsg) + 100;
} }
void qwtBuildReadyReqMsg(SResReadyReq *readyMsg, SRpcMsg *readyRpc) { void qwtBuildReadyReqMsg(SResReadyReq *readyMsg, SRpcMsg *readyRpc) {
readyMsg->sId = htobe64(1); readyMsg->sId = htobe64(1);
readyMsg->queryId = htobe64(1); readyMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId));
readyMsg->taskId = htobe64(1); readyMsg->taskId = htobe64(1);
readyRpc->pCont = readyMsg; readyRpc->pCont = readyMsg;
readyRpc->contLen = sizeof(SResReadyReq); readyRpc->contLen = sizeof(SResReadyReq);
...@@ -115,7 +118,7 @@ void qwtBuildReadyReqMsg(SResReadyReq *readyMsg, SRpcMsg *readyRpc) { ...@@ -115,7 +118,7 @@ void qwtBuildReadyReqMsg(SResReadyReq *readyMsg, SRpcMsg *readyRpc) {
void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) { void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) {
fetchMsg->sId = htobe64(1); fetchMsg->sId = htobe64(1);
fetchMsg->queryId = htobe64(1); fetchMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId));
fetchMsg->taskId = htobe64(1); fetchMsg->taskId = htobe64(1);
fetchRpc->pCont = fetchMsg; fetchRpc->pCont = fetchMsg;
fetchRpc->contLen = sizeof(SResFetchReq); fetchRpc->contLen = sizeof(SResFetchReq);
...@@ -123,7 +126,7 @@ void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) { ...@@ -123,7 +126,7 @@ void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) {
void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) { void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) {
dropMsg->sId = htobe64(1); dropMsg->sId = htobe64(1);
dropMsg->queryId = htobe64(1); dropMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId));
dropMsg->taskId = htobe64(1); dropMsg->taskId = htobe64(1);
dropRpc->pCont = dropMsg; dropRpc->pCont = dropMsg;
dropRpc->contLen = sizeof(STaskDropReq); dropRpc->contLen = sizeof(STaskDropReq);
...@@ -273,7 +276,8 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { ...@@ -273,7 +276,8 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
if (endExec) { if (endExec) {
usleep(rand() % qwtTestMaxExecTaskUsec); usleep(rand() % qwtTestMaxExecTaskUsec);
*pRes = (SSDataBlock*)0x1; *pRes = (SSDataBlock*)calloc(1, sizeof(SSDataBlock));
(*pRes)->info.rows = rand() % 1000;
} else { } else {
*pRes = NULL; *pRes = NULL;
usleep(rand() % qwtTestMaxExecTaskUsec); usleep(rand() % qwtTestMaxExecTaskUsec);
...@@ -297,6 +301,8 @@ int32_t qwtPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* p ...@@ -297,6 +301,8 @@ int32_t qwtPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* p
assert(0); assert(0);
} }
free((void *)pInput->pData);
taosWLockLatch(&qwtTestSinkLock); taosWLockLatch(&qwtTestSinkLock);
qwtTestSinkBlockNum++; qwtTestSinkBlockNum++;
...@@ -339,6 +345,7 @@ void qwtGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) { ...@@ -339,6 +345,7 @@ void qwtGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) {
} else { } else {
*pLen = 0; *pLen = 0;
} }
qwtTestSinkLastLen = *pLen;
taosWUnLockLatch(&qwtTestSinkLock); taosWUnLockLatch(&qwtTestSinkLock);
*pQueryEnd = qwtTestSinkQueryEnd; *pQueryEnd = qwtTestSinkQueryEnd;
...@@ -348,11 +355,23 @@ void qwtGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) { ...@@ -348,11 +355,23 @@ void qwtGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) {
int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) { int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) {
taosWLockLatch(&qwtTestSinkLock); taosWLockLatch(&qwtTestSinkLock);
if (qwtTestSinkBlockNum > 0) { if (qwtTestSinkLastLen > 0) {
qwtTestSinkBlockNum--;
pOutput->numOfRows = rand() % 10 + 1; pOutput->numOfRows = rand() % 10 + 1;
pOutput->compressed = 1; pOutput->compressed = 1;
pOutput->pData = (char *)malloc(pOutput->numOfRows); pOutput->queryEnd = qwtTestSinkQueryEnd;
if (qwtTestSinkBlockNum == 0) {
pOutput->bufStatus = DS_BUF_EMPTY;
} else if (qwtTestSinkBlockNum <= qwtTestSinkMaxBlockNum*0.5) {
pOutput->bufStatus = DS_BUF_LOW;
} else {
pOutput->bufStatus = DS_BUF_FULL;
}
pOutput->useconds = rand() % 10 + 1;
pOutput->precision = 1;
} else if (qwtTestSinkLastLen == 0) {
pOutput->numOfRows = 0;
pOutput->compressed = 1;
pOutput->pData = NULL;
pOutput->queryEnd = qwtTestSinkQueryEnd; pOutput->queryEnd = qwtTestSinkQueryEnd;
if (qwtTestSinkBlockNum == 0) { if (qwtTestSinkBlockNum == 0) {
pOutput->bufStatus = DS_BUF_EMPTY; pOutput->bufStatus = DS_BUF_EMPTY;
...@@ -534,7 +553,6 @@ void *queryThread(void *param) { ...@@ -534,7 +553,6 @@ void *queryThread(void *param) {
while (!qwtTestStop) { while (!qwtTestStop) {
qwtBuildQueryReqMsg(&queryRpc); qwtBuildQueryReqMsg(&queryRpc);
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
free(queryRpc.pCont);
if (qwtTestEnableSleep) { if (qwtTestEnableSleep) {
usleep(rand()%5); usleep(rand()%5);
} }
...@@ -654,8 +672,6 @@ void *clientThread(void *param) { ...@@ -654,8 +672,6 @@ void *clientThread(void *param) {
usleep(1); usleep(1);
} }
free(queryRpc.pCont);
if (qwtTestEnableSleep) { if (qwtTestEnableSleep) {
usleep(rand()%5); usleep(rand()%5);
} }
...@@ -763,41 +779,11 @@ TEST(seqTest, normalCase) { ...@@ -763,41 +779,11 @@ TEST(seqTest, normalCase) {
SRpcMsg statusRpc = {0}; SRpcMsg statusRpc = {0};
qwtInitLogFile(); qwtInitLogFile();
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1);
queryMsg->sId = htobe64(1);
queryMsg->taskId = htobe64(1);
queryMsg->contentLen = htonl(100);
queryRpc.pCont = queryMsg;
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
SResReadyReq readyMsg = {0};
readyMsg.sId = htobe64(1);
readyMsg.queryId = htobe64(1);
readyMsg.taskId = htobe64(1);
readyRpc.pCont = &readyMsg;
readyRpc.contLen = sizeof(SResReadyReq);
SResFetchReq fetchMsg = {0}; qwtBuildQueryReqMsg(&queryRpc);
fetchMsg.sId = htobe64(1); qwtBuildReadyReqMsg(&qwtreadyMsg, &readyRpc);
fetchMsg.queryId = htobe64(1); qwtBuildFetchReqMsg(&qwtfetchMsg, &fetchRpc);
fetchMsg.taskId = htobe64(1); qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc);
fetchRpc.pCont = &fetchMsg;
fetchRpc.contLen = sizeof(SResFetchReq);
STaskDropReq dropMsg = {0};
dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropReq);
SSchTasksStatusReq statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusReq);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
stubSetStringToPlan(); stubSetStringToPlan();
stubSetRpcSendResponse(); stubSetRpcSendResponse();
...@@ -814,35 +800,35 @@ TEST(seqTest, normalCase) { ...@@ -814,35 +800,35 @@ TEST(seqTest, normalCase) {
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1); qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1); qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1); qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc); code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1); qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1); qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
...@@ -859,26 +845,9 @@ TEST(seqTest, cancelFirst) { ...@@ -859,26 +845,9 @@ TEST(seqTest, cancelFirst) {
qwtInitLogFile(); qwtInitLogFile();
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100); qwtBuildQueryReqMsg(&queryRpc);
queryMsg->queryId = htobe64(1); qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc);
queryMsg->sId = htobe64(1); qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
queryMsg->taskId = htobe64(1);
queryMsg->contentLen = htonl(100);
queryRpc.pCont = queryMsg;
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
STaskDropReq dropMsg = {0};
dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropReq);
SSchTasksStatusReq statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusReq);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
stubSetStringToPlan(); stubSetStringToPlan();
stubSetRpcSendResponse(); stubSetRpcSendResponse();
...@@ -886,21 +855,21 @@ TEST(seqTest, cancelFirst) { ...@@ -886,21 +855,21 @@ TEST(seqTest, cancelFirst) {
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1); qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1); qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
ASSERT_EQ(code, TSDB_CODE_QRY_TASK_DROPPED); ASSERT_TRUE(0 != code);
statusMsg.sId = htobe64(1); qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
...@@ -941,7 +910,6 @@ TEST(seqTest, randCase) { ...@@ -941,7 +910,6 @@ TEST(seqTest, randCase) {
printf("Query,%d\n", t++); printf("Query,%d\n", t++);
qwtBuildQueryReqMsg(&queryRpc); qwtBuildQueryReqMsg(&queryRpc);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
free(queryRpc.pCont);
} else if (r >= maxr/5 && r < maxr * 2/5) { } else if (r >= maxr/5 && r < maxr * 2/5) {
printf("Ready,%d\n", t++); printf("Ready,%d\n", t++);
qwtBuildReadyReqMsg(&readyMsg, &readyRpc); qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册