From 913455684f950cf30dd2ebde77c14318d65a6370 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 18 Jan 2022 14:12:25 +0800 Subject: [PATCH] feature/qnode --- include/libs/executor/dataSinkMgt.h | 2 +- include/libs/executor/executor.h | 6 +- source/libs/executor/inc/dataSinkInt.h | 2 +- source/libs/executor/src/dataDispatcher.c | 4 +- source/libs/executor/src/dataSinkMgt.c | 2 +- source/libs/executor/src/executorMain.c | 26 +++--- source/libs/executor/test/executorTests.cpp | 3 +- source/libs/qworker/src/qworker.c | 48 ++++++++--- source/libs/scheduler/src/scheduler.c | 11 ++- source/libs/scheduler/test/schedulerTests.cpp | 80 ++++++++++++++----- 10 files changed, 132 insertions(+), 52 deletions(-) diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index 5cef3b2253..371cb12405 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -70,7 +70,7 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH */ int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue); -void dsEndPut(DataSinkHandle handle, int64_t useconds); +void dsEndPut(DataSinkHandle handle, uint64_t useconds); /** * Get the length of the data returned by the next call to dsGetDataBlock. diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index e4f8d291eb..0fc7fd679e 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -20,6 +20,8 @@ extern "C" { #endif +#include "common.h" + typedef void* qTaskInfo_t; typedef void* DataSinkHandle; struct SSubplan; @@ -34,7 +36,7 @@ struct SSubplan; * @param qId * @return */ -int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo); +int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle); /** * The main task execution function, including query on both table and multiple tables, @@ -44,7 +46,7 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskI * @param handle * @return */ -struct SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle); +int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds); /** * Retrieve the produced results information, if current query is not paused or completed, diff --git a/source/libs/executor/inc/dataSinkInt.h b/source/libs/executor/inc/dataSinkInt.h index 69727626af..7003564365 100644 --- a/source/libs/executor/inc/dataSinkInt.h +++ b/source/libs/executor/inc/dataSinkInt.h @@ -32,7 +32,7 @@ typedef struct SDataSinkManager { } SDataSinkManager; typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue); -typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, int64_t useconds); +typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, uint64_t useconds); typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd); typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput); typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle); diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 8280f9d0af..edba4fc97d 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -44,7 +44,7 @@ typedef struct SDataDispatchHandle { SDataDispatchBuf nextOutput; int32_t status; bool queryEnd; - int64_t useconds; + uint64_t useconds; pthread_mutex_t mutex; } SDataDispatchHandle; @@ -158,7 +158,7 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, return TSDB_CODE_SUCCESS; } -static void endPut(struct SDataSinkHandle* pHandle, int64_t useconds) { +static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; pthread_mutex_lock(&pDispatcher->mutex); pDispatcher->queryEnd = true; diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 80d99f96c6..eb1f75f359 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -37,7 +37,7 @@ int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pC return pHandleImpl->fPut(pHandleImpl, pInput, pContinue); } -void dsEndPut(DataSinkHandle handle, int64_t useconds) { +void dsEndPut(DataSinkHandle handle, uint64_t useconds) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; return pHandleImpl->fEndPut(pHandleImpl, useconds); } diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index e451f888bb..1f5d0cd059 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -68,7 +68,7 @@ void freeParam(STaskParam *param) { tfree(param->prevResult); } -int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo) { +int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) { assert(tsdb != NULL && pSubplan != NULL); SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; @@ -85,6 +85,8 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_ code = dsCreateDataSinker(pSubplan->pDataSink, &(*pTask)->dsHandle); + *handle = (*pTask)->dsHandle; + _error: // if failed to add ref for all tables in this query, abort current query return code; @@ -135,16 +137,18 @@ int waitMoment(SQInfo* pQInfo){ } #endif -SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) { +int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; int64_t threadId = taosGetSelfPthreadId(); + *pRes = NULL; + int64_t curOwner = 0; if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) { qError("QInfo:0x%" PRIx64 "-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner); pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC; - return NULL; + return pTaskInfo->code; } if (pTaskInfo->cost.start == 0) { @@ -153,7 +157,7 @@ SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) { if (isTaskKilled(pTaskInfo)) { qDebug("QInfo:0x%" PRIx64 " it is already killed, abort", GET_TASKID(pTaskInfo)); - return NULL; + return TSDB_CODE_SUCCESS; } // STaskRuntimeEnv* pRuntimeEnv = &pTaskInfo->runtimeEnv; @@ -170,7 +174,7 @@ SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) { pTaskInfo->code = ret; qDebug("QInfo:0x%" PRIx64 " query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); - return NULL; + return pTaskInfo->code; } qDebug("QInfo:0x%" PRIx64 " query task is launched", GET_TASKID(pTaskInfo)); @@ -179,21 +183,21 @@ SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) { publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC); int64_t st = 0; - if (handle) { - *handle = pTaskInfo->dsHandle; - } - st = taosGetTimestampUs(); - SSDataBlock* pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup); + *pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup); pTaskInfo->cost.elapsedTime += (taosGetTimestampUs() - st); publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC); + if (NULL == *pRes) { + *useconds = pTaskInfo->cost.elapsedTime; + } + qDebug("QInfo:0x%" PRIx64 " query paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d", GET_TASKID(pTaskInfo), 0, 0L, 0); atomic_store_64(&pTaskInfo->owner, 0); - return pRes; + return pTaskInfo->code; } int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext) { diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 5f5fddbe28..c528d879a3 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -217,5 +217,6 @@ TEST(testCase, build_executor_tree_Test) { "}"; SExecTaskInfo* pTaskInfo = nullptr; - int32_t code = qCreateExecTask((void*) 1, 2, NULL, (void**) &pTaskInfo); + DataSinkHandle sinkHandle = nullptr; + int32_t code = qCreateExecTask((void*) 1, 2, NULL, (void**) &pTaskInfo, &sinkHandle); } \ No newline at end of file diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 61bd82a49a..a0beaba61d 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -458,6 +458,37 @@ _return: QW_RET(code); } +int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t taskHandle, DataSinkHandle sinkHandle) { + int32_t code = 0; + bool qcontinue = true; + SSDataBlock* pRes = NULL; + uint64_t useconds = 0; + + while (qcontinue) { + code = qExecTask(taskHandle, &pRes, &useconds); + if (code) { + QW_TASK_ELOG("qExecTask failed, code:%x", code); + QW_ERR_JRET(code); + } + + if (NULL == pRes) { + QW_TASK_DLOG("query done, useconds:%"PRIu64, useconds); + dsEndPut(sinkHandle, useconds); + break; + } + + SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL}; + code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue); + if (code) { + QW_TASK_ELOG("dsPutDataBlock failed, code:%x", code); + QW_ERR_JRET(code); + } + } + +_return: + + QW_RET(code); +} int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) { @@ -733,7 +764,9 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t } qTaskInfo_t pTaskInfo = NULL; - code = qCreateExecTask(qwMsg->node, 0, (struct SSubplan *)plan, &pTaskInfo); + DataSinkHandle sinkHandle = NULL; + + code = qCreateExecTask(qwMsg->node, 0, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x", code); QW_ERR_JRET(code); @@ -743,12 +776,7 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t queryRsped = true; - DataSinkHandle sinkHandle = NULL; - SSDataBlock* pRes = qExecTask(pTaskInfo, &sinkHandle); - if (code) { - QW_TASK_ELOG("qExecTask failed, code:%x", code); - QW_ERR_JRET(code); - } + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), pTaskInfo, sinkHandle)); _return: @@ -840,11 +868,7 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t qTaskInfo_t taskHandle = ctx->taskHandle; DataSinkHandle sinkHandle = ctx->sinkHandle; - code = qExecTask(taskHandle, &sinkHandle); - if (code) { - QW_TASK_ELOG("qExecTask failed, code:%x", code); - QW_ERR_JRET(code); - } + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), taskHandle, sinkHandle)); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CQUERY); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 2d5322fc2c..c53926f8c1 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -412,6 +412,8 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + + ++addNum; } } @@ -792,6 +794,11 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch if (rspCode != TSDB_CODE_SUCCESS) { SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); } + + SShellSubmitRsp *rsp = (SShellSubmitRsp *)msg; + if (rsp) { + pJob->resNumOfRows += rsp->affectedRows; + } #endif SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); @@ -1355,9 +1362,9 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, stru SSchJob *job = NULL; - SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, &job, true)); + SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, true)); - *pJob = job; + job = *pJob; pRes->code = atomic_load_32(&job->errCode); pRes->numOfRows = job->resNumOfRows; diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index d72c4920d5..5332c6fcd1 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -34,10 +34,12 @@ #include "stub.h" #include "addr_any.h" + namespace { extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); + void schtInitLogFile() { const char *defaultLogFileNamePrefix = "taoslog"; const int32_t maxLogFileNum = 10; @@ -113,9 +115,9 @@ void schtBuildInsertDag(SQueryDag *dag) { dag->queryId = qId; dag->numOfSubplans = 2; dag->pSubplans = taosArrayInit(1, POINTER_BYTES); - SArray *inserta = taosArrayInit(dag->numOfSubplans, sizeof(SSubplan)); + SArray *inserta = taosArrayInit(dag->numOfSubplans, POINTER_BYTES); - SSubplan insertPlan[2] = {0}; + SSubplan *insertPlan = (SSubplan *)calloc(2, sizeof(SSubplan)); insertPlan[0].id.queryId = qId; insertPlan[0].id.templateId = 0x0000000000000003; @@ -131,6 +133,7 @@ void schtBuildInsertDag(SQueryDag *dag) { insertPlan[0].pParents = NULL; insertPlan[0].pNode = NULL; insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink)); + insertPlan[0].msgType = TDMT_VND_SUBMIT; insertPlan[1].id.queryId = qId; insertPlan[1].id.templateId = 0x0000000000000003; @@ -146,10 +149,11 @@ void schtBuildInsertDag(SQueryDag *dag) { insertPlan[1].pParents = NULL; insertPlan[1].pNode = NULL; insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink)); + insertPlan[1].msgType = TDMT_VND_SUBMIT; - - taosArrayPush(inserta, &insertPlan[0]); - taosArrayPush(inserta, &insertPlan[1]); + taosArrayPush(inserta, &insertPlan); + insertPlan += 1; + taosArrayPush(inserta, &insertPlan); taosArrayPush(dag->pSubplans, &inserta); } @@ -210,6 +214,24 @@ void schtSetRpcSendRequest() { } } +int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { + return 0; +} + + +void schtSetAsyncSendMsgToServer() { + static Stub stub; + stub.set(asyncSendMsgToServer, schtAsyncSendMsgToServer); + { + AddrAny any("libtransport.so"); + std::map result; + any.get_global_func_addr_dynsym("^asyncSendMsgToServer$", result); + for (const auto& f : result) { + stub.set(f.second, schtAsyncSendMsgToServer); + } + } +} + void *schtSendRsp(void *param) { SSchJob *job = NULL; @@ -230,7 +252,7 @@ void *schtSendRsp(void *param) { SShellSubmitRsp rsp = {0}; rsp.affectedRows = 10; - schHandleResponseMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0); + schHandleResponseMsg(job, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0); pIter = taosHashIterate(job->execTasks, pIter); } @@ -238,6 +260,23 @@ void *schtSendRsp(void *param) { return NULL; } +void *schtCreateFetchRspThread(void *param) { + struct SSchJob* job = (struct SSchJob*)param; + + sleep(1); + + int32_t code = 0; + SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)calloc(1, sizeof(SRetrieveTableRsp)); + rsp->completed = 1; + rsp->numOfRows = 10; + code = schHandleResponseMsg(job, job->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(rsp), 0); + + assert(code == 0); +} + + + + struct SSchJob *pInsertJob = NULL; } @@ -266,6 +305,7 @@ TEST(queryTest, normalCase) { schtSetPlanToString(); schtSetExecNode(); + schtSetAsyncSendMsgToServer(); code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob); ASSERT_EQ(code, 0); @@ -276,7 +316,7 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; SQueryTableRsp rsp = {0}; - code = schHandleResponseMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); + code = schHandleResponseMsg(job, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(job->execTasks, pIter); @@ -287,8 +327,8 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; SResReadyRsp rsp = {0}; - code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); - + code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); + printf("code:%d", code); ASSERT_EQ(code, 0); pIter = taosHashIterate(job->execTasks, pIter); } @@ -298,7 +338,7 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; SQueryTableRsp rsp = {0}; - code = schHandleResponseMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); + code = schHandleResponseMsg(job, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(job->execTasks, pIter); @@ -309,22 +349,19 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; SResReadyRsp rsp = {0}; - code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); + code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(job->execTasks, pIter); } - SRetrieveTableRsp rsp = {0}; - rsp.completed = 1; - rsp.numOfRows = 10; - code = schHandleResponseMsg(job, NULL, TDMT_VND_FETCH, (char *)&rsp, sizeof(rsp), 0); - - ASSERT_EQ(code, 0); + pthread_attr_t thattr; + pthread_attr_init(&thattr); + pthread_t thread1; + pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, job); - void *data = NULL; - + void *data = NULL; code = scheduleFetchRows(job, &data); ASSERT_EQ(code, 0); @@ -340,6 +377,8 @@ TEST(queryTest, normalCase) { scheduleFreeJob(pJob); schtFreeQueryDag(&dag); + + schedulerDestroy(); } @@ -369,6 +408,7 @@ TEST(insertTest, normalCase) { schtBuildInsertDag(&dag); schtSetPlanToString(); + schtSetAsyncSendMsgToServer(); pthread_attr_t thattr; pthread_attr_init(&thattr); @@ -382,6 +422,8 @@ TEST(insertTest, normalCase) { ASSERT_EQ(res.numOfRows, 20); scheduleFreeJob(pInsertJob); + + schedulerDestroy(); } TEST(multiThread, forceFree) { -- GitLab